Skip to content

TX Channel

rfsoc_rfdc.transmitter.tx_channel

Classes

TxChannel

Source code in rfsoc_rfdc/transmitter/tx_channel.py
class TxChannel:
    """Base Tx channel: stages samples in a DMA buffer and hands it to a DMA IP.

    The channel owns at most one buffer (``self.tx_buff``) at a time. It is
    created by ``data_copy()`` and released by ``close()``.
    """

    def __init__(self, dma_ip, fifo_count_ip, target_device, debug_mode=False):
        """Store the hardware handles and configure the FIFO count IP.

        Args:
            dma_ip: DMA object; must provide ``transfer(buffer)`` and
                ``stream(buffer, duty_cycle=...)``.
            fifo_count_ip: FIFO count object; must provide
                ``setdirection``, ``setlength`` and ``read``.
            target_device: Passed as ``target=`` to ``allocate`` when a
                buffer is created.
            debug_mode: When true, the FIFO count is polled before every
                transfer/stream call.
        """
        self.tx_buff = None
        self.tx_dma = dma_ip
        self.warning_cnt = 0
        self.debug_mode = debug_mode
        self.target_device = target_device
        # Config FIFO count IP
        self.fifo_count = fifo_count_ip
        self.fifo_count.setdirection("in")
        self.fifo_count.setlength(32)

    def data_type_check(self, buff):
        """Validate that ``buff`` is a numpy array of the data-path dtype.

        Raises:
            TypeError: If ``buff`` is not a ``numpy.ndarray`` or its dtype
                differs from ``MyRFdcType.DATA_PATH_DTYPE``.
        """
        # Validations for input buffers
        if not isinstance(buff, np.ndarray):
            raise TypeError("buff must be of type numpy.ndarray")
        if buff.dtype != MyRFdcType.DATA_PATH_DTYPE:
            raise TypeError("buff must be of data type numpy.int16")

    def data_copy(self, buff):
        """Copy ``buff`` into a freshly allocated Tx buffer.

        Any previously held buffer is released first.

        Raises:
            TypeError: If ``buff`` fails ``data_type_check``.
        """
        self.data_type_check(buff)
        # Release previous buffer if it exists
        self.close()
        # Buffer copy
        self.tx_buff = allocate(shape=(buff.size,),
                                dtype=MyRFdcType.DATA_PATH_DTYPE, target=self.target_device)
        self.tx_buff[:] = buff[:]

    def _require_buffer(self):
        """Raise a clear error when no Tx buffer has been staged yet."""
        if self.tx_buff is None:
            raise RuntimeError(
                "No Tx buffer allocated; call data_copy() before transfer() or stream()")

    def _monitor_fifo(self):
        """In debug mode, count empty-FIFO reads and warn periodically.

        The counter only advances when the FIFO count reads zero, so the
        threshold is evaluated inside that branch. After more than 1000
        such reads the counter is reset and one warning is logged.
        """
        if not self.debug_mode:
            return

        fifo_count = self.fifo_count.read()

        # Warning for low FIFO count
        if fifo_count == 0:
            self.warning_cnt += 1
            if self.warning_cnt > 1000:
                self.warning_cnt = 0
                # Emitted at WARNING level to match the message text.
                logging.warning(
                    f"[Tx] Warning: Tx FIFO count {fifo_count} is zero. DMA transfer is too slow!")

    def transfer(self):
        """Send the staged buffer through the DMA with ``transfer``.

        Raises:
            RuntimeError: If no buffer has been staged with ``data_copy()``.
        """
        # Fail early with an explicit message instead of passing None to the DMA
        self._require_buffer()
        # Monitor FIFO in debug mode
        self._monitor_fifo()
        # Trigger DMA transfer
        self.tx_dma.transfer(self.tx_buff)

    def stream(self, duty_cycle=100):
        """Send the staged buffer through the DMA with ``stream``.

        Args:
            duty_cycle: Forwarded unchanged to ``tx_dma.stream``.

        Raises:
            RuntimeError: If no buffer has been staged with ``data_copy()``.
        """
        # Fail early with an explicit message instead of passing None to the DMA
        self._require_buffer()
        # Monitor FIFO in debug mode
        self._monitor_fifo()
        # Trigger DMA transfer
        self.tx_dma.stream(self.tx_buff, duty_cycle=duty_cycle)

    def close(self):
        """Release the DMA buffer"""
        # hasattr guard: __del__ may run on an instance whose __init__ never
        # got as far as creating the attribute.
        if hasattr(self, 'tx_buff') and self.tx_buff is not None:
            # PYNQ buffers have a close() method to release the memory
            if hasattr(self.tx_buff, 'close'):
                self.tx_buff.close()
            self.tx_buff = None

    def __del__(self):
        """Release the buffer when the channel object is destroyed."""
        self.close()
Functions
close()

Release the DMA buffer

Source code in rfsoc_rfdc/transmitter/tx_channel.py
def close(self):
    """Release the DMA buffer, if one is currently held.

    Does nothing when the instance has no ``tx_buff`` attribute or when it
    is already ``None``. Otherwise the buffer's ``close()`` is called if it
    has one, and ``tx_buff`` is reset to ``None``.
    """
    held = getattr(self, 'tx_buff', None)
    if held is None:
        return
    # PYNQ buffers expose close() to free their memory; other objects are
    # simply dropped.
    if hasattr(held, 'close'):
        held.close()
    self.tx_buff = None

rfsoc_rfdc.transmitter.tx_channel_iq2real

Classes

TxChannelIq2Real

Bases: TxChannel

An IQ-to-real Tx channel; also works as an IQ-to-IQ Tx channel.

Source code in rfsoc_rfdc/transmitter/tx_channel_iq2real.py
class TxChannelIq2Real(TxChannel):
    """
    An IQ-to-real Tx channel; also works as an IQ-to-IQ Tx channel.
    """

    def __init__(self, dma_ip, fifo_count_ip, target_device, debug_mode=False):
        """Forward all arguments unchanged to ``TxChannel.__init__``."""
        super().__init__(dma_ip, fifo_count_ip, target_device, debug_mode)

    def data_copy(self, complex_array):
        """Stage a complex array as interleaved I/Q samples.

        The real and imaginary parts are each converted with ``np.int16``
        and written to a new buffer of ``2 * complex_array.size`` elements.
        I samples go to even indices and Q samples to odd indices. Any
        previously held buffer is released first.

        NOTE(review): no range check is done before the ``np.int16``
        conversion; confirm callers scale samples appropriately.

        Raises:
            ValueError: If the I and Q parts differ in size.
            TypeError: If either converted part fails ``data_type_check``.
        """
        in_phase = np.int16(complex_array.real)
        quadrature = np.int16(complex_array.imag)

        if in_phase.size != quadrature.size:
            raise ValueError("i/q array must have the same size")
        for component in (in_phase, quadrature):
            self.data_type_check(component)

        # Drop whatever buffer was staged before allocating the new one
        self.close()

        interleaved_len = complex_array.size*2
        self.tx_buff = allocate(
            shape=(interleaved_len,),
            dtype=MyRFdcType.DATA_PATH_DTYPE,
            target=self.target_device,
        )
        # Interleave: even slots carry I, odd slots carry Q
        self.tx_buff[0::2] = in_phase
        self.tx_buff[1::2] = quadrature

rfsoc_rfdc.transmitter.tx_channel_real2real

Classes

TxChannelReal2Real

Bases: TxChannel

A real to real Tx channel.

Source code in rfsoc_rfdc/transmitter/tx_channel_real2real.py
class TxChannelReal2Real(TxChannel):
    """
    A real-to-real Tx channel.
    """

    def __init__(self, dma_ip, fifo_count_ip, target_device, debug_mode=False):
        """Forward all arguments unchanged to ``TxChannel.__init__``."""
        super().__init__(dma_ip, fifo_count_ip, target_device, debug_mode)

    def data_copy(self, real_array):
        """Stage a real-valued sample array for transmission.

        The array is validated with ``data_type_check``, any previously
        held buffer is released, and the samples are copied into a new
        buffer of ``real_array.size`` elements.

        Raises:
            TypeError: If ``real_array`` fails ``data_type_check``.
        """
        self.data_type_check(real_array)

        # Drop whatever buffer was staged before allocating the new one
        self.close()

        sample_count = real_array.size
        self.tx_buff = allocate(
            shape=(sample_count,),
            dtype=MyRFdcType.DATA_PATH_DTYPE,
            target=self.target_device,
        )
        self.tx_buff[:] = real_array[:]