diff --git a/apps/modes_rx b/apps/modes_rx index c4f9eb0..6a1fcc1 100755 --- a/apps/modes_rx +++ b/apps/modes_rx @@ -19,8 +19,6 @@ # Boston, MA 02110-1301, USA. # -my_position = None - from gnuradio.eng_option import eng_option from optparse import OptionParser import time, os, sys, threading @@ -28,11 +26,33 @@ from string import split, join import air_modes import csv from air_modes.exceptions import * +import zmq -def printraw(msg): - print msg +class screen_printer(threading.Thread): + def __init__(self, position, context): + threading.Thread.__init__(self) + self._subscriber = context.socket(zmq.SUB) + self._subscriber.connect("inproc://modes-radio-pub") + self._subscriber.setsockopt(zmq.SUBSCRIBE, "dl_data") -if __name__ == '__main__': + self._printer = air_modes.output_print(position) + self.done = False + self.setDaemon(True) + self.start() + + def run(self): + while not self.done: + [address, msg] = self._subscriber.recv_multipart() #blocking + try: + self._printer.output(msg) + except ADSBError: + pass + + self._subscriber.close() + + +def main(): + my_position = None usage = "%prog: [options] output filename" parser = OptionParser(option_class=eng_option, usage=usage) parser.add_option("-R", "--subdev", type="string", @@ -49,8 +69,6 @@ if __name__ == '__main__': help="set ADC sample rate [default=%default]") parser.add_option("-T", "--threshold", type="eng_float", default=5.0, help="set pulse detection threshold above noise in dB [default=%default]") - parser.add_option("-a","--output-all", action="store_true", default=False, - help="output all frames") parser.add_option("-F","--filename", type="string", default=None, help="read data from file instead of USRP") parser.add_option("-K","--kml", type="string", default=None, @@ -67,65 +85,64 @@ if __name__ == '__main__': help="Use UDP source on specified port") parser.add_option("-m","--multiplayer", type="string", default=None, help="FlightGear server to send aircraft data, in format host:port") - parser.add_option("-d","--rtlsdr", action="store_true", default=False, - help="Use RTLSDR dongle instead of UHD source") + parser.add_option("-o","--osmocom", action="store_true", default=False, + help="Use gr-osmocom source (RTLSDR or HackRF) instead of UHD source") parser.add_option("-p","--pmf", action="store_true", default=False, help="Use pulse matched filtering") - + (options, args) = parser.parse_args() - tb = air_modes.modes_radio(options) - - updates = [] + #construct the radio + context = zmq.Context(1) + tb = air_modes.modes_radio(options, context) + tb.start() + #tb will publish via ZMQ on the "radio-pub" feed + #clients connect to tb socket and do things. + #the radio-pub feed is channelized into "dl_data" and (not yet) + #"ul_data" which correspond to downlink and uplink data if options.location is not None: reader = csv.reader([options.location], quoting=csv.QUOTE_NONNUMERIC) my_position = reader.next() - if options.raw is True: - rawport = air_modes.raw_server(9988) #port - tb.subscribe('dl_data', rawport.output) - tb.subscribe('dl_data', printraw) - updates.append(rawport.add_pending_conns) - if options.kml is not None: #we spawn a thread to run every 30 seconds (or whatever) to generate KML dbname = 'adsb.db' - lock = threading.Lock() - sqldb = air_modes.output_sql(my_position, dbname, lock) #input into the db - kmlgen = air_modes.output_kml(options.kml, dbname, my_position, lock) #create a KML generating thread to read from the db - tb.subscribe('dl_data', sqldb.output) + sqldb = air_modes.output_sql(my_position, dbname, context) #input into the db + #kmlgen = air_modes.output_kml(options.kml, dbname, my_position, lock) #create a KML generating thread to read from the db + #tb.subscribe('dl_data', sqldb.output) - if options.sbs1 is True: - sbs1port = air_modes.output_sbs1(my_position, 30003) - tb.subscribe('dl_data', sbs1port.output) - updates.append(sbs1port.add_pending_conns) - +# if options.sbs1 is True: +# sbs1port = air_modes.output_sbs1(my_position, 30003) +# tb.subscribe('dl_data', sbs1port.output) + #updates.append(sbs1port.add_pending_conns) + + printer = None if options.no_print is not True: - tb.subscribe('dl_data', air_modes.output_print(my_position).output) + printer = screen_printer(my_position, context) - if options.multiplayer is not None: - [fghost, fgport] = options.multiplayer.split(':') - fgout = air_modes.output_flightgear(my_position, fghost, int(fgport)) - tb.subscribe('dl_data', fgout.output) - - tb.start() - - while 1: - try: - #the update registry is really for the SBS1 and raw server plugins -- we're looking for new TCP connections. - #i think we have to do this here rather than in the output handler because otherwise connections will stack up - #until the next output arrives - for update in updates: - update() - time.sleep(0.1) - - except KeyboardInterrupt: - break - - tb.stop() - tb.wait() - if options.kml is not None: - kmlgen.done = True +# if options.multiplayer is not None: +# [fghost, fgport] = options.multiplayer.split(':') +# fgout = air_modes.output_flightgear(my_position, fghost, int(fgport)) +# tb.subscribe('dl_data', fgout.output) + #start an updater thread to handle adding/removing TCP connections + #TODO this can be removed when we start using 0MQ +# updater = threading.Thread(target=timed_callback, args=(updates,)) +# updater.setDaemon(True) +# updater.start() + tb.wait() + + if printer is not None: + printer.done = True + #printer.join() + if options.kml is not None: + sqldb.done = True + #sqldb.join() + + time.sleep(0.1) + + +if __name__ == '__main__': + main() diff --git a/python/kml.py b/python/kml.py index 908a4ab..973f070 100644 --- a/python/kml.py +++ b/python/kml.py @@ -39,7 +39,7 @@ class output_kml(threading.Thread): self._db = sqlite3.connect(self._dbname) #read from the db while self.done is False: self.writekml() - time.sleep(self._timeout) + time.sleep(self._timeout) self.done = True self._db.close() diff --git a/python/radio.py b/python/radio.py index 6449483..7aeea32 100644 --- a/python/radio.py +++ b/python/radio.py @@ -25,15 +25,39 @@ from gnuradio import gr, gru, optfir, eng_notation, blks2 from gnuradio.eng_option import eng_option -from gnuradio.gr.pubsub import pubsub import air_modes +import zmq +import threading +import time -DOWNLINK_DATA_TYPE = 'dl_data' +DOWNLINK_DATA_TYPE = "dl_data" -class modes_radio (gr.top_block, pubsub): - def __init__(self, options): +#ZMQ message publisher. +#TODO: limit high water mark +#TODO: limit number of subscribers +class radio_publisher(threading.Thread): + def __init__(self, port, context, queue): + threading.Thread.__init__(self) + self._queue = queue + self._publisher = context.socket(zmq.PUB) + if port is None: + self._publisher.bind("inproc://modes-radio-pub") + else: + self._publisher.bind("tcp://*:%i" % port) + + self.setDaemon(True) + self.done = False + + def run(self): + while self.done is False: + while not self._queue.empty_p(): + self._publisher.send_multipart([DOWNLINK_DATA_TYPE, self._queue.delete_head().to_string()]) + time.sleep(0.1) #can use time.sleep(0) to yield, but it'll suck a whole CPU + + +class modes_radio (gr.top_block): + def __init__(self, options, context): gr.top_block.__init__(self) - pubsub.__init__(self) self._options = options self._queue = gr.msg_queue() @@ -43,14 +67,6 @@ class modes_radio (gr.top_block, pubsub): self._setup_source(options) - self.subscribe('gain', self.set_gain) - self.subscribe('freq', self.set_freq) - - self['gain'] = options.gain - self['freq'] = options.freq - self['rate'] = options.rate - self['filename'] = options.filename - #TODO allow setting rate, threshold (drill down into slicer & preamble) self.rx_path = air_modes.rx_path(rate, options.threshold, self._queue, options.pmf) @@ -63,10 +79,8 @@ class modes_radio (gr.top_block, pubsub): self.connect(self._u, self.rx_path) #Publish messages when they come back off the queue - self._async_rcv = gru.msgq_runner(self._queue, self.async_callback) - - def async_callback(self, msg): - self[DOWNLINK_DATA_TYPE] = msg.to_string() + self._pubsub = radio_publisher(None, context, self._queue) + self._pubsub.start() #these are wrapped with try/except because file sources and udp sources #don't have set_center_freq/set_gain functions. this should check to see @@ -85,7 +99,7 @@ class modes_radio (gr.top_block, pubsub): pass def _setup_source(self, options): - if options.filename is None and options.udp is None and options.rtlsdr is None: + if options.filename is None and options.udp is None and options.osmocom is None: #UHD source by default from gnuradio import uhd self._u = uhd.single_usrp_source(options.args, uhd.io_type_t.COMPLEX_FLOAT32, 1) @@ -105,8 +119,8 @@ class modes_radio (gr.top_block, pubsub): if options.antenna is not None: self._u.set_antenna(options.antenna) - self._u.set_samp_rate(rate) - rate = int(self._u.get_samp_rate()) #retrieve actual + self._u.set_samp_rate(options.rate) + options.rate = int(self._u.get_samp_rate()) #retrieve actual if options.gain is None: #set to halfway g = self._u.get_gain_range() @@ -115,18 +129,26 @@ class modes_radio (gr.top_block, pubsub): print "Setting gain to %i" % options.gain self._u.set_gain(options.gain) print "Gain is %i" % self._u.get_gain() - - elif options.rtlsdr: #RTLSDR dongle + + #TODO: detect if you're using an RTLSDR or Jawbreaker + #and set up accordingly. + #ALSO TODO: Actually set gain appropriately using gain bins in HackRF driver. + elif options.osmocom: #RTLSDR dongle or HackRF Jawbreaker import osmosdr self._u = osmosdr.source_c(options.args) - self._u.set_sample_rate(3.2e6) #fixed for RTL dongles +# self._u.set_sample_rate(3.2e6) #fixed for RTL dongles + self._u.set_sample_rate(options.rate) if not self._u.set_center_freq(options.freq): print "Failed to set initial frequency" self._u.set_gain_mode(0) #manual gain mode if options.gain is None: options.gain = 34 - +###DO NOT COMMIT + self._u.set_gain(14, "RF", 0) + self._u.set_gain(40, "IF", 0) + self._u.set_gain(14, "RF", 0) +###DO NOT COMMIT self._u.set_gain(options.gain) print "Gain is %i" % self._u.get_gain() diff --git a/python/sql.py b/python/sql.py index 585e35d..d41f01f 100644 --- a/python/sql.py +++ b/python/sql.py @@ -24,18 +24,25 @@ from string import split, join import air_modes import sqlite3 from air_modes.exceptions import * +import zmq -class output_sql(air_modes.parse): - def __init__(self, mypos, filename, lock): +class output_sql(air_modes.parse, threading.Thread): + def __init__(self, mypos, filename, context): + threading.Thread.__init__(self) air_modes.parse.__init__(self, mypos) - self._lock = lock + #init socket + self._subscriber = context.socket(zmq.SUB) + self._subscriber.connect("inproc://modes-radio-pub") #TODO allow spec addr + self._subscriber.setsockopt(zmq.SUBSCRIBE, "dl_data") + + self._lock = threading.Lock() with self._lock: #create the database self.filename = filename - self.db = sqlite3.connect(filename) + self._db = sqlite3.connect(filename) #now execute a schema to create the tables you need - c = self.db.cursor() + c = self._db.cursor() query = """CREATE TABLE IF NOT EXISTS "positions" ( "icao" INTEGER KEY NOT NULL, "seen" TEXT NOT NULL, @@ -58,30 +65,42 @@ class output_sql(air_modes.parse): );""" c.execute(query) c.close() - self.db.commit() + self._db.commit() #we close the db conn now to reopen it in the output() thread context. - self.db.close() - self.db = None + self._db.close() + self._db = None - def __del__(self): - self.db = None + self.setDaemon(True) + self.done = False + self.start() - def output(self, message): + def run(self): + while not self.done: + [address, msg] = self._subscriber.recv_multipart() #blocking + try: + self.insert(msg) + except ADSBError: + pass + + self._subscriber.close() + self._db = None + + def insert(self, message): with self._lock: try: #we're checking to see if the db is empty, and creating the db object #if it is. the reason for this is so that the db writing is done within #the thread context of output(), rather than the thread context of the - #constructor. that way you can spawn a thread to do output(). - if self.db is None: - self.db = sqlite3.connect(self.filename) + #constructor. + if self._db is None: + self._db = sqlite3.connect(self.filename) query = self.make_insert_query(message) if query is not None: - c = self.db.cursor() + c = self._db.cursor() c.execute(query) c.close() - self.db.commit() + self._db.commit() except ADSBError: pass