Skip to content

RX Analyzer Drivers

rfsoc_rfdc.receiver.rx_analyzer.drivers.base_driver

Classes

BaseDriver

Bases: ABC

Base class for common driver functionality

Source code in rfsoc_rfdc/receiver/rx_analyzer/drivers/base_driver.py
class BaseDriver(ABC):
    """Base class for common driver functionality"""

    def __init__(self, channel_id):
        self.thd_list = []
        self.channel_id = channel_id

    def _run_thds(self, task_list=None):
        """Common thread processing"""
        target_list = task_list if task_list is not None else self.thd_list

        for thd in target_list:
            thd.start()
        for thd in target_list:
            thd.join()

        if task_list is None:
            self.thd_list.clear()

    def finalize(self):
        """Wait for all async file writes to complete before exiting."""
        _sample_logger.wait_for_writes()

    @abstractmethod
    def proc_rx(self, data):
        pass
Functions
finalize()

Wait for all async file writes to complete before exiting.

Source code in rfsoc_rfdc/receiver/rx_analyzer/drivers/base_driver.py
def finalize(self):
    """Wait for all async file writes to complete before exiting."""
    _sample_logger.wait_for_writes()

rfsoc_rfdc.receiver.rx_analyzer.drivers.real2iq_driver

Classes

Real2IqDriver

Bases: BaseDriver

Driver for Real to IQ conversion using injected Pipeline

Source code in rfsoc_rfdc/receiver/rx_analyzer/drivers/real2iq_driver.py
class Real2IqDriver(BaseDriver):
    """Driver for Real to IQ conversion using injected Pipeline"""

    def __init__(self, pipeline, channel_id=0):
        super().__init__(channel_id)
        self.pipeline = pipeline

    def proc_rx(self, *data):
        # Delegate processing to pipeline, providing the thread runner callback
        return self.pipeline.process(*data, self._run_thds)

    def close(self):
        if hasattr(self.pipeline, 'close'):
            self.pipeline.close()

    def __del__(self):
        self.close()

rfsoc_rfdc.receiver.rx_analyzer.drivers.real2real_driver

Classes

Real2RealDriver

Bases: BaseDriver

Driver for Real to Real conversion using injected Pipeline

Source code in rfsoc_rfdc/receiver/rx_analyzer/drivers/real2real_driver.py
class Real2RealDriver(BaseDriver):
    """Driver for Real to Real conversion using injected Pipeline"""

    def __init__(self, pipeline, channel_id):
        super().__init__(channel_id)
        self.pipeline = pipeline

    def proc_rx(self, data):
        # Delegate processing to pipeline, providing the thread runner callback
        # Real2RealPipeline returns (None, None, None, None) for snr, cfo, evm, ber
        return self.pipeline.process(data, self._run_thds)

    def close(self):
        if hasattr(self.pipeline, 'close'):
            self.pipeline.close()

    def __del__(self):
        self.close()