# Copyright 2013 Nick Foster
#
# This file is part of gr-air-modes
#
# gr-air-modes is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# gr-air-modes is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gr-air-modes; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#

#this serves as a bridge between ZMQ subscriber and the GR pubsub callbacks interface
#creates a thread, publishes socket data to pubsub subscribers
#just a convenient way to create an aggregating socket with callbacks on receive
#can use this for inproc:// signalling with minimal overhead
#not sure if it's a good idea to use this yet

import time
import threading
import zmq
from gnuradio.gr.pubsub import pubsub
try:
    import Queue
except ImportError:
    #the module was renamed in Python 3; keep the Python 2 name used below
    import queue as Queue


class zmq_pubsub_iface(threading.Thread):
    """Bridge between a ZMQ PUB/SUB socket pair and a GR pubsub object.

    A daemon thread is started on construction. It sends everything queued
    through ``iface[key] = val`` on the PUB socket, and republishes every
    two-frame ``[key, msg]`` message received on the SUB socket into a local
    ``pubsub`` instance under ``key``.

    Keys and values are handed to pyzmq unchanged, so they must be of a type
    pyzmq accepts as a message frame (byte strings; note that pyzmq on
    Python 3 rejects unicode ``str``).

    Public attributes:
        shutdown: threading.Event; set it (or call close()) to stop the thread.
        finished: threading.Event; set by the thread once its sockets are closed.
    """

    def __init__(self, context, subaddr=None, pubaddr=None):
        """Create the sockets, bind the publisher and start the thread.

        context: the zmq context used to create both sockets.
        subaddr: address, or list of addresses, the SUB socket connects to.
                 The connection is deferred until the first subscribe().
        pubaddr: address, or list of addresses, the PUB socket binds to.
                 When None, nothing is bound and publishing raises.
        """
        threading.Thread.__init__(self)
        #private data
        self._queue = Queue.Queue()
        self._subsocket = context.socket(zmq.SUB)
        self._pubsocket = context.socket(zmq.PUB)
        #a single address is accepted as shorthand for a one-element list
        self._subaddr = [subaddr] if isinstance(subaddr, str) else subaddr
        self._pubaddr = [pubaddr] if isinstance(pubaddr, str) else pubaddr
        self._sub_connected = False
        self._pubsub = pubsub()
        if self._pubaddr is not None:
            for addr in self._pubaddr:
                self._pubsocket.bind(addr)

        self._poller = zmq.Poller()
        self._poller.register(self._subsocket, zmq.POLLIN)

        #public data
        self.shutdown = threading.Event()
        self.finished = threading.Event()
        #init
        self.daemon = True
        self.start()

    def subscribe(self, key, subscriber):
        """Register ``subscriber`` for messages published under ``key``.

        The first call connects the SUB socket to every configured address.

        Raises:
            Exception: if no subscriber address was given at construction.
        """
        if not self._sub_connected:
            if not self._subaddr:
                raise Exception("No subscriber address set")
            for addr in self._subaddr:
                self._subsocket.connect(addr)
            self._sub_connected = True
        self._subsocket.setsockopt(zmq.SUBSCRIBE, key)
        self._pubsub.subscribe(key, subscriber)

    def unsubscribe(self, key, subscriber):
        """Remove ``subscriber`` for ``key`` and drop one ZMQ subscription to it."""
        self._subsocket.setsockopt(zmq.UNSUBSCRIBE, key)
        self._pubsub.unsubscribe(key, subscriber)

    #executed from the thread context(s) of the caller(s)
    #so we use a queue to push sending into the run loop
    #since sockets must be used in the thread they were created in
    def __setitem__(self, key, val):
        """Queue ``[key, val]`` for sending as a two-frame message.

        The message is silently discarded once shutdown has been requested.

        Raises:
            Exception: if no publisher address was given at construction.
        """
        if not self._pubaddr:
            raise Exception("No publisher address set")
        if not self.shutdown.is_set():
            self._queue.put([key, val])

    def __getitem__(self, key):
        """Return the local pubsub's current entry for ``key``."""
        return self._pubsub[key]

    def _send_pending(self):
        """Send every message currently waiting in the outgoing queue."""
        while not self._queue.empty():
            self._pubsocket.send_multipart(self._queue.get())

    def _dispatch_incoming(self):
        """Republish every message already waiting on the SUB socket."""
        #timeout=0 makes the poll non-blocking: only drain what is there
        socks = dict(self._poller.poll(timeout=0))
        while socks.get(self._subsocket) == zmq.POLLIN:
            address, msg = self._subsocket.recv_multipart()
            self._pubsub[address] = msg
            socks = dict(self._poller.poll(timeout=0))

    def run(self):
        """Thread body: alternate sending and receiving until shutdown.

        After shutdown is observed, exactly one more send/receive pass is
        made so that messages queued just before close() still go out. The
        sockets are closed and ``finished`` is set even if the loop raises.
        """
        try:
            done = False
            while not done:
                #sample the flag once per pass; testing it in the loop
                #condition as well would skip the final drain below
                done = self.shutdown.is_set()
                #send
                self._send_pending()
                #receive
                if self._sub_connected:
                    self._dispatch_incoming()
                #snooze
                if not done:
                    time.sleep(0.1)
        finally:
            self._subsocket.close()
            self._pubsocket.close()
            self.finished.set()

    def close(self):
        """Request shutdown and wait up to 0.2 s for the thread to finish."""
        self.shutdown.set()
        #Queue.join() cannot be used here: it waits for a task_done() call
        #per item, and the run loop never makes one, so it would never return
        self.finished.wait(0.2)


def pr(x):
    """Print ``x``; used as the subscriber callback in the demo below."""
    print(x)


if __name__ == "__main__":
    #create socket pair
    context = zmq.Context(1)
    sock1 = zmq_pubsub_iface(context, subaddr="inproc://sock2-pub", pubaddr="inproc://sock1-pub")
    sock2 = zmq_pubsub_iface(context, subaddr="inproc://sock1-pub", pubaddr=["inproc://sock2-pub", "tcp://*:5433"])
    sock3 = zmq_pubsub_iface(context, subaddr="tcp://localhost:5433", pubaddr=None)

    #byte-string literals: identical to plain literals on Python 2, and
    #the frame type pyzmq requires on Python 3
    sock1.subscribe(b"data1", pr)
    sock2.subscribe(b"data2", pr)
    sock3.subscribe(b"data3", pr)

    for i in range(10):
        sock1[b"data2"] = b"HOWDY"
        sock2[b"data3"] = b"DRAW"
        sock2[b"data1"] = b"PARDNER"
        time.sleep(0.1)

    time.sleep(0.1)
    sock1.close()
    sock2.close()
    sock3.close()