From 51222372b1001056bd16cc25c59da4a233597f03 Mon Sep 17 00:00:00 2001 From: Xavier Olive Date: Tue, 1 Nov 2022 16:32:42 +0900 Subject: [PATCH] fully annotated code --- pyModeS/__init__.py | 14 +- pyModeS/c_common.pyi | 13 +- pyModeS/common.pyi | 22 +++ pyModeS/decoder/__init__.py | 50 ++++-- pyModeS/decoder/adsb.py | 288 +++++++++++++++++++++----------- pyModeS/decoder/allcall.py | 40 +++-- pyModeS/decoder/bds/__init__.py | 34 ++-- pyModeS/decoder/bds/bds05.py | 33 ++-- pyModeS/decoder/bds/bds06.py | 52 ++++-- pyModeS/decoder/bds/bds08.py | 12 +- pyModeS/decoder/bds/bds09.py | 53 ++++-- pyModeS/decoder/bds/bds10.py | 7 +- pyModeS/decoder/bds/bds17.py | 8 +- pyModeS/decoder/bds/bds20.py | 6 +- pyModeS/decoder/bds/bds30.py | 6 +- pyModeS/decoder/bds/bds40.py | 22 ++- pyModeS/decoder/bds/bds44.py | 16 +- pyModeS/decoder/bds/bds45.py | 22 +-- pyModeS/decoder/bds/bds50.py | 16 +- pyModeS/decoder/bds/bds53.py | 16 +- pyModeS/decoder/bds/bds60.py | 24 +-- pyModeS/decoder/bds/bds61.py | 10 +- pyModeS/decoder/bds/bds62.py | 91 +++++----- pyModeS/decoder/commb.py | 68 ++++++-- pyModeS/decoder/ehs.py | 43 ++++- pyModeS/decoder/els.py | 24 ++- pyModeS/decoder/surv.py | 102 ++++++++++- pyModeS/decoder/uncertainty.py | 83 ++++++--- pyModeS/decoder/uplink.py | 42 ++--- tests/sample_run_adsb.py | 8 +- tests/test_adsb.py | 25 +-- tests/test_commb.py | 27 +-- 32 files changed, 871 insertions(+), 406 deletions(-) create mode 100644 pyModeS/common.pyi diff --git a/pyModeS/__init__.py b/pyModeS/__init__.py index 9504d45..0b11444 100644 --- a/pyModeS/__init__.py +++ b/pyModeS/__init__.py @@ -4,7 +4,7 @@ import warnings try: from . import c_common as common from .c_common import * -except: +except Exception: from . import py_common as common # type: ignore from .py_common import * # type: ignore @@ -17,6 +17,18 @@ from .decoder import bds from .extra import aero from .extra import tcpclient +__all__ = [ + "common", + "tell", + "adsb", + "commb", + "allcall", + "surv", + "bds", + "aero", + "tcpclient", +] + warnings.simplefilter("once", DeprecationWarning) diff --git a/pyModeS/c_common.pyi b/pyModeS/c_common.pyi index dd28386..2c68584 100644 --- a/pyModeS/c_common.pyi +++ b/pyModeS/c_common.pyi @@ -2,28 +2,17 @@ def hex2bin(hexstr: str) -> str: ... def bin2int(binstr: str) -> int: ... def hex2int(hexstr: str) -> int: ... def bin2hex(binstr: str) -> str: ... - - def df(msg: str) -> int: ... def crc(msg: str, encode: bool = False) -> int: ... - - def floor(x: float) -> float: ... def icao(msg: str) -> str: ... def is_icao_assigned(icao: str) -> bool: ... - - def typecode(msg: str) -> int: ... def cprNL(lat: float) -> int: ... - - def idcode(msg: str) -> str: ... def squawk(binstr: str) -> str: ... - - def altcode(msg: str) -> int: ... def altitude(binstr: str) -> int: ... - - def data(msg: str) -> str: ... def allzeros(msg: str) -> bool: ... +def wrongstatus(data: str, sb: int, msb: int, lsb: int) -> bool: ... diff --git a/pyModeS/common.pyi b/pyModeS/common.pyi new file mode 100644 index 0000000..1f279f5 --- /dev/null +++ b/pyModeS/common.pyi @@ -0,0 +1,22 @@ +from typing import Optional + +def hex2bin(hexstr: str) -> str: ... +def bin2int(binstr: str) -> int: ... +def hex2int(hexstr: str) -> int: ... +def bin2hex(binstr: str) -> str: ... +def df(msg: str) -> int: ... +def crc(msg: str, encode: bool = False) -> int: ... +def floor(x: float) -> float: ... +def icao(msg: str) -> Optional[str]: ... +def is_icao_assigned(icao: str) -> bool: ... +def typecode(msg: str) -> Optional[int]: ... +def cprNL(lat: float) -> int: ... +def idcode(msg: str) -> str: ... +def squawk(binstr: str) -> str: ... +def altcode(msg: str) -> Optional[int]: ... +def altitude(binstr: str) -> Optional[int]: ... +def gray2alt(binstr: str) -> Optional[int]: ... +def gray2int(binstr: str) -> int: ... +def data(msg: str) -> str: ... +def allzeros(msg: str) -> bool: ... +def wrongstatus(data: str, sb: int, msb: int, lsb: int) -> bool: ... diff --git a/pyModeS/decoder/__init__.py b/pyModeS/decoder/__init__.py index 53ecf72..63341df 100644 --- a/pyModeS/decoder/__init__.py +++ b/pyModeS/decoder/__init__.py @@ -1,5 +1,5 @@ def tell(msg: str) -> None: - from pyModeS import common, adsb, commb, bds + from .. import common, adsb, commb, bds def _print(label, value, unit=None): print("%28s: " % label, end="") @@ -20,6 +20,11 @@ def tell(msg: str) -> None: _print("Protocol", "Mode-S Extended Squitter (ADS-B)") tc = common.typecode(msg) + + if tc is None: + _print("ERROR", "Unknown typecode") + return + if 1 <= tc <= 4: # callsign callsign = adsb.callsign(msg) _print("Type", "Identification and category") @@ -52,12 +57,14 @@ def tell(msg: str) -> None: if tc == 19: _print("Type", "Airborne velocity") - spd, trk, vr, t = adsb.velocity(msg) - types = {"GS": "Ground speed", "TAS": "True airspeed"} - _print("Speed", spd, "knots") - _print("Track", trk, "degrees") - _print("Vertical rate", vr, "feet/minute") - _print("Type", types[t]) + velocity = adsb.velocity(msg) + if velocity is not None: + spd, trk, vr, t = velocity + types = {"GS": "Ground speed", "TAS": "True airspeed"} + _print("Speed", spd, "knots") + _print("Track", trk, "degrees") + _print("Vertical rate", vr, "feet/minute") + _print("Type", types[t]) if 20 <= tc <= 22: # airborne position _print("Type", "Airborne position (with GNSS altitude)") @@ -106,8 +113,16 @@ def tell(msg: str) -> None: _print("Angle", angle, "°") _print("Angle Type", angle_type) _print("Angle Source", angle_source) - _print("Vertical mode", vertical_horizontal_types[vertical_mode]) - _print("Horizontal mode", vertical_horizontal_types[horizontal_mode]) + if vertical_mode is not None: + _print( + "Vertical mode", + vertical_horizontal_types[vertical_mode], + ) + if horizontal_mode is not None: + _print( + "Horizontal mode", + vertical_horizontal_types[horizontal_mode], + ) _print( "TCAS/ACAS", tcas_operational_types[tcas_operational] @@ -117,7 +132,7 @@ def tell(msg: str) -> None: _print("TCAS/ACAS RA", tcas_ra_types[tcas_ra]) _print("Emergency status", emergency_types[emergency_status]) else: - alt, alt_source = adsb.selected_altitude(msg) + alt, alt_source = adsb.selected_altitude(msg) # type: ignore baro = adsb.baro_pressure_setting(msg) hdg = adsb.selected_heading(msg) autopilot = adsb.autopilot(msg) @@ -127,13 +142,20 @@ def tell(msg: str) -> None: lnav = adsb.lnav_mode(msg) _print("Selected altitude", alt, "feet") _print("Altitude source", alt_source) - _print("Barometric pressure setting", baro, "" if baro == None else "millibars") + _print( + "Barometric pressure setting", + baro, + "" if baro is None else "millibars", + ) _print("Selected Heading", hdg, "°") if not (common.bin2int((common.hex2bin(msg)[32:])[46]) == 0): - _print("Autopilot", types_29[autopilot] if autopilot else None) + _print( + "Autopilot", types_29[autopilot] if autopilot else None + ) _print("VNAV mode", types_29[vnav] if vnav else None) _print( - "Altitude hold mode", types_29[alt_hold] if alt_hold else None + "Altitude hold mode", + types_29[alt_hold] if alt_hold else None, ) _print("Approach mode", types_29[app] if app else None) _print( @@ -167,7 +189,7 @@ def tell(msg: str) -> None: } BDS = bds.infer(msg, mrar=True) - if BDS in labels.keys(): + if BDS is not None and BDS in labels.keys(): _print("BDS", "%s (%s)" % (BDS, labels[BDS])) else: _print("BDS", BDS) diff --git a/pyModeS/decoder/adsb.py b/pyModeS/decoder/adsb.py index df35bc9..71bab5b 100644 --- a/pyModeS/decoder/adsb.py +++ b/pyModeS/decoder/adsb.py @@ -2,65 +2,120 @@ The ADS-B module also imports functions from the following modules: -- pyModeS.decoder.bds.bds05: ``airborne_position()``, ``airborne_position_with_ref()``, ``altitude()`` -- pyModeS.decoder.bds.bds06: ``surface_position()``, ``surface_position_with_ref()``, ``surface_velocity()`` -- pyModeS.decoder.bds.bds08: ``category()``, ``callsign()`` -- pyModeS.decoder.bds.bds09: ``airborne_velocity()``, ``altitude_diff()`` +- bds05: ``airborne_position()``, ``airborne_position_with_ref()``, + ``altitude()`` +- bds06: ``surface_position()``, ``surface_position_with_ref()``, + ``surface_velocity()`` +- bds08: ``category()``, ``callsign()`` +- bds09: ``airborne_velocity()``, ``altitude_diff()`` """ -import pyModeS as pms +from __future__ import annotations +from datetime import datetime -from pyModeS import common - -from pyModeS.decoder import uncertainty - -# from pyModeS.decoder.bds import bds05, bds06, bds09 -from pyModeS.decoder.bds.bds05 import ( - airborne_position, - airborne_position_with_ref, - altitude as altitude05, -) -from pyModeS.decoder.bds.bds06 import ( +from .. import common +from . import uncertainty +from .bds.bds05 import airborne_position, airborne_position_with_ref +from .bds.bds05 import altitude as altitude05 +from .bds.bds06 import ( surface_position, surface_position_with_ref, surface_velocity, ) -from pyModeS.decoder.bds.bds08 import category, callsign -from pyModeS.decoder.bds.bds09 import airborne_velocity, altitude_diff -from pyModeS.decoder.bds.bds61 import is_emergency, emergency_state, emergency_squawk -from pyModeS.decoder.bds.bds62 import ( +from .bds.bds08 import callsign, category +from .bds.bds09 import airborne_velocity, altitude_diff +from .bds.bds61 import emergency_squawk, emergency_state, is_emergency +from .bds.bds62 import ( + altitude_hold_mode, + approach_mode, + autopilot, + baro_pressure_setting, + emergency_status, + horizontal_mode, + lnav_mode, selected_altitude, selected_heading, target_altitude, target_angle, tcas_operational, tcas_ra, - baro_pressure_setting, vertical_mode, - horizontal_mode, vnav_mode, - lnav_mode, - autopilot, - altitude_hold_mode, - approach_mode, - emergency_status, ) - -def df(msg): +__all__ = [ + "airborne_position", + "airborne_position_with_ref", + "altitude05", + "surface_position", + "surface_position_with_ref", + "surface_velocity", + "callsign", + "category", + "airborne_velocity", + "altitude_diff", + "emergency_squawk", + "emergency_state", + "is_emergency", + "df", + "icao", + "typecode", + "position", + "position_with_ref", + "altitude", + "velocity", + "speed_heading", + "oe_flag", + "version", + "nuc_p", + "nuc_v", + "nic_v1", + "nic_v2", + "nic_s", + "nic_a_c", + "nic_b", + "nac_p", + "nac_v", + "sil", + "selected_altitude", + "target_altitude", + "vertical_mode", + "horizontal_mode", + "selected_heading", + "target_angle", + "baro_pressure_setting", + "autopilot", + "vnav_mode", + "altitude_hold_mode", + "approach_mode", + "lnav_mode", + "tcas_operational", + "tcas_ra", + "emergency_status", +] + + +def df(msg: str) -> int: return common.df(msg) -def icao(msg): +def icao(msg: str) -> None | str: return common.icao(msg) -def typecode(msg): +def typecode(msg: str) -> None | int: return common.typecode(msg) -def position(msg0, msg1, t0, t1, lat_ref=None, lon_ref=None): +def position( + msg0: str, + msg1: str, + t0: int | datetime, + t1: int | datetime, + lat_ref: None | float = None, + lon_ref: None | float = None, +) -> None | tuple[float, float]: """Decode surface or airborne position from a pair of even and odd position messages. @@ -82,6 +137,9 @@ def position(msg0, msg1, t0, t1, lat_ref=None, lon_ref=None): tc0 = typecode(msg0) tc1 = typecode(msg1) + if tc0 is None or tc1 is None: + raise RuntimeError("Incorrect or inconsistent message types") + if 5 <= tc0 <= 8 and 5 <= tc1 <= 8: if lat_ref is None or lon_ref is None: raise RuntimeError( @@ -103,7 +161,9 @@ def position(msg0, msg1, t0, t1, lat_ref=None, lon_ref=None): raise RuntimeError("Incorrect or inconsistent message types") -def position_with_ref(msg, lat_ref, lon_ref): +def position_with_ref( + msg: str, lat_ref: float, lon_ref: float +) -> tuple[float, float]: """Decode position with only one message. A reference position is required, which can be previously @@ -123,6 +183,9 @@ def position_with_ref(msg, lat_ref, lon_ref): tc = typecode(msg) + if tc is None: + raise RuntimeError("incorrect or inconsistent message types") + if 5 <= tc <= 8: return surface_position_with_ref(msg, lat_ref, lon_ref) @@ -133,7 +196,7 @@ def position_with_ref(msg, lat_ref, lon_ref): raise RuntimeError("incorrect or inconsistent message types") -def altitude(msg): +def altitude(msg: str) -> None | float: """Decode aircraft altitude. Args: @@ -145,7 +208,7 @@ def altitude(msg): """ tc = typecode(msg) - if tc < 5 or tc == 19 or tc > 22: + if tc is None or tc < 5 or tc == 19 or tc > 22: raise RuntimeError("%s: Not a position message" % msg) elif tc >= 5 and tc <= 8: @@ -157,16 +220,20 @@ def altitude(msg): return altitude05(msg) -def velocity(msg, source=False): - """Calculate the speed, heading, and vertical rate (handles both airborne or surface message). +def velocity( + msg: str, source: bool = False +) -> None | tuple[None | float, None | float, None | int, str]: + """Calculate the speed, heading, and vertical rate + (handles both airborne or surface message). Args: msg (str): 28 hexdigits string - source (boolean): Include direction and vertical rate sources in return. Default to False. - If set to True, the function will return six values instead of four. + source (boolean): Include direction and vertical rate sources in return. + Default to False. + If set to True, the function will return six value instead of four. Returns: - (int, float, int, string, [string], [string]): Four or six parameters, including: + int, float, int, string, [string], [string]: - Speed (kt) - Angle (degree), either ground track or heading - Vertical rate (ft/min) @@ -174,22 +241,26 @@ def velocity(msg, source=False): - [Optional] Direction source ('TRUE_NORTH' or 'MAGNETIC_NORTH') - [Optional] Vertical rate source ('BARO' or 'GNSS') - For surface messages, vertical rate and its respective sources are set to None. + For surface messages, vertical rate and its respective sources are set + to None. """ - if 5 <= typecode(msg) <= 8: + tc = typecode(msg) + error = "incorrect or inconsistent message types, expecting 4 None | tuple[None | float, None | float]: """Get speed and ground track (or heading) from the velocity message (handles both airborne or surface message) @@ -199,11 +270,14 @@ def speed_heading(msg): Returns: (int, float): speed (kt), ground track or heading (degree) """ - spd, trk_or_hdg, rocd, tag = velocity(msg) + decoded = velocity(msg) + if decoded is None: + return None + spd, trk_or_hdg, rocd, tag = decoded return spd, trk_or_hdg -def oe_flag(msg): +def oe_flag(msg: str) -> int: """Check the odd/even flag. Bit 54, 0 for even, 1 for odd. Args: msg (str): 28 hexdigits string @@ -214,7 +288,7 @@ def oe_flag(msg): return int(msgbin[53]) -def version(msg): +def version(msg: str) -> int: """ADS-B Version Args: @@ -236,13 +310,15 @@ def version(msg): return version -def nuc_p(msg): - """Calculate NUCp, Navigation Uncertainty Category - Position (ADS-B version 1) +def nuc_p(msg: str) -> tuple[int, None | float, None | int, None | int]: + """Calculate NUCp, Navigation Uncertainty Category - Position + (ADS-B version 1) Args: msg (str): 28 hexdigits string, Returns: + int: NUCp, Navigation Uncertainty Category (position) int: Horizontal Protection Limit int: 95% Containment Radius - Horizontal (meters) int: 95% Containment Radius - Vertical (meters) @@ -250,7 +326,7 @@ def nuc_p(msg): """ tc = typecode(msg) - if typecode(msg) < 5 or typecode(msg) > 22: + if tc is None or tc < 5 or tc is None or tc > 22: raise RuntimeError( "%s: Not a surface position message (5 tuple[int, None | float, None | float]: + """Calculate NUCv, Navigation Uncertainty Category - Velocity + (ADS-B version 1) Args: msg (str): 28 hexdigits string, Returns: + int: NUCv, Navigation Uncertainty Category (velocity) int or string: 95% Horizontal Velocity Error int or string: 95% Vertical Velocity Error """ @@ -295,17 +376,18 @@ def nuc_v(msg): msgbin = common.hex2bin(msg) NUCv = common.bin2int(msgbin[42:45]) + index = uncertainty.NUCv.get(NUCv, None) - try: - HVE = uncertainty.NUCv[NUCv]["HVE"] - VVE = uncertainty.NUCv[NUCv]["VVE"] - except KeyError: + if index is not None: + HVE = index["HVE"] + VVE = index["VVE"] + else: HVE, VVE = uncertainty.NA, uncertainty.NA - return HVE, VVE + return NUCv, HVE, VVE -def nic_v1(msg, NICs): +def nic_v1(msg: str, NICs: int) -> tuple[int, None | float, None | float]: """Calculate NIC, navigation integrity category, for ADS-B version 1 Args: @@ -313,10 +395,12 @@ def nic_v1(msg, NICs): NICs (int or string): NIC supplement Returns: + int: NIC, Navigation Integrity Category int or string: Horizontal Radius of Containment int or string: Vertical Protection Limit """ - if typecode(msg) < 5 or typecode(msg) > 22: + tc = typecode(msg) + if tc is None or tc < 5 or tc > 22: raise RuntimeError( "%s: Not a surface position message (5 tuple[int, int]: """Calculate NIC, navigation integrity category, for ADS-B version 2 Args: @@ -348,9 +434,11 @@ def nic_v2(msg, NICa, NICbc): NICbc (int or string): NIC supplement - B or C Returns: + int: NIC, Navigation Integrity Category int or string: Horizontal Radius of Containment """ - if typecode(msg) < 5 or typecode(msg) > 22: + tc = typecode(msg) + if tc is None or tc < 5 or tc > 22: raise RuntimeError( "%s: Not a surface position message (5 int: """Obtain NIC supplement bit, TC=31 message Args: @@ -399,7 +486,7 @@ def nic_s(msg): return nic_s -def nic_a_c(msg): +def nic_a_c(msg: str) -> tuple[int, int]: """Obtain NICa/c, navigation integrity category supplements a and c Args: @@ -422,7 +509,7 @@ def nic_a_c(msg): return nic_a, nic_c -def nic_b(msg): +def nic_b(msg: str) -> int: """Obtain NICb, navigation integrity category supplement-b Args: @@ -433,7 +520,7 @@ def nic_b(msg): """ tc = typecode(msg) - if tc < 9 or tc > 18: + if tc is None or tc < 9 or tc > 18: raise RuntimeError( "%s: Not a airborne position message, expecting 8 tuple[int, int | None, int | None]: """Calculate NACp, Navigation Accuracy Category - Position Args: msg (str): 28 hexdigits string, TC = 29 or 31 Returns: - int or string: 95% horizontal accuracy bounds, Estimated Position Uncertainty - int or string: 95% vertical accuracy bounds, Vertical Estimated Position Uncertainty + int: NACp, Navigation Accuracy Category (position) + int or string: 95% horizontal accuracy bounds, + Estimated Position Uncertainty + int or string: 95% vertical accuracy bounds, + Vertical Estimated Position Uncertainty """ tc = typecode(msg) @@ -476,18 +566,21 @@ def nac_p(msg): except KeyError: EPU, VEPU = uncertainty.NA, uncertainty.NA - return EPU, VEPU + return NACp, EPU, VEPU -def nac_v(msg): +def nac_v(msg: str) -> tuple[int, float | None, float | None]: """Calculate NACv, Navigation Accuracy Category - Velocity Args: msg (str): 28 hexdigits string, TC = 19 Returns: - int or string: 95% horizontal accuracy bounds for velocity, Horizontal Figure of Merit - int or string: 95% vertical accuracy bounds for velocity, Vertical Figure of Merit + int: NACv, Navigation Accuracy Category (velocity) + int or string: 95% horizontal accuracy bounds for velocity, + Horizontal Figure of Merit + int or string: 95% vertical accuracy bounds for velocity, + Vertical Figure of Merit """ tc = typecode(msg) @@ -505,18 +598,23 @@ def nac_v(msg): except KeyError: HFOMr, VFOMr = uncertainty.NA, uncertainty.NA - return HFOMr, VFOMr + return NACv, HFOMr, VFOMr -def sil(msg, version): +def sil( + msg: str, + version: None | int, +) -> tuple[float | None, float | None, str]: """Calculate SIL, Surveillance Integrity Level Args: msg (str): 28 hexdigits string with TC = 29, 31 Returns: - int or string: Probability of exceeding Horizontal Radius of Containment RCu - int or string: Probability of exceeding Vertical Integrity Containment Region VPL + int or string: + Probability of exceeding Horizontal Radius of Containment RCu + int or string: + Probability of exceeding Vertical Integrity Containment Region VPL string: SIL supplement based on per "hour" or "sample", or 'unknown' """ tc = typecode(msg) diff --git a/pyModeS/decoder/allcall.py b/pyModeS/decoder/allcall.py index d5592c8..3023ab3 100644 --- a/pyModeS/decoder/allcall.py +++ b/pyModeS/decoder/allcall.py @@ -2,13 +2,21 @@ Decode all-call reply messages, with downlink format 11 """ -from pyModeS import common +from __future__ import annotations +from typing import Callable, TypeVar + +from .. import common + +T = TypeVar("T") +F = Callable[[str], T] + + +def _checkdf(func: F[T]) -> F[T]: -def _checkdf(func): """Ensure downlink format is 11.""" - def wrapper(msg): + def wrapper(msg: str) -> T: df = common.df(msg) if df != 11: raise RuntimeError( @@ -20,7 +28,7 @@ def _checkdf(func): @_checkdf -def icao(msg): +def icao(msg: str) -> None | str: """Decode transponder code (ICAO address). Args: @@ -33,7 +41,7 @@ def icao(msg): @_checkdf -def interrogator(msg): +def interrogator(msg: str) -> str: """Decode interrogator identifier code. Args: @@ -42,19 +50,20 @@ def interrogator(msg): int: interrogator identifier code """ - # the CRC remainder contains the CL and IC field. top three bits are CL field and last four bits are IC field. + # the CRC remainder contains the CL and IC field. + # the top three bits are CL field and last four bits are IC field. remainder = common.crc(msg) - if remainder > 79: + if remainder > 79: IC = "corrupt IC" elif remainder < 16: - IC="II"+str(remainder) + IC = "II" + str(remainder) else: - IC="SI"+str(remainder-16) + IC = "SI" + str(remainder - 16) return IC @_checkdf -def capability(msg): +def capability(msg: str) -> tuple[int, None | str]: """Decode transponder capability. Args: @@ -73,9 +82,16 @@ def capability(msg): elif ca == 5: text = "level 2 transponder, ability to set CA to 7, airborne" elif ca == 6: - text = "evel 2 transponder, ability to set CA to 7, either airborne or ground" + text = ( + "evel 2 transponder, ability to set CA to 7, " + "either airborne or ground" + ) elif ca == 7: - text = "Downlink Request value is 0,or the Flight Status is 2, 3, 4 or 5, either airborne or on the ground" + text = ( + "Downlink Request value is 0, " + "or the Flight Status is 2, 3, 4 or 5, " + "either airborne or on the ground" + ) else: text = None diff --git a/pyModeS/decoder/bds/__init__.py b/pyModeS/decoder/bds/__init__.py index 509a568..1ee0042 100644 --- a/pyModeS/decoder/bds/__init__.py +++ b/pyModeS/decoder/bds/__init__.py @@ -18,16 +18,13 @@ Common functions for Mode-S decoding """ -import numpy as np +from typing import Optional -from pyModeS.extra import aero -from pyModeS import common +import numpy as np -from pyModeS.decoder.bds import ( - bds05, - bds06, - bds08, - bds09, +from ... import common +from ...extra import aero +from . import ( # noqa: F401 bds10, bds17, bds20, @@ -36,13 +33,15 @@ from pyModeS.decoder.bds import ( bds44, bds45, bds50, - bds53, bds60, - bds62 + bds61, + bds62, ) -def is50or60(msg, spd_ref, trk_ref, alt_ref): +def is50or60( + msg: str, spd_ref: float, trk_ref: float, alt_ref: float +) -> Optional[str]: """Use reference ground speed and trk to determine BDS50 and DBS60. Args: @@ -52,7 +51,8 @@ def is50or60(msg, spd_ref, trk_ref, alt_ref): alt_ref (float): reference altitude (ADS-B altitude), ft Returns: - String or None: BDS version, or possible versions, or None if nothing matches. + String or None: BDS version, or possible versions, + or None if nothing matches. """ @@ -114,15 +114,17 @@ def is50or60(msg, spd_ref, trk_ref, alt_ref): return BDS -def infer(msg, mrar=False): +def infer(msg: str, mrar: bool = False) -> Optional[str]: """Estimate the most likely BDS code of an message. Args: msg (str): 28 hexdigits string - mrar (bool): Also infer MRAR (BDS 44) and MHR (BDS 45). Defaults to False. + mrar (bool): Also infer MRAR (BDS 44) and MHR (BDS 45). + Defaults to False. Returns: - String or None: BDS version, or possible versions, or None if nothing matches. + String or None: BDS version, or possible versions, + or None if nothing matches. """ df = common.df(msg) @@ -133,6 +135,8 @@ def infer(msg, mrar=False): # For ADS-B / Mode-S extended squitter if df == 17: tc = common.typecode(msg) + if tc is None: + return None if 1 <= tc <= 4: return "BDS08" # identification and category diff --git a/pyModeS/decoder/bds/bds05.py b/pyModeS/decoder/bds/bds05.py index 31f6ca2..bf07f1a 100644 --- a/pyModeS/decoder/bds/bds05.py +++ b/pyModeS/decoder/bds/bds05.py @@ -1,14 +1,20 @@ # ------------------------------------------ # BDS 0,5 # ADS-B TC=9-18 -# Airborn position +# Airborne position # ------------------------------------------ -from pyModeS import common +from __future__ import annotations +from datetime import datetime -def airborne_position(msg0, msg1, t0, t1): - """Decode airborn position from a pair of even and odd position message +from ... import common + + +def airborne_position( + msg0: str, msg1: str, t0: int | datetime, t1: int | datetime +) -> None | tuple[float, float]: + """Decode airborne position from a pair of even and odd position message Args: msg0 (string): even message (28 hexdigits) @@ -59,7 +65,8 @@ def airborne_position(msg0, msg1, t0, t1): return None # compute ni, longitude index m, and longitude - if t0 > t1: + # (people pass int+int or datetime+datetime) + if t0 > t1: # type: ignore lat = lat_even nl = common.cprNL(lat) ni = max(common.cprNL(lat) - 0, 1) @@ -78,7 +85,9 @@ def airborne_position(msg0, msg1, t0, t1): return round(lat, 5), round(lon, 5) -def airborne_position_with_ref(msg, lat_ref, lon_ref): +def airborne_position_with_ref( + msg: str, lat_ref: float, lon_ref: float +) -> tuple[float, float]: """Decode airborne position with only one message, knowing reference nearby location, such as previously calculated location, ground station, or airport location, etc. The reference position shall @@ -123,7 +132,7 @@ def airborne_position_with_ref(msg, lat_ref, lon_ref): return round(lat, 5), round(lon, 5) -def altitude(msg): +def altitude(msg: str) -> None | int: """Decode aircraft altitude Args: @@ -135,16 +144,14 @@ def altitude(msg): tc = common.typecode(msg) - if tc < 9 or tc == 19 or tc > 22: - raise RuntimeError("%s: Not a airborn position message" % msg) + if tc is None or tc < 9 or tc == 19 or tc > 22: + raise RuntimeError("%s: Not an airborne position message" % msg) mb = common.hex2bin(msg)[32:] altbin = mb[8:20] if tc < 19: altcode = altbin[0:6] + "0" + altbin[6:] - alt = common.altitude(altcode) + return common.altitude(altcode) else: - alt = common.bin2int(altbin) * 3.28084 - - return alt + return common.bin2int(altbin) * 3.28084 # type: ignore diff --git a/pyModeS/decoder/bds/bds06.py b/pyModeS/decoder/bds/bds06.py index 4a9f6a9..9077275 100644 --- a/pyModeS/decoder/bds/bds06.py +++ b/pyModeS/decoder/bds/bds06.py @@ -4,10 +4,22 @@ # Surface movement # ------------------------------------------ -from pyModeS import common +from __future__ import annotations +from datetime import datetime + +from ... import common + + +def surface_position( + msg0: str, + msg1: str, + t0: int | datetime, + t1: int | datetime, + lat_ref: float, + lon_ref: float, +) -> None | tuple[float, float]: -def surface_position(msg0, msg1, t0, t1, lat_ref, lon_ref): """Decode surface position from a pair of even and odd position message, the lat/lon of receiver must be provided to yield the correct solution. @@ -55,7 +67,8 @@ def surface_position(msg0, msg1, t0, t1, lat_ref, lon_ref): return None # compute ni, longitude index m, and longitude - if t0 > t1: + # (people pass int+int or datetime+datetime) + if t0 > t1: # type: ignore lat = lat_even nl = common.cprNL(lat_even) ni = max(common.cprNL(lat_even) - 0, 1) @@ -72,17 +85,19 @@ def surface_position(msg0, msg1, t0, t1, lat_ref, lon_ref): lons = [lon, lon + 90, lon + 180, lon + 270] # make sure lons are between -180 and 180 - lons = [(l + 180) % 360 - 180 for l in lons] + lons = [(lon + 180) % 360 - 180 for lon in lons] # the closest solution to receiver is the correct one - dls = [abs(lon_ref - l) for l in lons] + dls = [abs(lon_ref - lon) for lon in lons] imin = min(range(4), key=dls.__getitem__) lon = lons[imin] return round(lat, 5), round(lon, 5) -def surface_position_with_ref(msg, lat_ref, lon_ref): +def surface_position_with_ref( + msg: str, lat_ref: float, lon_ref: float +) -> tuple[float, float]: """Decode surface position with only one message, knowing reference nearby location, such as previously calculated location, ground station, or airport location, etc. The reference position shall @@ -127,16 +142,19 @@ def surface_position_with_ref(msg, lat_ref, lon_ref): return round(lat, 5), round(lon, 5) -def surface_velocity(msg, source=False): +def surface_velocity( + msg: str, source: bool = False +) -> tuple[None | float, float, int, str]: """Decode surface velocity from a surface position message Args: msg (str): 28 hexdigits string - source (boolean): Include direction and vertical rate sources in return. Default to False. - If set to True, the function will return six values instead of four. + source (boolean): Include direction and vertical rate sources in return. + Default to False. + If set to True, the function will return six value instead of four. Returns: - int, float, int, string, [string], [string]: Four or six parameters, including: + int, float, int, string, [string], [string]: - Speed (kt) - Angle (degree), ground track - Vertical rate, always 0 @@ -145,7 +163,8 @@ def surface_velocity(msg, source=False): - [Optional] Vertical rate source (None) """ - if common.typecode(msg) < 5 or common.typecode(msg) > 8: + tc = common.typecode(msg) + if tc is None or tc < 5 or tc > 8: raise RuntimeError("%s: Not a surface message, expecting 5 124: spd = None elif mov == 1: - spd = 0 + spd = 0.0 elif mov == 124: - spd = 175 + spd = 175.0 else: mov_lb = [2, 9, 13, 39, 94, 109, 124] - kts_lb = [0.125, 1, 2, 15, 70, 100, 175] - step = [0.125, 0.25, 0.5, 1, 2, 5] + kts_lb: list[float] = [0.125, 1, 2, 15, 70, 100, 175] + step: list[float] = [0.125, 0.25, 0.5, 1, 2, 5] i = next(m[0] for m in enumerate(mov_lb) if m[1] > mov) spd = kts_lb[i - 1] + (mov - mov_lb[i - 1]) * step[i - 1] + if source: - return spd, trk, 0, "GS", "TRUE_NORTH", None + return spd, trk, 0, "GS", "TRUE_NORTH", None # type: ignore else: return spd, trk, 0, "GS" diff --git a/pyModeS/decoder/bds/bds08.py b/pyModeS/decoder/bds/bds08.py index b62d909..82402b1 100644 --- a/pyModeS/decoder/bds/bds08.py +++ b/pyModeS/decoder/bds/bds08.py @@ -4,10 +4,10 @@ # Aircraft identification and category # ------------------------------------------ -from pyModeS import common +from ... import common -def category(msg): +def category(msg: str) -> int: """Aircraft category number Args: @@ -17,7 +17,8 @@ def category(msg): int: category number """ - if common.typecode(msg) < 1 or common.typecode(msg) > 4: + tc = common.typecode(msg) + if tc is None or tc < 1 or tc > 4: raise RuntimeError("%s: Not a identification message" % msg) msgbin = common.hex2bin(msg) @@ -25,7 +26,7 @@ def category(msg): return common.bin2int(mebin[5:8]) -def callsign(msg): +def callsign(msg: str) -> str: """Aircraft callsign Args: @@ -34,8 +35,9 @@ def callsign(msg): Returns: string: callsign """ + tc = common.typecode(msg) - if common.typecode(msg) < 1 or common.typecode(msg) > 4: + if tc is None or tc < 1 or tc > 4: raise RuntimeError("%s: Not a identification message" % msg) chars = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" diff --git a/pyModeS/decoder/bds/bds09.py b/pyModeS/decoder/bds/bds09.py index 737f31a..e462feb 100644 --- a/pyModeS/decoder/bds/bds09.py +++ b/pyModeS/decoder/bds/bds09.py @@ -1,25 +1,29 @@ # ------------------------------------------ # BDS 0,9 # ADS-B TC=19 -# Aircraft Airborn velocity +# Aircraft Airborne velocity # ------------------------------------------ -from pyModeS import common - +from __future__ import annotations import math +from ... import common + -def airborne_velocity(msg, source=False): +def airborne_velocity( + msg: str, source: bool = False +) -> None | tuple[None | int, None | float, None | int, str]: """Decode airborne velocity. Args: msg (str): 28 hexdigits string - source (boolean): Include direction and vertical rate sources in return. Default to False. - If set to True, the function will return six values instead of four. + source (boolean): Include direction and vertical rate sources in return. + Default to False. + If set to True, the function will return six value instead of four. Returns: - int, float, int, string, [string], [string]: Four or six parameters, including: + int, float, int, string, [string], [string]: - Speed (kt) - Angle (degree), either ground track or heading - Vertical rate (ft/min) @@ -29,12 +33,20 @@ def airborne_velocity(msg, source=False): """ if common.typecode(msg) != 19: - raise RuntimeError("%s: Not a airborne velocity message, expecting TC=19" % msg) + raise RuntimeError( + "%s: Not a airborne velocity message, expecting TC=19" % msg + ) mb = common.hex2bin(msg)[32:] subtype = common.bin2int(mb[5:8]) + if common.bin2int(mb[14:24]) == 0 or common.bin2int(mb[25:35]) == 0: + return None + + trk_or_hdg: None | float + spd: None | float + if subtype in (1, 2): v_ew = common.bin2int(mb[14:24]) @@ -80,11 +92,9 @@ def airborne_velocity(msg, source=False): trk_or_hdg = hdg spd = common.bin2int(mb[25:35]) - - if subtype == 4: # Supersonic - spd *= 4 - spd = None if spd == 0 else spd - 1 + if subtype == 4 and spd is not None: # Supersonic + spd *= 4 if mb[24] == "0": spd_type = "IAS" @@ -99,13 +109,20 @@ def airborne_velocity(msg, source=False): vs = None if vr == 0 else int(vr_sign * (vr - 1) * 64) if source: - return spd, trk_or_hdg, vs, spd_type, dir_type, vr_source + return ( # type: ignore + spd, + trk_or_hdg, + vs, + spd_type, + dir_type, + vr_source, + ) else: return spd, trk_or_hdg, vs, spd_type -def altitude_diff(msg): - """Decode the difference between GNSS and barometric altitude. +def altitude_diff(msg: str) -> None | float: + """Decode the differece between GNSS and barometric altitude. Args: msg (str): 28 hexdigits string, TC=19 @@ -117,8 +134,10 @@ def altitude_diff(msg): """ tc = common.typecode(msg) - if tc != 19: - raise RuntimeError("%s: Not a airborne velocity message, expecting TC=19" % msg) + if tc is None or tc != 19: + raise RuntimeError( + "%s: Not a airborne velocity message, expecting TC=19" % msg + ) msgbin = common.hex2bin(msg) sign = -1 if int(msgbin[80]) else 1 diff --git a/pyModeS/decoder/bds/bds10.py b/pyModeS/decoder/bds/bds10.py index b903bcb..b234ff1 100644 --- a/pyModeS/decoder/bds/bds10.py +++ b/pyModeS/decoder/bds/bds10.py @@ -3,10 +3,11 @@ # Data link capability report # ------------------------------------------ -from pyModeS import common +from ... import common -def is10(msg): + +def is10(msg: str) -> bool: """Check if a message is likely to be BDS code 1,0 Args: @@ -38,7 +39,7 @@ def is10(msg): return True -def ovc10(msg): +def ovc10(msg: str) -> int: """Return the overlay control capability Args: diff --git a/pyModeS/decoder/bds/bds17.py b/pyModeS/decoder/bds/bds17.py index 80e8bbd..56b4853 100644 --- a/pyModeS/decoder/bds/bds17.py +++ b/pyModeS/decoder/bds/bds17.py @@ -3,10 +3,12 @@ # Common usage GICB capability report # ------------------------------------------ -from pyModeS import common +from typing import List +from ... import common -def is17(msg): + +def is17(msg: str) -> bool: """Check if a message is likely to be BDS code 1,7 Args: @@ -38,7 +40,7 @@ def is17(msg): return True -def cap17(msg): +def cap17(msg: str) -> List[str]: """Extract capacities from BDS 1,7 message Args: diff --git a/pyModeS/decoder/bds/bds20.py b/pyModeS/decoder/bds/bds20.py index ce60217..8ee5513 100644 --- a/pyModeS/decoder/bds/bds20.py +++ b/pyModeS/decoder/bds/bds20.py @@ -3,10 +3,10 @@ # Aircraft identification # ------------------------------------------ -from pyModeS import common +from ... import common -def is20(msg): +def is20(msg: str) -> bool: """Check if a message is likely to be BDS code 2,0 Args: @@ -34,7 +34,7 @@ def is20(msg): return True -def cs20(msg): +def cs20(msg: str) -> str: """Aircraft callsign Args: diff --git a/pyModeS/decoder/bds/bds30.py b/pyModeS/decoder/bds/bds30.py index 7270d3c..da23f34 100644 --- a/pyModeS/decoder/bds/bds30.py +++ b/pyModeS/decoder/bds/bds30.py @@ -3,11 +3,11 @@ # ACAS active resolution advisory # ------------------------------------------ -from pyModeS import common +from ... import common -def is30(msg): - """Check if a message is likely to be BDS code 2,0 +def is30(msg: str) -> bool: + """Check if a message is likely to be BDS code 3,0 Args: msg (str): 28 hexdigits string diff --git a/pyModeS/decoder/bds/bds40.py b/pyModeS/decoder/bds/bds40.py index 5a145e7..66f6874 100644 --- a/pyModeS/decoder/bds/bds40.py +++ b/pyModeS/decoder/bds/bds40.py @@ -4,10 +4,12 @@ # ------------------------------------------ import warnings -from pyModeS import common +from typing import Optional +from ... import common -def is40(msg): + +def is40(msg: str) -> bool: """Check if a message is likely to be BDS code 4,0 Args: @@ -50,7 +52,7 @@ def is40(msg): return True -def selalt40mcp(msg): +def selalt40mcp(msg: str) -> Optional[int]: """Selected altitude, MCP/FCU Args: @@ -68,7 +70,7 @@ def selalt40mcp(msg): return alt -def selalt40fms(msg): +def selalt40fms(msg: str) -> Optional[int]: """Selected altitude, FMS Args: @@ -86,7 +88,7 @@ def selalt40fms(msg): return alt -def p40baro(msg): +def p40baro(msg: str) -> Optional[float]: """Barometric pressure setting Args: @@ -104,17 +106,19 @@ def p40baro(msg): return p -def alt40mcp(msg): +def alt40mcp(msg: str) -> Optional[int]: warnings.warn( - "alt40mcp() has been renamed to selalt40mcp(). It will be removed in the future.", + """alt40mcp() has been renamed to selalt40mcp(). + It will be removed in the future.""", DeprecationWarning, ) return selalt40mcp(msg) -def alt40fms(msg): +def alt40fms(msg: str) -> Optional[int]: warnings.warn( - "alt40fms() has been renamed to selalt40fms(). It will be removed in the future.", + """alt40fms() has been renamed to selalt40fms(). + It will be removed in the future.""", DeprecationWarning, ) return selalt40fms(msg) diff --git a/pyModeS/decoder/bds/bds44.py b/pyModeS/decoder/bds/bds44.py index 351f13e..ea86210 100644 --- a/pyModeS/decoder/bds/bds44.py +++ b/pyModeS/decoder/bds/bds44.py @@ -3,10 +3,12 @@ # Meteorological routine air report # ------------------------------------------ -from pyModeS import common +from typing import Optional, Tuple +from ... import common -def is44(msg): + +def is44(msg: str) -> bool: """Check if a message is likely to be BDS code 4,4. Meteorological routine air report @@ -51,7 +53,7 @@ def is44(msg): return True -def wind44(msg): +def wind44(msg: str) -> Tuple[Optional[int], Optional[float]]: """Wind speed and direction. Args: @@ -73,7 +75,7 @@ def wind44(msg): return round(speed, 0), round(direction, 1) -def temp44(msg): +def temp44(msg: str) -> Tuple[float, float]: """Static air temperature. Args: @@ -102,7 +104,7 @@ def temp44(msg): return temp, temp_alternative -def p44(msg): +def p44(msg: str) -> Optional[int]: """Static pressure. Args: @@ -122,7 +124,7 @@ def p44(msg): return p -def hum44(msg): +def hum44(msg: str) -> Optional[float]: """humidity Args: @@ -141,7 +143,7 @@ def hum44(msg): return round(hm, 1) -def turb44(msg): +def turb44(msg: str) -> Optional[int]: """Turbulence. Args: diff --git a/pyModeS/decoder/bds/bds45.py b/pyModeS/decoder/bds/bds45.py index 8dca85c..3c955be 100644 --- a/pyModeS/decoder/bds/bds45.py +++ b/pyModeS/decoder/bds/bds45.py @@ -3,10 +3,12 @@ # Meteorological hazard report # ------------------------------------------ -from pyModeS import common +from typing import Optional +from ... import common -def is45(msg): + +def is45(msg: str) -> bool: """Check if a message is likely to be BDS code 4,5. Meteorological hazard report @@ -60,7 +62,7 @@ def is45(msg): return True -def turb45(msg): +def turb45(msg: str) -> Optional[int]: """Turbulence. Args: @@ -78,7 +80,7 @@ def turb45(msg): return turb -def ws45(msg): +def ws45(msg: str) -> Optional[int]: """Wind shear. Args: @@ -96,7 +98,7 @@ def ws45(msg): return ws -def mb45(msg): +def mb45(msg: str) -> Optional[int]: """Microburst. Args: @@ -114,7 +116,7 @@ def mb45(msg): return mb -def ic45(msg): +def ic45(msg: str) -> Optional[int]: """Icing. Args: @@ -132,7 +134,7 @@ def ic45(msg): return ic -def wv45(msg): +def wv45(msg: str) -> Optional[int]: """Wake vortex. Args: @@ -150,7 +152,7 @@ def wv45(msg): return ws -def temp45(msg): +def temp45(msg: str) -> Optional[float]: """Static air temperature. Args: @@ -174,7 +176,7 @@ def temp45(msg): return temp -def p45(msg): +def p45(msg: str) -> Optional[int]: """Average static pressure. Args: @@ -191,7 +193,7 @@ def p45(msg): return p -def rh45(msg): +def rh45(msg: str) -> Optional[int]: """Radio height. Args: diff --git a/pyModeS/decoder/bds/bds50.py b/pyModeS/decoder/bds/bds50.py index 62dd186..b5e10e0 100644 --- a/pyModeS/decoder/bds/bds50.py +++ b/pyModeS/decoder/bds/bds50.py @@ -3,10 +3,12 @@ # Track and turn report # ------------------------------------------ -from pyModeS import common +from typing import Optional +from ... import common -def is50(msg): + +def is50(msg: str) -> bool: """Check if a message is likely to be BDS code 5,0 (Track and turn report) @@ -57,7 +59,7 @@ def is50(msg): return True -def roll50(msg): +def roll50(msg: str) -> Optional[float]: """Roll angle, BDS 5,0 message Args: @@ -82,7 +84,7 @@ def roll50(msg): return round(angle, 1) -def trk50(msg): +def trk50(msg: str) -> Optional[float]: """True track angle, BDS 5,0 message Args: @@ -111,7 +113,7 @@ def trk50(msg): return round(trk, 3) -def gs50(msg): +def gs50(msg: str) -> Optional[float]: """Ground speed, BDS 5,0 message Args: @@ -129,7 +131,7 @@ def gs50(msg): return spd -def rtrk50(msg): +def rtrk50(msg: str) -> Optional[float]: """Track angle rate, BDS 5,0 message Args: @@ -155,7 +157,7 @@ def rtrk50(msg): return round(angle, 3) -def tas50(msg): +def tas50(msg: str) -> Optional[float]: """Aircraft true airspeed, BDS 5,0 message Args: diff --git a/pyModeS/decoder/bds/bds53.py b/pyModeS/decoder/bds/bds53.py index d7c47b9..00847dc 100644 --- a/pyModeS/decoder/bds/bds53.py +++ b/pyModeS/decoder/bds/bds53.py @@ -3,10 +3,12 @@ # Air-referenced state vector # ------------------------------------------ -from pyModeS import common +from typing import Optional +from ... import common -def is53(msg): + +def is53(msg: str) -> bool: """Check if a message is likely to be BDS code 5,3 (Air-referenced state vector) @@ -58,7 +60,7 @@ def is53(msg): return True -def hdg53(msg): +def hdg53(msg: str) -> Optional[float]: """Magnetic heading, BDS 5,3 message Args: @@ -87,7 +89,7 @@ def hdg53(msg): return round(hdg, 3) -def ias53(msg): +def ias53(msg: str) -> Optional[float]: """Indicated airspeed, DBS 5,3 message Args: @@ -105,7 +107,7 @@ def ias53(msg): return ias -def mach53(msg): +def mach53(msg: str) -> Optional[float]: """MACH number, DBS 5,3 message Args: @@ -123,7 +125,7 @@ def mach53(msg): return round(mach, 3) -def tas53(msg): +def tas53(msg: str) -> Optional[float]: """Aircraft true airspeed, BDS 5,3 message Args: @@ -141,7 +143,7 @@ def tas53(msg): return round(tas, 1) -def vr53(msg): +def vr53(msg: str) -> Optional[int]: """Vertical rate Args: diff --git a/pyModeS/decoder/bds/bds60.py b/pyModeS/decoder/bds/bds60.py index 1af3882..98c9a79 100644 --- a/pyModeS/decoder/bds/bds60.py +++ b/pyModeS/decoder/bds/bds60.py @@ -3,11 +3,13 @@ # Heading and speed report # ------------------------------------------ -from pyModeS import common -from pyModeS.extra import aero +from typing import Optional +from ... import common +from ...extra import aero -def is60(msg): + +def is60(msg: str) -> bool: """Check if a message is likely to be BDS code 6,0 Args: @@ -66,7 +68,7 @@ def is60(msg): return True -def hdg60(msg): +def hdg60(msg: str) -> Optional[float]: """Megnetic heading of aircraft Args: @@ -92,10 +94,10 @@ def hdg60(msg): if hdg < 0: hdg = 360 + hdg - return round(hdg, 3) + return hdg -def ias60(msg): +def ias60(msg: str) -> Optional[float]: """Indicated airspeed Args: @@ -113,7 +115,7 @@ def ias60(msg): return ias -def mach60(msg): +def mach60(msg: str) -> Optional[float]: """Aircraft MACH number Args: @@ -128,10 +130,10 @@ def mach60(msg): return None mach = common.bin2int(d[24:34]) * 2.048 / 512.0 - return round(mach, 3) + return mach -def vr60baro(msg): +def vr60baro(msg: str) -> Optional[int]: """Vertical rate from barometric measurement, this value may be very noisy. Args: @@ -157,8 +159,8 @@ def vr60baro(msg): return roc -def vr60ins(msg): - """Vertical rate measured by onboard equipment (IRS, AHRS) +def vr60ins(msg: str) -> Optional[int]: + """Vertical rate measurd by onbard equiments (IRS, AHRS) Args: msg (str): 28 hexdigits string diff --git a/pyModeS/decoder/bds/bds61.py b/pyModeS/decoder/bds/bds61.py index 22e7343..e71e018 100644 --- a/pyModeS/decoder/bds/bds61.py +++ b/pyModeS/decoder/bds/bds61.py @@ -4,7 +4,7 @@ # Aircraft Airborne status # ------------------------------------------ -from pyModeS import common +from ... import common def is_emergency(msg: str) -> bool: @@ -18,7 +18,9 @@ def is_emergency(msg: str) -> bool: :return: if the aircraft has declared an emergency """ if common.typecode(msg) != 28: - raise RuntimeError("%s: Not an airborne status message, expecting TC=28" % msg) + raise RuntimeError( + "%s: Not an airborne status message, expecting TC=28" % msg + ) mb = common.hex2bin(msg)[32:] subtype = common.bin2int(mb[5:8]) @@ -72,7 +74,9 @@ def emergency_squawk(msg: str) -> str: :return: aircraft squawk code """ if common.typecode(msg) != 28: - raise RuntimeError("%s: Not an airborne status message, expecting TC=28" % msg) + raise RuntimeError( + "%s: Not an airborne status message, expecting TC=28" % msg + ) msgbin = common.hex2bin(msg) diff --git a/pyModeS/decoder/bds/bds62.py b/pyModeS/decoder/bds/bds62.py index 2d070d7..4b822de 100644 --- a/pyModeS/decoder/bds/bds62.py +++ b/pyModeS/decoder/bds/bds62.py @@ -5,10 +5,11 @@ # ------------------------------------------ from __future__ import annotations -from pyModeS import common +from ... import common -def selected_altitude(msg): + +def selected_altitude(msg: str) -> tuple[None | float, str]: """Decode selected altitude. Args: @@ -31,18 +32,20 @@ def selected_altitude(msg): if subtype == 0: raise RuntimeError( - "%s: ADS-B version 1 target state and status message does not contain selected altitude, use target altitude instead" - % msg + "%s: ADS-B version 1 target state and status message does not" + " contain selected altitude, use target altitude instead" % msg ) alt = common.bin2int(mb[9:20]) - alt = None if alt == 0 else (alt - 1) * 32 + if alt == 0: + return None, "N/A" + alt = (alt - 1) * 32 alt_source = "MCP/FCU" if int(mb[8]) == 0 else "FMS" return alt, alt_source -def target_altitude(msg): +def target_altitude(msg: str) -> tuple[None | int, str, str]: """Decode target altitude. Args: @@ -51,7 +54,8 @@ def target_altitude(msg): Returns: int: Target altitude (ft) string: Source ('MCP/FCU', 'Holding mode' or 'FMS/RNAV') - string: Altitude reference, either pressure altitude or barometric corrected altitude ('FL' or 'MSL') + string: Altitude reference, either pressure altitude or barometric + corrected altitude ('FL' or 'MSL') """ @@ -66,13 +70,13 @@ def target_altitude(msg): if subtype == 1: raise RuntimeError( - "%s: ADS-B version 2 target state and status message does not contain target altitude, use selected altitude instead" - % msg + "%s: ADS-B version 2 target state and status message does not" + " contain target altitude, use selected altitude instead" % msg ) alt_avail = common.bin2int(mb[7:9]) if alt_avail == 0: - return None + return None, "N/A", "" elif alt_avail == 1: alt_source = "MCP/FCU" elif alt_avail == 2: @@ -87,7 +91,7 @@ def target_altitude(msg): return alt, alt_source, alt_ref -def vertical_mode(msg): +def vertical_mode(msg: str) -> None | int: """Decode vertical mode. Value Meaning @@ -115,8 +119,8 @@ def vertical_mode(msg): if subtype == 1: raise RuntimeError( - "%s: ADS-B version 2 target state and status message does not contain vertical mode, use vnav mode instead" - % msg + "%s: ADS-B version 2 target state and status message does not" + " contain vertical mode, use vnav mode instead" % msg ) vertical_mode = common.bin2int(mb[13:15]) @@ -126,7 +130,7 @@ def vertical_mode(msg): return vertical_mode -def horizontal_mode(msg): +def horizontal_mode(msg: str) -> None | int: """Decode horizontal mode. Value Meaning @@ -154,8 +158,8 @@ def horizontal_mode(msg): if subtype == 1: raise RuntimeError( - "%s: ADS-B version 2 target state and status message does not contain horizontal mode, use lnav mode instead" - % msg + "%s: ADS-B version 2 target state and status message does not " + "contain horizontal mode, use lnav mode instead" % msg ) horizontal_mode = common.bin2int(mb[25:27]) @@ -165,7 +169,7 @@ def horizontal_mode(msg): return horizontal_mode -def selected_heading(msg): +def selected_heading(msg: str) -> None | float: """Decode selected heading. Args: @@ -187,12 +191,12 @@ def selected_heading(msg): if subtype == 0: raise RuntimeError( - "%s: ADS-B version 1 target state and status message does not contain selected heading, use target angle instead" - % msg + "%s: ADS-B version 1 target state and status message does not " + "contain selected heading, use target angle instead" % msg ) if int(mb[29]) == 0: - hdg = None + return None else: hdg_sign = int(mb[30]) hdg = (hdg_sign + 1) * common.bin2int(mb[31:39]) * (180 / 256) @@ -201,7 +205,7 @@ def selected_heading(msg): return hdg -def target_angle(msg): +def target_angle(msg: str) -> tuple[None | int, str, str]: """Decode target heading/track angle. Args: @@ -225,13 +229,13 @@ def target_angle(msg): if subtype == 1: raise RuntimeError( - "%s: ADS-B version 2 target state and status message does not contain target angle, use selected heading instead" - % msg + "%s: ADS-B version 2 target state and status message does not " + "contain target angle, use selected heading instead" % msg ) angle_avail = common.bin2int(mb[25:27]) if angle_avail == 0: - angle = None + return None, "", "N/A" else: angle = common.bin2int(mb[27:36]) @@ -247,7 +251,7 @@ def target_angle(msg): return angle, angle_type, angle_source -def baro_pressure_setting(msg): +def baro_pressure_setting(msg: str) -> None | float: """Decode barometric pressure setting. Args: @@ -269,14 +273,15 @@ def baro_pressure_setting(msg): if subtype == 0: raise RuntimeError( - "%s: ADS-B version 1 target state and status message does not contain barometric pressure setting" - % msg + "%s: ADS-B version 1 target state and status message does not " + "contain barometric pressure setting" % msg ) baro = common.bin2int(mb[20:29]) - baro = None if baro == 0 else round(800 + (baro - 1) * 0.8, 1) + if baro == 0: + return None - return baro + return 800 + (baro - 1) * 0.8 def autopilot(msg) -> None | bool: @@ -301,8 +306,8 @@ def autopilot(msg) -> None | bool: if subtype == 0: raise RuntimeError( - "%s: ADS-B version 1 target state and status message does not contain autopilot engagement" - % msg + "%s: ADS-B version 1 target state and status message does not " + "contain autopilot engagement" % msg ) if int(mb[46]) == 0: @@ -335,8 +340,8 @@ def vnav_mode(msg) -> None | bool: if subtype == 0: raise RuntimeError( - "%s: ADS-B version 1 target state and status message does not contain vnav mode, use vertical mode instead" - % msg + "%s: ADS-B version 1 target state and status message does not " + "contain vnav mode, use vertical mode instead" % msg ) if int(mb[46]) == 0: @@ -369,8 +374,8 @@ def altitude_hold_mode(msg) -> None | bool: if subtype == 0: raise RuntimeError( - "%s: ADS-B version 1 target state and status message does not contain altitude hold mode" - % msg + "%s: ADS-B version 1 target state and status message does not " + "contain altitude hold mode" % msg ) if int(mb[46]) == 0: @@ -403,8 +408,8 @@ def approach_mode(msg) -> None | bool: if subtype == 0: raise RuntimeError( - "%s: ADS-B version 1 target state and status message does not contain approach mode" - % msg + "%s: ADS-B version 1 target state and status message does not " + "contain approach mode" % msg ) if int(mb[46]) == 0: @@ -437,8 +442,8 @@ def lnav_mode(msg) -> None | bool: if subtype == 0: raise RuntimeError( - "%s: ADS-B version 1 target state and status message does not contain lnav mode, use horizontal mode instead" - % msg + "%s: ADS-B version 1 target state and status message does not " + "contain lnav mode, use horizontal mode instead" % msg ) if int(mb[46]) == 0: @@ -499,8 +504,8 @@ def tcas_ra(msg) -> bool: if subtype == 1: raise RuntimeError( - "%s: ADS-B version 2 target state and status message does not contain TCAS/ACAS RA" - % msg + "%s: ADS-B version 2 target state and status message does not " + "contain TCAS/ACAS RA" % msg ) tcas_ra = True if int(mb[52]) == 1 else False @@ -541,8 +546,8 @@ def emergency_status(msg) -> int: if subtype == 1: raise RuntimeError( - "%s: ADS-B version 2 target state and status message does not contain emergency status" - % msg + "%s: ADS-B version 2 target state and status message does not " + "contain emergency status" % msg ) return common.bin2int(mb[53:56]) diff --git a/pyModeS/decoder/commb.py b/pyModeS/decoder/commb.py index d49d293..3a214c6 100644 --- a/pyModeS/decoder/commb.py +++ b/pyModeS/decoder/commb.py @@ -23,18 +23,66 @@ MRAR and MHR """ # ELS - elementary surveillance -from pyModeS.decoder.bds.bds10 import * -from pyModeS.decoder.bds.bds17 import * -from pyModeS.decoder.bds.bds20 import * -from pyModeS.decoder.bds.bds30 import * +from .bds.bds10 import is10, ovc10 +from .bds.bds17 import is17, cap17 +from .bds.bds20 import is20, cs20 +from .bds.bds30 import is30 # ELS - enhanced surveillance -from pyModeS.decoder.bds.bds40 import * -from pyModeS.decoder.bds.bds50 import * -from pyModeS.decoder.bds.bds60 import * +from .bds.bds40 import ( + is40, + selalt40fms, + selalt40mcp, + p40baro, + alt40fms, + alt40mcp, +) +from .bds.bds50 import is50, roll50, trk50, gs50, rtrk50, tas50 +from .bds.bds60 import is60, hdg60, ias60, mach60, vr60baro, vr60ins # MRAR and MHR -from pyModeS.decoder.bds.bds44 import * -from pyModeS.decoder.bds.bds45 import * +from .bds.bds44 import is44, wind44, temp44, p44, hum44, turb44 +from .bds.bds45 import is45, turb45, ws45, mb45, ic45, wv45, temp45, p45, rh45 -from pyModeS.py_common import fs, dr, um +__all__ = [ + "is10", + "ovc10", + "is17", + "cap17", + "is20", + "cs20", + "is30", + "is40", + "selalt40fms", + "selalt40mcp", + "p40baro", + "alt40fms", + "alt40mcp", + "is50", + "roll50", + "trk50", + "gs50", + "rtrk50", + "tas50", + "is60", + "hdg60", + "ias60", + "mach60", + "vr60baro", + "vr60ins", + "is44", + "wind44", + "temp44", + "p44", + "hum44", + "turb44", + "is45", + "turb45", + "ws45", + "mb45", + "ic45", + "wv45", + "temp45", + "p45", + "rh45", +] diff --git a/pyModeS/decoder/ehs.py b/pyModeS/decoder/ehs.py index 8533f7f..849bef5 100644 --- a/pyModeS/decoder/ehs.py +++ b/pyModeS/decoder/ehs.py @@ -11,20 +11,51 @@ The EHS wrapper imports all functions from the following modules: import warnings -from pyModeS.decoder.bds.bds40 import * -from pyModeS.decoder.bds.bds50 import * -from pyModeS.decoder.bds.bds60 import * -from pyModeS.decoder.bds import infer +from .bds.bds40 import ( + is40, + selalt40fms, + selalt40mcp, + p40baro, + alt40fms, + alt40mcp, +) +from .bds.bds50 import is50, roll50, trk50, gs50, rtrk50, tas50 +from .bds.bds60 import is60, hdg60, ias60, mach60, vr60baro, vr60ins +from .bds import infer + +__all__ = [ + "is40", + "selalt40fms", + "selalt40mcp", + "p40baro", + "alt40fms", + "alt40mcp", + "is50", + "roll50", + "trk50", + "gs50", + "rtrk50", + "tas50", + "is60", + "hdg60", + "ias60", + "mach60", + "vr60baro", + "vr60ins", + "infer", +] warnings.simplefilter("once", DeprecationWarning) warnings.warn( - "pms.ehs module is deprecated. Please use pms.commb instead.", DeprecationWarning + "pms.ehs module is deprecated. Please use pms.commb instead.", + DeprecationWarning, ) def BDS(msg): warnings.warn( - "pms.ehs.BDS() is deprecated, use pms.bds.infer() instead.", DeprecationWarning + "pms.ehs.BDS() is deprecated, use pms.bds.infer() instead.", + DeprecationWarning, ) return infer(msg) diff --git a/pyModeS/decoder/els.py b/pyModeS/decoder/els.py index 9c26b93..be25c1b 100644 --- a/pyModeS/decoder/els.py +++ b/pyModeS/decoder/els.py @@ -10,14 +10,26 @@ The ELS wrapper imports all functions from the following modules: """ -from pyModeS.decoder.bds.bds10 import * -from pyModeS.decoder.bds.bds17 import * -from pyModeS.decoder.bds.bds20 import * -from pyModeS.decoder.bds.bds30 import * - import warnings +from .bds.bds10 import is10, ovc10 +from .bds.bds17 import cap17, is17 +from .bds.bds20 import cs20, is20 +from .bds.bds30 import is30 + warnings.simplefilter("once", DeprecationWarning) warnings.warn( - "pms.els module is deprecated. Please use pms.commb instead.", DeprecationWarning + "pms.els module is deprecated. Please use pms.commb instead.", + DeprecationWarning, ) + + +__all__ = [ + "is10", + "ovc10", + "is17", + "cap17", + "is20", + "cs20", + "is30", +] diff --git a/pyModeS/decoder/surv.py b/pyModeS/decoder/surv.py index 4e54614..d1dbf3a 100644 --- a/pyModeS/decoder/surv.py +++ b/pyModeS/decoder/surv.py @@ -2,14 +2,19 @@ Decode short roll call surveillance replies, with downlink format 4 or 5 """ -from pyModeS import common -from pyModeS.py_common import fs, dr, um +from __future__ import annotations +from typing import Callable, TypeVar +from .. import common -def _checkdf(func): +T = TypeVar("T") +F = Callable[[str], T] + + +def _checkdf(func: F[T]) -> F[T]: """Ensure downlink format is 4 or 5.""" - def wrapper(msg): + def wrapper(msg: str) -> T: df = common.df(msg) if df not in [4, 5]: raise RuntimeError( @@ -21,7 +26,92 @@ def _checkdf(func): @_checkdf -def altitude(msg): +def fs(msg: str) -> tuple[int, str]: + """Decode flight status. + + Args: + msg (str): 14 hexdigits string + Returns: + int, str: flight status, description + + """ + msgbin = common.hex2bin(msg) + fs = common.bin2int(msgbin[5:8]) + text = "" + + if fs == 0: + text = "no alert, no SPI, aircraft is airborne" + elif fs == 1: + text = "no alert, no SPI, aircraft is on-ground" + elif fs == 2: + text = "alert, no SPI, aircraft is airborne" + elif fs == 3: + text = "alert, no SPI, aircraft is on-ground" + elif fs == 4: + text = "alert, SPI, aircraft is airborne or on-ground" + elif fs == 5: + text = "no alert, SPI, aircraft is airborne or on-ground" + + return fs, text + + +@_checkdf +def dr(msg: str) -> tuple[int, str]: + """Decode downlink request. + + Args: + msg (str): 14 hexdigits string + Returns: + int, str: downlink request, description + + """ + msgbin = common.hex2bin(msg) + dr = common.bin2int(msgbin[8:13]) + + text = "" + + if dr == 0: + text = "no downlink request" + elif dr == 1: + text = "request to send Comm-B message" + elif dr == 4: + text = "Comm-B broadcast 1 available" + elif dr == 5: + text = "Comm-B broadcast 2 available" + elif dr >= 16: + text = "ELM downlink segments available: {}".format(dr - 15) + + return dr, text + + +@_checkdf +def um(msg: str) -> tuple[int, int, None | str]: + """Decode utility message. + + Utility message contains interrogator identifier and reservation type. + + Args: + msg (str): 14 hexdigits string + Returns: + int, str: interrogator identifier code that triggered the reply, and + reservation type made by the interrogator + """ + msgbin = common.hex2bin(msg) + iis = common.bin2int(msgbin[13:17]) + ids = common.bin2int(msgbin[17:19]) + if ids == 0: + ids_text = None + if ids == 1: + ids_text = "Comm-B interrogator identifier code" + if ids == 2: + ids_text = "Comm-C interrogator identifier code" + if ids == 3: + ids_text = "Comm-D interrogator identifier code" + return iis, ids, ids_text + + +@_checkdf +def altitude(msg: str) -> None | int: """Decode altitude. Args: @@ -35,7 +125,7 @@ def altitude(msg): @_checkdf -def identity(msg): +def identity(msg: str) -> str: """Decode squawk code. Args: diff --git a/pyModeS/decoder/uncertainty.py b/pyModeS/decoder/uncertainty.py index 2a923ab..122603a 100644 --- a/pyModeS/decoder/uncertainty.py +++ b/pyModeS/decoder/uncertainty.py @@ -1,8 +1,11 @@ """Uncertainty parameters. -See source code at: https://github.com/junzis/pyModeS/blob/master/pyModeS/decoder/uncertainty.py """ +from __future__ import annotations + +from typing import TypedDict + NA = None TC_NUCp_lookup = { @@ -26,7 +29,7 @@ TC_NUCp_lookup = { 22: 0, } -TC_NICv1_lookup = { +TC_NICv1_lookup: dict[int, int | dict[int, int]] = { 5: 11, 6: 10, 7: 9, @@ -46,7 +49,7 @@ TC_NICv1_lookup = { 22: 0, } -TC_NICv2_lookup = { +TC_NICv2_lookup: dict[int, int | dict[int, int]] = { 5: 11, 6: 10, 7: {2: 9, 0: 8}, @@ -67,20 +70,32 @@ TC_NICv2_lookup = { } -NUCp = { - 9: {"HPL": 7.5, "RCu": 3}, - 8: {"HPL": 25, "RCu": 10}, - 7: {"HPL": 185, "RCu": 93}, - 6: {"HPL": 370, "RCu": 185}, - 5: {"HPL": 926, "RCu": 463}, - 4: {"HPL": 1852, "RCu": 926}, - 3: {"HPL": 3704, "RCu": 1852}, - 2: {"HPL": 18520, "RCu": 9260}, - 1: {"HPL": 37040, "RCu": 18520}, - 0: {"HPL": NA, "RCu": NA}, +class NUCpEntry(TypedDict): + HPL: None | float + RCu: None | int + RCv: None | int + + +NUCp: dict[int, NUCpEntry] = { + 9: {"HPL": 7.5, "RCu": 3, "RCv": 4}, + 8: {"HPL": 25, "RCu": 10, "RCv": 15}, + 7: {"HPL": 185, "RCu": 93, "RCv": NA}, + 6: {"HPL": 370, "RCu": 185, "RCv": NA}, + 5: {"HPL": 926, "RCu": 463, "RCv": NA}, + 4: {"HPL": 1852, "RCu": 926, "RCv": NA}, + 3: {"HPL": 3704, "RCu": 1852, "RCv": NA}, + 2: {"HPL": 18520, "RCu": 9260, "RCv": NA}, + 1: {"HPL": 37040, "RCu": 18520, "RCv": NA}, + 0: {"HPL": NA, "RCu": NA, "RCv": NA}, } -NUCv = { + +class NUCvEntry(TypedDict): + HVE: None | float + VVE: None | float + + +NUCv: dict[int, NUCvEntry] = { 0: {"HVE": NA, "VVE": NA}, 1: {"HVE": 10, "VVE": 15.2}, 2: {"HVE": 3, "VVE": 4.5}, @@ -88,7 +103,13 @@ NUCv = { 4: {"HVE": 0.3, "VVE": 0.46}, } -NACp = { + +class NACpEntry(TypedDict): + EPU: None | int + VEPU: None | int + + +NACp: dict[int, NACpEntry] = { 11: {"EPU": 3, "VEPU": 4}, 10: {"EPU": 10, "VEPU": 15}, 9: {"EPU": 30, "VEPU": 45}, @@ -103,7 +124,13 @@ NACp = { 0: {"EPU": NA, "VEPU": NA}, } -NACv = { + +class NACvEntry(TypedDict): + HFOMr: None | float + VFOMr: None | float + + +NACv: dict[int, NACvEntry] = { 0: {"HFOMr": NA, "VFOMr": NA}, 1: {"HFOMr": 10, "VFOMr": 15.2}, 2: {"HFOMr": 3, "VFOMr": 4.5}, @@ -111,7 +138,13 @@ NACv = { 4: {"HFOMr": 0.3, "VFOMr": 0.46}, } -SIL = { + +class SILEntry(TypedDict): + PE_RCu: None | float + PE_VPL: None | float + + +SIL: dict[int, SILEntry] = { 3: {"PE_RCu": 1e-7, "PE_VPL": 2e-7}, 2: {"PE_RCu": 1e-5, "PE_VPL": 1e-5}, 1: {"PE_RCu": 1e-3, "PE_VPL": 1e-3}, @@ -119,7 +152,12 @@ SIL = { } -NICv1 = { +class NICv1Entry(TypedDict): + Rc: None | float + VPL: None | float + + +NICv1: dict[int, dict[int, NICv1Entry]] = { # NIC is used as the index at second Level 11: {0: {"Rc": 7.5, "VPL": 11}}, 10: {0: {"Rc": 25, "VPL": 37.5}}, @@ -135,7 +173,12 @@ NICv1 = { 0: {0: {"Rc": NA, "VPL": NA}}, } -NICv2 = { + +class NICv2Entry(TypedDict): + Rc: None | float + + +NICv2: dict[int, dict[int, NICv2Entry]] = { # Decimal value of [NICa NICb/NICc] is used as the index at second Level 11: {0: {"Rc": 7.5}}, 10: {0: {"Rc": 25}}, diff --git a/pyModeS/decoder/uplink.py b/pyModeS/decoder/uplink.py index 5afe2d1..fa7bf9d 100644 --- a/pyModeS/decoder/uplink.py +++ b/pyModeS/decoder/uplink.py @@ -1,9 +1,10 @@ -from pyModeS import common +from typing import Optional +from .. import common from textwrap import wrap -def uplink_icao(msg): - """Calculate the ICAO address from a Mode-S interrogation (uplink message)""" +def uplink_icao(msg: str) -> str: + "Calculate the ICAO address from a Mode-S interrogation (uplink message)" p_gen = 0xFFFA0480 << ((len(msg) - 14) * 4) data = int(msg[:-6], 16) PA = int(msg[-6:], 16) @@ -20,22 +21,22 @@ def uplink_icao(msg): return "%06X" % (ad >> 2) -def uf(msg): +def uf(msg: str) -> int: """Decode Uplink Format value, bits 1 to 5.""" ufbin = common.hex2bin(msg[:2]) return min(common.bin2int(ufbin[0:5]), 24) -def bds(msg): - """Decode requested BDS register from selective (Roll Call) interrogation.""" +def bds(msg: str) -> Optional[str]: + "Decode requested BDS register from selective (Roll Call) interrogation." UF = uf(msg) msgbin = common.hex2bin(msg) msgbin_split = wrap(msgbin, 8) mbytes = list(map(common.bin2int, msgbin_split)) - if uf(msg) in {4, 5, 20, 21}: - - di = mbytes[1] & 0x7 # DI - Designator Identification + if UF in {4, 5, 20, 21}: + + di = mbytes[1] & 0x7 # DI - Designator Identification RR = mbytes[1] >> 3 & 0x1F if RR > 15: BDS1 = RR - 16 @@ -46,7 +47,9 @@ def bds(msg): RRS = ((mbytes[2] & 0x1) << 3) | ((mbytes[3] & 0xE0) >> 5) BDS2 = RRS else: - BDS2 = 0 # for other values of DI, the BDS2 is assumed 0 (as per ICAO Annex 10 Vol IV) + # for other values of DI, the BDS2 is assumed 0 + # (as per ICAO Annex 10 Vol IV) + BDS2 = 0 return str(format(BDS1,"X")) + str(format(BDS2,"X")) else: @@ -55,7 +58,7 @@ def bds(msg): return None -def pr(msg): +def pr(msg: str) -> Optional[int]: """Decode PR (probability of reply) field from All Call interrogation. Interpretation: 0 signifies reply with probability of 1 @@ -80,7 +83,7 @@ def pr(msg): return None -def ic(msg): +def ic(msg: str) -> Optional[str]: """Decode IC (interrogator code) from a ground-based interrogation.""" UF = uf(msg) @@ -88,8 +91,7 @@ def ic(msg): msgbin_split = wrap(msgbin, 8) mbytes = list(map(common.bin2int, msgbin_split)) IC = None - BDS2 = "" - if uf(msg) == 11: + if UF == 11: codeLabel = mbytes[1] & 0x7 icField = (mbytes[1] >> 3) & 0xF @@ -104,11 +106,11 @@ def ic(msg): } IC = ic_switcher.get(codeLabel, "") - if uf(msg) in {4, 5, 20, 21}: + if UF in {4, 5, 20, 21}: di = mbytes[1] & 0x7 RR = mbytes[1] >> 3 & 0x1F if RR > 15: - BDS1 = RR - 16 + BDS1 = RR - 16 # noqa: F841 if di == 0 or di == 1 or di == 7: # II II = (mbytes[2] >> 4) & 0xF @@ -148,7 +150,6 @@ def uplink_fields(msg): msgbin_split = wrap(msgbin, 8) mbytes = list(map(common.bin2int, msgbin_split)) PR = "" - LOS = "" IC = "" lockout = False di = "" @@ -156,13 +157,11 @@ def uplink_fields(msg): RRS = "" BDS = "" if uf(msg) == 11: - - # Probability of Reply decoding PR = ((mbytes[0] & 0x7) << 1) | ((mbytes[1] & 0x80) >> 7) - + # Get cl and ic bit fields from the data # Decode the SI or II interrogator code codeLabel = mbytes[1] & 0x7 @@ -179,7 +178,8 @@ def uplink_fields(msg): IC = ic_switcher.get(codeLabel, "") if uf(msg) in {4, 5, 20, 21}: - # Decode the DI and get the lockout information conveniently (LSS or LOS) + # Decode the DI and get the lockout information conveniently + # (LSS or LOS) # DI - Designator Identification di = mbytes[1] & 0x7 diff --git a/tests/sample_run_adsb.py b/tests/sample_run_adsb.py index f0d5134..d561a35 100644 --- a/tests/sample_run_adsb.py +++ b/tests/sample_run_adsb.py @@ -1,11 +1,7 @@ -import sys -import time import csv +import time -if len(sys.argv) > 1 and sys.argv[1] == "cython": - from pyModeS.c_decoder import adsb -else: - from pyModeS.decoder import adsb +from pyModeS.decoder import adsb print("===== Decode ADS-B sample data=====") diff --git a/tests/test_adsb.py b/tests/test_adsb.py index a735023..442b755 100644 --- a/tests/test_adsb.py +++ b/tests/test_adsb.py @@ -43,14 +43,20 @@ def test_adsb_position_with_ref(): def test_adsb_airborne_position_with_ref(): - pos = adsb.airborne_position_with_ref("8D40058B58C901375147EFD09357", 49.0, 6.0) + pos = adsb.airborne_position_with_ref( + "8D40058B58C901375147EFD09357", 49.0, 6.0 + ) assert pos == (49.82410, 6.06785) - pos = adsb.airborne_position_with_ref("8D40058B58C904A87F402D3B8C59", 49.0, 6.0) + pos = adsb.airborne_position_with_ref( + "8D40058B58C904A87F402D3B8C59", 49.0, 6.0 + ) assert pos == (49.81755, 6.08442) def test_adsb_surface_position_with_ref(): - pos = adsb.surface_position_with_ref("8FC8200A3AB8F5F893096B000000", -43.5, 172.5) + pos = adsb.surface_position_with_ref( + "8FC8200A3AB8F5F893096B000000", -43.5, 172.5 + ) assert pos == (-43.48564, 172.53942) @@ -91,13 +97,12 @@ def test_adsb_target_state_status(): assert sel_alt == (16992, "MCP/FCU") assert adsb.baro_pressure_setting("8DA05629EA21485CBF3F8CADAEEB") == 1012.8 assert adsb.selected_heading("8DA05629EA21485CBF3F8CADAEEB") == 66.8 - assert adsb.autopilot("8DA05629EA21485CBF3F8CADAEEB") == True - assert adsb.vnav_mode("8DA05629EA21485CBF3F8CADAEEB") == True - assert adsb.altitude_hold_mode("8DA05629EA21485CBF3F8CADAEEB") == False - assert adsb.approach_mode("8DA05629EA21485CBF3F8CADAEEB") == False - assert adsb.tcas_operational("8DA05629EA21485CBF3F8CADAEEB") == True - assert adsb.lnav_mode("8DA05629EA21485CBF3F8CADAEEB") == True - + assert adsb.autopilot("8DA05629EA21485CBF3F8CADAEEB") is True + assert adsb.vnav_mode("8DA05629EA21485CBF3F8CADAEEB") is True + assert adsb.altitude_hold_mode("8DA05629EA21485CBF3F8CADAEEB") is False + assert adsb.approach_mode("8DA05629EA21485CBF3F8CADAEEB") is False + assert adsb.tcas_operational("8DA05629EA21485CBF3F8CADAEEB") is True + assert adsb.lnav_mode("8DA05629EA21485CBF3F8CADAEEB") is True # def test_nic(): diff --git a/tests/test_commb.py b/tests/test_commb.py index 45cc1ec..d9936fe 100644 --- a/tests/test_commb.py +++ b/tests/test_commb.py @@ -1,4 +1,5 @@ from pyModeS import bds, commb +import pytest # from pyModeS import ehs, els # deprecated @@ -23,7 +24,7 @@ def test_bds40_functions(): def test_bds50_functions(): assert bds.bds50.roll50("A000139381951536E024D4CCF6B5") == 2.1 - assert bds.bds50.roll50("A0001691FFD263377FFCE02B2BF9") == -0.4 # signed value + assert bds.bds50.roll50("A0001691FFD263377FFCE02B2BF9") == -0.4 assert bds.bds50.trk50("A000139381951536E024D4CCF6B5") == 114.258 assert bds.bds50.gs50("A000139381951536E024D4CCF6B5") == 438 assert bds.bds50.rtrk50("A000139381951536E024D4CCF6B5") == 0.125 @@ -38,14 +39,16 @@ def test_bds50_functions(): def test_bds60_functions(): - assert bds.bds60.hdg60("A00004128F39F91A7E27C46ADC21") == 42.715 - assert bds.bds60.ias60("A00004128F39F91A7E27C46ADC21") == 252 - assert bds.bds60.mach60("A00004128F39F91A7E27C46ADC21") == 0.42 - assert bds.bds60.vr60baro("A00004128F39F91A7E27C46ADC21") == -1920 - assert bds.bds60.vr60ins("A00004128F39F91A7E27C46ADC21") == -1920 - - assert commb.hdg60("A00004128F39F91A7E27C46ADC21") == 42.715 - assert commb.ias60("A00004128F39F91A7E27C46ADC21") == 252 - assert commb.mach60("A00004128F39F91A7E27C46ADC21") == 0.42 - assert commb.vr60baro("A00004128F39F91A7E27C46ADC21") == -1920 - assert commb.vr60ins("A00004128F39F91A7E27C46ADC21") == -1920 + msg = "A00004128F39F91A7E27C46ADC21" + + assert bds.bds60.hdg60(msg) == pytest.approx(42.71484) + assert bds.bds60.ias60(msg) == 252 + assert bds.bds60.mach60(msg) == 0.42 + assert bds.bds60.vr60baro(msg) == -1920 + assert bds.bds60.vr60ins(msg) == -1920 + + assert commb.hdg60(msg) == pytest.approx(42.71484) + assert commb.ias60(msg) == 252 + assert commb.mach60(msg) == 0.42 + assert commb.vr60baro(msg) == -1920 + assert commb.vr60ins(msg) == -1920