mlat server working in the client-server direction. other way round untested.

This commit is contained in:
Nick Foster 2012-12-18 19:23:52 -08:00
parent 1e2b8a4f46
commit fd12402462
7 changed files with 128 additions and 61 deletions

View File

@ -21,6 +21,7 @@ include(GrPython)
GR_PYTHON_INSTALL(
PROGRAMS
mlat_server
modes_rx
modes_gui
uhd_modes.py

View File

@ -20,6 +20,16 @@
# Boston, MA 02110-1301, USA.
#
#simple multilateration server app.
#looks at received messages and then attempts to multilaterate positions
#will not attempt clock synchronization yet; it's up to clients to present
#accurate timestamps. later on we can do clock sync based on ADS-B packets.
#SECURITY NOTE: THIS SERVER WAS WRITTEN WITH NO REGARD TOWARD SECURITY.
#NO REAL ATTEMPT AT DATA VALIDATION, MUCH LESS SECURITY, HAS BEEN MADE.
#USE THIS PROGRAM AT YOUR OWN RISK AND WITH THE UNDERSTANDING THAT THE
#INTERNET IS A SCARY PLACE FULL OF MEAN PEOPLE.
import time, os, sys, socket, struct
from string import split, join
from datetime import *
@ -28,29 +38,26 @@ import pickle
import time
import bisect
#simple multilateration server app.
#accepts connections from clients (need a raw_client output thing)
#looks at received messages and then attempts to multilaterate positions
#will not attempt clock synchronization yet; it's up to clients to present
#accurate timestamps. later on we can do clock sync based on ADS-B packets.
#how to store records for quick retrieval?
#the data is really a hash; we use it to find correlated records.
#self._records should be a dict of replies
#so: { <adsbdata>: [{ "addr": "192.168.10.1", "secs": 11, "frac_secs": 0.123456 }, {....}...] ... }
#adsbdata should be an int.
#change this to 0 for ASCII format for debugging. use HIGHEST_PROTOCOL
#for actual use to keep the pickle size down.
#pickle_prot = 0
pickle_prot = pickle.HIGHEST_PROTOCOL
pickle_prot = 0
#pickle_prot = pickle.HIGHEST_PROTOCOL
class rx_data:
def __init__(self):
self.secs = 0
self.frac_secs = 0.0
self.data = None
secs = 0
frac_secs = 0.0
data = None
class mlat_data:
data = None
secs = 0
frac_secs = 0.0
numstations = 0
lat = 0
lon = 0
alt = 0
hdop = 0.0
vdop = 0.0
class stamp:
def __init__(self, clientinfo, secs, frac_secs):
@ -84,6 +91,7 @@ class client_info:
self.position = []
self.offset_secs = 0
self.offset_frac_secs = 0.0
self.time_source = None
class connection:
def __init__(self, addr, sock, clientinfo):
@ -101,8 +109,8 @@ class mlat_server:
self._conns = [] #list of active connections
self._reports = {} #hashed by data
self._lastreport = 0 #used for pruning
self._parser = air_modes.parse(None)
self._remnant = None #for piecing together messages across packets
def __del__(self):
self._s.close()
@ -111,24 +119,34 @@ class mlat_server:
for conn in self._conns:
pkt = None
try:
pkt = conn.sock.recv(1024)
pkt = conn.sock.recv(4096)
except socket.error:
self._conns.remove(conn)
if not pkt: break
#print "Received message from %s with length %i" % (conn.clientinfo.name, len(pkt))
try:
msglist = pickle.loads(pkt)
for msg in msglist:
st = stamp(conn.clientinfo, msg.secs, msg.frac_secs)
if msg.data not in self._reports:
self._reports[msg.data] = []
for msg in pkt.splitlines(True):
if msg.endswith("\n"):
if self._remnant:
msg = self._remnant + msg
self._remnant = None
[data, ecc, reference, timestamp_int, timestamp_frac] = msg.split()
st = stamp(conn.clientinfo, int(timestamp_int), float(timestamp_frac))
if data not in self._reports:
self._reports[data] = []
#ordered insert
ordered_insert(self._reports[msg.data], st)
if st.tofloat() > self._lastreport:
self._lastreport = st.tofloat()
#ordered insert
ordered_insert(self._reports[data], st)
if st.tofloat() > self._lastreport:
self._lastreport = st.tofloat()
else:
if self._remnant is not None:
raise Exception("Malformed data: " + msg)
else:
self._remnant = msg
except Exception as e:
print "Invalid message from %s: %s" % (conn.addr, pkt)
print "Invalid message from %s: %s..." % (conn.addr, pkt[:64])
print e
#self.prune()
@ -160,9 +178,9 @@ class mlat_server:
def get_eligible_reports(self):
groups = []
for data,stamps in self._reports.iteritems():
if len(stamps) > 2: #quick check before we do a set()
if len(stamps) >= 4: #quick check before we do a set()
stations = set([st.clientinfo for st in stamps])
if len(stations) > 2:
if len(stations) >= 4:
i=0
#it's O(n) since the list is already sorted
#can probably be cleaner and more concise
@ -178,30 +196,32 @@ class mlat_server:
if st.clientinfo == cinfo:
deduped.append(st)
break
if len(deduped) > 2:
if len(deduped) >= 4:
groups.append({"data": data, "stamps": deduped})
#TODO: pop the entries so you don't issue duplicate reports
if len(groups) > 0:
return groups
return None
#issue multilaterated positions
def output(self, msg):
#do something here to compose a message
if msg is not None:
try:
for conn in self._conns[:]: #iterate over a copy of the list
conn.sock.send(msg)
except socket.error:
print "Client %s disconnected" % conn.clientinfo.name
self._conns.remove(conn)
print "Connections: ", len(self._conns)
def output(self, txlist):
#TODO: buffer this like the client does
msg = pickle.dumps(txlist, pickle_prot)
if msg is not None:
try:
for conn in self._conns[:]: #iterate over a copy of the list
conn.sock.send(msg)
except socket.error:
print "Client %s disconnected" % conn.clientinfo.name
self._conns.remove(conn)
print "Connections: ", len(self._conns)
#add a new connection to the list
def add_pending_conns(self):
try:
conn, addr = self._s.accept()
conn.send("HELO\n") #yeah it's like that
conn.send("HELO") #yeah it's like that
msg = conn.recv(1024)
if not msg:
return
@ -237,9 +257,9 @@ def get_modes_altitude(data):
return None
if __name__=="__main__":
srv = mlat_server("nothin'", 31337)
srv = mlat_server("nothin'", 19005)
while 1:
srv.output("Buttes")
#srv.output("Buttes")
srv.get_messages()
srv.add_pending_conns()
reps = srv.get_eligible_reports()
@ -254,6 +274,7 @@ if __name__=="__main__":
#it's expecting a list of tuples [(station[], timestamp)...]
#also have to parse the data to pull altitude out of the mix
if reps:
txlist = []
for rep in reps:
alt = get_modes_altitude(air_modes.modes_reply(rep["data"]))
if (alt is None and len(rep["stamps"]) > 3) or alt is not None:
@ -261,10 +282,25 @@ if __name__=="__main__":
print mlat_list
#multilaterate!
try:
pos = air_modes.mlat.mlat(mlat_list, alt)
pos, senttime = air_modes.mlat.mlat(mlat_list, alt)
if pos is not None:
print pos
print "Resolved position: ", pos
#add to message stack
msg = mlat_data()
msg.data = rep["data"]
#TODO: fix loss of precision in mlat() timestamp output
msg.secs = int(senttime)
msg.frac_secs = float(senttime) - msg.secs
msg.numstations = len(mlat_list)
msg.lat = pos[0]
msg.lon = pos[1]
msg.alt = pos[2]
msg.hdop = 0.0
msg.vdop = 0.0
txlist.append(msg)
except Exception as e:
print e
srv.output(txlist)
time.sleep(0.3)

View File

@ -36,13 +36,15 @@ def get_pos(time):
ac_starting_pos[2]]
def get_simulated_timestamp(time, position):
return time + numpy.linalg.norm(numpy.array(air_modes.mlat.llh2ecef(position))-numpy.array(air_modes.mlat.llh2geoid(info.position))) / air_modes.mlat.c
prange = numpy.linalg.norm(numpy.array(air_modes.mlat.llh2ecef(position))-numpy.array(air_modes.mlat.llh2geoid(info.position))) / air_modes.mlat.c
return time + prange + random.random()*60e-9 - 30e-9
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(1)
sock.connect(("localhost", 31337))
sock.connect(("localhost", 19005))
sock.send(pickle.dumps(info))
print sock.recv(1024)
sock.setblocking(0)
ts = 0.0
while 1:
pos = get_pos(ts)
@ -53,6 +55,11 @@ while 1:
data1.frac_secs = float(stamp)
data1.frac_secs -= int(data1.frac_secs)
sock.send(pickle.dumps([data1]))
try:
positions = sock.recv(1024)
except:
pass
if positions: print positions
ts+=1
time.sleep(1)

View File

@ -51,17 +51,22 @@ class adsb_rx_block (gr.top_block):
self.args = args
rate = int(options.rate)
use_resampler = False
self.time_source = None
if options.filename is None and options.udp is None and not options.rtlsdr:
#UHD source by default
from gnuradio import uhd
self.u = uhd.single_usrp_source(options.args, uhd.io_type_t.COMPLEX_FLOAT32, 1)
time_spec = uhd.time_spec(0.0)
self.u.set_time_now(time_spec)
#if(options.rx_subdev_spec is None):
# options.rx_subdev_spec = ""
#self.u.set_subdev_spec(options.rx_subdev_spec)
#check for GPSDO
#if you have a GPSDO, UHD will automatically set the timestamp to UTC time
#as well as automatically set the clock to lock to GPSDO.
if self.u.get_time_source(0) == 'gpsdo':
self.time_source = 'gpsdo'
else:
self.time_source = None
self.u.set_time_now(uhd.time_spec(0.0))
if not options.antenna is None:
self.u.set_antenna(options.antenna)
@ -126,6 +131,11 @@ class adsb_rx_block (gr.top_block):
def printraw(msg):
print msg
def printmlat(msg):
msglist = pickle.loads(msg)
for msg in msglist:
print "mlat message: %x at [%f, %f]" % (msg.data, msg.lat, msg.lon)
if __name__ == '__main__':
usage = "%prog: [options] output filename"
parser = OptionParser(option_class=eng_option, usage=usage)
@ -165,6 +175,8 @@ if __name__ == '__main__':
help="Use RTLSDR dongle instead of UHD source")
parser.add_option("-p","--pmf", action="store_true", default=False,
help="Use pulse matched filtering")
parser.add_option("-s","--mlat-server", type="string", default=None,
help="Use a multilateration server to track non-ADS-B aircraft")
(options, args) = parser.parse_args()
@ -173,8 +185,10 @@ if __name__ == '__main__':
my_position = reader.next()
queue = gr.msg_queue()
mlat_queue = None
outputs = [] #registry of plugin output functions
mlat_outputs = [] #registry of plugin mlat handling functions
updates = [] #registry of plugin update functions
if options.raw is True:
@ -205,13 +219,21 @@ if __name__ == '__main__':
outputs.append(fgout.output)
fg = adsb_rx_block(options, args, queue)
if options.mlat_server is not None:
mlat_queue = gr.msg_queue()
mlat_client = air_modes.mlat_client(mlat_queue, my_position, options.mlat_server, fg.time_source)
outputs.append(mlat_client.output) #output to server when we get a report
updates.append(mlat_client.get_mlat_positions) #check for replies from the server
#note: this means that get_mlat_positions and printmlat execute in the same thread.
#you could just have mlat_client spawn a thread to check its socket. might make more sense to do this.
mlat_outputs.append(printmlat)
runner = top_block_runner(fg)
while 1:
try:
#the update registry is really for the SBS1 and raw server plugins -- we're looking for new TCP connections.
#i think we have to do this here rather than in the output handler because otherwise connections will stack up
#until the next output arrives
#handle the once-per-loop updates (check for mlat responses, add TCP conns to SBS-1 plugin, etc.)
for update in updates:
update()

View File

@ -174,8 +174,7 @@ int air_modes_slicer::work(int noutput_items,
unsigned long crc = modes_check_crc(d_data, packet_length);
//crc for packets that aren't type 11 or type 17 is encoded with the transponder ID, which we don't know
//therefore we toss 'em if there's syndrome
//crc for the other short packets is usually nonzero, so they can't really be trusted that far
//we forward them if they have no low-confidence bits (see above), but this still lets some crap through.
if(crc && (message_type == 11 || message_type == 17)) {continue;}
std::ostringstream payload;
for(int m = 0; m < packet_length/8; m++) {

View File

@ -35,6 +35,7 @@ GR_PYTHON_INSTALL(
az_map.py
cpr.py
mlat.py
mlat_client.py
exceptions.py
flightgear.py
gui_model.py

View File

@ -58,6 +58,7 @@ from sql import output_sql
from sbs1 import output_sbs1
from kml import output_kml
from raw_server import raw_server
from mlat_client import mlat_client
from exceptions import *
from az_map import *
#this is try/excepted in case the user doesn't have numpy installed