change modeslive to multiprocessing

This commit is contained in:
Junzi Sun 2019-08-26 00:21:29 +02:00
parent 28a6e53d49
commit 86f302f05e
6 changed files with 183 additions and 139 deletions

View File

@ -1,7 +1,6 @@
import numpy as np
import pyModeS as pms
from rtlsdr import RtlSdr
from threading import Thread
import time
modes_sample_rate = 2e6
@ -16,17 +15,20 @@ th_amp = 0.2 # signal amplitude threshold for 0 and 1 bit
th_amp_diff = 0.8 # signal amplitude threshold difference between 0 and 1 bit
class RtlReader(Thread):
def __init__(self, debug=False):
class RtlReader(object):
def __init__(self, **kwargs):
super(RtlReader, self).__init__()
self.signal_buffer = []
self.debug = debug
self.sdr = RtlSdr()
self.sdr.sample_rate = modes_sample_rate
self.sdr.center_freq = modes_frequency
self.sdr.gain = "auto"
# sdr.freq_correction = 75
self.debug = kwargs.get("debug", False)
self.raw_event = None
self.raw_queue = None
def _process_buffer(self):
messages = []
@ -127,7 +129,7 @@ class RtlReader(Thread):
if len(self.signal_buffer) >= buffer_size:
messages = self._process_buffer()
self.handle_messages(messages)
self.handle_messages(messages, self.raw_event, self.raw_queue)
def handle_messages(self, messages):
"""re-implement this method to handle the messages"""
@ -138,7 +140,9 @@ class RtlReader(Thread):
def stop(self):
self.sdr.cancel_read_async()
def run(self):
def run(self, raw_event=None, raw_queue=None):
self.raw_event = raw_event
self.raw_queue = raw_queue
self.sdr.read_samples_async(self._read_callback, read_size)
# count = 1

View File

@ -5,7 +5,6 @@ import os
import sys
import time
import pyModeS as pms
from threading import Thread
import traceback
import zmq
@ -15,9 +14,9 @@ else:
PY_VERSION = 2
class BaseClient(Thread):
class TcpClient(object):
def __init__(self, host, port, datatype):
Thread.__init__(self)
super(TcpClient, self).__init__()
self.host = host
self.port = port
self.buffer = []
@ -136,6 +135,9 @@ class BaseClient(Thread):
# Other message tupe
continue
if len(msg) not in [14, 28]:
continue
df = pms.df(msg)
# skip incomplete message
@ -247,12 +249,12 @@ class BaseClient(Thread):
self.buffer = self.buffer[1:]
return messages
def handle_messages(self, messages):
def handle_messages(self, messages, raw_event=None, raw_queue=None):
"""re-implement this method to handle the messages"""
for msg, t in messages:
print("%15.9f %s" % (t, msg))
def run(self):
def run(self, raw_event=None, raw_queue=None):
self.connect()
while True:
@ -280,11 +282,10 @@ class BaseClient(Thread):
if not messages:
continue
else:
self.handle_messages(messages)
self.handle_messages(messages, raw_event, raw_queue)
time.sleep(0.001)
except Exception as e:
# Provides the user an option to supply the environment
# variable PYMODES_DEBUG to halt the execution
# for debugging purposes
@ -306,6 +307,5 @@ if __name__ == "__main__":
host = sys.argv[1]
port = int(sys.argv[2])
datatype = sys.argv[3]
client = BaseClient(host=host, port=port, datatype=datatype)
client.daemon = True
client = TcpClient(host=host, port=port, datatype=datatype)
client.run()

View File

@ -3,21 +3,18 @@
from __future__ import print_function, division
import os
import sys
import time
import argparse
import curses
from threading import Lock
import pyModeS as pms
from pyModeS.extra.tcpclient import BaseClient
from pyModeS.extra.rtlreader import RtlReader
import signal
import multiprocessing
from pyModeS.streamer.stream import Stream
from pyModeS.streamer.screen import Screen
from pyModeS.streamer.source import NetSource, RtlSdrSource
# redirect all stdout to null, avoiding messing up with the screen
sys.stdout = open(os.devnull, "w")
LOCK = Lock()
ADSB_MSG = []
ADSB_TS = []
COMMB_MSG = []
COMMB_TS = []
support_rawtypes = ["raw", "beast", "skysense"]
@ -93,118 +90,48 @@ if DUMPTO is not None:
sys.exit(1)
class ModesClient(BaseClient):
def __init__(self, host, port, rawtype):
super(ModesClient, self).__init__(host, port, rawtype)
raw_event = multiprocessing.Event()
ac_event = multiprocessing.Event()
def handle_messages(self, messages):
local_buffer_adsb_msg = []
local_buffer_adsb_ts = []
local_buffer_ehs_msg = []
local_buffer_ehs_ts = []
for msg, t in messages:
if len(msg) < 28: # only process long messages
continue
df = pms.df(msg)
if df == 17 or df == 18:
local_buffer_adsb_msg.append(msg)
local_buffer_adsb_ts.append(t)
elif df == 20 or df == 21:
local_buffer_ehs_msg.append(msg)
local_buffer_ehs_ts.append(t)
else:
continue
LOCK.acquire()
ADSB_MSG.extend(local_buffer_adsb_msg)
ADSB_TS.extend(local_buffer_adsb_ts)
COMMB_MSG.extend(local_buffer_ehs_msg)
COMMB_TS.extend(local_buffer_ehs_ts)
LOCK.release()
class ModesRtlReader(RtlReader):
"""docstring for ModesRtlReader."""
def __init__(self):
super(ModesRtlReader, self).__init__()
def handle_messages(self, messages):
local_buffer_adsb_msg = []
local_buffer_adsb_ts = []
local_buffer_ehs_msg = []
local_buffer_ehs_ts = []
for msg, t in messages:
if len(msg) < 28: # only process long messages
continue
df = pms.df(msg)
if df == 17 or df == 18:
local_buffer_adsb_msg.append(msg)
local_buffer_adsb_ts.append(t)
elif df == 20 or df == 21:
local_buffer_ehs_msg.append(msg)
local_buffer_ehs_ts.append(t)
else:
continue
LOCK.acquire()
ADSB_MSG.extend(local_buffer_adsb_msg)
ADSB_TS.extend(local_buffer_adsb_ts)
COMMB_MSG.extend(local_buffer_ehs_msg)
COMMB_TS.extend(local_buffer_ehs_ts)
# print(len(ADSB_MSG))
# print(len(COMMB_MSG))
LOCK.release()
# redirect all stdout to null, avoiding messing up with the screen
sys.stdout = open(os.devnull, "w")
raw_queue = multiprocessing.Queue()
aircraft_queue = multiprocessing.Queue()
if SOURCE == "net":
client = ModesClient(host=SERVER, port=PORT, rawtype=DATATYPE)
client.daemon = True
client.start()
source = NetSource(host=SERVER, port=PORT, rawtype=DATATYPE)
elif SOURCE == "rtlsdr":
rtl = ModesRtlReader()
# rtl.debug = True
rtl.daemon = True
rtl.start()
source = RtlSdrSource()
recv_process = multiprocessing.Process(
target=source.run, args=(raw_event, raw_queue)
)
stream = Stream(latlon=LATLON, dumpto=DUMPTO)
stream_process = multiprocessing.Process(
target=stream.run, args=(raw_event, ac_event, raw_queue, aircraft_queue)
)
try:
screen = Screen(uncertainty=UNCERTAINTY)
screen.daemon = True
screen.start()
screen = Screen(uncertainty=UNCERTAINTY)
screen_process = multiprocessing.Process(
target=screen.run, args=(ac_event, aircraft_queue)
)
while True:
if len(ADSB_MSG) > 1:
LOCK.acquire()
stream.process_raw(ADSB_TS, ADSB_MSG, COMMB_TS, COMMB_MSG)
ADSB_MSG = []
ADSB_TS = []
COMMB_MSG = []
COMMB_TS = []
LOCK.release()
acs = stream.get_aircraft()
try:
screen.update_data(acs)
screen.update()
time.sleep(1)
except KeyboardInterrupt:
raise
except:
continue
except KeyboardInterrupt:
sys.exit(0)
finally:
def closeall(signal, frame):
print("KeyboardInterrupt (ID: {}). Cleaning up...".format(signal))
curses.endwin()
recv_process.terminate()
stream_process.terminate()
screen_process.terminate()
recv_process.join()
stream_process.join()
screen_process.join()
exit(0)
signal.signal(signal.SIGINT, closeall)
recv_process.start()
stream_process.start()
screen_process.start()

View File

@ -1,9 +1,8 @@
from __future__ import print_function, division
import os
import curses
import numpy as np
import time
from threading import Thread
import threading
COLUMNS = [
("call", 10),
@ -39,9 +38,9 @@ UNCERTAINTY_COLUMNS = [
]
class Screen(Thread):
class Screen(object):
def __init__(self, uncertainty=False):
Thread.__init__(self)
super(Screen, self).__init__()
self.screen = curses.initscr()
curses.noecho()
curses.mousemask(1)
@ -59,7 +58,7 @@ class Screen(Thread):
def reset_cursor_pos(self):
self.screen.move(self.y, self.x)
def update_data(self, acs):
def update_ac(self, acs):
self.acs = acs
def draw_frame(self):
@ -154,7 +153,7 @@ class Screen(Thread):
self.reset_cursor_pos()
def run(self):
def kye_handling(self):
self.draw_frame()
self.scr_h, self.scr_w = self.screen.getmaxyx()
@ -184,6 +183,27 @@ class Screen(Thread):
self.y = y_intent
elif c == curses.KEY_ENTER or c == 10 or c == 13:
self.lock_icao = (self.screen.instr(self.y, 1, 6)).decode()
elif c == 27: # escape key
self.lock_icao = None
elif c == curses.KEY_F5:
self.screen.refresh()
self.draw_frame()
def run(self, ac_event, ac_queue):
key_thread = threading.Thread(target=self.kye_handling)
key_thread.start()
while True:
if ac_event.is_set():
while not ac_queue.empty():
acs = ac_queue.get()
self.update_ac(acs)
ac_event.clear()
try:
self.update()
except:
pass
time.sleep(0.001)

View File

@ -0,0 +1,77 @@
import pyModeS as pms
from pyModeS.extra.tcpclient import TcpClient
from pyModeS.extra.rtlreader import RtlReader
class NetSource(TcpClient):
def __init__(self, host, port, rawtype):
super(NetSource, self).__init__(host, port, rawtype)
self.local_buffer_adsb_msg = []
self.local_buffer_adsb_ts = []
self.local_buffer_commb_msg = []
self.local_buffer_commb_ts = []
def handle_messages(self, messages, raw_event, raw_queue):
for msg, t in messages:
if len(msg) < 28: # only process long messages
continue
df = pms.df(msg)
if df == 17 or df == 18:
self.local_buffer_adsb_msg.append(msg)
self.local_buffer_adsb_ts.append(t)
elif df == 20 or df == 21:
self.local_buffer_commb_msg.append(msg)
self.local_buffer_commb_ts.append(t)
else:
continue
if len(self.local_buffer_adsb_msg) > 1:
raw_queue.put(
{
"adsb_ts": self.local_buffer_adsb_ts,
"adsb_msg": self.local_buffer_adsb_msg,
"commb_ts": self.local_buffer_commb_ts,
"commb_msg": self.local_buffer_commb_msg,
}
)
raw_event.set()
class RtlSdrSource(RtlReader):
def __init__(self):
super(RtlSdrSource, self).__init__()
self.local_buffer_adsb_msg = []
self.local_buffer_adsb_ts = []
self.local_buffer_commb_msg = []
self.local_buffer_commb_ts = []
def handle_messages(self, messages, raw_event, raw_queue):
for msg, t in messages:
if len(msg) < 28: # only process long messages
continue
df = pms.df(msg)
if df == 17 or df == 18:
self.local_buffer_adsb_msg.append(msg)
self.local_buffer_adsb_ts.append(t)
elif df == 20 or df == 21:
self.local_buffer_commb_msg.append(msg)
self.local_buffer_commb_ts.append(t)
else:
continue
if len(self.local_buffer_adsb_msg) > 1:
raw_queue.put(
{
"adsb_ts": self.local_buffer_adsb_ts,
"adsb_msg": self.local_buffer_adsb_msg,
"commb_ts": self.local_buffer_commb_ts,
"commb_msg": self.local_buffer_commb_msg,
}
)
raw_event.set()

View File

@ -26,7 +26,7 @@ class Stream:
else:
self.dumpto = None
def process_raw(self, adsb_ts, adsb_msgs, commb_ts, commb_msgs, tnow=None):
def process_raw(self, adsb_ts, adsb_msg, commb_ts, commb_msg, tnow=None):
"""process a chunk of adsb and commb messages recieved in the same
time period.
"""
@ -39,7 +39,7 @@ class Stream:
output_buffer = []
# process adsb message
for t, msg in zip(adsb_ts, adsb_msgs):
for t, msg in zip(adsb_ts, adsb_msg):
icao = pms.icao(msg)
tc = pms.adsb.typecode(msg)
@ -192,7 +192,7 @@ class Stream:
ac["nic_a"], ac["nic_bc"] = pms.adsb.nic_a_c(msg)
# process commb message
for t, msg in zip(commb_ts, commb_msgs):
for t, msg in zip(commb_ts, commb_msg):
icao = pms.icao(msg)
if icao not in self.acs:
@ -266,3 +266,19 @@ class Stream:
"""all aircraft that are stored in memeory"""
acs = self.acs
return acs
def run(self, raw_event, ac_event, raw_queue, aircraft_queue):
while True:
if raw_event.is_set():
data = raw_queue.get()
self.process_raw(
data["adsb_ts"],
data["adsb_msg"],
data["commb_ts"],
data["commb_msg"],
)
aircraft_queue.put(self.get_aircraft())
ac_event.set()
raw_event.clear()
time.sleep(0.001)