From b71c978e2722172140397f5ca024723e5ad34676 Mon Sep 17 00:00:00 2001 From: Nick Foster Date: Mon, 3 Jun 2013 08:38:26 -0400 Subject: [PATCH] New universal pubsub interface in zmq_socket.py. Needs more work. --- apps/modes_rx | 17 +++--- python/CMakeLists.txt | 1 + python/__init__.py | 1 + python/msprint.py | 7 ++- python/radio.py | 17 ++++-- python/sql.py | 4 +- python/zmq_socket.py | 133 ++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 161 insertions(+), 19 deletions(-) create mode 100644 python/zmq_socket.py diff --git a/apps/modes_rx b/apps/modes_rx index 29c9fc9..7f4b3b9 100755 --- a/apps/modes_rx +++ b/apps/modes_rx @@ -29,11 +29,11 @@ from air_modes.exceptions import * import zmq class screen_printer(threading.Thread): - def __init__(self, position, context, addr=None): + def __init__(self, position, context, addr=None, port=None): threading.Thread.__init__(self) self._subscriber = context.socket(zmq.SUB) if addr is not None: - self._subscriber.connect("tcp://%s" % addr) + self._subscriber.connect("tcp://%s:%i" % (addr, port)) else: self._subscriber.connect("inproc://modes-radio-pub") self._subscriber.setsockopt(zmq.SUBSCRIBE, "dl_data") @@ -45,7 +45,6 @@ class screen_printer(threading.Thread): self.start() def run(self): - queue_empty = False while not self.done.is_set(): [address, msg] = self._subscriber.recv_multipart() #blocking try: @@ -55,7 +54,6 @@ class screen_printer(threading.Thread): self._subscriber.close() self.finished.set() - def main(): my_position = None @@ -82,6 +80,7 @@ def main(): #construct the radio context = zmq.Context(1) tb = air_modes.modes_radio(options, context) + relay = air_modes.zmq_pubsub_iface(context, subaddr="inproc://modes-radio-pub", pubaddr=None) if options.location is not None: reader = csv.reader([options.location], quoting=csv.QUOTE_NONNUMERIC) @@ -94,7 +93,8 @@ def main(): printer = None if options.no_print is not True: - printer = screen_printer(my_position, context) + relay.subscribe("dl_data", air_modes.output_print(my_position).output) + #printer = screen_printer(my_position, context) # if options.multiplayer is not None: # [fghost, fgport] = options.multiplayer.split(':') @@ -105,13 +105,12 @@ def main(): # sbs1port = air_modes.output_sbs1(my_position, 30003) # tb.subscribe('dl_data', sbs1port.output) #updates.append(sbs1port.add_pending_conns) - tb.run() tb.cleanup() - if printer is not None: - printer.done.set() - printer.finished.wait(0.2) + relay.shutdown.set() + relay.finished.wait(0.2) + if options.kml is not None: sqldb.done = True #sqldb.join() diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index ae62909..b70d46c 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -46,6 +46,7 @@ GR_PYTHON_INSTALL( rx_path.py sbs1.py sql.py + zmq_socket.py Quaternion.py DESTINATION ${GR_PYTHON_DIR}/air_modes ) diff --git a/python/__init__.py b/python/__init__.py index 18f4372..1eaf4ab 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -52,6 +52,7 @@ from air_modes_swig import * # import any pure python here # from rx_path import rx_path +from zmq_socket import zmq_pubsub_iface from parse import parse,modes_reply from msprint import output_print from sql import output_sql diff --git a/python/msprint.py b/python/msprint.py index bf54217..d9af70d 100644 --- a/python/msprint.py +++ b/python/msprint.py @@ -69,9 +69,12 @@ class output_print(air_modes.parse): pass def output(self, msg): - parsed = self.parse(msg) - if parsed is not None: + try: + parsed = self.parse(msg) + if parsed is not None: print self.parse(msg) + except ADSBError: + pass def print0(self, shortdata, ecc): [vs, cc, sl, ri, altitude] = self.parse0(shortdata) diff --git a/python/radio.py b/python/radio.py index d3b1655..d5f27b8 100644 --- a/python/radio.py +++ b/python/radio.py @@ -47,14 +47,14 @@ class radio_publisher(threading.Thread): self._publisher.bind("tcp://*:%i" % port) self.setDaemon(True) - self.done = threading.Event() + self.shutdown = threading.Event() self.finished = threading.Event() self.start() def run(self): done_yet = False - while not self.done.is_set() and not done_yet: - if self.done.is_set(): #gives it another round after done is set + while not self.shutdown.is_set() and not done_yet: + if self.shutdown.is_set(): #gives it another round after done is set done_yet = True #so we can clean up the last of the queue while not self._queue.empty_p(): self._publisher.send_multipart([DOWNLINK_DATA_TYPE, self._queue.delete_head().to_string()]) @@ -87,7 +87,13 @@ class modes_radio (gr.top_block): self.connect(self._u, self.rx_path) #Publish messages when they come back off the queue - self._pubsub = radio_publisher(None, context, self._queue) + #self._sender = radio_publisher(None, context, self._queue) + #TODO use address + self._sender = air_modes.zmq_pubsub_iface(context, subaddr=None, pubaddr="inproc://modes-radio-pub") + self._async_sender = gru.msgq_runner(self._queue, self.send) + + def send(self, data): + self._sender["dl_data"] = data @staticmethod def add_radio_options(parser): @@ -200,5 +206,4 @@ class modes_radio (gr.top_block): print "Rate is %i" % (options.rate,) def cleanup(self): - self._pubsub.done.set() - self._pubsub.finished.wait(0.2) + self._sender.close() diff --git a/python/sql.py b/python/sql.py index 02dff5a..d9752cd 100644 --- a/python/sql.py +++ b/python/sql.py @@ -27,14 +27,14 @@ from air_modes.exceptions import * import zmq class output_sql(air_modes.parse, threading.Thread): - def __init__(self, mypos, filename, context, addr=None): + def __init__(self, mypos, filename, context, addr=None, port=None): threading.Thread.__init__(self) air_modes.parse.__init__(self, mypos) #init socket self._subscriber = context.socket(zmq.SUB) if addr is not None: - self._subscriber.connect("tcp://%s" % addr) + self._subscriber.connect("tcp://%s:%i" % (addr, port)) else: self._subscriber.connect("inproc://modes-radio-pub") self._subscriber.setsockopt(zmq.SUBSCRIBE, "dl_data") diff --git a/python/zmq_socket.py b/python/zmq_socket.py new file mode 100644 index 0000000..deba0df --- /dev/null +++ b/python/zmq_socket.py @@ -0,0 +1,133 @@ +# Copyright 2013 Nick Foster +# +# This file is part of gr-air-modes +# +# gr-air-modes is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# gr-air-modes is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with gr-air-modes; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. + +#this serves as a bridge between 0MQ subscriber and the GR pubsub callbacks interface +#creates a thread, publishes socket data to pubsub subscribers +#just a convenient way to create an aggregating socket with callbacks on receive +#can use this for inproc:// signalling with minimal overhead +#not sure if it's a good idea to use this yet +#TODO support multiple subscriber addresses for aggregation + +import time +import threading +import zmq +from gnuradio.gr.pubsub import pubsub +import Queue + +class zmq_pubsub_iface(threading.Thread): + def __init__(self, context, subaddr=None, pubaddr=None): + threading.Thread.__init__(self) + #private data + self._queue = Queue.Queue() + self._subsocket = context.socket(zmq.SUB) + self._pubsocket = context.socket(zmq.PUB) + self._subaddr = subaddr + self._pubaddr = pubaddr + self._sub_connected = False + #self._pub_connected = False + self._publock = threading.Lock() + self._pubsub = pubsub() + if pubaddr is not None: + self._pubsocket.bind(pubaddr) + + self._poller = zmq.Poller() + self._poller.register(self._subsocket, zmq.POLLIN) + + #public data + self.shutdown = threading.Event() + self.finished = threading.Event() + #init + self.setDaemon(True) + self.start() + + def subscribe(self, key, subscriber): + if not self._sub_connected: + if not self._subaddr: + raise Exception("No subscriber address set") + self._subsocket.connect(self._subaddr) + self._sub_connected = True + self._subsocket.setsockopt(zmq.SUBSCRIBE, key) + self._pubsub.subscribe(key, subscriber) + + def unsubscribe(self, key, subscriber): + self._subsocket.setsockopt(zmq.UNSUBSCRIBE, key) + self._pubsub.unsubscribe(key, subscriber) + #TODO keep track of subs (_sub_connected is value not bool) + #so you don't recv when zero subs + + #executed from the thread context(s) of the caller(s) + #so we use a queue to push sending into the run loop + #since sockets must be used in the thread they were created in + def __setitem__(self, key, val): + if not self._pubaddr: + raise Exception("No publisher address set") + with self._publock: + if not self.shutdown.is_set(): + self._queue.put([key, val.to_string()]) #TODO FIXME MSGQ + + def __getitem__(self, key): + return self._pubsub[key] + + def run(self): + while not self.shutdown.is_set(): + #send + while not self._queue.empty(): + self._pubsocket.send_multipart(self._queue.get()) + #receive + if self._sub_connected: + socks = dict(self._poller.poll(timeout=0)) + while self._subsocket in socks \ + and socks[self._subsocket] == zmq.POLLIN: + [address, msg] = self._subsocket.recv_multipart() + self._pubsub[address] = msg + #snooze + time.sleep(0.1) + + while not self._queue.empty(): + self._pubsocket.send_multipart(self._queue.get()) + + + self._subsocket.close() + self._pubsocket.close() + self.finished.set() + + def close(self): + self.shutdown.set() + #self._queue.join() #why does this block forever + self.finished.wait(0.2) + +def pr(x): + print x + +if __name__ == "__main__": + #create socket pair + context = zmq.Context(1) + sock1 = zmq_pubsub_iface(context, subaddr="inproc://sock2-pub", pubaddr="inproc://sock1-pub") + sock2 = zmq_pubsub_iface(context, subaddr="inproc://sock1-pub", pubaddr="inproc://sock2-pub") + + sock1.subscribe("data1", pr) + sock2.subscribe("data2", pr) + + for i in range(10): + sock1["data2"] = "HOWDY" + sock2["data1"] = "PARDNER" + time.sleep(0.01) + + sock1.close() + sock2.close()