Skip to content

single_ch_rx_task

rfsoc_rfdc.receiver.single_ch_rx_task

Attributes

Classes

SingleChRxTask(overlay, mode='real2iq', buff_size=2 ** 18)

Bases: OverlayTask

Single-Channel ADC Task

Initialize SingleChRxTask

Source code in rfsoc_rfdc/receiver/single_ch_rx_task.py
def __init__(self, overlay, mode="real2iq", buff_size=2**18):
    """Set up a single-channel receive task on the given overlay.

    Args:
        overlay: Overlay object passed through to the base-class
            initializer together with the task name "SingleChRxTask".
        mode: Receiver mode; stored as ``self.mode`` and handed to
            ``self._receiver_factory``. Defaults to "real2iq".
        buff_size: Receive buffer size, stored as ``self.buff_size``
            (units are not visible here -- presumably samples; confirm
            against the Rx channel implementation). Defaults to 2**18.
    """
    super().__init__(overlay, name="SingleChRxTask")

    # Receiver datapath parameters
    self.mode, self.buff_size = mode, buff_size

    # Hardware IPs, looked up on self.ol (presumably set by the
    # base-class initializer -- confirm).
    adc_datapath = self.ol.adc_datapath
    self.dma_ip = adc_datapath.data_mover_ctrl
    fifo_count_desc = self.ol.ip_dict['adc_datapath/fifo_count']
    self.fifo_count_ip = AxiGPIO(fifo_count_desc).channel1

    # Initialize Rx channel for the requested mode
    self._receiver_factory(mode)
Attributes
mode = mode instance-attribute
buff_size = buff_size instance-attribute
dma_ip = self.ol.adc_datapath.data_mover_ctrl instance-attribute
fifo_count_ip = AxiGPIO(self.ol.ip_dict['adc_datapath/fifo_count']).channel1 instance-attribute
Functions
run()

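Main task loop: until the task state becomes STOP, each pass sleeps for one second and, if the task is RUNNING, performs a DMA transfer on the Rx channel and passes the received data to the Rx analyzer.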

Source code in rfsoc_rfdc/receiver/single_ch_rx_task.py
def run(self):
    """Run the task loop until the task state becomes STOP.

    Every pass sleeps for one second. When the task state was RUNNING at
    the start of the pass, a transfer is then triggered on ``self.rx_ch``
    and the resulting ``self.rx_ch.data`` is handed to
    ``self.rx_analyzer.proc_rx``. In any other non-STOP state the pass
    only sleeps.
    """
    while self.task_state != TASK_STATE["STOP"]:
        # Sample the state before sleeping so the decision to transfer is
        # made on the state seen at the start of the pass.
        is_running = self.task_state == TASK_STATE["RUNNING"]
        time.sleep(1)
        if not is_running:
            continue

        # DMA transfer, then hand the captured data to the analyzer
        self.rx_ch.transfer()
        rx_samples = self.rx_ch.data
        self.rx_analyzer.proc_rx(rx_samples)