Skip to content

ofdm_analyer

rfsoc_rfdc.receiver.ofdm_analyer

Attributes

Classes

OFDMAnalyzer(ofdm_scheme, detect_scheme, server_config=('server.local', 1234))

Initialize RxAnalyzer with processing schemes and network configuration

Source code in rfsoc_rfdc/receiver/ofdm_analyer.py
def __init__(self, ofdm_scheme, detect_scheme, server_config=("server.local", 1234)):
    """Initialize RxAnalyzer with processing schemes and network configuration"""
    # Processing schemes
    self.ofdm_scheme = ofdm_scheme
    self.detect_scheme = detect_scheme

    self.time_wv_plotter = ComplexSignalPlotter(
        title="Waveform Plotter (Time)")

    # Initialize plotters
    self.time_pkt_plotter = ComplexSignalPlotter(
        title="Packet Plotter (Time)")
    dac_samp_rate = ZCU216_CONFIG['DACSampleRate'] / \
        ZCU216_CONFIG['DACInterpolationRate'] * 1e6
    self.fft_pkt_plotter = FFTPlotter(sample_rate=dac_samp_rate)

    # TCP socket setup
    self.server_config = server_config
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Attributes
ofdm_scheme = ofdm_scheme instance-attribute
detect_scheme = detect_scheme instance-attribute
time_wv_plotter = ComplexSignalPlotter(title='Waveform Plotter (Time)') instance-attribute
time_pkt_plotter = ComplexSignalPlotter(title='Packet Plotter (Time)') instance-attribute
fft_pkt_plotter = FFTPlotter(sample_rate=dac_samp_rate) instance-attribute
server_config = server_config instance-attribute
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) instance-attribute
Functions
__del__()

Clean up network resources

Source code in rfsoc_rfdc/receiver/ofdm_analyer.py
def __del__(self):
    """Clean up network resources"""
    self.sock.close()
data_logging_handler(iq_data, file_name)

Save IQ data to file

Source code in rfsoc_rfdc/receiver/ofdm_analyer.py
def data_logging_handler(self, iq_data, file_name):
    """Save IQ data to file"""
    np.save(file_name, iq_data)
tcp_reconnect(net_config)

Attempt to reconnect TCP socket

Source code in rfsoc_rfdc/receiver/ofdm_analyer.py
def tcp_reconnect(self, net_config):
    """Attempt to reconnect TCP socket"""
    new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    new_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        new_sock.connect(net_config)
    except socket.error:
        return None
    return new_sock
tcp_pkt_handler(samples)

Send samples over TCP connection

Source code in rfsoc_rfdc/receiver/ofdm_analyer.py
def tcp_pkt_handler(self, samples):
    """Send samples over TCP connection"""
    samples = np.array(samples, dtype=np.csingle)
    try:
        byte_data = pickle.dumps(samples)
        self.sock.sendall(byte_data)
    except socket.error:
        for _ in range(5):
            s = self.tcp_reconnect(self.server_config)
            if s is not None:
                self.sock = s
                break
process_packet(pkt_iq_data)

Process and visualize packet data

Source code in rfsoc_rfdc/receiver/ofdm_analyer.py
def process_packet(self, pkt_iq_data):
    """Process and visualize packet data"""
    start_t = time.time_ns()

    thd_list = []
    # Time plot thread
    thd_list.append(threading.Thread(
        target=self.time_pkt_plotter.update_plot,
        args=(pkt_iq_data, 0.5)))
    # FFT plot thread
    thd_list.append(threading.Thread(
        target=self.fft_pkt_plotter.update_plot,
        args=(pkt_iq_data,)))
    # Save captured packet thread
    thd_list.append(threading.Thread(
        target=self.data_logging_handler,
        args=(pkt_iq_data, self.detect_scheme.rx_file)))
    # TCP samples thread (commented out by default)
    # thd_list.append(threading.Thread(
    #     target=self.tcp_pkt_handler,
    #     args=(pkt_iq_data,)))

    for thd in thd_list:
        thd.start()
    for thd in thd_list:
        thd.join()

    elapse = time.time_ns() - start_t
process_waveform(wave_iq_data)

Process and visualize waveform data

Source code in rfsoc_rfdc/receiver/ofdm_analyer.py
def process_waveform(self, wave_iq_data):
    """Process and visualize waveform data"""
    start_t = time.time_ns()

    thd_list = []
    # Time plot thread
    thd_list.append(threading.Thread(
        target=self.time_wv_plotter.update_plot,
        args=(wave_iq_data, 0.1)))
    # Save captured waveform thread
    thd_list.append(threading.Thread(
        target=self.data_logging_handler,
        args=(wave_iq_data, self.detect_scheme.path2wave + "/Rx_raw.npy")))

    for thd in thd_list:
        thd.start()
    for thd in thd_list:
        thd.join()

    elapse = time.time_ns() - start_t
analyze_packet(wave_rx)

Perform packet detection and analysis

Source code in rfsoc_rfdc/receiver/ofdm_analyer.py
def analyze_packet(self, wave_rx):
    """Perform packet detection and analysis"""

    try:
        packet_rx, snr, cfo = self.detect_scheme.proc_rx(wave_rx)
    except Exception:
        logging.error("Failed to detect Rx packet")
        return None

    config_name = ZCU216_CONFIG['CONFIG_NAME']
    evm, ber = self.ofdm_scheme.analyze(
        packet_rx,
        plot_fname=f"{self.detect_scheme.path2wave}/{config_name}_const_diagram.png",
        constel_data_fname=None,
        constel_map_fname=None
    )

    # Log results
    logging.info(
        f"SNR: {snr:.3f}, CFO: {cfo:.3f}, EVM: {evm:.3f}, BER: {ber:.10f}")

    # Write results to file
    with open(f"{self.detect_scheme.path2wave}/{config_name}_res.log", 'w') as f:
        f.write(f"{snr:.3f}, {cfo:.3f}, {evm:.3f}, {ber:.10f}")

    return packet_rx, snr, cfo, evm, ber