From 768b80df8e3278ec4c42e4f2ffc373422aac24f8 Mon Sep 17 00:00:00 2001 From: Junzi Sun Date: Tue, 25 Feb 2020 21:30:42 +0100 Subject: [PATCH] improve modeslive --- pyModeS/extra/rtlreader.py | 35 +++++++++++++++++++---------- pyModeS/extra/tcpclient.py | 27 +++++------------------ pyModeS/streamer/decode.py | 35 ++++++++++++++++------------- pyModeS/streamer/modeslive | 45 ++++++++++++++++++++++++++++---------- pyModeS/streamer/screen.py | 31 ++++++++++++++++---------- 5 files changed, 102 insertions(+), 71 deletions(-) diff --git a/pyModeS/extra/rtlreader.py b/pyModeS/extra/rtlreader.py index fc35270..3c40ebc 100644 --- a/pyModeS/extra/rtlreader.py +++ b/pyModeS/extra/rtlreader.py @@ -1,3 +1,4 @@ +import traceback import numpy as np import pyModeS as pms from rtlsdr import RtlSdr @@ -7,13 +8,12 @@ sampling_rate = 2e6 smaples_per_microsec = 2 modes_frequency = 1090e6 -buffer_size = 1024 * 100 -read_size = 1024 * 10 +buffer_size = 1024 * 200 +read_size = 1024 * 100 pbits = 8 fbits = 112 preamble = [1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0] -# th_amp = 0.2 # signal amplitude threshold for 0 and 1 bit th_amp_diff = 0.8 # signal amplitude threshold difference between 0 and 1 bit @@ -34,7 +34,12 @@ class RtlReader(object): def _calc_noise(self): """Calculate noise floor""" window = smaples_per_microsec * 100 - means = np.array(self.signal_buffer).reshape(-1, window).mean(axis=1) + total_len = len(self.signal_buffer) + means = ( + np.array(self.signal_buffer[: total_len // window * window]) + .reshape(-1, window) + .mean(axis=1) + ) return min(means) def _process_buffer(self): @@ -153,18 +158,24 @@ class RtlReader(object): pass def stop(self, *args, **kwargs): - self.sdr.cancel_read_async() + self.sdr.close() - def run(self, raw_pipe_in=None, stop_flag=None): + def run(self, raw_pipe_in=None, stop_flag=None, exception_queue=None): self.raw_pipe_in = raw_pipe_in self.stop_flag = stop_flag - self.sdr.read_samples_async(self._read_callback, read_size) - # count = 1 - # while count < 1000: - # count += 1 - # data = self.sdr.read_samples(read_size) - # self._read_callback(data, None) + try: + # raise RuntimeError("test exception") + + while True: + data = self.sdr.read_samples(read_size) + self._read_callback(data, None) + + except Exception as e: + tb = traceback.format_exc() + if exception_queue is not None: + exception_queue.put(tb) + raise e if __name__ == "__main__": diff --git a/pyModeS/extra/tcpclient.py b/pyModeS/extra/tcpclient.py index ed54363..4216e3e 100644 --- a/pyModeS/extra/tcpclient.py +++ b/pyModeS/extra/tcpclient.py @@ -254,7 +254,7 @@ class TcpClient(object): for msg, t in messages: print("%15.9f %s" % (t, msg)) - def run(self, raw_pipe_in=None, stop_flag=None): + def run(self, raw_pipe_in=None, stop_flag=None, exception_queue=None): self.raw_pipe_in = raw_pipe_in self.stop_flag = stop_flag self.connect() @@ -269,11 +269,6 @@ class TcpClient(object): self.buffer.extend(received) # print(''.join(x.encode('hex') for x in self.buffer)) - # process self.buffer when it is longer enough - # if len(self.buffer) < 2048: - # continue - # -- Removed!! Cause delay in low data rate scenario -- - if self.datatype == "beast": messages = self.read_beast_buffer() elif self.datatype == "raw": @@ -286,22 +281,12 @@ class TcpClient(object): else: self.handle_messages(messages) - except Exception as e: - # Provides the user an option to supply the environment - # variable PYMODES_DEBUG to halt the execution - # for debugging purposes - debug_intent = os.environ.get("PYMODES_DEBUG", "false") - if debug_intent.lower() == "true": - traceback.print_exc() - sys.exit() - else: - print("Unexpected Error:", e) + # raise RuntimeError("test exception") - try: - sock = self.connect() - time.sleep(1) - except Exception as e: - print("Unexpected Error:", e) + except Exception as e: + tb = traceback.format_exc() + exception_queue.put(tb) + raise e if __name__ == "__main__": diff --git a/pyModeS/streamer/decode.py b/pyModeS/streamer/decode.py index bdfff22..cee02e0 100644 --- a/pyModeS/streamer/decode.py +++ b/pyModeS/streamer/decode.py @@ -263,22 +263,27 @@ class Decode: acs = self.acs return acs - def run(self, raw_pipe_out, ac_pipe_in): + def run(self, raw_pipe_out, ac_pipe_in, exception_queue): local_buffer = [] while True: - while raw_pipe_out.poll(): - data = raw_pipe_out.recv() - local_buffer.append(data) + try: + while raw_pipe_out.poll(): + data = raw_pipe_out.recv() + local_buffer.append(data) - for data in local_buffer: - self.process_raw( - data["adsb_ts"], - data["adsb_msg"], - data["commb_ts"], - data["commb_msg"], - ) - local_buffer = [] + for data in local_buffer: + self.process_raw( + data["adsb_ts"], + data["adsb_msg"], + data["commb_ts"], + data["commb_msg"], + ) + local_buffer = [] - acs = self.get_aircraft() - ac_pipe_in.send(acs) - time.sleep(0.001) + acs = self.get_aircraft() + ac_pipe_in.send(acs) + time.sleep(0.001) + + except Exception as e: + tb = traceback.format_exc() + exception_queue.put((e, tb)) diff --git a/pyModeS/streamer/modeslive b/pyModeS/streamer/modeslive index fba4c57..6e803ee 100755 --- a/pyModeS/streamer/modeslive +++ b/pyModeS/streamer/modeslive @@ -3,6 +3,7 @@ from __future__ import print_function, division import os import sys +import time import argparse import curses import signal @@ -15,7 +16,6 @@ from pyModeS.streamer.source import NetSource, RtlSdrSource # redirect all stdout to null, avoiding messing up with the screen sys.stdout = open(os.devnull, "w") - support_rawtypes = ["raw", "beast", "skysense"] parser = argparse.ArgumentParser() @@ -87,13 +87,9 @@ if DUMPTO is not None: sys.exit(1) -# raw_event = multiprocessing.Event() -# ac_event = multiprocessing.Event() -# raw_queue = multiprocessing.Queue() -# ac_queue = multiprocessing.Queue() - raw_pipe_in, raw_pipe_out = multiprocessing.Pipe() ac_pipe_in, ac_pipe_out = multiprocessing.Pipe() +exception_queue = multiprocessing.Queue() stop_flag = multiprocessing.Value("b", False) if SOURCE == "net": @@ -102,29 +98,38 @@ elif SOURCE == "rtlsdr": source = RtlSdrSource() -recv_process = multiprocessing.Process(target=source.run, args=(raw_pipe_in, stop_flag)) +recv_process = multiprocessing.Process( + target=source.run, args=(raw_pipe_in, stop_flag, exception_queue) +) decode = Decode(latlon=LATLON, dumpto=DUMPTO) decode_process = multiprocessing.Process( - target=decode.run, args=(raw_pipe_out, ac_pipe_in) + target=decode.run, args=(raw_pipe_out, ac_pipe_in, exception_queue) ) screen = Screen(uncertainty=UNCERTAINTY) -screen_process = multiprocessing.Process(target=screen.run, args=(ac_pipe_out,)) +screen_process = multiprocessing.Process( + target=screen.run, args=(ac_pipe_out, exception_queue) +) -def closeall(signal, frame): - print("KeyboardInterrupt (ID: {}). Cleaning up...".format(signal)) +def shutdown(): stop_flag.value = True curses.endwin() + sys.stdout = sys.__stdout__ recv_process.terminate() decode_process.terminate() screen_process.terminate() recv_process.join() decode_process.join() screen_process.join() - exit(0) + + +def closeall(signal, frame): + print("KeyboardInterrupt (ID: {}). Cleaning up...".format(signal)) + shutdown() + sys.exit(0) signal.signal(signal.SIGINT, closeall) @@ -132,3 +137,19 @@ signal.signal(signal.SIGINT, closeall) recv_process.start() decode_process.start() screen_process.start() + + +while True: + if ( + (not recv_process.is_alive()) + or (not decode_process.is_alive()) + or (not screen_process.is_alive()) + ): + shutdown() + while not exception_queue.empty(): + trackback = exception_queue.get() + print(trackback) + + sys.exit(1) + + time.sleep(0.01) diff --git a/pyModeS/streamer/screen.py b/pyModeS/streamer/screen.py index 5de1825..5d83d41 100644 --- a/pyModeS/streamer/screen.py +++ b/pyModeS/streamer/screen.py @@ -3,6 +3,7 @@ import curses import numpy as np import time import threading +import traceback COLUMNS = [ ("call", 10), @@ -187,24 +188,32 @@ class Screen(object): self.screen.refresh() self.draw_frame() - def run(self, ac_pipe_out): + def run(self, ac_pipe_out, exception_queue): local_buffer = [] key_thread = threading.Thread(target=self.kye_handling) + key_thread.daemon = True key_thread.start() while True: - while ac_pipe_out.poll(): - acs = ac_pipe_out.recv() - local_buffer.append(acs) - - for acs in local_buffer: - self.update_ac(acs) - - local_buffer = [] - try: + # raise RuntimeError("test exception") + + while ac_pipe_out.poll(): + acs = ac_pipe_out.recv() + local_buffer.append(acs) + + for acs in local_buffer: + self.update_ac(acs) + + local_buffer = [] + self.update() - except: + except curses.error: pass + except Exception as e: + tb = traceback.format_exc() + exception_queue.put(tb) + time.sleep(0.1) + raise e time.sleep(0.001)