Skip to content

RX Analyzer Components

rfsoc_rfdc.receiver.rx_analyzer.components.base_component

rfsoc_rfdc.receiver.rx_analyzer.components.time_domain_visualizer

Classes

TimeDomainVisualizer

Bases: BaseComponent

Visualize and save raw waveform data

Source code in rfsoc_rfdc/receiver/rx_analyzer/components/time_domain_visualizer.py
class TimeDomainVisualizer(BaseComponent):
    """Visualize and save raw waveform data.

    Owns one plotter (complex or real, chosen by ``mode``) and builds
    ``threading.Thread`` objects that either save or plot a buffer.
    The threads are returned unstarted; the caller starts them.
    """

    def __init__(self, channel_id, mode="real2iq"):
        """Create the plotter matching ``mode``.

        Args:
            channel_id: Channel identifier, forwarded to ``BaseComponent``
                and embedded in the plot title.
            mode: ``"real2iq"`` or ``"iq2iq"`` selects a
                ``ComplexSignalPlotter``; ``"real2real"`` selects a
                ``RealSignalPlotter``.

        Raises:
            RuntimeError: If ``mode`` is none of the values above.
        """
        super().__init__(channel_id)
        if mode in ("real2iq", "iq2iq"):
            self.time_domain_plotter = ComplexSignalPlotter(
                title=f"TimeDomainVisualizer (IQ) (Channel {channel_id})")
        elif mode == "real2real":
            self.time_domain_plotter = RealSignalPlotter(
                title=f"TimeDomainVisualizer (Channel {channel_id})")
        else:
            raise RuntimeError(f"Unrecognized mode {mode}")

    def io_logging_thd(self, data):
        """Return an unstarted thread that calls ``save_to_file(data, fname)``.

        ``fname`` is ``<log dir>/Rx<channel_id>_raw`` where the log dir
        comes from ``ZCU216_CONFIG['LOGGER'].get_log_dir()``.
        """
        # self.channel_id is not assigned in this class; presumably set by
        # BaseComponent.__init__ -- confirm.
        fname = f"{ZCU216_CONFIG['LOGGER'].get_log_dir()}/Rx{self.channel_id}_raw"
        thd = threading.Thread(target=save_to_file, args=(
            data, fname))
        return thd

    def plot_thd(self, data):
        """Return an unstarted thread that calls the plotter's ``update_plot(data)``."""
        thd = threading.Thread(
            target=self.time_domain_plotter.update_plot, args=(data, ))
        return thd

    def close(self):
        """Close the plotter, if one was created."""
        # The attribute is missing when __init__ raised before assigning it
        # (e.g. unknown mode); __del__ still runs on such an instance.
        plotter = getattr(self, "time_domain_plotter", None)
        if plotter is not None:
            plotter.close()

    def __del__(self):
        """Close the plotter when the instance is finalized."""
        self.close()

TimeDomainPacketVisualizer

Bases: BaseComponent

Visualizer for time-domain packet analysis (assumed real2iq mode)

Source code in rfsoc_rfdc/receiver/rx_analyzer/components/time_domain_visualizer.py
class TimeDomainPacketVisualizer(BaseComponent):
    """Visualizer for time-domain packet analysis (assumes real2iq mode).

    Owns a ``ComplexSignalPlotter`` and builds ``threading.Thread``
    objects that either save or plot a packet. The threads are returned
    unstarted; the caller starts them.
    """

    def __init__(self, channel_id):
        """Create the packet plotter.

        Args:
            channel_id: Channel identifier, forwarded to ``BaseComponent``
                and embedded in the plot title.
        """
        super().__init__(channel_id)
        self.time_pkt_plotter = ComplexSignalPlotter(
            title=f"TimeDomainPacketVisualizer (Channel {channel_id})")

    def io_logging_thd(self, pkt_iq_data):
        """Return an unstarted thread that calls ``save_to_file(pkt_iq_data, fname)``.

        ``fname`` is ``<log dir>/Rx_pkt`` where the log dir comes from
        ``ZCU216_CONFIG['LOGGER'].get_log_dir()``.
        """
        # NOTE(review): the file name carries no channel id, so every
        # instance targets the same path -- confirm this is intended.
        fname = f"{ZCU216_CONFIG['LOGGER'].get_log_dir()}/Rx_pkt"
        thd = threading.Thread(target=save_to_file, args=(
            pkt_iq_data, fname))
        return thd

    def plot_thd(self, pkt_iq_data):
        """Return an unstarted thread that calls the plotter's ``update_plot(pkt_iq_data)``."""
        thd = threading.Thread(
            target=self.time_pkt_plotter.update_plot, args=(pkt_iq_data, ))
        return thd

    def close(self):
        """Close the plotter, if one was created."""
        # __del__ also runs on an instance whose __init__ raised before the
        # plotter attribute was assigned; avoid a secondary AttributeError.
        plotter = getattr(self, "time_pkt_plotter", None)
        if plotter is not None:
            plotter.close()

    def __del__(self):
        """Close the plotter when the instance is finalized."""
        self.close()

Functions

rfsoc_rfdc.receiver.rx_analyzer.components.spectrum_visualizer

Classes

SpectrumVisualizer

Bases: BaseComponent

Visualizer for frequency-domain packet analysis

Source code in rfsoc_rfdc/receiver/rx_analyzer/components/spectrum_visualizer.py
class SpectrumVisualizer(BaseComponent):
    """Visualizer for frequency-domain packet analysis.

    Owns an ``FFTPlotter`` and builds an unstarted ``threading.Thread``
    that feeds it one packet.
    """

    def __init__(self, channel_id, mode="real2iq"):
        """Create the FFT plotter.

        Args:
            channel_id: Channel identifier, forwarded to ``BaseComponent``
                and embedded in the plot title.
            mode: Accepted but not used by this class.
        """
        super().__init__(channel_id)
        # NOTE(review): the plot is scaled with the DAC-side rate from the
        # config; the 1e6 factor presumably converts MHz to Hz -- confirm
        # both against the config definition.
        dac_samp_rate = ZCU216_CONFIG['DACSampleRate'] / \
            ZCU216_CONFIG['DACInterpolationRate'] * 1e6
        self.fft_pkt_plotter = FFTPlotter(
            sample_rate=dac_samp_rate,
            title=f"SpectrumVisualizer (Channel {channel_id})")

    def io_logging_thd(self, pkt_iq_data):
        """No-op: this component writes nothing to disk and returns ``None``."""
        pass

    def plot_thd(self, pkt_iq_data):
        """Return an unstarted thread that calls the plotter's ``update_plot(pkt_iq_data)``."""
        thd = threading.Thread(
            target=self.fft_pkt_plotter.update_plot, args=(pkt_iq_data,))
        return thd

    def close(self):
        """Close the plotter, if one was created."""
        # __del__ also runs on an instance whose __init__ raised (e.g. a
        # missing config key) before the plotter attribute was assigned.
        plotter = getattr(self, "fft_pkt_plotter", None)
        if plotter is not None:
            plotter.close()

    def __del__(self):
        """Close the plotter when the instance is finalized."""
        self.close()

rfsoc_rfdc.receiver.rx_analyzer.components.ofdm_metrics_calculator

Classes

OfdmMetricsCalculator

Bases: BaseComponent

Calculator for OFDM-specific processing (assumes real2iq mode)

Source code in rfsoc_rfdc/receiver/rx_analyzer/components/ofdm_metrics_calculator.py
class OfdmMetricsCalculator(BaseComponent):
    """OFDM-specific receive processing (assumes real2iq mode).

    Holds a detection scheme and an OFDM scheme; detection runs first and
    its packet, if any, is handed to the OFDM scheme for analysis.
    """

    def __init__(self, channel_id, ofdm_scheme, detect_scheme):
        """Store the two schemes used by :meth:`analyze_packet`.

        Args:
            channel_id: Channel identifier, forwarded to ``BaseComponent``.
            ofdm_scheme: Object providing ``analyze(rx_packet, channel_id)``.
            detect_scheme: Object providing ``proc_rx(raw_iq)`` and
                ``update_plot(raw_iq)``.
        """
        super().__init__(channel_id)
        self.detect_scheme = detect_scheme
        self.ofdm_scheme = ofdm_scheme

    def io_logging_thd(self, raw_iq):
        """No-op: this component writes no raw data and returns ``None``."""
        return None

    def plot_thd(self, raw_iq):
        """Return an unstarted thread that calls ``detect_scheme.update_plot(raw_iq)``."""
        return threading.Thread(
            target=self.detect_scheme.update_plot,
            args=(raw_iq,),
        )

    def analyze_packet(self, raw_iq):
        """Perform packet detection and OFDM analysis.

        Returns:
            ``(rx_packet, snr, cfo, evm, ber)``. When detection yields no
            packet the first element is ``None`` and ``evm``/``ber`` are
            ``np.nan``; nothing is logged in that case.
        """
        packet, snr, cfo = self.detect_scheme.proc_rx(raw_iq)

        # Without a detected packet there is nothing to demodulate.
        if packet is None:
            return None, snr, cfo, np.nan, np.nan

        evm, ber = self.ofdm_scheme.analyze(packet, self.channel_id)

        # One metrics row per analyzed packet, in a per-config/per-channel log.
        metrics_logger = ZCU216_CONFIG['LOGGER']
        cfg_name = ZCU216_CONFIG['CONFIG_NAME']
        metrics_row = f"{snr:.3f}, {cfo:.3f}, {evm:.3f}, {ber:.10f}"
        metrics_logger.log_metrics(
            f"{cfg_name}_CH{self.channel_id}_res.log", metrics_row)

        return packet, snr, cfo, evm, ber
Functions
analyze_packet(raw_iq)

Perform packet detection and OFDM analysis

Source code in rfsoc_rfdc/receiver/rx_analyzer/components/ofdm_metrics_calculator.py
def analyze_packet(self, raw_iq):
    """Perform packet detection and OFDM analysis.

    Returns:
        ``(rx_packet, snr, cfo, evm, ber)``. When detection yields no
        packet the first element is ``None`` and ``evm``/``ber`` are
        ``np.nan``; nothing is logged in that case.
    """
    packet, snr, cfo = self.detect_scheme.proc_rx(raw_iq)

    # Without a detected packet there is nothing to demodulate.
    if packet is None:
        return None, snr, cfo, np.nan, np.nan

    evm, ber = self.ofdm_scheme.analyze(packet, self.channel_id)

    # One metrics row per analyzed packet, in a per-config/per-channel log.
    metrics_logger = ZCU216_CONFIG['LOGGER']
    cfg_name = ZCU216_CONFIG['CONFIG_NAME']
    metrics_row = f"{snr:.3f}, {cfo:.3f}, {evm:.3f}, {ber:.10f}"
    metrics_logger.log_metrics(
        f"{cfg_name}_CH{self.channel_id}_res.log", metrics_row)

    return packet, snr, cfo, evm, ber

rfsoc_rfdc.receiver.rx_analyzer.components.fmcw_metrics_calculator

Classes

FmcwMetricsCalculator

Bases: BaseComponent

Calculator for FMCW-specific processing (assumes real2iq mode)

Source code in rfsoc_rfdc/receiver/rx_analyzer/components/fmcw_metrics_calculator.py
class FmcwMetricsCalculator(BaseComponent):
    """Calculator for FMCW-specific processing (assumes real2iq mode).

    Runs the FMCW scheme on a reference/air packet pair and saves the
    resulting chirp-response and range-map figures as PNG files.
    """

    def __init__(self, channel_id, fmcw_scheme, detect_scheme=None):
        """Store the schemes used by this component.

        Args:
            channel_id: Channel identifier, forwarded to ``BaseComponent``.
            fmcw_scheme: Object providing
                ``analyze_digital(ref, air, sample_rate=...)``.
            detect_scheme: Optional object providing ``update_plot(raw_iq)``
                and a ``path2wave`` output directory. When omitted, live
                plotting is skipped and figures go to the logger's log
                directory.
        """
        super().__init__(channel_id)
        self.fmcw_scheme = fmcw_scheme
        # Only assign when given, so an attribute possibly provided by the
        # base class is not overwritten with None.
        if detect_scheme is not None:
            self.detect_scheme = detect_scheme

    def io_logging_thd(self, raw_iq):
        """No-op: this component writes no raw data and returns ``None``."""
        pass

    def plot_thd(self, raw_iq):
        """Return an unstarted thread that updates the detection plot.

        Without a detect scheme the returned thread has no target, so
        starting it does nothing.
        """
        detect_scheme = getattr(self, "detect_scheme", None)
        if detect_scheme is None:
            # Previously this path raised AttributeError because
            # detect_scheme was never assigned by __init__.
            return threading.Thread()
        thd = threading.Thread(
            target=detect_scheme.update_plot, args=(raw_iq,))
        return thd

    def _output_dir(self):
        """Return the directory in which the FMCW figures are saved."""
        detect_scheme = getattr(self, "detect_scheme", None)
        out_dir = getattr(detect_scheme, "path2wave", None)
        if out_dir is None:
            out_dir = ZCU216_CONFIG['LOGGER'].get_log_dir()
        return out_dir

    def analyze_packet(self, rx_packet_ref, rx_packet_air):
        """Perform FMCW analysis and save the figures.

        Args:
            rx_packet_ref: Reference packet passed to the FMCW scheme.
            rx_packet_air: Over-the-air packet passed to the FMCW scheme.

        Writes ``<config>_FMCW_chirp_response.png`` and
        ``<config>_FMCW_range_map.png`` into the output directory.
        """
        # NOTE(review): no 1e6 scaling is applied here although the x-axis
        # is labelled in seconds -- confirm the unit analyze_digital expects.
        adc_iq_samp_rate = ZCU216_CONFIG['ADCSampleRate'] / \
            ZCU216_CONFIG['ADCInterpolationRate']
        config_name = ZCU216_CONFIG['CONFIG_NAME']

        delay_axis, chirp_mat = self.fmcw_scheme.analyze_digital(
            rx_packet_ref, rx_packet_air, sample_rate=adc_iq_samp_rate)

        out_dir = self._output_dir()

        # Plot single chirp response
        fig, ax = plt.subplots()
        try:
            # Take the magnitude so the dB value is defined for signed
            # data; exact zeros map to -inf without a runtime warning.
            with np.errstate(divide='ignore'):
                chirp_db = 20 * np.log10(np.abs(chirp_mat[0, :]))
            ax.plot(delay_axis, chirp_db)
            ax.set_xlabel('Delay (s)')
            ax.set_ylabel('Magnitude (dB)')
            fig.savefig(
                f"{out_dir}/{config_name}_FMCW_chirp_response.png")
        finally:
            # Release the figure even when plotting or saving fails.
            plt.close(fig)

        # Plot range map
        fig, ax = plt.subplots()
        try:
            ax.imshow(chirp_mat, aspect='auto')
            ax.set_title('Range Map')
            ax.set_xlabel('Delay (samples)')
            ax.set_ylabel('Chirp Number')
            fig.savefig(
                f"{out_dir}/{config_name}_FMCW_range_map.png")
        finally:
            plt.close(fig)
Functions
analyze_packet(rx_packet_ref, rx_packet_air)

Perform FMCW analysis

Source code in rfsoc_rfdc/receiver/rx_analyzer/components/fmcw_metrics_calculator.py
def analyze_packet(self, rx_packet_ref, rx_packet_air):
    """Perform FMCW analysis and save the figures.

    Args:
        rx_packet_ref: Reference packet passed to the FMCW scheme.
        rx_packet_air: Over-the-air packet passed to the FMCW scheme.

    Writes ``<config>_FMCW_chirp_response.png`` and
    ``<config>_FMCW_range_map.png`` into the output directory.
    """
    # NOTE(review): no 1e6 scaling is applied here although the x-axis is
    # labelled in seconds -- confirm the unit analyze_digital expects.
    adc_iq_samp_rate = ZCU216_CONFIG['ADCSampleRate'] / \
        ZCU216_CONFIG['ADCInterpolationRate']
    config_name = ZCU216_CONFIG['CONFIG_NAME']

    delay_axis, chirp_mat = self.fmcw_scheme.analyze_digital(
        rx_packet_ref, rx_packet_air, sample_rate=adc_iq_samp_rate)

    # detect_scheme may be absent on this component; fall back to the
    # logger's log directory instead of failing with AttributeError.
    detect_scheme = getattr(self, "detect_scheme", None)
    out_dir = getattr(detect_scheme, "path2wave", None)
    if out_dir is None:
        out_dir = ZCU216_CONFIG['LOGGER'].get_log_dir()

    # Plot single chirp response
    fig, ax = plt.subplots()
    try:
        # Take the magnitude so the dB value is defined for signed data;
        # exact zeros map to -inf without a runtime warning.
        with np.errstate(divide='ignore'):
            chirp_db = 20 * np.log10(np.abs(chirp_mat[0, :]))
        ax.plot(delay_axis, chirp_db)
        ax.set_xlabel('Delay (s)')
        ax.set_ylabel('Magnitude (dB)')
        fig.savefig(
            f"{out_dir}/{config_name}_FMCW_chirp_response.png")
    finally:
        # Release the figure even when plotting or saving fails.
        plt.close(fig)

    # Plot range map
    fig, ax = plt.subplots()
    try:
        ax.imshow(chirp_mat, aspect='auto')
        ax.set_title('Range Map')
        ax.set_xlabel('Delay (samples)')
        ax.set_ylabel('Chirp Number')
        fig.savefig(
            f"{out_dir}/{config_name}_FMCW_range_map.png")
    finally:
        plt.close(fig)