New universal pubsub interface in zmq_socket.py. Needs more work.

This commit is contained in:
Nick Foster 2013-06-03 08:38:26 -04:00
parent 33874893b7
commit b71c978e27
7 changed files with 161 additions and 19 deletions

View File

@ -29,11 +29,11 @@ from air_modes.exceptions import *
import zmq
class screen_printer(threading.Thread):
def __init__(self, position, context, addr=None):
def __init__(self, position, context, addr=None, port=None):
threading.Thread.__init__(self)
self._subscriber = context.socket(zmq.SUB)
if addr is not None:
self._subscriber.connect("tcp://%s" % addr)
self._subscriber.connect("tcp://%s:%i" % (addr, port))
else:
self._subscriber.connect("inproc://modes-radio-pub")
self._subscriber.setsockopt(zmq.SUBSCRIBE, "dl_data")
@ -45,7 +45,6 @@ class screen_printer(threading.Thread):
self.start()
def run(self):
queue_empty = False
while not self.done.is_set():
[address, msg] = self._subscriber.recv_multipart() #blocking
try:
@ -55,7 +54,6 @@ class screen_printer(threading.Thread):
self._subscriber.close()
self.finished.set()
def main():
my_position = None
@ -82,6 +80,7 @@ def main():
#construct the radio
context = zmq.Context(1)
tb = air_modes.modes_radio(options, context)
relay = air_modes.zmq_pubsub_iface(context, subaddr="inproc://modes-radio-pub", pubaddr=None)
if options.location is not None:
reader = csv.reader([options.location], quoting=csv.QUOTE_NONNUMERIC)
@ -94,7 +93,8 @@ def main():
printer = None
if options.no_print is not True:
printer = screen_printer(my_position, context)
relay.subscribe("dl_data", air_modes.output_print(my_position).output)
#printer = screen_printer(my_position, context)
# if options.multiplayer is not None:
# [fghost, fgport] = options.multiplayer.split(':')
@ -105,13 +105,12 @@ def main():
# sbs1port = air_modes.output_sbs1(my_position, 30003)
# tb.subscribe('dl_data', sbs1port.output)
#updates.append(sbs1port.add_pending_conns)
tb.run()
tb.cleanup()
if printer is not None:
printer.done.set()
printer.finished.wait(0.2)
relay.shutdown.set()
relay.finished.wait(0.2)
if options.kml is not None:
sqldb.done = True
#sqldb.join()

View File

@ -46,6 +46,7 @@ GR_PYTHON_INSTALL(
rx_path.py
sbs1.py
sql.py
zmq_socket.py
Quaternion.py
DESTINATION ${GR_PYTHON_DIR}/air_modes
)

View File

@ -52,6 +52,7 @@ from air_modes_swig import *
# import any pure python here
#
from rx_path import rx_path
from zmq_socket import zmq_pubsub_iface
from parse import parse,modes_reply
from msprint import output_print
from sql import output_sql

View File

@ -69,9 +69,12 @@ class output_print(air_modes.parse):
pass
def output(self, msg):
parsed = self.parse(msg)
if parsed is not None:
try:
parsed = self.parse(msg)
if parsed is not None:
print self.parse(msg)
except ADSBError:
pass
def print0(self, shortdata, ecc):
[vs, cc, sl, ri, altitude] = self.parse0(shortdata)

View File

@ -47,14 +47,14 @@ class radio_publisher(threading.Thread):
self._publisher.bind("tcp://*:%i" % port)
self.setDaemon(True)
self.done = threading.Event()
self.shutdown = threading.Event()
self.finished = threading.Event()
self.start()
def run(self):
done_yet = False
while not self.done.is_set() and not done_yet:
if self.done.is_set(): #gives it another round after done is set
while not self.shutdown.is_set() and not done_yet:
if self.shutdown.is_set(): #gives it another round after done is set
done_yet = True #so we can clean up the last of the queue
while not self._queue.empty_p():
self._publisher.send_multipart([DOWNLINK_DATA_TYPE, self._queue.delete_head().to_string()])
@ -87,7 +87,13 @@ class modes_radio (gr.top_block):
self.connect(self._u, self.rx_path)
#Publish messages when they come back off the queue
self._pubsub = radio_publisher(None, context, self._queue)
#self._sender = radio_publisher(None, context, self._queue)
#TODO use address
self._sender = air_modes.zmq_pubsub_iface(context, subaddr=None, pubaddr="inproc://modes-radio-pub")
self._async_sender = gru.msgq_runner(self._queue, self.send)
def send(self, data):
self._sender["dl_data"] = data
@staticmethod
def add_radio_options(parser):
@ -200,5 +206,4 @@ class modes_radio (gr.top_block):
print "Rate is %i" % (options.rate,)
def cleanup(self):
self._pubsub.done.set()
self._pubsub.finished.wait(0.2)
self._sender.close()

View File

@ -27,14 +27,14 @@ from air_modes.exceptions import *
import zmq
class output_sql(air_modes.parse, threading.Thread):
def __init__(self, mypos, filename, context, addr=None):
def __init__(self, mypos, filename, context, addr=None, port=None):
threading.Thread.__init__(self)
air_modes.parse.__init__(self, mypos)
#init socket
self._subscriber = context.socket(zmq.SUB)
if addr is not None:
self._subscriber.connect("tcp://%s" % addr)
self._subscriber.connect("tcp://%s:%i" % (addr, port))
else:
self._subscriber.connect("inproc://modes-radio-pub")
self._subscriber.setsockopt(zmq.SUBSCRIBE, "dl_data")

133
python/zmq_socket.py Normal file
View File

@ -0,0 +1,133 @@
# Copyright 2013 Nick Foster
#
# This file is part of gr-air-modes
#
# gr-air-modes is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# gr-air-modes is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gr-air-modes; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#this serves as a bridge between 0MQ subscriber and the GR pubsub callbacks interface
#creates a thread, publishes socket data to pubsub subscribers
#just a convenient way to create an aggregating socket with callbacks on receive
#can use this for inproc:// signalling with minimal overhead
#not sure if it's a good idea to use this yet
#TODO support multiple subscriber addresses for aggregation
import time
import threading
import zmq
from gnuradio.gr.pubsub import pubsub
import Queue
class zmq_pubsub_iface(threading.Thread):
def __init__(self, context, subaddr=None, pubaddr=None):
threading.Thread.__init__(self)
#private data
self._queue = Queue.Queue()
self._subsocket = context.socket(zmq.SUB)
self._pubsocket = context.socket(zmq.PUB)
self._subaddr = subaddr
self._pubaddr = pubaddr
self._sub_connected = False
#self._pub_connected = False
self._publock = threading.Lock()
self._pubsub = pubsub()
if pubaddr is not None:
self._pubsocket.bind(pubaddr)
self._poller = zmq.Poller()
self._poller.register(self._subsocket, zmq.POLLIN)
#public data
self.shutdown = threading.Event()
self.finished = threading.Event()
#init
self.setDaemon(True)
self.start()
def subscribe(self, key, subscriber):
if not self._sub_connected:
if not self._subaddr:
raise Exception("No subscriber address set")
self._subsocket.connect(self._subaddr)
self._sub_connected = True
self._subsocket.setsockopt(zmq.SUBSCRIBE, key)
self._pubsub.subscribe(key, subscriber)
def unsubscribe(self, key, subscriber):
self._subsocket.setsockopt(zmq.UNSUBSCRIBE, key)
self._pubsub.unsubscribe(key, subscriber)
#TODO keep track of subs (_sub_connected is value not bool)
#so you don't recv when zero subs
#executed from the thread context(s) of the caller(s)
#so we use a queue to push sending into the run loop
#since sockets must be used in the thread they were created in
def __setitem__(self, key, val):
if not self._pubaddr:
raise Exception("No publisher address set")
with self._publock:
if not self.shutdown.is_set():
self._queue.put([key, val.to_string()]) #TODO FIXME MSGQ
def __getitem__(self, key):
return self._pubsub[key]
def run(self):
while not self.shutdown.is_set():
#send
while not self._queue.empty():
self._pubsocket.send_multipart(self._queue.get())
#receive
if self._sub_connected:
socks = dict(self._poller.poll(timeout=0))
while self._subsocket in socks \
and socks[self._subsocket] == zmq.POLLIN:
[address, msg] = self._subsocket.recv_multipart()
self._pubsub[address] = msg
#snooze
time.sleep(0.1)
while not self._queue.empty():
self._pubsocket.send_multipart(self._queue.get())
self._subsocket.close()
self._pubsocket.close()
self.finished.set()
def close(self):
self.shutdown.set()
#self._queue.join() #why does this block forever
self.finished.wait(0.2)
def pr(x):
print x
if __name__ == "__main__":
#create socket pair
context = zmq.Context(1)
sock1 = zmq_pubsub_iface(context, subaddr="inproc://sock2-pub", pubaddr="inproc://sock1-pub")
sock2 = zmq_pubsub_iface(context, subaddr="inproc://sock1-pub", pubaddr="inproc://sock2-pub")
sock1.subscribe("data1", pr)
sock2.subscribe("data2", pr)
for i in range(10):
sock1["data2"] = "HOWDY"
sock2["data1"] = "PARDNER"
time.sleep(0.01)
sock1.close()
sock2.close()