diff --git a/python/rx_path.py b/python/rx_path.py index 41914c6..9fa6cb4 100644 --- a/python/rx_path.py +++ b/python/rx_path.py @@ -20,6 +20,7 @@ # from gnuradio import gr, blocks, filter +import air_modes class rx_path(gr.hier_block2): @@ -53,10 +54,10 @@ class rx_path(gr.hier_block2): self._avg = blocks.moving_average_ff(48*self._spc, 1.0/(48*self._spc))#, self._rate) # 3 preambles # Synchronize to Mode-S preamble - self._sync = air_modes_swig.preamble(self._rate, self._threshold) + self._sync = air_modes.preamble(self._rate, self._threshold) # Slice Mode-S bits and send to message queue - self._slicer = air_modes_swig.slicer(self._queue) + self._slicer = air_modes.slicer(self._queue) # Wire up the flowgraph self.connect(self._bb, (self._sync, 0)) diff --git a/python/zmq_socket.py b/python/zmq_socket.py index 802e9f7..bab450a 100644 --- a/python/zmq_socket.py +++ b/python/zmq_socket.py @@ -33,7 +33,7 @@ class zmq_pubsub_iface(threading.Thread): def __init__(self, context, subaddr=None, pubaddr=None): threading.Thread.__init__(self) #private data - self._queue = queue.queue() + self._queue = queue.Queue() self._subsocket = context.socket(zmq.SUB) self._pubsocket = context.socket(zmq.PUB) self._subaddr = subaddr @@ -46,11 +46,11 @@ class zmq_pubsub_iface(threading.Thread): self._pubsub = pubsub() if self._pubaddr is not None: for addr in self._pubaddr: - self._pubsocket.bind(addr) + self._pubsocket.bind(addr.encode('ascii')) self._poller = zmq.Poller() self._poller.register(self._subsocket, zmq.POLLIN) - + #public data self.shutdown = threading.Event() self.finished = threading.Event() @@ -63,14 +63,14 @@ class zmq_pubsub_iface(threading.Thread): if not self._subaddr: raise Exception("No subscriber address set") for addr in self._subaddr: - self._subsocket.connect(addr) + self._subsocket.connect(addr.encode('ascii')) self._sub_connected = True - self._subsocket.setsockopt(zmq.SUBSCRIBE, key) - self._pubsub.subscribe(key, subscriber) + self._subsocket.setsockopt(zmq.SUBSCRIBE, key.encode('ascii')) + self._pubsub.subscribe(key.encode('ascii'), subscriber) def unsubscribe(self, key, subscriber): - self._subsocket.setsockopt(zmq.UNSUBSCRIBE, key) - self._pubsub.unsubscribe(key, subscriber) + self._subsocket.setsockopt(zmq.UNSUBSCRIBE, key.encode('ascii')) + self._pubsub.unsubscribe(key.encode('ascii'), subscriber) #executed from the thread context(s) of the caller(s) #so we use a queue to push sending into the run loop @@ -79,10 +79,10 @@ class zmq_pubsub_iface(threading.Thread): if not self._pubaddr: raise Exception("No publisher address set") if not self.shutdown.is_set(): - self._queue.put([key, val]) + self._queue.put([key.encode('ascii'), val]) def __getitem__(self, key): - return self._pubsub[key] + return self._pubsub[key.encode('ascii')] def run(self): done = False @@ -90,16 +90,19 @@ class zmq_pubsub_iface(threading.Thread): if self.shutdown.is_set(): done = True #send - while not self._queue.empty(): - self._pubsocket.send_multipart(self._queue.get()) + while True: + try: + msg = self._queue.get(block=False) + self._pubsocket.send_multipart(msg) + except queue.Empty: + break #receive if self._sub_connected: - socks = dict(self._poller.poll(timeout=0)) - while self._subsocket in socks \ - and socks[self._subsocket] == zmq.POLLIN: + socks = [s[0].underlying for s in self._poller.poll(timeout=0) if s[1] == zmq.POLLIN] + while self._subsocket.underlying in socks: [address, msg] = self._subsocket.recv_multipart() self._pubsub[address] = msg - socks = dict(self._poller.poll(timeout=0)) + socks = [s[0].underlying for s in self._poller.poll(timeout=0) if s[1] == zmq.POLLIN] #snooze if not done: time.sleep(0.1)