update EHS BDS identification, add isBDS50or60() function

pull/15/head
Junzi Sun 7 years ago
parent 5697e5b88e
commit 4911e69171

@ -6,3 +6,5 @@ from .decoder import ehs
from .decoder import els
from .decoder import util
from .decoder import modes_common
from .extra import aero
from .extra import beastclient

@ -18,8 +18,9 @@ A python package for decoding ModeS (DF20, DF21) messages.
"""
from __future__ import absolute_import, print_function, division
from . import util, modes_common
from scipy.stats import multivariate_normal
import numpy as np
from pyModeS.decoder import util, modes_common
from pyModeS.extra import aero
def icao(msg):
return modes_common.icao(msg)
@ -972,100 +973,93 @@ def vr60ins(msg):
return roc
def BDS(msg):
"""Estimate the most likely BDS code of an message
def isBDS50or60(msg, spd_ref, trk_ref, alt_ref):
"""Use reference ground speed and trk to determine BDS50 and DBS60
Args:
msg (String): 28 bytes hexadecimal message string
spd_ref (float): reference speed (ADS-B ground speed), kts
trk_ref (float): reference track (ADS-B track angle), deg
alt_ref (float): reference altitude (ADS-B altitude), ft
Returns:
String or None: BDS version, or possible versions, or None if nothing matches.
"""
def vxy(v, angle):
vx = v * np.sin(np.deg2rad(angle))
vy = v * np.cos(np.deg2rad(angle))
return vx, vy
if isnull(msg):
if not (isBDS50(msg) and isBDS60(msg)):
return None
is17 = isBDS17(msg)
is20 = isBDS20(msg)
is40 = isBDS40(msg)
is44 = isBDS44(msg)
is44rev = isBDS44(msg, rev=True)
is50 = isBDS50(msg)
is53 = isBDS53(msg)
is60 = isBDS60(msg)
h50 = trk50(msg)
v50 = gs50(msg)
h50 = np.nan if h50 is None else h50
v50 = np.nan if v50 is None else v50
BDS = ["BDS17", "BDS20", "BDS40", "BDS44", "BDS44REV", "BDS50", "BDS53", "BDS60"]
isBDS = [is17, is20, is40, is44, is44rev, is50, is53, is60]
if sum(isBDS) == 0:
return None
elif sum(isBDS) == 1:
return BDS[isBDS.index(True)]
else:
bds_ = [bds for (bds, i) in zip(BDS, isBDS) if i]
return ','.join(bds_)
h60 = hdg60(msg)
m60 = mach60(msg)
i60 = ias60(msg)
h60 = np.nan if h60 is None else h60
m60 = np.nan if m60 is None else m60
i60 = np.nan if i60 is None else i60
XY5 = vxy(v50*aero.kts, h50)
XY6m = vxy(aero.mach2tas(m60, alt_ref*aero.ft), h60)
XY6i = vxy(aero.cas2tas(i60*aero.kts, alt_ref*aero.ft), h60)
allbds = ['BDS50', 'BDS60', 'BDS60']
def Vxy(V, angle):
Vx = V*np.sin(np.deg2rad(angle))
Vy = V*np.cos(np.deg2rad(angle))
return Vx, Vy
X = np.array([XY5, XY6m, XY6i])
Mu = np.array(vxy(spd_ref*aero.kts, trk_ref*aero.kts))
def BDSv2(msg, SPDref=np.nan, TRKref=np.nan, ALTref=np.nan):
"""Use probabilistic method to determine the most likely BDS code of an message
# compute Mahalanobis distance matrix
# Cov = [[20**2, 0], [0, 20**2]]
# mmatrix = np.sqrt(np.dot(np.dot(X-Mu, np.linalg.inv(Cov)), (X-Mu).T))
# dist = np.diag(mmatrix)
# since the covariance matrix is identity matrix,
# M-dist is same as eculidian distance
dist = np.linalg.norm(X-Mu, axis=1)
BDS = allbds[np.nanargmin(dist)]
return BDS
def BDS(msg):
"""Estimate the most likely BDS code of an message
Args:
msg (String): 28 bytes hexadecimal message string
SPDref (float): reference speed (for example ADS-B GS)
TRKref (float): reference track (for example ADS-B TRK)
ALTref (float): reference altitude (for example ADS-B altitude)
Returns:
String or None: BDS version, or possible versions, or None if nothing matches.
"""
BDS = pms.ehs.BDS(msg)
if isnull(msg):
return None
if type(BDS) != list:
return BDS
else:
if 'BDS53' in BDS:
BDS.remove('BDS53')
if 'BDS40' in BDS:
fms = pms.ehs.alt40fms(msg)
mcp = pms.ehs.alt40mcp(msg)
baro = pms.ehs.p40baro(msg)
if fms != None:
if (((fms % 100) <= 8) or ((fms % 100) >= 92)) and fms < 50500:
return 'BDS40'
if mcp != None:
if (((mcp % 100) <= 8) or ((mcp % 100) >= 92)) and mcp < 50500:
return 'BDS40'
if baro != None:
if (983 <= baro <= 1043): #1013 -+ 30
return 'BDS40'
if set(BDS).issubset(['BDS50', 'BDS60']):
if ~(np.isnan(SPDref) or np.isnan(TRKref) or np.isnan(ALTref)):
meanV = Vxy(SPDref, TRKref)
sigmaV = 20
covV = [[sigmaV**2, 0], [0, sigmaV**2]]
try: # Because register field is not available.
pBDS50 = multivariate_normal(meanV, covV).pdf(Vxy(pms.ehs.gs50(msg), pms.ehs.trk50(msg)))
pBDS60_1 = multivariate_normal(meanV, covV).pdf(Vxy(aero.mach2tas(pms.ehs.mach60(msg), ALTref*aero.ft)/aero.kts, pms.ehs.hdg60(msg)))
pBDS60_2 = multivariate_normal(meanV, covV).pdf(Vxy(aero.cas2tas(pms.ehs.ias60(msg)*aero.kts, ALTref*aero.ft)/aero.kts, pms.ehs.hdg60(msg)))
pBDS60 = max(pBDS60_1, pBDS60_2)
except:
return BDS
is17 = isBDS17(msg)
is20 = isBDS20(msg)
is40 = isBDS40(msg)
is44 = isBDS44(msg)
is44rev = isBDS44(msg, rev=True)
is50 = isBDS50(msg)
is53 = isBDS53(msg)
is60 = isBDS60(msg)
if pBDS50 + pBDS60 > 0: #Avoid None values
if pBDS50 > pBDS60:
return 'BDS50'
elif pBDS50 < pBDS60:
return 'BDS60'
allbds = np.array([
"BDS17", "BDS20", "BDS40", "BDS44", "BDS44REV",
"BDS50", "BDS53", "BDS60"
])
return BDS
isBDS = [is17, is20, is40, is44, is44rev, is50, is53, is60]
bds = ','.join(sorted(allbds[isBDS]))
if len(bds) == 0:
return None
else:
return bds

@ -0,0 +1 @@
from __future__ import absolute_import, print_function, division

@ -95,8 +95,8 @@ def distance(lat1, lon1, lat2, lon2, H=0):
theta1 = np.radians(lon1)
theta2 = np.radians(lon2)
cos = np.sin(phi1) * np.sin(phi2) * np.cos(theta1 - theta2) \
+ np.cos(phi1) * np.cos(phi2)
cos = np.sin(phi1) * np.sin(phi2) * np.cos(theta1 - theta2) + np.cos(phi1) * np.cos(phi2)
cos = np.where(cos>1, 1, cos)
arc = np.arccos(cos)
dist = arc * (r_earth + H) # meters, radius of earth

@ -5,11 +5,11 @@ import argparse
import curses
import numpy as np
import time
import pyModeS as pms
from threading import Lock
from client import BaseClient
from stream import Stream
from screen import Screen
from pyModeS.decoder import util
from pyModeS.extra.beastclient import BaseClient
from pyModeS.streamer.stream import Stream
from pyModeS.streamer.screen import Screen
LOCK = Lock()
ADSB_MSG = []
@ -43,7 +43,7 @@ class ModesClient(BaseClient):
if len(msg) < 28: # only process long messages
continue
df = pms.df(msg)
df = util.df(msg)
if df == 17 or df == 18:
local_buffer_adsb_msg.append(msg)

@ -1,8 +1,7 @@
from __future__ import print_function, division
from __future__ import absolute_import, print_function, division
import numpy as np
import time
import pyModeS as pms
from pyModeS.decoder import adsb, ehs
class Stream():
def __init__(self, lat0, lon0):
@ -32,8 +31,8 @@ class Stream():
# process adsb message
for t, msg in zip(adsb_ts, adsb_msgs):
icao = pms.adsb.icao(msg)
tc = pms.adsb.typecode(msg)
icao = adsb.icao(msg)
tc = adsb.typecode(msg)
if icao not in self.acs:
self.acs[icao] = {
@ -52,10 +51,10 @@ class Stream():
self.acs[icao]['t'] = t
if 1 <= tc <= 4:
self.acs[icao]['callsign'] = pms.adsb.callsign(msg)
self.acs[icao]['callsign'] = adsb.callsign(msg)
if (5 <= tc <= 8) or (tc == 19):
vdata = pms.adsb.velocity(msg)
vdata = adsb.velocity(msg)
if vdata is None:
continue
@ -69,7 +68,7 @@ class Stream():
self.acs[icao]['tv'] = t
if (5 <= tc <= 18):
oe = pms.adsb.oe_flag(msg)
oe = adsb.oe_flag(msg)
self.acs[icao][oe] = msg
self.acs[icao]['t'+str(oe)] = t
@ -77,12 +76,12 @@ class Stream():
# use single message decoding
rlat = self.acs[icao]['lat']
rlon = self.acs[icao]['lon']
latlon = pms.adsb.position_with_ref(msg, rlat, rlon)
latlon = adsb.position_with_ref(msg, rlat, rlon)
elif ('t0' in self.acs[icao]) and ('t1' in self.acs[icao]) and \
(abs(self.acs[icao]['t0'] - self.acs[icao]['t1']) < 10):
# use multi message decoding
try:
latlon = pms.adsb.position(
latlon = adsb.position(
self.acs[icao][0],
self.acs[icao][1],
self.acs[icao]['t0'],
@ -99,29 +98,29 @@ class Stream():
self.acs[icao]['tpos'] = t
self.acs[icao]['lat'] = latlon[0]
self.acs[icao]['lon'] = latlon[1]
self.acs[icao]['alt'] = pms.adsb.altitude(msg)
self.acs[icao]['alt'] = adsb.altitude(msg)
local_updated_acs_buffer.append(icao)
# process ehs message
for t, msg in zip(ehs_ts, ehs_msgs):
icao = pms.ehs.icao(msg)
icao = ehs.icao(msg)
if icao not in self.acs:
continue
bds = pms.ehs.BDS(msg)
bds = ehs.BDS(msg)
if bds == 'BDS50':
tas = pms.ehs.tas50(msg)
tas = ehs.tas50(msg)
if tas:
self.acs[icao]['t50'] = t
self.acs[icao]['tas'] = tas
elif bds == 'BDS60':
ias = pms.ehs.ias60(msg)
hdg = pms.ehs.hdg60(msg)
mach = pms.ehs.mach60(msg)
ias = ehs.ias60(msg)
hdg = ehs.hdg60(msg)
mach = ehs.mach60(msg)
if ias or hdg or mach:
self.acs[icao]['t60'] = t

@ -14,10 +14,14 @@ def test_df20alt():
def test_ehs_BDS():
assert ehs.BDS("A0001838201584F23468207CDFA5") == 'BDS20'
assert ehs.BDS("A0001839CA3800315800007448D9") == 'BDS40'
# assert ehs.BDS("A000031DBAA9DD18622C441330E9") == 'BDS44'
assert ehs.BDS("A000139381951536E024D4CCF6B5") == 'BDS50'
assert ehs.BDS("A00004128F39F91A7E27C46ADC21") == 'BDS60'
def test_ehs_isBDS50or60():
assert ehs.isBDS50or60("A0001838201584F23468207CDFA5", 0, 0, 0) == None
assert ehs.isBDS50or60("A0000000FFDA9517000464000000", 182, 237, 1250) == 'BDS50'
assert ehs.isBDS50or60("A0000000919A5927E23444000000", 413, 54, 18700) == 'BDS60'
def test_ehs_BDS20_callsign():
assert ehs.callsign("A000083E202CC371C31DE0AA1CCF") == 'KLM1017_'
assert ehs.callsign("A0001993202422F2E37CE038738E") == 'IBK2873_'

Loading…
Cancel
Save