Skip to content

dma_monitor

Streaming DMA controller drivers (v1–v4) and TxDmaMonitor / RxDmaMonitor wrappers.

rfsoc_rfdc.dma_monitor

Classes

DmaMonitor(dma_ip, fifo_count_ip)

Bases: ABC

Source code in rfsoc_rfdc/dma_monitor.py
def __init__(self, dma_ip, fifo_count_ip):
    """Keep references to both IP cores and set up the FIFO-count IP.

    Args:
        dma_ip: DMA IP core instance, stored as ``self.dma``.
        fifo_count_ip: FIFO count IP core, stored as ``self.fifo_count``
            and configured with direction ``"in"`` and length 32.
    """
    self.dma = dma_ip
    fifo_ip = fifo_count_ip
    self.fifo_count = fifo_ip
    # The FIFO counter is only ever read, so it is set up as an input.
    fifo_ip.setdirection("in")
    fifo_ip.setlength(32)
Attributes
dma = dma_ip instance-attribute
fifo_count = fifo_count_ip instance-attribute
Functions
get_fifo_count()
Source code in rfsoc_rfdc/dma_monitor.py
def get_fifo_count(self):
    """Return the value currently read from the FIFO count IP."""
    current_count = self.fifo_count.read()
    return current_count
get_debug_info()
Source code in rfsoc_rfdc/dma_monitor.py
def get_debug_info(self):
    """Return the ``register_map`` of the wrapped DMA IP for debugging."""
    registers = self.dma.register_map
    return registers
transfer(buffer) abstractmethod
Source code in rfsoc_rfdc/dma_monitor.py
@abstractmethod
def transfer(self, buffer):
    """Start a DMA transfer of ``buffer``; concrete monitors must override.

    Args:
        buffer: The buffer to hand to the DMA channel.
    """
wait() abstractmethod
Source code in rfsoc_rfdc/dma_monitor.py
@abstractmethod
def wait(self):
    """Wait on the DMA channel; concrete monitors must override."""
stop() abstractmethod
Source code in rfsoc_rfdc/dma_monitor.py
@abstractmethod
def stop(self):
    """Stop the DMA; concrete monitors must override."""

StreamingDmaBase(description, fsm_lut)

Bases: DefaultIP

This class serves as the base class for the StreamingDma drivers. StreamingDma uses a custom IP to control Xilinx's AXI DataMover IP.

Source code in rfsoc_rfdc/dma_monitor.py
def __init__(self, description, fsm_lut):
    """Initialise the IP driver and remember the FSM state-name table.

    Args:
        description: IP description, forwarded unchanged to the parent
            class constructor.
        fsm_lut: Sequence of state names; ``state()`` indexes it with the
            value read from ``self._state``.
    """
    super().__init__(description=description)
    # Lookup table translating the raw state value into a readable name.
    self._FSM_LUT = fsm_lut
Attributes
MASK_32b = 4294967295 class-attribute instance-attribute
MAX_BTT = 8388607 class-attribute instance-attribute
Functions
state()
Source code in rfsoc_rfdc/dma_monitor.py
def state(self):
    """Return the name of the current FSM state.

    The raw value of ``self._state`` is used as an index into the
    state-name table supplied at construction time.
    """
    state_names = self._FSM_LUT
    raw_state = self._state
    return state_names[raw_state]
get_debug_info()
Source code in rfsoc_rfdc/dma_monitor.py
def get_debug_info(self):
    """Return a one-line summary of the controller for debugging.

    The summary contains the command value, the ``_btt`` value, the
    64-bit base address (assembled from its upper and lower 32-bit
    halves, printed in decimal) and the current FSM state name.
    """
    # Mask each half to 32 bits before combining them into one address.
    upper_half = self._base_addr_upper & self.MASK_32b
    lower_half = self._base_addr_lower & self.MASK_32b
    base_addr = (upper_half << 32) | lower_half
    return f"start = {self._cmd}, btt = {self._btt}, base_addr = {base_addr}, state = {self.state()}"
__del__()
Source code in rfsoc_rfdc/dma_monitor.py
def __del__(self):
    """Issue ``stop()`` when the driver object is finalised."""
    # NOTE(review): stop() is not among the members listed for this base
    # class; presumably every concrete driver subclass provides it — confirm.
    self.stop()

StreamingDmaV1(description)

Bases: StreamingDmaBase

Streaming DMA driver v1.0 allows the user to start and stop a continuous DMA transfer by switching self._cmd between 0 (stop) and 1 (start).

Source code in rfsoc_rfdc/dma_monitor.py
def __init__(self, description):
    """Create the v1.0 driver with its three-entry FSM state-name table.

    Args:
        description: IP description, forwarded to the base class.
    """
    state_names = ['S_IDLE', 'S_STREAM', 'S_ERROR']
    super().__init__(description, fsm_lut=state_names)
Attributes
bindto = ['user.org:user:data_mover_ctrl:1.0'] class-attribute instance-attribute
Functions
transfer(buffer)
Source code in rfsoc_rfdc/dma_monitor.py
def transfer(self, buffer):
    """Configure the controller for ``buffer`` and start the transfer.

    Args:
        buffer: Object exposing ``physical_address`` and ``nbytes``; both
            are passed to ``self._config`` before the start command.
    """
    start_address = buffer.physical_address
    byte_count = buffer.nbytes
    self._config(start_address, byte_count)
    # Writing 1 to the command value starts the transfer.
    self._cmd = 1
stop()
Source code in rfsoc_rfdc/dma_monitor.py
def stop(self):
    """Stop the transfer by writing 0 to ``self._cmd``."""
    self._cmd = 0

StreamingDmaV2(description)

Bases: StreamingDmaBase

Streaming DMA driver v2.0 allows the user to start and stop a continuous DMA transfer by switching self._cmd between 0 (stop) and 1 (start).

Source code in rfsoc_rfdc/dma_monitor.py
def __init__(self, description):
    """Create the v2.0 driver with its five-entry FSM state-name table.

    Args:
        description: IP description, forwarded to the base class.
    """
    state_names = ['S_IDLE', 'S_STREAM', 'S_HALT', 'S_HALT_RST', 'S_ERROR']
    super().__init__(description, fsm_lut=state_names)
Attributes
bindto = ['user.org:user:data_mover_ctrl:2.0'] class-attribute instance-attribute
Functions
transfer(buffer)
Source code in rfsoc_rfdc/dma_monitor.py
def transfer(self, buffer):
    """Configure the controller for ``buffer`` and start the transfer.

    Args:
        buffer: Object exposing ``physical_address`` and ``nbytes``; both
            are passed to ``self._config`` before the start command.
    """
    start_address = buffer.physical_address
    byte_count = buffer.nbytes
    self._config(start_address, byte_count)
    # Writing 1 to the command value starts the transfer.
    self._cmd = 1
stop()
Source code in rfsoc_rfdc/dma_monitor.py
def stop(self):
    """Stop the transfer by writing 0 to ``self._cmd``."""
    self._cmd = 0

StreamingDmaV3(description)

Bases: StreamingDmaBase

Streaming DMA driver v3.0 allows users to send the following commands.

1) IDLE: DMA goes to the IDLE state 2) SINGLE: DMA makes a single data transfer 3) STREAM: DMA streams data continuously

Source code in rfsoc_rfdc/dma_monitor.py
def __init__(self, description):
    """Create the v3.0 driver with its six-entry FSM state-name table.

    Args:
        description: IP description, forwarded to the base class.
    """
    state_names = [
        'S_IDLE', 'S_STREAM', 'S_HALT', 'S_HALT_RST', 'S_ERROR', 'S_SINGLE',
    ]
    super().__init__(description, fsm_lut=state_names)
Attributes
bindto = ['user.org:user:data_mover_ctrl:3.0'] class-attribute instance-attribute
Functions
transfer(buffer)
Source code in rfsoc_rfdc/dma_monitor.py
def transfer(self, buffer):
    """Perform one single-shot transfer of ``buffer`` and block until idle.

    The controller is configured, the 'SINGLE' command is issued, and the
    FSM state is polled every 10 ms until it reads 'S_IDLE'. The 'IDLE'
    command is written afterwards. There is no timeout.

    Args:
        buffer: Object exposing ``physical_address`` and ``nbytes``.
    """
    self._config(buffer.physical_address, buffer.nbytes)
    self._cmd = self._CMD_LUT['SINGLE']
    # CPU polling of the FSM state; sleep between polls to avoid spinning.
    while self.state() != 'S_IDLE':
        time.sleep(0.01)
    self._cmd = self._CMD_LUT['IDLE']
stream(buffer)
Source code in rfsoc_rfdc/dma_monitor.py
def stream(self, buffer):
    """Configure the controller for ``buffer`` and issue 'STREAM'.

    Unlike ``transfer`` this returns immediately without polling the FSM.

    Args:
        buffer: Object exposing ``physical_address`` and ``nbytes``.
    """
    start_address = buffer.physical_address
    byte_count = buffer.nbytes
    self._config(start_address, byte_count)
    stream_cmd = self._CMD_LUT['STREAM']
    self._cmd = stream_cmd
stop()
Source code in rfsoc_rfdc/dma_monitor.py
def stop(self):
    """Issue the 'IDLE' command to the controller."""
    idle_cmd = self._CMD_LUT['IDLE']
    self._cmd = idle_cmd

StreamingDmaV4(description)

Bases: StreamingDmaBase

Streaming DMA driver v4.0 allows users to send the following commands.

1) IDLE: DMA goes to the IDLE state 2) SINGLE: DMA makes a single data transfer 3) STREAM: DMA streams data continuously 4) DUTY: DMA streams data continuously with duty cycling

Source code in rfsoc_rfdc/dma_monitor.py
def __init__(self, description):
    """Create the v4.0 driver with its nine-entry FSM state-name table.

    Also sets ``iq_samples_per_cyc`` to 8; ``stream`` divides the buffer
    size by this value when computing the duty-cycle target.

    Args:
        description: IP description, forwarded to the base class.
    """
    state_names = [
        'S_IDLE', 'S_STREAM', 'S_HALT', 'S_HALT_RST', 'S_ERROR',
        'S_SINGLE', 'S_WAIT', 'S_DUTY', 'S_SING_WAIT',
    ]
    super().__init__(description, fsm_lut=state_names)
    self.iq_samples_per_cyc = 8
Attributes
iq_samples_per_cyc = 8 instance-attribute
bindto = ['user.org:user:data_mover_ctrl:4.0'] class-attribute instance-attribute
Functions
transfer(buffer)
Source code in rfsoc_rfdc/dma_monitor.py
def transfer(self, buffer):
    """Perform one single-shot transfer of ``buffer`` and block until idle.

    The controller is configured, 'SINGLE' is issued and immediately
    followed by 'IDLE', and the FSM state is then polled every 10 ms until
    it reads 'S_IDLE'. There is no timeout.

    Args:
        buffer: Object exposing ``physical_address`` and ``nbytes``.
    """
    commands = self._CMD_LUT
    self._config(buffer.physical_address, buffer.nbytes)
    self._cmd = commands['SINGLE']
    # Trigger FSM to jump from S_SING_WAIT to S_IDLE
    self._cmd = commands['IDLE']
    # CPU polling of the FSM state; sleep between polls to avoid spinning.
    while self.state() != 'S_IDLE':
        time.sleep(0.01)
stream(buffer, duty_cycle=100)
Source code in rfsoc_rfdc/dma_monitor.py
def stream(self, buffer, duty_cycle=100):
    """Start continuous streaming of ``buffer``, optionally duty-cycled.

    With ``duty_cycle == 100`` the 'STREAM' command is issued. For any
    other valid value, ``_duty_cyc_target`` is first programmed with
    ``(100 - duty_cycle) / duty_cycle`` times the transfer length in
    cycles (``buffer.size / iq_samples_per_cyc``), and the 'DUTY' command
    is issued instead.

    Args:
        buffer: Object exposing ``physical_address``, ``nbytes`` and
            ``size``.
        duty_cycle: Percentage in the range (0, 100]. Defaults to 100.

    Raises:
        ValueError: If ``duty_cycle`` is not in (0, 100]. Nothing is
            written to the controller in that case.
    """
    # Reject values that would divide by zero or produce a negative target
    # before touching any hardware state.
    if not 0 < duty_cycle <= 100:
        raise ValueError(
            f"duty_cycle must be in the range (0, 100], got {duty_cycle!r}")

    if duty_cycle != 100:  # Duty cycling
        ratio_fp = (100 - duty_cycle) / duty_cycle
        dma_time_in_cyc = int(buffer.size / self.iq_samples_per_cyc)
        # The target is written before _config, as in the original order.
        self._duty_cyc_target = int(ratio_fp * dma_time_in_cyc)
        command = 'DUTY'
    else:
        command = 'STREAM'

    self._config(buffer.physical_address, buffer.nbytes)
    self._cmd = self._CMD_LUT[command]
stop()
Source code in rfsoc_rfdc/dma_monitor.py
def stop(self):
    """Issue the 'IDLE' command to the controller."""
    idle_cmd = self._CMD_LUT['IDLE']
    self._cmd = idle_cmd

TxDmaMonitor(dma_ip, fifo_count_ip)

Bases: DmaMonitor

This is the Tx DMA driver for Xilinx's DMA IP

Source code in rfsoc_rfdc/dma_monitor.py
def __init__(self, dma_ip, fifo_count_ip):
    """Keep references to both IP cores and set up the FIFO-count IP.

    Args:
        dma_ip: DMA IP core instance, stored as ``self.dma``.
        fifo_count_ip: FIFO count IP core, stored as ``self.fifo_count``
            and configured with direction ``"in"`` and length 32.
    """
    self.dma = dma_ip
    fifo_ip = fifo_count_ip
    self.fifo_count = fifo_ip
    # The FIFO counter is only ever read, so it is set up as an input.
    fifo_ip.setdirection("in")
    fifo_ip.setlength(32)
Functions
transfer(buffer)
Source code in rfsoc_rfdc/dma_monitor.py
def transfer(self, buffer):
    """Hand ``buffer`` to the DMA's send channel.

    Args:
        buffer: Passed unchanged to ``self.dma.sendchannel.transfer``.
    """
    send_channel = self.dma.sendchannel
    send_channel.transfer(buffer)
wait()
Source code in rfsoc_rfdc/dma_monitor.py
def wait(self):
    """Call ``wait()`` on the DMA's send channel."""
    send_channel = self.dma.sendchannel
    send_channel.wait()
stop()
Source code in rfsoc_rfdc/dma_monitor.py
def stop(self):
    """Do nothing; this monitor has no stop action."""
    return None

RxDmaMonitor(dma_ip, fifo_count_ip)

Bases: DmaMonitor

This is the Rx DMA driver for Xilinx's DMA IP

Source code in rfsoc_rfdc/dma_monitor.py
def __init__(self, dma_ip, fifo_count_ip):
    """Keep references to both IP cores and set up the FIFO-count IP.

    Args:
        dma_ip: DMA IP core instance, stored as ``self.dma``.
        fifo_count_ip: FIFO count IP core, stored as ``self.fifo_count``
            and configured with direction ``"in"`` and length 32.
    """
    self.dma = dma_ip
    fifo_ip = fifo_count_ip
    self.fifo_count = fifo_ip
    # The FIFO counter is only ever read, so it is set up as an input.
    fifo_ip.setdirection("in")
    fifo_ip.setlength(32)
Functions
transfer(buffer)
Source code in rfsoc_rfdc/dma_monitor.py
def transfer(self, buffer):
    """Hand ``buffer`` to the DMA's receive channel.

    Args:
        buffer: Passed unchanged to ``self.dma.recvchannel.transfer``.
    """
    recv_channel = self.dma.recvchannel
    recv_channel.transfer(buffer)
wait()
Source code in rfsoc_rfdc/dma_monitor.py
def wait(self):
    """Call ``wait()`` on the DMA's receive channel."""
    recv_channel = self.dma.recvchannel
    recv_channel.wait()
stop()
Source code in rfsoc_rfdc/dma_monitor.py
def stop(self):
    """Do nothing; this monitor has no stop action."""
    return None