Skip to content

Detection

rfsoc_rfdc.dsp.detection

Classes

Detection

Source code in rfsoc_rfdc/dsp/detection.py
class Detection:
    """Preamble-based packet detection for the RFSoC DSP receive chain.

    ``proc_tx`` prepends zero padding and a two-stage random-phase preamble
    to a baseband packet; ``proc_rx`` locates that preamble in a received
    capture with a normalized sliding-window correlation, estimates carrier
    frequency offset (CFO) from the repeated preamble halves, extracts the
    payload, and reports an SNR estimate.
    """

    def __init__(self, sample_rate=1e9, debug_mode=False):
        """Initialize detector configuration and output file paths.

        Args:
            sample_rate: Sample rate in Hz; used to convert measured phase
                drift into a CFO in Hz.
            debug_mode: When True, emit verbose logs and diagnostic plots.
        """
        super().__init__()
        self.sample_rate = sample_rate
        self.debug_mode = debug_mode

        # Access the logger from global config; it also resolves output
        # file paths for saved waveforms and plots.
        self.logger = ZCU216_CONFIG['LOGGER']

        # File paths and MAT variable keys for persisted Tx/Rx waveforms.
        self.txFile = self.logger.get_file_path('Tx.npy')
        self.txMatVarKey = "waveTx"
        self.rxFile = self.logger.get_file_path('Rx.npy')
        self.rxMatVarKey = "waveRx"

        # Preamble sequence lengths, ascending.  NOTE(review): proc_tx fills
        # these slots with unit-magnitude random-phase sequences, not true
        # Zadoff-Chu sequences, despite the naming.
        self.zadoff_set = [139, 839]  # ascent

    def _check_saturation(self, packet, threshold=1):
        """Log a warning when any sample magnitude exceeds ``threshold``."""
        # Fraction of samples whose magnitude exceeds the full-scale limit.
        ratio = np.sum(np.abs(packet) > threshold) / np.shape(packet)[0]
        if ratio > 0:
            logging.info(f'Warning: Packet Saturated for {ratio*100:.2f}%')

    def _zadoff_detection(self, wave, win_len, delta_len, threshold):
        """Find the repeated preamble via normalized auto-correlation.

        Each window of ``win_len`` samples is correlated with the window
        ``delta_len`` samples later; offsets whose normalized correlation is
        a local maximum above ``threshold`` are reported.

        Args:
            wave: Complex capture to search.
            win_len: Correlation window length in samples.
            delta_len: Lag between the two correlated windows.
            threshold: Minimum normalized correlation for a peak.

        Returns:
            ``(offset_list, corr_list)``: peak sample offsets and their
            correlation values.

        Raises:
            ValueError: If ``wave`` is shorter than ``delta_len + win_len``.
        """
        wave_len = len(wave)

        # Check if waveform can hold at least one window pair.
        if wave_len < delta_len + win_len:
            raise ValueError(f"Waveform too short for detection: "
                             f"got {wave_len} samples, need at least {delta_len + win_len}")

        # Compute correlation using vectorized sliding window.
        num_positions = wave_len - delta_len - win_len + 1
        corr_all = self._compute_sliding_correlation(
            wave, win_len, delta_len, num_positions)

        # Log summary statistics of the correlation trace.
        if self.debug_mode:
            logging.info("Correlation stats:")
            logging.info(
                f"  Min: {np.min(corr_all):.4f}, Max: {np.max(corr_all):.4f}")
            logging.info(
                f"  Mean: {np.mean(corr_all):.4f}, Median: {np.median(corr_all):.4f}")
            logging.info(
                f"  Values above threshold ({threshold}): {np.sum(corr_all > threshold)}")

        # Find local maxima above the threshold.
        offset_list, corr_list = self._find_peaks(
            corr_all, threshold, neighbor=10)

        if self.debug_mode:
            # Plot correlation trace with the threshold line.
            self._plot_correlation(corr_all, threshold)
            logging.info(
                f"  Found {len(offset_list)} peaks above threshold {threshold}")
            if offset_list:
                logging.info(f"  Peak offsets: {offset_list[:10]}...")
                logging.info(
                    f"  Peak correlations: {[f'{c:.3f}' for c in corr_list[:10]]}...")

        return offset_list, corr_list

    def _compute_sliding_correlation(self, wave, win_len, delta_len, num_positions):
        """Compute normalized correlation for every window position.

        Delegates to the module-level ``_compute_correlation_numba`` kernel,
        which is JIT-compiled and parallel when Numba is available.
        """
        if self.debug_mode:
            accel_type = "Numba (parallel)" if NUMBA_AVAILABLE else "Python (no acceleration)"
            logging.info(
                f"Computing correlation for {num_positions} positions using {accel_type}")

        # The kernel is one monolithic call, so the progress bar can only
        # jump to completion after it returns.
        with tqdm(total=num_positions, desc="Computing correlations", unit="pos", leave=False) as pbar:
            corr_all = _compute_correlation_numba(
                wave, win_len, delta_len, num_positions)
            pbar.update(num_positions)  # Update to completion

        return corr_all

    def _normalize_correlation(self, product, sum1, sum2, energy1, energy2, win_len):
        """Compute a normalized (Pearson-style) correlation coefficient.

        NOTE(review): appears to be a pure-Python reference for the Numba
        kernel; it is not called by any method visible in this file.
        """
        mean_product = sum1 * np.conj(sum2) / win_len
        var1 = energy1 - np.abs(sum1)**2 / win_len
        var2 = energy2 - np.abs(sum2)**2 / win_len

        # Avoid division by zero on (near-)constant windows.
        denominator = np.sqrt(var1 * var2)
        if denominator < 1e-10:
            return 0.0

        return np.abs(product - mean_product) / denominator

    def _find_peaks(self, corr_all, threshold, neighbor=10):
        """Find local maxima above ``threshold``.

        A peak must be strictly greater than the ``neighbor`` samples to its
        left and at least as large as the ``neighbor`` samples to its right,
        so the leftmost sample of a flat plateau is the one reported.
        """
        offset_list = []
        corr_list = []
        corr_len = len(corr_all)

        for offset in range(corr_len):
            corr = corr_all[offset]

            # Skip if below threshold.
            if corr <= threshold:
                continue

            # Neighborhood bounds, clipped to the array.
            start = max(0, offset - neighbor)
            end = min(corr_len, offset + neighbor + 1)

            # Strict on the left, non-strict on the right (plateau tie-break).
            is_peak = True
            if start < offset and corr <= np.max(corr_all[start:offset]):
                is_peak = False
            if end > offset + 1 and corr < np.max(corr_all[offset + 1:end]):
                is_peak = False

            if is_peak:
                offset_list.append(offset)
                corr_list.append(corr)

        return offset_list, corr_list

    def _plot_correlation(self, corr_all, threshold):
        """Plot the correlation trace and save it via the logger's path."""
        plt.figure(figsize=(12, 4))
        plt.plot(corr_all, linewidth=0.5)
        plt.axhline(y=threshold, color='r', linestyle='--',
                    linewidth=2, label=f'Threshold={threshold}')
        plt.title("Zadoff-Chu Correlation")
        plt.xlabel("Offset (samples)")
        plt.ylabel("Normalized Correlation")
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()

        filename = "correlation_plot.png"
        file_path = self.logger.get_file_path(filename)
        plt.savefig(file_path, dpi=150)
        plt.show()
        plt.close()

        logging.info(f"Saved correlation plot to {file_path}")

    def _get_energy(self, wave):
        """Return the mean power (average ``|x|**2``) of ``wave``."""
        wave_cal = wave  # - np.mean(wave)  (DC removal intentionally off)
        energy = np.mean(np.abs(wave_cal)**2)
        return energy

    def proc_tx(self, packet_tx):
        """Pad preamble to Tx packet.

        Builds a two-stage unit-magnitude random-phase preamble (each
        sequence repeated twice, then a same-length gap of zeros) and
        returns ``[pad, preamble, packet, pad]``.  Records ``packet_len``
        and ``pad_len`` on ``self`` for the matching ``proc_rx`` call.
        """
        self._check_saturation(packet_tx)
        self.packet_len = np.shape(packet_tx)[0]
        # Fixed pad; adaptive alternative kept for reference:
        # int(max(1e-3*self.sample_rate, round(0.1*self.packet_len)))
        self.pad_len = 10000

        # Layout per sequence of length L: [seq, seq, L zeros].
        head_tx = np.zeros((sum(self.zadoff_set) * 3), dtype=np.complex64)
        offset = 0
        for zadoff_len in self.zadoff_set:
            # 1-D random-phase sequence.  BUGFIX: the original used
            # np.random.rand(1, zadoff_len), whose 2-D (1, L) result cannot
            # be broadcast into the 1-D slice below and raises ValueError.
            zadoff_single = np.exp(
                1j * 2 * np.pi * np.random.rand(zadoff_len))
            zadoff_double = np.tile(zadoff_single, 2)  # repeat for CFO estimate
            head_tx[offset:offset + 2 * zadoff_len] = zadoff_double
            offset += 3 * zadoff_len
        pad_tx = np.zeros((self.pad_len))
        wave_tx = np.concatenate((pad_tx, head_tx, packet_tx, pad_tx), axis=0)

        return wave_tx

    def proc_rx(self, wave_rx):
        """Detect preamble received packet.

        Searches the tail of ``wave_rx`` for the preamble added by
        ``proc_tx``, estimates CFO from the repeated halves of each preamble
        sequence, extracts the payload, and estimates SNR against the
        10th-percentile noise-window energy.

        Args:
            wave_rx: Complex received capture.  ``proc_tx`` must have run
                first so ``packet_len``/``pad_len`` are set.

        Returns:
            ``(packet_rx, snr, cfo)``; ``(None, nan, nan)`` when no peak is
            found or the packet falls outside the search window.
        """
        cap_num = 3        # search the last cap_num padded-packet lengths
        noise_num = 1000   # number of candidate noise windows
        head_len = sum(self.zadoff_set) * 3
        wave_len = 2 * self.pad_len + head_len + self.packet_len

        # Extract the last portion of the waveform for detection.
        # BUGFIX: copy the slice — basic slicing returns a view, and the
        # in-place CFO correction below would otherwise silently mutate the
        # caller's wave_rx buffer.
        search_len = min(len(wave_rx), cap_num * wave_len)
        wave_rx_search = np.array(wave_rx[-search_len:])

        if self.debug_mode:
            # Visualize the search window (I/Q and magnitude).
            plt.figure(figsize=(14, 5))
            time_axis = np.arange(len(wave_rx_search))
            plt.subplot(2, 1, 1)
            plt.plot(time_axis, np.real(wave_rx_search),
                     label='I (Real)', alpha=0.7)
            plt.plot(time_axis, np.imag(wave_rx_search),
                     label='Q (Imaginary)', alpha=0.7)
            plt.xlabel('Sample Index')
            plt.ylabel('ADC Reading')
            plt.title(
                f'Search Window - Time Domain (I/Q components, {len(wave_rx_search)} samples)')
            plt.legend()
            plt.grid(True, alpha=0.3)

            plt.subplot(2, 1, 2)
            magnitude_db = 20 * np.log10(np.abs(wave_rx_search) + 1e-10)
            plt.plot(time_axis, magnitude_db)
            plt.xlabel('Sample Index')
            plt.ylabel('Magnitude (dB)')
            plt.title('Search Window - Magnitude')
            plt.grid(True, alpha=0.3)
            plt.ylim(bottom=0, top=100)

            plt.tight_layout()
            search_window_file = 'search_window_visualization.png'
            file_path = self.logger.get_file_path(search_window_file)
            plt.savefig(file_path)
            plt.show()
            plt.close()
            logging.info(f"Saved correlation plot to {file_path}")

        # Detect the doubled last preamble sequence (window == lag == its
        # length), so correlation peaks where the two halves repeat.
        offset_list, corr_list = self._zadoff_detection(
            wave_rx_search, self.zadoff_set[-1], self.zadoff_set[-1], 0.7)

        if not offset_list:
            logging.error("Failed to detect peaks in waveform")
            return None, np.nan, np.nan

        # Use the strongest peak.
        offset_idx = np.argmax(np.array(corr_list))
        detected_offset = offset_list[offset_idx]

        if self.debug_mode:
            logging.info(
                f"  Strongest peak found at offset {detected_offset} with correlation {corr_list[offset_idx]:.4f}")

        # The peak marks the last sequence; the preamble starts 3*L earlier
        # for each preceding sequence of length L.
        offset_zadoff = detected_offset - 3 * sum(self.zadoff_set[:-1])
        offset_packet = offset_zadoff + head_len

        if offset_zadoff < 0:
            logging.warning(f"Calculated preamble start is negative ({offset_zadoff}). "
                            f"Preamble may be truncated or detection is incorrect.")
            logging.info(f"Using detected offset directly: {detected_offset}")
            offset_zadoff = detected_offset
            offset_packet = offset_zadoff + head_len

        if self.debug_mode:
            logging.info(f"  Preamble starts at offset {offset_zadoff}")
            logging.info(f"  Packet data starts at offset {offset_packet}")

        cfo_set = []
        offset = offset_zadoff

        # Ensure we have enough samples for packet extraction.
        if offset_packet + self.packet_len > len(wave_rx_search):
            logging.error(f"Packet extends beyond search window: "
                          f"packet ends at {offset_packet + self.packet_len}, "
                          f"but search window is only {len(wave_rx_search)} samples")
            return None, np.nan, np.nan

        for idx, zadoff_len in enumerate(self.zadoff_set):
            # Check bounds before reading both halves of the sequence.
            if offset + 2 * zadoff_len > len(wave_rx_search):
                if self.debug_mode:
                    logging.info(
                        f"Warning: Not enough samples for Zadoff sequence {idx+1} CFO calculation")
                break

            zadoff_1 = wave_rx_search[offset:offset + zadoff_len]
            zadoff_2 = wave_rx_search[offset +
                                      zadoff_len:offset + 2 * zadoff_len]

            # Phase drift between the repeated halves gives the CFO estimate.
            cfo_temp = (-self.sample_rate / zadoff_len *
                        np.angle(np.sum(zadoff_1 * np.conj(zadoff_2))) / 2 / np.pi)
            cfo_set.append(cfo_temp)

            # Apply CFO correction to this section of the preamble in place
            # (operates on our private copy, not the caller's array).
            section_len = 3 * zadoff_len
            if offset + section_len <= len(wave_rx_search):
                wave_rx_search[offset:offset + section_len] = (
                    wave_rx_search[offset:offset + section_len] *
                    np.exp(-1j * 2 * np.pi * np.arange(section_len) /
                           self.sample_rate * cfo_temp)
                )

            offset += 3 * zadoff_len

            if self.debug_mode:
                logging.info(
                    f"Zadoff sequence {idx+1}/{len(self.zadoff_set)}: CFO = {cfo_temp:.2f} Hz")

        # CFO correction of the payload is deliberately disabled; re-enable
        # by restoring the commented line.
        cfo = 0  # This line turn off CFO
        # cfo = sum(cfo_set)

        # Extract the packet from the search window.
        packet_rx = wave_rx_search[offset_packet:offset_packet +
                                   self.packet_len]

        # Apply additional CFO correction to the packet (no-op while cfo=0).
        packet_rx = packet_rx * \
            np.exp(-1j * 2 * np.pi * np.arange(self.packet_len) /
                   self.sample_rate * cfo)

        # Sample noise_num evenly spaced 100-sample windows and take the
        # 10th-percentile energy as the noise-floor estimate.
        noise_list = []
        iterator = range(noise_num)
        iterator = tqdm(
            iterator, desc="Computing noise samples", unit="samples", leave=False)

        for noise_idx in iterator:
            start_idx = round(len(wave_rx_search) / noise_num * noise_idx)
            end_idx = round(len(wave_rx_search) / noise_num *
                            noise_idx) + 100
            if end_idx > len(wave_rx_search):
                continue
            noise_sym = wave_rx_search[start_idx:end_idx]
            noise = self._get_energy(noise_sym)
            noise_list.append(noise)

        noise = np.percentile(noise_list, 10)
        signal = self._get_energy(packet_rx) - noise
        snr = 10 * np.log10(signal / noise)

        logging.info(
            f"SNR: {snr:.2f} dB, Signal: {10*np.log10(signal):.2f} dB, Noise: {10*np.log10(noise):.2f} dB")

        # Annotated magnitude plot with preamble/packet boundaries.
        plt.ioff()
        fig, ax = plt.subplots(figsize=(12, 6))
        ax.plot(np.arange(len(wave_rx_search)), 20 *
                np.log10(np.abs(wave_rx_search) + 1e-10))
        ax.axvline(offset_zadoff, color='g', linestyle='--', linewidth=2,
                   label=f'Preamble start (offset={offset_zadoff})')
        ax.axvline(offset_packet, color='b', linestyle='--', linewidth=2,
                   label=f'Packet start (offset={offset_packet})')
        ax.axvline(offset_packet + self.packet_len, color='r', linestyle='--', linewidth=2,
                   label=f'Packet end (offset={offset_packet + self.packet_len})')
        ax.set_ylim(bottom=0, top=100)
        ax.set_xlabel('Sample Index')
        ax.set_ylabel('Magnitude (dB)')
        ax.set_title(
            f'SNR: {snr:.2f}dB Signal: {10*np.log10(signal):.2f} Noise: {10*np.log10(noise):.2f} CFO:{cfo:.2f}Hz')
        ax.legend()
        ax.grid(True)

        detection_file = self.logger.get_file_path(
            ZCU216_CONFIG['CONFIG_NAME']+'_detection.png')
        fig.savefig(detection_file)
        plt.close()
        if self.debug_mode:
            logging.info(f"  SNR plot saved to {detection_file}")

        # Store internal attributes for child classes to access.
        self._last_offset_packet = offset_packet
        self._last_search_len = search_len

        return packet_rx, snr, cfo
Functions
proc_tx(packet_tx)

Pad preamble to Tx packet

Source code in rfsoc_rfdc/dsp/detection.py
def proc_tx(self, packet_tx):
    """Pad preamble to Tx packet.

    Builds a two-stage unit-magnitude random-phase preamble (each sequence
    repeated twice, then a same-length gap of zeros) and returns
    ``[pad, preamble, packet, pad]``.  Records ``packet_len`` and
    ``pad_len`` on ``self`` for the matching ``proc_rx`` call.
    """
    self._check_saturation(packet_tx)
    self.packet_len = np.shape(packet_tx)[0]
    # Fixed pad; adaptive alternative kept for reference:
    # int(max(1e-3*self.sample_rate, round(0.1*self.packet_len)))
    self.pad_len = 10000

    # Layout per sequence of length L: [seq, seq, L zeros].
    head_tx = np.zeros((sum(self.zadoff_set) * 3), dtype=np.complex64)
    offset = 0
    for zadoff_len in self.zadoff_set:
        # 1-D random-phase sequence.  BUGFIX: the original used
        # np.random.rand(1, zadoff_len), whose 2-D (1, L) result cannot be
        # broadcast into the 1-D slice below and raises ValueError.
        zadoff_single = np.exp(
            1j * 2 * np.pi * np.random.rand(zadoff_len))
        zadoff_double = np.tile(zadoff_single, 2)  # repeat for CFO estimate
        head_tx[offset:offset + 2 * zadoff_len] = zadoff_double
        offset += 3 * zadoff_len
    pad_tx = np.zeros((self.pad_len))
    wave_tx = np.concatenate((pad_tx, head_tx, packet_tx, pad_tx), axis=0)

    return wave_tx
proc_rx(wave_rx)

Detect preamble received packet

Source code in rfsoc_rfdc/dsp/detection.py
def proc_rx(self, wave_rx):
    """Detect preamble received packet.

    Searches the tail of ``wave_rx`` for the preamble added by ``proc_tx``,
    estimates CFO from the repeated halves of each preamble sequence,
    extracts the payload, and estimates SNR against the 10th-percentile
    noise-window energy.

    Args:
        wave_rx: Complex received capture.  ``proc_tx`` must have run first
            so ``packet_len``/``pad_len`` are set.

    Returns:
        ``(packet_rx, snr, cfo)``; ``(None, nan, nan)`` when no peak is
        found or the packet falls outside the search window.
    """
    cap_num = 3        # search the last cap_num padded-packet lengths
    noise_num = 1000   # number of candidate noise windows
    head_len = sum(self.zadoff_set) * 3
    wave_len = 2 * self.pad_len + head_len + self.packet_len

    # Extract the last portion of the waveform for detection.
    # BUGFIX: copy the slice — basic slicing returns a view, and the
    # in-place CFO correction below would otherwise silently mutate the
    # caller's wave_rx buffer.
    search_len = min(len(wave_rx), cap_num * wave_len)
    wave_rx_search = np.array(wave_rx[-search_len:])

    if self.debug_mode:
        # Visualize the search window (I/Q and magnitude).
        plt.figure(figsize=(14, 5))
        time_axis = np.arange(len(wave_rx_search))
        plt.subplot(2, 1, 1)
        plt.plot(time_axis, np.real(wave_rx_search),
                 label='I (Real)', alpha=0.7)
        plt.plot(time_axis, np.imag(wave_rx_search),
                 label='Q (Imaginary)', alpha=0.7)
        plt.xlabel('Sample Index')
        plt.ylabel('ADC Reading')
        plt.title(
            f'Search Window - Time Domain (I/Q components, {len(wave_rx_search)} samples)')
        plt.legend()
        plt.grid(True, alpha=0.3)

        plt.subplot(2, 1, 2)
        magnitude_db = 20 * np.log10(np.abs(wave_rx_search) + 1e-10)
        plt.plot(time_axis, magnitude_db)
        plt.xlabel('Sample Index')
        plt.ylabel('Magnitude (dB)')
        plt.title('Search Window - Magnitude')
        plt.grid(True, alpha=0.3)
        plt.ylim(bottom=0, top=100)

        plt.tight_layout()
        search_window_file = 'search_window_visualization.png'
        file_path = self.logger.get_file_path(search_window_file)
        plt.savefig(file_path)
        plt.show()
        plt.close()
        logging.info(f"Saved correlation plot to {file_path}")

    # Detect the doubled last preamble sequence (window == lag == its
    # length), so correlation peaks where the two halves repeat.
    offset_list, corr_list = self._zadoff_detection(
        wave_rx_search, self.zadoff_set[-1], self.zadoff_set[-1], 0.7)

    if not offset_list:
        logging.error("Failed to detect peaks in waveform")
        return None, np.nan, np.nan

    # Use the strongest peak.
    offset_idx = np.argmax(np.array(corr_list))
    detected_offset = offset_list[offset_idx]

    if self.debug_mode:
        logging.info(
            f"  Strongest peak found at offset {detected_offset} with correlation {corr_list[offset_idx]:.4f}")

    # The peak marks the last sequence; the preamble starts 3*L earlier for
    # each preceding sequence of length L.
    offset_zadoff = detected_offset - 3 * sum(self.zadoff_set[:-1])
    offset_packet = offset_zadoff + head_len

    if offset_zadoff < 0:
        logging.warning(f"Calculated preamble start is negative ({offset_zadoff}). "
                        f"Preamble may be truncated or detection is incorrect.")
        logging.info(f"Using detected offset directly: {detected_offset}")
        offset_zadoff = detected_offset
        offset_packet = offset_zadoff + head_len

    if self.debug_mode:
        logging.info(f"  Preamble starts at offset {offset_zadoff}")
        logging.info(f"  Packet data starts at offset {offset_packet}")

    cfo_set = []
    offset = offset_zadoff

    # Ensure we have enough samples for packet extraction.
    if offset_packet + self.packet_len > len(wave_rx_search):
        logging.error(f"Packet extends beyond search window: "
                      f"packet ends at {offset_packet + self.packet_len}, "
                      f"but search window is only {len(wave_rx_search)} samples")
        return None, np.nan, np.nan

    for idx, zadoff_len in enumerate(self.zadoff_set):
        # Check bounds before reading both halves of the sequence.
        if offset + 2 * zadoff_len > len(wave_rx_search):
            if self.debug_mode:
                logging.info(
                    f"Warning: Not enough samples for Zadoff sequence {idx+1} CFO calculation")
            break

        zadoff_1 = wave_rx_search[offset:offset + zadoff_len]
        zadoff_2 = wave_rx_search[offset +
                                  zadoff_len:offset + 2 * zadoff_len]

        # Phase drift between the repeated halves gives the CFO estimate.
        cfo_temp = (-self.sample_rate / zadoff_len *
                    np.angle(np.sum(zadoff_1 * np.conj(zadoff_2))) / 2 / np.pi)
        cfo_set.append(cfo_temp)

        # Apply CFO correction to this section of the preamble in place
        # (operates on our private copy, not the caller's array).
        section_len = 3 * zadoff_len
        if offset + section_len <= len(wave_rx_search):
            wave_rx_search[offset:offset + section_len] = (
                wave_rx_search[offset:offset + section_len] *
                np.exp(-1j * 2 * np.pi * np.arange(section_len) /
                       self.sample_rate * cfo_temp)
            )

        offset += 3 * zadoff_len

        if self.debug_mode:
            logging.info(
                f"Zadoff sequence {idx+1}/{len(self.zadoff_set)}: CFO = {cfo_temp:.2f} Hz")

    # CFO correction of the payload is deliberately disabled; re-enable by
    # restoring the commented line.
    cfo = 0  # This line turn off CFO
    # cfo = sum(cfo_set)

    # Extract the packet from the search window.
    packet_rx = wave_rx_search[offset_packet:offset_packet +
                               self.packet_len]

    # Apply additional CFO correction to the packet (no-op while cfo=0).
    packet_rx = packet_rx * \
        np.exp(-1j * 2 * np.pi * np.arange(self.packet_len) /
               self.sample_rate * cfo)

    # Sample noise_num evenly spaced 100-sample windows and take the
    # 10th-percentile energy as the noise-floor estimate.
    noise_list = []
    iterator = range(noise_num)
    iterator = tqdm(
        iterator, desc="Computing noise samples", unit="samples", leave=False)

    for noise_idx in iterator:
        start_idx = round(len(wave_rx_search) / noise_num * noise_idx)
        end_idx = round(len(wave_rx_search) / noise_num *
                        noise_idx) + 100
        if end_idx > len(wave_rx_search):
            continue
        noise_sym = wave_rx_search[start_idx:end_idx]
        noise = self._get_energy(noise_sym)
        noise_list.append(noise)

    noise = np.percentile(noise_list, 10)
    signal = self._get_energy(packet_rx) - noise
    snr = 10 * np.log10(signal / noise)

    logging.info(
        f"SNR: {snr:.2f} dB, Signal: {10*np.log10(signal):.2f} dB, Noise: {10*np.log10(noise):.2f} dB")

    # Annotated magnitude plot with preamble/packet boundaries.
    plt.ioff()
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(np.arange(len(wave_rx_search)), 20 *
            np.log10(np.abs(wave_rx_search) + 1e-10))
    ax.axvline(offset_zadoff, color='g', linestyle='--', linewidth=2,
               label=f'Preamble start (offset={offset_zadoff})')
    ax.axvline(offset_packet, color='b', linestyle='--', linewidth=2,
               label=f'Packet start (offset={offset_packet})')
    ax.axvline(offset_packet + self.packet_len, color='r', linestyle='--', linewidth=2,
               label=f'Packet end (offset={offset_packet + self.packet_len})')
    ax.set_ylim(bottom=0, top=100)
    ax.set_xlabel('Sample Index')
    ax.set_ylabel('Magnitude (dB)')
    ax.set_title(
        f'SNR: {snr:.2f}dB Signal: {10*np.log10(signal):.2f} Noise: {10*np.log10(noise):.2f} CFO:{cfo:.2f}Hz')
    ax.legend()
    ax.grid(True)

    detection_file = self.logger.get_file_path(
        ZCU216_CONFIG['CONFIG_NAME']+'_detection.png')
    fig.savefig(detection_file)
    plt.close()
    if self.debug_mode:
        logging.info(f"  SNR plot saved to {detection_file}")

    # Store internal attributes for child classes to access.
    self._last_offset_packet = offset_packet
    self._last_search_len = search_len

    return packet_rx, snr, cfo