Add getters/setters in preamble/slicer, bring them out to radio.py via pubsub.

This commit is contained in:
Nick Foster 2013-06-05 15:56:09 -04:00
parent 2ace332b89
commit f62813f039
9 changed files with 145 additions and 44 deletions

View File

@ -24,7 +24,6 @@ from optparse import OptionParser
import time, os, sys, threading
from string import split, join
import air_modes
import csv
from air_modes.exceptions import *
import zmq
@ -55,6 +54,7 @@ class screen_printer(threading.Thread):
self._subscriber.close()
self.finished.set()
#todo: maybe move plugins to separate programs (flightgear, SBS1, etc.)
def main():
my_position = None
usage = "%prog: [options]"
@ -64,8 +64,8 @@ def main():
help="filename for Google Earth KML output")
parser.add_option("-P","--sbs1", action="store_true", default=False,
help="open an SBS-1-compatible server on port 30003")
parser.add_option("-w","--raw", action="store_true", default=False,
help="open a server outputting raw timestamped data on port 9988")
parser.add_option("-s","--servers", type="string", default="",
help="specify additional servers from which to take data in format tcp://x.x.x.x:y,tcp://....")
parser.add_option("-n","--no-print", action="store_true", default=False,
help="disable printing decoded packets to stdout")
parser.add_option("-l","--location", type="string", default=None,
@ -74,17 +74,19 @@ def main():
help="Use UDP source on specified port")
parser.add_option("-m","--multiplayer", type="string", default=None,
help="FlightGear server to send aircraft data, in format host:port")
parser.add_option("-t","--tcp", type="int", default=None,
help="Open a TCP server on this port to publish reports")
(options, args) = parser.parse_args()
#construct the radio
context = zmq.Context(1)
tb = air_modes.modes_radio(options, context)
relay = air_modes.zmq_pubsub_iface(context, subaddr="inproc://modes-radio-pub", pubaddr=None)
servers = options.servers.split(",") + ["inproc://modes-radio-pub"]
relay = air_modes.zmq_pubsub_iface(context, subaddr=servers[-1], pubaddr=None)
if options.location is not None:
reader = csv.reader([options.location], quoting=csv.QUOTE_NONNUMERIC)
my_position = reader.next()
my_position = [float(n) for n in options.location.split(",")]
if options.kml is not None:
dbname = 'adsb.db'

View File

@ -57,6 +57,10 @@ public:
gr_vector_int &ninput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items);
void set_rate(int channel_rate);
void set_threshold(float threshold_db);
float get_threshold(void);
};
#endif /* INCLUDED_AIR_MODES_PREAMBLE_H */

View File

@ -53,6 +53,8 @@ public:
int work (int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items);
void set_rate(int channel_rate);
};
#endif /* INCLUDED_AIR_MODES_slicer_H */

View File

@ -42,14 +42,8 @@ air_modes_preamble::air_modes_preamble(int channel_rate, float threshold_db) :
gr_make_io_signature2 (2, 2, sizeof(float), sizeof(float)), //stream 0 is received data, stream 1 is moving average for reference
gr_make_io_signature (1, 1, sizeof(float))) //the output packets
{
d_chip_rate = 2000000; //2Mchips per second
d_samples_per_chip = channel_rate / d_chip_rate; //must be integer number of samples per chip to work
d_samples_per_symbol = d_samples_per_chip * 2;
d_check_width = 120 * d_samples_per_symbol; //only search to this far from the end of the stream buffer
d_threshold_db = threshold_db;
d_threshold = powf(10., threshold_db/20.); //the level that the sample must be above the moving average in order to qualify as a pulse
d_secs_per_sample = 1.0 / channel_rate;
set_output_multiple(1+d_check_width*2);
set_rate(channel_rate);
set_threshold(threshold_db);
std::stringstream str;
str << name() << unique_id();
@ -58,6 +52,27 @@ air_modes_preamble::air_modes_preamble(int channel_rate, float threshold_db) :
set_history(d_samples_per_symbol);
}
void air_modes_preamble::set_rate(int channel_rate)
{
d_chip_rate = 2000000; //2Mchips per second
d_samples_per_chip = channel_rate / d_chip_rate; //must be integer number of samples per chip to work
d_samples_per_symbol = d_samples_per_chip * 2;
d_secs_per_sample = 1.0 / channel_rate;
d_check_width = 240 * d_samples_per_symbol; //only search to this far from the end of the stream buffer
set_output_multiple(1+d_check_width);
}
void air_modes_preamble::set_threshold(float threshold_db)
{
d_threshold_db = threshold_db;
d_threshold = powf(10., threshold_db/20.); //the level that the sample must be above the moving average in order to qualify as a pulse
}
float air_modes_preamble::get_threshold(void)
{
return d_threshold_db;
}
static void integrate_and_dump(float *out, const float *in, int chips, int samps_per_chip) {
for(int i=0; i<chips; i++) {
float acc = 0;
@ -169,7 +184,7 @@ int air_modes_preamble::general_work(int noutput_items,
if(!valid_preamble) continue;
//be sure we've got enough room in the input buffer to copy out a whole packet
if(ninputs-i < 240*d_samples_per_chip) {
if(ninputs-i < d_check_width) {
consume_each(std::max(i-1,0));
if(0) std::cout << "Preamble consumed " << std::max(i-1,0) << ", returned 0 (no room)" << std::endl;
return 0;

View File

@ -51,14 +51,17 @@ air_modes_slicer::air_modes_slicer(int channel_rate, gr_msg_queue_sptr queue) :
gr_make_io_signature (1, 1, sizeof(float)), //stream 0 is received data, stream 1 is binary preamble detector output
gr_make_io_signature (0, 0, 0) )
{
//initialize private data here
set_rate(channel_rate);
d_queue = queue;
}
void air_modes_slicer::set_rate(int channel_rate)
{
d_chip_rate = 2000000; //2Mchips per second
d_samples_per_chip = 2;//FIXME this is constant now channel_rate / d_chip_rate;
d_samples_per_symbol = d_samples_per_chip * 2;
d_check_width = 120 * d_samples_per_symbol; //how far you will have to look ahead
d_queue = queue;
set_output_multiple(d_check_width*2); //how do you specify buffer size for sinks?
d_check_width = 240 * d_samples_per_symbol; //how far you will have to look ahead
set_output_multiple(d_check_width); //how do you specify buffer size for sinks?
}
//this slicer is courtesy of Lincoln Labs. supposedly it is more resistant to mode A/C FRUIT.

View File

@ -25,6 +25,7 @@
from gnuradio import gr, gru, optfir, eng_notation, blks2
from gnuradio.eng_option import eng_option
from gnuradio.gr.pubsub import pubsub
from optparse import OptionParser
import air_modes
import zmq
@ -63,9 +64,10 @@ class radio_publisher(threading.Thread):
self.finished.set()
class modes_radio (gr.top_block):
class modes_radio (gr.top_block, pubsub):
def __init__(self, options, context):
gr.top_block.__init__(self)
pubsub.__init__(self)
self._options = options
self._queue = gr.msg_queue()
@ -75,10 +77,22 @@ class modes_radio (gr.top_block):
self._setup_source(options)
#TODO allow setting rate, threshold (drill down into slicer & preamble)
self.rx_path = air_modes.rx_path(rate, options.threshold, self._queue, options.pmf)
#now subscribe to set various options via pubsub
self.subscribe("FREQ", self.set_freq)
self.subscribe("GAIN", self.set_gain)
self.subscribe("RATE", self.set_rate)
self.subscribe("RATE", self.rx_path.set_rate)
self.subscribe("THRESHOLD", self.rx_path.set_threshold)
self.subscribe("PMF", self.rx_path.set_pmf)
self.publish("FREQ", self.get_freq)
self.publish("GAIN", self.get_gain)
self.publish("RATE", self.get_rate)
self.publish("THRESHOLD", self.rx_path.get_threshold)
self.publish("PMF", self.rx_path.get_pmf)
if use_resampler:
self.lpfiltcoeffs = gr.firdes.low_pass(1, 5*3.2e6, 1.6e6, 300e3)
self.resample = blks2.rational_resampler_ccf(interpolation=5, decimation=4, taps=self.lpfiltcoeffs)
@ -87,9 +101,11 @@ class modes_radio (gr.top_block):
self.connect(self._u, self.rx_path)
#Publish messages when they come back off the queue
#self._sender = radio_publisher(None, context, self._queue)
#TODO use address
self._sender = air_modes.zmq_pubsub_iface(context, subaddr=None, pubaddr="inproc://modes-radio-pub")
server_addr = ["inproc://modes-radio-pub"]
if options.tcp is not None:
server_addr += ["tcp://*:%i"] % options.tcp
self._sender = air_modes.zmq_pubsub_iface(context, subaddr=None, pubaddr=server_addr)
self._async_sender = gru.msgq_runner(self._queue, self.send)
def send(self, msg):
@ -131,9 +147,33 @@ class modes_radio (gr.top_block):
def set_gain(self, gain):
try:
self._u.set_gain(gain)
self._u.set_gain(gain)
except:
pass
def set_rate(self, rate):
try:
self._u.set_rate(rate)
except:
pass
def get_freq(self, freq):
try:
return self._u.get_center_freq(freq, 0)
except:
pass
def get_gain(self, gain):
try:
return self._u.get_gain()
except:
pass
def get_rate(self, rate):
try:
return self._u.get_rate()
except:
pass
def _setup_source(self, options):
if options.filename is None and options.udp is None and options.osmocom is None:

View File

@ -40,12 +40,12 @@ class rx_path(gr.hier_block2):
# Pulse matched filter for 0.5us pulses
if use_pmf:
self._pmf = gr.moving_average_ff(self._spc, 1.0/self._spc, self._rate)
self._pmf = gr.moving_average_ff(self._spc, 1.0/self._spc)#, self._rate)
self.connect(self._demod, self._pmf)
self._bb = self._pmf
# Establish baseline amplitude (noise, interference)
self._avg = gr.moving_average_ff(48*self._spc, 1.0/(48*self._spc), self._rate) # 3 preambles
self._avg = gr.moving_average_ff(48*self._spc, 1.0/(48*self._spc))#, self._rate) # 3 preambles
# Synchronize to Mode-S preamble
self._sync = air_modes_swig.modes_preamble(self._rate, self._threshold)
@ -58,3 +58,25 @@ class rx_path(gr.hier_block2):
self.connect(self._bb, (self._sync, 0))
self.connect(self._bb, self._avg, (self._sync, 1))
self.connect(self._sync, self._slicer)
def set_rate(self, rate):
self._sync.set_rate(rate)
self._slicer.set_rate(rate)
self._spc = int(rate/2e6)
self._avg.set_length_and_scale(48*self._spc, 1.0/(48*self._spc))
if self._bb != self._demod:
self._pmf.set_length_and_scale(self._spc, 1.0/self._spc)
def set_threshold(self, threshold):
self._sync.set_threshold(threshold)
def set_pmf(self, pmf):
#TODO must be done when top block is stopped
pass
def get_pmf(self, pmf):
return not (self._bb == self._demod)
def get_threshold(self, threshold):
return self._sync.get_threshold()

View File

@ -17,12 +17,11 @@
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#this serves as a bridge between 0MQ subscriber and the GR pubsub callbacks interface
#this serves as a bridge between ZMQ subscriber and the GR pubsub callbacks interface
#creates a thread, publishes socket data to pubsub subscribers
#just a convenient way to create an aggregating socket with callbacks on receive
#can use this for inproc:// signalling with minimal overhead
#not sure if it's a good idea to use this yet
#TODO support multiple subscriber addresses for aggregation
import time
import threading
@ -39,10 +38,15 @@ class zmq_pubsub_iface(threading.Thread):
self._pubsocket = context.socket(zmq.PUB)
self._subaddr = subaddr
self._pubaddr = pubaddr
if type(self._subaddr) is str:
self._subaddr = [self._subaddr]
if type(self._pubaddr) is str:
self._pubaddr = [self._pubaddr]
self._sub_connected = False
self._pubsub = pubsub()
if pubaddr is not None:
self._pubsocket.bind(pubaddr)
if self._pubaddr is not None:
for addr in self._pubaddr:
self._pubsocket.bind(addr)
self._poller = zmq.Poller()
self._poller.register(self._subsocket, zmq.POLLIN)
@ -58,7 +62,8 @@ class zmq_pubsub_iface(threading.Thread):
if not self._sub_connected:
if not self._subaddr:
raise Exception("No subscriber address set")
self._subsocket.connect(self._subaddr)
for addr in self._subaddr:
self._subsocket.connect(addr)
self._sub_connected = True
self._subsocket.setsockopt(zmq.SUBSCRIBE, key)
self._pubsub.subscribe(key, subscriber)
@ -66,8 +71,6 @@ class zmq_pubsub_iface(threading.Thread):
def unsubscribe(self, key, subscriber):
self._subsocket.setsockopt(zmq.UNSUBSCRIBE, key)
self._pubsub.unsubscribe(key, subscriber)
#TODO keep track of subs (_sub_connected is value not bool)
#so you don't recv when zero subs
#executed from the thread context(s) of the caller(s)
#so we use a queue to push sending into the run loop
@ -90,14 +93,14 @@ class zmq_pubsub_iface(threading.Thread):
if self._sub_connected:
socks = dict(self._poller.poll(timeout=0))
while self._subsocket in socks \
and socks[self._subsocket] == zmq.POLLIN:
and socks[self._subsocket] == zmq.POLLIN:
[address, msg] = self._subsocket.recv_multipart()
self._pubsub[address] = msg
socks = dict(self._poller.poll(timeout=0))
#snooze
time.sleep(0.1)
#one more send loop to clean up on shutdown
#one more send loop to clean up on shutdown (can probably express this better above)
while not self._queue.empty():
self._pubsocket.send_multipart(self._queue.get())
@ -111,21 +114,27 @@ class zmq_pubsub_iface(threading.Thread):
self.finished.wait(0.2)
def pr(x):
print x
print x
if __name__ == "__main__":
#create socket pair
context = zmq.Context(1)
sock1 = zmq_pubsub_iface(context, subaddr="inproc://sock2-pub", pubaddr="inproc://sock1-pub")
sock2 = zmq_pubsub_iface(context, subaddr="inproc://sock1-pub", pubaddr="inproc://sock2-pub")
sock1 = zmq_pubsub_iface(context, subaddr="inproc://sock2-pub", pubaddr=["inproc://sock1-pub"])
sock2 = zmq_pubsub_iface(context, subaddr="inproc://sock1-pub", pubaddr=["inproc://sock2-pub", "tcp://*:5433"])
sock3 = zmq_pubsub_iface(context, subaddr="tcp://localhost:5433", pubaddr=None)
sock1.subscribe("data1", pr)
sock2.subscribe("data2", pr)
sock3.subscribe("data3", pr)
for i in range(10):
sock1["data2"] = "HOWDY"
#sock1["data2"] = "HOWDY"
#sock2["data3"] = "DRAW"
sock2["data1"] = "PARDNER"
time.sleep(0.01)
#time.sleep(0.1)
sock1.close()
sock2.close()
sock3.close()

View File

@ -23,6 +23,9 @@ air_modes_preamble_sptr air_make_modes_preamble (int channel_rate, float thresho
class air_modes_preamble : public gr_sync_block
{
set_rate(int channel_rate);
set_threshold(float threshold_db);
int get_threshold(void);
private:
air_modes_preamble (int channel_rate, float threshold_db);
};
@ -33,8 +36,9 @@ air_modes_slicer_sptr air_make_modes_slicer (int channel_rate, gr_msg_queue_sptr
class air_modes_slicer : public gr_block
{
set_rate(int channel_rate);
private:
air_modes_slicer (int channel_rate, gr_msg_queue_sptr queue);
air_modes_slicer (int channel_rate, gr_msg_queue_sptr queue);
};
// ----------------------------------------------------------------