You will need the lib.bladeandnumpy
module from this page. The bladerf
module comes with the BladeRF repository from GitHub. You can install Numpy and Scipy with python3 -m pip install numpy scipy
.
I wanted to work on a special project here which involves bringing two BladeRF boards into near coherency over a large distance. This distance would be such that the usage of a shared clock signal would be impractical. This has been an old problem of mine that I always wanted to solve.
The one thing you do have is an accurate spatial position of the two boards relative to each other and an out of band data connection like the Internet or Ethernet. It should be possible using this information and the transmit and receive channels to bring the two boards together such that a near approximation of sample or sub-sample coherency could be obtained.
However, I wasn’t able to get the setup through some final tests.
I did not implement the subtraction of the propagation delay but considering my setup’s distance it would have been near zero in samples. There would have been a sub-sample delay though.
Here is the code to do the test.
import argparse
import socket
import threading
import queue
import yaml
import pickle
import bladerf
import random
import scipy.signal as signal
from lib.bladeandnumpy import BladeRFAndNumpy
def build_dual_sine(sps, samps, lo_freq, theta0, theta1):
t = samps / sps
base = np.exp(1j * np.linspace(0, np.pi * 2 * lo_freq * t, samps))
tx_data2 = np.zeros((len(base) * 4), np.int16)
if theta0 is not None:
tx_data0 = base * np.exp(1j * theta0)
tx_data0 /= np.max(np.abs(tx_data0))
tx_data0 *= 2000
tx_data2[0::4] = tx_data0.real
tx_data2[1::4] = tx_data0.imag
else:
tx_data2[0::4] = 0
tx_data2[1::4] = 0
if theta1 is not None:
tx_data1 = base * np.exp(1j * theta1)
tx_data1 /= np.max(np.abs(tx_data1))
tx_data1 *= 2000
tx_data2[2::4] = tx_data1.real
tx_data2[3::4] = tx_data1.imag
else:
tx_data2[2::4] = 0
tx_data2[3::4] = 0
tx_data = tx_data2.tobytes()
return tx_data
def build_fm(sps, lo_freq, sub_freq, theta0, theta1):
samps = sps * 4
t = samps / sps
sub_freq = 0.25
center = np.exp(1j * np.linspace(0, np.pi * 2 * lo_freq * t, samps))
deviation = 5e3
theta_step = np.pi * 2 * deviation / sps
# Start the lowest deviation at the first sample hence the negative and
# cosine function. It should produce -1 at theta zero which should produce
# the maximum frequency deviation centered at `lo_freq`.
base_theta = np.add.accumulate(
-np.cos(np.linspace(0, np.pi * 2 * sub_freq * t, samps)) * theta_step
)
base = np.exp(1j * base_theta) * center
tx_data2 = np.zeros((len(base) * 4), np.int16)
if theta0 is not None:
tx_data0 = base * np.exp(1j * theta0)
tx_data0 /= np.max(np.abs(tx_data0))
tx_data0 *= 2000
tx_data2[0::4] = tx_data0.real
tx_data2[1::4] = tx_data0.imag
else:
tx_data2[0::4] = 0
tx_data2[1::4] = 0
if theta1 is not None:
tx_data1 = base * np.exp(1j * theta1)
tx_data1 /= np.max(np.abs(tx_data1))
tx_data1 *= 2000
tx_data2[2::4] = tx_data1.real
tx_data2[3::4] = tx_data1.imag
else:
tx_data2[2::4] = 0
tx_data2[3::4] = 0
tx_data = tx_data2.tobytes()
return tx_data
def tx_thread(dev, num_buffers, buffer_size, freq, lo_freq, sps, q, qout):
print('[tx-thread] started')
dev.sync_config(
bladerf.ChannelLayout.TX_X2,
bladerf.Format.SC16_Q11,
num_buffers=num_buffers,
buffer_size=buffer_size,
num_transfers=8,
stream_timeout=20000
)
for ch in [1, 3]:
dev.set_gain_mode(ch, bladerf.GainMode.Manual)
dev.set_bias_tee(ch, False)
dev.set_frequency(ch, freq)
dev.set_bandwidth(ch, sps)
dev.set_sample_rate(ch, sps)
dev.set_gain(ch, 30)
print('tx-gain', dev.get_gain(ch))
dev.enable_module(ch, True)
buffer_samps = int((num_buffers * buffer_size) // 8)
assert sps % lo_freq == 0
samps = int(sps / lo_freq)
samps = samps * int(buffer_samps // samps)
t = samps / sps
print('[tx-thread] waiting on first command')
cmd = q.get()
counter = 0
while True:
if cmd == 'single_sine_output':
print('[tx-thread] got single_sine_output')
tx_data = build_dual_sine(sps, samps, lo_freq, 0.0, None)
qout.put('single_sine_output')
while True:
try:
cmd = q.get_nowait()
break
except queue.Empty:
pass
samps = int(len(tx_data) // 4)
# We are counting two channels not total samples.
counter += samps // 2
dev.sync_tx(tx_data, samps)
elif cmd == 'single_fm_output':
print('[tx-thread] got single_fm_output')
sub_freq = 4
tx_data = build_fm(sps, lo_freq, sub_freq, 0.0, 0.0)
qout.put('single_fm_output')
while True:
try:
cmd = q.get_nowait()
break
except queue.Empty:
pass
samps = int(len(tx_data) // 4)
qout.put(('counter', counter))
# We are counting two channels not total samples.
counter += samps // 2
# There should be one full FM cycle in this sample data.
dev.sync_tx(tx_data, samps)
def rx_thread(dev, num_buffers, buffer_size, freq, sps, rx, tx):
print('[rx-thread] started')
dev.sync_config(
bladerf.ChannelLayout.RX_X2,
bladerf.Format.SC16_Q11,
num_buffers=num_buffers,
buffer_size=buffer_size,
num_transfers=8,
stream_timeout=20000
)
for ch in [0, 2]:
dev.set_gain_mode(ch, bladerf.GainMode.Manual)
dev.set_bias_tee(ch, False)
dev.set_frequency(ch, freq)
dev.set_bandwidth(ch, sps)
dev.set_sample_rate(ch, sps)
dev.set_gain(ch, 60)
dev.enable_module(ch, True)
buffer_samps = int(num_buffers * buffer_size / 8)
counter = 0
while True:
sa, sb = dev.sample_as_f64(buffer_samps, 2, 4, 0)
start_counter = counter
counter += len(sa)
try:
cmd = rx.get_nowait()
print(f'[rx-thread] got sample command for {cmd} samples')
sabuf = [sa]
sbbuf = [sb]
got = len(sa)
while got < cmd:
sa, sb = dev.sample_as_f64(buffer_samps, 2, 4, 0)
sabuf.append(sa)
sbbuf.append(sb)
got += len(sa)
counter += len(sa)
sa = np.hstack(sabuf)
sb = np.hstack(sbbuf)
tx.put((start_counter, sa[0:cmd], sb[0:cmd]))
except queue.Empty:
pass
def slave_thread(tx_tx, tx_rx, slave_rx, slave_tx):
"""The slave thread.
This runs remotely from the master thread and the queues could be
interfaced with Ethernet/Internet.
"""
print('[slave] started')
tx_tx.put('single_fm_output')
def queue_reader(qin, qout, prefix):
while True:
item = qin.get()
qout.put((prefix, item))
nq = queue.Queue()
th_a = threading.Thread(target=queue_reader, args=(tx_rx, nq, 'tx_rx'), daemon=True)
th_b = threading.Thread(target=queue_reader, args=(slave_rx, nq, 'slave_rx'), daemon=True)
th_a.start()
th_b.start()
tx_counter = None
while True:
item = nq.get()
if item[0] == 'tx_rx':
tx_counter = item[1][1]
elif item[0] == 'slave_rx':
assert item[1] == 'tx_counter'
slave_tx.put(tx_counter)
def find_peak_frequency_random_probe(left, right, sa, sps, lo_freq, duration):
assert right > left
delta = right - left
t = len(sa) / sps
best_mag = 0
best_freq = None
st = time.time()
ist = st
while time.time() - st < duration:
if time.time() - ist > 3:
ist = time.time()
rem = duration - (time.time() - st)
print('[find_peak_frequency_random_probe] remaining seconds %.01f' % rem)
check_freq = np.random.random() * delta + left
check_signal = np.exp(1j * np.linspace(
0, np.pi * 2 * check_freq * t, len(sa)
))
mag = np.abs(np.sum(sa * np.conjugate(check_signal)))
if mag > best_mag:
st = time.time()
best_mag = mag
best_freq = check_freq
print('[find_peak_frequency_random_probe] mag: %.02f freq:%.02f' % (
best_mag, best_freq
))
return best_mag, best_freq
def find_peak_frequency(left, right, sa, sps, lo_freq, steps):
check_freq_space = np.linspace(left, right, steps)
t = len(sa) / sps
best_mag = 0
best_freq = None
st = time.time()
for ndx, check_freq in enumerate(check_freq_space):
if time.time() - st > 3:
st = time.time()
prog = ndx / (len(check_freq_space) - 1) * 100.0
print('[find_peak_frequency] progress: %.01f' % prog)
check_signal = np.exp(1j * np.linspace(
0, np.pi * 2 * check_freq * t, len(sa)
))
mag = np.abs(np.sum(sa * np.conjugate(check_signal)))
if mag > best_mag:
best_mag = mag
best_freq = check_freq
return best_mag, best_freq
def calib_host(rx_tx, rx_rx, slave_tx, slave_rx, lo_freq, sps, rx_channel):
sos_a = signal.butter(8, (lo_freq - 10e3, lo_freq + 10e3), btype='bandpass', output='sos', fs=sps)
sos_b = signal.butter(8, 500, btype='lowpass', output='sos', fs=sps)
best_min = None
best_rx_counter = None
best_tx_counter = None
#fd = open('plot', 'wb')
st = time.time()
while time.time() - st < 60:
#print('[master] sent rx sample command')
rx_tx.put(1024 * 100)
rx_counter, sa, sb = rx_rx.get()
if rx_channel == 'a':
pass
elif rx_channel == 'b':
sc = sa
sa = sb
sb = sc
else:
raise Exception('unknown rx channel')
sa_mean = np.mean(np.abs(sa))
sb_mean = np.mean(np.abs(sb))
print('sa_mean', sa_mean, 'sb_mean', sb_mean)
if sa_mean > 1.0 or sb_mean > 1.0:
print('WARNING: clipping on RX nearing threshold')
#pickle.dump(sa, fd)
with open('plot', 'wb') as fd:
pickle.dump(sa, fd)
pickle.dump(sb, fd)
print('wrote plot')
exit()
sa = signal.sosfiltfilt(sos_a, sa)
sa = sa[500:-500]
sa = np.angle(sa[1:] * np.conjugate(sa[:-1]))
sa = signal.sosfiltfilt(sos_b, sa)
sa = sa[1700:-1700]
sa = sa / (np.pi * 2 / sps)
the_min_ndx = np.argmin(sa)
the_min = sa[the_min_ndx]
#print('the_min', the_min)
if best_min is None or the_min < best_min:
st = time.time()
slave_tx.put('tx_counter')
best_tx_counter = slave_rx.get()
if best_tx_counter is None:
continue
best_min = the_min
best_rx_counter = rx_counter + 500 + 1700 + the_min_ndx
print('best_min', best_min, best_rx_counter)
print('delta', best_rx_counter - best_tx_counter)
return best_rx_counter, best_tx_counter
def freq_calib_host():
# track sine signal for frequency calibration
if slave_rx.get() != 'single_sine_output':
raise Exception('unexpected stage')
# compute SPS offset for slave
for _ in range(0):
print('[master] sent rx sample command')
rx_tx.put(1024 * 10)
_, sa, sb = rx_rx.get()
peak_mag, peak_freq = find_peak_frequency(
left=-10e3 + lo_freq,
right=10e3 + lo_freq,
sa=sa,
sps=sps,
lo_freq=lo_freq,
steps=1000
)
print('[peak-1st-round]', 'peak_mag', peak_mag, 'peak_freq', peak_freq)
peak_mag, peak_freq = find_peak_frequency_random_probe(
left=peak_freq - 500,
right=peak_freq + 500,
sa=sa,
sps=sps,
lo_freq=lo_freq,
duration=20
)
print('[peak-2nd-round]', 'peak_mag', peak_mag, 'peak_freq', peak_freq)
abs_freq = freq + lo_freq
def master_thread(
rxa_tx,
rxa_rx,
rxb_tx,
rxb_rx,
slavea_tx,
slavea_rx,
slaveb_tx,
slaveb_rx,
lo_freqa,
lo_freqb,
sps,
freq):
"""The master thread.
"""
'''
connect
RXA_1 -> TXA_1
RXB_1 -> TXB_1
RXA_2 -> TXB_2
'''
# lock RXA to TXA
rxa_cnt, txa_cnt = calib_host(rxa_tx, rxa_rx, slavea_tx, slavea_rx, lo_freqa, sps, 'a')
# lock RXB to TXB
rxb_cnt, txb_cnt = calib_host(rxb_tx, rxb_rx, slaveb_tx, slaveb_rx, lo_freqb, sps, 'a')
# lock RXA to TXB
rxab_cnt, txab_cnt = calib_host(rxa_tx, rxa_rx, slaveb_tx, slaveb_rx, lo_freqb, sps, 'b')
print('rxa_cnt', rxa_cnt)
print('txa_cnt', txa_cnt)
print('rxb_cnt', rxb_cnt)
print('txb_cnt', txb_cnt)
print('rxab_cnt', rxab_cnt)
print('txab_cnt', txab_cnt)
def main():
devs = []
print('opening cards')
for serial in ['9da', 'e85']:
devs.append(BladeRFAndNumpy(f'libusb:serial={serial}'))
sps = 1000000
num_buffers = 16
buffer_size = 1024 * 32
buffer_samps = num_buffers * buffer_size
freq = 1.83e9
#freq = random.randint(int(2e9), int(3e9))
print('freq', freq)
lo_freqa = 20e3
lo_freqb = 50e3
buffer_samps = int((num_buffers * buffer_size) // 4)
txa_tx = queue.Queue()
txa_rx = queue.Queue()
txb_tx = queue.Queue()
txb_rx = queue.Queue()
rxa_tx = queue.Queue()
rxa_rx = queue.Queue()
rxb_tx = queue.Queue()
rxb_rx = queue.Queue()
tx_th_a = threading.Thread(
target=tx_thread,
args=(
devs[1],
num_buffers,
buffer_size,
freq,
lo_freqa,
sps,
txa_tx,
txa_rx,
),
daemon=True
)
tx_th_b = threading.Thread(
target=tx_thread,
args=(
devs[0],
num_buffers,
buffer_size,
freq,
lo_freqb,
sps,
txb_tx,
txb_rx,
),
daemon=True
)
rx_th_a = threading.Thread(
target=rx_thread,
args=(
devs[1],
num_buffers,
buffer_size,
freq,
sps,
rxa_tx,
rxa_rx,
),
daemon=True
)
rx_th_b = threading.Thread(
target=rx_thread,
args=(
devs[0],
num_buffers,
buffer_size,
freq,
sps,
rxb_tx,
rxb_rx,
),
daemon=True
)
slavea_tx = queue.Queue()
slavea_rx = queue.Queue()
slaveb_tx = queue.Queue()
slaveb_rx = queue.Queue()
master_tx = queue.Queue()
master_rx = queue.Queue()
slavea_th = threading.Thread(
target=slave_thread,
args=(
txa_tx, txa_rx, slavea_tx, slavea_rx
),
daemon=True
)
slaveb_th = threading.Thread(
target=slave_thread,
args=(
txb_tx, txb_rx, slaveb_tx, slaveb_rx
),
daemon=True
)
print('started tx thread')
tx_th_a.start()
tx_th_b.start()
rx_th_a.start()
rx_th_b.start()
slavea_th.start()
slaveb_th.start()
def clear_rx_buffer():
a, b = devs[0].sample_as_f64(buffer_samps, 2, 4, 0)
tx0_freq_off = -3937.0
master_thread(
rxa_tx, rxa_rx, rxb_tx, rxb_rx,
slavea_tx, slavea_rx,
slaveb_tx, slaveb_rx,
lo_freqa, lo_freqb, sps, freq
)
if __name__ == '__main__':
main()
Most of the work happens in master_thread
. There are three calib_host
calls and each of these peg a TX sample index to an RX sample index.
The first call takes board A and pegs the TX to the RX so now we know what sample going out of the DAC corresponds to what sample coming in the ADC minus the propagation delay which also includes the circuitry.
The second call does the same for board B, and finally the third call pegs board A’s RX to board B’s TX.
At this point, all four asynchronous chains have been synchronized! As I said, the propagation delay would still need to be computed but if you had the boards close together, you’d have to do a sub-sample.
I had wanted to aim for a sub-sample synchronization, and I feel like it would have been possible to do. I hope maybe it serves as something someone else can do and extend. It just isn’t possible for me to experiment with my boards with such a high-powered RF transmitter nearby, but I hope for someone else the fun of learning and experimenting will be there!
Please, enjoy! I hope it works just as I intended.
I was able to get some small tests done early on before the interference became too great. It synchronized between two boards on a TX and RX channel. So, I know it works. I should take some time and explain more about how it works.
What I do is generate a slowly modulated FM signal. The deviation is like 5e3-hertz and on the RX side you grab snapshots. I could have tried to do a more continuous processing (real-time) but I opted for the less processing intensive method of just grabbing snapshots. You look for the lowest deviation and you keep grabbing snapshot and trying to find the perfect point that has that low deviation.
Each time you find a low deviation to request the TX/slave thread to send its counter value which is the sample index it sends the last 4-second transmission on. I’m using a FM modulation frequency of 0.25-hertz so it lasts 4-seconds. I also start the lowest deviation on the first sample. So, this means the TX’s counter value (sample index) will match the RX’s counter (sample index) minus the propagation delay.
Now, since the entire cycle takes 4-seconds that is a big enough window for the message to go from the master thread to the TX thread and back. This is an important part of the process. This is also why you might not want to sample for an entire 4-second window because by the time you’re done the TX counter may have advanced. If the code isn’t quick enough you get the next counter value and of course the synchronization is off by 4-seconds.
There are some ways to test the synchronization! This means the experiment could have used a second part to validate correctness. I leave that as an exercise for whoever does this experiment.
I also feel that 4-seconds is a good amount of time for synchronization over something like Ethernet or the Internet. So, in theory, you could do more than two boards and you could put them as a great distance! This would allow you to do some cool things.