Skip to content

multi_ch_rx_task

rfsoc_rfdc.receiver.multi_ch_rx_task

Attributes

Classes

MultiChRxTask(overlay, mode='real2iq', channel_count=4, dp_vect_dim=1, buff_size=2 ** 21)

Bases: OverlayTask

Multi-Channel Rx Task

Source code in rfsoc_rfdc/receiver/multi_ch_rx_task.py
def __init__(self, overlay, mode="real2iq", channel_count=4, dp_vect_dim=1, buff_size=2**21):
    super().__init__(overlay, name="MultiChRxTask")
    self.mode = mode
    # Number of DACs controlled by a DMA
    self.channel_count = channel_count
    # IQ samples vectorization in the datapath
    self.dp_vect_dim = dp_vect_dim
    # Receiver datapath parameters
    self.buff_size = buff_size
    # Hardware IPs
    self.ch_dma = self.ol.adc_datapath.data_mover_ctrl
    self.ch_fifo_count_ip = AxiGPIO(
        self.ol.ip_dict['adc_datapath/fifo_count']).channel1
Attributes
mode = mode instance-attribute
channel_count = channel_count instance-attribute
dp_vect_dim = dp_vect_dim instance-attribute
buff_size = buff_size instance-attribute
ch_dma = self.ol.adc_datapath.data_mover_ctrl instance-attribute
ch_fifo_count_ip = AxiGPIO(self.ol.ip_dict['adc_datapath/fifo_count']).channel1 instance-attribute
Functions
run()
Source code in rfsoc_rfdc/receiver/multi_ch_rx_task.py
def run(self):
    # Rx init
    self._channel_factory()
    """Main task loop"""
    while self.task_state != TASK_STATE["STOP"]:
        if self.task_state == TASK_STATE["RUNNING"]:
            # DMA transfer
            self.rx_ch.transfer()  # Additional DMA transfer that clears out remainings in FIFO
            self.rx_ch.transfer()
            raw_mch_data = self.rx_ch.data
            mch_complex_arr = self._layout_factory(raw_mch_data)
            # Process multi-channel data here
        else:
            time.sleep(1)