Merge branch 'JoseAndresMR-master'

pull/27/head
Junzi Sun 6 years ago
commit 648f4660b7

@ -6,7 +6,10 @@ from .decoder import commb
from .decoder import common
from .decoder import bds
from .extra import aero
from .extra import beastclient
from .extra import tcpclient
from .decoder import els # depricated
from .decoder import ehs # depricated
import os
dirpath = os.path.dirname(os.path.realpath(__file__))

@ -18,9 +18,12 @@ The wrapper for decoding ADS-B messages
"""
from __future__ import absolute_import, print_function, division
import pyModeS as pms
from pyModeS.decoder import common
# from pyModeS.decoder.bds import bds05, bds06, bds09
from pyModeS.decoder import uncertainty
# from pyModeS.decoder.bds import bds05, bds06, bds09
from pyModeS.decoder.bds.bds05 import airborne_position, airborne_position_with_ref, altitude
from pyModeS.decoder.bds.bds06 import surface_position, surface_position_with_ref, surface_velocity
from pyModeS.decoder.bds.bds08 import category, callsign
@ -197,143 +200,126 @@ def version(msg):
return version
def nic_v1(msg, nic_sup_b):
"""Calculate NIC, navigation integrity category for ADS-B version 1
def nuc_p(msg):
"""Calculate NUCp, Navigation Uncertainty Category - Position (ADS-B version 1)
Args:
msg (string): 28 bytes hexadecimal message string,
Returns:
int: Horizontal Protection Limit
int: 95% Containment Radius - Horizontal (meters)
int: 95% Containment Radius - Vertical (meters)
"""
tc = typecode(msg)
if typecode(msg) < 5 or typecode(msg) > 22:
raise RuntimeError(
"%s: Not a surface position message (5<TC<8), \
airborne position message (8<TC<19), \
or airborne position with GNSS height (20<TC<22)" % msg
)
NUCp = uncertainty.TC_NUCp_lookup[tc]
HPL = uncertainty.NUCp[NUCp]['HPL']
RCu = uncertainty.NUCp[NUCp]['RCu']
RCv = uncertainty.NUCp[NUCp]['RCv']
if tc in [20, 21]:
RCv = uncertainty.NA
return HPL, RCu, RCv
def nuc_v(msg):
"""Calculate NUCv, Navigation Uncertainty Category - Velocity (ADS-B version 1)
Args:
msg (string): 28 bytes hexadecimal message string,
Returns:
int or string: 95% Horizontal Velocity Error
int or string: 95% Vertical Velocity Error
"""
tc = typecode(msg)
if tc != 19:
raise RuntimeError("%s: Not an airborne velocity message, expecting TC = 19" % msg)
msgbin = common.hex2bin(msg)
NUCv = common.bin2int(msgbin[42:45])
HVE = uncertainty.NUCv[NUCv]['HVE']
VVE = uncertainty.NUCv[NUCv]['VVE']
return HVE, VVE
def nic_v1(msg, NICs):
"""Calculate NIC, navigation integrity category, for ADS-B version 1
Args:
msg (string): 28 bytes hexadecimal message string
nic_sup_b (int or string): NIC supplement
NICs (int or string): NIC supplement
Returns:
int: NIC number (from 0 to 11), -1 if not applicable
int or string: Horizontal Radius of Containment
int or string: Vertical Protection Limit
"""
if typecode(msg) < 5 or typecode(msg) > 22:
raise RuntimeError("%s: Not a surface position message (5<TC<8), \
airborne position message (8<TC<19), \
or airborne position with GNSS height (20<TC<22)" % msg)
raise RuntimeError(
"%s: Not a surface position message (5<TC<8), \
airborne position message (8<TC<19), \
or airborne position with GNSS height (20<TC<22)" % msg
)
tc = typecode(msg)
NIC = uncertainty.TC_NICv1_lookup[tc]
if nic_sup_b in ['0', '1']:
nic_sup_b = int(nic_sup_b)
if isinstance(NIC, dict):
NIC = NIC[NICs]
if tc in [0, 8, 18, 22]:
nic = 0
elif tc == 17:
nic = 1
elif tc == 16:
if nic_sup_b:
nic = 3
else:
nic = 2
elif tc == 15:
nic = 4
elif tc == 14:
nic = 5
elif tc == 13:
if nic_sup_b:
nic = 6
else:
nic = 6
elif tc == 12:
nic = 7
elif tc == 11:
if nic_sup_b:
nic = 9
else:
nic = 8
elif tc in [6, 10, 21]:
nic = 10
elif tc in [5, 9, 20]:
nic = 11
elif tc == 7:
if nic_sup_b:
nic = 9
else:
nic = 8
else:
nic = -1
return nic
Rc = uncertainty.NICv1[NIC][NICs]['Rc']
VPL = uncertainty.NICv1[NIC][NICs]['VPL']
return Rc, VPL
def nic_v2(msg, nic_a, nic_b, nic_c):
def nic_v2(msg, NICa, NICbc):
"""Calculate NIC, navigation integrity category, for ADS-B version 2
Args:
msg (string): 28 bytes hexadecimal message string
nic_a (int or string): NIC supplement
nic_b (int or srting): NIC supplement
nic_c (int or string): NIC supplement
NICa (int or string): NIC supplement - A
NICbc (int or srting): NIC supplement - B or C
Returns:
int: NIC number (from 0 to 11), -1 if not applicable
int or string: Horizontal Radius of Containment
"""
if typecode(msg) < 5 or typecode(msg) > 22:
raise RuntimeError("%s: Not a surface position message (5<TC<8) \
airborne position message (8<TC<19), \
or airborne position with GNSS height (20<TC<22)" % msg)
raise RuntimeError(
"%s: Not a surface position message (5<TC<8), \
airborne position message (8<TC<19), \
or airborne position with GNSS height (20<TC<22)" % msg
)
tc = typecode(msg)
NIC = uncertainty.TC_NICv2_lookup[tc]
if nic_a in ['0', '1']:
nic_a = int(nic_a)
if 20<=tc<=22:
NICs = 0
else:
NICs = NICa*2 + NICbc
if nic_b in ['0', '1']:
nic_b = int(nic_b)
if isinstance(NIC, dict):
NIC = NIC[NICs]
if nic_c in ['0', '1']:
nic_c = int(nic_c)
Rc = uncertainty.NICv2[NIC][NICs]['Rc']
if tc in [0, 18, 22]:
nic = 0
elif tc == 17:
nic = 1
elif tc == 16:
if nic_a:
nic = 3
else:
nic = 2
elif tc == 15:
nic = 4
elif tc == 14:
nic = 5
elif tc == 13:
if nic_a:
nic = 6
else:
if nic_b:
nic = 6
else:
nic = 6
elif tc == 12:
nic = 7
elif tc == 11:
if nic_a:
nic = 9
else:
nic = 8
elif tc in [6, 10, 21]:
nic = 10
elif tc in [5, 9, 20]:
nic = 11
elif tc == 8:
if nic_a:
if nic_c:
nic = 7
else:
nic = 6
else:
if nic_c:
nic = 6
else:
nic = 0
elif tc == 7:
if nic_a:
nic = 9
else:
nic = 8
else:
nic = -1
return nic
return Rc
def nic_s(msg):
@ -404,7 +390,8 @@ def nac_p(msg):
msg (string): 28 bytes hexadecimal message string, TC = 29 or 31
Returns:
int: NACp number (0 or 1)
int or string: 95% horizontal accuracy bounds, Estimated Position Uncertainty
int or string: 95% vertical accuracy bounds, Vertical Estimated Position Uncertainty
"""
tc = typecode(msg)
@ -415,12 +402,14 @@ def nac_p(msg):
msgbin = common.hex2bin(msg)
if tc == 29:
nac_p = common.bin2int(msgbin[71:75])
NACp = common.bin2int(msgbin[71:75])
elif tc == 31:
nac_p = common.bin2int(msgbin[76:80])
NACp = common.bin2int(msgbin[76:80])
return nac_p
EPU = uncertainty.NACp[NACp]['EPU']
VEPU = uncertainty.NACp[NACp]['VEPU']
return EPU, VEPU
def nac_v(msg):
"""Calculate NACv, Navigation Accuracy Category - Velocity
@ -429,7 +418,8 @@ def nac_v(msg):
msg (string): 28 bytes hexadecimal message string, TC = 19
Returns:
int: NACv number (from 0 to 4), -1 if not applicable
int or string: 95% horizontal accuracy bounds for velocity, Horizontal Figure of Merit
int or string: 95% vertical accuracy bounds for velocity, Vertical Figure of Merit
"""
tc = typecode(msg)
@ -437,8 +427,12 @@ def nac_v(msg):
raise RuntimeError("%s: Not an airborne velocity message, expecting TC = 19" % msg)
msgbin = common.hex2bin(msg)
nac_v = common.bin2int(msgbin[42:45])
return nac_v
NACv = common.bin2int(msgbin[42:45])
HFOMr = uncertainty.NACv[NACv]['HFOMr']
VFOMr = uncertainty.NACv[NACv]['VFOMr']
return HFOMr, VFOMr
def sil(msg, version):
@ -448,7 +442,9 @@ def sil(msg, version):
msg (string): 28 bytes hexadecimal message string with TC = 29, 31
Returns:
(int, int): sil number and sil supplement (only for v2)
int or string: Probability of exceeding Horizontal Radius of Containment RCu
int or string: Probability of exceeding Vertical Integrity Containment Region VPL
string: SIL supplement based on per "hour" or "sample", or 'unknown'
"""
tc = typecode(msg)
@ -459,16 +455,24 @@ def sil(msg, version):
msgbin = common.hex2bin(msg)
if tc == 29:
sil = common.bin2int(msgbin[76:78])
SIL = common.bin2int(msgbin[76:78])
elif tc == 31:
sil = common.bin2int(msg[82:84])
SIL = common.bin2int(msgbin[82:84])
PE_RCu = uncertainty.SIL[SIL]['PE_RCu']
PE_VPL = uncertainty.SIL[SIL]['PE_VPL']
sil_sup = None
base = 'unknown'
if version == 2:
if version == 29:
sil_sup = common.bin2int(msgbin[39])
elif version == 31:
sil_sup = common.bin2int(msgbin[86])
if tc == 29:
SIL_SUP = common.bin2int(msgbin[39])
elif tc == 31:
SIL_SUP = common.bin2int(msgbin[86])
if SIL_SUP == 0:
base = "hour"
elif SIL_SUP == 1:
base = "sample"
return sil, sil_sup
return PE_RCu, PE_VPL, base

@ -0,0 +1,120 @@
NA = None
TC_NUCp_lookup = {
0:0, 5:9, 6:8, 7:7, 8:6,
9:9, 10:8, 11:7, 12:6, 13:5, 14:4, 15:3, 16:2, 17:1, 18:0,
20:9, 21:8, 22:0
}
TC_NICv1_lookup = {
5:11, 6:10, 7:9, 8:0,
9:11, 10:10, 11:{1:9, 0:8}, 12:7, 13:6, 14:5, 15:4, 16:{1:3, 0:2}, 17:1, 18:0,
20:11, 21:10, 22:0
}
TC_NICv2_lookup = {
5:11, 6:10, 7:{2:9, 0:8}, 8:{3:7, 2:6, 1:6, 0:0},
9:11, 10:10, 11:{3:9, 0:8}, 12:7, 13:6, 14:5, 15:4, 16:{3:3, 0:2}, 17:1, 18:0,
20:11, 21:10, 22:0
}
NUCp = {
9: {'HPL':7.5, 'RCu':3, 'RCv':4},
8: {'HPL':25, 'RCu':10, 'RCv':15},
7: {'HPL':185, 'RCu':93, 'RCv':NA},
6: {'HPL':370, 'RCu':185, 'RCv':NA},
5: {'HPL':926, 'RCu':463, 'RCv':NA},
4: {'HPL':1852, 'RCu':926, 'RCv':NA},
3: {'HPL':3704, 'RCu':1852, 'RCv':NA},
2: {'HPL':18520, 'RCu':9260, 'RCv':NA},
1: {'HPL':37040, 'RCu':18520, 'RCv':NA},
0: {'HPL':NA, 'RCu':NA, 'RCv':NA},
}
NUCv = {
0: {'HVE':NA, 'VVE':NA},
1: {'HVE':10, 'VVE':15.2},
2: {'HVE':3, 'VVE':4.5},
3: {'HVE':1, 'VVE':1.5},
4: {'HVE':0.3, 'VVE':0.46},
}
NACp = {
11: {'EPU': 3, 'VEPU': 4},
10: {'EPU': 10, 'VEPU': 15},
9: {'EPU': 30, 'VEPU': 45},
8: {'EPU': 93, 'VEPU': NA},
7: {'EPU': 185, 'VEPU': NA},
6: {'EPU': 556, 'VEPU': NA},
5: {'EPU': 926, 'VEPU': NA},
4: {'EPU': 1852, 'VEPU': NA},
3: {'EPU': 3704, 'VEPU': NA},
2: {'EPU': 7408, 'VEPU': NA},
1: {'EPU': 18520, 'VEPU': NA},
0: {'EPU': NA, 'VEPU': NA},
}
NACv = {
0: {'HFOMr':NA, 'VFOMr':NA},
1: {'HFOMr':10, 'VFOMr':15.2},
2: {'HFOMr':3, 'VFOMr':4.5},
3: {'HFOMr':1, 'VFOMr':1.5},
4: {'HFOMr':0.3, 'VFOMr':0.46},
}
SIL = {
3: {'PE_RCu': 1e-7, 'PE_VPL': 2e-7},
2: {'PE_RCu': 1e-5, 'PE_VPL': 1e-5},
1: {'PE_RCu': 1e-3, 'PE_VPL': 1e-3},
0: {'PE_RCu': NA, 'PE_VPL': NA},
}
NICv1 = {
# NIC is used as the index at second Level
11: {0: {'Rc': 7.5, 'VPL': 11}},
10: {0: {'Rc': 25, 'VPL': 37.5}},
9: {1: {'Rc': 75, 'VPL': 112}},
8: {0: {'Rc': 185, 'VPL': NA}},
7: {0: {'Rc': 370, 'VPL': NA}},
6: {
0: {'Rc': 926, 'VPL': NA},
1: {'Rc': 1111, 'VPL': NA},
},
5: {0: {'Rc': 1852, 'VPL': NA}},
4: {0: {'Rc': 3702, 'VPL': NA}},
3: {1: {'Rc': 7408, 'VPL': NA}},
2: {0: {'Rc': 14008, 'VPL': NA}},
1: {0: {'Rc': 37000, 'VPL': NA}},
0: {0: {'Rc': NA, 'VPL': NA}},
}
NICv2 = {
# Decimal value of [NICa NICb/NICc] is used as the index at second Level
11: {0: {'Rc': 7.5}},
10: {0: {'Rc': 25}},
9: {
2: {'Rc': 75},
3: {'Rc': 75},
},
8: {0: {'Rc': 185}},
7: {
0: {'Rc': 370},
2: {'Rc': 370},
},
6: {
0: {'Rc': 926},
1: {'Rc': 556},
2: {'Rc': 556},
3: {'Rc': 1111},
},
5: {0: {'Rc': 1852}},
4: {0: {'Rc': 3702}},
3: {2: {'Rc': 7408}},
2: {0: {'Rc': 14008}},
1: {0: {'Rc': 37000}},
0: {0: {'Rc': NA}},
}

@ -22,6 +22,7 @@ parser.add_argument('--server', help='server address or IP', required=True)
parser.add_argument('--port', help='raw data port', required=True)
parser.add_argument('--rawtype', help='beast or avr', required=True)
parser.add_argument('--latlon', help='receiver position', nargs=2, metavar=('LAT', 'LON'), required=True)
parser.add_argument('--show-uncertainty', help='display uncertainty indicators', required=False)
args = parser.parse_args()
SERVER = args.server

@ -17,10 +17,19 @@ COLUMNS = [
('trk', 10),
('hdg', 10),
('ver', 4),
('NIC', 5),
('NACv', 5),
('NACp', 5),
('SIL', 5),
('HPL', 5),
('RCu', 5),
('RCv', 5),
('HVE', 5),
('VVE', 5),
('Rc', 4),
('VPL', 5),
('EPU', 5),
('VEPU', 6),
('HFOMr', 7),
('VFOMr', 7),
('PE_RCu', 8),
('PE_VPL', 8),
('live', 6),
]

@ -48,10 +48,19 @@ class Stream():
'mach': None,
'hdg': None,
'ver' : None,
'NIC' : None,
'NACp' : None,
'NACv' : None,
'SIL' : None
'HPL' : None,
'RCu' : None,
'RCv' : None,
'HVE' : None,
'VVE' : None,
'Rc' : None,
'VPL' : None,
'EPU' : None,
'VEPU' : None,
'HFOMr' : None,
'VFOMr' : None,
'PE_RCu' : None,
'PE_VPL' : None,
}
self.acs[icao]['live'] = int(t)
@ -112,26 +121,35 @@ class Stream():
# Uncertainty & accuracy
ac = self.acs[icao]
if 9 <= tc <= 18:
ac['nic_bc'] = pms.adsb.nic_b(msg)
if (5 <= tc <= 8) or (9 <= tc <= 18) or (20 <= tc <= 22):
ac['HPL'], ac['RCu'], ac['RCv'] = pms.adsb.nuc_p(msg)
if (ac['ver'] == 1) and ('nic_s' in ac.keys()):
self.acs[icao]['NIC'] = pms.adsb.nic_v1(msg, ac['nic_s'])
elif (ac['ver'] == 2) and ('nic_a' in ac.keys()) and ('nic_b' in ac.keys()):
self.acs[icao]['NIC'] = pms.adsb.nic_v2(msg, ac['nic_a'], ac['nic_b'], ac['nic_c'])
ac['Rc'], ac['VPL'] = pms.adsb.nic_v1(msg, ac['nic_s'])
elif (ac['ver'] == 2) and ('nic_a' in ac.keys()) and ('nic_bc' in ac.keys()):
ac['Rc'] = pms.adsb.nic_v2(msg, ac['nic_a'], ac['nic_bc'])
if tc == 19:
ac['HVE'], ac['VVE'] = pms.adsb.nuc_v(msg)
if ac['ver'] in [1, 2]:
self.acs[icao]['NACv'] = pms.adsb.nac_v(msg)
ac['EPU'], ac['VEPU'] = pms.adsb.nac_v(msg)
if tc == 29:
if ac['ver'] != None:
self.acs[icao]['SIL'], self.acs[icao]['sil_s'] = pms.adsb.sil(msg, ac['ver'])
self.acs[icao]['NACp'] = pms.adsb.nac_p(msg)
ac['PE_RCu'], ac['PE_VPL'], ac['base'] = pms.adsb.sil(msg, ac['ver'])
ac['HFOMr'], ac['VFOMr'] = pms.adsb.nac_p(msg)
if tc == 31:
self.acs[icao]['ver'] = pms.adsb.version(msg)
self.acs[icao]['SIL'] = pms.adsb.version(msg)
self.acs[icao]['NACp'] = pms.adsb.nac_p(msg)
if self.acs[icao]['ver'] == 1:
self.acs[icao]['nic_s'] = pms.adsb.nic_s(msg)
elif self.acs[icao]['ver'] == 2:
self.acs[icao]['nic_a'], self.acs[icao]['nic_c'] = pms.adsb.nic_a_c(msg)
ac['ver'] = pms.adsb.version(msg)
ac['HFOMr'], ac['VFOMr'] = pms.adsb.nac_p(msg)
ac['PE_RCu'], ac['PE_VPL'], ac['sil_base'] = pms.adsb.sil(msg, ac['ver'])
if ac['ver'] == 1:
ac['nic_s'] = pms.adsb.nic_s(msg)
elif ac['ver'] == 2:
ac['nic_a'], ac['nic_bc'] = pms.adsb.nic_a_c(msg)
# process commb message

@ -64,16 +64,16 @@ def test_adsb_velocity():
assert adsb.altitude_diff('8D485020994409940838175B284F') == 550
def test_nic():
assert adsb.nic('8D3C70A390AB11F55B8C57F65FE6') == 0
assert adsb.nic('8DE1C9738A4A430B427D219C8225') == 1
assert adsb.nic('8D44058880B50006B1773DC2A7E9') == 2
assert adsb.nic('8D44058881B50006B1773DC2A7E9') == 3
assert adsb.nic('8D4AB42A78000640000000FA0D0A') == 4
assert adsb.nic('8D4405887099F5D9772F37F86CB6') == 5
assert adsb.nic('8D4841A86841528E72D9B472DAC2') == 6
assert adsb.nic('8D44057560B9760C0B840A51C89F') == 7
assert adsb.nic('8D40621D58C382D690C8AC2863A7') == 8
assert adsb.nic('8F48511C598D04F12CCF82451642') == 9
assert adsb.nic('8DA4D53A50DBF8C6330F3B35458F') == 10
assert adsb.nic('8D3C4ACF4859F1736F8E8ADF4D67') == 11
# def test_nic():
# assert adsb.nic('8D3C70A390AB11F55B8C57F65FE6') == 0
# assert adsb.nic('8DE1C9738A4A430B427D219C8225') == 1
# assert adsb.nic('8D44058880B50006B1773DC2A7E9') == 2
# assert adsb.nic('8D44058881B50006B1773DC2A7E9') == 3
# assert adsb.nic('8D4AB42A78000640000000FA0D0A') == 4
# assert adsb.nic('8D4405887099F5D9772F37F86CB6') == 5
# assert adsb.nic('8D4841A86841528E72D9B472DAC2') == 6
# assert adsb.nic('8D44057560B9760C0B840A51C89F') == 7
# assert adsb.nic('8D40621D58C382D690C8AC2863A7') == 8
# assert adsb.nic('8F48511C598D04F12CCF82451642') == 9
# assert adsb.nic('8DA4D53A50DBF8C6330F3B35458F') == 10
# assert adsb.nic('8D3C4ACF4859F1736F8E8ADF4D67') == 11

Loading…
Cancel
Save