Skip to content

RX Analyzer Pipelines

rfsoc_rfdc.receiver.rx_analyzer.pipelines.base_pipeline

Classes

BasePipeline

Bases: ABC

Abstract base class for DSP pipelines

Source code in rfsoc_rfdc/receiver/rx_analyzer/pipelines/base_pipeline.py
class BasePipeline(ABC):
    """Common abstract interface implemented by every DSP pipeline."""

    def __init__(self, channel_id=-1):
        # Channel identifier this pipeline is bound to (-1 when not given).
        self.channel_id = channel_id

    @abstractmethod
    def process(self, data, run_async_func):
        """
        Run the pipeline on received data.

        Args:
            data: Input data (numpy array).
            run_async_func: Callable used to execute async tasks (threads).
                            Signature: run_async_func(task_list)

        Returns:
            A tuple of metrics (snr, cfo, evm, ber, etc.).
        """
Functions
process(data, run_async_func) abstractmethod

Process received data.

Parameters:

Name Type Description Default
data

Input data (numpy array)

required
run_async_func

Function to execute async tasks (threads). Signature: run_async_func(task_list)

required

Returns:

Type Description

metrics tuple (snr, cfo, evm, ber, etc.)

Source code in rfsoc_rfdc/receiver/rx_analyzer/pipelines/base_pipeline.py
@abstractmethod
def process(self, data, run_async_func):
    """
    Run the pipeline on received data.

    Args:
        data: Input data (numpy array).
        run_async_func: Callable used to execute async tasks (threads).
                        Signature: run_async_func(task_list)

    Returns:
        A tuple of metrics (snr, cfo, evm, ber, etc.).
    """

rfsoc_rfdc.receiver.rx_analyzer.pipelines.ofdm_pipeline

Classes

OfdmPipeline

Bases: BasePipeline

Pipeline for OFDM Signal Processing

Source code in rfsoc_rfdc/receiver/rx_analyzer/pipelines/ofdm_pipeline.py
class OfdmPipeline(BasePipeline):
    """Pipeline for OFDM Signal Processing

    Each call to :meth:`process` schedules a time-domain plot of the raw
    input and then asks the metrics calculator to decode one packet.
    """

    def __init__(self, channel_id, ofdm_scheme, detect_scheme):
        """
        Args:
            channel_id: Channel identifier, forwarded to the visualizer and
                the metrics calculator.
            ofdm_scheme: OFDM configuration passed to OfdmMetricsCalculator.
            detect_scheme: Detection scheme passed to OfdmMetricsCalculator;
                its ``logger.get_log_dir()`` is called during construction.
        """
        super().__init__(channel_id)
        # NOTE(review): the returned directory is not used here. The call is
        # retained in case get_log_dir() has side effects — confirm and drop.
        detect_scheme.logger.get_log_dir()

        # Packet and spectrum visualizers are currently disabled; only the
        # raw time-domain waveform is plotted.
        self.waveform_visualizer = TimeDomainVisualizer(
            channel_id, mode="real2iq")
        self.ofdm_calculator = OfdmMetricsCalculator(
            channel_id, ofdm_scheme, detect_scheme)

    def process(self, data, run_async_func):
        """
        Plot the raw waveform and decode one OFDM packet.

        Args:
            data: Input data (numpy array), used both for plotting and as
                the raw IQ handed to the metrics calculator.
            run_async_func: Function to execute async tasks (threads).
                            Signature: run_async_func(task_list)

        Returns:
            Tuple ``(snr, cfo, evm, ber)``. If packet analysis raises, the
            error is logged and four NaNs are returned instead.
        """
        # Schedule visualization of the raw input before decoding.
        run_async_func([self.waveform_visualizer.plot_thd(data)])

        try:
            # The decoded packet itself is not used while the packet
            # visualizers are disabled.
            _rx_packet, snr, cfo, evm, ber = \
                self.ofdm_calculator.analyze_packet(data)
        except Exception as e:
            # Best-effort boundary: a failed decode must not stop the
            # receive loop, so report it and hand back NaN metrics.
            logging.error(
                f"Rx #{self.channel_id} Failed to decode Rx packet: {e}")
            return np.nan, np.nan, np.nan, np.nan

        return snr, cfo, evm, ber

    def close(self):
        """Close the waveform visualizer, if it was created."""
        # __del__ can run on an instance whose __init__ raised before the
        # visualizer was assigned, so look the attribute up defensively.
        visualizer = getattr(self, "waveform_visualizer", None)
        if visualizer is not None:
            visualizer.close()

    def __del__(self):
        self.close()

rfsoc_rfdc.receiver.rx_analyzer.pipelines.mimo_pipeline

Classes

MimoPipeline

Bases: BasePipeline

Pipeline for MIMO Processing

Source code in rfsoc_rfdc/receiver/rx_analyzer/pipelines/mimo_pipeline.py
class MimoPipeline(BasePipeline):
    """Pipeline for MIMO Processing"""

    def __init__(self, detection_scheme, rx_ant_count):
        """
        Args:
            detection_scheme: Object whose ``proc_rx(data)`` performs the
                MIMO detection.
            rx_ant_count: Number of receive channels; one time-domain
                visualizer is created per channel.
        """
        super().__init__(channel_id=-1)
        self.channel_count = rx_ant_count

        # One visualizer per receive channel, indexed by channel number.
        visualizers = []
        for ch_idx in range(self.channel_count):
            visualizers.append(
                TimeDomainVisualizer(channel_id=ch_idx, mode="real2iq"))
        self.time_visualizer_list = visualizers

        self.mimo_detection = detection_scheme

    def process(self, data, run_async_func):
        """
        Args:
            data: Multi-channel data array
            run_async_func: Callable invoked once with the list of plot
                tasks, one per channel.

        Returns:
            Tuple ``(rx_packet_list, snr_list, cfo_list)`` as produced by the
            detection scheme's ``proc_rx``.
        """
        plot_tasks = [
            self.time_visualizer_list[ch_idx].plot_thd(data[ch_idx])
            for ch_idx in range(self.channel_count)
        ]
        run_async_func(plot_tasks)

        packets, snrs, cfos = self.mimo_detection.proc_rx(data)
        return packets, snrs, cfos
Functions
process(data, run_async_func)

Parameters:

Name Type Description Default
data

Multi-channel data array

required
Source code in rfsoc_rfdc/receiver/rx_analyzer/pipelines/mimo_pipeline.py
def process(self, data, run_async_func):
    """
    Args:
        data: Multi-channel data array
        run_async_func: Callable invoked once with the list of plot tasks,
            one per channel.

    Returns:
        Tuple ``(rx_packet_list, snr_list, cfo_list)`` as produced by the
        detection scheme's ``proc_rx``.
    """
    plot_tasks = [
        self.time_visualizer_list[ch_idx].plot_thd(data[ch_idx])
        for ch_idx in range(self.channel_count)
    ]
    run_async_func(plot_tasks)

    packets, snrs, cfos = self.mimo_detection.proc_rx(data)
    return packets, snrs, cfos

rfsoc_rfdc.receiver.rx_analyzer.pipelines.fmcw_pipeline

Classes

FmcwPipeline

Bases: BasePipeline

Pipeline for FMCW Radar Processing

Source code in rfsoc_rfdc/receiver/rx_analyzer/pipelines/fmcw_pipeline.py
class FmcwPipeline(BasePipeline):
    """Pipeline for FMCW Radar Processing

    Detects the packet in a loopback channel, slices the same sample window
    out of the over-the-air (OTA) channel, runs the FMCW analysis on the pair
    and saves three diagnostic plots through the detection scheme's logger.
    """

    def __init__(self, channel_id, detect_scheme):
        """
        Args:
            channel_id: Channel identifier stored on the base class.
            detect_scheme: Detection scheme used on the loopback channel.
                Its ``packet_len`` and ``pad_len`` attributes are overwritten
                here, and its ``logger`` is used to resolve plot file paths.
        """
        super().__init__(channel_id)

        self.detect_scheme = detect_scheme
        self.fmcw = FMCW()
        # The packet_len for detection must match the generated FMCW waveform length
        self.detect_scheme.packet_len = self.fmcw.chirp_num * self.fmcw.duration
        self.detect_scheme.pad_len = 10000

    def process(self, lbp_ch, ota_ch, run_async_func):
        """
        Analyze one loopback/OTA capture pair and save diagnostic plots.

        Args:
            lbp_ch: Loopback channel samples; packet detection runs on these.
            ota_ch: OTA channel samples; sliced with the indices found in the
                loopback channel.
            run_async_func: Accepted for signature compatibility with the
                other pipelines; not used by this pipeline.

        Returns:
            Tuple ``(snr, cfo)`` from the detection scheme, or
            ``(np.nan, np.nan)`` when no packet is detected in the loopback
            channel.
        """
        lpb_pkt, snr, cfo, start_idx, end_idx = self.detect_scheme.proc_rx(
            lbp_ch)

        if lpb_pkt is None:
            logging.error(
                "FMCW pipeline: Failed to detect packet in loopback channel.")
            return np.nan, np.nan

        ota_pkt = ota_ch[start_idx:end_idx]

        # Ensure sliced packet has the expected length
        if len(ota_pkt) != len(lpb_pkt):
            logging.warning(
                f"OTA packet length ({len(ota_pkt)}) does not match loopback packet length ({len(lpb_pkt)}).")
            # Truncate the longer packet so both cover the same samples.
            min_len = min(len(ota_pkt), len(lpb_pkt))
            ota_pkt = ota_pkt[:min_len]
            lpb_pkt = lpb_pkt[:min_len]

        sample_rate = ZCU216_CONFIG['ADCSampleRate'] / \
            ZCU216_CONFIG['ADCInterpolationRate'] * 1e6
        delay_axis, chirp_mat = self.fmcw.analyze_digital(
            lpb_pkt, ota_pkt, sample_rate=sample_rate)

        # Plot limits: 2x the chirp duration, expressed as a time, as an
        # index into delay_axis, and as a sample count.
        limit_time = (self.fmcw.duration / sample_rate) * 2
        limit_idx = np.searchsorted(delay_axis, limit_time)
        limit_samples = int(limit_time * sample_rate)

        self._plot_waveforms(lpb_pkt, ota_pkt, limit_samples)
        self._plot_single_chirp(delay_axis, chirp_mat, limit_idx)
        self._plot_range_map(delay_axis, chirp_mat, limit_idx)

        return snr, cfo

    def _figure_path(self, suffix):
        """Return the logger-resolved output path for ``<config>_<suffix>.png``."""
        config_name = ZCU216_CONFIG['CONFIG_NAME']
        return self.detect_scheme.logger.get_file_path(
            f'{config_name}_{suffix}.png')

    def _plot_waveforms(self, lpb_pkt, ota_pkt, limit_samples):
        """Save the real parts of the loopback, OTA and mixed signals."""
        fig, ax = plt.subplots()
        try:
            # Both packets have equal length at this point, so one sample
            # axis serves all three traces.
            sample_axis = np.arange(len(lpb_pkt))[:limit_samples]
            ax.plot(sample_axis, np.real(lpb_pkt)[:limit_samples],
                    label='Loopback Packet')
            ax.plot(sample_axis, np.real(ota_pkt)[:limit_samples],
                    label='OTA Packet')
            ax.plot(sample_axis,
                    np.real(ota_pkt * np.conj(lpb_pkt))[:limit_samples],
                    label='Mixed')
            ax.legend()
            ax.set_xlabel('Time (samples)')
            ax.set_ylabel('Amplitude')
            path = self._figure_path('fmcw_wave')
            fig.savefig(path)
            print(f"saved to {path}")
        finally:
            # Close even when plotting or saving fails, so repeated calls do
            # not accumulate open figures.
            plt.close(fig)

    def _plot_single_chirp(self, delay_axis, chirp_mat, limit_idx):
        """Save the magnitude (dB) of the first chirp versus delay."""
        fig, ax = plt.subplots()
        try:
            # The 1e-9 offset avoids log10(0).
            ax.plot(delay_axis[:limit_idx], 20 *
                    np.log10(chirp_mat[0, :limit_idx] + 1e-9))
            ax.set_xlabel('Delay (s)')
            ax.set_ylabel('Magnitude (dB)')
            fig.savefig(self._figure_path('fmcw_single_chirp'))
        finally:
            plt.close(fig)

    def _plot_range_map(self, delay_axis, chirp_mat, limit_idx):
        """Save the range map: magnitude (dB) over delay and chirp number."""
        fig, ax = plt.subplots()
        try:
            im = ax.imshow(
                20 * np.log10(chirp_mat[:, :limit_idx] + 1e-9),
                aspect='auto', cmap='viridis',
                extent=[0, delay_axis[limit_idx - 1], chirp_mat.shape[0], 0])
            fig.colorbar(im, ax=ax, label='Magnitude (dB)')
            ax.set_title('Range Map')
            ax.set_xlabel('Delay (s)')
            ax.set_ylabel('Chirp Number')
            fig.savefig(self._figure_path('fmcw_range_map'))
        finally:
            plt.close(fig)

    def close(self):
        # In case any visualizers are added back or other resources need closing
        pass

    def __del__(self):
        self.close()

rfsoc_rfdc.receiver.rx_analyzer.pipelines.real2real_pipeline

Classes

Real2RealPipeline

Bases: BasePipeline

Pipeline for Real to Real conversion (Simple Pass-through for plotting)

Source code in rfsoc_rfdc/receiver/rx_analyzer/pipelines/real2real_pipeline.py
class Real2RealPipeline(BasePipeline):
    """Pipeline for Real to Real conversion (Simple Pass-through for plotting)"""

    def __init__(self, channel_id):
        """
        Args:
            channel_id: Channel identifier, stored on the base class and
                forwarded to the time-domain visualizer.
        """
        super().__init__(channel_id)
        # Initialize real visualizer directly as it's simple
        self.waveform_visualizer = TimeDomainVisualizer(
            self.channel_id, mode="real2real")

    def process(self, data, run_async_func):
        """
        Schedule a time-domain plot of the raw data.

        Args:
            data: Input data handed to the visualizer.
            run_async_func: Function to execute async tasks (threads).
                            Signature: run_async_func(task_list)

        Returns:
            ``(None, None, None, None)``: this pipeline produces no
            snr/cfo/evm/ber metrics.
        """
        run_async_func([self.waveform_visualizer.plot_thd(data)])
        # Real2Real doesn't return metrics like snr, cfo, evm, ber.
        # It's primarily for visualization of raw data.
        return None, None, None, None

    def close(self):
        """Close the waveform visualizer, if it was created."""
        # __del__ can run on an instance whose __init__ raised before the
        # visualizer was assigned, so look the attribute up defensively.
        visualizer = getattr(self, "waveform_visualizer", None)
        if visualizer is not None:
            visualizer.close()

    def __del__(self):
        self.close()

rfsoc_rfdc.receiver.rx_analyzer.pipelines.ch_power_pipeline

Classes

ChPowerPipeline

Bases: BasePipeline

Pipeline for Channel Power Measurement

Source code in rfsoc_rfdc/receiver/rx_analyzer/pipelines/ch_power_pipeline.py
class ChPowerPipeline(BasePipeline):
    """Pipeline for Channel Power Measurement"""

    def __init__(self, channel_id=0):
        """
        Args:
            channel_id: Channel identifier (default 0), stored on the base
                class and forwarded to both visualizers.
        """
        super().__init__(channel_id)

        self.waveform_visualizer = TimeDomainVisualizer(
            channel_id, mode="real2iq")
        self.spectrum_visualizer = SpectrumVisualizer(channel_id)

    def process(self, data, run_async_func):
        """
        Plot the data and compute its mean power.

        Args:
            data: Input samples; plotted in time and frequency domain and
                used for the power computation.
            run_async_func: Function to execute async tasks (threads).
                            Signature: run_async_func(task_list)

        Returns:
            ``10 * log10(mean(|data|**2) * 1000)``, or 0.0 if the
            computation raises.
        """
        # Time-domain plot first, spectrum plot second, run as one batch.
        plot_tasks = [
            self.waveform_visualizer.plot_thd(data),
            self.spectrum_visualizer.plot_thd(data),
        ]
        run_async_func(plot_tasks)

        try:
            mean_power = np.mean(np.abs(data)**2)
            power_dbm = 10 * np.log10(mean_power * 1000)
        except Exception as e:
            logging.error(
                f"Rx #{self.channel_id} Failed to calculate power spectrum: {e}")
            return 0.0
        else:
            return power_dbm