class MyRFdc:
"""My class for controlling the RF Data Converter."""
def __init__(self, rfdc_ip, debug_mode=False):
self.rfdc = rfdc_ip
self.rfdc_status = MyRFdcStatus(self.rfdc)
self.rfdc_cfg = MyRFdcConfig()
self.dac_tiles = [MyRFdcDACTile(
tile_id, tile) for tile_id, tile in enumerate(self.rfdc.dac_tiles)]
self.adc_tiles = [MyRFdcADCTile(
tile_id, tile) for tile_id, tile in enumerate(self.rfdc.adc_tiles)]
self.clock_src = self.rfdc_cfg.freq_cfg.clock_src
self.debug_mode = debug_mode
def debug(self):
# Attributes for both DAC and ADC blocks
block_attr = ['BlockStatus', 'MixerSettings', 'QMCSettings',
'CoarseDelaySettings', 'NyquistZone', 'EnabledInterrupts', 'PwrMode']
# Attributes for DAC block only
dac_only_attr = ['InterpolationFactor', 'DecoderMode', # 'OutputCurr',
'InvSincFIR', 'FabRdVldWords', 'FabWrVldWords', 'DataPathMode', 'IMRPassMode', 'DACCompMode']
# Attributes for ADC block only
adc_only_attr = ['DecimationFactor', 'CalibrationMode', 'FabRdVldWords',
'FabWrVldWords', 'DecimationFactorObs', 'FabRdVldWordsObs', 'FabWrVldWordsObs', 'Dither', 'CalFreeze', 'DSA']
for tile in self.dac_tiles:
if self.rfdc_status.get_dac_tile_enb(tile.tile_id):
for block_id, block in enumerate(tile.blocks):
if self.rfdc_status.get_dac_block_enb(
tile.tile_id, block_id):
for attr in block_attr:
logging.info(attr+": "+str(getattr(block, attr)))
for attr in dac_only_attr:
logging.info(attr+": "+str(getattr(block, attr)))
for tile in self.adc_tiles:
if self.rfdc_status.get_adc_tile_enb(tile.tile_id):
for block_id, block in enumerate(tile.blocks):
if self.rfdc_status.get_adc_block_enb(
tile.tile_id, block_id):
for attr in block_attr:
logging.info(attr+": "+str(getattr(block, attr)))
for attr in adc_only_attr:
logging.info(attr+": "+str(getattr(block, attr)))
def init(self):
"""Power on DAC/ADC tiles"""
# Power on DAC tiles
for tile in self.dac_tiles:
self.power_on_dac_tile(tile)
# Power on ADC tiles
for tile in self.adc_tiles:
self.power_on_adc_tile(tile)
def setup(self):
"""Configure DAC/ADC blocks"""
# Configure DAC blocks within each tile
for tile in self.dac_tiles:
self.config_dac_blocks(tile)
# Configure ADC blocks within each tile
for tile in self.adc_tiles:
self.config_adc_blocks(tile)
def is_ready(self):
"""Check if all tiles are powered up"""
for tile in self.dac_tiles:
if not self.is_dac_tile_ready(tile):
return False
for tile in self.adc_tiles:
if not self.is_adc_tile_ready(tile):
return False
return True
def shutdown_tiles(self):
"""Safely shutdown all tiles."""
for tile in self.dac_tiles:
if self.rfdc_status.get_dac_tile_enb(tile.tile_id):
tile.ShutDown()
for tile in self.adc_tiles:
if self.rfdc_status.get_adc_tile_enb(tile.tile_id):
tile.ShutDown()
logging.info(f"All tiles has been safely shutdown!")
def is_dac_tile_ready(self, tile):
tile_state = self.rfdc_status.get_dac_tile_state(tile.tile_id)
if self.rfdc_status.get_dac_tile_enb(tile.tile_id):
return False if tile_state < 15 else True
else:
return True
def power_on_dac_tile(self, tile):
"""Power on DAC tiles."""
if self.rfdc_status.get_dac_tile_enb(tile.tile_id):
# Configure a single tile
tile.DynamicPLLConfig(
self.clock_src, self.rfdc_cfg.freq_cfg.ref_clk, self.rfdc_cfg.freq_cfg.dac_samp_rate)
time.sleep(1)
tile.SetupFIFO(True)
# Check tile state
tile_state = self.rfdc_status.get_dac_tile_state(tile.tile_id)
if tile_state < 15:
err_msg = f"DAC tile {tile.tile_id} ({tile.tile_phy_id}) is NOT fully powered up! Stuck at Step {tile_state}: {MyRFdcType.POWER_ON_SEQUENCE_STEPS[tile_state]['State']} Description: {MyRFdcType.POWER_ON_SEQUENCE_STEPS[tile_state]['Description']}"
if 6 <= tile_state <= 10 and self.debug_mode:
logging.info(
self.rfdc_status.get_dac_clk_dist(tile.tile_id))
raise Exception(err_msg)
logging.info(
f"DAC tile {tile.tile_id} ({tile.tile_phy_id}) is fully powered up!")
def is_adc_tile_ready(self, tile):
tile_state = self.rfdc_status.get_adc_tile_state(tile.tile_id)
if self.rfdc_status.get_adc_tile_enb(tile.tile_id):
return False if tile_state < 15 else True
else:
return True
def power_on_adc_tile(self, tile):
"""Power on ADC tiles."""
if self.rfdc_status.get_adc_tile_enb(tile.tile_id):
# Configure a single tile
tile.DynamicPLLConfig(
self.clock_src, self.rfdc_cfg.freq_cfg.ref_clk, self.rfdc_cfg.freq_cfg.adc_samp_rate)
time.sleep(1)
tile.SetupFIFO(True)
# Check tile state
tile_state = self.rfdc_status.get_adc_tile_state(tile.tile_id)
if tile_state < 15:
err_msg = f"ADC tile {tile.tile_id} ({tile.tile_phy_id}) is NOT fully powered up! Stuck at Step {tile_state}: {MyRFdcType.POWER_ON_SEQUENCE_STEPS[tile_state]['State']} Description: {MyRFdcType.POWER_ON_SEQUENCE_STEPS[tile_state]['Description']}"
if 6 <= tile_state <= 10 and self.debug_mode:
logging.info(
self.rfdc_status.get_adc_clk_dist(tile.tile_id))
raise Exception(err_msg)
logging.info(
f"ADC tile {tile.tile_id} ({tile.tile_phy_id}) is fully powered up!")
def config_dac_nco(self, tile, block_id, nco_freq_mhz):
"""Configure the NCO freq of a DAC block"""
assert nco_freq_mhz >= 0, f"DAC NCO frequency {nco_freq_mhz} shall be >= 0."
block_enabled = self.rfdc_status.get_dac_block_enb(
tile.tile_id, block_id)
if block_enabled:
block = tile.blocks[block_id]
block.MixerSettings['Freq'] = nco_freq_mhz
block.UpdateEvent(xrfdc.EVENT_MIXER)
logging.info(
f"DAC tile {tile.tile_id} DAC block {block_id} NCO frequency is set to {nco_freq_mhz} MHz!")
else:
logging.info(
f"DAC tile {tile.tile_id} DAC block {block_id} is NOT enabled!")
def config_dac_block(self, tile, block_id):
"""Configure a single DAC block within a tile."""
block_enabled = self.rfdc_status.get_dac_block_enb(
tile.tile_id, block_id)
if block_enabled:
block = tile.blocks[block_id]
block.NyquistZone = self.rfdc_cfg.dac_block_cfg['NyquistZone']
block.MixerSettings = self.rfdc_cfg.dac_block_mixer_cfg
block.InterpolationFactor = self.rfdc_cfg.dac_block_cfg['InterpolationFactor']
block.UpdateEvent(self.rfdc_cfg.dac_block_cfg['UpdateEvent'])
block.QMCSettings = {'EnablePhase': 0, 'EnableGain': 0, 'GainCorrectionFactor': 0.0,
'PhaseCorrectionFactor': 0.0, 'OffsetCorrectionFactor': 0, 'EventSource': xrfdc.EVNT_SRC_IMMEDIATE}
block.InvSincFIR = self.rfdc_cfg.dac_block_cfg['InvSincFIR']
logging.info(
f"DAC tile {tile.tile_id} DAC block {block_id} is enabled!")
else:
logging.info(
f"DAC tile {tile.tile_id} DAC block {block_id} is NOT enabled!")
def config_dac_blocks(self, tile):
"""Configure all DAC blocks within a tile."""
for block_id in range(len(tile.blocks)):
self.config_dac_block(tile, block_id)
def config_adc_nco(self, tile, block_id, nco_freq_mhz):
"""Configure the NCO freq of an ADC block"""
assert nco_freq_mhz <= 0, f"ADC NCO frequency {nco_freq_mhz} shall be <= 0 "
block_enabled = self.rfdc_status.get_adc_block_enb(
tile.tile_id, block_id)
block = tile.blocks[block_id]
if block_enabled:
block.MixerSettings['Freq'] = nco_freq_mhz
block.UpdateEvent(xrfdc.EVENT_MIXER)
logging.info(
f"ADC tile {tile.tile_id} ADC block {block_id} NCO frequency is set to {nco_freq_mhz} MHz!")
else:
logging.info(
f"ADC tile {tile.tile_id} ADC block {block_id} is NOT enabled!")
def config_adc_block(self, tile, block_id):
"""Configure a single ADC block within a tile."""
block_enabled = self.rfdc_status.get_adc_block_enb(
tile.tile_id, block_id)
if block_enabled:
block = tile.blocks[block_id]
block.NyquistZone = self.rfdc_cfg.adc_block_cfg['NyquistZone']
block.MixerSettings = self.rfdc_cfg.adc_block_mixer_cfg
block.DecimationFactor = self.rfdc_cfg.adc_block_cfg['DecimationFactor']
block.UpdateEvent(self.rfdc_cfg.adc_block_cfg['UpdateEvent'])
block.QMCSettings = {'EnablePhase': 0, 'EnableGain': 0, 'GainCorrectionFactor': 0.0,
'PhaseCorrectionFactor': 0.0, 'OffsetCorrectionFactor': 0, 'EventSource': xrfdc.EVNT_SRC_TILE}
logging.info(
f"ADC tile {tile.tile_id} ADC block {block_id} is enabled!")
else:
logging.info(
f"ADC tile {tile.tile_id} ADC block {block_id} is NOT enabled!")
def config_adc_blocks(self, tile):
"""Configure all ADC blocks within a tile."""
for block_id in range(len(tile.blocks)):
self.config_adc_block(tile, block_id)