BladeRF: Two Board Synchronization with Two Element Beamforming

Here is a very simple set of two scripts to take two BladeRF boards and synchronize them using the same host system. It transmits a signal at one frequency and then scans around in frequency and beam space to find it. It reports the best magnitude of the correlation and the best beam angle. After that I introduce a modified script that searches for the TX beam angle/theta.

lib/bladeandnumpy.py

This is the utility library.

import bladerf
import numpy as np
import numpy.typing as npt

class BladeRFAndNumpy(bladerf.BladeRF):
    """A helper class that assists in converting the raw samples from the BladeRF
    card into a Numpy array of floating point numbers. *It currently does not support
    8-bit samples but could easily be modified to do so.*
    """
    def sample_as_null(
            self,
            samps: int,
            chan_cnt: int,
            samp_size: int,
            trash_samps: int = 1000
            ):
        samps = int(samps)
        samp_size = int(samp_size)
        chan_cnt = int(chan_cnt)
        trash_samps = int(trash_samps)

        debug_tail = 8
        total_bytes = \
            (trash_samps + samps) * (samp_size * chan_cnt) + debug_tail
        buf = bytes(total_bytes)

        self.sync_rx(
            buf,
            (trash_samps + samps) * chan_cnt,
            timeout_ms=5000
        )

        if chan_cnt == 2:
            return
        elif chan_cnt == 1:
            return
        else:
            raise Exception('unexpected channel count')

    def sample_as_f64(
            self,
            samps: int,
            chan_cnt: int,
            samp_size: int,
            trash_samps: int = 1000
            ):
        samps = int(samps)
        samp_size = int(samp_size)
        chan_cnt = int(chan_cnt)
        trash_samps = int(trash_samps)

        debug_tail = 8
        total_bytes = \
            (trash_samps + samps) * (samp_size * chan_cnt) + debug_tail
        buf = bytes(total_bytes)

        self.sync_rx(
            buf, 
            (trash_samps + samps) * chan_cnt,
            timeout_ms=5000
        )

        assert (buf[-debug_tail:] == b'\x00' * debug_tail)

        buf = buf[trash_samps * (samp_size * chan_cnt):-debug_tail]

        sbuf = np.ndarray((len(buf) // 2,), buffer=buf, dtype=np.int16)
        del buf

        sbuf = sbuf.astype(np.float64) / 2049

        if chan_cnt == 2:
            b0 = sbuf[0::2][0::2] + 1j * sbuf[1::2][0::2]
            b1 = sbuf[0::2][1::2] + 1j * sbuf[1::2][1::2]
            return b0, b1
        elif chan_cnt == 1:
            b0 = sbuf[0::2] + 1j * sbuf[1::2]
            return b0
        else:
            raise Exception('unexpected channel count')

    def sample_as_f32(
            self,
            samps: int,
            chan_cnt: int,
            samp_size: int,
            trash_samps: int = 1000
            ):
        samps = int(samps)
        samp_size = int(samp_size)
        chan_cnt = int(chan_cnt)
        trash_samps = int(trash_samps)

        debug_tail = 8
        total_bytes = \
            (trash_samps + samps) * (samp_size * chan_cnt) + debug_tail
        buf = bytes(total_bytes)

        self.sync_rx(
            buf, 
            (trash_samps + samps) * chan_cnt,
            timeout_ms=5000
        )

        assert (buf[-debug_tail:] == b'\x00' * debug_tail)

        buf = buf[trash_samps * (samp_size * chan_cnt):-debug_tail]

        sbuf = np.ndarray((len(buf) // 2,), buffer=buf, dtype=np.int16)
        del buf

        sbuf = sbuf.astype(np.float32) / 2049

        if chan_cnt == 2:
            b0 = sbuf[0::2][0::2] + 1j * sbuf[1::2][0::2]
            b1 = sbuf[0::2][1::2] + 1j * sbuf[1::2][1::2]
            return b0, b1
        elif chan_cnt == 1:
            b0 = sbuf[0::2] + 1j * sbuf[1::2]
            return b0
        else:
            raise Exception('unexpected channel count')

    def sample_as_f64_with_meta(
            self,
            samps: int,
            chan_cnt: int,
            samp_size: int,
            trash_samps: int,
            when_timestamp: int,
            ):
        samps = int(samps)
        samp_size = int(samp_size)
        chan_cnt = int(chan_cnt)
        trash_samps = int(trash_samps)

        debug_tail = 8
        total_bytes = \
            (trash_samps + samps) * (samp_size * chan_cnt) + debug_tail
        buf = bytes(total_bytes)

        print('meta_timestamp', when_timestamp, self.get_timestamp(bladerf.Direction.RX))
        print('samps', samps, 'chan_cnt', chan_cnt, 'trash_samps', trash_samps)
        print('(trash_samps + samps) * chan_cnt', (trash_samps + samps) * chan_cnt)

        status, actual_count, timestamp = self.sync_rx_with_metadata(
            buf, 
            (trash_samps + samps) * chan_cnt,
            meta_flags = 1 << 31,
            meta_timestamp = when_timestamp,
            timeout_ms=20000
        )

        assert (buf[-debug_tail:] == b'\x00' * debug_tail)

        buf = buf[trash_samps * (samp_size * chan_cnt):]

        sbuf = np.ndarray((len(buf) // 2,), buffer=buf, dtype=np.int16)
        del buf

        sbuf = sbuf.astype(np.float64) / 2049

        if chan_cnt == 2:
            b0 = sbuf[0::2][0::2] + 1j * sbuf[1::2][0::2]
            b1 = sbuf[0::2][1::2] + 1j * sbuf[1::2][1::2]
            return b0, b1
        elif chan_cnt == 1:
            b0 = sbuf[0::2] + 1j * sbuf[1::2]
            return b0
        else:
            raise Exception('unexpected channel count')

theta_test.py

This is the script that you run. It configures the two boards, sets the parameters, transmits a signal, and tries to find the signal with the other board. It then reports the search results on each iteration.

import argparse
import socket
import threading
import queue
import yaml
import pickle
import bladerf
from lib.bladeandnumpy import BladeRFAndNumpy

def tx_thread(dev, num_buffers, buffer_size, freq, lo_freq, theta, sps):
    dev.sync_config(
        bladerf.ChannelLayout.TX_X2,
        bladerf.Format.SC16_Q11,
        num_buffers=num_buffers,
        buffer_size=buffer_size,
        num_transfers=8,
        stream_timeout=20000
    )

    for ch in [1, 3]:
        dev.set_gain_mode(ch, bladerf.GainMode.Manual)
        dev.set_bias_tee(ch, False)
        dev.set_frequency(ch, freq)
        dev.set_bandwidth(ch, 200e3)
        dev.set_sample_rate(ch, sps)
        dev.set_gain(ch, 30)
        print('tx-gain', dev.get_gain(ch))
        dev.enable_module(ch, True)

    buffer_samps = int((num_buffers * buffer_size) // 8)

    assert sps % lo_freq == 0

    samps = int(sps / lo_freq)
    samps = samps * int(buffer_samps // samps)
    t = samps / sps

    tx_data0 = np.exp(1j * np.linspace(0, np.pi * 2 * lo_freq * t, samps))
    tx_data1 = tx_data0 * np.exp(1j * theta)

    tx_data0 /= np.max(np.abs(tx_data0))
    tx_data1 /= np.max(np.abs(tx_data1))
    tx_data0 *= 2000
    tx_data1 *= 2000

    tx_data2 = np.zeros((len(tx_data0) * 4), np.int16)
    tx_data2[0::4] = tx_data0.real
    tx_data2[1::4] = tx_data0.imag
    tx_data2[2::4] = 0 #tx_data1.real
    tx_data2[3::4] = 0 #tx_data1.imag

    tx_data = tx_data2.tobytes()

    print('tx thread going')

    while True:
        dev.sync_tx(tx_data, int(len(tx_data) // 4))

def main():

    devs = []

    print('opening cards')
    for serial in ['9da', 'e85']:
        devs.append(BladeRFAndNumpy(f'libusb:serial={serial}'))

    sps = 1000000

    num_buffers = 16
    buffer_size = 1024 * 32
    buffer_samps = num_buffers * buffer_size

    print('configuring rx')
    devs[0].sync_config(
        bladerf.ChannelLayout.RX_X2,
        bladerf.Format.SC16_Q11,
        num_buffers=num_buffers,
        buffer_size=buffer_size,
        num_transfers=8,
        stream_timeout=20000
    )

    # (dev, num_buffers, buffer_size, freq, lo_freq, theta):

    freq = 1.83e9
    lo_freq = 5e3

    buffer_samps = int((num_buffers * buffer_size) // 8)

    tx_th = threading.Thread(
        target=tx_thread,
        args=(devs[1], num_buffers, buffer_size, freq, lo_freq, 0.0, sps),
        daemon=True
    )

    print('started tx thread')
    tx_th.start()
    time.sleep(2)

    print('setting rx params')
    for ch in [0, 2]:
        devs[0].set_gain_mode(ch, bladerf.GainMode.Manual)
        devs[0].set_bias_tee(ch, False)
        devs[0].set_frequency(ch, freq)
        devs[0].set_bandwidth(ch, 200e3)
        devs[0].set_sample_rate(ch, sps)
        devs[0].set_gain(ch, 60)
        devs[0].enable_module(ch, True)

    samps = int(sps / lo_freq)
    samps = samps * int(buffer_samps // samps)
    t = samps / sps

    while True:
        # the two streams from each antenna
        a, b = devs[0].sample_as_f64(samps, 2, 4, 0)

        mag_high = 0
        mag_theta = None
        mag_freq_off = None
        
        # search between -5khz and +5khz offset
        for freq_off in np.linspace(-5e3, 5e3, 125):
            _lo_freq = lo_freq + freq_off
            signal = np.exp(1j * np.linspace(0, np.pi * 2 * _lo_freq * t, samps))
            # search all possible, even redundant, angles
            for theta in np.linspace(0, np.pi * 2, 100):
                c = a + b * np.exp(1j * theta)
                mag = np.abs(np.sum(c * np.conjugate(signal))) / len(c)
                if mag > mag_high:
                    mag_high = mag
                    mag_theta = theta
                    mag_freq_off = freq_off
            
        print(f'mag:{mag_high} theta:{mag_theta} offset:{mag_freq_off}')

if __name__ == '__main__':
    main()

I set the code up to beamform on the TX side too, but I cleared out the second antenna to all zeros to disable it. If you give it the appropriate theta and uncomment out the zeros it should beamform on the TX side. I believe the RX should still report the same theta unless you’re TX beam creates a reflection that is particularly strong BUT you should see an increase in magnitude as the TX beam crosses the RX antennas with its beam.

If the idea is still confusing, then I can say in a nutshell what we are dealing with is the difference in arrival time between the two antennas on the RX side and this difference in arrival time manifests as a phase difference per sample since both RX elements are sampled at the same time.

Both RX elements/antennas are sampled at the same time, but they sample different parts of space, and the signal has a different phase in these different spatial positions which correspond to the RX element/antenna positions.

The same works for the TX antennas. If you feed each antenna a specific phase then you create a field around the antennas which has a pattern that creates a main lobe. Even if you feed each TX antenna the same phase you still create a field pattern with a main lobe. In actuality you create two main lobes with two elements if you look down from the top.

The pattern is created by the constructive/deconstructive interference of the field from each antenna in three-dimensional space. The phase and magnitude controls this but since each antenna has relatively the same magnitude they will cancel out (subtract) in certain areas and build up (add) in others and in particular the adding happens to create a main lobe.

I don’t think the pattern is perfect though. It has some fringes/fingers/side lobes that appear, but I can’t remember if I am correct or how pronounced these are. This is just an effect of the not so perfect overlay of the two fields on top of each other if that is the case.

In reality, the pattern is three dimensional and the main lobe is in the shape of a torus as the phase difference is increased the main lobe leans leftward or rightward towards. With the same phase fed the lobe is ejected perpendicular to the two antennas axis which is straight out.

I do believe it is not a true torus because the field will fall off upward along each antenna’s axis (straight up and down). I find it a challenge to visualize the pattern in my head but I hope this gets you started and gives you some point to start thinking about it.

The RX side works the same as the TX and you can also imagine a field pattern or lobes. When both fields are aligned you get the highest magnitude.

Let me try to modify the program to search not only for the RX field but also to try to determine the best TX field.

Searching for the TX Theta/Vector

In this program, we first find the optimal RX theta/vector. Then, we randomly try different TX thetas and track the best one. It is possible to switch roles by having the RX become the TX and that should yield a very good approximation of the correct theta if not the exact one. In this case, I decided to keep them running the same but to turn on the additional TX antenna and try creating different fields in hopes of eventually guessing the right one.

This should be similar but not exactly like sounding in 802.11. We now have a feedback loop. In effect the RX side is letting the TX side know what kind of power level it got, and the TX is then adjusting the theta trying to optimize that power level. I believe the way 802.11 works is that it computes the steering matrix (theta and weights) in one single transaction versus what I’m doing is over multiple transactions searching.

There should be better and quicker ways than randomly probing for the right theta value, but I thought it was the most simple and self-explanatory method.

I also modified the RX routine to measure instead of search because this speeds up the process by a lot. Since we already know the best RX theta and frequency offset there isn’t any reason to continue to track it right now. It could drift over time and that would make for a good experiment by itself, but I just didn’t include it.

import argparse
import socket
import threading
import queue
import yaml
import pickle
import bladerf
from lib.bladeandnumpy import BladeRFAndNumpy
from lib.general import *
import lib.bsocket as bsocket

def tx_thread(dev, num_buffers, buffer_size, freq, lo_freq, theta, sps, q):
    dev.sync_config(
        bladerf.ChannelLayout.TX_X2,
        bladerf.Format.SC16_Q11,
        num_buffers=num_buffers,
        buffer_size=buffer_size,
        num_transfers=8,
        stream_timeout=20000
    )

    for ch in [1, 3]:
        dev.set_gain_mode(ch, bladerf.GainMode.Manual)
        dev.set_bias_tee(ch, False)
        dev.set_frequency(ch, freq)
        dev.set_bandwidth(ch, 200e3)
        dev.set_sample_rate(ch, sps)
        dev.set_gain(ch, 25)
        print('tx-gain', dev.get_gain(ch))
        dev.enable_module(ch, True)

    buffer_samps = int((num_buffers * buffer_size) // 8)

    assert sps % lo_freq == 0

    samps = int(sps / lo_freq)
    samps = samps * int(buffer_samps // samps)
    t = samps / sps

    def build_tx_data(theta):
        tx_data0 = np.exp(1j * np.linspace(0, np.pi * 2 * lo_freq * t, samps))
        tx_data0 /= np.max(np.abs(tx_data0))
        tx_data0 *= 2000
        tx_data2 = np.zeros((len(tx_data0) * 4), np.int16)
        tx_data2[0::4] = tx_data0.real
        tx_data2[1::4] = tx_data0.imag

        if theta is not None:
            tx_data1 = tx_data0 * np.exp(1j * theta)
            tx_data1 /= np.max(np.abs(tx_data1))
            tx_data1 *= 2000
            tx_data2[2::4] = tx_data1.real
            tx_data2[3::4] = tx_data1.imag
        else:
            tx_data2[2::4] = 0
            tx_data2[3::4] = 0            

        tx_data = tx_data2.tobytes()
        return tx_data

    print('tx thread going')

    theta = None
    tx_data = build_tx_data(theta)

    while True:
        try:
            theta = q.get()
            tx_data = build_tx_data(theta)
        except queue.Empty:
            pass
        dev.sync_tx(tx_data, int(len(tx_data) // 4))

def main():
    devs = []

    print('opening cards')
    for serial in ['9da', 'e85']:
        devs.append(BladeRFAndNumpy(f'libusb:serial={serial}'))

    sps = 1000000

    num_buffers = 16
    buffer_size = 1024 * 32
    buffer_samps = num_buffers * buffer_size

    print('configuring rx')
    devs[0].sync_config(
        bladerf.ChannelLayout.RX_X2,
        bladerf.Format.SC16_Q11,
        num_buffers=num_buffers,
        buffer_size=buffer_size,
        num_transfers=8,
        stream_timeout=20000
    )

    # (dev, num_buffers, buffer_size, freq, lo_freq, theta):

    freq = 1.83e9
    lo_freq = 5e3

    buffer_samps = int((num_buffers * buffer_size) // 8)

    q = queue.Queue()

    tx_th = threading.Thread(
        target=tx_thread,
        args=(devs[1], num_buffers, buffer_size, freq, lo_freq, 0.0, sps, q),
        daemon=True
    )

    print('started tx thread')
    tx_th.start()
    time.sleep(2)

    print('setting rx params')
    for ch in [0, 2]:
        devs[0].set_gain_mode(ch, bladerf.GainMode.Manual)
        devs[0].set_bias_tee(ch, False)
        devs[0].set_frequency(ch, freq)
        devs[0].set_bandwidth(ch, 200e3)
        devs[0].set_sample_rate(ch, sps)
        devs[0].set_gain(ch, 60)
        devs[0].enable_module(ch, True)

    samps = int(sps / lo_freq)
    samps = samps * int(buffer_samps // samps)
    t = samps / sps

    def measure(freq_off, theta):
        signal = np.exp(1j * np.linspace(0, np.pi * 2 * freq_off * t, samps))        
        a, b = devs[0].sample_as_f64(samps, 2, 4, 0)
        c = a + b * np.exp(1j * theta)
        mag = np.abs(np.sum(c * np.conjugate(signal))) / len(c)
        return mag

    def search(iterations):
        mag_high = 0
        mag_theta = None
        mag_freq_off = None

        for _ in range(iterations):
            a, b = devs[0].sample_as_f64(samps, 2, 4, 0)
            for freq_off in np.linspace(-5e3, 5e3, 125):
                _lo_freq = lo_freq + freq_off
                signal = np.exp(1j * np.linspace(0, np.pi * 2 * _lo_freq * t, samps))
                for theta in np.linspace(0, np.pi * 2, 100):
                    c = a + b * np.exp(1j * theta)
                    mag = np.abs(np.sum(c * np.conjugate(signal))) / len(c)
                    if mag > mag_high:
                        mag_high = mag
                        mag_theta = theta
                        mag_freq_off = freq_off
                        #print(f'mag:{mag_high} theta:{mag_theta} offset:{mag_freq_off}')
        return mag_high, mag_theta, mag_freq_off

    mag_high, mag_theta, mag_freq_off = search(2)
    print(f'mag:{mag_high} theta:{mag_theta} offset:{mag_freq_off}')

    bmh = 0         # best rx magnitude
    btxt = None     # best tx theta

    print('searching for tx theta')

    # Now, try to search for the TX theta.
    while True:
        theta = np.random.random() * np.pi * 2
        #print('trying tx theta', theta)
        q.put(theta)
        # give the TX a little time to start sending it
        time.sleep(1)
        n_mag_high = measure(mag_freq_off, mag_theta)
        if n_mag_high > bmh:
            bmh = n_mag_high
            btxt = theta
            print(f'[best] rx-mag:{bmh} tx-theta:{btxt}')

if __name__ == '__main__':
    main()

It should produce some output like the following:

[root@2603-9005-0700-52fd-6a1d-efff-fe1b-df67 test_bench]# LD_LIBRARY_PATH=/home/outpost/bladeRF/host/build/output python3 theta_test.py
opening cards
opened device libusb:serial=9da
opened device libusb:serial=e85
configuring rx
started tx thread
setting rx params
[WARNING @ host/libraries/libbladeRF/src/board/bladerf2/rfic_host.c:588] _rfic_host_set_gain_mode: automatic gain control not valid for TX channels
tx-gain 25
[WARNING @ host/libraries/libbladeRF/src/board/bladerf2/rfic_host.c:588] _rfic_host_set_gain_mode: automatic gain control not valid for TX channels
tx-gain 25
tx thread going
>> here we found the RX theta and frequency offset
mag:1.179196331083262 theta:6.156252270670908 offset:-3951.612903225807
searching for tx theta
[best] rx-mag:0.000848590302128108 tx-theta:0.7224857995719817
[best] rx-mag:0.0009858890125604104 tx-theta:1.9199277517647855
[best] rx-mag:0.002570921010167817 tx-theta:1.4631717552503374
[best] rx-mag:0.003108498991348187 tx-theta:1.4819606314287737
[best] rx-mag:0.003197854341596225 tx-theta:3.9327868289341343
[best] rx-mag:0.003342666060939334 tx-theta:4.812804984205143
[best] rx-mag:0.003914908027940953 tx-theta:5.07781681584898
>> after a period of time the best TX theta is found
[best] rx-mag:0.00406657910144763 tx-theta:1.069675189349976

Now, if you don’t believe it is working then try rotating one or both boards. If you rotate the RX board then you will only see the change next time you run the program since it only searches for the RX theta at the beginning. You might not see the TX theta change if you move the boards _unless_ you re-run the program because it has already locked on via the magnitude – they’d need to move closer to increase the magnitude for it to detect a new theta value.