Progress toward rewriting the parser to be less insane.

This commit is contained in:
Nick Foster 2013-06-18 17:34:11 -07:00
parent 230356bcaa
commit a1e2297134
5 changed files with 260 additions and 288 deletions

View File

@ -20,10 +20,12 @@
# #
from gnuradio.eng_option import eng_option from gnuradio.eng_option import eng_option
from gnuradio.gr.pubsub import pubsub
from optparse import OptionParser from optparse import OptionParser
import time, os, sys, threading import time, os, sys, threading, math
from string import split, join from string import split, join
import air_modes import air_modes
from air_modes.types import *
from air_modes.exceptions import * from air_modes.exceptions import *
import zmq import zmq
@ -31,26 +33,25 @@ import zmq
def main(): def main():
my_position = None my_position = None
usage = "%prog: [options]" usage = "%prog: [options]"
parser = OptionParser(option_class=eng_option, usage=usage) optparser = OptionParser(option_class=eng_option, usage=usage)
air_modes.modes_radio.add_radio_options(parser) air_modes.modes_radio.add_radio_options(optparser)
parser.add_option("-l","--location", type="string", default=None, optparser.add_option("-l","--location", type="string", default=None,
help="GPS coordinates of receiving station in format xx.xxxxx,xx.xxxxx") help="GPS coordinates of receiving station in format xx.xxxxx,xx.xxxxx")
#data source options #data source options
parser.add_option("-a","--remote", type="string", default=None, optparser.add_option("-a","--remote", type="string", default=None,
help="specify additional servers from which to take data in format tcp://x.x.x.x:y,tcp://....") help="specify additional servers from which to take data in format tcp://x.x.x.x:y,tcp://....")
parser.add_option("-n","--no-print", action="store_true", default=False, optparser.add_option("-n","--no-print", action="store_true", default=False,
help="disable printing decoded packets to stdout") help="disable printing decoded packets to stdout")
#output plugins #output plugins
parser.add_option("-K","--kml", type="string", default=None, optparser.add_option("-K","--kml", type="string", default=None,
help="filename for Google Earth KML output") help="filename for Google Earth KML output")
parser.add_option("-P","--sbs1", action="store_true", default=False, optparser.add_option("-P","--sbs1", action="store_true", default=False,
help="open an SBS-1-compatible server on port 30003") help="open an SBS-1-compatible server on port 30003")
parser.add_option("-m","--multiplayer", type="string", default=None, optparser.add_option("-m","--multiplayer", type="string", default=None,
help="FlightGear server to send aircraft data, in format host:port") help="FlightGear server to send aircraft data, in format host:port")
(options, args) = optparser.parse_args()
(options, args) = parser.parse_args()
#construct the radio #construct the radio
context = zmq.Context(1) context = zmq.Context(1)
@ -60,6 +61,20 @@ def main():
servers += options.remote.split(",") servers += options.remote.split(",")
relay = air_modes.zmq_pubsub_iface(context, subaddr=servers, pubaddr=None) relay = air_modes.zmq_pubsub_iface(context, subaddr=servers, pubaddr=None)
#ok now relay is gonna get all those tasty strings
#internally we want to distribute parsed data instead, lighten the load
publisher = pubsub()
def send(message):
[data, ecc, reference, timestamp] = message.split()
try:
ret = air_modes.modes_report(air_modes.modes_reply(int(data, 16)), int(ecc, 16), 20.0*math.log10(float(reference)), air_modes.stamp(0, float(timestamp)))
publisher["modes_dl"] = ret
publisher["type%i_dl" % ret.data.get_type()] = ret
except ADSBError:
pass
relay.subscribe("dl_data", send)
if options.location is not None: if options.location is not None:
my_position = [float(n) for n in options.location.split(",")] my_position = [float(n) for n in options.location.split(",")]
@ -71,7 +86,8 @@ def main():
relay.subscribe("dl_data", sqldb.insert) relay.subscribe("dl_data", sqldb.insert)
if options.no_print is not True: if options.no_print is not True:
relay.subscribe("dl_data", air_modes.output_print(my_position).output) #relay.subscribe("dl_data", air_modes.output_print(my_position).output)
printer = air_modes.output_print(my_position, publisher)
if options.multiplayer is not None: if options.multiplayer is not None:
[fghost, fgport] = options.multiplayer.split(':') [fghost, fgport] = options.multiplayer.split(':')

View File

@ -46,6 +46,7 @@ GR_PYTHON_INSTALL(
rx_path.py rx_path.py
sbs1.py sbs1.py
sql.py sql.py
types.py
zmq_socket.py zmq_socket.py
Quaternion.py Quaternion.py
DESTINATION ${GR_PYTHON_DIR}/air_modes DESTINATION ${GR_PYTHON_DIR}/air_modes

View File

@ -53,7 +53,7 @@ from air_modes_swig import *
# #
from rx_path import rx_path from rx_path import rx_path
from zmq_socket import zmq_pubsub_iface from zmq_socket import zmq_pubsub_iface
from parse import parse,modes_reply from parse import *
from msprint import output_print from msprint import output_print
from sql import output_sql from sql import output_sql
from sbs1 import output_sbs1 from sbs1 import output_sbs1
@ -62,6 +62,8 @@ from raw_server import raw_server
from radio import modes_radio from radio import modes_radio
from exceptions import * from exceptions import *
from az_map import * from az_map import *
from types import *
from altitude import *
#this is try/excepted in case the user doesn't have numpy installed #this is try/excepted in case the user doesn't have numpy installed
try: try:
from flightgear import output_flightgear from flightgear import output_flightgear

View File

@ -25,122 +25,102 @@ import air_modes
from air_modes.exceptions import * from air_modes.exceptions import *
import math import math
class output_print(air_modes.parse): #TODO get rid of class and convert to functions
def __init__(self, mypos): #no need for class here
air_modes.parse.__init__(self, mypos) class output_print:
def __init__(self, mypos, publisher):
#self._cpr = air_modes.cpr_decoder(mypos)
#sub to every function that starts with "print"
self._fns = [int(l[6:]) for l in dir(self) if l.startswith("handle")]
for i in self._fns:
publisher.subscribe("type%i_dl" % i, getattr(self, "handle%i" % i))
def parse(self, message): publisher.subscribe("modes_dl", self.catch_nohandler)
[data, ecc, reference, timestamp] = message.split()
ecc = long(ecc, 16) @staticmethod
reference = float(reference) def prefix(msg):
timestamp = float(timestamp) return "(%i %.8f) " % (msg.rssi, msg.timestamp)
if reference == 0.0: def catch_nohandler(self, msg):
refdb = -150.0 if msg.data.get_type() not in self._fns:
else: retstr = output_print.prefix(msg)
refdb = 20.0*math.log10(reference) retstr += "No handler for message type %i" % msg.data.get_type()
output = "(%.0f %.10f) " % (refdb, timestamp); if "ap" in msg.data.fields:
retstr += " from %.6x" % msg.data["ap"]
print retstr
def handle0(self, msg):
try: try:
data = air_modes.modes_reply(long(data, 16)) retstr = output_print.prefix(msg)
msgtype = data["df"] retstr += "Type 0 (short A-A surveillance) from %x at %ift" % (msg.ecc, air_modes.decode_alt(msg.data["ac"], True))
if msgtype == 0: ri = msg.data["ri"]
output += self.print0(data, ecc) if ri == 0:
elif msgtype == 4: retstr += " (No TCAS)"
output += self.print4(data, ecc) elif ri == 2:
elif msgtype == 5: retstr += " (TCAS resolution inhibited)"
output += self.print5(data, ecc) elif ri == 3:
elif msgtype == 11: retstr += " (Vertical TCAS resolution only)"
output += self.print11(data, ecc) elif ri == 4:
elif msgtype == 17: retstr += " (Full TCAS resolution)"
output += self.print17(data) elif ri == 9:
elif msgtype == 20 or msgtype == 21 or msgtype == 16: retstr += " (speed <75kt)"
output += self.printTCAS(data, ecc) elif ri > 9:
retstr += " (speed %i-%ikt)" % (75 * (1 << (ri-10)), 75 * (1 << (ri-9)))
else: else:
output += "No handler for message type %i from %x (but it's in modes_parse)" % (msgtype, ecc) raise ADSBError
return output
except NoHandlerError as e:
output += "No handler for message type %s from %x" % (e.msgtype, ecc)
return output
except MetricAltError:
pass
except CPRNoPositionError:
pass
def output(self, msg): except ADSBError:
try: return
parsed = self.parse(msg)
if parsed is not None:
print self.parse(msg)
except ADSBError:
pass
def print0(self, shortdata, ecc): if msg.data["vs"] is 1:
[vs, cc, sl, ri, altitude] = self.parse0(shortdata)
retstr = "Type 0 (short A-A surveillance) from %x at %ift" % (ecc, altitude)
if ri == 0:
retstr += " (No TCAS)"
elif ri == 2:
retstr += " (TCAS resolution inhibited)"
elif ri == 3:
retstr += " (Vertical TCAS resolution only)"
elif ri == 4:
retstr += " (Full TCAS resolution)"
elif ri == 9:
retstr += " (speed <75kt)"
elif ri > 9:
retstr += " (speed %i-%ikt)" % (75 * (1 << (ri-10)), 75 * (1 << (ri-9)))
if vs is True:
retstr += " (aircraft is on the ground)" retstr += " (aircraft is on the ground)"
return retstr print retstr
def print4(self, shortdata, ecc):
[fs, dr, um, altitude] = self.parse4(shortdata)
retstr = "Type 4 (short surveillance altitude reply) from %x at %ift" % (ecc, altitude)
@staticmethod
def fs_text(fs):
if fs == 1: if fs == 1:
retstr += " (aircraft is on the ground)" return " (aircraft is on the ground)"
elif fs == 2: elif fs == 2:
retstr += " (AIRBORNE ALERT)" return " (AIRBORNE ALERT)"
elif fs == 3: elif fs == 3:
retstr += " (GROUND ALERT)" return " (GROUND ALERT)"
elif fs == 4: elif fs == 4:
retstr += " (SPI ALERT)" return " (SPI ALERT)"
elif fs == 5: elif fs == 5:
retstr += " (SPI)" return " (SPI)"
else:
raise ADSBError
return retstr def handle4(self, msg):
try:
retstr = output_print.prefix(msg)
retstr += "Type 4 (short surveillance altitude reply) from %x at %ift" % (msg.ecc, air_modes.decode_alt(msg.data["ac"], True))
retstr += output_print.fs_text(msg.data["fs"])
except ADSBError:
return
print retstr
def print5(self, shortdata, ecc): def handle5(self, msg):
[fs, dr, um, ident] = self.parse5(shortdata) try:
retstr = output_print.prefix(msg)
retstr += "Type 5 (short surveillance ident reply) from %x with ident %i" % (msg.ecc, air_modes.decode_id(msg.data["id"]))
retstr += output_print.fs_text(msg.data["fs"])
except ADSBError:
return
print retstr
retstr = "Type 5 (short surveillance ident reply) from %x with ident %i" % (ecc, ident) def handle11(self, msg):
try:
retstr = output_print.prefix(msg)
retstr += "Type 11 (all call reply) from %x in reply to interrogator %i with capability level %i" % (msg.data["aa"], msg.ecc & 0xF, msg.data["ca"]+1)
except ADSBError:
return
print retstr
if fs == 1: #the only one which requires state
retstr += " (aircraft is on the ground)" def handle17(self, data):
elif fs == 2: return
retstr += " (AIRBORNE ALERT)"
elif fs == 3:
retstr += " (GROUND ALERT)"
elif fs == 4:
retstr += " (SPI ALERT)"
elif fs == 5:
retstr += " (SPI)"
return retstr
def print11(self, data, ecc):
[icao24, interrogator, ca] = self.parse11(data, ecc)
retstr = "Type 11 (all call reply) from %x in reply to interrogator %i with capability level %i" % (icao24, interrogator, ca+1)
return retstr
def print17(self, data):
icao24 = data["aa"] icao24 = data["aa"]
bdsreg = data["me"].get_type() bdsreg = data["me"].get_type()
@ -189,7 +169,8 @@ class output_print(air_modes.parse):
return retstr return retstr
def printTCAS(self, data, ecc): def printTCAS(self, msg):
return
msgtype = data["df"] msgtype = data["df"]
if msgtype == 20 or msgtype == 16: if msgtype == 20 or msgtype == 16:
#type 16 does not have fs, dr, um but we get alt here #type 16 does not have fs, dr, um but we get alt here
@ -244,3 +225,7 @@ class output_print(air_modes.parse):
retstr += " ident %x" % ident retstr += " ident %x" % ident
return retstr return retstr
handle16 = printTCAS
handle20 = printTCAS
handle21 = printTCAS

View File

@ -254,199 +254,167 @@ def decode_id(id):
return (a * 1000) + (b * 100) + (c * 10) + d return (a * 1000) + (b * 100) + (c * 10) + d
class parse: #decode ident squawks
def __init__(self, mypos): def charmap(self, d):
self.my_location = mypos if d > 0 and d < 27:
self.cpr = cpr.cpr_decoder(self.my_location) retval = chr(ord("A")+d-1)
elif d == 32:
retval = " "
elif d > 47 and d < 58:
retval = chr(ord("0")+d-48)
else:
retval = " "
def parse0(self, data): return retval
altitude = decode_alt(data["ac"], True)
return [data["vs"], data["cc"], data["sl"], data["ri"], altitude]
def parse4(self, data):
altitude = decode_alt(data["ac"], True)
return [data["fs"], data["dr"], data["um"], altitude]
def parse5(self, data):
squawk = decode_id(data["id"])
return [data["fs"], data["dr"], data["um"], squawk]
def parse11(self, data, ecc):
interrogator = ecc & 0x0F
return [data["aa"], interrogator, data["ca"]]
def parseBDS08(self, data):
categories = [["NO INFO", "RESERVED", "RESERVED", "RESERVED", "RESERVED", "RESERVED", "RESERVED", "RESERVED"],\ categories = [["NO INFO", "RESERVED", "RESERVED", "RESERVED", "RESERVED", "RESERVED", "RESERVED", "RESERVED"],\
["NO INFO", "SURFACE EMERGENCY VEHICLE", "SURFACE SERVICE VEHICLE", "FIXED OBSTRUCTION", "CLUSTER OBSTRUCTION", "LINE OBSTRUCTION", "RESERVED"],\ ["NO INFO", "SURFACE EMERGENCY VEHICLE", "SURFACE SERVICE VEHICLE", "FIXED OBSTRUCTION", "CLUSTER OBSTRUCTION", "LINE OBSTRUCTION", "RESERVED"],\
["NO INFO", "GLIDER", "BALLOON/BLIMP", "PARACHUTE", "ULTRALIGHT", "RESERVED", "UAV", "SPACECRAFT"],\ ["NO INFO", "GLIDER", "BALLOON/BLIMP", "PARACHUTE", "ULTRALIGHT", "RESERVED", "UAV", "SPACECRAFT"],\
["NO INFO", "LIGHT", "SMALL", "LARGE", "LARGE HIGH VORTEX", "HEAVY", "HIGH PERFORMANCE", "ROTORCRAFT"]] ["NO INFO", "LIGHT", "SMALL", "LARGE", "LARGE HIGH VORTEX", "HEAVY", "HIGH PERFORMANCE", "ROTORCRAFT"]]
def parseBDS08(self, data): catstring = categories[data["ftc"]-1][data["cat"]]
catstring = self.categories[data["ftc"]-1][data["cat"]]
msg = "" msg = ""
for i in range(0, 8): for i in range(0, 8):
msg += self.charmap(data["ident"] >> (42-6*i) & 0x3F) msg += charmap(data["ident"] >> (42-6*i) & 0x3F)
return (msg, catstring) return (msg, catstring)
def charmap(self, d): #NOTE: this is stateful -- requires CPR decoder
if d > 0 and d < 27: def parseBDS05(self, data, cpr):
retval = chr(ord("A")+d-1) altitude = decode_alt(data["alt"], False)
elif d == 32: [decoded_lat, decoded_lon, rnge, bearing] = cpr.decode(data["aa"], data["lat"], data["lon"], data["cpr"], 0)
retval = " " return [altitude, decoded_lat, decoded_lon, rnge, bearing]
elif d > 47 and d < 58:
retval = chr(ord("0")+d-48)
else:
retval = " "
return retval #NOTE: this is stateful -- requires CPR decoder
def parseBDS06(self, data, cpr):
ground_track = data["gtk"] * 360. / 128
[decoded_lat, decoded_lon, rnge, bearing] = cpr.decode(data["aa"], data["lat"], data["lon"], data["cpr"], 1)
return [ground_track, decoded_lat, decoded_lon, rnge, bearing]
def parseBDS05(self, data): def parseBDS09_0(self, data):
icao24 = data["aa"] #0: ["sub", "dew", "vew", "dns", "vns", "str", "tr", "svr", "vr"],
vert_spd = data["vr"] * 32
ud = bool(data["dvr"])
if ud:
vert_spd = 0 - vert_spd
turn_rate = data["tr"] * 15/62
rl = data["str"]
if rl:
turn_rate = 0 - turn_rate
ns_vel = data["vns"] - 1
ns = bool(data["dns"])
ew_vel = data["vew"] - 1
ew = bool(data["dew"])
encoded_lon = data["lon"] velocity = math.hypot(ns_vel, ew_vel)
encoded_lat = data["lat"] if ew:
cpr_format = data["cpr"] ew_vel = 0 - ew_vel
altitude = decode_alt(data["alt"], False) if ns:
ns_vel = 0 - ns_vel
heading = math.atan2(ew_vel, ns_vel) * (180.0 / math.pi)
if heading < 0:
heading += 360
[decoded_lat, decoded_lon, rnge, bearing] = self.cpr.decode(icao24, encoded_lat, encoded_lon, cpr_format, 0) return [velocity, heading, vert_spd, turn_rate]
return [altitude, decoded_lat, decoded_lon, rnge, bearing] def parseBDS09_1(self, data):
#1: ["sub", "icf", "ifr", "nuc", "dew", "vew", "dns", "vns", "vrsrc", "dvr", "vr", "dhd", "hd"],
alt_geo_diff = data["hd"] * 25
above_below = bool(data["dhd"])
if above_below:
alt_geo_diff = 0 - alt_geo_diff;
vert_spd = float(data["vr"] - 1) * 64
ud = bool(data["dvr"])
if ud:
vert_spd = 0 - vert_spd
vert_src = bool(data["vrsrc"])
ns_vel = float(data["vns"])
ns = bool(data["dns"])
ew_vel = float(data["vew"])
ew = bool(data["dew"])
subtype = data["sub"]
if subtype == 0x02:
ns_vel <<= 2
ew_vel <<= 2
velocity = math.hypot(ns_vel, ew_vel)
if ew:
ew_vel = 0 - ew_vel
#welp turns out it looks like there's only 17 bits in the BDS0,6 ground packet after all. if ns_vel == 0:
def parseBDS06(self, data): heading = 0
icao24 = data["aa"] else:
heading = math.atan(float(ew_vel) / float(ns_vel)) * (180.0 / math.pi)
if ns:
heading = 180 - heading
if heading < 0:
heading += 360
encoded_lon = data["lon"] return [velocity, heading, vert_spd]
encoded_lat = data["lat"]
cpr_format = data["cpr"]
ground_track = data["gtk"] * 360. / 128
[decoded_lat, decoded_lon, rnge, bearing] = self.cpr.decode(icao24, encoded_lat, encoded_lon, cpr_format, 1)
return [ground_track, decoded_lat, decoded_lon, rnge, bearing]
def parseBDS09_0(self, data): def parseBDS09_3(self, data):
#0: ["sub", "dew", "vew", "dns", "vns", "str", "tr", "svr", "vr"],
vert_spd = data["vr"] * 32
ud = bool(data["dvr"])
if ud:
vert_spd = 0 - vert_spd
turn_rate = data["tr"] * 15/62
rl = data["str"]
if rl:
turn_rate = 0 - turn_rate
ns_vel = data["vns"] - 1
ns = bool(data["dns"])
ew_vel = data["vew"] - 1
ew = bool(data["dew"])
velocity = math.hypot(ns_vel, ew_vel)
if ew:
ew_vel = 0 - ew_vel
if ns:
ns_vel = 0 - ns_vel
heading = math.atan2(ew_vel, ns_vel) * (180.0 / math.pi)
if heading < 0:
heading += 360
return [velocity, heading, vert_spd, turn_rate]
def parseBDS09_1(self, data):
#1: ["sub", "icf", "ifr", "nuc", "dew", "vew", "dns", "vns", "vrsrc", "dvr", "vr", "dhd", "hd"],
alt_geo_diff = data["hd"] * 25
above_below = bool(data["dhd"])
if above_below:
alt_geo_diff = 0 - alt_geo_diff;
vert_spd = float(data["vr"] - 1) * 64
ud = bool(data["dvr"])
if ud:
vert_spd = 0 - vert_spd
vert_src = bool(data["vrsrc"])
ns_vel = float(data["vns"])
ns = bool(data["dns"])
ew_vel = float(data["vew"])
ew = bool(data["dew"])
subtype = data["sub"]
if subtype == 0x02:
ns_vel <<= 2
ew_vel <<= 2
velocity = math.hypot(ns_vel, ew_vel)
if ew:
ew_vel = 0 - ew_vel
if ns_vel == 0:
heading = 0
else:
heading = math.atan(float(ew_vel) / float(ns_vel)) * (180.0 / math.pi)
if ns:
heading = 180 - heading
if heading < 0:
heading += 360
return [velocity, heading, vert_spd]
def parseBDS09_3(self, data):
#3: {"sub", "icf", "ifr", "nuc", "mhs", "hdg", "ast", "spd", "vrsrc", #3: {"sub", "icf", "ifr", "nuc", "mhs", "hdg", "ast", "spd", "vrsrc",
# "dvr", "vr", "dhd", "hd"} # "dvr", "vr", "dhd", "hd"}
mag_hdg = data["mhs"] * 360. / 1024 mag_hdg = data["mhs"] * 360. / 1024
vel_src = "TAS" if data["ast"] == 1 else "IAS" vel_src = "TAS" if data["ast"] == 1 else "IAS"
vel = data["spd"] vel = data["spd"]
if data["sub"] == 4: if data["sub"] == 4:
vel *= 4 vel *= 4
vert_spd = float(data["vr"] - 1) * 64 vert_spd = float(data["vr"] - 1) * 64
if data["dvr"] == 1: if data["dvr"] == 1:
vert_spd = 0 - vert_spd vert_spd = 0 - vert_spd
geo_diff = float(data["hd"] - 1) * 25 geo_diff = float(data["hd"] - 1) * 25
return [mag_hdg, vel_src, vel, vert_spd, geo_diff] return [mag_hdg, vel_src, vel, vert_spd, geo_diff]
def parseBDS62(self, data): def parseBDS62(self, data):
eps_strings = ["NO EMERGENCY", "GENERAL EMERGENCY", "LIFEGUARD/MEDICAL", "FUEL EMERGENCY", eps_strings = ["NO EMERGENCY", "GENERAL EMERGENCY", "LIFEGUARD/MEDICAL", "FUEL EMERGENCY",
"NO COMMUNICATIONS", "UNLAWFUL INTERFERENCE", "RESERVED", "RESERVED"] "NO COMMUNICATIONS", "UNLAWFUL INTERFERENCE", "RESERVED", "RESERVED"]
return eps_strings[data["eps"]] return eps_strings[data["eps"]]
def parseMB_id(self, data): #bds1 == 2, bds2 == 0 def parseMB_id(self, data): #bds1 == 2, bds2 == 0
msg = "" msg = ""
for i in range(0, 8): for i in range(0, 8):
msg += self.charmap( data["ais"] >> (42-6*i) & 0x3F) msg += self.charmap( data["ais"] >> (42-6*i) & 0x3F)
return (msg) return (msg)
def parseMB_TCAS_resolutions(self, data): def parseMB_TCAS_resolutions(self, data):
#these are LSB because the ICAO are asshats #these are LSB because the ICAO are asshats
ara_bits = {41: "CLIMB", 42: "DON'T DESCEND", 43: "DON'T DESCEND >500FPM", 44: "DON'T DESCEND >1000FPM", ara_bits = {41: "CLIMB", 42: "DON'T DESCEND", 43: "DON'T DESCEND >500FPM", 44: "DON'T DESCEND >1000FPM",
45: "DON'T DESCEND >2000FPM", 46: "DESCEND", 47: "DON'T CLIMB", 48: "DON'T CLIMB >500FPM", 45: "DON'T DESCEND >2000FPM", 46: "DESCEND", 47: "DON'T CLIMB", 48: "DON'T CLIMB >500FPM",
49: "DON'T CLIMB >1000FPM", 50: "DON'T CLIMB >2000FPM", 51: "TURN LEFT", 52: "TURN RIGHT", 49: "DON'T CLIMB >1000FPM", 50: "DON'T CLIMB >2000FPM", 51: "TURN LEFT", 52: "TURN RIGHT",
53: "DON'T TURN LEFT", 54: "DON'T TURN RIGHT"} 53: "DON'T TURN LEFT", 54: "DON'T TURN RIGHT"}
rac_bits = {55: "DON'T DESCEND", 56: "DON'T CLIMB", 57: "DON'T TURN LEFT", 58: "DON'T TURN RIGHT"} rac_bits = {55: "DON'T DESCEND", 56: "DON'T CLIMB", 57: "DON'T TURN LEFT", 58: "DON'T TURN RIGHT"}
ara = data["ara"] ara = data["ara"]
rac = data["rac"] rac = data["rac"]
#check to see which bits are set #check to see which bits are set
resolutions = "" resolutions = ""
for bit in ara_bits: for bit in ara_bits:
if ara & (1 << (54-bit)): if ara & (1 << (54-bit)):
resolutions += " " + ara_bits[bit] resolutions += " " + ara_bits[bit]
complements = "" complements = ""
for bit in rac_bits: for bit in rac_bits:
if rac & (1 << (58-bit)): if rac & (1 << (58-bit)):
complements += " " + rac_bits[bit] complements += " " + rac_bits[bit]
return (resolutions, complements) return (resolutions, complements)
#rat is 1 if resolution advisory terminated <18s ago #rat is 1 if resolution advisory terminated <18s ago
#mte is 1 if multiple threats indicated #mte is 1 if multiple threats indicated
#tti is threat type: 1 if ID, 2 if range/brg/alt #tti is threat type: 1 if ID, 2 if range/brg/alt
#tida is threat altitude in Mode C format #tida is threat altitude in Mode C format
def parseMB_TCAS_threatid(self, data): #bds1==3, bds2==0, TTI==1 def parseMB_TCAS_threatid(self, data): #bds1==3, bds2==0, TTI==1
#3: {"bds1": (33,4), "bds2": (37,4), "ara": (41,14), "rac": (55,4), "rat": (59,1), #3: {"bds1": (33,4), "bds2": (37,4), "ara": (41,14), "rac": (55,4), "rat": (59,1),
# "mte": (60,1), "tti": (61,2), "tida": (63,13), "tidr": (76,7), "tidb": (83,6)} # "mte": (60,1), "tti": (61,2), "tida": (63,13), "tidr": (76,7), "tidb": (83,6)}
(resolutions, complements) = self.parseMB_TCAS_resolutions(data) (resolutions, complements) = self.parseMB_TCAS_resolutions(data)
return (resolutions, complements, data["rat"], data["mte"], data["tid"]) return (resolutions, complements, data["rat"], data["mte"], data["tid"])
def parseMB_TCAS_threatloc(self, data): #bds1==3, bds2==0, TTI==2 def parseMB_TCAS_threatloc(self, data): #bds1==3, bds2==0, TTI==2
(resolutions, complements) = self.parseMB_TCAS_resolutions(data) (resolutions, complements) = self.parseMB_TCAS_resolutions(data)
threat_alt = decode_alt(data["tida"], True) threat_alt = decode_alt(data["tida"], True)
return (resolutions, complements, data["rat"], data["mte"], threat_alt, data["tidr"], data["tidb"]) return (resolutions, complements, data["rat"], data["mte"], threat_alt, data["tidr"], data["tidb"])
#type 16 Coordination Reply Message #type 16 Coordination Reply Message
def parse_TCAS_CRM(self, data): def parse_TCAS_CRM(self, data):
(resolutions, complements) = self.parseMB_TCAS_resolutions(data) (resolutions, complements) = self.parseMB_TCAS_resolutions(data)
return (resolutions, complements, data["rat"], data["mte"]) return (resolutions, complements, data["rat"], data["mte"])