BladeRF: Two Board Near Coherency over Large Distances

You will need the lib.bladeandnumpy module from this page. The bladerf module comes with the BladeRF repository from GitHub. You can install Numpy and Scipy with python3 -m pip install numpy scipy.

I wanted to work on a special project here which involves bringing two BladeRF boards into near coherency over a large distance. This distance would be such that the usage of a shared clock signal would be impractical. This has been an old problem of mine that I always wanted to solve.

The one thing you do have is an accurate spatial position of the two boards relative to each other and an out of band data connection like the Internet or Ethernet. It should be possible using this information and the transmit and receive channels to bring the two boards together such that a near approximation of sample or sub-sample coherency could be obtained.

However, I wasn’t able to get the setup through some final tests.

I did not implement the subtraction of the propagation delay but considering my setup’s distance it would have been near zero in samples. There would have been a sub-sample delay though.

Here is the code to do the test.

import argparse
import socket
import threading
import queue
import yaml
import pickle
import bladerf
import random
import scipy.signal as signal
from lib.bladeandnumpy import BladeRFAndNumpy

def build_dual_sine(sps, samps, lo_freq, theta0, theta1):
    t = samps / sps
    base = np.exp(1j * np.linspace(0, np.pi * 2 * lo_freq * t, samps))
    tx_data2 = np.zeros((len(base) * 4), np.int16)

    if theta0 is not None:
        tx_data0 = base * np.exp(1j * theta0)
        tx_data0 /= np.max(np.abs(tx_data0))
        tx_data0 *= 2000
        tx_data2[0::4] = tx_data0.real
        tx_data2[1::4] = tx_data0.imag
    else:
        tx_data2[0::4] = 0
        tx_data2[1::4] = 0

    if theta1 is not None:
        tx_data1 = base * np.exp(1j * theta1)
        tx_data1 /= np.max(np.abs(tx_data1))
        tx_data1 *= 2000
        tx_data2[2::4] = tx_data1.real
        tx_data2[3::4] = tx_data1.imag
    else:
        tx_data2[2::4] = 0
        tx_data2[3::4] = 0            

    tx_data = tx_data2.tobytes()
    return tx_data

def build_fm(sps, lo_freq, sub_freq, theta0, theta1):
    samps = sps * 4
    t = samps / sps

    sub_freq = 0.25

    center = np.exp(1j * np.linspace(0, np.pi * 2 * lo_freq * t, samps))

    deviation = 5e3
    theta_step = np.pi * 2 * deviation / sps

    # Start the lowest deviation at the first sample hence the negative and
    # cosine function. It should produce -1 at theta zero which should produce
    # the maximum frequency deviation centered at `lo_freq`.
    base_theta = np.add.accumulate(
        -np.cos(np.linspace(0, np.pi * 2 * sub_freq * t, samps)) * theta_step 
    )

    base = np.exp(1j * base_theta) * center

    tx_data2 = np.zeros((len(base) * 4), np.int16)

    if theta0 is not None:
        tx_data0 = base * np.exp(1j * theta0)
        tx_data0 /= np.max(np.abs(tx_data0))
        tx_data0 *= 2000
        tx_data2[0::4] = tx_data0.real
        tx_data2[1::4] = tx_data0.imag
    else:
        tx_data2[0::4] = 0
        tx_data2[1::4] = 0

    if theta1 is not None:
        tx_data1 = base * np.exp(1j * theta1)
        tx_data1 /= np.max(np.abs(tx_data1))
        tx_data1 *= 2000
        tx_data2[2::4] = tx_data1.real
        tx_data2[3::4] = tx_data1.imag
    else:
        tx_data2[2::4] = 0
        tx_data2[3::4] = 0            

    tx_data = tx_data2.tobytes()
    return tx_data

def tx_thread(dev, num_buffers, buffer_size, freq, lo_freq, sps, q, qout):
    print('[tx-thread] started')
    dev.sync_config(
        bladerf.ChannelLayout.TX_X2,
        bladerf.Format.SC16_Q11,
        num_buffers=num_buffers,
        buffer_size=buffer_size,
        num_transfers=8,
        stream_timeout=20000
    )

    for ch in [1, 3]:
        dev.set_gain_mode(ch, bladerf.GainMode.Manual)
        dev.set_bias_tee(ch, False)
        dev.set_frequency(ch, freq)
        dev.set_bandwidth(ch, sps)
        dev.set_sample_rate(ch, sps)
        dev.set_gain(ch, 30)
        print('tx-gain', dev.get_gain(ch))
        dev.enable_module(ch, True)

    buffer_samps = int((num_buffers * buffer_size) // 8)

    assert sps % lo_freq == 0

    samps = int(sps / lo_freq)
    samps = samps * int(buffer_samps // samps)
    t = samps / sps

    print('[tx-thread] waiting on first command')
    cmd = q.get()

    counter = 0

    while True:
        if cmd == 'single_sine_output':
            print('[tx-thread] got single_sine_output')
            tx_data = build_dual_sine(sps, samps, lo_freq, 0.0, None)
            qout.put('single_sine_output')
            while True:
                try:
                    cmd = q.get_nowait()
                    break
                except queue.Empty:
                    pass
                samps = int(len(tx_data) // 4)
                # We are counting two channels not total samples.
                counter += samps // 2
                dev.sync_tx(tx_data, samps)
        elif cmd == 'single_fm_output':
            print('[tx-thread] got single_fm_output')
            sub_freq = 4
            tx_data = build_fm(sps, lo_freq, sub_freq, 0.0, 0.0)
            qout.put('single_fm_output')
            while True:
                try:
                    cmd = q.get_nowait()
                    break
                except queue.Empty:
                    pass
                samps = int(len(tx_data) // 4)
                qout.put(('counter', counter))
                # We are counting two channels not total samples.
                counter += samps // 2
                # There should be one full FM cycle in this sample data.
                dev.sync_tx(tx_data, samps)

def rx_thread(dev, num_buffers, buffer_size, freq, sps, rx, tx):
    print('[rx-thread] started')
    dev.sync_config(
        bladerf.ChannelLayout.RX_X2,
        bladerf.Format.SC16_Q11,
        num_buffers=num_buffers,
        buffer_size=buffer_size,
        num_transfers=8,
        stream_timeout=20000
    )

    for ch in [0, 2]:
        dev.set_gain_mode(ch, bladerf.GainMode.Manual)
        dev.set_bias_tee(ch, False)
        dev.set_frequency(ch, freq)
        dev.set_bandwidth(ch, sps)
        dev.set_sample_rate(ch, sps)
        dev.set_gain(ch, 60)
        dev.enable_module(ch, True)
    
    buffer_samps = int(num_buffers * buffer_size / 8)

    counter = 0

    while True:
        sa, sb = dev.sample_as_f64(buffer_samps, 2, 4, 0)
        start_counter = counter 
        counter += len(sa)
        try:
            cmd = rx.get_nowait()
            print(f'[rx-thread] got sample command for {cmd} samples')
            sabuf = [sa]
            sbbuf = [sb]
            got = len(sa)
            while got < cmd:
                sa, sb = dev.sample_as_f64(buffer_samps, 2, 4, 0)
                sabuf.append(sa)
                sbbuf.append(sb)
                got += len(sa)
                counter += len(sa)
            sa = np.hstack(sabuf)
            sb = np.hstack(sbbuf)
            tx.put((start_counter, sa[0:cmd], sb[0:cmd]))
        except queue.Empty:
            pass

def slave_thread(tx_tx, tx_rx, slave_rx, slave_tx):
    """The slave thread.

    This runs remotely from the master thread and the queues could be
    interfaced with Ethernet/Internet.
    """
    print('[slave] started')

    tx_tx.put('single_fm_output')

    def queue_reader(qin, qout, prefix):
        while True:
            item = qin.get()
            qout.put((prefix, item))

    nq = queue.Queue()

    th_a = threading.Thread(target=queue_reader, args=(tx_rx, nq, 'tx_rx'), daemon=True)
    th_b = threading.Thread(target=queue_reader, args=(slave_rx, nq, 'slave_rx'), daemon=True)

    th_a.start()
    th_b.start()

    tx_counter = None

    while True:
        item = nq.get()

        if item[0] == 'tx_rx':
            tx_counter = item[1][1]
        elif item[0] == 'slave_rx':
            assert item[1] == 'tx_counter'
            slave_tx.put(tx_counter)

def find_peak_frequency_random_probe(left, right, sa, sps, lo_freq, duration):
    assert right > left
    delta = right - left
    t = len(sa) / sps
    best_mag = 0
    best_freq = None
    st = time.time()
    ist = st
    while time.time() - st < duration:
        if time.time() - ist > 3:
            ist = time.time()
            rem = duration - (time.time() - st)
            print('[find_peak_frequency_random_probe] remaining seconds %.01f' % rem)
        check_freq = np.random.random() * delta + left
        check_signal = np.exp(1j * np.linspace(
            0, np.pi * 2 * check_freq * t, len(sa)
        ))
        mag = np.abs(np.sum(sa * np.conjugate(check_signal)))
        if mag > best_mag:
            st = time.time()
            best_mag = mag
            best_freq = check_freq
            print('[find_peak_frequency_random_probe] mag: %.02f freq:%.02f' % (
                best_mag, best_freq
            ))
    return best_mag, best_freq

def find_peak_frequency(left, right, sa, sps, lo_freq, steps):
    check_freq_space = np.linspace(left, right, steps)
    t = len(sa) / sps
    best_mag = 0
    best_freq = None
    st = time.time()
    for ndx, check_freq in enumerate(check_freq_space):
        if time.time() - st > 3:
            st = time.time()
            prog = ndx / (len(check_freq_space) - 1) * 100.0
            print('[find_peak_frequency] progress: %.01f' % prog)
        check_signal = np.exp(1j * np.linspace(
            0, np.pi * 2 * check_freq * t, len(sa)
        ))
        mag = np.abs(np.sum(sa * np.conjugate(check_signal)))
        if mag > best_mag:
            best_mag = mag
            best_freq = check_freq
    return best_mag, best_freq

def calib_host(rx_tx, rx_rx, slave_tx, slave_rx, lo_freq, sps, rx_channel):
    sos_a = signal.butter(8, (lo_freq - 10e3, lo_freq + 10e3), btype='bandpass', output='sos', fs=sps)
    sos_b = signal.butter(8, 500, btype='lowpass', output='sos', fs=sps)

    best_min = None
    best_rx_counter = None
    best_tx_counter = None

    #fd = open('plot', 'wb')
    st = time.time()
    while time.time() - st < 60:
        #print('[master] sent rx sample command')
        rx_tx.put(1024 * 100)
        rx_counter, sa, sb = rx_rx.get()

        if rx_channel == 'a':
            pass
        elif rx_channel == 'b':
            sc = sa
            sa = sb
            sb = sc
        else:
            raise Exception('unknown rx channel')

        sa_mean = np.mean(np.abs(sa))
        sb_mean = np.mean(np.abs(sb))
        print('sa_mean', sa_mean, 'sb_mean', sb_mean)

        if sa_mean > 1.0 or sb_mean > 1.0:
            print('WARNING: clipping on RX nearing threshold')
        #pickle.dump(sa, fd)

        with open('plot', 'wb') as fd:
            pickle.dump(sa, fd)
            pickle.dump(sb, fd)
        print('wrote plot')
        exit()

        sa = signal.sosfiltfilt(sos_a, sa)
        sa = sa[500:-500]
        sa = np.angle(sa[1:] * np.conjugate(sa[:-1]))
        sa = signal.sosfiltfilt(sos_b, sa)
        sa = sa[1700:-1700]
        sa = sa / (np.pi * 2 / sps)

        the_min_ndx = np.argmin(sa)
        the_min = sa[the_min_ndx]

        #print('the_min', the_min)    

        if best_min is None or the_min < best_min:
            st = time.time()
            slave_tx.put('tx_counter')
            best_tx_counter = slave_rx.get()
            if best_tx_counter is None:
                continue            
            best_min = the_min
            best_rx_counter = rx_counter + 500 + 1700 + the_min_ndx
            print('best_min', best_min, best_rx_counter)
            print('delta', best_rx_counter - best_tx_counter)

    return best_rx_counter, best_tx_counter

def freq_calib_host():
    # track sine signal for frequency calibration
    if slave_rx.get() != 'single_sine_output':
        raise Exception('unexpected stage')
    # compute SPS offset for slave
    for _ in range(0):
        print('[master] sent rx sample command')
        rx_tx.put(1024 * 10)
        _, sa, sb = rx_rx.get()
        peak_mag, peak_freq = find_peak_frequency(
            left=-10e3 + lo_freq,
            right=10e3 + lo_freq,
            sa=sa,
            sps=sps,
            lo_freq=lo_freq,
            steps=1000
        )
        print('[peak-1st-round]', 'peak_mag', peak_mag, 'peak_freq', peak_freq)
        peak_mag, peak_freq = find_peak_frequency_random_probe(
            left=peak_freq - 500,
            right=peak_freq + 500,
            sa=sa,
            sps=sps,
            lo_freq=lo_freq,
            duration=20
        )
        print('[peak-2nd-round]', 'peak_mag', peak_mag, 'peak_freq', peak_freq)
    abs_freq = freq + lo_freq

def master_thread(
        rxa_tx,
        rxa_rx,
        rxb_tx,
        rxb_rx,
        slavea_tx,
        slavea_rx,
        slaveb_tx,
        slaveb_rx,
        lo_freqa,
        lo_freqb,
        sps,
        freq):
    """The master thread.
    """

    '''
        connect
            RXA_1 -> TXA_1
            RXB_1 -> TXB_1
            RXA_2 -> TXB_2
    '''

    # lock RXA to TXA
    rxa_cnt, txa_cnt = calib_host(rxa_tx, rxa_rx, slavea_tx, slavea_rx, lo_freqa, sps, 'a')
    # lock RXB to TXB
    rxb_cnt, txb_cnt = calib_host(rxb_tx, rxb_rx, slaveb_tx, slaveb_rx, lo_freqb, sps, 'a')
    # lock RXA to TXB
    rxab_cnt, txab_cnt = calib_host(rxa_tx, rxa_rx, slaveb_tx, slaveb_rx, lo_freqb, sps, 'b')

    print('rxa_cnt', rxa_cnt)
    print('txa_cnt', txa_cnt)
    print('rxb_cnt', rxb_cnt)
    print('txb_cnt', txb_cnt)
    print('rxab_cnt', rxab_cnt)
    print('txab_cnt', txab_cnt)


def main():
    devs = []

    print('opening cards')
    for serial in ['9da', 'e85']:
        devs.append(BladeRFAndNumpy(f'libusb:serial={serial}'))

    sps = 1000000

    num_buffers = 16
    buffer_size = 1024 * 32
    buffer_samps = num_buffers * buffer_size

    freq = 1.83e9
    #freq = random.randint(int(2e9), int(3e9))
    print('freq', freq)
    lo_freqa = 20e3
    lo_freqb = 50e3

    buffer_samps = int((num_buffers * buffer_size) // 4)

    txa_tx = queue.Queue()
    txa_rx = queue.Queue()
    txb_tx = queue.Queue()
    txb_rx = queue.Queue()
    rxa_tx = queue.Queue()
    rxa_rx = queue.Queue()
    rxb_tx = queue.Queue()
    rxb_rx = queue.Queue()

    tx_th_a = threading.Thread(
        target=tx_thread,
        args=(
            devs[1],
            num_buffers,
            buffer_size,
            freq,
            lo_freqa,
            sps,
            txa_tx,
            txa_rx,
        ),
        daemon=True
    )

    tx_th_b = threading.Thread(
        target=tx_thread,
        args=(
            devs[0],
            num_buffers,
            buffer_size,
            freq,
            lo_freqb,
            sps,
            txb_tx,
            txb_rx,
        ),
        daemon=True
    )    

    rx_th_a = threading.Thread(
        target=rx_thread,
        args=(
            devs[1],
            num_buffers,
            buffer_size,
            freq,
            sps,
            rxa_tx,
            rxa_rx,
        ),
        daemon=True
    )

    rx_th_b = threading.Thread(
        target=rx_thread,
        args=(
            devs[0],
            num_buffers,
            buffer_size,
            freq,
            sps,
            rxb_tx,
            rxb_rx,
        ),
        daemon=True
    )

    slavea_tx = queue.Queue()
    slavea_rx = queue.Queue()
    slaveb_tx = queue.Queue()
    slaveb_rx = queue.Queue()    
    master_tx = queue.Queue()
    master_rx = queue.Queue()

    slavea_th = threading.Thread(
        target=slave_thread,
        args=(
            txa_tx, txa_rx, slavea_tx, slavea_rx
        ),
        daemon=True
    )

    slaveb_th = threading.Thread(
        target=slave_thread,
        args=(
            txb_tx, txb_rx, slaveb_tx, slaveb_rx
        ),
        daemon=True
    )

    print('started tx thread')
    tx_th_a.start()
    tx_th_b.start()
    rx_th_a.start()
    rx_th_b.start()
    slavea_th.start()
    slaveb_th.start()

    def clear_rx_buffer():
        a, b = devs[0].sample_as_f64(buffer_samps, 2, 4, 0)

    tx0_freq_off = -3937.0

    master_thread(
        rxa_tx, rxa_rx, rxb_tx, rxb_rx,
        slavea_tx, slavea_rx,
        slaveb_tx, slaveb_rx,
        lo_freqa, lo_freqb, sps, freq
    )

if __name__ == '__main__':
    main()

Most of the work happens in master_thread. There are three calib_host calls and each of these peg a TX sample index to an RX sample index.

The first call takes board A and pegs the TX to the RX so now we know what sample going out of the DAC corresponds to what sample coming in the ADC minus the propagation delay which also includes the circuitry.

The second call does the same for board B, and finally the third call pegs board A’s RX to board B’s TX.

At this point, all four asynchronous chains have been synchronized! As I said, the propagation delay would still need to be computed but if you had the boards close together, you’d have to do a sub-sample.

I had wanted to aim for a sub-sample synchronization, and I feel like it would have been possible to do. I hope maybe it serves as something someone else can do and extend. It just isn’t possible for me to experiment with my boards with such a high-powered RF transmitter nearby, but I hope for someone else the fun of learning and experimenting will be there!

Please, enjoy! I hope it works just as I intended.

I was able to get some small tests done early on before the interference became too great. It synchronized between two boards on a TX and RX channel. So, I know it works. I should take some time and explain more about how it works.

What I do is generate a slowly modulated FM signal. The deviation is like 5e3-hertz and on the RX side you grab snapshots. I could have tried to do a more continuous processing (real-time) but I opted for the less processing intensive method of just grabbing snapshots. You look for the lowest deviation and you keep grabbing snapshot and trying to find the perfect point that has that low deviation.

Each time you find a low deviation to request the TX/slave thread to send its counter value which is the sample index it sends the last 4-second transmission on. I’m using a FM modulation frequency of 0.25-hertz so it lasts 4-seconds. I also start the lowest deviation on the first sample. So, this means the TX’s counter value (sample index) will match the RX’s counter (sample index) minus the propagation delay.

Now, since the entire cycle takes 4-seconds that is a big enough window for the message to go from the master thread to the TX thread and back. This is an important part of the process. This is also why you might not want to sample for an entire 4-second window because by the time you’re done the TX counter may have advanced. If the code isn’t quick enough you get the next counter value and of course the synchronization is off by 4-seconds.

There are some ways to test the synchronization! This means the experiment could have used a second part to validate correctness. I leave that as an exercise for whoever does this experiment.

I also feel that 4-seconds is a good amount of time for synchronization over something like Ethernet or the Internet. So, in theory, you could do more than two boards and you could put them as a great distance! This would allow you to do some cool things.