Skip to content

multi_ch_tx_task

rfsoc_rfdc.transmitter.multi_ch_tx_task

Attributes

Classes

MultiChTxTask(overlay, mode='iq2real', channel_count=4, dp_vect_dim=1)

Bases: OverlayTask

Source code in rfsoc_rfdc/transmitter/multi_ch_tx_task.py
def __init__(self, overlay, mode="iq2real", channel_count=4, dp_vect_dim=1):
    """Set up a multi-channel transmit task.

    Args:
        overlay: Overlay object forwarded to the base-class initializer.
        mode: Data source selector; "iq2real" is the value checked for in
            data_preparation().
        channel_count: Number of DACs controlled by a single DMA.
        dp_vect_dim: IQ sample vectorization used in the datapath.
    """
    super().__init__(overlay, name="MultiChTxTask")

    # Datapath configuration
    self.mode = mode
    self.channel_count = channel_count  # DACs driven by one DMA
    self.dp_vect_dim = dp_vect_dim  # IQ samples vectorization

    # Hardware IP handles, looked up through self.ol
    # (presumably set by the base-class initializer -- TODO confirm)
    dac_datapath = self.ol.dac_datapath
    self.ch_dma = dac_datapath.data_mover_ctrl
    fifo_count_desc = self.ol.ip_dict['dac_datapath/fifo_count']
    self.ch_fifo_count_ip = AxiGPIO(fifo_count_desc).channel1

    # Factories run last, once the attributes above are in place
    self._channel_factory()
    self._layout_factory()
Attributes
mode = mode instance-attribute
channel_count = channel_count instance-attribute
dp_vect_dim = dp_vect_dim instance-attribute
ch_dma = self.ol.dac_datapath.data_mover_ctrl instance-attribute
ch_fifo_count_ip = AxiGPIO(self.ol.ip_dict['dac_datapath/fifo_count']).channel1 instance-attribute
Functions
data_preparation()
Source code in rfsoc_rfdc/transmitter/multi_ch_tx_task.py
def data_preparation(self):
    """Generate the Tx waveform and load it into the Tx channel buffer.

    In "iq2real" mode the samples come from TxDataGenerator; in any other
    mode a sine wave from wg.generate_sine_wave is used. The waveform is
    replicated channel_count times, run through the multi-channel memory
    layout, cast to np.int16 and handed to self.tx_ch.data_copy().
    """
    # Select the single-channel source waveform
    if self.mode != "iq2real":
        single_ch = wg.generate_sine_wave(repeat_time=100, sample_pts=8)
    else:
        single_ch = TxDataGenerator().get_iq_samples()

    # Same waveform on every channel
    replicated = np.tile(single_ch, (self.channel_count, 1))

    # Multi-channel memory layout, then narrow to np.int16
    layout = self.mch_mem_layout.gen_layout(replicated)
    layout = layout.astype(np.int16)

    # Copy into the Tx channel
    self.tx_ch.data_copy(layout)
    logging.info(f"Tx data preparation done.")
run()
Source code in rfsoc_rfdc/transmitter/multi_ch_tx_task.py
def run(self):
    """Prepare the Tx data, then loop until the task state becomes STOP.

    Once per second: while the state is RUNNING the Tx channel is asked to
    stream with a 100 duty cycle; in any other non-STOP state the Tx DMA
    is stopped instead.
    """
    self.data_preparation()

    # Poll the task state once per second until STOP is requested
    while self.task_state != TASK_STATE["STOP"]:
        is_running = self.task_state == TASK_STATE["RUNNING"]
        if not is_running:
            # Not running (and not stopped): keep the DMA halted
            self.tx_ch.tx_dma.stop()
        else:
            # Streaming IQ samples
            self.tx_ch.stream(duty_cycle=100)
        time.sleep(1)