improve multiprocessing efficiency

This commit is contained in:
Junzi Sun 2019-08-26 12:00:14 +02:00
parent 86f302f05e
commit cfcd21b692
6 changed files with 78 additions and 50 deletions

View File

@ -26,8 +26,8 @@ class RtlReader(object):
# sdr.freq_correction = 75 # sdr.freq_correction = 75
self.debug = kwargs.get("debug", False) self.debug = kwargs.get("debug", False)
self.raw_event = None self.raw_pipe_in = None
self.raw_queue = None self.stop_flag = False
def _process_buffer(self): def _process_buffer(self):
messages = [] messages = []
@ -129,7 +129,7 @@ class RtlReader(object):
if len(self.signal_buffer) >= buffer_size: if len(self.signal_buffer) >= buffer_size:
messages = self._process_buffer() messages = self._process_buffer()
self.handle_messages(messages, self.raw_event, self.raw_queue) self.handle_messages(messages)
def handle_messages(self, messages): def handle_messages(self, messages):
"""re-implement this method to handle the messages""" """re-implement this method to handle the messages"""
@ -137,12 +137,12 @@ class RtlReader(object):
# print("%15.9f %s" % (t, msg)) # print("%15.9f %s" % (t, msg))
pass pass
def stop(self): def stop(self, *args, **kwargs):
self.sdr.cancel_read_async() self.sdr.cancel_read_async()
def run(self, raw_event=None, raw_queue=None): def run(self, raw_pipe_in=None, stop_flag=None):
self.raw_event = raw_event self.raw_pipe_in = raw_pipe_in
self.raw_queue = raw_queue self.stop_flag = stop_flag
self.sdr.read_samples_async(self._read_callback, read_size) self.sdr.read_samples_async(self._read_callback, read_size)
# count = 1 # count = 1
@ -153,7 +153,10 @@ class RtlReader(object):
if __name__ == "__main__": if __name__ == "__main__":
import signal
rtl = RtlReader() rtl = RtlReader()
signal.signal(signal.SIGINT, rtl.stop)
rtl.debug = True rtl.debug = True
rtl.start() rtl.run()
rtl.join()

View File

@ -26,12 +26,18 @@ class TcpClient(object):
print("datatype must be either raw, beast or skysense") print("datatype must be either raw, beast or skysense")
os._exit(1) os._exit(1)
self.raw_pipe_in = None
self.stop_flag = False
def connect(self): def connect(self):
self.socket = zmq.Context().socket(zmq.STREAM) self.socket = zmq.Context().socket(zmq.STREAM)
self.socket.setsockopt(zmq.LINGER, 0) self.socket.setsockopt(zmq.LINGER, 0)
self.socket.setsockopt(zmq.RCVTIMEO, 10000) self.socket.setsockopt(zmq.RCVTIMEO, 10000)
self.socket.connect("tcp://%s:%s" % (self.host, self.port)) self.socket.connect("tcp://%s:%s" % (self.host, self.port))
def stop(self):
self.socket.disconnect()
def read_raw_buffer(self): def read_raw_buffer(self):
""" Read raw ADS-B data type. """ Read raw ADS-B data type.
@ -249,12 +255,14 @@ class TcpClient(object):
self.buffer = self.buffer[1:] self.buffer = self.buffer[1:]
return messages return messages
def handle_messages(self, messages, raw_event=None, raw_queue=None): def handle_messages(self, messages):
"""re-implement this method to handle the messages""" """re-implement this method to handle the messages"""
for msg, t in messages: for msg, t in messages:
print("%15.9f %s" % (t, msg)) print("%15.9f %s" % (t, msg))
def run(self, raw_event=None, raw_queue=None): def run(self, raw_pipe_in=None, stop_flag=None):
self.raw_pipe_in = raw_pipe_in
self.stop_flag = stop_flag
self.connect() self.connect()
while True: while True:
@ -282,9 +290,8 @@ class TcpClient(object):
if not messages: if not messages:
continue continue
else: else:
self.handle_messages(messages, raw_event, raw_queue) self.handle_messages(messages)
time.sleep(0.001)
except Exception as e: except Exception as e:
# Provides the user an option to supply the environment # Provides the user an option to supply the environment
# variable PYMODES_DEBUG to halt the execution # variable PYMODES_DEBUG to halt the execution
@ -298,6 +305,7 @@ class TcpClient(object):
try: try:
sock = self.connect() sock = self.connect()
time.sleep(1)
except Exception as e: except Exception as e:
print("Unexpected Error:", e) print("Unexpected Error:", e)

View File

@ -6,7 +6,7 @@ import csv
import pyModeS as pms import pyModeS as pms
class Stream: class Decode:
def __init__(self, latlon=None, dumpto=None): def __init__(self, latlon=None, dumpto=None):
self.acs = dict() self.acs = dict()
@ -267,18 +267,22 @@ class Stream:
acs = self.acs acs = self.acs
return acs return acs
def run(self, raw_event, ac_event, raw_queue, aircraft_queue): def run(self, raw_pipe_out, ac_pipe_in):
local_buffer = []
while True: while True:
if raw_event.is_set(): while raw_pipe_out.poll():
data = raw_queue.get() data = raw_pipe_out.recv()
local_buffer.append(data)
for data in local_buffer:
self.process_raw( self.process_raw(
data["adsb_ts"], data["adsb_ts"],
data["adsb_msg"], data["adsb_msg"],
data["commb_ts"], data["commb_ts"],
data["commb_msg"], data["commb_msg"],
) )
local_buffer = []
aircraft_queue.put(self.get_aircraft()) acs = self.get_aircraft()
ac_event.set() ac_pipe_in.send(acs)
raw_event.clear()
time.sleep(0.001) time.sleep(0.001)

View File

@ -7,7 +7,7 @@ import argparse
import curses import curses
import signal import signal
import multiprocessing import multiprocessing
from pyModeS.streamer.stream import Stream from pyModeS.streamer.decode import Decode
from pyModeS.streamer.screen import Screen from pyModeS.streamer.screen import Screen
from pyModeS.streamer.source import NetSource, RtlSdrSource from pyModeS.streamer.source import NetSource, RtlSdrSource
@ -90,11 +90,14 @@ if DUMPTO is not None:
sys.exit(1) sys.exit(1)
raw_event = multiprocessing.Event() # raw_event = multiprocessing.Event()
ac_event = multiprocessing.Event() # ac_event = multiprocessing.Event()
# raw_queue = multiprocessing.Queue()
# ac_queue = multiprocessing.Queue()
raw_queue = multiprocessing.Queue() raw_pipe_in, raw_pipe_out = multiprocessing.Pipe()
aircraft_queue = multiprocessing.Queue() ac_pipe_in, ac_pipe_out = multiprocessing.Pipe()
stop_flag = multiprocessing.Value("b", False)
if SOURCE == "net": if SOURCE == "net":
source = NetSource(host=SERVER, port=PORT, rawtype=DATATYPE) source = NetSource(host=SERVER, port=PORT, rawtype=DATATYPE)
@ -103,29 +106,30 @@ elif SOURCE == "rtlsdr":
recv_process = multiprocessing.Process( recv_process = multiprocessing.Process(
target=source.run, args=(raw_event, raw_queue) target=source.run, args=(raw_pipe_in, stop_flag)
) )
stream = Stream(latlon=LATLON, dumpto=DUMPTO) decode = Decode(latlon=LATLON, dumpto=DUMPTO)
stream_process = multiprocessing.Process( decode_process = multiprocessing.Process(
target=stream.run, args=(raw_event, ac_event, raw_queue, aircraft_queue) target=decode.run, args=(raw_pipe_out, ac_pipe_in)
) )
screen = Screen(uncertainty=UNCERTAINTY) screen = Screen(uncertainty=UNCERTAINTY)
screen_process = multiprocessing.Process( screen_process = multiprocessing.Process(
target=screen.run, args=(ac_event, aircraft_queue) target=screen.run, args=(ac_pipe_out,)
) )
def closeall(signal, frame): def closeall(signal, frame):
print("KeyboardInterrupt (ID: {}). Cleaning up...".format(signal)) print("KeyboardInterrupt (ID: {}). Cleaning up...".format(signal))
stop_flag.value = True
curses.endwin() curses.endwin()
recv_process.terminate() recv_process.terminate()
stream_process.terminate() decode_process.terminate()
screen_process.terminate() screen_process.terminate()
recv_process.join() recv_process.join()
stream_process.join() decode_process.join()
screen_process.join() screen_process.join()
exit(0) exit(0)
@ -133,5 +137,5 @@ def closeall(signal, frame):
signal.signal(signal.SIGINT, closeall) signal.signal(signal.SIGINT, closeall)
recv_process.start() recv_process.start()
stream_process.start() decode_process.start()
screen_process.start() screen_process.start()

View File

@ -189,21 +189,24 @@ class Screen(object):
self.screen.refresh() self.screen.refresh()
self.draw_frame() self.draw_frame()
def run(self, ac_event, ac_queue): def run(self, ac_pipe_out):
local_buffer = []
key_thread = threading.Thread(target=self.kye_handling) key_thread = threading.Thread(target=self.kye_handling)
key_thread.start() key_thread.start()
while True: while True:
if ac_event.is_set(): while ac_pipe_out.poll():
while not ac_queue.empty(): acs = ac_pipe_out.recv()
acs = ac_queue.get() local_buffer.append(acs)
self.update_ac(acs)
ac_event.clear() for acs in local_buffer:
try: self.update_ac(acs)
self.update()
except: local_buffer = []
pass
try:
self.update()
except:
pass
time.sleep(0.001) time.sleep(0.001)

View File

@ -11,7 +11,11 @@ class NetSource(TcpClient):
self.local_buffer_commb_msg = [] self.local_buffer_commb_msg = []
self.local_buffer_commb_ts = [] self.local_buffer_commb_ts = []
def handle_messages(self, messages, raw_event, raw_queue): def handle_messages(self, messages):
if self.stop_flag.value is True:
self.stop()
return
for msg, t in messages: for msg, t in messages:
if len(msg) < 28: # only process long messages if len(msg) < 28: # only process long messages
@ -29,7 +33,7 @@ class NetSource(TcpClient):
continue continue
if len(self.local_buffer_adsb_msg) > 1: if len(self.local_buffer_adsb_msg) > 1:
raw_queue.put( self.raw_pipe_in.send(
{ {
"adsb_ts": self.local_buffer_adsb_ts, "adsb_ts": self.local_buffer_adsb_ts,
"adsb_msg": self.local_buffer_adsb_msg, "adsb_msg": self.local_buffer_adsb_msg,
@ -37,7 +41,6 @@ class NetSource(TcpClient):
"commb_msg": self.local_buffer_commb_msg, "commb_msg": self.local_buffer_commb_msg,
} }
) )
raw_event.set()
class RtlSdrSource(RtlReader): class RtlSdrSource(RtlReader):
@ -48,7 +51,11 @@ class RtlSdrSource(RtlReader):
self.local_buffer_commb_msg = [] self.local_buffer_commb_msg = []
self.local_buffer_commb_ts = [] self.local_buffer_commb_ts = []
def handle_messages(self, messages, raw_event, raw_queue): def handle_messages(self, messages):
if self.stop_flag.value is True:
self.stop()
return
for msg, t in messages: for msg, t in messages:
if len(msg) < 28: # only process long messages if len(msg) < 28: # only process long messages
@ -66,7 +73,7 @@ class RtlSdrSource(RtlReader):
continue continue
if len(self.local_buffer_adsb_msg) > 1: if len(self.local_buffer_adsb_msg) > 1:
raw_queue.put( self.raw_pipe_in.send(
{ {
"adsb_ts": self.local_buffer_adsb_ts, "adsb_ts": self.local_buffer_adsb_ts,
"adsb_msg": self.local_buffer_adsb_msg, "adsb_msg": self.local_buffer_adsb_msg,
@ -74,4 +81,3 @@ class RtlSdrSource(RtlReader):
"commb_msg": self.local_buffer_commb_msg, "commb_msg": self.local_buffer_commb_msg,
} }
) )
raw_event.set()