improve modeslive

This commit is contained in:
Junzi Sun 2020-02-25 21:30:42 +01:00
parent e52d43f963
commit 768b80df8e
5 changed files with 102 additions and 71 deletions

View File

@ -1,3 +1,4 @@
import traceback
import numpy as np
import pyModeS as pms
from rtlsdr import RtlSdr
@ -7,13 +8,12 @@ sampling_rate = 2e6
smaples_per_microsec = 2
modes_frequency = 1090e6
buffer_size = 1024 * 100
read_size = 1024 * 10
buffer_size = 1024 * 200
read_size = 1024 * 100
pbits = 8
fbits = 112
preamble = [1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0]
# th_amp = 0.2 # signal amplitude threshold for 0 and 1 bit
th_amp_diff = 0.8 # signal amplitude threshold difference between 0 and 1 bit
@ -34,7 +34,12 @@ class RtlReader(object):
def _calc_noise(self):
"""Calculate noise floor"""
window = smaples_per_microsec * 100
means = np.array(self.signal_buffer).reshape(-1, window).mean(axis=1)
total_len = len(self.signal_buffer)
means = (
np.array(self.signal_buffer[: total_len // window * window])
.reshape(-1, window)
.mean(axis=1)
)
return min(means)
def _process_buffer(self):
@ -153,18 +158,24 @@ class RtlReader(object):
pass
def stop(self, *args, **kwargs):
self.sdr.cancel_read_async()
self.sdr.close()
def run(self, raw_pipe_in=None, stop_flag=None):
def run(self, raw_pipe_in=None, stop_flag=None, exception_queue=None):
self.raw_pipe_in = raw_pipe_in
self.stop_flag = stop_flag
self.sdr.read_samples_async(self._read_callback, read_size)
# count = 1
# while count < 1000:
# count += 1
# data = self.sdr.read_samples(read_size)
# self._read_callback(data, None)
try:
# raise RuntimeError("test exception")
while True:
data = self.sdr.read_samples(read_size)
self._read_callback(data, None)
except Exception as e:
tb = traceback.format_exc()
if exception_queue is not None:
exception_queue.put(tb)
raise e
if __name__ == "__main__":

View File

@ -254,7 +254,7 @@ class TcpClient(object):
for msg, t in messages:
print("%15.9f %s" % (t, msg))
def run(self, raw_pipe_in=None, stop_flag=None):
def run(self, raw_pipe_in=None, stop_flag=None, exception_queue=None):
self.raw_pipe_in = raw_pipe_in
self.stop_flag = stop_flag
self.connect()
@ -269,11 +269,6 @@ class TcpClient(object):
self.buffer.extend(received)
# print(''.join(x.encode('hex') for x in self.buffer))
# process self.buffer when it is longer enough
# if len(self.buffer) < 2048:
# continue
# -- Removed!! Cause delay in low data rate scenario --
if self.datatype == "beast":
messages = self.read_beast_buffer()
elif self.datatype == "raw":
@ -286,22 +281,12 @@ class TcpClient(object):
else:
self.handle_messages(messages)
except Exception as e:
# Provides the user an option to supply the environment
# variable PYMODES_DEBUG to halt the execution
# for debugging purposes
debug_intent = os.environ.get("PYMODES_DEBUG", "false")
if debug_intent.lower() == "true":
traceback.print_exc()
sys.exit()
else:
print("Unexpected Error:", e)
# raise RuntimeError("test exception")
try:
sock = self.connect()
time.sleep(1)
except Exception as e:
print("Unexpected Error:", e)
except Exception as e:
tb = traceback.format_exc()
exception_queue.put(tb)
raise e
if __name__ == "__main__":

View File

@ -263,22 +263,27 @@ class Decode:
acs = self.acs
return acs
def run(self, raw_pipe_out, ac_pipe_in):
def run(self, raw_pipe_out, ac_pipe_in, exception_queue):
local_buffer = []
while True:
while raw_pipe_out.poll():
data = raw_pipe_out.recv()
local_buffer.append(data)
try:
while raw_pipe_out.poll():
data = raw_pipe_out.recv()
local_buffer.append(data)
for data in local_buffer:
self.process_raw(
data["adsb_ts"],
data["adsb_msg"],
data["commb_ts"],
data["commb_msg"],
)
local_buffer = []
for data in local_buffer:
self.process_raw(
data["adsb_ts"],
data["adsb_msg"],
data["commb_ts"],
data["commb_msg"],
)
local_buffer = []
acs = self.get_aircraft()
ac_pipe_in.send(acs)
time.sleep(0.001)
acs = self.get_aircraft()
ac_pipe_in.send(acs)
time.sleep(0.001)
except Exception as e:
tb = traceback.format_exc()
exception_queue.put((e, tb))

View File

@ -3,6 +3,7 @@
from __future__ import print_function, division
import os
import sys
import time
import argparse
import curses
import signal
@ -15,7 +16,6 @@ from pyModeS.streamer.source import NetSource, RtlSdrSource
# redirect all stdout to null, avoiding messing up with the screen
sys.stdout = open(os.devnull, "w")
support_rawtypes = ["raw", "beast", "skysense"]
parser = argparse.ArgumentParser()
@ -87,13 +87,9 @@ if DUMPTO is not None:
sys.exit(1)
# raw_event = multiprocessing.Event()
# ac_event = multiprocessing.Event()
# raw_queue = multiprocessing.Queue()
# ac_queue = multiprocessing.Queue()
raw_pipe_in, raw_pipe_out = multiprocessing.Pipe()
ac_pipe_in, ac_pipe_out = multiprocessing.Pipe()
exception_queue = multiprocessing.Queue()
stop_flag = multiprocessing.Value("b", False)
if SOURCE == "net":
@ -102,29 +98,38 @@ elif SOURCE == "rtlsdr":
source = RtlSdrSource()
recv_process = multiprocessing.Process(target=source.run, args=(raw_pipe_in, stop_flag))
recv_process = multiprocessing.Process(
target=source.run, args=(raw_pipe_in, stop_flag, exception_queue)
)
decode = Decode(latlon=LATLON, dumpto=DUMPTO)
decode_process = multiprocessing.Process(
target=decode.run, args=(raw_pipe_out, ac_pipe_in)
target=decode.run, args=(raw_pipe_out, ac_pipe_in, exception_queue)
)
screen = Screen(uncertainty=UNCERTAINTY)
screen_process = multiprocessing.Process(target=screen.run, args=(ac_pipe_out,))
screen_process = multiprocessing.Process(
target=screen.run, args=(ac_pipe_out, exception_queue)
)
def closeall(signal, frame):
print("KeyboardInterrupt (ID: {}). Cleaning up...".format(signal))
def shutdown():
stop_flag.value = True
curses.endwin()
sys.stdout = sys.__stdout__
recv_process.terminate()
decode_process.terminate()
screen_process.terminate()
recv_process.join()
decode_process.join()
screen_process.join()
exit(0)
def closeall(signal, frame):
print("KeyboardInterrupt (ID: {}). Cleaning up...".format(signal))
shutdown()
sys.exit(0)
signal.signal(signal.SIGINT, closeall)
@ -132,3 +137,19 @@ signal.signal(signal.SIGINT, closeall)
recv_process.start()
decode_process.start()
screen_process.start()
while True:
if (
(not recv_process.is_alive())
or (not decode_process.is_alive())
or (not screen_process.is_alive())
):
shutdown()
while not exception_queue.empty():
trackback = exception_queue.get()
print(trackback)
sys.exit(1)
time.sleep(0.01)

View File

@ -3,6 +3,7 @@ import curses
import numpy as np
import time
import threading
import traceback
COLUMNS = [
("call", 10),
@ -187,24 +188,32 @@ class Screen(object):
self.screen.refresh()
self.draw_frame()
def run(self, ac_pipe_out):
def run(self, ac_pipe_out, exception_queue):
local_buffer = []
key_thread = threading.Thread(target=self.kye_handling)
key_thread.daemon = True
key_thread.start()
while True:
while ac_pipe_out.poll():
acs = ac_pipe_out.recv()
local_buffer.append(acs)
for acs in local_buffer:
self.update_ac(acs)
local_buffer = []
try:
# raise RuntimeError("test exception")
while ac_pipe_out.poll():
acs = ac_pipe_out.recv()
local_buffer.append(acs)
for acs in local_buffer:
self.update_ac(acs)
local_buffer = []
self.update()
except:
except curses.error:
pass
except Exception as e:
tb = traceback.format_exc()
exception_queue.put(tb)
time.sleep(0.1)
raise e
time.sleep(0.001)