gr-air-modes/python/parse.py
Nick Foster 9bdac2a499 Use whole/fractional timestamps in the whole chain. This prevents loss
of precision when setting time to UTC.
2015-04-17 14:07:28 -07:00

438 lines
15 KiB
Python

#
# Copyright 2010, 2012 Nick Foster
#
# This file is part of gr-air-modes
#
# gr-air-modes is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# gr-air-modes is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gr-air-modes; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
import time, os, sys
from string import split, join
from altitude import decode_alt
import math
import air_modes
from air_modes.exceptions import *
#this implements a packet class which can retrieve its own fields.
class data_field:
def __init__(self, data):
self.data = data
self.fields = self.parse()
types = { }
offset = 1 #field offset applied to all fields. used for offsetting
#subtypes to reconcile with the spec. Really just for readability.
#get a particular field from the data
def __getitem__(self, fieldname):
mytype = self.get_type()
if mytype in self.types:
if fieldname in self.fields: #verify it exists in this packet type
return self.fields[fieldname]
else:
raise FieldNotInPacket(fieldname)
else:
raise NoHandlerError(mytype)
#grab all the fields in the packet as a dict
#done once on init so you don't have to iterate down every time you grab a field
def parse(self):
fields = {}
mytype = self.get_type()
if mytype in self.types:
for field in self.types[mytype]:
bits = self.types[self.get_type()][field]
if len(bits) == 3:
obj = bits[2](self.get_bits(bits[0], bits[1]))
fields.update(obj.parse())
fields.update({field: obj})
else:
fields.update({field: self.get_bits(bits[0], bits[1])})
else:
raise NoHandlerError(mytype)
return fields
def get_type(self):
raise NotImplementedError
def get_numbits(self):
raise NotImplementedError
#retrieve bits from data given the offset and number of bits.
#the offset is both left-justified (LSB) and starts at 1, to
#correspond to the Mode S spec. Blame them.
def get_bits(self, *args):
startbit = args[0]
num = args[1]
bits = 0
try:
bits = (self.data \
>> (self.get_numbits() - startbit - num + self.offset)) \
& ((1 << num) - 1)
#the exception handler catches instances when you try to shift more than
#the number of bits. this can happen when a garbage packet gets through
#which reports itself as a short packet but of type long.
#TODO: should find more productive way to throw this out
except ValueError:
pass
#print "Short packet received for long packet type: %x" % self.data
return bits
class bds09_reply(data_field):
offset = 6
types = { #BDS0,9 subtype 0
0: {"sub": (6,3), "dew": (10,1), "vew": (11,11), "dns": (22,1),
"vns": (23,11), "str": (34,1), "tr": (35,6), "dvr": (41,1),
"vr": (42,9)},
#BDS0,9 subtypes 1-2 (differ only in velocity encoding)
1: {"sub": (6,3), "icf": (9,1), "ifr": (10,1), "nuc": (11,3),
"dew": (14,1), "vew": (15,10), "dns": (25,1), "vns": (26,10),
"vrsrc": (36,1), "dvr": (37,1), "vr": (38,9), "dhd": (49,1), "hd": (50,6)},
#BDS0,9 subtype 3-4 (airspeed and heading)
3: {"sub": (6,3), "icf": (9,1), "ifr": (10,1), "nuc": (11,3), "mhs": (14,1),
"hdg": (15,10), "ast": (25,1), "spd": (26,10), "vrsrc": (36,1),
"dvr": (37,1), "vr": (38,9), "dhd": (49,1), "hd": (50,6)}
}
def get_type(self):
sub = self.get_bits(6,3)
if sub == 0:
return 0
if 1 <= sub <= 2:
return 1
if 3 <= sub <= 4:
return 3
def get_numbits(self):
return 51
#type 17 extended squitter data
class me_reply(data_field):
#types in this format are listed by BDS register
#TODO: add comments explaining these fields
types = { 0x05: {"ftc": (1,5), "ss": (6,2), "saf": (8,1), "alt": (9,12), "time": (21,1), "cpr": (22,1), "lat": (23,17), "lon": (40,17)}, #airborne position
0x06: {"ftc": (1,5), "mvt": (6,7), "gts": (13,1), "gtk": (14,7), "time": (21,1), "cpr": (22,1), "lat": (23,17), "lon": (40,17)}, #surface position
0x07: {"ftc": (1,5),}, #TODO extended squitter status
0x08: {"ftc": (1,5), "cat": (6,3), "ident": (9,48)}, #extended squitter identification and type
0x09: {"ftc": (1,5), "bds09": (6,51, bds09_reply)},
#0x0A: data link capability report
#0x17: common usage capability report
#0x18-0x1F: Mode S specific services capability report
#0x20: aircraft identification
0x61: {"ftc": (1,5), "eps": (9,3)}
}
#maps ftc to BDS register
def get_type(self):
ftc = self.get_bits(1,5)
if 1 <= ftc <= 4:
return 0x08
elif 5 <= ftc <= 8:
return 0x06
elif 9 <= ftc <= 18 and ftc != 15: #FTC 15 does not appear to be valid
return 0x05
elif ftc == 19:
return 0x09
elif ftc == 28:
return 0x61
else:
return NoHandlerError(ftc)
def get_numbits(self):
return 56
#resolves the TCAS reply types from TTI info
class tcas_reply(data_field):
offset = 61
types = { 0: {"tti": (61,2)}, #UNKNOWN
1: {"tti": (61,2), "tid": (63,26)},
2: {"tti": (61,2), "tida": (63,13), "tidr": (76,7), "tidb": (83,6)}
}
def get_type(self):
return self.get_bits(61,2)
def get_numbits(self):
return 28
#extended squitter types 20,21 MB subfield
class mb_reply(data_field):
offset = 33 #fields offset by 33 to match documentation
#types are based on bds1 subfield
types = { 0: {"bds1": (33,4), "bds2": (37,4)}, #TODO
1: {"bds1": (33,4), "bds2": (37,4), "cfs": (41,4), "acs": (45,20), "bcs": (65,16), "ecs": (81,8)},
2: {"bds1": (33,4), "bds2": (37,4), "ais": (41,48)},
3: {"bds1": (33,4), "bds2": (37,4), "ara": (41,14), "rac": (55,4), "rat": (59,1),
"mte": (60,1), "tcas": (61, 28, tcas_reply)}
}
def get_type(self):
bds1 = self.get_bits(33,4)
bds2 = self.get_bits(37,4)
if bds1 not in (0,1,2,3) or bds2 not in (0,):
raise NoHandlerError(bds1)
return int(bds1)
def get_numbits(self):
return 56
# #type MV (extended squitter type 16) subfields
# mv_fields = { "ara": (41,14), "mte": (60,1), "rac": (55,4), "rat": (59,1),
# "vds": (33,8), "vds1": (33,4), "vds2": (37,4)
# }
class mv_reply(data_field):
offset = 33
types = { "ara": (41,14), "mte": (60,1), "rac": (55,4), "rat": (59,1),
"vds": (33,8), "vds1": (33,4), "vds2": (37,4)
}
def get_type(self):
vds1 = self.get_bits(33,4)
vds2 = self.get_bits(37,4)
if vds1 not in (3,) or vds2 not in (0,):
raise NoHandlerError(bds1)
return int(vds1)
def get_numbits(self):
return 56
#the whole Mode S packet type
class modes_reply(data_field):
types = { 0: {"df": (1,5), "vs": (6,1), "cc": (7,1), "sl": (9,3), "ri": (14,4), "ac": (20,13), "ap": (33,24)},
4: {"df": (1,5), "fs": (6,3), "dr": (9,5), "um": (14,6), "ac": (20,13), "ap": (33,24)},
5: {"df": (1,5), "fs": (6,3), "dr": (9,5), "um": (14,6), "id": (20,13), "ap": (33,24)},
11: {"df": (1,5), "ca": (6,3), "aa": (9,24), "pi": (33,24)},
16: {"df": (1,5), "vs": (6,1), "sl": (9,3), "ri": (14,4), "ac": (20,13), "mv": (33,56), "ap": (88,24)},
17: {"df": (1,5), "ca": (6,3), "aa": (9,24), "me": (33,56, me_reply), "pi": (88,24)},
20: {"df": (1,5), "fs": (6,3), "dr": (9,5), "um": (14,6), "ac": (20,13), "mb": (33,56, mb_reply), "ap": (88,24)},
21: {"df": (1,5), "fs": (6,3), "dr": (9,5), "um": (14,6), "id": (20,13), "mb": (33,56, mb_reply), "ap": (88,24)},
24: {"df": (1,5), "ke": (6,1), "nd": (7,4), "md": (11,80), "ap": (88,24)}
}
def is_long(self):
return self.data > (1 << 56)
def get_numbits(self):
return 112 if self.is_long() else 56
def get_type(self):
return self.get_bits(1,5)
#unscramble mode A/C-style squawk codes for type 5 replies below
def decode_id(id):
C1 = 0x1000
A1 = 0x0800
C2 = 0x0400
A2 = 0x0200 #this represents the order in which the bits come
C4 = 0x0100
A4 = 0x0080
B1 = 0x0020
D1 = 0x0010
B2 = 0x0008
D2 = 0x0004
B4 = 0x0002
D4 = 0x0001
a = ((id & A1) >> 11) + ((id & A2) >> 8) + ((id & A4) >> 5)
b = ((id & B1) >> 5) + ((id & B2) >> 2) + ((id & B4) << 1)
c = ((id & C1) >> 12) + ((id & C2) >> 9) + ((id & C4) >> 6)
d = ((id & D1) >> 2) + ((id & D2) >> 1) + ((id & D4) << 2)
return (a * 1000) + (b * 100) + (c * 10) + d
#decode ident squawks
def charmap(d):
if d > 0 and d < 27:
retval = chr(ord("A")+d-1)
elif d == 32:
retval = " "
elif d > 47 and d < 58:
retval = chr(ord("0")+d-48)
else:
retval = " "
return retval
def parseBDS08(data):
categories = [["NO INFO", "RESERVED", "RESERVED", "RESERVED", "RESERVED", "RESERVED", "RESERVED", "RESERVED"],\
["NO INFO", "SURFACE EMERGENCY VEHICLE", "SURFACE SERVICE VEHICLE", "FIXED OBSTRUCTION", "CLUSTER OBSTRUCTION", "LINE OBSTRUCTION", "RESERVED"],\
["NO INFO", "GLIDER", "BALLOON/BLIMP", "PARACHUTE", "ULTRALIGHT", "RESERVED", "UAV", "SPACECRAFT"],\
["NO INFO", "LIGHT", "SMALL", "LARGE", "LARGE HIGH VORTEX", "HEAVY", "HIGH PERFORMANCE", "ROTORCRAFT"]]
catstring = categories[data["ftc"]-1][data["cat"]]
msg = ""
for i in range(0, 8):
msg += charmap(data["ident"] >> (42-6*i) & 0x3F)
return (msg, catstring)
#NOTE: this is stateful -- requires CPR decoder
def parseBDS05(data, cprdec):
altitude = decode_alt(data["alt"], False)
[decoded_lat, decoded_lon, rnge, bearing] = cprdec.decode(data["aa"], data["lat"], data["lon"], data["cpr"], 0)
return [altitude, decoded_lat, decoded_lon, rnge, bearing]
#NOTE: this is stateful -- requires CPR decoder
def parseBDS06(data, cprdec):
ground_track = data["gtk"] * 360. / 128
[decoded_lat, decoded_lon, rnge, bearing] = cprdec.decode(data["aa"], data["lat"], data["lon"], data["cpr"], 1)
return [ground_track, decoded_lat, decoded_lon, rnge, bearing]
def parseBDS09_0(data):
#0: ["sub", "dew", "vew", "dns", "vns", "str", "tr", "svr", "vr"],
vert_spd = data["vr"] * 32
ud = bool(data["dvr"])
if ud:
vert_spd = 0 - vert_spd
turn_rate = data["tr"] * 15/62
rl = data["str"]
if rl:
turn_rate = 0 - turn_rate
ns_vel = data["vns"] - 1
ns = bool(data["dns"])
ew_vel = data["vew"] - 1
ew = bool(data["dew"])
velocity = math.hypot(ns_vel, ew_vel)
if ew:
ew_vel = 0 - ew_vel
if ns:
ns_vel = 0 - ns_vel
heading = math.atan2(ew_vel, ns_vel) * (180.0 / math.pi)
if heading < 0:
heading += 360
return [velocity, heading, vert_spd, turn_rate]
def parseBDS09_1(data):
#1: ["sub", "icf", "ifr", "nuc", "dew", "vew", "dns", "vns", "vrsrc", "dvr", "vr", "dhd", "hd"],
alt_geo_diff = data["hd"] * 25
above_below = bool(data["dhd"])
if above_below:
alt_geo_diff = 0 - alt_geo_diff;
vert_spd = float(data["vr"] - 1) * 64
ud = bool(data["dvr"])
if ud:
vert_spd = 0 - vert_spd
vert_src = bool(data["vrsrc"])
ns_vel = float(data["vns"])
ns = bool(data["dns"])
ew_vel = float(data["vew"])
ew = bool(data["dew"])
subtype = data["sub"]
if subtype == 0x02:
ns_vel <<= 2
ew_vel <<= 2
velocity = math.hypot(ns_vel, ew_vel)
if ew:
ew_vel = 0 - ew_vel
if ns_vel == 0:
heading = 0
else:
heading = math.atan(float(ew_vel) / float(ns_vel)) * (180.0 / math.pi)
if ns:
heading = 180 - heading
if heading < 0:
heading += 360
return [velocity, heading, vert_spd]
def parseBDS09_3(data):
#3: {"sub", "icf", "ifr", "nuc", "mhs", "hdg", "ast", "spd", "vrsrc",
# "dvr", "vr", "dhd", "hd"}
mag_hdg = data["mhs"] * 360. / 1024
vel_src = "TAS" if data["ast"] == 1 else "IAS"
vel = data["spd"]
if data["sub"] == 4:
vel *= 4
vert_spd = float(data["vr"] - 1) * 64
if data["dvr"] == 1:
vert_spd = 0 - vert_spd
geo_diff = float(data["hd"] - 1) * 25
return [mag_hdg, vel_src, vel, vert_spd, geo_diff]
def parseBDS62(data):
eps_strings = ["NO EMERGENCY", "GENERAL EMERGENCY", "LIFEGUARD/MEDICAL", "FUEL EMERGENCY",
"NO COMMUNICATIONS", "UNLAWFUL INTERFERENCE", "RESERVED", "RESERVED"]
return eps_strings[data["eps"]]
def parseMB_id(data): #bds1 == 2, bds2 == 0
msg = ""
for i in range(0, 8):
msg += charmap( data["ais"] >> (42-6*i) & 0x3F)
return (msg)
def parseMB_TCAS_resolutions(data):
#these are LSB because the ICAO are asshats
ara_bits = {41: "CLIMB", 42: "DON'T DESCEND", 43: "DON'T DESCEND >500FPM", 44: "DON'T DESCEND >1000FPM",
45: "DON'T DESCEND >2000FPM", 46: "DESCEND", 47: "DON'T CLIMB", 48: "DON'T CLIMB >500FPM",
49: "DON'T CLIMB >1000FPM", 50: "DON'T CLIMB >2000FPM", 51: "TURN LEFT", 52: "TURN RIGHT",
53: "DON'T TURN LEFT", 54: "DON'T TURN RIGHT"}
rac_bits = {55: "DON'T DESCEND", 56: "DON'T CLIMB", 57: "DON'T TURN LEFT", 58: "DON'T TURN RIGHT"}
ara = data["ara"]
rac = data["rac"]
#check to see which bits are set
resolutions = ""
for bit in ara_bits:
if ara & (1 << (54-bit)):
resolutions += " " + ara_bits[bit]
complements = ""
for bit in rac_bits:
if rac & (1 << (58-bit)):
complements += " " + rac_bits[bit]
return (resolutions, complements)
#rat is 1 if resolution advisory terminated <18s ago
#mte is 1 if multiple threats indicated
#tti is threat type: 1 if ID, 2 if range/brg/alt
#tida is threat altitude in Mode C format
def parseMB_TCAS_threatid(data): #bds1==3, bds2==0, TTI==1
#3: {"bds1": (33,4), "bds2": (37,4), "ara": (41,14), "rac": (55,4), "rat": (59,1),
# "mte": (60,1), "tti": (61,2), "tida": (63,13), "tidr": (76,7), "tidb": (83,6)}
(resolutions, complements) = parseMB_TCAS_resolutions(data)
return (resolutions, complements, data["rat"], data["mte"], data["tid"])
def parseMB_TCAS_threatloc(data): #bds1==3, bds2==0, TTI==2
(resolutions, complements) = parseMB_TCAS_resolutions(data)
threat_alt = decode_alt(data["tida"], True)
return (resolutions, complements, data["rat"], data["mte"], threat_alt, data["tidr"], data["tidb"])
#type 16 Coordination Reply Message
def parse_TCAS_CRM(data):
(resolutions, complements) = parseMB_TCAS_resolutions(data)
return (resolutions, complements, data["rat"], data["mte"])
#this decorator takes a pubsub and returns a function which parses and publishes messages
def make_parser(pub):
publisher = pub
def publish(message):
[data, ecc, reference, int_timestamp, frac_timestamp] = message.split()
try:
ret = air_modes.modes_report(modes_reply(int(data, 16)),
int(ecc, 16),
10.0*math.log10(max(1e-8,float(reference))),
air_modes.stamp(int(int_timestamp), float(frac_timestamp)))
pub["modes_dl"] = ret
pub["type%i_dl" % ret.data.get_type()] = ret
except ADSBError:
pass
return publish