class MIMODetection(Detection):
    """Multi-channel (MIMO) wrapper around the single-stream ``Detection``.

    ``proc_tx`` frames every Tx stream with a shared preamble and zero
    padding; ``proc_rx`` runs the parent's single-stream ``proc_rx`` on each
    Rx stream and collects the per-channel packet, SNR and CFO.

    NOTE(review): ``self.zadoff_set``, ``self.logger``, ``self.debug_mode``
    and ``self._check_saturation`` are not defined in this class; they are
    presumably provided by ``Detection`` -- confirm against the parent.
    """

    def __init__(self, sample_rate=1e9, tx_num=2, rx_num=2, MCS=0, debug_mode=False):
        """Store the MIMO dimensions and derive the Tx/Rx file names.

        Args:
            sample_rate: Forwarded unchanged to ``Detection.__init__``.
            tx_num: Number of Tx streams handled by ``proc_tx``.
            rx_num: Number of Rx streams handled by ``proc_rx``.
            MCS: Index embedded in the file names and in the per-channel
                output names written by ``proc_rx``.
            debug_mode: Forwarded to ``Detection.__init__``; ``proc_rx``
                reads ``self.debug_mode`` to decide whether to log failed
                detections.
        """
        super().__init__(sample_rate=sample_rate, debug_mode=debug_mode)
        self.MCS = MCS
        # Tx/Rx MIMO dimension
        self.tx_num = tx_num
        self.rx_num = rx_num
        # Tx/Rx file names
        self.txFile = f"waveTx_{self.tx_num}_{self.MCS}.mat"
        self.rxFile = f"waveRx_{self.rx_num}_{self.MCS}.mat"
        # Filled in by proc_tx; proc_rx refuses to run while packet_len is None.
        self.packet_len = None
        self.pad_len = None

    def proc_tx(self, packet_tx_list):
        """Frame every Tx packet as ``pad | preamble | packet | pad``.

        Side effects: sets ``self.packet_len`` (from the input's second
        dimension) and ``self.pad_len`` (fixed at 10000 samples).

        Args:
            packet_tx_list: 2-D array indexed as ``[tx_idx, sample]``; the
                first ``self.tx_num`` rows are used.

        Returns:
            Array with one framed waveform per Tx stream, stacked row-wise
            with ``np.vstack``.
        """
        self.packet_len = packet_tx_list.shape[1]
        self.pad_len = 10000

        # Each preamble segment occupies 3 * zadoff_len samples: the same
        # random-phase sequence twice, followed by zadoff_len zeros.
        head_len = sum(self.zadoff_set) * 3
        head_tx = np.zeros(head_len, dtype=np.complex64)
        offset = 0
        for zadoff_len in self.zadoff_set:
            # Unit-magnitude samples with uniformly random phase, drawn as a
            # 1-D vector so the slice assignment needs no broadcasting.
            zadoff_single = np.exp(1j * 2 * np.pi * np.random.rand(zadoff_len))
            head_tx[offset:offset + 2 * zadoff_len] = np.tile(zadoff_single, 2)
            offset += 3 * zadoff_len

        pad_tx = np.zeros(self.pad_len, dtype=np.complex64)

        wave_tx_list = []
        for tx_idx in range(self.tx_num):
            packet_tx = packet_tx_list[tx_idx, :]
            self._check_saturation(packet_tx)
            # The same preamble and padding are shared by every Tx stream.
            wave_tx_list.append(
                np.concatenate((pad_tx, head_tx, packet_tx, pad_tx), axis=0))
        return np.vstack(wave_tx_list)

    def proc_rx(self, wave_rx):
        """Detect the preamble and extract the packet on every Rx stream.

        Each stream is passed to the parent's ``proc_rx``. For every stream
        where a packet is found, the parent's default plot (if present) is
        renamed to a channel-specific name and a ``<prefix>_res.log`` file
        containing ``"<snr>, <cfo>"`` is written.

        Args:
            wave_rx: 2-D array indexed as ``[rx_idx, sample]``; the first
                ``self.rx_num`` rows are used.

        Returns:
            ``(packet_rx_list, snr_list, cfo_list)``: a complex64 array of
            shape ``(rx_num, packet_len)`` plus two length-``rx_num`` arrays.
            Channels without a detected packet keep zeros in the packet row
            and NaN for SNR and CFO. Returns ``(None, None, None)`` when
            ``self.packet_len`` has not been set.
        """
        if self.packet_len is None:
            logging.warning(
                "MIMODetection: packet_len not set. Ensure proc_tx was called or set manually.")
            return None, None, None

        packet_rx_list = np.zeros(
            (self.rx_num, self.packet_len), dtype=np.complex64)
        snr_list = np.full(self.rx_num, np.nan)
        cfo_list = np.full(self.rx_num, np.nan)

        # Path where Detection.proc_rx is expected to leave its plot
        # (per the original comments -- confirm against the parent).
        default_plot_name = ZCU216_CONFIG['CONFIG_NAME'] + '_detection.png'
        default_plot_path = self.logger.get_file_path(default_plot_name)

        for rx_idx in range(self.rx_num):
            wave_stream = wave_rx[rx_idx, :]
            # Delegate detection, CFO, extraction and SNR to the parent class.
            packet_rx, snr, cfo = super().proc_rx(wave_stream)

            if packet_rx is None:
                if self.debug_mode:
                    logging.warning(f"CH #{rx_idx}: Failed to detect packet")
                continue

            packet_rx_list[rx_idx, :] = packet_rx
            snr_list[rx_idx] = snr
            cfo_list[rx_idx] = cfo

            # Build the channel prefix unconditionally: the result log below
            # must not depend on whether a plot file happens to exist.
            fprefix = f"{ZCU216_CONFIG['CONFIG_NAME']}_CH{rx_idx}_MCS{self.MCS}"

            # Rename the parent's plot so the next channel does not overwrite it.
            if os.path.exists(default_plot_path):
                target_path = self.logger.get_file_path(fprefix + "_detection.png")
                shutil.move(default_plot_path, target_path)

            # Save log (MIMO specific requirement)
            log_file = self.logger.get_file_path(fprefix + "_res.log")
            with open(log_file, 'w', encoding='utf-8') as f:
                f.write(f"{snr:.3f}, {cfo:.3f}")

        return packet_rx_list, snr_list, cfo_list