Skip to content

Overlay Task

rfsoc_rfdc.overlay_task

Classes

OverlayTask

Bases: ABC

An abstract base class for creating tasks to be run on an RFSoCOverlay.

Attributes:

Name Type Description
ol RFSoCOverlay

An instance of RFSoCOverlay to operate on.

task_name str

The name of the task.

thread Thread

The thread on which the task runs.

Methods:

Name Description
run

An abstract method to define the task's behavior.

start

Starts the task's thread.

join

Waits for the task's thread to complete.

Source code in rfsoc_rfdc/overlay_task.py
class OverlayTask(ABC):
    """
    An abstract base class for creating tasks to be run on an RFSoCOverlay.

    Attributes:
        ol (RFSoCOverlay): An instance of RFSoCOverlay to operate on.
        task_name (str): The name of the task.
        thread (threading.Thread): The thread on which the task runs.

    Methods:
        run: An abstract method to define the task's behavior.
        start: Starts the task's thread.
        join: Waits for the task's thread to complete.
    """

    def __init__(self, overlay, name="OverlayTask"):
        """
        Initializes the OverlayTask with a given RFSoCOverlay instance and task name.

        Args:
            overlay (RFSoCOverlay): The RFSoCOverlay instance to operate on.
            name (str): The name of the task. Defaults to "OverlayTask".

        Raises:
            TypeError: If the overlay is not an instance of RFSoCOverlay.
        """
        if not isinstance(overlay, RFSoCOverlay):
            raise TypeError("This task is not an RFSoCOverlay instance.")
        self.ol = overlay
        self.task_name = name
        self.task_state = TASK_STATE['IDLE']
        self._stop_event = threading.Event()
        self._pause_event = threading.Event()
        self.thread = threading.Thread(target=self.run)

    @abstractmethod
    def run(self):
        """
        Abstract method that defines the task's behavior. Must be implemented by subclasses.
        """
        pass

    def start(self):
        """Starts the task's thread, causing it to run concurrently"""
        self._stop_event.clear()
        self._pause_event.set()
        self.task_state = TASK_STATE['RUNNING']
        self.thread.start()

    def pause(self):
        """Pause task execution by changing task state"""
        self._pause_event.clear()
        self.task_state = TASK_STATE['PAUSE']

    def resume(self):
        """Resume task execution by changing task state"""
        self._pause_event.set()
        self.task_state = TASK_STATE['RUNNING']

    def stop(self):
        """Stop task execution"""
        self._stop_event.set()
        self._pause_event.set()  # Unblock if paused
        self.task_state = TASK_STATE['STOP']
        self.thread.join()

    def join(self):
        """Blocks until the task's thread terminates"""
        self.thread.join()
Functions
run() abstractmethod

Abstract method that defines the task's behavior. Must be implemented by subclasses.

Source code in rfsoc_rfdc/overlay_task.py
@abstractmethod
def run(self):
    """
    Abstract method that defines the task's behavior. Must be implemented by subclasses.
    """
    pass
start()

Starts the task's thread, causing it to run concurrently

Source code in rfsoc_rfdc/overlay_task.py
def start(self):
    """Starts the task's thread, causing it to run concurrently"""
    self._stop_event.clear()
    self._pause_event.set()
    self.task_state = TASK_STATE['RUNNING']
    self.thread.start()
pause()

Pause task execution by changing task state

Source code in rfsoc_rfdc/overlay_task.py
def pause(self):
    """Pause task execution by changing task state"""
    self._pause_event.clear()
    self.task_state = TASK_STATE['PAUSE']
resume()

Resume task execution by changing task state

Source code in rfsoc_rfdc/overlay_task.py
def resume(self):
    """Resume task execution by changing task state"""
    self._pause_event.set()
    self.task_state = TASK_STATE['RUNNING']
stop()

Stop task execution

Source code in rfsoc_rfdc/overlay_task.py
def stop(self):
    """Stop task execution"""
    self._stop_event.set()
    self._pause_event.set()  # Unblock if paused
    self.task_state = TASK_STATE['STOP']
    self.thread.join()
join()

Blocks until the task's thread terminates

Source code in rfsoc_rfdc/overlay_task.py
def join(self):
    """Blocks until the task's thread terminates"""
    self.thread.join()

BlinkLedTask

Bases: OverlayTask

A task that blinks LEDs on an RFSoCOverlay.

Inherits from OverlayTask.

Attributes:

Name Type Description
green_leds AxiGPIO

AxiGPIO instance for controlling green LEDs.

red_leds AxiGPIO

AxiGPIO instance for controlling red LEDs.

Methods:

Name Description
run

Implements the LED blinking behavior.

Source code in rfsoc_rfdc/overlay_task.py
class BlinkLedTask(OverlayTask):
    """
    A task that blinks LEDs on an RFSoCOverlay.

    Inherits from OverlayTask.

    Attributes:
        green_leds (AxiGPIO): AxiGPIO instance for controlling green LEDs.
        red_leds (AxiGPIO): AxiGPIO instance for controlling red LEDs.

    Methods:
        run: Implements the LED blinking behavior.
    """

    def __init__(self, overlay):
        """
        Initializes the BlinkLedTask with a given RFSoCOverlay instance.

        Args:
            overlay (RFSoCOverlay): The RFSoCOverlay instance to operate on.
        """
        super().__init__(overlay, name="BlinkLedTask")
        self.green_leds = AxiGPIO(self.ol.ip_dict['axi_gpio_led']).channel1
        self.red_leds = AxiGPIO(self.ol.ip_dict['axi_gpio_led']).channel2
        for leds in [self.red_leds, self.green_leds]:
            leds.setdirection("out")
            leds.setlength(8)

    def run(self):
        """
        Runs the LED blinking task. Alternates the LEDs between on and off states at a fixed interval.
        """
        interval = 0.3
        while not self._stop_event.is_set():
            self._pause_event.wait()
            if self._stop_event.is_set():
                break
            self.green_leds.write(0xff, 0xff)  # Turn on all green LEDs
            time.sleep(interval)
            self.green_leds.write(0x00, 0xff)  # Turn off all green LEDs
            time.sleep(interval)
Functions
run()

Runs the LED blinking task. Alternates the LEDs between on and off states at a fixed interval.

Source code in rfsoc_rfdc/overlay_task.py
def run(self):
    """
    Runs the LED blinking task. Alternates the LEDs between on and off states at a fixed interval.
    """
    interval = 0.3
    while not self._stop_event.is_set():
        self._pause_event.wait()
        if self._stop_event.is_set():
            break
        self.green_leds.write(0xff, 0xff)  # Turn on all green LEDs
        time.sleep(interval)
        self.green_leds.write(0x00, 0xff)  # Turn off all green LEDs
        time.sleep(interval)