Merge branch 'master' into pr-71/tuftedocelot/acas

This commit is contained in:
Junzi Sun 2020-05-23 21:53:51 +02:00
commit c25c9d6b96
11 changed files with 343 additions and 234 deletions

View File

@ -8,6 +8,12 @@ ext:
python setup.py build_ext --inplace
test:
make clean
@echo ""
@echo "[Test with py_common]"
python -m pytest tests
@echo ""
@echo "[Test with c_common]"
python setup.py build_ext --inplace
python -m pytest tests

View File

@ -17,8 +17,12 @@ cpdef bint is_icao_assigned(str icao)
cpdef int typecode(str msg)
cpdef int cprNL(double lat)
cpdef str idcode(str msg)
cpdef str squawk(str binstr)
cpdef int altcode(str msg)
cpdef int altitude(str binstr)
cpdef str data(str msg)
cpdef bint allzeros(str msg)

View File

@ -160,17 +160,7 @@ cpdef long floor(double x):
return <long> c_floor(x)
cpdef str icao(str msg):
"""Calculate the ICAO address from an Mode-S message.
Applicable only with DF4, DF5, DF20, DF21 messages.
Args:
msg (String): 28 bytes hexadecimal message string
Returns:
String: ICAO address in 6 bytes hexadecimal string
"""
"""Calculate the ICAO address from an Mode-S message."""
cdef unsigned char DF = df(msg)
cdef long c0, c1
@ -217,14 +207,7 @@ cpdef bint is_icao_assigned(str icao):
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int typecode(str msg):
"""Type code of ADS-B message
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: type code number
"""
"""Type code of ADS-B message"""
if df(msg) not in (17, 18):
return -1
# return None
@ -253,45 +236,41 @@ cpdef int cprNL(double lat):
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef str idcode(str msg):
"""Compute identity (squawk code).
Applicable only for DF5 or DF21 messages, bit 20-32.
credit: @fbyrkjeland
Args:
msg (String): 28 bytes hexadecimal message string
Returns:
string: squawk code
"""
"""Compute identity (squawk code)."""
if df(msg) not in [5, 21]:
raise RuntimeError("Message must be Downlink Format 5 or 21.")
cdef bytearray _mbin = bytearray(hex2bin(msg).encode())
squawk_code = squawk(hex2bin(msg)[19:32])
return squawk_code
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef str squawk(str binstr):
"""Compute identity (squawk code)."""
if len(binstr) != 13 or set(binstr) != set('01'):
raise RuntimeError("Input must be 13 bits binary string")
cdef bytearray _mbin = bytearray(binstr.encode())
cdef unsigned char[:] mbin = _mbin
cdef bytearray _idcode = bytearray(4)
cdef unsigned char[:] idcode = _idcode
cdef unsigned char C1 = mbin[19]
cdef unsigned char A1 = mbin[20]
cdef unsigned char C2 = mbin[21]
cdef unsigned char A2 = mbin[22]
cdef unsigned char C4 = mbin[23]
cdef unsigned char A4 = mbin[24]
# _ = mbin[25]
cdef unsigned char B1 = mbin[26]
cdef unsigned char D1 = mbin[27]
cdef unsigned char B2 = mbin[28]
cdef unsigned char D2 = mbin[29]
cdef unsigned char B4 = mbin[30]
cdef unsigned char D4 = mbin[31]
# byte1 = int(A4 + A2 + A1, 2)
# byte2 = int(B4 + B2 + B1, 2)
# byte3 = int(C4 + C2 + C1, 2)
# byte4 = int(D4 + D2 + D1, 2)
cdef unsigned char C1 = mbin[0]
cdef unsigned char A1 = mbin[1]
cdef unsigned char C2 = mbin[2]
cdef unsigned char A2 = mbin[3]
cdef unsigned char C4 = mbin[4]
cdef unsigned char A4 = mbin[5]
# X = mbin[6]
cdef unsigned char B1 = mbin[7]
cdef unsigned char D1 = mbin[8]
cdef unsigned char B2 = mbin[9]
cdef unsigned char D2 = mbin[10]
cdef unsigned char B4 = mbin[11]
cdef unsigned char D4 = mbin[12]
idcode[0] = int_to_char((char_to_int(A4)*2 + char_to_int(A2))*2 + char_to_int(A1))
idcode[1] = int_to_char((char_to_int(B4)*2 + char_to_int(B2))*2 + char_to_int(B1))
@ -300,68 +279,68 @@ cpdef str idcode(str msg):
return _idcode.decode()
#return str(byte1) + str(byte2) + str(byte3) + str(byte4)
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int altcode(str msg):
"""Compute the altitude.
Applicable only for DF4 or DF20 message, bit 20-32.
credit: @fbyrkjeland
Args:
msg (String): 28 bytes hexadecimal message string
Returns:
int: altitude in ft
"""
"""Compute the altitude."""
if df(msg) not in [0, 4, 16, 20]:
raise RuntimeError("Message must be Downlink Format 0, 4, 16, or 20.")
# Altitude code, bit 20-32
cdef bytearray _mbin = bytearray(hex2bin(msg).encode())
alt = altitude(hex2bin(msg)[19:32])
return alt
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int altitude(str binstr):
if len(binstr) != 13 or set(binstr) != set('01'):
raise RuntimeError("Input must be 13 bits binary string")
cdef bytearray _mbin = bytearray(binstr.encode())
cdef unsigned char[:] mbin = _mbin
cdef char mbit = mbin[25] # M bit: 26
cdef char qbit = mbin[27] # Q bit: 28
cdef char Mbit = binstr[6]
cdef char Qbit = binstr[8]
cdef int alt = 0
cdef bytearray vbin
cdef bytearray _graybytes = bytearray(11)
cdef unsigned char[:] graybytes = _graybytes
if mbit == 48: # unit in ft, "0" -> 48
if qbit == 49: # 25ft interval, "1" -> 49
vbin = _mbin[19:25] + _mbin[26:27] + _mbin[28:32]
if bin2int(binstr) == 0:
# altitude unknown or invalid
alt = -9999
elif Mbit == 48: # unit in ft, "0" -> 48
if Qbit == 49: # 25ft interval, "1" -> 49
vbin = _mbin[:6] + _mbin[7:8] + _mbin[9:]
alt = bin2int(vbin.decode()) * 25 - 1000
if qbit == 48: # 100ft interval, above 50175ft, "0" -> 48
graybytes[8] = mbin[19]
graybytes[2] = mbin[20]
graybytes[9] = mbin[21]
graybytes[3] = mbin[22]
graybytes[10] = mbin[23]
graybytes[4] = mbin[24]
# _ = mbin[25]
graybytes[5] = mbin[26]
# cdef char D1 = mbin[27] # always zero
graybytes[6] = mbin[28]
graybytes[0] = mbin[29]
graybytes[7] = mbin[30]
graybytes[1] = mbin[31]
# graybytes = D2 + D4 + A1 + A2 + A4 + B1 + B2 + B4 + C1 + C2 + C4
if Qbit == 48: # 100ft interval, above 50175ft, "0" -> 48
graybytes[8] = mbin[0]
graybytes[2] = mbin[1]
graybytes[9] = mbin[2]
graybytes[3] = mbin[3]
graybytes[10] = mbin[4]
graybytes[4] = mbin[5]
# M = mbin[6]
graybytes[5] = mbin[7]
# Q = mbin[8]
graybytes[6] = mbin[9]
graybytes[0] = mbin[10]
graybytes[7] = mbin[11]
graybytes[1] = mbin[12]
alt = gray2alt(_graybytes.decode())
if mbit == 49: # unit in meter, "1" -> 49
vbin = _mbin[19:25] + _mbin[26:31]
elif Mbit == 49: # unit in meter, "1" -> 49
vbin = _mbin[:6] + _mbin[7:]
alt = int(bin2int(vbin.decode()) * 3.28084) # convert to ft
return alt
cpdef int gray2alt(str codestr):
cdef str gc500 = codestr[:8]
cdef int n500 = gray2int(gc500)
@ -400,15 +379,7 @@ cpdef str data(str msg):
cpdef bint allzeros(str msg):
"""Check if the data bits are all zeros.
Args:
msg (String): 28 bytes hexadecimal message string
Returns:
bool: True or False
"""
"""Check if the data bits are all zeros."""
d = hex2bin(data(msg))
if bin2int(d) > 0:

View File

@ -1,4 +1,4 @@
def tell(msg):
def tell(msg: str) -> None:
from pyModeS import common, adsb, commb, bds
def _print(label, value, unit=None):

View File

@ -28,6 +28,7 @@ from pyModeS.decoder.bds.bds06 import (
)
from pyModeS.decoder.bds.bds08 import category, callsign
from pyModeS.decoder.bds.bds09 import airborne_velocity, altitude_diff
from pyModeS.decoder.bds.bds61 import is_emergency, emergency_state, emergency_squawk
def df(msg):

View File

@ -139,17 +139,13 @@ def altitude(msg):
raise RuntimeError("%s: Not a airborn position message" % msg)
mb = common.hex2bin(msg)[32:]
altbin = mb[8:20]
if tc < 19:
# barometric altitude
q = mb[15]
if q:
n = common.bin2int(mb[8:15] + mb[16:20])
alt = n * 25 - 1000
altcode = altbin[0:6] + "0" + altbin[6:]
else:
alt = None
else:
# GNSS altitude, meters -> feet
alt = common.bin2int(mb[8:20]) * 3.28084
altcode = altbin[0:6] + "0" + altbin[6:]
alt = common.altitude(altcode)
return alt

View File

@ -0,0 +1,83 @@
# ------------------------------------------
# BDS 6,1
# ADS-B TC=28
# Aircraft Airborne status
# ------------------------------------------
from pyModeS import common
def is_emergency(msg: str) -> bool:
"""Check if the aircraft is reporting an emergency.
Non-emergencies are either a subtype of zero (no information) or
subtype of one and a value of zero (no emergency).
Subtype = 2 indicates an ACAS RA broadcast, look in BDS 3,0
:param msg: 28 bytes hexadecimal message string
:return: if the aircraft has declared an emergency
"""
if common.typecode(msg) != 28:
raise RuntimeError("%s: Not an airborne status message, expecting TC=28" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:8])
if subtype == 2:
raise RuntimeError("%s: Emergency message is ACAS-RA, not implemented")
emergency_state = common.bin2int(mb[8:11])
if subtype == 1 and emergency_state == 1:
return True
else:
return False
def emergency_state(msg: str) -> int:
"""Decode aircraft emergency state.
Value Meaning
----- -----------------------
0 No emergency
1 General emergency
2 Lifeguard/Medical
3 Minimum fuel
4 No communications
5 Unlawful communications
6-7 Reserved
:param msg: 28 bytes hexadecimal message string
:return: emergency state
"""
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:8])
if subtype == 2:
raise RuntimeError("%s: Emergency message is ACAS-RA, not implemented")
emergency_state = common.bin2int(mb[8:11])
return emergency_state
def emergency_squawk(msg: str) -> str:
"""Decode squawk code.
Emergency value 1: squawk 7700.
Emergency value 4: squawk 7600.
Emergency value 5: squawk 7500.
:param msg: 28 bytes hexadecimal message string
:return: aircraft squawk code
"""
if common.typecode(msg) != 28:
raise RuntimeError("%s: Not an airborne status message, expecting TC=28" % msg)
msgbin = common.hex2bin(msg)
# construct the 13 bits Mode A ID code
idcode = msgbin[43:49] + "0" + msgbin[49:55]
squawk = common.squawk(idcode)
return squawk

View File

@ -282,6 +282,8 @@ class TcpClient(object):
# raise RuntimeError("test exception")
except zmq.error.Again:
continue
except Exception as e:
tb = traceback.format_exc()
exception_queue.put(tb)

View File

@ -1,44 +1,46 @@
from typing import Optional
import numpy as np
from textwrap import wrap
def hex2bin(hexstr):
def hex2bin(hexstr: str) -> str:
"""Convert a hexdecimal string to binary string, with zero fillings."""
num_of_bits = len(hexstr) * 4
binstr = bin(int(hexstr, 16))[2:].zfill(int(num_of_bits))
return binstr
def hex2int(hexstr):
def hex2int(hexstr: str) -> int:
"""Convert a hexdecimal string to integer."""
return int(hexstr, 16)
def bin2int(binstr):
def bin2int(binstr: str) -> int:
"""Convert a binary string to integer."""
return int(binstr, 2)
def bin2hex(binstr):
def bin2hex(binstr: str) -> str:
"""Convert a binary string to hexdecimal string."""
return "{0:X}".format(int(binstr, 2))
def df(msg):
def df(msg: str) -> int:
"""Decode Downlink Format value, bits 1 to 5."""
dfbin = hex2bin(msg[:2])
return min(bin2int(dfbin[0:5]), 24)
def crc(msg, encode=False):
def crc(msg: str, encode: bool = False) -> int:
"""Mode-S Cyclic Redundancy Check.
Detect if bit error occurs in the Mode-S message. When encode option is on,
the checksum is generated.
Args:
msg (string): 28 bytes hexadecimal message string
encode (bool): True to encode the date only and return the checksum
msg: 28 bytes hexadecimal message string
encode: True to encode the date only and return the checksum
Returns:
int: message checksum, or partity bits (encoder)
@ -75,7 +77,7 @@ def crc(msg, encode=False):
return result
def crc_legacy(msg, encode=False):
def crc_legacy(msg: str, encode: bool = False) -> int:
"""Mode-S Cyclic Redundancy Check. (Legacy code, 2x slow)."""
# the polynominal generattor code for CRC [1111111111111010000001001]
generator = np.array(
@ -103,7 +105,7 @@ def crc_legacy(msg, encode=False):
return reminder
def floor(x):
def floor(x: float) -> int:
"""Mode-S floor function.
Defined as the greatest integer value k, such that k <= x
@ -113,7 +115,7 @@ def floor(x):
return int(np.floor(x))
def icao(msg):
def icao(msg: str) -> Optional[str]:
"""Calculate the ICAO address from an Mode-S message.
Applicable only with DF4, DF5, DF20, DF21 messages.
@ -125,6 +127,7 @@ def icao(msg):
String: ICAO address in 6 bytes hexadecimal string
"""
addr: Optional[str]
DF = df(msg)
if DF in (11, 17, 18):
@ -139,7 +142,7 @@ def icao(msg):
return addr
def is_icao_assigned(icao):
def is_icao_assigned(icao: str) -> bool:
"""Check whether the ICAO address is assigned (Annex 10, Vol 3)."""
if (icao is None) or (not isinstance(icao, str)) or (len(icao) != 6):
return False
@ -168,7 +171,7 @@ def is_icao_assigned(icao):
return True
def typecode(msg):
def typecode(msg: str) -> Optional[int]:
"""Type code of ADS-B message
Args:
@ -184,7 +187,7 @@ def typecode(msg):
return bin2int(tcbin[0:5])
def cprNL(lat):
def cprNL(lat: float) -> int:
"""NL() function in CPR decoding."""
if np.isclose(lat, 0):
@ -202,11 +205,8 @@ def cprNL(lat):
return NL
def idcode(msg):
"""Compute identity (squawk code).
Applicable only for DF5 or DF21 messages, bit 20-32.
credit: @fbyrkjeland
def idcode(msg: str) -> str:
"""Compute identity code (squawk) encoded in DF5 or DF21 message.
Args:
msg (String): 28 bytes hexadecimal message string
@ -219,20 +219,37 @@ def idcode(msg):
raise RuntimeError("Message must be Downlink Format 5 or 21.")
mbin = hex2bin(msg)
idcodebin = mbin[19:32]
C1 = mbin[19]
A1 = mbin[20]
C2 = mbin[21]
A2 = mbin[22]
C4 = mbin[23]
A4 = mbin[24]
# _ = mbin[25]
B1 = mbin[26]
D1 = mbin[27]
B2 = mbin[28]
D2 = mbin[29]
B4 = mbin[30]
D4 = mbin[31]
return squawk(idcodebin)
def squawk(binstr: str) -> str:
"""Decode 13 bits identity (squawk) code.
Args:
binstr (String): 13 bits binary string
Returns:
int: altitude in ft
"""
if len(binstr) != 13 or set(binstr) != set("01"):
raise RuntimeError("Input must be 13 bits binary string")
C1 = binstr[0]
A1 = binstr[1]
C2 = binstr[2]
A2 = binstr[3]
C4 = binstr[4]
A4 = binstr[5]
# X = binstr[6]
B1 = binstr[7]
D1 = binstr[8]
B2 = binstr[9]
D2 = binstr[10]
B4 = binstr[11]
D4 = binstr[12]
byte1 = int(A4 + A2 + A1, 2)
byte2 = int(B4 + B2 + B1, 2)
@ -242,11 +259,8 @@ def idcode(msg):
return str(byte1) + str(byte2) + str(byte3) + str(byte4)
def altcode(msg):
"""Compute the altitude.
Applicable only for DF4 or DF20 message, bit 20-32.
credit: @fbyrkjeland
def altcode(msg: str) -> Optional[int]:
"""Compute altitude encoded in DF4 or DF20 message.
Args:
msg (String): 28 bytes hexadecimal message string
@ -255,50 +269,78 @@ def altcode(msg):
int: altitude in ft
"""
alt: Optional[int]
if df(msg) not in [0, 4, 16, 20]:
raise RuntimeError("Message must be Downlink Format 0, 4, 16, or 20.")
# Altitude code, bit 20-32
mbin = hex2bin(msg)
mbit = mbin[25] # M bit: 26
qbit = mbin[27] # Q bit: 28
altitude_code = mbin[19:32]
if mbit == "0": # unit in ft
if qbit == "1": # 25ft interval
vbin = mbin[19:25] + mbin[26] + mbin[28:32]
alt = altitude(altitude_code)
return alt
def altitude(binstr: str) -> Optional[int]:
"""Decode 13 bits altitude code.
Args:
binstr (String): 13 bits binary string
Returns:
int: altitude in ft
"""
alt: Optional[int]
if len(binstr) != 13 or set(binstr) != set("01"):
raise RuntimeError("Input must be 13 bits binary string")
Mbit = binstr[6]
Qbit = binstr[8]
if bin2int(binstr) == 0:
# altitude unknown or invalid
alt = None
elif Mbit == "0": # unit in ft
if Qbit == "1": # 25ft interval
vbin = binstr[:6] + binstr[7] + binstr[9:]
alt = bin2int(vbin) * 25 - 1000
if qbit == "0": # 100ft interval, above 50175ft
C1 = mbin[19]
A1 = mbin[20]
C2 = mbin[21]
A2 = mbin[22]
C4 = mbin[23]
A4 = mbin[24]
# _ = mbin[25]
B1 = mbin[26]
# D1 = mbin[27] # always zero
B2 = mbin[28]
D2 = mbin[29]
B4 = mbin[30]
D4 = mbin[31]
if Qbit == "0": # 100ft interval, above 50187.5ft
C1 = binstr[0]
A1 = binstr[1]
C2 = binstr[2]
A2 = binstr[3]
C4 = binstr[4]
A4 = binstr[5]
# M = binstr[6]
B1 = binstr[7]
# Q = binstr[8]
B2 = binstr[9]
D2 = binstr[10]
B4 = binstr[11]
D4 = binstr[12]
graystr = D2 + D4 + A1 + A2 + A4 + B1 + B2 + B4 + C1 + C2 + C4
alt = gray2alt(graystr)
if mbit == "1": # unit in meter
vbin = mbin[19:25] + mbin[26:31]
if Mbit == "1": # unit in meter
vbin = binstr[:6] + binstr[7:]
alt = int(bin2int(vbin) * 3.28084) # convert to ft
return alt
def gray2alt(codestr):
gc500 = codestr[:8]
def gray2alt(binstr: str) -> Optional[int]:
gc500 = binstr[:8]
n500 = gray2int(gc500)
# in 100-ft step must be converted first
gc100 = codestr[8:]
gc100 = binstr[8:]
n100 = gray2int(gc100)
if n100 in [0, 5, 6]:
@ -314,9 +356,9 @@ def gray2alt(codestr):
return alt
def gray2int(graystr):
def gray2int(binstr: str) -> int:
"""Convert greycode to binary."""
num = bin2int(graystr)
num = bin2int(binstr)
num ^= num >> 8
num ^= num >> 4
num ^= num >> 2
@ -324,12 +366,12 @@ def gray2int(graystr):
return num
def data(msg):
def data(msg: str) -> str:
"""Return the data frame in the message, bytes 9 to 22."""
return msg[8:-6]
def allzeros(msg):
def allzeros(msg: str) -> bool:
"""Check if the data bits are all zeros.
Args:
@ -347,7 +389,7 @@ def allzeros(msg):
return True
def wrongstatus(data, sb, msb, lsb):
def wrongstatus(data: str, sb: int, msb: int, lsb: int) -> bool:
"""Check if the status bit and field bits are consistency.
This Function is used for checking BDS code versions.

View File

@ -80,6 +80,12 @@ def test_adsb_velocity():
assert adsb.altitude_diff("8D485020994409940838175B284F") == 550
def test_adsb_emergency():
assert not adsb.is_emergency("8DA2C1B6E112B600000000760759")
assert adsb.emergency_state("8DA2C1B6E112B600000000760759") == 0
assert adsb.emergency_squawk("8DA2C1B6E112B600000000760759") == "6615"
# def test_nic():
# assert adsb.nic('8D3C70A390AB11F55B8C57F65FE6') == 0
# assert adsb.nic('8DE1C9738A4A430B427D219C8225') == 1

View File

@ -1,12 +1,11 @@
try:
from pyModeS import c_common
def test_conversions():
assert c_common.hex2bin("6E") == "01101110"
assert c_common.bin2hex("01101110") == "6E"
assert c_common.bin2hex("1101110") == "6E"
def test_crc_decode():
assert c_common.crc("8D406B902015A678D4D220AA4BDA") == 0
@ -23,27 +22,22 @@ def test_crc_decode():
assert c_common.crc("8d4ca251204994b1c36e60a5343d") == 16
assert c_common.crc("b0001718c65632b0a82040715b65") == 353333
def test_crc_encode():
parity = c_common.crc("8D406B902015A678D4D220AA4BDA", encode=True)
assert parity == 11160538
def test_icao():
assert c_common.icao("8D406B902015A678D4D220AA4BDA") == "406B90"
assert c_common.icao("A0001839CA3800315800007448D9") == "400940"
assert c_common.icao("A000139381951536E024D4CCF6B5") == "3C4DD2"
assert c_common.icao("A000029CFFBAA11E2004727281F1") == "4243D0"
def test_modes_altcode():
assert c_common.altcode("A02014B400000000000000F9D514") == 32300
def test_modes_idcode():
assert c_common.idcode("A800292DFFBBA9383FFCEB903D01") == "1346"
def test_graycode_to_altitude():
assert c_common.gray2alt("00000000010") == -1000
assert c_common.gray2alt("00000001010") == -500
@ -60,3 +54,7 @@ def test_graycode_to_altitude():
assert c_common.gray2alt("11011110100") == 73200
assert c_common.gray2alt("10000000011") == 126600
assert c_common.gray2alt("10000000001") == 126700
except:
pass