Skip to content

MIMO Detection

rfsoc_rfdc.dsp.mimo_detection

Classes

MIMODetection

Bases: Detection

Source code in rfsoc_rfdc/dsp/mimo_detection.py
class MIMODetection(Detection):
    """Multi-channel wrapper around ``Detection``.

    ``proc_tx`` frames every Tx stream with zero padding and one shared
    preamble; ``proc_rx`` runs the parent's single-stream ``proc_rx`` on each
    Rx stream and collects the per-channel results.
    """

    def __init__(self, sample_rate=1e9, tx_num=2, rx_num=2, MCS=0, debug_mode=False):
        """Store the MIMO dimensions and derive the Tx/Rx file names.

        ``sample_rate`` and ``debug_mode`` are forwarded to the parent
        constructor; ``MCS`` is kept for use in file names.
        """
        super().__init__(sample_rate=sample_rate, debug_mode=debug_mode)
        # Number of Tx / Rx streams handled by proc_tx / proc_rx
        self.tx_num = tx_num
        self.rx_num = rx_num
        # MCS index; only used here to label file names
        self.MCS = MCS
        # File names for Tx / Rx waveforms
        self.txFile = f"waveTx_{self.tx_num}_{self.MCS}.mat"
        self.rxFile = f"waveRx_{self.rx_num}_{self.MCS}.mat"
        # Both are filled in by proc_tx; proc_rx refuses to run while
        # packet_len is still None
        self.packet_len = None
        self.pad_len = None

    def proc_tx(self, packet_tx_list):
        """
        Frame each Tx packet as [padding | preamble | packet | padding].

        packet_tx_list: shape (tx_num, packet_len)
        Returns one row per Tx channel; all channels share the same preamble.
        """
        # Remember the packet length for proc_rx; padding is fixed at 10000 samples
        self.packet_len = packet_tx_list.shape[1]
        self.pad_len = 10000

        # Each entry N of zadoff_set occupies 3*N samples: a random
        # unit-magnitude sequence of length N, its repeat, then N zeros.
        # NOTE(review): zadoff_set is not defined in this class - presumably
        # provided by Detection.
        preamble = np.zeros(sum(self.zadoff_set) * 3, dtype=np.complex64)
        cursor = 0
        for seq_len in self.zadoff_set:
            phases = np.random.rand(seq_len)
            sequence = np.exp(1j * 2 * np.pi * phases)
            # Write the sequence twice back-to-back; the third slot stays zero
            preamble[cursor:cursor + seq_len] = sequence
            preamble[cursor + seq_len:cursor + 2 * seq_len] = sequence
            cursor += 3 * seq_len

        silence = np.zeros(self.pad_len, dtype=np.complex64)

        framed_rows = []
        for tx_idx in range(self.tx_num):
            payload = packet_tx_list[tx_idx, :]
            # NOTE(review): _check_saturation is presumably inherited from Detection
            self._check_saturation(payload)
            framed_rows.append(
                np.concatenate((silence, preamble, payload, silence), axis=0))

        return np.vstack(framed_rows)

    def proc_rx(self, wave_rx):
        """Run the parent's detection on every Rx channel and collect the results.

        Returns (packet_rx_list, snr_list, cfo_list). Channels where the parent
        returns no packet keep zeros in packet_rx_list and NaN in the SNR/CFO
        arrays. Returns (None, None, None) when packet_len has not been set.
        """
        if self.packet_len is None:
            # Output buffer cannot be sized without a packet length
            logging.warning(
                "MIMODetection: packet_len not set. Ensure proc_tx was called or set manually.")
            return None, None, None

        packet_rx_list = np.zeros(
            (self.rx_num, self.packet_len), dtype=np.complex64)
        snr_list = np.full(self.rx_num, np.nan)
        cfo_list = np.full(self.rx_num, np.nan)

        # Location where Detection.proc_rx is expected to leave its plot
        default_plot_name = ZCU216_CONFIG['CONFIG_NAME'] + '_detection.png'
        default_plot_path = self.logger.get_file_path(default_plot_name)

        for rx_idx in range(self.rx_num):
            # Parent handles detection, CFO, extraction and SNR for one stream
            packet_rx, snr, cfo = super().proc_rx(wave_rx[rx_idx, :])

            if packet_rx is None:
                if self.debug_mode:
                    logging.warning(f"CH #{rx_idx}: Failed to detect packet")
                continue

            packet_rx_list[rx_idx, :] = packet_rx
            snr_list[rx_idx] = snr
            cfo_list[rx_idx] = cfo

            # NOTE(review): the result log below is only written when the
            # plot file exists - confirm this coupling is intended.
            if not os.path.exists(default_plot_path):
                continue

            # Give the parent's plot a channel-specific name so the next
            # channel does not overwrite it
            fprefix = f"{ZCU216_CONFIG['CONFIG_NAME']}_CH{rx_idx}_MCS{self.MCS}"
            target_path = self.logger.get_file_path(fprefix + "_detection.png")
            shutil.move(default_plot_path, target_path)

            # Per-channel result log: "<snr>, <cfo>"
            log_file = self.logger.get_file_path(fprefix + "_res.log")
            with open(log_file, 'w') as f:
                f.write(f"{snr:.3f}, {cfo:.3f}")

        return packet_rx_list, snr_list, cfo_list
Functions
proc_tx(packet_tx_list)

Build the transmit waveform for every MIMO Tx channel by placing zero padding and a shared preamble around each packet. packet_tx_list: shape (tx_num, packet_len)

Source code in rfsoc_rfdc/dsp/mimo_detection.py
def proc_tx(self, packet_tx_list):
    """
    Frame each Tx packet as [padding | preamble | packet | padding].

    packet_tx_list: shape (tx_num, packet_len)
    Returns one row per Tx channel; all channels share the same preamble.
    """
    # Remember the packet length for proc_rx; padding is fixed at 10000 samples
    self.packet_len = packet_tx_list.shape[1]
    self.pad_len = 10000

    # Each entry N of zadoff_set occupies 3*N samples: a random
    # unit-magnitude sequence of length N, its repeat, then N zeros.
    # NOTE(review): zadoff_set is not defined in this block - presumably
    # provided by the parent class.
    preamble = np.zeros(sum(self.zadoff_set) * 3, dtype=np.complex64)
    cursor = 0
    for seq_len in self.zadoff_set:
        phases = np.random.rand(seq_len)
        sequence = np.exp(1j * 2 * np.pi * phases)
        # Write the sequence twice back-to-back; the third slot stays zero
        preamble[cursor:cursor + seq_len] = sequence
        preamble[cursor + seq_len:cursor + 2 * seq_len] = sequence
        cursor += 3 * seq_len

    silence = np.zeros(self.pad_len, dtype=np.complex64)

    framed_rows = []
    for tx_idx in range(self.tx_num):
        payload = packet_tx_list[tx_idx, :]
        # NOTE(review): _check_saturation is presumably inherited from the parent class
        self._check_saturation(payload)
        framed_rows.append(
            np.concatenate((silence, preamble, payload, silence), axis=0))

    return np.vstack(framed_rows)
proc_rx(wave_rx)

Detect preamble and extract packets from multiple Rx channels

Source code in rfsoc_rfdc/dsp/mimo_detection.py
def proc_rx(self, wave_rx):
    """Run the parent's detection on every Rx channel and collect the results.

    Returns (packet_rx_list, snr_list, cfo_list). Channels where the parent
    returns no packet keep zeros in packet_rx_list and NaN in the SNR/CFO
    arrays. Returns (None, None, None) when packet_len has not been set.
    """
    if self.packet_len is None:
        # Output buffer cannot be sized without a packet length
        logging.warning(
            "MIMODetection: packet_len not set. Ensure proc_tx was called or set manually.")
        return None, None, None

    packet_rx_list = np.zeros(
        (self.rx_num, self.packet_len), dtype=np.complex64)
    snr_list = np.full(self.rx_num, np.nan)
    cfo_list = np.full(self.rx_num, np.nan)

    # Location where the parent's proc_rx is expected to leave its plot
    default_plot_name = ZCU216_CONFIG['CONFIG_NAME'] + '_detection.png'
    default_plot_path = self.logger.get_file_path(default_plot_name)

    for rx_idx in range(self.rx_num):
        # Parent handles detection, CFO, extraction and SNR for one stream
        packet_rx, snr, cfo = super().proc_rx(wave_rx[rx_idx, :])

        if packet_rx is None:
            if self.debug_mode:
                logging.warning(f"CH #{rx_idx}: Failed to detect packet")
            continue

        packet_rx_list[rx_idx, :] = packet_rx
        snr_list[rx_idx] = snr
        cfo_list[rx_idx] = cfo

        # NOTE(review): the result log below is only written when the
        # plot file exists - confirm this coupling is intended.
        if not os.path.exists(default_plot_path):
            continue

        # Give the parent's plot a channel-specific name so the next
        # channel does not overwrite it
        fprefix = f"{ZCU216_CONFIG['CONFIG_NAME']}_CH{rx_idx}_MCS{self.MCS}"
        target_path = self.logger.get_file_path(fprefix + "_detection.png")
        shutil.move(default_plot_path, target_path)

        # Per-channel result log: "<snr>, <cfo>"
        log_file = self.logger.get_file_path(fprefix + "_res.log")
        with open(log_file, 'w') as f:
            f.write(f"{snr:.3f}, {cfo:.3f}")

    return packet_rx_list, snr_list, cfo_list