Skip to content

detection

Zadoff-Chu preamble detection, carrier frequency offset (CFO) estimation, and SNR calculation.

rfsoc_rfdc.dsp.detection

Attributes

ofdm = OFDM(sym_num=100, fft_size=64, sub_num=48, modu='16QAM', cp_rate=0.25) module-attribute

packet_tx = ofdm.generate() module-attribute

det = Detection(sample_rate=2500000000.0) module-attribute

wave_tx = det.proc_tx(packet_tx) module-attribute

wave_rx = np.load(det.rx_file) module-attribute

Classes

Detection(sample_rate=1000000000.0)

Source code in rfsoc_rfdc/dsp/detection.py
def __init__(self, sample_rate=1e9):
    super(Detection, self).__init__()
    # Check path to waveform
    file_dir = os.path.dirname(__file__)
    self.path2wave = os.path.join(
        os.path.dirname(file_dir), "../wave_files")
    if not os.path.exists(self.path2wave):
        os.mkdir(self.path2wave)
    # Tx/Rx file name
    self.txFile = os.path.join(self.path2wave, 'Tx.npy')
    self.txMatVarKey = "waveTx"
    self.rxFile = os.path.join(self.path2wave, 'Rx.npy')
    self.rxMatVarKey = "waveRx"

    self.sample_rate = sample_rate
    # DSP-related
    self.max_detect_trials = 1
    self.zadoff_set = [139, 839]  # ascent
Attributes
path2wave = os.path.join(os.path.dirname(file_dir), '../wave_files') instance-attribute
txFile = os.path.join(self.path2wave, 'Tx.npy') instance-attribute
txMatVarKey = 'waveTx' instance-attribute
rxFile = os.path.join(self.path2wave, 'Rx.npy') instance-attribute
rxMatVarKey = 'waveRx' instance-attribute
sample_rate = sample_rate instance-attribute
max_detect_trials = 1 instance-attribute
zadoff_set = [139, 839] instance-attribute
Functions
proc_tx(packet_tx)

Pad preamble to Tx packet

Source code in rfsoc_rfdc/dsp/detection.py
def proc_tx(self, packet_tx):
    """Pad preamble to Tx packet"""
    self._check_saturation(packet_tx)
    self.packet_len = np.shape(packet_tx)[0]
    # int(max(1e-3*self.sample_rate, round(0.1*self.packet_len)))
    self.pad_len = 10000

    head_tx = np.zeros((sum(self.zadoff_set) * 3), dtype=np.complex64)
    offset = 0
    for zadoff_len in self.zadoff_set:
        zadoff_single = np.exp(
            1j * 2 * np.pi * np.random.rand(1, zadoff_len))
        zadoff_double = np.tile(zadoff_single, 2)
        head_tx[offset:offset + 2 * zadoff_len] = zadoff_double
        offset += 3 * zadoff_len
    pad_tx = np.zeros((self.pad_len))
    wave_tx = np.concatenate((pad_tx, head_tx, packet_tx, pad_tx), axis=0)

    return wave_tx
proc_rx(wave_rx)

Detect preamble received packet

Source code in rfsoc_rfdc/dsp/detection.py
def proc_rx(self, wave_rx):
    """Detect preamble received packet"""
    cap_num = 3
    noise_num = 1000
    head_len = sum(self.zadoff_set) * 3
    wave_len = 2 * self.pad_len + head_len + self.packet_len

    trial = self.max_detect_trials
    while trial > 0:
        trial = trial - 1
        wave_rx = wave_rx[-cap_num * wave_len:]
        offset_list, corr_list = self._zadoff_detection(
            wave_rx[:wave_len], self.zadoff_set[-1], self.zadoff_set[-1], 0.7)
        offset_list_after, _ = self._zadoff_detection(
            wave_rx[wave_len:2 * wave_len], self.zadoff_set[-1], self.zadoff_set[-1], 0.7)
        if (not offset_list) or (not offset_list_after):
            continue
        offset_idx = np.argmax(np.array(corr_list))
        offset_zadoff = offset_list[offset_idx] - \
            3 * sum(self.zadoff_set[:-1])
        offset_packet = offset_zadoff + head_len
        if offset_zadoff <= 0:
            continue
        break

    if trial == -1:
        raise Exception(
            f"Failed to detect for more than {self.max_detect_trials} times")

    cfo_set = []
    offset = offset_zadoff
    for zadoff_len in self.zadoff_set:
        zadoff_1 = wave_rx[offset:offset + zadoff_len]
        zadoff_2 = wave_rx[offset + zadoff_len:offset + 2 * zadoff_len]

        cfo_temp = (-self.sample_rate / zadoff_len *
                    np.angle(np.sum(zadoff_1 * np.conj(zadoff_2))) / 2 / np.pi)
        cfo_set.append(cfo_temp)
        offset += 3 * zadoff_len

        wave_rx[offset_zadoff:offset_zadoff + head_len] = (
            wave_rx[offset_zadoff:offset_zadoff + head_len] *
            np.exp(-1j * 2 * np.pi * np.arange(head_len) /
                   self.sample_rate * cfo_temp)
        )

    cfo = 0  # This line turn off CFO
    # cfo = sum(cfo_set)
    packet_rx = wave_rx[offset_packet:offset_packet + self.packet_len]
    packet_rx = packet_rx * \
        np.exp(-1j * 2 * np.pi * np.arange(self.packet_len) /
               self.sample_rate * cfo)

    noise_list = []
    for noise_idx in range(noise_num):
        start_idx = round(cap_num * wave_len / noise_num * noise_idx)
        end_idx = round(cap_num * wave_len / noise_num *
                        noise_idx) + int(np.ceil(100))
        noise_sym = wave_rx[start_idx:end_idx]
        noise = self._get_energy(noise_sym)
        noise_list.append(noise)
    noise = np.percentile(noise_list, 10)
    signal = self._get_energy(packet_rx) - noise
    snr = 10 * np.log10(signal / noise)

    plt.ioff()
    fig, ax = plt.subplots()
    ax.plot(np.arange(cap_num * wave_len), 20 *
            np.log10(np.abs(wave_rx) + 1e-10))
    ax.vlines(offset_zadoff, ymin=-1e10, ymax=+1e10)
    ax.vlines(offset_packet, ymin=-1e10, ymax=+1e10)
    ax.vlines(offset_packet + self.packet_len, ymin=-1e10, ymax=+1e10)
    ax.set_ylim(bottom=0, top=100)
    ax.set_title(
        f'SNR: {snr:.2f}dB Signal: {10*np.log10(signal):.2f} Noise: {10*np.log10(noise):.2f} CFO:{cfo:.2f}Hz')

    detection_file = os.path.join(
        self.path2wave, ZCU216_CONFIG['CONFIG_NAME']+'_detection.png')
    fig.savefig(detection_file)
    plt.close()

    return packet_rx, snr, cfo