Fix ZMQ socket interface (used for everything) for strange queue.Queue issues.

This commit is contained in:
Nick Foster 2019-10-15 13:11:05 -07:00
parent 37f45f8a4d
commit 2b10c06e15
2 changed files with 22 additions and 18 deletions

View File

@ -20,6 +20,7 @@
# #
from gnuradio import gr, blocks, filter from gnuradio import gr, blocks, filter
import air_modes
class rx_path(gr.hier_block2): class rx_path(gr.hier_block2):
@ -53,10 +54,10 @@ class rx_path(gr.hier_block2):
self._avg = blocks.moving_average_ff(48*self._spc, 1.0/(48*self._spc))#, self._rate) # 3 preambles self._avg = blocks.moving_average_ff(48*self._spc, 1.0/(48*self._spc))#, self._rate) # 3 preambles
# Synchronize to Mode-S preamble # Synchronize to Mode-S preamble
self._sync = air_modes_swig.preamble(self._rate, self._threshold) self._sync = air_modes.preamble(self._rate, self._threshold)
# Slice Mode-S bits and send to message queue # Slice Mode-S bits and send to message queue
self._slicer = air_modes_swig.slicer(self._queue) self._slicer = air_modes.slicer(self._queue)
# Wire up the flowgraph # Wire up the flowgraph
self.connect(self._bb, (self._sync, 0)) self.connect(self._bb, (self._sync, 0))

View File

@ -33,7 +33,7 @@ class zmq_pubsub_iface(threading.Thread):
def __init__(self, context, subaddr=None, pubaddr=None): def __init__(self, context, subaddr=None, pubaddr=None):
threading.Thread.__init__(self) threading.Thread.__init__(self)
#private data #private data
self._queue = queue.queue() self._queue = queue.Queue()
self._subsocket = context.socket(zmq.SUB) self._subsocket = context.socket(zmq.SUB)
self._pubsocket = context.socket(zmq.PUB) self._pubsocket = context.socket(zmq.PUB)
self._subaddr = subaddr self._subaddr = subaddr
@ -46,11 +46,11 @@ class zmq_pubsub_iface(threading.Thread):
self._pubsub = pubsub() self._pubsub = pubsub()
if self._pubaddr is not None: if self._pubaddr is not None:
for addr in self._pubaddr: for addr in self._pubaddr:
self._pubsocket.bind(addr) self._pubsocket.bind(addr.encode('ascii'))
self._poller = zmq.Poller() self._poller = zmq.Poller()
self._poller.register(self._subsocket, zmq.POLLIN) self._poller.register(self._subsocket, zmq.POLLIN)
#public data #public data
self.shutdown = threading.Event() self.shutdown = threading.Event()
self.finished = threading.Event() self.finished = threading.Event()
@ -63,14 +63,14 @@ class zmq_pubsub_iface(threading.Thread):
if not self._subaddr: if not self._subaddr:
raise Exception("No subscriber address set") raise Exception("No subscriber address set")
for addr in self._subaddr: for addr in self._subaddr:
self._subsocket.connect(addr) self._subsocket.connect(addr.encode('ascii'))
self._sub_connected = True self._sub_connected = True
self._subsocket.setsockopt(zmq.SUBSCRIBE, key) self._subsocket.setsockopt(zmq.SUBSCRIBE, key.encode('ascii'))
self._pubsub.subscribe(key, subscriber) self._pubsub.subscribe(key.encode('ascii'), subscriber)
def unsubscribe(self, key, subscriber): def unsubscribe(self, key, subscriber):
self._subsocket.setsockopt(zmq.UNSUBSCRIBE, key) self._subsocket.setsockopt(zmq.UNSUBSCRIBE, key.encode('ascii'))
self._pubsub.unsubscribe(key, subscriber) self._pubsub.unsubscribe(key.encode('ascii'), subscriber)
#executed from the thread context(s) of the caller(s) #executed from the thread context(s) of the caller(s)
#so we use a queue to push sending into the run loop #so we use a queue to push sending into the run loop
@ -79,10 +79,10 @@ class zmq_pubsub_iface(threading.Thread):
if not self._pubaddr: if not self._pubaddr:
raise Exception("No publisher address set") raise Exception("No publisher address set")
if not self.shutdown.is_set(): if not self.shutdown.is_set():
self._queue.put([key, val]) self._queue.put([key.encode('ascii'), val])
def __getitem__(self, key): def __getitem__(self, key):
return self._pubsub[key] return self._pubsub[key.encode('ascii')]
def run(self): def run(self):
done = False done = False
@ -90,16 +90,19 @@ class zmq_pubsub_iface(threading.Thread):
if self.shutdown.is_set(): if self.shutdown.is_set():
done = True done = True
#send #send
while not self._queue.empty(): while True:
self._pubsocket.send_multipart(self._queue.get()) try:
msg = self._queue.get(block=False)
self._pubsocket.send_multipart(msg)
except queue.Empty:
break
#receive #receive
if self._sub_connected: if self._sub_connected:
socks = dict(self._poller.poll(timeout=0)) socks = [s[0].underlying for s in self._poller.poll(timeout=0) if s[1] == zmq.POLLIN]
while self._subsocket in socks \ while self._subsocket.underlying in socks:
and socks[self._subsocket] == zmq.POLLIN:
[address, msg] = self._subsocket.recv_multipart() [address, msg] = self._subsocket.recv_multipart()
self._pubsub[address] = msg self._pubsub[address] = msg
socks = dict(self._poller.poll(timeout=0)) socks = [s[0].underlying for s in self._poller.poll(timeout=0) if s[1] == zmq.POLLIN]
#snooze #snooze
if not done: if not done:
time.sleep(0.1) time.sleep(0.1)