add EHS BDS44 decoding (not reliable though...)

pull/8/head
junzis 8 years ago
parent a08c91a3a1
commit b648f4e7a5

@ -206,13 +206,16 @@ def pbaro(msg):
# ------------------------------------------
# DF 20/21, BDS 4,0
# DF 20/21, BDS 4,4
# ------------------------------------------
def isBDS44(msg):
"""Check if a message is likely to be BDS code 4,4
Meteorological routine air report
WARNING: there is two different definition for BDS4,4. This part of the
decoder is likely to be wrong...
Args:
msg (String): 28 bytes hexadecimal message string
@ -225,9 +228,22 @@ def isBDS44(msg):
result = True
result = result & checkbits(d, 5, 6, 14) \
& checkbits(d, 15, 16, 23) & checkbits(d, 24, 25, 35) \
& checkbits(d, 36, 37, 47) & checkbits(d, 49, 50, 56)
# --- current version ---
result = result & checkbits(d, 5, 6, 23) \
& checkbits(d, 35, 36, 46) & checkbits(d, 47, 48, 49) \
& checkbits(d, 50, 51, 56)
# # --- revised future version ---
# result = result & checkbits(d, 5, 6, 14) \
# & checkbits(d, 15, 16, 23) & checkbits(d, 24, 25, 35) \
# & checkbits(d, 36, 37, 47) & checkbits(d, 49, 50, 56)
if wind(msg) and wind(msg)[0] > 250:
result &= False
# if temperature(msg):
# if temperature(msg) > 60 or temperature(msg) < -80:
# result &= False
return result
@ -248,10 +264,65 @@ def wind(msg):
return None
speed = util.bin2int(d[5:14]) # knots
direction = util.bin2int(d[15:23]) * 180.0 / 128.0 # degree
direction = util.bin2int(d[14:23]) * 180.0 / 256.0 # degree
return round(speed, 0), round(direction, 1)
def temperature(msg):
"""reported air temperature
Args:
msg (String): 28 bytes hexadecimal message (BDS44) string
Returns:
float: tmeperature in Celsius degree
"""
d = util.hex2bin(data(msg))
sign = int(d[23]) # 1 -> minus
temp = util.bin2int(d[24:34]) * 0.25 # celsius
temp = round(temp, 1)
return -1 * temp if sign else temp
def pressure(msg):
"""reported average static pressure
Args:
msg (String): 28 bytes hexadecimal message (BDS44) string
Returns:
int: static pressure in hPa
"""
d = util.hex2bin(data(msg))
status = int(d[34])
if not status:
return None
p = util.bin2int(d[35:46]) # hPa
return p
def humidity(msg):
"""reported humidity
Args:
msg (String): 28 bytes hexadecimal message (BDS44) string
Returns:
float: percentage of humidity, [0 - 100] %
"""
d = util.hex2bin(data(msg))
status = int(d[49])
if not status:
return None
hm = util.bin2int(d[50:56]) * 100.0 / 64 # %
return round(hm, 1)
# ------------------------------------------
# DF 20/21, BDS 5,0
# ------------------------------------------

@ -85,7 +85,3 @@ def floor(x):
eg.: floor(3.6) = 3, while floor(-3.6) = -4
"""
return int(math.floor(x))
def test():
print('test here')

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -0,0 +1,79 @@
# If you get import error run with ipython
from pyModeS import adsb
from pyModeS import ehs
from pyModeS import util
# === Decode sample data file ===
def adsb_decode_all(n=None):
print("===== Decode all ADS-B sample data=====")
import csv
f = open('tests/sample_data_adsb.csv', 'rt')
msg0 = None
msg1 = None
for i, r in enumerate(csv.reader(f)):
if n and i > n:
break
ts = r[0]
m = r[1]
icao = adsb.icao(m)
tc = adsb.typecode(m)
if 1 <= tc <= 4:
print(ts, m, icao, tc, adsb.category(m), adsb.callsign(m))
if tc == 19:
print(ts, m, icao, tc, adsb.velocity(m))
if 5 <= tc <= 18:
if adsb.oe_flag(m):
msg1 = m
t1 = ts
else:
msg0 = m
t0 = ts
if msg0 and msg1:
pos = adsb.position(msg0, msg1, t0, t1)
alt = adsb.altitude(m)
print(ts, m, icao, tc, pos, alt)
def ehs_decode_all(n=None):
print("===== Decode all Mode-S EHS sample data=====")
import csv
f = open('tests/sample_data_ehs.csv', 'rt')
for i, r in enumerate(csv.reader(f)):
if n and i > n:
break
ts = r[1]
m = r[2]
icao = ehs.icao(m)
vBDS = ehs.BDS(m)
if vBDS:
if vBDS == "BDS20":
print(ts, m, icao, vBDS, ehs.callsign(m))
if vBDS == "BDS40":
print(ts, m, icao, vBDS, ehs.alt_mcp(m),
ehs.alt_fms(m), ehs.pbaro(m))
if vBDS == "BDS44":
print(ts, m, icao, vBDS, ehs.wind(m))
if vBDS == "BDS50":
print(ts, m, icao, vBDS, ehs.roll(m), ehs.track(m),
ehs.gs(m), ehs.rtrack(m), ehs.tas(m))
if vBDS == "BDS60":
print(ts, m, icao, vBDS, ehs.heading(m), ehs.ias(m),
ehs.mach(m), ehs.baro_vr(m), ehs.ins_vr(m))
else:
print(ts, m, icao, 'UNKNOWN')
if __name__ == '__main__':
adsb_decode_all(100)
ehs_decode_all(100)

@ -0,0 +1,63 @@
from pyModeS import adsb
# === TEST ADS-B package ===
def test_adsb_icao():
assert adsb.icao("8D406B902015A678D4D220AA4BDA") == "406B90"
def test_adsb_category():
assert adsb.category("8D406B902015A678D4D220AA4BDA") == 5
def test_adsb_callsign():
assert adsb.callsign("8D406B902015A678D4D220AA4BDA") == "EZY85MH_"
def test_adsb_position():
pos = adsb.position("8D40058B58C901375147EFD09357",
"8D40058B58C904A87F402D3B8C59",
1446332400, 1446332405)
assert pos == (49.81755, 6.08442)
def test_adsb_airborne_position_with_ref():
pos = adsb.airborne_position_with_ref("8D40058B58C901375147EFD09357",
49.0, 6.0)
assert pos == (49.82410, 6.06785)
pos = adsb.airborne_position_with_ref("8D40058B58C904A87F402D3B8C59",
49.0, 6.0)
assert pos == (49.81755, 6.08442)
def test_adsb_surface_position_with_ref():
pos = adsb.surface_position_with_ref("8FC8200A3AB8F5F893096B22B4A8",
-43.5, 172.5)
assert pos == (-43.48564, 175.87195)
def test_adsb_alt():
assert adsb.altitude("8D40058B58C901375147EFD09357") == 39000
def test_adsb_velocity():
vgs = adsb.velocity("8D485020994409940838175B284F")
vas = adsb.velocity("8DA05F219B06B6AF189400CBC33F")
assert vgs == (159, 182.9, -14, 'GS')
assert vas == (376, 244.0, -37, 'AS')
def test_nic():
assert adsb.nic('8D3C70A390AB11F55B8C57F65FE6') == 0
assert adsb.nic('8DE1C9738A4A430B427D219C8225') == 1
assert adsb.nic('8D44058880B50006B1773DC2A7E9') == 2
assert adsb.nic('8D44058881B50006B1773DC2A7E9') == 3
assert adsb.nic('8D4AB42A78000640000000FA0D0A') == 4
assert adsb.nic('8D4405887099F5D9772F37F86CB6') == 5
assert adsb.nic('8D4841A86841528E72D9B472DAC2') == 6
assert adsb.nic('8D44057560B9760C0B840A51C89F') == 7
assert adsb.nic('8D40621D58C382D690C8AC2863A7') == 8
assert adsb.nic('8F48511C598D04F12CCF82451642') == 9
assert adsb.nic('8DA4D53A50DBF8C6330F3B35458F') == 10
assert adsb.nic('8D3C4ACF4859F1736F8E8ADF4D67') == 11

@ -0,0 +1,42 @@
from pyModeS import ehs
def test_ehs_icao():
assert ehs.icao("A0001839CA3800315800007448D9") == '400940'
assert ehs.icao("A000139381951536E024D4CCF6B5") == '3C4DD2'
assert ehs.icao("A000029CFFBAA11E2004727281F1") == '4243D0'
def test_ehs_BDS():
assert ehs.BDS("A0001838201584F23468207CDFA5") == 'BDS20'
assert ehs.BDS("A0001839CA3800315800007448D9") == 'BDS40'
assert ehs.BDS("A000139381951536E024D4CCF6B5") == 'BDS50'
assert ehs.BDS("A000029CFFBAA11E2004727281F1") == 'BDS60'
assert ehs.BDS("A0281838CAE9E12FA03FFF2DDDE5") is None
def test_ehs_BDS20_callsign():
assert ehs.callsign("A000083E202CC371C31DE0AA1CCF") == 'KLM1017_'
assert ehs.callsign("A0001993202422F2E37CE038738E") == 'IBK2873_'
def test_ehs_BDS40_functions():
assert ehs.alt_mcp("A000029C85E42F313000007047D3") == 3008
assert ehs.alt_fms("A000029C85E42F313000007047D3") == 3008
assert ehs.pbaro("A000029C85E42F313000007047D3") == 1020.0
def test_ehs_BDS50_functions():
assert ehs.roll("A000139381951536E024D4CCF6B5") == 2.1
assert ehs.track("A000139381951536E024D4CCF6B5") == 114.3
assert ehs.gs("A000139381951536E024D4CCF6B5") == 438
assert ehs.rtrack("A000139381951536E024D4CCF6B5") == 0.125
assert ehs.tas("A000139381951536E024D4CCF6B5") == 424
def test_ehs_BDS60_functions():
assert ehs.heading("A000029CFFBAA11E2004727281F1") == 180.9
assert ehs.ias("A000029CFFBAA11E2004727281F1") == 336
assert ehs.mach("A000029CFFBAA11E2004727281F1") == 0.48
assert ehs.baro_vr("A000029CFFBAA11E2004727281F1") == 0
assert ehs.ins_vr("A000029CFFBAA11E2004727281F1") == -3648

@ -0,0 +1,15 @@
from pyModeS import util
def test_hex2bin():
assert util.hex2bin('6E406B') == "011011100100000001101011"
def test_crc_decode():
checksum = util.crc("8D406B902015A678D4D220AA4BDA")
assert checksum == "000000000000000000000000"
def test_crc_encode():
parity = util.crc("8D406B902015A678D4D220AA4BDA", encode=True)
assert util.hex2bin("AA4BDA") == parity
Loading…
Cancel
Save