Skip to content

FMCW RX Task

rfsoc_rfdc.receiver.fmcw_rx_task

Classes

FmcwRxTask

Bases: MultiChRxTask

FMCW Radar Rx Task

Source code in rfsoc_rfdc/receiver/fmcw_rx_task.py
class FmcwRxTask(MultiChRxTask):
    """FMCW radar receive task.

    Repeatedly captures a multi-channel buffer, converts it into per-channel
    arrays via ``_layout_factory`` and passes the first two channels
    (loopback and over-the-air) to the analyzer built in ``_channel_factory``.
    """

    def __init__(self, overlay, channel_count=2, dp_vect_dim=4):
        """Initialise the base task in ``"real2iq"`` mode with ``buff_size=2**21``.

        Args:
            overlay: Forwarded unchanged to ``MultiChRxTask``.
            channel_count: Forwarded unchanged to ``MultiChRxTask``. Note that
                ``run`` always reads channel indices 0 and 1.
            dp_vect_dim: Forwarded unchanged to ``MultiChRxTask``.
        """
        super().__init__(overlay, "real2iq", channel_count, dp_vect_dim, buff_size=2**21)

    def _channel_factory(self):
        """Run the base-class factory, then build the FMCW analyzer chain.

        Sets ``self.rx_analyzers`` to a ``Real2IqDriver`` wrapping an
        ``FmcwPipeline`` that uses ``RadarDetection`` as its detection scheme.

        Raises:
            RuntimeError: If ``self.mode`` is anything other than ``"real2iq"``.
        """
        super()._channel_factory()

        if self.mode == "real2iq":
            # FMCW usually requires RadarDetection to get packet indices.
            # NOTE(review): the 1e6 factor suggests the config rates are in
            # MHz and ``fs`` is in Hz -- confirm against ZCU216_CONFIG.
            fs = ZCU216_CONFIG['ADCSampleRate'] / \
                ZCU216_CONFIG['ADCInterpolationRate'] * 1e6
            detect_scheme = RadarDetection(sample_rate=fs)
            pipeline = FmcwPipeline(channel_id=0, detect_scheme=detect_scheme)

            self.rx_analyzers = Real2IqDriver(pipeline=pipeline, channel_id=0)
        else:
            raise RuntimeError(f"Unrecognized mode {self.mode}")

    def run(self):
        """Capture and analyze frames until the stop event is set.

        Each iteration waits on the pause event, performs two transfers,
        converts the captured data into per-channel arrays and logs the SNR
        and CFO returned by the analyzer.
        """
        while not self._stop_event.is_set():
            # presumably blocks while the task is paused -- verify against the
            # base class's use of ``_pause_event``.
            self._pause_event.wait()
            # A stop request may have arrived while waiting; re-check before
            # touching the hardware again.
            if self._stop_event.is_set():
                break

            self.rx_ch.transfer()
            self.rx_ch.transfer()  # This is necessary to cleanup samples from previous capture
            # Report progress through logging (not print) so it follows the
            # same handlers and level filtering as the result line below.
            logging.info("transfer done")

            raw_mch_data = self.rx_ch.data
            mch_complex_arr = self._layout_factory(raw_mch_data)

            # Assumed 2 RX for FMCW: Ch0 loopback, Ch1 OTA
            lpb_ch = mch_complex_arr[0]
            ota_ch = mch_complex_arr[1]

            snr, cfo = self.rx_analyzers.proc_rx(lpb_ch, ota_ch)
            # Lazy %-style arguments: the message is only formatted when the
            # record is actually emitted.
            logging.info("SNR: %s, CFO: %s", snr, cfo)