reduce complexity and change default type to str

This commit is contained in:
Junzi Sun 2020-02-26 00:09:52 +01:00
parent dea7cde317
commit 555b2eea40
19 changed files with 131 additions and 1048 deletions

3
.gitignore vendored
View File

@ -5,6 +5,9 @@ __pycache__/
*.py[cod] *.py[cod]
.pytest_cache/ .pytest_cache/
#cython
.c
# C extensions # C extensions
*.so *.so

View File

@ -11,8 +11,8 @@ test:
python -m pytest python -m pytest
clean: clean:
find pyModeS/c_decoder -type f -name '*.c' -delete find pyModeS/decoder -type f -name '*.c' -delete
find pyModeS/c_decoder -type f -name '*.so' -delete find pyModeS/decoder -type f -name '*.so' -delete
find . | grep -E "(__pycache__|\.pyc|\.pyo$$)" | xargs rm -rf find . | grep -E "(__pycache__|\.pyc|\.pyo$$)" | xargs rm -rf
rm -rf *.egg-info rm -rf *.egg-info
rm -rf .pytest_cache rm -rf .pytest_cache

View File

@ -3,11 +3,16 @@ from __future__ import absolute_import, print_function, division
import os import os
import warnings import warnings
try:
from .decoder import c_common as common
from .decoder.c_common import *
except:
from .decoder import common
from .decoder.common import * from .decoder.common import *
from .decoder import tell from .decoder import tell
from .decoder import adsb from .decoder import adsb
from .decoder import commb from .decoder import commb
from .decoder import common
from .decoder import bds from .decoder import bds
from .extra import aero from .extra import aero
from .extra import tcpclient from .extra import tcpclient

View File

@ -1 +0,0 @@
*.c

View File

@ -1,222 +0,0 @@
# cython: language_level=3
"""ADS-B Wrapper.
The ADS-B wrapper also imports functions from the following modules:
- pyModeS.decoder.bds.bds05
Functions: ``airborne_position``, ``airborne_position_with_ref``, ``altitude``
- pyModeS.decoder.bds.bds06
Functions: ``surface_position``, ``surface_position_with_ref``, ``surface_velocity``
- pyModeS.decoder.bds.bds08
Functions: ``category``, ``callsign``
- pyModeS.decoder.bds.bds09
Functions: ``airborne_velocity``, ``altitude_diff``
"""
from libc.math cimport NAN as nan
from . cimport common
from .bds.bds05 import (
airborne_position,
airborne_position_with_ref,
altitude,
)
from .bds.bds06 import (
surface_position,
surface_position_with_ref,
surface_velocity,
)
from .bds.bds08 import category, callsign
from .bds.bds09 import airborne_velocity, altitude_diff
def icao(bytes msg):
return common.icao(msg)
def typecode(bytes msg):
return common.typecode(msg)
def position(bytes msg0 not None, bytes msg1 not None, double t0, double t1, double lat_ref=nan, double lon_ref=nan):
"""Decode position from a pair of even and odd position message
(works with both airborne and surface position messages)
Args:
msg0 (string): even message (28 bytes hexadecimal string)
msg1 (string): odd message (28 bytes hexadecimal string)
t0 (double): timestamps for the even message
t1 (double): timestamps for the odd message
Returns:
(float, float): (latitude, longitude) of the aircraft
"""
cdef int tc0 = typecode(msg0)
cdef int tc1 = typecode(msg1)
if 5 <= tc0 <= 8 and 5 <= tc1 <= 8:
if (lat_ref != lat_ref) or (lon_ref != lon_ref):
raise RuntimeError(
"Surface position encountered, a reference \
position lat/lon required. Location of \
receiver can be used."
)
else:
return surface_position(msg0, msg1, t0, t1, lat_ref, lon_ref)
elif 9 <= tc0 <= 18 and 9 <= tc1 <= 18:
# Airborne position with barometric height
return airborne_position(msg0, msg1, t0, t1)
elif 20 <= tc0 <= 22 and 20 <= tc1 <= 22:
# Airborne position with GNSS height
return airborne_position(msg0, msg1, t0, t1)
else:
raise RuntimeError("incorrect or inconsistant message types")
def position_with_ref(bytes msg not None, double lat_ref, double lon_ref):
"""Decode position with only one message,
knowing reference nearby location, such as previously
calculated location, ground station, or airport location, etc.
Works with both airborne and surface position messages.
The reference position shall be with in 180NM (airborne) or 45NM (surface)
of the true position.
Args:
msg (string): even message (28 bytes hexadecimal string)
lat_ref: previous known latitude
lon_ref: previous known longitude
Returns:
(float, float): (latitude, longitude) of the aircraft
"""
cdef int tc = typecode(msg)
if 5 <= tc <= 8:
return surface_position_with_ref(msg, lat_ref, lon_ref)
elif 9 <= tc <= 18 or 20 <= tc <= 22:
return airborne_position_with_ref(msg, lat_ref, lon_ref)
else:
raise RuntimeError("incorrect or inconsistant message types")
def altitude(bytes msg):
"""Decode aircraft altitude
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: altitude in feet
"""
cdef int tc = typecode(msg)
if tc < 5 or tc == 19 or tc > 22:
raise RuntimeError("%s: Not a position message" % msg)
if tc >= 5 and tc <= 8:
# surface position, altitude 0
return 0
cdef bytearray msgbin = common.hex2bin(msg)
cdef int q = common.char_to_int(msgbin[47])
cdef int n
cdef double alt
if q:
n = common.bin2int(msgbin[40:47] + msgbin[48:52])
alt = n * 25 - 1000
return alt
else:
return nan
def velocity(bytes msg, bint rtn_sources=False):
"""Calculate the speed, heading, and vertical rate
(handles both airborne or surface message)
Args:
msg (string): 28 bytes hexadecimal message string
rtn_source (boolean): If the function will return
the sources for direction of travel and vertical
rate. This will change the return value from a four
element array to a six element array.
Returns:
(int, float, int, string, string, string): speed (kt),
ground track or heading (degree),
rate of climb/descent (ft/min), speed type
('GS' for ground speed, 'AS' for airspeed),
direction source ('true_north' for ground track / true north
as refrence, 'mag_north' for magnetic north as reference),
rate of climb/descent source ('Baro' for barometer, 'GNSS'
for GNSS constellation).
In the case of surface messages, None will be put in place
for vertical rate and its respective sources.
"""
cdef int tc = typecode(msg)
if 5 <= tc <= 8:
return surface_velocity(msg, rtn_sources)
elif tc == 19:
return airborne_velocity(msg, rtn_sources)
else:
raise RuntimeError(
"incorrect or inconsistant message types, expecting 4<TC<9 or TC=19"
)
def speed_heading(bytes msg):
"""Get speed and ground track (or heading) from the velocity message
(handles both airborne or surface message)
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
(int, float): speed (kt), ground track or heading (degree)
"""
spd, trk_or_hdg, rocd, tag = velocity(msg)
return spd, trk_or_hdg
def oe_flag(bytes msg):
"""Check the odd/even flag. Bit 54, 0 for even, 1 for odd.
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: 0 or 1, for even or odd frame
"""
cdef bytearray msgbin = common.hex2bin(msg)
return common.char_to_int(msgbin[53])
def version(bytes msg):
"""ADS-B Version
Args:
msg (string): 28 bytes hexadecimal message string, TC = 31
Returns:
int: version number
"""
cdef int tc = typecode(msg)
if tc != 31:
raise RuntimeError(
"%s: Not a status operation message, expecting TC = 31" % msg
)
cdef bytearray msgbin = common.hex2bin(msg)
cdef int version = common.bin2int(msgbin[72:75])
return version

View File

@ -1,180 +0,0 @@
# ------------------------------------------
# BDS 0,5
# ADS-B TC=9-18
# Airborn position
# ------------------------------------------
# cython: language_level=3
cimport cython
from .. cimport common
from libc.math cimport NAN as nan
@cython.cdivision(True)
def airborne_position(bytes msg0 not None, bytes msg1 not None, double t0, double t1):
"""Decode airborn position from a pair of even and odd position message
Args:
msg0 (string): even message (28 bytes hexadecimal string)
msg1 (string): odd message (28 bytes hexadecimal string)
t0 (double): timestamps for the even message
t1 (double): timestamps for the odd message
Returns:
(float, float): (latitude, longitude) of the aircraft
"""
cdef bytearray mb0 = common.hex2bin(msg0)[32:]
cdef bytearray mb1 = common.hex2bin(msg1)[32:]
cdef unsigned char oe0 = common.char_to_int(mb0[21])
cdef unsigned char oe1 = common.char_to_int(mb1[21])
if oe0 == 0 and oe1 == 1:
pass
elif oe0 == 1 and oe1 == 0:
mb0, mb1 = mb1, mb0
t0, t1 = t1, t0
else:
raise RuntimeError("Both even and odd CPR frames are required.")
# 131072 is 2^17, since CPR lat and lon are 17 bits each.
cdef double cprlat_even = common.bin2int(mb0[22:39]) / 131072.0
cdef double cprlon_even = common.bin2int(mb0[39:56]) / 131072.0
cdef double cprlat_odd = common.bin2int(mb1[22:39]) / 131072.0
cdef double cprlon_odd = common.bin2int(mb1[39:56]) / 131072.0
cdef double air_d_lat_even = 360.0 / 60
cdef double air_d_lat_odd = 360.0 / 59
# compute latitude index 'j'
cdef long j = common.floor(59 * cprlat_even - 60 * cprlat_odd + 0.5)
cdef double lat_even = (air_d_lat_even * (j % 60 + cprlat_even))
cdef double lat_odd = (air_d_lat_odd * (j % 59 + cprlat_odd))
if lat_even >= 270:
lat_even = lat_even - 360
if lat_odd >= 270:
lat_odd = lat_odd - 360
# check if both are in the same latidude zone, exit if not
if common.cprNL(lat_even) != common.cprNL(lat_odd):
return nan
cdef int nl, ni, m
cdef double lat, lon
# compute ni, longitude index m, and longitude
if t0 > t1:
lat = lat_even
nl = common.cprNL(lat)
# ni = max(common.cprNL(lat) - 0, 1)
ni = common.cprNL(lat)
if ni < 1:
ni = 1
m = common.floor(cprlon_even * (nl - 1) - cprlon_odd * nl + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_even)
else:
lat = lat_odd
nl = common.cprNL(lat)
# ni = max(common.cprNL(lat) - 1, 1)
ni = common.cprNL(lat) - 1
if ni < 1:
ni = 1
m = common.floor(cprlon_even * (nl - 1) - cprlon_odd * nl + 0.5)
lon = (360.0 / ni) * (m % ni + cprlon_odd)
if lon > 180:
lon = lon - 360
return round(lat, 5), round(lon, 5)
@cython.cdivision(True)
def airborne_position_with_ref(bytes msg not None, double lat_ref, double lon_ref):
"""Decode airborne position with only one message,
knowing reference nearby location, such as previously calculated location,
ground station, or airport location, etc. The reference position shall
be with in 180NM of the true position.
Args:
msg (string): even message (28 bytes hexadecimal string)
lat_ref: previous known latitude
lon_ref: previous known longitude
Returns:
(float, float): (latitude, longitude) of the aircraft
"""
cdef bytearray mb = common.hex2bin(msg)[32:]
cdef double cprlat = common.bin2int(mb[22:39]) / 131072.0
cdef double cprlon = common.bin2int(mb[39:56]) / 131072.0
cdef unsigned char i = common.char_to_int(mb[21])
cdef double d_lat = 360.0 / 59 if i else 360.0 / 60
# https://docs.python.org/3/library/math.html#math.fmod
cdef double mod_lat = lat_ref % d_lat if lat_ref >= 0 else (lat_ref % d_lat + d_lat)
cdef long j = common.floor(lat_ref / d_lat) + common.floor(
0.5 + (mod_lat / d_lat) - cprlat
)
cdef double lat = d_lat * (j + cprlat)
cdef double d_lon, lon
cdef int ni = common.cprNL(lat) - i
if ni > 0:
d_lon = 360.0 / ni
else:
d_lon = 360.0
# https://docs.python.org/3/library/math.html#math.fmod
cdef double mod_lon = lon_ref % d_lon if lon_ref >= 0 else (lon_ref % d_lon + d_lon)
cdef int m = common.floor(lon_ref / d_lon) + common.floor(
0.5 + (mod_lon / d_lon) - cprlon
)
lon = d_lon * (m + cprlon)
return round(lat, 5), round(lon, 5)
cpdef double altitude(bytes msg):
"""Decode aircraft altitude
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: altitude in feet
"""
cdef int tc = common.typecode(msg)
if tc < 9 or tc == 19 or tc > 22:
raise RuntimeError("%s: Not a airborn position message" % msg)
cdef bytearray mb = common.hex2bin(msg)[32:]
cdef unsigned char q
cdef int n
cdef double alt
if tc < 19:
# barometric altitude
q = mb[15]
if q:
n = common.bin2int(mb[8:15] + mb[16:20])
alt = n * 25 - 1000
else:
alt = nan
else:
# GNSS altitude, meters -> feet
alt = common.bin2int(mb[8:20]) * 3.28084
return alt

View File

@ -1,236 +0,0 @@
# ------------------------------------------
# BDS 0,6
# ADS-B TC=5-8
# Surface position
# ------------------------------------------
# cython: language_level=3
cimport cython
from .. cimport common
from cpython cimport array
from libc.math cimport NAN as nan
import math
@cython.cdivision(True)
def surface_position(bytes msg0 not None, bytes msg1 not None, double t0, double t1, double lat_ref, double lon_ref):
"""Decode surface position from a pair of even and odd position message,
the lat/lon of receiver must be provided to yield the correct solution.
Args:
msg0 (string): even message (28 bytes hexadecimal string)
msg1 (string): odd message (28 bytes hexadecimal string)
t0 (double): timestamps for the even message
t1 (double): timestamps for the odd message
lat_ref (float): latitude of the receiver
lon_ref (float): longitude of the receiver
Returns:
(float, float): (latitude, longitude) of the aircraft
"""
cdef bytearray msgbin0 = common.hex2bin(msg0)
cdef bytearray msgbin1 = common.hex2bin(msg1)
# 131072 is 2^17, since CPR lat and lon are 17 bits each.
cdef double cprlat_even = common.bin2int(msgbin0[54:71]) / 131072.0
cdef double cprlon_even = common.bin2int(msgbin0[71:88]) / 131072.0
cdef double cprlat_odd = common.bin2int(msgbin1[54:71]) / 131072.0
cdef double cprlon_odd = common.bin2int(msgbin1[71:88]) / 131072.0
cdef double air_d_lat_even = 90.0 / 60
cdef double air_d_lat_odd = 90.0 / 59
# compute latitude index 'j'
cdef long j = common.floor(59 * cprlat_even - 60 * cprlat_odd + 0.5)
# solution for north hemisphere
cdef int j_mod_60 = j % 60 if j > 0 else (j % 60) + 60
cdef int j_mod_59 = j % 59 if j > 0 else (j % 59) + 59
cdef double lat_even_n = (air_d_lat_even * ((j_mod_60) + cprlat_even))
cdef double lat_odd_n = (air_d_lat_odd * ((j_mod_59) + cprlat_odd))
# solution for north hemisphere
cdef double lat_even_s = lat_even_n - 90.0
cdef double lat_odd_s = lat_odd_n - 90.0
# chose which solution corrispondes to receiver location
cdef double lat_even = lat_even_n if lat_ref > 0 else lat_even_s
cdef double lat_odd = lat_odd_n if lat_ref > 0 else lat_odd_s
# check if both are in the same latidude zone, rare but possible
if common.cprNL(lat_even) != common.cprNL(lat_odd):
return nan
cdef int nl, ni, m, m_mod_ni
cdef double lat, lon
# compute ni, longitude index m, and longitude
if t0 > t1:
lat = lat_even
nl = common.cprNL(lat_even)
# ni = max(common.cprNL(lat_even) - 0, 1)
ni = common.cprNL(lat_even)
if ni < 1:
ni = 1
m = common.floor(cprlon_even * (nl - 1) - cprlon_odd * nl + 0.5)
# https://docs.python.org/3/library/math.html#math.fmod
m_mod_ni = m % ni if ni > 0 else (m % ni) + ni
lon = (90.0 / ni) * (m_mod_ni + cprlon_even)
else:
lat = lat_odd
nl = common.cprNL(lat_odd)
# ni = max(common.cprNL(lat_odd) - 1, 1)
ni = common.cprNL(lat_odd) - 1
if ni < 1:
ni = 1
m = common.floor(cprlon_even * (nl - 1) - cprlon_odd * nl + 0.5)
# https://docs.python.org/3/library/math.html#math.fmod
m_mod_ni = m % ni if ni > 0 else (m % ni) + ni
lon = (90.0 / ni) * (m_mod_ni + cprlon_odd)
# four possible longitude solutions
# lons = [lon, lon + 90.0, lon + 180.0, lon + 270.0]
cdef array.array _lons = array.array(
'd', [lon, lon + 90.0, lon + 180.0, lon + 270.0]
)
cdef double[4] lons = _lons
# make sure lons are between -180 and 180
# lons = [(l + 180) % 360 - 180 for l in lons]
cdef int idxmin = 0
cdef float d_, delta = abs(lons[0] - lon_ref)
for i in range(1, 4):
lons[i] = (lons[i] + 180) % 360 - 180
d_ = abs(lons[i] - lon_ref)
if d_ < delta:
idxmin = i
delta = d_
# the closest solution to receiver is the correct one
# dls = [abs(lon_ref - l) for l in lons]
# imin = min(range(4), key=dls.__getitem__)
# lon = lons[imin]
lon = lons[idxmin]
return round(lat, 5), round(lon, 5)
@cython.cdivision(True)
def surface_position_with_ref(bytes msg not None, double lat_ref, double lon_ref):
"""Decode surface position with only one message,
knowing reference nearby location, such as previously calculated location,
ground station, or airport location, etc. The reference position shall
be with in 45NM of the true position.
Args:
msg (string): even message (28 bytes hexadecimal string)
lat_ref: previous known latitude
lon_ref: previous known longitude
Returns:
(float, float): (latitude, longitude) of the aircraft
"""
cdef bytearray mb = common.hex2bin(msg)[32:]
cdef double cprlat = common.bin2int(mb[22:39]) / 131072.0
cdef double cprlon = common.bin2int(mb[39:56]) / 131072.0
cdef unsigned char i = common.char_to_int(mb[21])
cdef double d_lat = 90.0 / 59 if i else 90.0 / 60
# https://docs.python.org/3/library/math.html#math.fmod
cdef double mod_lat = lat_ref % d_lat if lat_ref >= 0 else (lat_ref % d_lat + d_lat)
cdef long j = common.floor(lat_ref / d_lat) + common.floor(
0.5 + (mod_lat / d_lat) - cprlat
)
cdef double lat = d_lat * (j + cprlat)
cdef double d_lon, lon
cdef int ni = common.cprNL(lat) - i
if ni > 0:
d_lon = 90.0 / ni
else:
d_lon = 90.0
# https://docs.python.org/3/library/math.html#math.fmod
cdef double mod_lon = lon_ref % d_lon if lon_ref >= 0 else (lon_ref % d_lon + d_lon)
cdef int m = common.floor(lon_ref / d_lon) + common.floor(
0.5 + (mod_lon / d_lon) - cprlon
)
lon = d_lon * (m + cprlon)
return round(lat, 5), round(lon, 5)
@cython.cdivision(True)
def surface_velocity(bytes msg, bint rtn_sources=False):
"""Decode surface velocity from from a surface position message
Args:
msg (string): 28 bytes hexadecimal message string
rtn_source (boolean): If the function will return
the sources for direction of travel and vertical
rate. This will change the return value from a four
element array to a six element array.
Returns:
(int, float, int, string, string, None): speed (kt),
ground track (degree), None for rate of climb/descend (ft/min),
and speed type ('GS' for ground speed), direction source
('true_north' for ground track / true north as reference),
None rate of climb/descent source.
"""
if common.typecode(msg) < 5 or common.typecode(msg) > 8:
raise RuntimeError("%s: Not a surface message, expecting 5<TC<8" % msg)
cdef bytearray mb = common.hex2bin(msg)[32:]
cdef double trk
# ground track
cdef unsigned char trk_status = common.char_to_int(mb[12])
if trk_status == 1:
trk = common.bin2int(mb[13:20]) * 360.0 / 128.0
trk = round(trk, 1)
else:
trk = nan
# ground movment / speed
cdef long mov = common.bin2int(mb[5:12])
cdef double spd, step
cdef array.array _movs, _kts
cdef double[7] movs, kts
cdef Py_ssize_t i = 0
if mov == 0 or mov > 124:
spd = nan
elif mov == 1:
spd = 0
elif mov == 124:
spd = 175
else:
_movs = array.array('d', [2, 9, 13, 39, 94, 109, 124])
_kts = array.array('d', [0.125, 1, 2, 15, 70, 100, 175])
movs = _movs
kts = _kts
# i = next(m[0] for m in enumerate(movs) if m[1] > mov)
for i in range(7):
if movs[i] > mov:
break
step = (kts[i] - kts[i - 1]) * 1.0 / (movs[i] - movs[i - 1])
spd = kts[i - 1] + (mov - movs[i - 1]) * step
spd = round(spd, 2)
if rtn_sources:
return spd, trk, 0, "GS", "true_north", None
else:
return spd, trk, 0, "GS"

View File

@ -1,61 +0,0 @@
# cython: language_level=3
cimport cython
from .. cimport common
def category(bytes msg):
"""Aircraft category number
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: category number
"""
cdef int tc = common.typecode(msg)
if tc < 1 or tc > 4:
raise RuntimeError("%s: Not a identification message" % msg)
cdef bytearray msgbin = common.hex2bin(msg)
mebin = msgbin[32:87]
return common.bin2int(mebin[5:8])
def callsign(bytes msg):
"""Aircraft callsign
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
string: callsign
"""
cdef int tc = common.typecode(msg)
if tc < 1 or tc > 4:
raise RuntimeError("%s: Not a identification message" % msg)
cdef bytearray _chars = bytearray(
b"#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
)
cdef unsigned char[:] chars = _chars
cdef bytearray msgbin = common.hex2bin(msg)
cdef bytearray csbin = msgbin[40:96]
cdef bytearray _cs = bytearray(8)
cdef unsigned char[:] cs = _cs
cs[0] = chars[common.bin2int(csbin[0:6])]
cs[1] = chars[common.bin2int(csbin[6:12])]
cs[2] = chars[common.bin2int(csbin[12:18])]
cs[3] = chars[common.bin2int(csbin[18:24])]
cs[4] = chars[common.bin2int(csbin[24:30])]
cs[5] = chars[common.bin2int(csbin[30:36])]
cs[6] = chars[common.bin2int(csbin[36:42])]
cs[7] = chars[common.bin2int(csbin[42:48])]
# clean string, remove spaces and marks, if any.
# cs = cs.replace('_', '')
return _cs.decode().replace("#", "")

View File

@ -1,122 +0,0 @@
# cython: language_level=3
cimport cython
from ..common cimport char_to_int, typecode, hex2bin, bin2int
from libc.math cimport atan2, sqrt, pi, NAN as nan
@cython.cdivision(True)
def airborne_velocity(bytes msg, bint rtn_sources=False):
"""Calculate the speed, track (or heading), and vertical rate
Args:
msg (string): 28 bytes hexadecimal message string
rtn_source (boolean): If the function will return
the sources for direction of travel and vertical
rate. This will change the return value from a four
element array to a six element array.
Returns:
(int, float, int, string, string, string): speed (kt),
ground track or heading (degree),
rate of climb/descent (ft/min), speed type
('GS' for ground speed, 'AS' for airspeed),
direction source ('true_north' for ground track / true north
as refrence, 'mag_north' for magnetic north as reference),
rate of climb/descent source ('Baro' for barometer, 'GNSS'
for GNSS constellation).
"""
if typecode(msg) != 19:
raise RuntimeError("%s: Not a airborne velocity message, expecting TC=19" % msg)
cdef bytearray mb = hex2bin(msg)[32:]
cdef int subtype = bin2int(mb[5:8])
if bin2int(mb[14:24]) == 0 or bin2int(mb[25:35]) == 0:
return None
cdef int v_ew_sign, v_ew, v_ns_sign, v_ns, v_we, v_sn
cdef double spd, trk, trk_or_hdg, hdg
if subtype in (1, 2):
v_ew_sign = -1 if mb[13] == 49 else 1 # "1"
v_ew = bin2int(mb[14:24]) - 1 # east-west velocity
if subtype == 2: # Supersonic
v_ew *= 4
v_ns_sign = -1 if mb[24] == 49 else 1 # "1"
v_ns = bin2int(mb[25:35]) - 1 # north-south velocity
if subtype == 2: # Supersonic
v_ns *= 4
v_we = v_ew_sign * v_ew
v_sn = v_ns_sign * v_ns
spd = sqrt(v_sn * v_sn + v_we * v_we) # unit in kts
# spd = int(spd)
trk = atan2(v_we, v_sn)
# trk = math.degrees(trk) # convert to degrees
trk = trk * 180 / pi
trk = trk if trk >= 0 else trk + 360 # no negative val
tag = "GS"
trk_or_hdg = round(trk, 2)
dir_type = "true_north"
else:
if mb[13] == 48: # "0"
hdg = nan
else:
hdg = bin2int(mb[14:24]) / 1024.0 * 360.0
hdg = round(hdg, 2)
trk_or_hdg = hdg
spd = bin2int(mb[25:35])
spd = nan if spd == 0 else spd - 1
if subtype == 4: # Supersonic
spd *= 4
if mb[24] == 48: # "0"
tag = "IAS"
else:
tag = "TAS"
dir_type = "mag_north"
vr_source = "GNSS" if mb[35] == 48 else "Baro" # "0"
cdef int vr_sign = -1 if mb[36] == 49 else 1 # "1"
cdef int vr = bin2int(mb[37:46])
rocd = None if vr == 0 else int(vr_sign * (vr - 1) * 64)
if rtn_sources:
return int(spd), trk_or_hdg, rocd, tag, dir_type, vr_source
else:
return int(spd), trk_or_hdg, rocd, tag
def altitude_diff(bytes msg):
"""Decode the differece between GNSS and barometric altitude
Args:
msg (string): 28 bytes hexadecimal message string, TC=19
Returns:
int: Altitude difference in ft. Negative value indicates GNSS altitude
below barometric altitude.
"""
cdef int tc = typecode(msg)
if tc != 19:
raise RuntimeError("%s: Not a airborne velocity message, expecting TC=19" % msg)
cdef bytearray msgbin = hex2bin(msg)
cdef int sign = -1 if char_to_int(msgbin[80]) else 1
cdef int value = bin2int(msgbin[81:88])
if value == 0 or value == 127:
return None
else:
return sign * (value - 1) * 25 # in ft.

View File

@ -1,23 +0,0 @@
# cython: language_level=3
cdef int char_to_int(unsigned char binstr)
cdef unsigned char int_to_char(unsigned char i)
cpdef bytearray hex2bin(bytes hexstr)
cpdef long bin2int(bytearray binstr)
cpdef long hex2int(bytearray binstr)
cpdef unsigned char df(bytes msg)
cpdef long crc(bytes msg, bint encode=*)
cpdef long floor(double x)
cpdef str icao(bytes msg)
cpdef bint is_icao_assigned(bytes icao)
cpdef int typecode(bytes msg)
cpdef int cprNL(double lat)
cpdef str idcode(bytes msg)
cpdef int altcode(bytes msg)
cdef bytes data(bytes msg)
cpdef bint allzeros(bytes msg)

View File

@ -0,0 +1,23 @@
# cython: language_level=3
cdef int char_to_int(unsigned char binstr)
cdef unsigned char int_to_char(unsigned char i)
cpdef str hex2bin(str hexstr)
cpdef long bin2int(str binstr)
cpdef long hex2int(str binstr)
cpdef unsigned char df(str msg)
cpdef long crc(str msg, bint encode=*)
cpdef long floor(double x)
cpdef str icao(str msg)
cpdef bint is_icao_assigned(str icao)
cpdef int typecode(str msg)
cpdef int cprNL(double lat)
cpdef str idcode(str msg)
cpdef int altcode(str msg)
cdef str data(str msg)
cpdef bint allzeros(str msg)

View File

@ -24,49 +24,52 @@ cdef unsigned char int_to_char(unsigned char i):
@cython.boundscheck(False) @cython.boundscheck(False)
@cython.overflowcheck(False) @cython.overflowcheck(False)
cpdef bytearray hex2bin(bytes hexstr): cpdef str hex2bin(str hexstr):
"""Convert a hexdecimal string to binary string, with zero fillings.""" """Convert a hexdecimal string to binary string, with zero fillings."""
# num_of_bits = len(hexstr) * 4 # num_of_bits = len(hexstr) * 4
cdef Py_ssize_t len_hexstr = PyBytes_GET_SIZE(hexstr) cdef hexbytes = bytes(hexstr.encode())
# binstr = bin(int(hexstr, 16))[2:].zfill(int(num_of_bits)) cdef Py_ssize_t len_hexstr = PyBytes_GET_SIZE(hexbytes)
# binstr = bin(int(hexbytes, 16))[2:].zfill(int(num_of_bits))
cdef bytearray _binstr = bytearray(4 * len_hexstr) cdef bytearray _binstr = bytearray(4 * len_hexstr)
cdef unsigned char[:] binstr = _binstr cdef unsigned char[:] binstr = _binstr
cdef unsigned char int_ cdef unsigned char int_
cdef Py_ssize_t i cdef Py_ssize_t i
for i in range(len_hexstr): for i in range(len_hexstr):
int_ = char_to_int(hexstr[i]) int_ = char_to_int(hexbytes[i])
binstr[4*i] = int_to_char((int_ >> 3) & 1) binstr[4*i] = int_to_char((int_ >> 3) & 1)
binstr[4*i+1] = int_to_char((int_ >> 2) & 1) binstr[4*i+1] = int_to_char((int_ >> 2) & 1)
binstr[4*i+2] = int_to_char((int_ >> 1) & 1) binstr[4*i+2] = int_to_char((int_ >> 1) & 1)
binstr[4*i+3] = int_to_char((int_) & 1) binstr[4*i+3] = int_to_char((int_) & 1)
return _binstr return _binstr.decode()
@cython.boundscheck(False) @cython.boundscheck(False)
cpdef long bin2int(bytearray binstr): cpdef long bin2int(str binstr):
"""Convert a binary string to integer.""" """Convert a binary string to integer."""
# return int(binstr, 2) # return int(binstr, 2)
cdef Py_ssize_t len_ = PyByteArray_GET_SIZE(binstr) cdef bytearray binbytes = bytearray(binstr.encode())
cdef Py_ssize_t len_ = PyByteArray_GET_SIZE(binbytes)
cdef long cumul = 0 cdef long cumul = 0
cdef unsigned char[:] v_binstr = binstr cdef unsigned char[:] v_binstr = binbytes
for i in range(len_): for i in range(len_):
cumul = 2*cumul + char_to_int(v_binstr[i]) cumul = 2*cumul + char_to_int(v_binstr[i])
return cumul return cumul
@cython.boundscheck(False) @cython.boundscheck(False)
cpdef long hex2int(bytearray binstr): cpdef long hex2int(str hexstr):
"""Convert a binary string to integer.""" """Convert a binary string to integer."""
# return int(binstr, 2) # return int(hexstr, 2)
cdef Py_ssize_t len_ = PyByteArray_GET_SIZE(binstr) cdef bytearray binbytes = bytearray(hexstr.encode())
cdef Py_ssize_t len_ = PyByteArray_GET_SIZE(binbytes)
cdef long cumul = 0 cdef long cumul = 0
cdef unsigned char[:] v_binstr = binstr cdef unsigned char[:] v_hexstr = binbytes
for i in range(len_): for i in range(len_):
cumul = 16*cumul + char_to_int(v_binstr[i]) cumul = 16*cumul + char_to_int(v_hexstr[i])
return cumul return cumul
@cython.boundscheck(False) @cython.boundscheck(False)
cpdef unsigned char df(bytes msg): cpdef unsigned char df(str msg):
"""Decode Downlink Format vaule, bits 1 to 5.""" """Decode Downlink Format vaule, bits 1 to 5."""
cdef bytearray dfbin = hex2bin(msg[:2]) cdef str dfbin = hex2bin(msg[:2])
# return min(bin2int(dfbin[0:5]), 24) # return min(bin2int(dfbin[0:5]), 24)
cdef long df = bin2int(dfbin[0:5]) cdef long df = bin2int(dfbin[0:5])
if df > 24: if df > 24:
@ -79,7 +82,7 @@ cdef array.array _G = array.array('l', [0b11111111, 0b11111010, 0b00000100, 0b10
@cython.boundscheck(False) @cython.boundscheck(False)
@cython.wraparound(False) @cython.wraparound(False)
cpdef long crc(bytes msg, bint encode=False): cpdef long crc(str msg, bint encode=False):
"""Mode-S Cyclic Redundancy Check. """Mode-S Cyclic Redundancy Check.
Detect if bit error occurs in the Mode-S message. When encode option is on, Detect if bit error occurs in the Mode-S message. When encode option is on,
@ -99,7 +102,7 @@ cpdef long crc(bytes msg, bint encode=False):
# msgbin_split = wrap(msgbin, 8) # msgbin_split = wrap(msgbin, 8)
# mbytes = list(map(bin2int, msgbin_split)) # mbytes = list(map(bin2int, msgbin_split))
cdef bytearray _msgbin = hex2bin(msg) cdef bytearray _msgbin = bytearray(hex2bin(msg).encode())
cdef unsigned char[:] msgbin = _msgbin cdef unsigned char[:] msgbin = _msgbin
cdef Py_ssize_t len_msgbin = PyByteArray_GET_SIZE(_msgbin) cdef Py_ssize_t len_msgbin = PyByteArray_GET_SIZE(_msgbin)
@ -111,7 +114,7 @@ cpdef long crc(bytes msg, bint encode=False):
msgbin[i] = 0 msgbin[i] = 0
cdef array.array _mbytes = array.array( cdef array.array _mbytes = array.array(
'l', [bin2int(_msgbin[8*i:8*i+8]) for i in range(len_mbytes)] 'l', [bin2int(_msgbin[8*i:8*i+8].decode()) for i in range(len_mbytes)]
) )
cdef long[:] mbytes = _mbytes cdef long[:] mbytes = _mbytes
@ -151,7 +154,7 @@ cpdef long floor(double x):
""" """
return <long> c_floor(x) return <long> c_floor(x)
cpdef str icao(bytes msg): cpdef str icao(str msg):
"""Calculate the ICAO address from an Mode-S message. """Calculate the ICAO address from an Mode-S message.
Applicable only with DF4, DF5, DF20, DF21 messages. Applicable only with DF4, DF5, DF20, DF21 messages.
@ -166,13 +169,11 @@ cpdef str icao(bytes msg):
cdef unsigned char DF = df(msg) cdef unsigned char DF = df(msg)
cdef long c0, c1 cdef long c0, c1
cdef bytearray bmsg = bytearray(msg)
if DF in (11, 17, 18): if DF in (11, 17, 18):
addr = bmsg[2:8].decode() addr = msg[2:8]
elif DF in (0, 4, 5, 16, 20, 21): elif DF in (0, 4, 5, 16, 20, 21):
c0 = crc(msg, encode=True) c0 = crc(msg, encode=True)
c1 = hex2int(bmsg[-6:]) c1 = hex2int(msg[-6:])
addr = "%06X" % (c0 ^ c1) addr = "%06X" % (c0 ^ c1)
else: else:
addr = None addr = None
@ -180,7 +181,7 @@ cpdef str icao(bytes msg):
return addr return addr
cpdef bint is_icao_assigned(bytes icao): cpdef bint is_icao_assigned(str icao):
"""Check whether the ICAO address is assigned (Annex 10, Vol 3).""" """Check whether the ICAO address is assigned (Annex 10, Vol 3)."""
if (icao is None) or (not isinstance(icao, str)) or (len(icao) != 6): if (icao is None) or (not isinstance(icao, str)) or (len(icao) != 6):
return False return False
@ -210,7 +211,7 @@ cpdef bint is_icao_assigned(bytes icao):
@cython.boundscheck(False) @cython.boundscheck(False)
@cython.wraparound(False) @cython.wraparound(False)
cpdef int typecode(bytes msg): cpdef int typecode(str msg):
"""Type code of ADS-B message """Type code of ADS-B message
Args: Args:
@ -223,7 +224,7 @@ cpdef int typecode(bytes msg):
return -1 return -1
# return None # return None
cdef bytearray tcbin = hex2bin(msg[8:10]) cdef str tcbin = hex2bin(msg[8:10])
return bin2int(tcbin[0:5]) return bin2int(tcbin[0:5])
@cython.cdivision(True) @cython.cdivision(True)
@ -248,7 +249,7 @@ cpdef int cprNL(double lat):
@cython.boundscheck(False) @cython.boundscheck(False)
@cython.wraparound(False) @cython.wraparound(False)
cpdef str idcode(bytes msg): cpdef str idcode(str msg):
"""Compute identity (squawk code). """Compute identity (squawk code).
Applicable only for DF5 or DF21 messages, bit 20-32. Applicable only for DF5 or DF21 messages, bit 20-32.
@ -264,7 +265,7 @@ cpdef str idcode(bytes msg):
if df(msg) not in [5, 21]: if df(msg) not in [5, 21]:
raise RuntimeError("Message must be Downlink Format 5 or 21.") raise RuntimeError("Message must be Downlink Format 5 or 21.")
cdef bytearray _mbin = hex2bin(msg) cdef bytearray _mbin = bytearray(hex2bin(msg).encode())
cdef unsigned char[:] mbin = _mbin cdef unsigned char[:] mbin = _mbin
cdef bytearray _idcode = bytearray(4) cdef bytearray _idcode = bytearray(4)
@ -301,7 +302,7 @@ cpdef str idcode(bytes msg):
@cython.boundscheck(False) @cython.boundscheck(False)
@cython.wraparound(False) @cython.wraparound(False)
cpdef int altcode(bytes msg): cpdef int altcode(str msg):
"""Compute the altitude. """Compute the altitude.
Applicable only for DF4 or DF20 message, bit 20-32. Applicable only for DF4 or DF20 message, bit 20-32.
@ -318,52 +319,52 @@ cpdef int altcode(bytes msg):
raise RuntimeError("Message must be Downlink Format 0, 4, 16, or 20.") raise RuntimeError("Message must be Downlink Format 0, 4, 16, or 20.")
# Altitude code, bit 20-32 # Altitude code, bit 20-32
cdef bytearray _mbin = hex2bin(msg) cdef bytearray _mbin = bytearray(hex2bin(msg).encode())
cdef unsigned char[:] mbin = _mbin cdef unsigned char[:] mbin = _mbin
cdef char mbit = mbin[25] # M bit: 26 cdef char mbit = mbin[25] # M bit: 26
cdef char qbit = mbin[27] # Q bit: 28 cdef char qbit = mbin[27] # Q bit: 28
cdef int alt = 0 cdef int alt = 0
cdef bytearray vbin cdef bytearray vbin
cdef bytearray _graystr = bytearray(11) cdef bytearray _graybytes = bytearray(11)
cdef unsigned char[:] graystr = _graystr cdef unsigned char[:] graybytes = _graybytes
if mbit == 48: # unit in ft, "0" -> 48 if mbit == 48: # unit in ft, "0" -> 48
if qbit == 49: # 25ft interval, "1" -> 49 if qbit == 49: # 25ft interval, "1" -> 49
vbin = _mbin[19:25] + _mbin[26:27] + _mbin[28:32] vbin = _mbin[19:25] + _mbin[26:27] + _mbin[28:32]
alt = bin2int(vbin) * 25 - 1000 alt = bin2int(vbin.decode()) * 25 - 1000
if qbit == 48: # 100ft interval, above 50175ft, "0" -> 48 if qbit == 48: # 100ft interval, above 50175ft, "0" -> 48
graystr[8] = mbin[19] graybytes[8] = mbin[19]
graystr[2] = mbin[20] graybytes[2] = mbin[20]
graystr[9] = mbin[21] graybytes[9] = mbin[21]
graystr[3] = mbin[22] graybytes[3] = mbin[22]
graystr[10] = mbin[23] graybytes[10] = mbin[23]
graystr[4] = mbin[24] graybytes[4] = mbin[24]
# _ = mbin[25] # _ = mbin[25]
graystr[5] = mbin[26] graybytes[5] = mbin[26]
# cdef char D1 = mbin[27] # always zero # cdef char D1 = mbin[27] # always zero
graystr[6] = mbin[28] graybytes[6] = mbin[28]
graystr[0] = mbin[29] graybytes[0] = mbin[29]
graystr[7] = mbin[30] graybytes[7] = mbin[30]
graystr[1] = mbin[31] graybytes[1] = mbin[31]
# graystr = D2 + D4 + A1 + A2 + A4 + B1 + B2 + B4 + C1 + C2 + C4 # graybytes = D2 + D4 + A1 + A2 + A4 + B1 + B2 + B4 + C1 + C2 + C4
alt = gray2alt(_graystr) alt = gray2alt(_graybytes.decode())
if mbit == 49: # unit in meter, "1" -> 49 if mbit == 49: # unit in meter, "1" -> 49
vbin = _mbin[19:25] + _mbin[26:31] vbin = _mbin[19:25] + _mbin[26:31]
alt = int(bin2int(vbin) * 3.28084) # convert to ft alt = int(bin2int(vbin.decode()) * 3.28084) # convert to ft
return alt return alt
cpdef int gray2alt(bytearray codestr): cpdef int gray2alt(str codestr):
cdef bytearray gc500 = codestr[:8] cdef str gc500 = codestr[:8]
cdef int n500 = gray2int(gc500) cdef int n500 = gray2int(gc500)
# in 100-ft step must be converted first # in 100-ft step must be converted first
cdef bytearray gc100 = codestr[8:] cdef str gc100 = codestr[8:]
cdef int n100 = gray2int(gc100) cdef int n100 = gray2int(gc100)
if n100 in [0, 5, 6]: if n100 in [0, 5, 6]:
@ -380,7 +381,7 @@ cpdef int gray2alt(bytearray codestr):
return alt return alt
cdef int gray2int(bytearray graystr): cdef int gray2int(str graystr):
"""Convert greycode to binary.""" """Convert greycode to binary."""
cdef int num = bin2int(graystr) cdef int num = bin2int(graystr)
num ^= num >> 8 num ^= num >> 8
@ -390,12 +391,12 @@ cdef int gray2int(bytearray graystr):
return num return num
cdef bytes data(bytes msg): cdef str data(str msg):
"""Return the data frame in the message, bytes 9 to 22.""" """Return the data frame in the message, bytes 9 to 22."""
return msg[8:-6] return msg[8:-6]
cpdef bint allzeros(bytes msg): cpdef bint allzeros(str msg):
"""Check if the data bits are all zeros. """Check if the data bits are all zeros.
Args: Args:

View File

@ -19,14 +19,7 @@ from setuptools import setup, find_packages
from setuptools.extension import Extension from setuptools.extension import Extension
from Cython.Build import cythonize from Cython.Build import cythonize
extensions = [ extensions = [Extension("pyModeS.decoder.c_common", ["pyModeS/decoder/c_common.pyx"])]
Extension("pyModeS.c_decoder.common", ["pyModeS/c_decoder/common.pyx"]),
Extension("pyModeS.c_decoder.adsb", ["pyModeS/c_decoder/adsb.pyx"]),
Extension("pyModeS.c_decoder.bds.bds05", ["pyModeS/c_decoder/bds/bds05.pyx"]),
Extension("pyModeS.c_decoder.bds.bds06", ["pyModeS/c_decoder/bds/bds06.pyx"]),
Extension("pyModeS.c_decoder.bds.bds08", ["pyModeS/c_decoder/bds/bds08.pyx"]),
Extension("pyModeS.c_decoder.bds.bds09", ["pyModeS/c_decoder/bds/bds09.pyx"]),
]
# To use a consistent encoding # To use a consistent encoding

View File

@ -2,6 +2,7 @@ import sys
import time import time
import pandas as pd import pandas as pd
from tqdm import tqdm from tqdm import tqdm
from pyModeS.decoder import adsb
fin = sys.argv[1] fin = sys.argv[1]
@ -13,7 +14,6 @@ total = df_adsb.shape[0]
def native(): def native():
from pyModeS.decoder import adsb
from pyModeS.decoder import common from pyModeS.decoder import common
# airborne position # airborne position
@ -26,7 +26,7 @@ def native():
for i, r in tqdm(df_adsb.iterrows(), total=total): for i, r in tqdm(df_adsb.iterrows(), total=total):
ts = r.ts ts = r.ts
m = r.msg.encode() m = r.msg
downlink_format = common.df(m) downlink_format = common.df(m)
crc = common.crc(m) crc = common.crc(m)
@ -68,8 +68,7 @@ def native():
def cython(): def cython():
from pyModeS.c_decoder import adsb from pyModeS.decoder import c_common as common
from pyModeS.c_decoder import common
# airborne position # airborne position
m_air_0 = None m_air_0 = None
@ -81,7 +80,7 @@ def cython():
for i, r in tqdm(df_adsb.iterrows(), total=total): for i, r in tqdm(df_adsb.iterrows(), total=total):
ts = r.ts ts = r.ts
m = r.msg.encode() m = r.msg
downlink_format = common.df(m) downlink_format = common.df(m)
crc = common.crc(m) crc = common.crc(m)

View File

@ -1,95 +0,0 @@
from pyModeS.c_decoder import adsb
# === TEST ADS-B package ===
def test_adsb_icao():
assert adsb.icao(b"8D406B902015A678D4D220AA4BDA") == "406B90"
def test_adsb_category():
assert adsb.category(b"8D406B902015A678D4D220AA4BDA") == 0
def test_adsb_callsign():
assert adsb.callsign(b"8D406B902015A678D4D220AA4BDA") == "EZY85MH_"
def test_adsb_position():
pos = adsb.position(
b"8D40058B58C901375147EFD09357",
b"8D40058B58C904A87F402D3B8C59",
1446332400,
1446332405,
)
assert pos == (49.81755, 6.08442)
def test_adsb_position_swap_odd_even():
pos = adsb.position(
b"8D40058B58C904A87F402D3B8C59",
b"8D40058B58C901375147EFD09357",
1446332405,
1446332400,
)
assert pos == (49.81755, 6.08442)
def test_adsb_position_with_ref():
pos = adsb.position_with_ref(b"8D40058B58C901375147EFD09357", 49.0, 6.0)
assert pos == (49.82410, 6.06785)
pos = adsb.position_with_ref(b"8FC8200A3AB8F5F893096B000000", -43.5, 172.5)
assert pos == (-43.48564, 172.53942)
def test_adsb_airborne_position_with_ref():
pos = adsb.airborne_position_with_ref(b"8D40058B58C901375147EFD09357", 49.0, 6.0)
assert pos == (49.82410, 6.06785)
pos = adsb.airborne_position_with_ref(b"8D40058B58C904A87F402D3B8C59", 49.0, 6.0)
assert pos == (49.81755, 6.08442)
def test_adsb_surface_position_with_ref():
pos = adsb.surface_position_with_ref(b"8FC8200A3AB8F5F893096B000000", -43.5, 172.5)
assert pos == (-43.48564, 172.53942)
def test_adsb_surface_position():
pos = adsb.surface_position(
b"8CC8200A3AC8F009BCDEF2000000",
b"8FC8200A3AB8F5F893096B000000",
0,
2,
-43.496,
172.558,
)
assert pos == (-43.48564, 172.53942)
def test_adsb_alt():
assert adsb.altitude(b"8D40058B58C901375147EFD09357") == 39000
def test_adsb_velocity():
vgs = adsb.velocity(b"8D485020994409940838175B284F")
vas = adsb.velocity(b"8DA05F219B06B6AF189400CBC33F")
vgs_surface = adsb.velocity(b"8FC8200A3AB8F5F893096B000000")
assert vgs == (159, 182.88, -832, "GS")
assert vas == (375, 243.98, -2304, "TAS")
assert vgs_surface == (19.0, 42.2, 0, "GS")
assert adsb.altitude_diff(b"8D485020994409940838175B284F") == 550
# def test_nic():
# assert adsb.nic('8D3C70A390AB11F55B8C57F65FE6') == 0
# assert adsb.nic('8DE1C9738A4A430B427D219C8225') == 1
# assert adsb.nic('8D44058880B50006B1773DC2A7E9') == 2
# assert adsb.nic('8D44058881B50006B1773DC2A7E9') == 3
# assert adsb.nic('8D4AB42A78000640000000FA0D0A') == 4
# assert adsb.nic('8D4405887099F5D9772F37F86CB6') == 5
# assert adsb.nic('8D4841A86841528E72D9B472DAC2') == 6
# assert adsb.nic('8D44057560B9760C0B840A51C89F') == 7
# assert adsb.nic('8D40621D58C382D690C8AC2863A7') == 8
# assert adsb.nic('8F48511C598D04F12CCF82451642') == 9
# assert adsb.nic('8DA4D53A50DBF8C6330F3B35458F') == 10
# assert adsb.nic('8D3C4ACF4859F1736F8E8ADF4D67') == 11

View File

@ -1,61 +1,60 @@
from pyModeS.c_decoder import common from pyModeS.decoder import c_common as common
def test_conversions(): def test_conversions():
assert common.hex2bin(b"6E406B") == bytearray(b"011011100100000001101011") assert common.hex2bin("6E406B") == "011011100100000001101011"
def test_crc_decode(): def test_crc_decode():
assert common.crc(b"8D406B902015A678D4D220AA4BDA") == 0 assert common.crc("8D406B902015A678D4D220AA4BDA") == 0
assert common.crc(b"8d8960ed58bf053cf11bc5932b7d") == 0 assert common.crc("8d8960ed58bf053cf11bc5932b7d") == 0
assert common.crc(b"8d45cab390c39509496ca9a32912") == 0 assert common.crc("8d45cab390c39509496ca9a32912") == 0
assert common.crc(b"8d49d3d4e1089d00000000744c3b") == 0 assert common.crc("8d74802958c904e6ef4ba0184d5c") == 0
assert common.crc(b"8d74802958c904e6ef4ba0184d5c") == 0 assert common.crc("8d4400cd9b0000b4f87000e71a10") == 0
assert common.crc(b"8d4400cd9b0000b4f87000e71a10") == 0 assert common.crc("8d4065de58a1054a7ef0218e226a") == 0
assert common.crc(b"8d4065de58a1054a7ef0218e226a") == 0
assert common.crc(b"c80b2dca34aa21dd821a04cb64d4") == 10719924 assert common.crc("c80b2dca34aa21dd821a04cb64d4") == 10719924
assert common.crc(b"a800089d8094e33a6004e4b8a522") == 4805588 assert common.crc("a800089d8094e33a6004e4b8a522") == 4805588
assert common.crc(b"a8000614a50b6d32bed000bbe0ed") == 5659991 assert common.crc("a8000614a50b6d32bed000bbe0ed") == 5659991
assert common.crc(b"a0000410bc900010a40000f5f477") == 11727682 assert common.crc("a0000410bc900010a40000f5f477") == 11727682
assert common.crc(b"8d4ca251204994b1c36e60a5343d") == 16 assert common.crc("8d4ca251204994b1c36e60a5343d") == 16
assert common.crc(b"b0001718c65632b0a82040715b65") == 353333 assert common.crc("b0001718c65632b0a82040715b65") == 353333
def test_crc_encode(): def test_crc_encode():
parity = common.crc(b"8D406B902015A678D4D220AA4BDA", encode=True) parity = common.crc("8D406B902015A678D4D220AA4BDA", encode=True)
assert parity == 11160538 assert parity == 11160538
def test_icao(): def test_icao():
assert common.icao(b"8D406B902015A678D4D220AA4BDA") == "406B90" assert common.icao("8D406B902015A678D4D220AA4BDA") == "406B90"
assert common.icao(b"A0001839CA3800315800007448D9") == "400940" assert common.icao("A0001839CA3800315800007448D9") == "400940"
assert common.icao(b"A000139381951536E024D4CCF6B5") == "3C4DD2" assert common.icao("A000139381951536E024D4CCF6B5") == "3C4DD2"
assert common.icao(b"A000029CFFBAA11E2004727281F1") == "4243D0" assert common.icao("A000029CFFBAA11E2004727281F1") == "4243D0"
def test_modes_altcode(): def test_modes_altcode():
assert common.altcode(b"A02014B400000000000000F9D514") == 32300 assert common.altcode("A02014B400000000000000F9D514") == 32300
def test_modes_idcode(): def test_modes_idcode():
assert common.idcode(b"A800292DFFBBA9383FFCEB903D01") == "1346" assert common.idcode("A800292DFFBBA9383FFCEB903D01") == "1346"
def test_graycode_to_altitude(): def test_graycode_to_altitude():
assert common.gray2alt(bytearray(b"00000000010")) == -1000 assert common.gray2alt("00000000010") == -1000
assert common.gray2alt(bytearray(b"00000001010")) == -500 assert common.gray2alt("00000001010") == -500
assert common.gray2alt(bytearray(b"00000011011")) == -100 assert common.gray2alt("00000011011") == -100
assert common.gray2alt(bytearray(b"00000011010")) == 0 assert common.gray2alt("00000011010") == 0
assert common.gray2alt(bytearray(b"00000011110")) == 100 assert common.gray2alt("00000011110") == 100
assert common.gray2alt(bytearray(b"00000010011")) == 600 assert common.gray2alt("00000010011") == 600
assert common.gray2alt(bytearray(b"00000110010")) == 1000 assert common.gray2alt("00000110010") == 1000
assert common.gray2alt(bytearray(b"00001001001")) == 5800 assert common.gray2alt("00001001001") == 5800
assert common.gray2alt(bytearray(b"00011100100")) == 10300 assert common.gray2alt("00011100100") == 10300
assert common.gray2alt(bytearray(b"01100011010")) == 32000 assert common.gray2alt("01100011010") == 32000
assert common.gray2alt(bytearray(b"01110000100")) == 46300 assert common.gray2alt("01110000100") == 46300
assert common.gray2alt(bytearray(b"01010101100")) == 50200 assert common.gray2alt("01010101100") == 50200
assert common.gray2alt(bytearray(b"11011110100")) == 73200 assert common.gray2alt("11011110100") == 73200
assert common.gray2alt(bytearray(b"10000000011")) == 126600 assert common.gray2alt("10000000011") == 126600
assert common.gray2alt(bytearray(b"10000000001")) == 126700 assert common.gray2alt("10000000001") == 126700

View File

@ -1,4 +1,4 @@
from pyModeS import common from pyModeS.decoder import common
def test_conversions(): def test_conversions():