fully annotated code

This commit is contained in:
Xavier Olive 2022-11-01 16:32:42 +09:00
parent d3607a16c1
commit 51222372b1
32 changed files with 869 additions and 404 deletions

View File

@ -4,7 +4,7 @@ import warnings
try: try:
from . import c_common as common from . import c_common as common
from .c_common import * from .c_common import *
except: except Exception:
from . import py_common as common # type: ignore from . import py_common as common # type: ignore
from .py_common import * # type: ignore from .py_common import * # type: ignore
@ -17,6 +17,18 @@ from .decoder import bds
from .extra import aero from .extra import aero
from .extra import tcpclient from .extra import tcpclient
__all__ = [
"common",
"tell",
"adsb",
"commb",
"allcall",
"surv",
"bds",
"aero",
"tcpclient",
]
warnings.simplefilter("once", DeprecationWarning) warnings.simplefilter("once", DeprecationWarning)

View File

@ -2,28 +2,17 @@ def hex2bin(hexstr: str) -> str: ...
def bin2int(binstr: str) -> int: ... def bin2int(binstr: str) -> int: ...
def hex2int(hexstr: str) -> int: ... def hex2int(hexstr: str) -> int: ...
def bin2hex(binstr: str) -> str: ... def bin2hex(binstr: str) -> str: ...
def df(msg: str) -> int: ... def df(msg: str) -> int: ...
def crc(msg: str, encode: bool = False) -> int: ... def crc(msg: str, encode: bool = False) -> int: ...
def floor(x: float) -> float: ... def floor(x: float) -> float: ...
def icao(msg: str) -> str: ... def icao(msg: str) -> str: ...
def is_icao_assigned(icao: str) -> bool: ... def is_icao_assigned(icao: str) -> bool: ...
def typecode(msg: str) -> int: ... def typecode(msg: str) -> int: ...
def cprNL(lat: float) -> int: ... def cprNL(lat: float) -> int: ...
def idcode(msg: str) -> str: ... def idcode(msg: str) -> str: ...
def squawk(binstr: str) -> str: ... def squawk(binstr: str) -> str: ...
def altcode(msg: str) -> int: ... def altcode(msg: str) -> int: ...
def altitude(binstr: str) -> int: ... def altitude(binstr: str) -> int: ...
def data(msg: str) -> str: ... def data(msg: str) -> str: ...
def allzeros(msg: str) -> bool: ... def allzeros(msg: str) -> bool: ...
def wrongstatus(data: str, sb: int, msb: int, lsb: int) -> bool: ...

22
pyModeS/common.pyi Normal file
View File

@ -0,0 +1,22 @@
from typing import Optional
def hex2bin(hexstr: str) -> str: ...
def bin2int(binstr: str) -> int: ...
def hex2int(hexstr: str) -> int: ...
def bin2hex(binstr: str) -> str: ...
def df(msg: str) -> int: ...
def crc(msg: str, encode: bool = False) -> int: ...
def floor(x: float) -> float: ...
def icao(msg: str) -> Optional[str]: ...
def is_icao_assigned(icao: str) -> bool: ...
def typecode(msg: str) -> Optional[int]: ...
def cprNL(lat: float) -> int: ...
def idcode(msg: str) -> str: ...
def squawk(binstr: str) -> str: ...
def altcode(msg: str) -> Optional[int]: ...
def altitude(binstr: str) -> Optional[int]: ...
def gray2alt(binstr: str) -> Optional[int]: ...
def gray2int(binstr: str) -> int: ...
def data(msg: str) -> str: ...
def allzeros(msg: str) -> bool: ...
def wrongstatus(data: str, sb: int, msb: int, lsb: int) -> bool: ...

View File

@ -1,5 +1,5 @@
def tell(msg: str) -> None: def tell(msg: str) -> None:
from pyModeS import common, adsb, commb, bds from .. import common, adsb, commb, bds
def _print(label, value, unit=None): def _print(label, value, unit=None):
print("%28s: " % label, end="") print("%28s: " % label, end="")
@ -20,6 +20,11 @@ def tell(msg: str) -> None:
_print("Protocol", "Mode-S Extended Squitter (ADS-B)") _print("Protocol", "Mode-S Extended Squitter (ADS-B)")
tc = common.typecode(msg) tc = common.typecode(msg)
if tc is None:
_print("ERROR", "Unknown typecode")
return
if 1 <= tc <= 4: # callsign if 1 <= tc <= 4: # callsign
callsign = adsb.callsign(msg) callsign = adsb.callsign(msg)
_print("Type", "Identification and category") _print("Type", "Identification and category")
@ -52,12 +57,14 @@ def tell(msg: str) -> None:
if tc == 19: if tc == 19:
_print("Type", "Airborne velocity") _print("Type", "Airborne velocity")
spd, trk, vr, t = adsb.velocity(msg) velocity = adsb.velocity(msg)
types = {"GS": "Ground speed", "TAS": "True airspeed"} if velocity is not None:
_print("Speed", spd, "knots") spd, trk, vr, t = velocity
_print("Track", trk, "degrees") types = {"GS": "Ground speed", "TAS": "True airspeed"}
_print("Vertical rate", vr, "feet/minute") _print("Speed", spd, "knots")
_print("Type", types[t]) _print("Track", trk, "degrees")
_print("Vertical rate", vr, "feet/minute")
_print("Type", types[t])
if 20 <= tc <= 22: # airborne position if 20 <= tc <= 22: # airborne position
_print("Type", "Airborne position (with GNSS altitude)") _print("Type", "Airborne position (with GNSS altitude)")
@ -106,8 +113,16 @@ def tell(msg: str) -> None:
_print("Angle", angle, "°") _print("Angle", angle, "°")
_print("Angle Type", angle_type) _print("Angle Type", angle_type)
_print("Angle Source", angle_source) _print("Angle Source", angle_source)
_print("Vertical mode", vertical_horizontal_types[vertical_mode]) if vertical_mode is not None:
_print("Horizontal mode", vertical_horizontal_types[horizontal_mode]) _print(
"Vertical mode",
vertical_horizontal_types[vertical_mode],
)
if horizontal_mode is not None:
_print(
"Horizontal mode",
vertical_horizontal_types[horizontal_mode],
)
_print( _print(
"TCAS/ACAS", "TCAS/ACAS",
tcas_operational_types[tcas_operational] tcas_operational_types[tcas_operational]
@ -117,7 +132,7 @@ def tell(msg: str) -> None:
_print("TCAS/ACAS RA", tcas_ra_types[tcas_ra]) _print("TCAS/ACAS RA", tcas_ra_types[tcas_ra])
_print("Emergency status", emergency_types[emergency_status]) _print("Emergency status", emergency_types[emergency_status])
else: else:
alt, alt_source = adsb.selected_altitude(msg) alt, alt_source = adsb.selected_altitude(msg) # type: ignore
baro = adsb.baro_pressure_setting(msg) baro = adsb.baro_pressure_setting(msg)
hdg = adsb.selected_heading(msg) hdg = adsb.selected_heading(msg)
autopilot = adsb.autopilot(msg) autopilot = adsb.autopilot(msg)
@ -127,13 +142,20 @@ def tell(msg: str) -> None:
lnav = adsb.lnav_mode(msg) lnav = adsb.lnav_mode(msg)
_print("Selected altitude", alt, "feet") _print("Selected altitude", alt, "feet")
_print("Altitude source", alt_source) _print("Altitude source", alt_source)
_print("Barometric pressure setting", baro, "" if baro == None else "millibars") _print(
"Barometric pressure setting",
baro,
"" if baro is None else "millibars",
)
_print("Selected Heading", hdg, "°") _print("Selected Heading", hdg, "°")
if not (common.bin2int((common.hex2bin(msg)[32:])[46]) == 0): if not (common.bin2int((common.hex2bin(msg)[32:])[46]) == 0):
_print("Autopilot", types_29[autopilot] if autopilot else None) _print(
"Autopilot", types_29[autopilot] if autopilot else None
)
_print("VNAV mode", types_29[vnav] if vnav else None) _print("VNAV mode", types_29[vnav] if vnav else None)
_print( _print(
"Altitude hold mode", types_29[alt_hold] if alt_hold else None "Altitude hold mode",
types_29[alt_hold] if alt_hold else None,
) )
_print("Approach mode", types_29[app] if app else None) _print("Approach mode", types_29[app] if app else None)
_print( _print(
@ -167,7 +189,7 @@ def tell(msg: str) -> None:
} }
BDS = bds.infer(msg, mrar=True) BDS = bds.infer(msg, mrar=True)
if BDS in labels.keys(): if BDS is not None and BDS in labels.keys():
_print("BDS", "%s (%s)" % (BDS, labels[BDS])) _print("BDS", "%s (%s)" % (BDS, labels[BDS]))
else: else:
_print("BDS", BDS) _print("BDS", BDS)

View File

@ -2,65 +2,120 @@
The ADS-B module also imports functions from the following modules: The ADS-B module also imports functions from the following modules:
- pyModeS.decoder.bds.bds05: ``airborne_position()``, ``airborne_position_with_ref()``, ``altitude()`` - bds05: ``airborne_position()``, ``airborne_position_with_ref()``,
- pyModeS.decoder.bds.bds06: ``surface_position()``, ``surface_position_with_ref()``, ``surface_velocity()`` ``altitude()``
- pyModeS.decoder.bds.bds08: ``category()``, ``callsign()`` - bds06: ``surface_position()``, ``surface_position_with_ref()``,
- pyModeS.decoder.bds.bds09: ``airborne_velocity()``, ``altitude_diff()`` ``surface_velocity()``
- bds08: ``category()``, ``callsign()``
- bds09: ``airborne_velocity()``, ``altitude_diff()``
""" """
import pyModeS as pms from __future__ import annotations
from datetime import datetime
from pyModeS import common from .. import common
from . import uncertainty
from pyModeS.decoder import uncertainty from .bds.bds05 import airborne_position, airborne_position_with_ref
from .bds.bds05 import altitude as altitude05
# from pyModeS.decoder.bds import bds05, bds06, bds09 from .bds.bds06 import (
from pyModeS.decoder.bds.bds05 import (
airborne_position,
airborne_position_with_ref,
altitude as altitude05,
)
from pyModeS.decoder.bds.bds06 import (
surface_position, surface_position,
surface_position_with_ref, surface_position_with_ref,
surface_velocity, surface_velocity,
) )
from pyModeS.decoder.bds.bds08 import category, callsign from .bds.bds08 import callsign, category
from pyModeS.decoder.bds.bds09 import airborne_velocity, altitude_diff from .bds.bds09 import airborne_velocity, altitude_diff
from pyModeS.decoder.bds.bds61 import is_emergency, emergency_state, emergency_squawk from .bds.bds61 import emergency_squawk, emergency_state, is_emergency
from pyModeS.decoder.bds.bds62 import ( from .bds.bds62 import (
altitude_hold_mode,
approach_mode,
autopilot,
baro_pressure_setting,
emergency_status,
horizontal_mode,
lnav_mode,
selected_altitude, selected_altitude,
selected_heading, selected_heading,
target_altitude, target_altitude,
target_angle, target_angle,
tcas_operational, tcas_operational,
tcas_ra, tcas_ra,
baro_pressure_setting,
vertical_mode, vertical_mode,
horizontal_mode,
vnav_mode, vnav_mode,
lnav_mode,
autopilot,
altitude_hold_mode,
approach_mode,
emergency_status,
) )
__all__ = [
"airborne_position",
"airborne_position_with_ref",
"altitude05",
"surface_position",
"surface_position_with_ref",
"surface_velocity",
"callsign",
"category",
"airborne_velocity",
"altitude_diff",
"emergency_squawk",
"emergency_state",
"is_emergency",
"df",
"icao",
"typecode",
"position",
"position_with_ref",
"altitude",
"velocity",
"speed_heading",
"oe_flag",
"version",
"nuc_p",
"nuc_v",
"nic_v1",
"nic_v2",
"nic_s",
"nic_a_c",
"nic_b",
"nac_p",
"nac_v",
"sil",
"selected_altitude",
"target_altitude",
"vertical_mode",
"horizontal_mode",
"selected_heading",
"target_angle",
"baro_pressure_setting",
"autopilot",
"vnav_mode",
"altitude_hold_mode",
"approach_mode",
"lnav_mode",
"tcas_operational",
"tcas_ra",
"emergency_status",
]
def df(msg):
def df(msg: str) -> int:
return common.df(msg) return common.df(msg)
def icao(msg): def icao(msg: str) -> None | str:
return common.icao(msg) return common.icao(msg)
def typecode(msg): def typecode(msg: str) -> None | int:
return common.typecode(msg) return common.typecode(msg)
def position(msg0, msg1, t0, t1, lat_ref=None, lon_ref=None): def position(
msg0: str,
msg1: str,
t0: int | datetime,
t1: int | datetime,
lat_ref: None | float = None,
lon_ref: None | float = None,
) -> None | tuple[float, float]:
"""Decode surface or airborne position from a pair of even and odd """Decode surface or airborne position from a pair of even and odd
position messages. position messages.
@ -82,6 +137,9 @@ def position(msg0, msg1, t0, t1, lat_ref=None, lon_ref=None):
tc0 = typecode(msg0) tc0 = typecode(msg0)
tc1 = typecode(msg1) tc1 = typecode(msg1)
if tc0 is None or tc1 is None:
raise RuntimeError("Incorrect or inconsistent message types")
if 5 <= tc0 <= 8 and 5 <= tc1 <= 8: if 5 <= tc0 <= 8 and 5 <= tc1 <= 8:
if lat_ref is None or lon_ref is None: if lat_ref is None or lon_ref is None:
raise RuntimeError( raise RuntimeError(
@ -103,7 +161,9 @@ def position(msg0, msg1, t0, t1, lat_ref=None, lon_ref=None):
raise RuntimeError("Incorrect or inconsistent message types") raise RuntimeError("Incorrect or inconsistent message types")
def position_with_ref(msg, lat_ref, lon_ref): def position_with_ref(
msg: str, lat_ref: float, lon_ref: float
) -> tuple[float, float]:
"""Decode position with only one message. """Decode position with only one message.
A reference position is required, which can be previously A reference position is required, which can be previously
@ -123,6 +183,9 @@ def position_with_ref(msg, lat_ref, lon_ref):
tc = typecode(msg) tc = typecode(msg)
if tc is None:
raise RuntimeError("incorrect or inconsistent message types")
if 5 <= tc <= 8: if 5 <= tc <= 8:
return surface_position_with_ref(msg, lat_ref, lon_ref) return surface_position_with_ref(msg, lat_ref, lon_ref)
@ -133,7 +196,7 @@ def position_with_ref(msg, lat_ref, lon_ref):
raise RuntimeError("incorrect or inconsistent message types") raise RuntimeError("incorrect or inconsistent message types")
def altitude(msg): def altitude(msg: str) -> None | float:
"""Decode aircraft altitude. """Decode aircraft altitude.
Args: Args:
@ -145,7 +208,7 @@ def altitude(msg):
""" """
tc = typecode(msg) tc = typecode(msg)
if tc < 5 or tc == 19 or tc > 22: if tc is None or tc < 5 or tc == 19 or tc > 22:
raise RuntimeError("%s: Not a position message" % msg) raise RuntimeError("%s: Not a position message" % msg)
elif tc >= 5 and tc <= 8: elif tc >= 5 and tc <= 8:
@ -157,16 +220,20 @@ def altitude(msg):
return altitude05(msg) return altitude05(msg)
def velocity(msg, source=False): def velocity(
"""Calculate the speed, heading, and vertical rate (handles both airborne or surface message). msg: str, source: bool = False
) -> None | tuple[None | float, None | float, None | int, str]:
"""Calculate the speed, heading, and vertical rate
(handles both airborne or surface message).
Args: Args:
msg (str): 28 hexdigits string msg (str): 28 hexdigits string
source (boolean): Include direction and vertical rate sources in return. Default to False. source (boolean): Include direction and vertical rate sources in return.
If set to True, the function will return six values instead of four. Default to False.
If set to True, the function will return six value instead of four.
Returns: Returns:
(int, float, int, string, [string], [string]): Four or six parameters, including: int, float, int, string, [string], [string]:
- Speed (kt) - Speed (kt)
- Angle (degree), either ground track or heading - Angle (degree), either ground track or heading
- Vertical rate (ft/min) - Vertical rate (ft/min)
@ -174,22 +241,26 @@ def velocity(msg, source=False):
- [Optional] Direction source ('TRUE_NORTH' or 'MAGNETIC_NORTH') - [Optional] Direction source ('TRUE_NORTH' or 'MAGNETIC_NORTH')
- [Optional] Vertical rate source ('BARO' or 'GNSS') - [Optional] Vertical rate source ('BARO' or 'GNSS')
For surface messages, vertical rate and its respective sources are set to None. For surface messages, vertical rate and its respective sources are set
to None.
""" """
if 5 <= typecode(msg) <= 8: tc = typecode(msg)
error = "incorrect or inconsistent message types, expecting 4<TC<9 or TC=19"
if tc is None:
raise RuntimeError(error)
if 5 <= tc <= 8:
return surface_velocity(msg, source) return surface_velocity(msg, source)
elif typecode(msg) == 19: elif tc == 19:
return airborne_velocity(msg, source) return airborne_velocity(msg, source)
else: else:
raise RuntimeError( raise RuntimeError(error)
"incorrect or inconsistent message types, expecting 4<TC<9 or TC=19"
)
def speed_heading(msg): def speed_heading(msg: str) -> None | tuple[None | float, None | float]:
"""Get speed and ground track (or heading) from the velocity message """Get speed and ground track (or heading) from the velocity message
(handles both airborne or surface message) (handles both airborne or surface message)
@ -199,11 +270,14 @@ def speed_heading(msg):
Returns: Returns:
(int, float): speed (kt), ground track or heading (degree) (int, float): speed (kt), ground track or heading (degree)
""" """
spd, trk_or_hdg, rocd, tag = velocity(msg) decoded = velocity(msg)
if decoded is None:
return None
spd, trk_or_hdg, rocd, tag = decoded
return spd, trk_or_hdg return spd, trk_or_hdg
def oe_flag(msg): def oe_flag(msg: str) -> int:
"""Check the odd/even flag. Bit 54, 0 for even, 1 for odd. """Check the odd/even flag. Bit 54, 0 for even, 1 for odd.
Args: Args:
msg (str): 28 hexdigits string msg (str): 28 hexdigits string
@ -214,7 +288,7 @@ def oe_flag(msg):
return int(msgbin[53]) return int(msgbin[53])
def version(msg): def version(msg: str) -> int:
"""ADS-B Version """ADS-B Version
Args: Args:
@ -236,13 +310,15 @@ def version(msg):
return version return version
def nuc_p(msg): def nuc_p(msg: str) -> tuple[int, None | float, None | int, None | int]:
"""Calculate NUCp, Navigation Uncertainty Category - Position (ADS-B version 1) """Calculate NUCp, Navigation Uncertainty Category - Position
(ADS-B version 1)
Args: Args:
msg (str): 28 hexdigits string, msg (str): 28 hexdigits string,
Returns: Returns:
int: NUCp, Navigation Uncertainty Category (position)
int: Horizontal Protection Limit int: Horizontal Protection Limit
int: 95% Containment Radius - Horizontal (meters) int: 95% Containment Radius - Horizontal (meters)
int: 95% Containment Radius - Vertical (meters) int: 95% Containment Radius - Vertical (meters)
@ -250,7 +326,7 @@ def nuc_p(msg):
""" """
tc = typecode(msg) tc = typecode(msg)
if typecode(msg) < 5 or typecode(msg) > 22: if tc is None or tc < 5 or tc is None or tc > 22:
raise RuntimeError( raise RuntimeError(
"%s: Not a surface position message (5<TC<8), \ "%s: Not a surface position message (5<TC<8), \
airborne position message (8<TC<19), \ airborne position message (8<TC<19), \
@ -258,12 +334,15 @@ def nuc_p(msg):
% msg % msg
) )
try: NUCp = uncertainty.TC_NUCp_lookup[tc]
NUCp = uncertainty.TC_NUCp_lookup[tc] index = uncertainty.NUCp.get(NUCp, None)
HPL = uncertainty.NUCp[NUCp]["HPL"]
RCu = uncertainty.NUCp[NUCp]["RCu"] if index is not None:
except KeyError: HPL = index["HPL"]
HPL, RCu = uncertainty.NA, uncertainty.NA RCu = index["RCu"]
RCv = index["RCv"]
else:
HPL, RCu, RCv = uncertainty.NA, uncertainty.NA, uncertainty.NA
RCv = uncertainty.NA RCv = uncertainty.NA
@ -273,16 +352,18 @@ def nuc_p(msg):
elif tc == 21: elif tc == 21:
RCv = 15 RCv = 15
return HPL, RCu, RCv return NUCp, HPL, RCu, RCv
def nuc_v(msg): def nuc_v(msg: str) -> tuple[int, None | float, None | float]:
"""Calculate NUCv, Navigation Uncertainty Category - Velocity (ADS-B version 1) """Calculate NUCv, Navigation Uncertainty Category - Velocity
(ADS-B version 1)
Args: Args:
msg (str): 28 hexdigits string, msg (str): 28 hexdigits string,
Returns: Returns:
int: NUCv, Navigation Uncertainty Category (velocity)
int or string: 95% Horizontal Velocity Error int or string: 95% Horizontal Velocity Error
int or string: 95% Vertical Velocity Error int or string: 95% Vertical Velocity Error
""" """
@ -295,17 +376,18 @@ def nuc_v(msg):
msgbin = common.hex2bin(msg) msgbin = common.hex2bin(msg)
NUCv = common.bin2int(msgbin[42:45]) NUCv = common.bin2int(msgbin[42:45])
index = uncertainty.NUCv.get(NUCv, None)
try: if index is not None:
HVE = uncertainty.NUCv[NUCv]["HVE"] HVE = index["HVE"]
VVE = uncertainty.NUCv[NUCv]["VVE"] VVE = index["VVE"]
except KeyError: else:
HVE, VVE = uncertainty.NA, uncertainty.NA HVE, VVE = uncertainty.NA, uncertainty.NA
return HVE, VVE return NUCv, HVE, VVE
def nic_v1(msg, NICs): def nic_v1(msg: str, NICs: int) -> tuple[int, None | float, None | float]:
"""Calculate NIC, navigation integrity category, for ADS-B version 1 """Calculate NIC, navigation integrity category, for ADS-B version 1
Args: Args:
@ -313,10 +395,12 @@ def nic_v1(msg, NICs):
NICs (int or string): NIC supplement NICs (int or string): NIC supplement
Returns: Returns:
int: NIC, Navigation Integrity Category
int or string: Horizontal Radius of Containment int or string: Horizontal Radius of Containment
int or string: Vertical Protection Limit int or string: Vertical Protection Limit
""" """
if typecode(msg) < 5 or typecode(msg) > 22: tc = typecode(msg)
if tc is None or tc < 5 or tc > 22:
raise RuntimeError( raise RuntimeError(
"%s: Not a surface position message (5<TC<8), \ "%s: Not a surface position message (5<TC<8), \
airborne position message (8<TC<19), \ airborne position message (8<TC<19), \
@ -324,22 +408,24 @@ def nic_v1(msg, NICs):
% msg % msg
) )
tc = typecode(msg)
NIC = uncertainty.TC_NICv1_lookup[tc] NIC = uncertainty.TC_NICv1_lookup[tc]
if isinstance(NIC, dict): if isinstance(NIC, dict):
NIC = NIC[NICs] NIC = NIC[NICs]
try: d_index = uncertainty.NICv1.get(NIC, None)
Rc = uncertainty.NICv1[NIC][NICs]["Rc"] Rc, VPL = uncertainty.NA, uncertainty.NA
VPL = uncertainty.NICv1[NIC][NICs]["VPL"]
except KeyError:
Rc, VPL = uncertainty.NA, uncertainty.NA
return Rc, VPL if d_index is not None:
index = d_index.get(NICs, None)
if index is not None:
Rc = index["Rc"]
VPL = index["VPL"]
return NIC, Rc, VPL
def nic_v2(msg, NICa, NICbc): def nic_v2(msg: str, NICa: int, NICbc: int) -> tuple[int, int]:
"""Calculate NIC, navigation integrity category, for ADS-B version 2 """Calculate NIC, navigation integrity category, for ADS-B version 2
Args: Args:
@ -348,9 +434,11 @@ def nic_v2(msg, NICa, NICbc):
NICbc (int or string): NIC supplement - B or C NICbc (int or string): NIC supplement - B or C
Returns: Returns:
int: NIC, Navigation Integrity Category
int or string: Horizontal Radius of Containment int or string: Horizontal Radius of Containment
""" """
if typecode(msg) < 5 or typecode(msg) > 22: tc = typecode(msg)
if tc is None or tc < 5 or tc > 22:
raise RuntimeError( raise RuntimeError(
"%s: Not a surface position message (5<TC<8), \ "%s: Not a surface position message (5<TC<8), \
airborne position message (8<TC<19), \ airborne position message (8<TC<19), \
@ -358,7 +446,6 @@ def nic_v2(msg, NICa, NICbc):
% msg % msg
) )
tc = typecode(msg)
NIC = uncertainty.TC_NICv2_lookup[tc] NIC = uncertainty.TC_NICv2_lookup[tc]
if 20 <= tc <= 22: if 20 <= tc <= 22:
@ -374,10 +461,10 @@ def nic_v2(msg, NICa, NICbc):
except KeyError: except KeyError:
Rc = uncertainty.NA Rc = uncertainty.NA
return Rc return NIC, Rc # type: ignore
def nic_s(msg): def nic_s(msg: str) -> int:
"""Obtain NIC supplement bit, TC=31 message """Obtain NIC supplement bit, TC=31 message
Args: Args:
@ -399,7 +486,7 @@ def nic_s(msg):
return nic_s return nic_s
def nic_a_c(msg): def nic_a_c(msg: str) -> tuple[int, int]:
"""Obtain NICa/c, navigation integrity category supplements a and c """Obtain NICa/c, navigation integrity category supplements a and c
Args: Args:
@ -422,7 +509,7 @@ def nic_a_c(msg):
return nic_a, nic_c return nic_a, nic_c
def nic_b(msg): def nic_b(msg: str) -> int:
"""Obtain NICb, navigation integrity category supplement-b """Obtain NICb, navigation integrity category supplement-b
Args: Args:
@ -433,7 +520,7 @@ def nic_b(msg):
""" """
tc = typecode(msg) tc = typecode(msg)
if tc < 9 or tc > 18: if tc is None or tc < 9 or tc > 18:
raise RuntimeError( raise RuntimeError(
"%s: Not a airborne position message, expecting 8<TC<19" % msg "%s: Not a airborne position message, expecting 8<TC<19" % msg
) )
@ -444,15 +531,18 @@ def nic_b(msg):
return nic_b return nic_b
def nac_p(msg): def nac_p(msg: str) -> tuple[int, int | None, int | None]:
"""Calculate NACp, Navigation Accuracy Category - Position """Calculate NACp, Navigation Accuracy Category - Position
Args: Args:
msg (str): 28 hexdigits string, TC = 29 or 31 msg (str): 28 hexdigits string, TC = 29 or 31
Returns: Returns:
int or string: 95% horizontal accuracy bounds, Estimated Position Uncertainty int: NACp, Navigation Accuracy Category (position)
int or string: 95% vertical accuracy bounds, Vertical Estimated Position Uncertainty int or string: 95% horizontal accuracy bounds,
Estimated Position Uncertainty
int or string: 95% vertical accuracy bounds,
Vertical Estimated Position Uncertainty
""" """
tc = typecode(msg) tc = typecode(msg)
@ -476,18 +566,21 @@ def nac_p(msg):
except KeyError: except KeyError:
EPU, VEPU = uncertainty.NA, uncertainty.NA EPU, VEPU = uncertainty.NA, uncertainty.NA
return EPU, VEPU return NACp, EPU, VEPU
def nac_v(msg): def nac_v(msg: str) -> tuple[int, float | None, float | None]:
"""Calculate NACv, Navigation Accuracy Category - Velocity """Calculate NACv, Navigation Accuracy Category - Velocity
Args: Args:
msg (str): 28 hexdigits string, TC = 19 msg (str): 28 hexdigits string, TC = 19
Returns: Returns:
int or string: 95% horizontal accuracy bounds for velocity, Horizontal Figure of Merit int: NACv, Navigation Accuracy Category (velocity)
int or string: 95% vertical accuracy bounds for velocity, Vertical Figure of Merit int or string: 95% horizontal accuracy bounds for velocity,
Horizontal Figure of Merit
int or string: 95% vertical accuracy bounds for velocity,
Vertical Figure of Merit
""" """
tc = typecode(msg) tc = typecode(msg)
@ -505,18 +598,23 @@ def nac_v(msg):
except KeyError: except KeyError:
HFOMr, VFOMr = uncertainty.NA, uncertainty.NA HFOMr, VFOMr = uncertainty.NA, uncertainty.NA
return HFOMr, VFOMr return NACv, HFOMr, VFOMr
def sil(msg, version): def sil(
msg: str,
version: None | int,
) -> tuple[float | None, float | None, str]:
"""Calculate SIL, Surveillance Integrity Level """Calculate SIL, Surveillance Integrity Level
Args: Args:
msg (str): 28 hexdigits string with TC = 29, 31 msg (str): 28 hexdigits string with TC = 29, 31
Returns: Returns:
int or string: Probability of exceeding Horizontal Radius of Containment RCu int or string:
int or string: Probability of exceeding Vertical Integrity Containment Region VPL Probability of exceeding Horizontal Radius of Containment RCu
int or string:
Probability of exceeding Vertical Integrity Containment Region VPL
string: SIL supplement based on per "hour" or "sample", or 'unknown' string: SIL supplement based on per "hour" or "sample", or 'unknown'
""" """
tc = typecode(msg) tc = typecode(msg)

View File

@ -2,13 +2,21 @@
Decode all-call reply messages, with downlink format 11 Decode all-call reply messages, with downlink format 11
""" """
from pyModeS import common
from __future__ import annotations
from typing import Callable, TypeVar
from .. import common
T = TypeVar("T")
F = Callable[[str], T]
def _checkdf(func): def _checkdf(func: F[T]) -> F[T]:
"""Ensure downlink format is 11.""" """Ensure downlink format is 11."""
def wrapper(msg): def wrapper(msg: str) -> T:
df = common.df(msg) df = common.df(msg)
if df != 11: if df != 11:
raise RuntimeError( raise RuntimeError(
@ -20,7 +28,7 @@ def _checkdf(func):
@_checkdf @_checkdf
def icao(msg): def icao(msg: str) -> None | str:
"""Decode transponder code (ICAO address). """Decode transponder code (ICAO address).
Args: Args:
@ -33,7 +41,7 @@ def icao(msg):
@_checkdf @_checkdf
def interrogator(msg): def interrogator(msg: str) -> str:
"""Decode interrogator identifier code. """Decode interrogator identifier code.
Args: Args:
@ -42,19 +50,20 @@ def interrogator(msg):
int: interrogator identifier code int: interrogator identifier code
""" """
# the CRC remainder contains the CL and IC field. top three bits are CL field and last four bits are IC field. # the CRC remainder contains the CL and IC field.
# the top three bits are CL field and last four bits are IC field.
remainder = common.crc(msg) remainder = common.crc(msg)
if remainder > 79: if remainder > 79:
IC = "corrupt IC" IC = "corrupt IC"
elif remainder < 16: elif remainder < 16:
IC="II"+str(remainder) IC = "II" + str(remainder)
else: else:
IC="SI"+str(remainder-16) IC = "SI" + str(remainder - 16)
return IC return IC
@_checkdf @_checkdf
def capability(msg): def capability(msg: str) -> tuple[int, None | str]:
"""Decode transponder capability. """Decode transponder capability.
Args: Args:
@ -73,9 +82,16 @@ def capability(msg):
elif ca == 5: elif ca == 5:
text = "level 2 transponder, ability to set CA to 7, airborne" text = "level 2 transponder, ability to set CA to 7, airborne"
elif ca == 6: elif ca == 6:
text = "evel 2 transponder, ability to set CA to 7, either airborne or ground" text = (
"evel 2 transponder, ability to set CA to 7, "
"either airborne or ground"
)
elif ca == 7: elif ca == 7:
text = "Downlink Request value is 0,or the Flight Status is 2, 3, 4 or 5, either airborne or on the ground" text = (
"Downlink Request value is 0, "
"or the Flight Status is 2, 3, 4 or 5, "
"either airborne or on the ground"
)
else: else:
text = None text = None

View File

@ -18,16 +18,13 @@
Common functions for Mode-S decoding Common functions for Mode-S decoding
""" """
from typing import Optional
import numpy as np import numpy as np
from pyModeS.extra import aero from ... import common
from pyModeS import common from ...extra import aero
from . import ( # noqa: F401
from pyModeS.decoder.bds import (
bds05,
bds06,
bds08,
bds09,
bds10, bds10,
bds17, bds17,
bds20, bds20,
@ -36,13 +33,15 @@ from pyModeS.decoder.bds import (
bds44, bds44,
bds45, bds45,
bds50, bds50,
bds53,
bds60, bds60,
bds62 bds61,
bds62,
) )
def is50or60(msg, spd_ref, trk_ref, alt_ref): def is50or60(
msg: str, spd_ref: float, trk_ref: float, alt_ref: float
) -> Optional[str]:
"""Use reference ground speed and trk to determine BDS50 and DBS60. """Use reference ground speed and trk to determine BDS50 and DBS60.
Args: Args:
@ -52,7 +51,8 @@ def is50or60(msg, spd_ref, trk_ref, alt_ref):
alt_ref (float): reference altitude (ADS-B altitude), ft alt_ref (float): reference altitude (ADS-B altitude), ft
Returns: Returns:
String or None: BDS version, or possible versions, or None if nothing matches. String or None: BDS version, or possible versions,
or None if nothing matches.
""" """
@ -114,15 +114,17 @@ def is50or60(msg, spd_ref, trk_ref, alt_ref):
return BDS return BDS
def infer(msg, mrar=False): def infer(msg: str, mrar: bool = False) -> Optional[str]:
"""Estimate the most likely BDS code of an message. """Estimate the most likely BDS code of an message.
Args: Args:
msg (str): 28 hexdigits string msg (str): 28 hexdigits string
mrar (bool): Also infer MRAR (BDS 44) and MHR (BDS 45). Defaults to False. mrar (bool): Also infer MRAR (BDS 44) and MHR (BDS 45).
Defaults to False.
Returns: Returns:
String or None: BDS version, or possible versions, or None if nothing matches. String or None: BDS version, or possible versions,
or None if nothing matches.
""" """
df = common.df(msg) df = common.df(msg)
@ -133,6 +135,8 @@ def infer(msg, mrar=False):
# For ADS-B / Mode-S extended squitter # For ADS-B / Mode-S extended squitter
if df == 17: if df == 17:
tc = common.typecode(msg) tc = common.typecode(msg)
if tc is None:
return None
if 1 <= tc <= 4: if 1 <= tc <= 4:
return "BDS08" # identification and category return "BDS08" # identification and category

View File

@ -1,14 +1,20 @@
# ------------------------------------------ # ------------------------------------------
# BDS 0,5 # BDS 0,5
# ADS-B TC=9-18 # ADS-B TC=9-18
# Airborn position # Airborne position
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from __future__ import annotations
from datetime import datetime
from ... import common
def airborne_position(msg0, msg1, t0, t1): def airborne_position(
"""Decode airborn position from a pair of even and odd position message msg0: str, msg1: str, t0: int | datetime, t1: int | datetime
) -> None | tuple[float, float]:
"""Decode airborne position from a pair of even and odd position message
Args: Args:
msg0 (string): even message (28 hexdigits) msg0 (string): even message (28 hexdigits)
@ -59,7 +65,8 @@ def airborne_position(msg0, msg1, t0, t1):
return None return None
# compute ni, longitude index m, and longitude # compute ni, longitude index m, and longitude
if t0 > t1: # (people pass int+int or datetime+datetime)
if t0 > t1: # type: ignore
lat = lat_even lat = lat_even
nl = common.cprNL(lat) nl = common.cprNL(lat)
ni = max(common.cprNL(lat) - 0, 1) ni = max(common.cprNL(lat) - 0, 1)
@ -78,7 +85,9 @@ def airborne_position(msg0, msg1, t0, t1):
return round(lat, 5), round(lon, 5) return round(lat, 5), round(lon, 5)
def airborne_position_with_ref(msg, lat_ref, lon_ref): def airborne_position_with_ref(
msg: str, lat_ref: float, lon_ref: float
) -> tuple[float, float]:
"""Decode airborne position with only one message, """Decode airborne position with only one message,
knowing reference nearby location, such as previously calculated location, knowing reference nearby location, such as previously calculated location,
ground station, or airport location, etc. The reference position shall ground station, or airport location, etc. The reference position shall
@ -123,7 +132,7 @@ def airborne_position_with_ref(msg, lat_ref, lon_ref):
return round(lat, 5), round(lon, 5) return round(lat, 5), round(lon, 5)
def altitude(msg): def altitude(msg: str) -> None | int:
"""Decode aircraft altitude """Decode aircraft altitude
Args: Args:
@ -135,16 +144,14 @@ def altitude(msg):
tc = common.typecode(msg) tc = common.typecode(msg)
if tc < 9 or tc == 19 or tc > 22: if tc is None or tc < 9 or tc == 19 or tc > 22:
raise RuntimeError("%s: Not a airborn position message" % msg) raise RuntimeError("%s: Not an airborne position message" % msg)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
altbin = mb[8:20] altbin = mb[8:20]
if tc < 19: if tc < 19:
altcode = altbin[0:6] + "0" + altbin[6:] altcode = altbin[0:6] + "0" + altbin[6:]
alt = common.altitude(altcode) return common.altitude(altcode)
else: else:
alt = common.bin2int(altbin) * 3.28084 return common.bin2int(altbin) * 3.28084 # type: ignore
return alt

View File

@ -4,10 +4,22 @@
# Surface movement # Surface movement
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from __future__ import annotations
from datetime import datetime
from ... import common
def surface_position(msg0, msg1, t0, t1, lat_ref, lon_ref): def surface_position(
msg0: str,
msg1: str,
t0: int | datetime,
t1: int | datetime,
lat_ref: float,
lon_ref: float,
) -> None | tuple[float, float]:
"""Decode surface position from a pair of even and odd position message, """Decode surface position from a pair of even and odd position message,
the lat/lon of receiver must be provided to yield the correct solution. the lat/lon of receiver must be provided to yield the correct solution.
@ -55,7 +67,8 @@ def surface_position(msg0, msg1, t0, t1, lat_ref, lon_ref):
return None return None
# compute ni, longitude index m, and longitude # compute ni, longitude index m, and longitude
if t0 > t1: # (people pass int+int or datetime+datetime)
if t0 > t1: # type: ignore
lat = lat_even lat = lat_even
nl = common.cprNL(lat_even) nl = common.cprNL(lat_even)
ni = max(common.cprNL(lat_even) - 0, 1) ni = max(common.cprNL(lat_even) - 0, 1)
@ -72,17 +85,19 @@ def surface_position(msg0, msg1, t0, t1, lat_ref, lon_ref):
lons = [lon, lon + 90, lon + 180, lon + 270] lons = [lon, lon + 90, lon + 180, lon + 270]
# make sure lons are between -180 and 180 # make sure lons are between -180 and 180
lons = [(l + 180) % 360 - 180 for l in lons] lons = [(lon + 180) % 360 - 180 for lon in lons]
# the closest solution to receiver is the correct one # the closest solution to receiver is the correct one
dls = [abs(lon_ref - l) for l in lons] dls = [abs(lon_ref - lon) for lon in lons]
imin = min(range(4), key=dls.__getitem__) imin = min(range(4), key=dls.__getitem__)
lon = lons[imin] lon = lons[imin]
return round(lat, 5), round(lon, 5) return round(lat, 5), round(lon, 5)
def surface_position_with_ref(msg, lat_ref, lon_ref): def surface_position_with_ref(
msg: str, lat_ref: float, lon_ref: float
) -> tuple[float, float]:
"""Decode surface position with only one message, """Decode surface position with only one message,
knowing reference nearby location, such as previously calculated location, knowing reference nearby location, such as previously calculated location,
ground station, or airport location, etc. The reference position shall ground station, or airport location, etc. The reference position shall
@ -127,16 +142,19 @@ def surface_position_with_ref(msg, lat_ref, lon_ref):
return round(lat, 5), round(lon, 5) return round(lat, 5), round(lon, 5)
def surface_velocity(msg, source=False): def surface_velocity(
msg: str, source: bool = False
) -> tuple[None | float, float, int, str]:
"""Decode surface velocity from a surface position message """Decode surface velocity from a surface position message
Args: Args:
msg (str): 28 hexdigits string msg (str): 28 hexdigits string
source (boolean): Include direction and vertical rate sources in return. Default to False. source (boolean): Include direction and vertical rate sources in return.
If set to True, the function will return six values instead of four. Default to False.
If set to True, the function will return six value instead of four.
Returns: Returns:
int, float, int, string, [string], [string]: Four or six parameters, including: int, float, int, string, [string], [string]:
- Speed (kt) - Speed (kt)
- Angle (degree), ground track - Angle (degree), ground track
- Vertical rate, always 0 - Vertical rate, always 0
@ -145,7 +163,8 @@ def surface_velocity(msg, source=False):
- [Optional] Vertical rate source (None) - [Optional] Vertical rate source (None)
""" """
if common.typecode(msg) < 5 or common.typecode(msg) > 8: tc = common.typecode(msg)
if tc is None or tc < 5 or tc > 8:
raise RuntimeError("%s: Not a surface message, expecting 5<TC<8" % msg) raise RuntimeError("%s: Not a surface message, expecting 5<TC<8" % msg)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
@ -164,16 +183,17 @@ def surface_velocity(msg, source=False):
if mov == 0 or mov > 124: if mov == 0 or mov > 124:
spd = None spd = None
elif mov == 1: elif mov == 1:
spd = 0 spd = 0.0
elif mov == 124: elif mov == 124:
spd = 175 spd = 175.0
else: else:
mov_lb = [2, 9, 13, 39, 94, 109, 124] mov_lb = [2, 9, 13, 39, 94, 109, 124]
kts_lb = [0.125, 1, 2, 15, 70, 100, 175] kts_lb: list[float] = [0.125, 1, 2, 15, 70, 100, 175]
step = [0.125, 0.25, 0.5, 1, 2, 5] step: list[float] = [0.125, 0.25, 0.5, 1, 2, 5]
i = next(m[0] for m in enumerate(mov_lb) if m[1] > mov) i = next(m[0] for m in enumerate(mov_lb) if m[1] > mov)
spd = kts_lb[i - 1] + (mov - mov_lb[i - 1]) * step[i - 1] spd = kts_lb[i - 1] + (mov - mov_lb[i - 1]) * step[i - 1]
if source: if source:
return spd, trk, 0, "GS", "TRUE_NORTH", None return spd, trk, 0, "GS", "TRUE_NORTH", None # type: ignore
else: else:
return spd, trk, 0, "GS" return spd, trk, 0, "GS"

View File

@ -4,10 +4,10 @@
# Aircraft identification and category # Aircraft identification and category
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from ... import common
def category(msg): def category(msg: str) -> int:
"""Aircraft category number """Aircraft category number
Args: Args:
@ -17,7 +17,8 @@ def category(msg):
int: category number int: category number
""" """
if common.typecode(msg) < 1 or common.typecode(msg) > 4: tc = common.typecode(msg)
if tc is None or tc < 1 or tc > 4:
raise RuntimeError("%s: Not a identification message" % msg) raise RuntimeError("%s: Not a identification message" % msg)
msgbin = common.hex2bin(msg) msgbin = common.hex2bin(msg)
@ -25,7 +26,7 @@ def category(msg):
return common.bin2int(mebin[5:8]) return common.bin2int(mebin[5:8])
def callsign(msg): def callsign(msg: str) -> str:
"""Aircraft callsign """Aircraft callsign
Args: Args:
@ -34,8 +35,9 @@ def callsign(msg):
Returns: Returns:
string: callsign string: callsign
""" """
tc = common.typecode(msg)
if common.typecode(msg) < 1 or common.typecode(msg) > 4: if tc is None or tc < 1 or tc > 4:
raise RuntimeError("%s: Not a identification message" % msg) raise RuntimeError("%s: Not a identification message" % msg)
chars = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" chars = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"

View File

@ -1,25 +1,29 @@
# ------------------------------------------ # ------------------------------------------
# BDS 0,9 # BDS 0,9
# ADS-B TC=19 # ADS-B TC=19
# Aircraft Airborn velocity # Aircraft Airborne velocity
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from __future__ import annotations
import math import math
from ... import common
def airborne_velocity(msg, source=False):
def airborne_velocity(
msg: str, source: bool = False
) -> None | tuple[None | int, None | float, None | int, str]:
"""Decode airborne velocity. """Decode airborne velocity.
Args: Args:
msg (str): 28 hexdigits string msg (str): 28 hexdigits string
source (boolean): Include direction and vertical rate sources in return. Default to False. source (boolean): Include direction and vertical rate sources in return.
If set to True, the function will return six values instead of four. Default to False.
If set to True, the function will return six value instead of four.
Returns: Returns:
int, float, int, string, [string], [string]: Four or six parameters, including: int, float, int, string, [string], [string]:
- Speed (kt) - Speed (kt)
- Angle (degree), either ground track or heading - Angle (degree), either ground track or heading
- Vertical rate (ft/min) - Vertical rate (ft/min)
@ -29,12 +33,20 @@ def airborne_velocity(msg, source=False):
""" """
if common.typecode(msg) != 19: if common.typecode(msg) != 19:
raise RuntimeError("%s: Not a airborne velocity message, expecting TC=19" % msg) raise RuntimeError(
"%s: Not a airborne velocity message, expecting TC=19" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:8]) subtype = common.bin2int(mb[5:8])
if common.bin2int(mb[14:24]) == 0 or common.bin2int(mb[25:35]) == 0:
return None
trk_or_hdg: None | float
spd: None | float
if subtype in (1, 2): if subtype in (1, 2):
v_ew = common.bin2int(mb[14:24]) v_ew = common.bin2int(mb[14:24])
@ -80,11 +92,9 @@ def airborne_velocity(msg, source=False):
trk_or_hdg = hdg trk_or_hdg = hdg
spd = common.bin2int(mb[25:35]) spd = common.bin2int(mb[25:35])
if subtype == 4: # Supersonic
spd *= 4
spd = None if spd == 0 else spd - 1 spd = None if spd == 0 else spd - 1
if subtype == 4 and spd is not None: # Supersonic
spd *= 4
if mb[24] == "0": if mb[24] == "0":
spd_type = "IAS" spd_type = "IAS"
@ -99,13 +109,20 @@ def airborne_velocity(msg, source=False):
vs = None if vr == 0 else int(vr_sign * (vr - 1) * 64) vs = None if vr == 0 else int(vr_sign * (vr - 1) * 64)
if source: if source:
return spd, trk_or_hdg, vs, spd_type, dir_type, vr_source return ( # type: ignore
spd,
trk_or_hdg,
vs,
spd_type,
dir_type,
vr_source,
)
else: else:
return spd, trk_or_hdg, vs, spd_type return spd, trk_or_hdg, vs, spd_type
def altitude_diff(msg): def altitude_diff(msg: str) -> None | float:
"""Decode the difference between GNSS and barometric altitude. """Decode the differece between GNSS and barometric altitude.
Args: Args:
msg (str): 28 hexdigits string, TC=19 msg (str): 28 hexdigits string, TC=19
@ -117,8 +134,10 @@ def altitude_diff(msg):
""" """
tc = common.typecode(msg) tc = common.typecode(msg)
if tc != 19: if tc is None or tc != 19:
raise RuntimeError("%s: Not a airborne velocity message, expecting TC=19" % msg) raise RuntimeError(
"%s: Not a airborne velocity message, expecting TC=19" % msg
)
msgbin = common.hex2bin(msg) msgbin = common.hex2bin(msg)
sign = -1 if int(msgbin[80]) else 1 sign = -1 if int(msgbin[80]) else 1

View File

@ -3,10 +3,11 @@
# Data link capability report # Data link capability report
# ------------------------------------------ # ------------------------------------------
from pyModeS import common
from ... import common
def is10(msg): def is10(msg: str) -> bool:
"""Check if a message is likely to be BDS code 1,0 """Check if a message is likely to be BDS code 1,0
Args: Args:
@ -38,7 +39,7 @@ def is10(msg):
return True return True
def ovc10(msg): def ovc10(msg: str) -> int:
"""Return the overlay control capability """Return the overlay control capability
Args: Args:

View File

@ -3,10 +3,12 @@
# Common usage GICB capability report # Common usage GICB capability report
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from typing import List
from ... import common
def is17(msg): def is17(msg: str) -> bool:
"""Check if a message is likely to be BDS code 1,7 """Check if a message is likely to be BDS code 1,7
Args: Args:
@ -38,7 +40,7 @@ def is17(msg):
return True return True
def cap17(msg): def cap17(msg: str) -> List[str]:
"""Extract capacities from BDS 1,7 message """Extract capacities from BDS 1,7 message
Args: Args:

View File

@ -3,10 +3,10 @@
# Aircraft identification # Aircraft identification
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from ... import common
def is20(msg): def is20(msg: str) -> bool:
"""Check if a message is likely to be BDS code 2,0 """Check if a message is likely to be BDS code 2,0
Args: Args:
@ -34,7 +34,7 @@ def is20(msg):
return True return True
def cs20(msg): def cs20(msg: str) -> str:
"""Aircraft callsign """Aircraft callsign
Args: Args:

View File

@ -3,11 +3,11 @@
# ACAS active resolution advisory # ACAS active resolution advisory
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from ... import common
def is30(msg): def is30(msg: str) -> bool:
"""Check if a message is likely to be BDS code 2,0 """Check if a message is likely to be BDS code 3,0
Args: Args:
msg (str): 28 hexdigits string msg (str): 28 hexdigits string

View File

@ -4,10 +4,12 @@
# ------------------------------------------ # ------------------------------------------
import warnings import warnings
from pyModeS import common from typing import Optional
from ... import common
def is40(msg): def is40(msg: str) -> bool:
"""Check if a message is likely to be BDS code 4,0 """Check if a message is likely to be BDS code 4,0
Args: Args:
@ -50,7 +52,7 @@ def is40(msg):
return True return True
def selalt40mcp(msg): def selalt40mcp(msg: str) -> Optional[int]:
"""Selected altitude, MCP/FCU """Selected altitude, MCP/FCU
Args: Args:
@ -68,7 +70,7 @@ def selalt40mcp(msg):
return alt return alt
def selalt40fms(msg): def selalt40fms(msg: str) -> Optional[int]:
"""Selected altitude, FMS """Selected altitude, FMS
Args: Args:
@ -86,7 +88,7 @@ def selalt40fms(msg):
return alt return alt
def p40baro(msg): def p40baro(msg: str) -> Optional[float]:
"""Barometric pressure setting """Barometric pressure setting
Args: Args:
@ -104,17 +106,19 @@ def p40baro(msg):
return p return p
def alt40mcp(msg): def alt40mcp(msg: str) -> Optional[int]:
warnings.warn( warnings.warn(
"alt40mcp() has been renamed to selalt40mcp(). It will be removed in the future.", """alt40mcp() has been renamed to selalt40mcp().
It will be removed in the future.""",
DeprecationWarning, DeprecationWarning,
) )
return selalt40mcp(msg) return selalt40mcp(msg)
def alt40fms(msg): def alt40fms(msg: str) -> Optional[int]:
warnings.warn( warnings.warn(
"alt40fms() has been renamed to selalt40fms(). It will be removed in the future.", """alt40fms() has been renamed to selalt40fms().
It will be removed in the future.""",
DeprecationWarning, DeprecationWarning,
) )
return selalt40fms(msg) return selalt40fms(msg)

View File

@ -3,10 +3,12 @@
# Meteorological routine air report # Meteorological routine air report
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from typing import Optional, Tuple
from ... import common
def is44(msg): def is44(msg: str) -> bool:
"""Check if a message is likely to be BDS code 4,4. """Check if a message is likely to be BDS code 4,4.
Meteorological routine air report Meteorological routine air report
@ -51,7 +53,7 @@ def is44(msg):
return True return True
def wind44(msg): def wind44(msg: str) -> Tuple[Optional[int], Optional[float]]:
"""Wind speed and direction. """Wind speed and direction.
Args: Args:
@ -73,7 +75,7 @@ def wind44(msg):
return round(speed, 0), round(direction, 1) return round(speed, 0), round(direction, 1)
def temp44(msg): def temp44(msg: str) -> Tuple[float, float]:
"""Static air temperature. """Static air temperature.
Args: Args:
@ -102,7 +104,7 @@ def temp44(msg):
return temp, temp_alternative return temp, temp_alternative
def p44(msg): def p44(msg: str) -> Optional[int]:
"""Static pressure. """Static pressure.
Args: Args:
@ -122,7 +124,7 @@ def p44(msg):
return p return p
def hum44(msg): def hum44(msg: str) -> Optional[float]:
"""humidity """humidity
Args: Args:
@ -141,7 +143,7 @@ def hum44(msg):
return round(hm, 1) return round(hm, 1)
def turb44(msg): def turb44(msg: str) -> Optional[int]:
"""Turbulence. """Turbulence.
Args: Args:

View File

@ -3,10 +3,12 @@
# Meteorological hazard report # Meteorological hazard report
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from typing import Optional
from ... import common
def is45(msg): def is45(msg: str) -> bool:
"""Check if a message is likely to be BDS code 4,5. """Check if a message is likely to be BDS code 4,5.
Meteorological hazard report Meteorological hazard report
@ -60,7 +62,7 @@ def is45(msg):
return True return True
def turb45(msg): def turb45(msg: str) -> Optional[int]:
"""Turbulence. """Turbulence.
Args: Args:
@ -78,7 +80,7 @@ def turb45(msg):
return turb return turb
def ws45(msg): def ws45(msg: str) -> Optional[int]:
"""Wind shear. """Wind shear.
Args: Args:
@ -96,7 +98,7 @@ def ws45(msg):
return ws return ws
def mb45(msg): def mb45(msg: str) -> Optional[int]:
"""Microburst. """Microburst.
Args: Args:
@ -114,7 +116,7 @@ def mb45(msg):
return mb return mb
def ic45(msg): def ic45(msg: str) -> Optional[int]:
"""Icing. """Icing.
Args: Args:
@ -132,7 +134,7 @@ def ic45(msg):
return ic return ic
def wv45(msg): def wv45(msg: str) -> Optional[int]:
"""Wake vortex. """Wake vortex.
Args: Args:
@ -150,7 +152,7 @@ def wv45(msg):
return ws return ws
def temp45(msg): def temp45(msg: str) -> Optional[float]:
"""Static air temperature. """Static air temperature.
Args: Args:
@ -174,7 +176,7 @@ def temp45(msg):
return temp return temp
def p45(msg): def p45(msg: str) -> Optional[int]:
"""Average static pressure. """Average static pressure.
Args: Args:
@ -191,7 +193,7 @@ def p45(msg):
return p return p
def rh45(msg): def rh45(msg: str) -> Optional[int]:
"""Radio height. """Radio height.
Args: Args:

View File

@ -3,10 +3,12 @@
# Track and turn report # Track and turn report
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from typing import Optional
from ... import common
def is50(msg): def is50(msg: str) -> bool:
"""Check if a message is likely to be BDS code 5,0 """Check if a message is likely to be BDS code 5,0
(Track and turn report) (Track and turn report)
@ -57,7 +59,7 @@ def is50(msg):
return True return True
def roll50(msg): def roll50(msg: str) -> Optional[float]:
"""Roll angle, BDS 5,0 message """Roll angle, BDS 5,0 message
Args: Args:
@ -82,7 +84,7 @@ def roll50(msg):
return round(angle, 1) return round(angle, 1)
def trk50(msg): def trk50(msg: str) -> Optional[float]:
"""True track angle, BDS 5,0 message """True track angle, BDS 5,0 message
Args: Args:
@ -111,7 +113,7 @@ def trk50(msg):
return round(trk, 3) return round(trk, 3)
def gs50(msg): def gs50(msg: str) -> Optional[float]:
"""Ground speed, BDS 5,0 message """Ground speed, BDS 5,0 message
Args: Args:
@ -129,7 +131,7 @@ def gs50(msg):
return spd return spd
def rtrk50(msg): def rtrk50(msg: str) -> Optional[float]:
"""Track angle rate, BDS 5,0 message """Track angle rate, BDS 5,0 message
Args: Args:
@ -155,7 +157,7 @@ def rtrk50(msg):
return round(angle, 3) return round(angle, 3)
def tas50(msg): def tas50(msg: str) -> Optional[float]:
"""Aircraft true airspeed, BDS 5,0 message """Aircraft true airspeed, BDS 5,0 message
Args: Args:

View File

@ -3,10 +3,12 @@
# Air-referenced state vector # Air-referenced state vector
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from typing import Optional
from ... import common
def is53(msg): def is53(msg: str) -> bool:
"""Check if a message is likely to be BDS code 5,3 """Check if a message is likely to be BDS code 5,3
(Air-referenced state vector) (Air-referenced state vector)
@ -58,7 +60,7 @@ def is53(msg):
return True return True
def hdg53(msg): def hdg53(msg: str) -> Optional[float]:
"""Magnetic heading, BDS 5,3 message """Magnetic heading, BDS 5,3 message
Args: Args:
@ -87,7 +89,7 @@ def hdg53(msg):
return round(hdg, 3) return round(hdg, 3)
def ias53(msg): def ias53(msg: str) -> Optional[float]:
"""Indicated airspeed, DBS 5,3 message """Indicated airspeed, DBS 5,3 message
Args: Args:
@ -105,7 +107,7 @@ def ias53(msg):
return ias return ias
def mach53(msg): def mach53(msg: str) -> Optional[float]:
"""MACH number, DBS 5,3 message """MACH number, DBS 5,3 message
Args: Args:
@ -123,7 +125,7 @@ def mach53(msg):
return round(mach, 3) return round(mach, 3)
def tas53(msg): def tas53(msg: str) -> Optional[float]:
"""Aircraft true airspeed, BDS 5,3 message """Aircraft true airspeed, BDS 5,3 message
Args: Args:
@ -141,7 +143,7 @@ def tas53(msg):
return round(tas, 1) return round(tas, 1)
def vr53(msg): def vr53(msg: str) -> Optional[int]:
"""Vertical rate """Vertical rate
Args: Args:

View File

@ -3,11 +3,13 @@
# Heading and speed report # Heading and speed report
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from typing import Optional
from pyModeS.extra import aero
from ... import common
from ...extra import aero
def is60(msg): def is60(msg: str) -> bool:
"""Check if a message is likely to be BDS code 6,0 """Check if a message is likely to be BDS code 6,0
Args: Args:
@ -66,7 +68,7 @@ def is60(msg):
return True return True
def hdg60(msg): def hdg60(msg: str) -> Optional[float]:
"""Megnetic heading of aircraft """Megnetic heading of aircraft
Args: Args:
@ -92,10 +94,10 @@ def hdg60(msg):
if hdg < 0: if hdg < 0:
hdg = 360 + hdg hdg = 360 + hdg
return round(hdg, 3) return hdg
def ias60(msg): def ias60(msg: str) -> Optional[float]:
"""Indicated airspeed """Indicated airspeed
Args: Args:
@ -113,7 +115,7 @@ def ias60(msg):
return ias return ias
def mach60(msg): def mach60(msg: str) -> Optional[float]:
"""Aircraft MACH number """Aircraft MACH number
Args: Args:
@ -128,10 +130,10 @@ def mach60(msg):
return None return None
mach = common.bin2int(d[24:34]) * 2.048 / 512.0 mach = common.bin2int(d[24:34]) * 2.048 / 512.0
return round(mach, 3) return mach
def vr60baro(msg): def vr60baro(msg: str) -> Optional[int]:
"""Vertical rate from barometric measurement, this value may be very noisy. """Vertical rate from barometric measurement, this value may be very noisy.
Args: Args:
@ -157,8 +159,8 @@ def vr60baro(msg):
return roc return roc
def vr60ins(msg): def vr60ins(msg: str) -> Optional[int]:
"""Vertical rate measured by onboard equipment (IRS, AHRS) """Vertical rate measurd by onbard equiments (IRS, AHRS)
Args: Args:
msg (str): 28 hexdigits string msg (str): 28 hexdigits string

View File

@ -4,7 +4,7 @@
# Aircraft Airborne status # Aircraft Airborne status
# ------------------------------------------ # ------------------------------------------
from pyModeS import common from ... import common
def is_emergency(msg: str) -> bool: def is_emergency(msg: str) -> bool:
@ -18,7 +18,9 @@ def is_emergency(msg: str) -> bool:
:return: if the aircraft has declared an emergency :return: if the aircraft has declared an emergency
""" """
if common.typecode(msg) != 28: if common.typecode(msg) != 28:
raise RuntimeError("%s: Not an airborne status message, expecting TC=28" % msg) raise RuntimeError(
"%s: Not an airborne status message, expecting TC=28" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:8]) subtype = common.bin2int(mb[5:8])
@ -72,7 +74,9 @@ def emergency_squawk(msg: str) -> str:
:return: aircraft squawk code :return: aircraft squawk code
""" """
if common.typecode(msg) != 28: if common.typecode(msg) != 28:
raise RuntimeError("%s: Not an airborne status message, expecting TC=28" % msg) raise RuntimeError(
"%s: Not an airborne status message, expecting TC=28" % msg
)
msgbin = common.hex2bin(msg) msgbin = common.hex2bin(msg)

View File

@ -5,10 +5,11 @@
# ------------------------------------------ # ------------------------------------------
from __future__ import annotations from __future__ import annotations
from pyModeS import common
from ... import common
def selected_altitude(msg): def selected_altitude(msg: str) -> tuple[None | float, str]:
"""Decode selected altitude. """Decode selected altitude.
Args: Args:
@ -31,18 +32,20 @@ def selected_altitude(msg):
if subtype == 0: if subtype == 0:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain selected altitude, use target altitude instead" "%s: ADS-B version 1 target state and status message does not"
% msg " contain selected altitude, use target altitude instead" % msg
) )
alt = common.bin2int(mb[9:20]) alt = common.bin2int(mb[9:20])
alt = None if alt == 0 else (alt - 1) * 32 if alt == 0:
return None, "N/A"
alt = (alt - 1) * 32
alt_source = "MCP/FCU" if int(mb[8]) == 0 else "FMS" alt_source = "MCP/FCU" if int(mb[8]) == 0 else "FMS"
return alt, alt_source return alt, alt_source
def target_altitude(msg): def target_altitude(msg: str) -> tuple[None | int, str, str]:
"""Decode target altitude. """Decode target altitude.
Args: Args:
@ -51,7 +54,8 @@ def target_altitude(msg):
Returns: Returns:
int: Target altitude (ft) int: Target altitude (ft)
string: Source ('MCP/FCU', 'Holding mode' or 'FMS/RNAV') string: Source ('MCP/FCU', 'Holding mode' or 'FMS/RNAV')
string: Altitude reference, either pressure altitude or barometric corrected altitude ('FL' or 'MSL') string: Altitude reference, either pressure altitude or barometric
corrected altitude ('FL' or 'MSL')
""" """
@ -66,13 +70,13 @@ def target_altitude(msg):
if subtype == 1: if subtype == 1:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain target altitude, use selected altitude instead" "%s: ADS-B version 2 target state and status message does not"
% msg " contain target altitude, use selected altitude instead" % msg
) )
alt_avail = common.bin2int(mb[7:9]) alt_avail = common.bin2int(mb[7:9])
if alt_avail == 0: if alt_avail == 0:
return None return None, "N/A", ""
elif alt_avail == 1: elif alt_avail == 1:
alt_source = "MCP/FCU" alt_source = "MCP/FCU"
elif alt_avail == 2: elif alt_avail == 2:
@ -87,7 +91,7 @@ def target_altitude(msg):
return alt, alt_source, alt_ref return alt, alt_source, alt_ref
def vertical_mode(msg): def vertical_mode(msg: str) -> None | int:
"""Decode vertical mode. """Decode vertical mode.
Value Meaning Value Meaning
@ -115,8 +119,8 @@ def vertical_mode(msg):
if subtype == 1: if subtype == 1:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain vertical mode, use vnav mode instead" "%s: ADS-B version 2 target state and status message does not"
% msg " contain vertical mode, use vnav mode instead" % msg
) )
vertical_mode = common.bin2int(mb[13:15]) vertical_mode = common.bin2int(mb[13:15])
@ -126,7 +130,7 @@ def vertical_mode(msg):
return vertical_mode return vertical_mode
def horizontal_mode(msg): def horizontal_mode(msg: str) -> None | int:
"""Decode horizontal mode. """Decode horizontal mode.
Value Meaning Value Meaning
@ -154,8 +158,8 @@ def horizontal_mode(msg):
if subtype == 1: if subtype == 1:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain horizontal mode, use lnav mode instead" "%s: ADS-B version 2 target state and status message does not "
% msg "contain horizontal mode, use lnav mode instead" % msg
) )
horizontal_mode = common.bin2int(mb[25:27]) horizontal_mode = common.bin2int(mb[25:27])
@ -165,7 +169,7 @@ def horizontal_mode(msg):
return horizontal_mode return horizontal_mode
def selected_heading(msg): def selected_heading(msg: str) -> None | float:
"""Decode selected heading. """Decode selected heading.
Args: Args:
@ -187,12 +191,12 @@ def selected_heading(msg):
if subtype == 0: if subtype == 0:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain selected heading, use target angle instead" "%s: ADS-B version 1 target state and status message does not "
% msg "contain selected heading, use target angle instead" % msg
) )
if int(mb[29]) == 0: if int(mb[29]) == 0:
hdg = None return None
else: else:
hdg_sign = int(mb[30]) hdg_sign = int(mb[30])
hdg = (hdg_sign + 1) * common.bin2int(mb[31:39]) * (180 / 256) hdg = (hdg_sign + 1) * common.bin2int(mb[31:39]) * (180 / 256)
@ -201,7 +205,7 @@ def selected_heading(msg):
return hdg return hdg
def target_angle(msg): def target_angle(msg: str) -> tuple[None | int, str, str]:
"""Decode target heading/track angle. """Decode target heading/track angle.
Args: Args:
@ -225,13 +229,13 @@ def target_angle(msg):
if subtype == 1: if subtype == 1:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain target angle, use selected heading instead" "%s: ADS-B version 2 target state and status message does not "
% msg "contain target angle, use selected heading instead" % msg
) )
angle_avail = common.bin2int(mb[25:27]) angle_avail = common.bin2int(mb[25:27])
if angle_avail == 0: if angle_avail == 0:
angle = None return None, "", "N/A"
else: else:
angle = common.bin2int(mb[27:36]) angle = common.bin2int(mb[27:36])
@ -247,7 +251,7 @@ def target_angle(msg):
return angle, angle_type, angle_source return angle, angle_type, angle_source
def baro_pressure_setting(msg): def baro_pressure_setting(msg: str) -> None | float:
"""Decode barometric pressure setting. """Decode barometric pressure setting.
Args: Args:
@ -269,14 +273,15 @@ def baro_pressure_setting(msg):
if subtype == 0: if subtype == 0:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain barometric pressure setting" "%s: ADS-B version 1 target state and status message does not "
% msg "contain barometric pressure setting" % msg
) )
baro = common.bin2int(mb[20:29]) baro = common.bin2int(mb[20:29])
baro = None if baro == 0 else round(800 + (baro - 1) * 0.8, 1) if baro == 0:
return None
return baro return 800 + (baro - 1) * 0.8
def autopilot(msg) -> None | bool: def autopilot(msg) -> None | bool:
@ -301,8 +306,8 @@ def autopilot(msg) -> None | bool:
if subtype == 0: if subtype == 0:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain autopilot engagement" "%s: ADS-B version 1 target state and status message does not "
% msg "contain autopilot engagement" % msg
) )
if int(mb[46]) == 0: if int(mb[46]) == 0:
@ -335,8 +340,8 @@ def vnav_mode(msg) -> None | bool:
if subtype == 0: if subtype == 0:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain vnav mode, use vertical mode instead" "%s: ADS-B version 1 target state and status message does not "
% msg "contain vnav mode, use vertical mode instead" % msg
) )
if int(mb[46]) == 0: if int(mb[46]) == 0:
@ -369,8 +374,8 @@ def altitude_hold_mode(msg) -> None | bool:
if subtype == 0: if subtype == 0:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain altitude hold mode" "%s: ADS-B version 1 target state and status message does not "
% msg "contain altitude hold mode" % msg
) )
if int(mb[46]) == 0: if int(mb[46]) == 0:
@ -403,8 +408,8 @@ def approach_mode(msg) -> None | bool:
if subtype == 0: if subtype == 0:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain approach mode" "%s: ADS-B version 1 target state and status message does not "
% msg "contain approach mode" % msg
) )
if int(mb[46]) == 0: if int(mb[46]) == 0:
@ -437,8 +442,8 @@ def lnav_mode(msg) -> None | bool:
if subtype == 0: if subtype == 0:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain lnav mode, use horizontal mode instead" "%s: ADS-B version 1 target state and status message does not "
% msg "contain lnav mode, use horizontal mode instead" % msg
) )
if int(mb[46]) == 0: if int(mb[46]) == 0:
@ -499,8 +504,8 @@ def tcas_ra(msg) -> bool:
if subtype == 1: if subtype == 1:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain TCAS/ACAS RA" "%s: ADS-B version 2 target state and status message does not "
% msg "contain TCAS/ACAS RA" % msg
) )
tcas_ra = True if int(mb[52]) == 1 else False tcas_ra = True if int(mb[52]) == 1 else False
@ -541,8 +546,8 @@ def emergency_status(msg) -> int:
if subtype == 1: if subtype == 1:
raise RuntimeError( raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain emergency status" "%s: ADS-B version 2 target state and status message does not "
% msg "contain emergency status" % msg
) )
return common.bin2int(mb[53:56]) return common.bin2int(mb[53:56])

View File

@ -23,18 +23,66 @@ MRAR and MHR
""" """
# ELS - elementary surveillance # ELS - elementary surveillance
from pyModeS.decoder.bds.bds10 import * from .bds.bds10 import is10, ovc10
from pyModeS.decoder.bds.bds17 import * from .bds.bds17 import is17, cap17
from pyModeS.decoder.bds.bds20 import * from .bds.bds20 import is20, cs20
from pyModeS.decoder.bds.bds30 import * from .bds.bds30 import is30
# ELS - enhanced surveillance # ELS - enhanced surveillance
from pyModeS.decoder.bds.bds40 import * from .bds.bds40 import (
from pyModeS.decoder.bds.bds50 import * is40,
from pyModeS.decoder.bds.bds60 import * selalt40fms,
selalt40mcp,
p40baro,
alt40fms,
alt40mcp,
)
from .bds.bds50 import is50, roll50, trk50, gs50, rtrk50, tas50
from .bds.bds60 import is60, hdg60, ias60, mach60, vr60baro, vr60ins
# MRAR and MHR # MRAR and MHR
from pyModeS.decoder.bds.bds44 import * from .bds.bds44 import is44, wind44, temp44, p44, hum44, turb44
from pyModeS.decoder.bds.bds45 import * from .bds.bds45 import is45, turb45, ws45, mb45, ic45, wv45, temp45, p45, rh45
from pyModeS.py_common import fs, dr, um __all__ = [
"is10",
"ovc10",
"is17",
"cap17",
"is20",
"cs20",
"is30",
"is40",
"selalt40fms",
"selalt40mcp",
"p40baro",
"alt40fms",
"alt40mcp",
"is50",
"roll50",
"trk50",
"gs50",
"rtrk50",
"tas50",
"is60",
"hdg60",
"ias60",
"mach60",
"vr60baro",
"vr60ins",
"is44",
"wind44",
"temp44",
"p44",
"hum44",
"turb44",
"is45",
"turb45",
"ws45",
"mb45",
"ic45",
"wv45",
"temp45",
"p45",
"rh45",
]

View File

@ -11,20 +11,51 @@ The EHS wrapper imports all functions from the following modules:
import warnings import warnings
from pyModeS.decoder.bds.bds40 import * from .bds.bds40 import (
from pyModeS.decoder.bds.bds50 import * is40,
from pyModeS.decoder.bds.bds60 import * selalt40fms,
from pyModeS.decoder.bds import infer selalt40mcp,
p40baro,
alt40fms,
alt40mcp,
)
from .bds.bds50 import is50, roll50, trk50, gs50, rtrk50, tas50
from .bds.bds60 import is60, hdg60, ias60, mach60, vr60baro, vr60ins
from .bds import infer
__all__ = [
"is40",
"selalt40fms",
"selalt40mcp",
"p40baro",
"alt40fms",
"alt40mcp",
"is50",
"roll50",
"trk50",
"gs50",
"rtrk50",
"tas50",
"is60",
"hdg60",
"ias60",
"mach60",
"vr60baro",
"vr60ins",
"infer",
]
warnings.simplefilter("once", DeprecationWarning) warnings.simplefilter("once", DeprecationWarning)
warnings.warn( warnings.warn(
"pms.ehs module is deprecated. Please use pms.commb instead.", DeprecationWarning "pms.ehs module is deprecated. Please use pms.commb instead.",
DeprecationWarning,
) )
def BDS(msg): def BDS(msg):
warnings.warn( warnings.warn(
"pms.ehs.BDS() is deprecated, use pms.bds.infer() instead.", DeprecationWarning "pms.ehs.BDS() is deprecated, use pms.bds.infer() instead.",
DeprecationWarning,
) )
return infer(msg) return infer(msg)

View File

@ -10,14 +10,26 @@ The ELS wrapper imports all functions from the following modules:
""" """
from pyModeS.decoder.bds.bds10 import *
from pyModeS.decoder.bds.bds17 import *
from pyModeS.decoder.bds.bds20 import *
from pyModeS.decoder.bds.bds30 import *
import warnings import warnings
from .bds.bds10 import is10, ovc10
from .bds.bds17 import cap17, is17
from .bds.bds20 import cs20, is20
from .bds.bds30 import is30
warnings.simplefilter("once", DeprecationWarning) warnings.simplefilter("once", DeprecationWarning)
warnings.warn( warnings.warn(
"pms.els module is deprecated. Please use pms.commb instead.", DeprecationWarning "pms.els module is deprecated. Please use pms.commb instead.",
DeprecationWarning,
) )
__all__ = [
"is10",
"ovc10",
"is17",
"cap17",
"is20",
"cs20",
"is30",
]

View File

@ -2,14 +2,19 @@
Decode short roll call surveillance replies, with downlink format 4 or 5 Decode short roll call surveillance replies, with downlink format 4 or 5
""" """
from pyModeS import common from __future__ import annotations
from pyModeS.py_common import fs, dr, um from typing import Callable, TypeVar
from .. import common
T = TypeVar("T")
F = Callable[[str], T]
def _checkdf(func): def _checkdf(func: F[T]) -> F[T]:
"""Ensure downlink format is 4 or 5.""" """Ensure downlink format is 4 or 5."""
def wrapper(msg): def wrapper(msg: str) -> T:
df = common.df(msg) df = common.df(msg)
if df not in [4, 5]: if df not in [4, 5]:
raise RuntimeError( raise RuntimeError(
@ -21,7 +26,92 @@ def _checkdf(func):
@_checkdf @_checkdf
def altitude(msg): def fs(msg: str) -> tuple[int, str]:
"""Decode flight status.
Args:
msg (str): 14 hexdigits string
Returns:
int, str: flight status, description
"""
msgbin = common.hex2bin(msg)
fs = common.bin2int(msgbin[5:8])
text = ""
if fs == 0:
text = "no alert, no SPI, aircraft is airborne"
elif fs == 1:
text = "no alert, no SPI, aircraft is on-ground"
elif fs == 2:
text = "alert, no SPI, aircraft is airborne"
elif fs == 3:
text = "alert, no SPI, aircraft is on-ground"
elif fs == 4:
text = "alert, SPI, aircraft is airborne or on-ground"
elif fs == 5:
text = "no alert, SPI, aircraft is airborne or on-ground"
return fs, text
@_checkdf
def dr(msg: str) -> tuple[int, str]:
"""Decode downlink request.
Args:
msg (str): 14 hexdigits string
Returns:
int, str: downlink request, description
"""
msgbin = common.hex2bin(msg)
dr = common.bin2int(msgbin[8:13])
text = ""
if dr == 0:
text = "no downlink request"
elif dr == 1:
text = "request to send Comm-B message"
elif dr == 4:
text = "Comm-B broadcast 1 available"
elif dr == 5:
text = "Comm-B broadcast 2 available"
elif dr >= 16:
text = "ELM downlink segments available: {}".format(dr - 15)
return dr, text
@_checkdf
def um(msg: str) -> tuple[int, int, None | str]:
"""Decode utility message.
Utility message contains interrogator identifier and reservation type.
Args:
msg (str): 14 hexdigits string
Returns:
int, str: interrogator identifier code that triggered the reply, and
reservation type made by the interrogator
"""
msgbin = common.hex2bin(msg)
iis = common.bin2int(msgbin[13:17])
ids = common.bin2int(msgbin[17:19])
if ids == 0:
ids_text = None
if ids == 1:
ids_text = "Comm-B interrogator identifier code"
if ids == 2:
ids_text = "Comm-C interrogator identifier code"
if ids == 3:
ids_text = "Comm-D interrogator identifier code"
return iis, ids, ids_text
@_checkdf
def altitude(msg: str) -> None | int:
"""Decode altitude. """Decode altitude.
Args: Args:
@ -35,7 +125,7 @@ def altitude(msg):
@_checkdf @_checkdf
def identity(msg): def identity(msg: str) -> str:
"""Decode squawk code. """Decode squawk code.
Args: Args:

View File

@ -1,8 +1,11 @@
"""Uncertainty parameters. """Uncertainty parameters.
See source code at: https://github.com/junzis/pyModeS/blob/master/pyModeS/decoder/uncertainty.py
""" """
from __future__ import annotations
from typing import TypedDict
NA = None NA = None
TC_NUCp_lookup = { TC_NUCp_lookup = {
@ -26,7 +29,7 @@ TC_NUCp_lookup = {
22: 0, 22: 0,
} }
TC_NICv1_lookup = { TC_NICv1_lookup: dict[int, int | dict[int, int]] = {
5: 11, 5: 11,
6: 10, 6: 10,
7: 9, 7: 9,
@ -46,7 +49,7 @@ TC_NICv1_lookup = {
22: 0, 22: 0,
} }
TC_NICv2_lookup = { TC_NICv2_lookup: dict[int, int | dict[int, int]] = {
5: 11, 5: 11,
6: 10, 6: 10,
7: {2: 9, 0: 8}, 7: {2: 9, 0: 8},
@ -67,20 +70,32 @@ TC_NICv2_lookup = {
} }
NUCp = { class NUCpEntry(TypedDict):
9: {"HPL": 7.5, "RCu": 3}, HPL: None | float
8: {"HPL": 25, "RCu": 10}, RCu: None | int
7: {"HPL": 185, "RCu": 93}, RCv: None | int
6: {"HPL": 370, "RCu": 185},
5: {"HPL": 926, "RCu": 463},
4: {"HPL": 1852, "RCu": 926}, NUCp: dict[int, NUCpEntry] = {
3: {"HPL": 3704, "RCu": 1852}, 9: {"HPL": 7.5, "RCu": 3, "RCv": 4},
2: {"HPL": 18520, "RCu": 9260}, 8: {"HPL": 25, "RCu": 10, "RCv": 15},
1: {"HPL": 37040, "RCu": 18520}, 7: {"HPL": 185, "RCu": 93, "RCv": NA},
0: {"HPL": NA, "RCu": NA}, 6: {"HPL": 370, "RCu": 185, "RCv": NA},
5: {"HPL": 926, "RCu": 463, "RCv": NA},
4: {"HPL": 1852, "RCu": 926, "RCv": NA},
3: {"HPL": 3704, "RCu": 1852, "RCv": NA},
2: {"HPL": 18520, "RCu": 9260, "RCv": NA},
1: {"HPL": 37040, "RCu": 18520, "RCv": NA},
0: {"HPL": NA, "RCu": NA, "RCv": NA},
} }
NUCv = {
class NUCvEntry(TypedDict):
HVE: None | float
VVE: None | float
NUCv: dict[int, NUCvEntry] = {
0: {"HVE": NA, "VVE": NA}, 0: {"HVE": NA, "VVE": NA},
1: {"HVE": 10, "VVE": 15.2}, 1: {"HVE": 10, "VVE": 15.2},
2: {"HVE": 3, "VVE": 4.5}, 2: {"HVE": 3, "VVE": 4.5},
@ -88,7 +103,13 @@ NUCv = {
4: {"HVE": 0.3, "VVE": 0.46}, 4: {"HVE": 0.3, "VVE": 0.46},
} }
NACp = {
class NACpEntry(TypedDict):
EPU: None | int
VEPU: None | int
NACp: dict[int, NACpEntry] = {
11: {"EPU": 3, "VEPU": 4}, 11: {"EPU": 3, "VEPU": 4},
10: {"EPU": 10, "VEPU": 15}, 10: {"EPU": 10, "VEPU": 15},
9: {"EPU": 30, "VEPU": 45}, 9: {"EPU": 30, "VEPU": 45},
@ -103,7 +124,13 @@ NACp = {
0: {"EPU": NA, "VEPU": NA}, 0: {"EPU": NA, "VEPU": NA},
} }
NACv = {
class NACvEntry(TypedDict):
HFOMr: None | float
VFOMr: None | float
NACv: dict[int, NACvEntry] = {
0: {"HFOMr": NA, "VFOMr": NA}, 0: {"HFOMr": NA, "VFOMr": NA},
1: {"HFOMr": 10, "VFOMr": 15.2}, 1: {"HFOMr": 10, "VFOMr": 15.2},
2: {"HFOMr": 3, "VFOMr": 4.5}, 2: {"HFOMr": 3, "VFOMr": 4.5},
@ -111,7 +138,13 @@ NACv = {
4: {"HFOMr": 0.3, "VFOMr": 0.46}, 4: {"HFOMr": 0.3, "VFOMr": 0.46},
} }
SIL = {
class SILEntry(TypedDict):
PE_RCu: None | float
PE_VPL: None | float
SIL: dict[int, SILEntry] = {
3: {"PE_RCu": 1e-7, "PE_VPL": 2e-7}, 3: {"PE_RCu": 1e-7, "PE_VPL": 2e-7},
2: {"PE_RCu": 1e-5, "PE_VPL": 1e-5}, 2: {"PE_RCu": 1e-5, "PE_VPL": 1e-5},
1: {"PE_RCu": 1e-3, "PE_VPL": 1e-3}, 1: {"PE_RCu": 1e-3, "PE_VPL": 1e-3},
@ -119,7 +152,12 @@ SIL = {
} }
NICv1 = { class NICv1Entry(TypedDict):
Rc: None | float
VPL: None | float
NICv1: dict[int, dict[int, NICv1Entry]] = {
# NIC is used as the index at second Level # NIC is used as the index at second Level
11: {0: {"Rc": 7.5, "VPL": 11}}, 11: {0: {"Rc": 7.5, "VPL": 11}},
10: {0: {"Rc": 25, "VPL": 37.5}}, 10: {0: {"Rc": 25, "VPL": 37.5}},
@ -135,7 +173,12 @@ NICv1 = {
0: {0: {"Rc": NA, "VPL": NA}}, 0: {0: {"Rc": NA, "VPL": NA}},
} }
NICv2 = {
class NICv2Entry(TypedDict):
Rc: None | float
NICv2: dict[int, dict[int, NICv2Entry]] = {
# Decimal value of [NICa NICb/NICc] is used as the index at second Level # Decimal value of [NICa NICb/NICc] is used as the index at second Level
11: {0: {"Rc": 7.5}}, 11: {0: {"Rc": 7.5}},
10: {0: {"Rc": 25}}, 10: {0: {"Rc": 25}},

View File

@ -1,9 +1,10 @@
from pyModeS import common from typing import Optional
from .. import common
from textwrap import wrap from textwrap import wrap
def uplink_icao(msg): def uplink_icao(msg: str) -> str:
"""Calculate the ICAO address from a Mode-S interrogation (uplink message)""" "Calculate the ICAO address from a Mode-S interrogation (uplink message)"
p_gen = 0xFFFA0480 << ((len(msg) - 14) * 4) p_gen = 0xFFFA0480 << ((len(msg) - 14) * 4)
data = int(msg[:-6], 16) data = int(msg[:-6], 16)
PA = int(msg[-6:], 16) PA = int(msg[-6:], 16)
@ -20,22 +21,22 @@ def uplink_icao(msg):
return "%06X" % (ad >> 2) return "%06X" % (ad >> 2)
def uf(msg): def uf(msg: str) -> int:
"""Decode Uplink Format value, bits 1 to 5.""" """Decode Uplink Format value, bits 1 to 5."""
ufbin = common.hex2bin(msg[:2]) ufbin = common.hex2bin(msg[:2])
return min(common.bin2int(ufbin[0:5]), 24) return min(common.bin2int(ufbin[0:5]), 24)
def bds(msg): def bds(msg: str) -> Optional[str]:
"""Decode requested BDS register from selective (Roll Call) interrogation.""" "Decode requested BDS register from selective (Roll Call) interrogation."
UF = uf(msg) UF = uf(msg)
msgbin = common.hex2bin(msg) msgbin = common.hex2bin(msg)
msgbin_split = wrap(msgbin, 8) msgbin_split = wrap(msgbin, 8)
mbytes = list(map(common.bin2int, msgbin_split)) mbytes = list(map(common.bin2int, msgbin_split))
if uf(msg) in {4, 5, 20, 21}: if UF in {4, 5, 20, 21}:
di = mbytes[1] & 0x7 # DI - Designator Identification di = mbytes[1] & 0x7 # DI - Designator Identification
RR = mbytes[1] >> 3 & 0x1F RR = mbytes[1] >> 3 & 0x1F
if RR > 15: if RR > 15:
BDS1 = RR - 16 BDS1 = RR - 16
@ -46,7 +47,9 @@ def bds(msg):
RRS = ((mbytes[2] & 0x1) << 3) | ((mbytes[3] & 0xE0) >> 5) RRS = ((mbytes[2] & 0x1) << 3) | ((mbytes[3] & 0xE0) >> 5)
BDS2 = RRS BDS2 = RRS
else: else:
BDS2 = 0 # for other values of DI, the BDS2 is assumed 0 (as per ICAO Annex 10 Vol IV) # for other values of DI, the BDS2 is assumed 0
# (as per ICAO Annex 10 Vol IV)
BDS2 = 0
return str(format(BDS1,"X")) + str(format(BDS2,"X")) return str(format(BDS1,"X")) + str(format(BDS2,"X"))
else: else:
@ -55,7 +58,7 @@ def bds(msg):
return None return None
def pr(msg): def pr(msg: str) -> Optional[int]:
"""Decode PR (probability of reply) field from All Call interrogation. """Decode PR (probability of reply) field from All Call interrogation.
Interpretation: Interpretation:
0 signifies reply with probability of 1 0 signifies reply with probability of 1
@ -80,7 +83,7 @@ def pr(msg):
return None return None
def ic(msg): def ic(msg: str) -> Optional[str]:
"""Decode IC (interrogator code) from a ground-based interrogation.""" """Decode IC (interrogator code) from a ground-based interrogation."""
UF = uf(msg) UF = uf(msg)
@ -88,8 +91,7 @@ def ic(msg):
msgbin_split = wrap(msgbin, 8) msgbin_split = wrap(msgbin, 8)
mbytes = list(map(common.bin2int, msgbin_split)) mbytes = list(map(common.bin2int, msgbin_split))
IC = None IC = None
BDS2 = "" if UF == 11:
if uf(msg) == 11:
codeLabel = mbytes[1] & 0x7 codeLabel = mbytes[1] & 0x7
icField = (mbytes[1] >> 3) & 0xF icField = (mbytes[1] >> 3) & 0xF
@ -104,11 +106,11 @@ def ic(msg):
} }
IC = ic_switcher.get(codeLabel, "") IC = ic_switcher.get(codeLabel, "")
if uf(msg) in {4, 5, 20, 21}: if UF in {4, 5, 20, 21}:
di = mbytes[1] & 0x7 di = mbytes[1] & 0x7
RR = mbytes[1] >> 3 & 0x1F RR = mbytes[1] >> 3 & 0x1F
if RR > 15: if RR > 15:
BDS1 = RR - 16 BDS1 = RR - 16 # noqa: F841
if di == 0 or di == 1 or di == 7: if di == 0 or di == 1 or di == 7:
# II # II
II = (mbytes[2] >> 4) & 0xF II = (mbytes[2] >> 4) & 0xF
@ -148,7 +150,6 @@ def uplink_fields(msg):
msgbin_split = wrap(msgbin, 8) msgbin_split = wrap(msgbin, 8)
mbytes = list(map(common.bin2int, msgbin_split)) mbytes = list(map(common.bin2int, msgbin_split))
PR = "" PR = ""
LOS = ""
IC = "" IC = ""
lockout = False lockout = False
di = "" di = ""
@ -157,8 +158,6 @@ def uplink_fields(msg):
BDS = "" BDS = ""
if uf(msg) == 11: if uf(msg) == 11:
# Probability of Reply decoding # Probability of Reply decoding
PR = ((mbytes[0] & 0x7) << 1) | ((mbytes[1] & 0x80) >> 7) PR = ((mbytes[0] & 0x7) << 1) | ((mbytes[1] & 0x80) >> 7)
@ -179,7 +178,8 @@ def uplink_fields(msg):
IC = ic_switcher.get(codeLabel, "") IC = ic_switcher.get(codeLabel, "")
if uf(msg) in {4, 5, 20, 21}: if uf(msg) in {4, 5, 20, 21}:
# Decode the DI and get the lockout information conveniently (LSS or LOS) # Decode the DI and get the lockout information conveniently
# (LSS or LOS)
# DI - Designator Identification # DI - Designator Identification
di = mbytes[1] & 0x7 di = mbytes[1] & 0x7

View File

@ -1,11 +1,7 @@
import sys
import time
import csv import csv
import time
if len(sys.argv) > 1 and sys.argv[1] == "cython": from pyModeS.decoder import adsb
from pyModeS.c_decoder import adsb
else:
from pyModeS.decoder import adsb
print("===== Decode ADS-B sample data=====") print("===== Decode ADS-B sample data=====")

View File

@ -43,14 +43,20 @@ def test_adsb_position_with_ref():
def test_adsb_airborne_position_with_ref(): def test_adsb_airborne_position_with_ref():
pos = adsb.airborne_position_with_ref("8D40058B58C901375147EFD09357", 49.0, 6.0) pos = adsb.airborne_position_with_ref(
"8D40058B58C901375147EFD09357", 49.0, 6.0
)
assert pos == (49.82410, 6.06785) assert pos == (49.82410, 6.06785)
pos = adsb.airborne_position_with_ref("8D40058B58C904A87F402D3B8C59", 49.0, 6.0) pos = adsb.airborne_position_with_ref(
"8D40058B58C904A87F402D3B8C59", 49.0, 6.0
)
assert pos == (49.81755, 6.08442) assert pos == (49.81755, 6.08442)
def test_adsb_surface_position_with_ref(): def test_adsb_surface_position_with_ref():
pos = adsb.surface_position_with_ref("8FC8200A3AB8F5F893096B000000", -43.5, 172.5) pos = adsb.surface_position_with_ref(
"8FC8200A3AB8F5F893096B000000", -43.5, 172.5
)
assert pos == (-43.48564, 172.53942) assert pos == (-43.48564, 172.53942)
@ -91,13 +97,12 @@ def test_adsb_target_state_status():
assert sel_alt == (16992, "MCP/FCU") assert sel_alt == (16992, "MCP/FCU")
assert adsb.baro_pressure_setting("8DA05629EA21485CBF3F8CADAEEB") == 1012.8 assert adsb.baro_pressure_setting("8DA05629EA21485CBF3F8CADAEEB") == 1012.8
assert adsb.selected_heading("8DA05629EA21485CBF3F8CADAEEB") == 66.8 assert adsb.selected_heading("8DA05629EA21485CBF3F8CADAEEB") == 66.8
assert adsb.autopilot("8DA05629EA21485CBF3F8CADAEEB") == True assert adsb.autopilot("8DA05629EA21485CBF3F8CADAEEB") is True
assert adsb.vnav_mode("8DA05629EA21485CBF3F8CADAEEB") == True assert adsb.vnav_mode("8DA05629EA21485CBF3F8CADAEEB") is True
assert adsb.altitude_hold_mode("8DA05629EA21485CBF3F8CADAEEB") == False assert adsb.altitude_hold_mode("8DA05629EA21485CBF3F8CADAEEB") is False
assert adsb.approach_mode("8DA05629EA21485CBF3F8CADAEEB") == False assert adsb.approach_mode("8DA05629EA21485CBF3F8CADAEEB") is False
assert adsb.tcas_operational("8DA05629EA21485CBF3F8CADAEEB") == True assert adsb.tcas_operational("8DA05629EA21485CBF3F8CADAEEB") is True
assert adsb.lnav_mode("8DA05629EA21485CBF3F8CADAEEB") == True assert adsb.lnav_mode("8DA05629EA21485CBF3F8CADAEEB") is True
# def test_nic(): # def test_nic():

View File

@ -1,4 +1,5 @@
from pyModeS import bds, commb from pyModeS import bds, commb
import pytest
# from pyModeS import ehs, els # deprecated # from pyModeS import ehs, els # deprecated
@ -23,7 +24,7 @@ def test_bds40_functions():
def test_bds50_functions(): def test_bds50_functions():
assert bds.bds50.roll50("A000139381951536E024D4CCF6B5") == 2.1 assert bds.bds50.roll50("A000139381951536E024D4CCF6B5") == 2.1
assert bds.bds50.roll50("A0001691FFD263377FFCE02B2BF9") == -0.4 # signed value assert bds.bds50.roll50("A0001691FFD263377FFCE02B2BF9") == -0.4
assert bds.bds50.trk50("A000139381951536E024D4CCF6B5") == 114.258 assert bds.bds50.trk50("A000139381951536E024D4CCF6B5") == 114.258
assert bds.bds50.gs50("A000139381951536E024D4CCF6B5") == 438 assert bds.bds50.gs50("A000139381951536E024D4CCF6B5") == 438
assert bds.bds50.rtrk50("A000139381951536E024D4CCF6B5") == 0.125 assert bds.bds50.rtrk50("A000139381951536E024D4CCF6B5") == 0.125
@ -38,14 +39,16 @@ def test_bds50_functions():
def test_bds60_functions(): def test_bds60_functions():
assert bds.bds60.hdg60("A00004128F39F91A7E27C46ADC21") == 42.715 msg = "A00004128F39F91A7E27C46ADC21"
assert bds.bds60.ias60("A00004128F39F91A7E27C46ADC21") == 252
assert bds.bds60.mach60("A00004128F39F91A7E27C46ADC21") == 0.42
assert bds.bds60.vr60baro("A00004128F39F91A7E27C46ADC21") == -1920
assert bds.bds60.vr60ins("A00004128F39F91A7E27C46ADC21") == -1920
assert commb.hdg60("A00004128F39F91A7E27C46ADC21") == 42.715 assert bds.bds60.hdg60(msg) == pytest.approx(42.71484)
assert commb.ias60("A00004128F39F91A7E27C46ADC21") == 252 assert bds.bds60.ias60(msg) == 252
assert commb.mach60("A00004128F39F91A7E27C46ADC21") == 0.42 assert bds.bds60.mach60(msg) == 0.42
assert commb.vr60baro("A00004128F39F91A7E27C46ADC21") == -1920 assert bds.bds60.vr60baro(msg) == -1920
assert commb.vr60ins("A00004128F39F91A7E27C46ADC21") == -1920 assert bds.bds60.vr60ins(msg) == -1920
assert commb.hdg60(msg) == pytest.approx(42.71484)
assert commb.ias60(msg) == 252
assert commb.mach60(msg) == 0.42
assert commb.vr60baro(msg) == -1920
assert commb.vr60ins(msg) == -1920