with mypy

This commit is contained in:
Xavier Olive 2022-04-18 18:32:33 +02:00
parent 65ce1a62c4
commit 6e33ac0288
9 changed files with 177 additions and 64 deletions

View File

@ -30,9 +30,9 @@ jobs:
pip install -U pytest codecov pytest-cov pip install -U pytest codecov pytest-cov
pip install . pip install .
# - name: Type checking - name: Type checking
# run: | run: |
# mypy pyModeS tests mypy pyModeS
- name: Run tests (without Cython) - name: Run tests (without Cython)
run: | run: |

View File

@ -5,8 +5,8 @@ try:
from . import c_common as common from . import c_common as common
from .c_common import * from .c_common import *
except: except:
from . import py_common as common from . import py_common as common # type: ignore
from .py_common import * from .py_common import * # type: ignore
from .decoder import tell from .decoder import tell
from .decoder import adsb from .decoder import adsb

29
pyModeS/c_common.pyi Normal file
View File

@ -0,0 +1,29 @@
def hex2bin(hexstr: str) -> str: ...
def bin2int(binstr: str) -> int: ...
def hex2int(hexstr: str) -> int: ...
def bin2hex(binstr: str) -> str: ...
def df(msg: str) -> int: ...
def crc(msg: str, encode: bool = False) -> int: ...
def floor(x: float) -> float: ...
def icao(msg: str) -> str: ...
def is_icao_assigned(icao: str) -> bool: ...
def typecode(msg: str) -> int: ...
def cprNL(lat: float) -> int: ...
def idcode(msg: str) -> str: ...
def squawk(binstr: str) -> str: ...
def altcode(msg: str) -> int: ...
def altitude(binstr: str) -> int: ...
def data(msg: str) -> str: ...
def allzeros(msg: str) -> bool: ...

View File

@ -71,12 +71,12 @@ def tell(msg: str) -> None:
_print("CPR Longitude", cprlon) _print("CPR Longitude", cprlon)
_print("Altitude", alt, "feet") _print("Altitude", alt, "feet")
if tc == 29: # target state and status if tc == 29: # target state and status
_print("Type", "Target State and Status") _print("Type", "Target State and Status")
subtype = common.bin2int((common.hex2bin(msg)[32:])[5:7]) subtype = common.bin2int((common.hex2bin(msg)[32:])[5:7])
_print("Subtype", subtype) _print("Subtype", subtype)
tcas_operational = adsb.tcas_operational(msg) tcas_operational = adsb.tcas_operational(msg)
types = {0: "Not Engaged", 1: "Engaged"} types_29 = {0: "Not Engaged", 1: "Engaged"}
tcas_operational_types = {0: "Not Operational", 1: "Operational"} tcas_operational_types = {0: "Not Operational", 1: "Operational"}
if subtype == 0: if subtype == 0:
emergency_types = { emergency_types = {
@ -87,11 +87,11 @@ def tell(msg: str) -> None:
4: "No communications", 4: "No communications",
5: "Unlawful interference", 5: "Unlawful interference",
6: "Downed aircraft", 6: "Downed aircraft",
7: "Reserved" 7: "Reserved",
} }
vertical_horizontal_types = { vertical_horizontal_types = {
1: "Acquiring mode", 1: "Acquiring mode",
2: "Capturing/Maintaining mode" 2: "Capturing/Maintaining mode",
} }
tcas_ra_types = {0: "Not active", 1: "Active"} tcas_ra_types = {0: "Not active", 1: "Active"}
alt, alt_source, alt_ref = adsb.target_altitude(msg) alt, alt_source, alt_ref = adsb.target_altitude(msg)
@ -108,7 +108,12 @@ def tell(msg: str) -> None:
_print("Angle Source", angle_source) _print("Angle Source", angle_source)
_print("Vertical mode", vertical_horizontal_types[vertical_mode]) _print("Vertical mode", vertical_horizontal_types[vertical_mode])
_print("Horizontal mode", vertical_horizontal_types[horizontal_mode]) _print("Horizontal mode", vertical_horizontal_types[horizontal_mode])
_print("TCAS/ACAS", tcas_operational_types[tcas_operational]) _print(
"TCAS/ACAS",
tcas_operational_types[tcas_operational]
if tcas_operational
else None,
)
_print("TCAS/ACAS RA", tcas_ra_types[tcas_ra]) _print("TCAS/ACAS RA", tcas_ra_types[tcas_ra])
_print("Emergency status", emergency_types[emergency_status]) _print("Emergency status", emergency_types[emergency_status])
else: else:
@ -124,14 +129,20 @@ def tell(msg: str) -> None:
_print("Altitude source", alt_source) _print("Altitude source", alt_source)
_print("Barometric pressure setting", baro, "millibars") _print("Barometric pressure setting", baro, "millibars")
_print("Selected Heading", hdg, "°") _print("Selected Heading", hdg, "°")
if not(common.bin2int((common.hex2bin(msg)[32:])[46]) == 0): if not (common.bin2int((common.hex2bin(msg)[32:])[46]) == 0):
_print("Autopilot", types[autopilot]) _print("Autopilot", types_29[autopilot] if autopilot else None)
_print("VNAV mode", types[vnav]) _print("VNAV mode", types_29[vnav] if vnav else None)
_print("Altitude hold mode", types[alt_hold]) _print(
_print("Approach mode", types[app]) "Altitude hold mode", types_29[alt_hold] if alt_hold else None
_print("TCAS/ACAS", tcas_operational_types[tcas_operational]) )
_print("LNAV mode", types[lnav]) _print("Approach mode", types_29[app] if app else None)
_print(
"TCAS/ACAS",
tcas_operational_types[tcas_operational]
if tcas_operational
else None,
)
_print("LNAV mode", types_29[lnav] if lnav else None)
if df == 20: if df == 20:
_print("Protocol", "Mode-S Comm-B altitude reply") _print("Protocol", "Mode-S Comm-B altitude reply")

View File

@ -1,5 +1,3 @@
# noqa
"""ADS-B module. """ADS-B module.
The ADS-B module also imports functions from the following modules: The ADS-B module also imports functions from the following modules:

View File

@ -4,8 +4,10 @@
# Target State and Status # Target State and Status
# ------------------------------------------ # ------------------------------------------
from __future__ import annotations
from pyModeS import common from pyModeS import common
def selected_altitude(msg): def selected_altitude(msg):
"""Decode selected altitude. """Decode selected altitude.
@ -19,14 +21,19 @@ def selected_altitude(msg):
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 0: if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain selected altitude, use target altitude instead" % msg) raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain selected altitude, use target altitude instead"
% msg
)
alt = common.bin2int(mb[9:20]) alt = common.bin2int(mb[9:20])
alt = None if alt == 0 else (alt - 1) * 32 alt = None if alt == 0 else (alt - 1) * 32
@ -35,7 +42,6 @@ def selected_altitude(msg):
return alt, alt_source return alt, alt_source
def target_altitude(msg): def target_altitude(msg):
"""Decode target altitude. """Decode target altitude.
@ -50,14 +56,19 @@ def target_altitude(msg):
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 1: if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain target altitude, use selected altitude instead" % msg) raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain target altitude, use selected altitude instead"
% msg
)
alt_avail = common.bin2int(mb[7:9]) alt_avail = common.bin2int(mb[7:9])
if alt_avail == 0: if alt_avail == 0:
@ -94,14 +105,19 @@ def vertical_mode(msg):
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 1: if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain vertical mode, use vnav mode instead" % msg) raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain vertical mode, use vnav mode instead"
% msg
)
vertical_mode = common.bin2int(mb[13:15]) vertical_mode = common.bin2int(mb[13:15])
if vertical_mode == 0: if vertical_mode == 0:
@ -128,14 +144,19 @@ def horizontal_mode(msg):
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 1: if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain horizontal mode, use lnav mode instead" % msg) raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain horizontal mode, use lnav mode instead"
% msg
)
horizontal_mode = common.bin2int(mb[25:27]) horizontal_mode = common.bin2int(mb[25:27])
if horizontal_mode == 0: if horizontal_mode == 0:
@ -156,20 +177,25 @@ def selected_heading(msg):
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 0: if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain selected heading, use target angle instead" % msg) raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain selected heading, use target angle instead"
% msg
)
if int(mb[29]) == 0: if int(mb[29]) == 0:
hdg = None hdg = None
else: else:
hdg_sign = int(mb[30]) hdg_sign = int(mb[30])
hdg = (hdg_sign+1) * common.bin2int(mb[31:39]) * (180/256) hdg = (hdg_sign + 1) * common.bin2int(mb[31:39]) * (180 / 256)
hdg = round(hdg, 2) hdg = round(hdg, 2)
return hdg return hdg
@ -189,14 +215,19 @@ def target_angle(msg):
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 1: if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain target angle, use selected heading instead" % msg) raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain target angle, use selected heading instead"
% msg
)
angle_avail = common.bin2int(mb[25:27]) angle_avail = common.bin2int(mb[25:27])
if angle_avail == 0: if angle_avail == 0:
@ -228,14 +259,19 @@ def baro_pressure_setting(msg):
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 0: if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain barometric pressure setting" % msg) raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain barometric pressure setting"
% msg
)
baro = common.bin2int(mb[20:29]) baro = common.bin2int(mb[20:29])
baro = None if baro == 0 else 800 + (baro - 1) * 0.8 baro = None if baro == 0 else 800 + (baro - 1) * 0.8
@ -243,7 +279,8 @@ def baro_pressure_setting(msg):
return baro return baro
def autopilot(msg) -> bool:
def autopilot(msg) -> None | bool:
"""Decode autopilot engagement. """Decode autopilot engagement.
Args: Args:
@ -255,14 +292,19 @@ def autopilot(msg) -> bool:
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 0: if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain autopilot engagement" % msg) raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain autopilot engagement"
% msg
)
if int(mb[46]) == 0: if int(mb[46]) == 0:
return None return None
@ -271,7 +313,8 @@ def autopilot(msg) -> bool:
return autopilot return autopilot
def vnav_mode(msg) -> bool:
def vnav_mode(msg) -> None | bool:
"""Decode VNAV mode. """Decode VNAV mode.
Args: Args:
@ -283,14 +326,19 @@ def vnav_mode(msg) -> bool:
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 0: if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain vnav mode, use vertical mode instead" % msg) raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain vnav mode, use vertical mode instead"
% msg
)
if int(mb[46]) == 0: if int(mb[46]) == 0:
return None return None
@ -300,7 +348,7 @@ def vnav_mode(msg) -> bool:
return vnav_mode return vnav_mode
def altitude_hold_mode(msg) -> bool: def altitude_hold_mode(msg) -> None | bool:
"""Decode altitude hold mode. """Decode altitude hold mode.
Args: Args:
@ -312,14 +360,19 @@ def altitude_hold_mode(msg) -> bool:
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 0: if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain altitude hold mode" % msg) raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain altitude hold mode"
% msg
)
if int(mb[46]) == 0: if int(mb[46]) == 0:
return None return None
@ -329,8 +382,7 @@ def altitude_hold_mode(msg) -> bool:
return alt_hold_mode return alt_hold_mode
def approach_mode(msg) -> None | bool:
def approach_mode(msg) -> bool:
"""Decode approach mode. """Decode approach mode.
Args: Args:
@ -342,14 +394,19 @@ def approach_mode(msg) -> bool:
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 0: if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain approach mode" % msg) raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain approach mode"
% msg
)
if int(mb[46]) == 0: if int(mb[46]) == 0:
return None return None
@ -359,7 +416,7 @@ def approach_mode(msg) -> bool:
return app_mode return app_mode
def lnav_mode(msg) -> bool: def lnav_mode(msg) -> None | bool:
"""Decode LNAV mode. """Decode LNAV mode.
Args: Args:
@ -371,14 +428,19 @@ def lnav_mode(msg) -> bool:
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 0: if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain lnav mode, use horizontal mode instead" % msg) raise RuntimeError(
"%s: ADS-B version 1 target state and status message does not contain lnav mode, use horizontal mode instead"
% msg
)
if int(mb[46]) == 0: if int(mb[46]) == 0:
return None return None
@ -388,7 +450,7 @@ def lnav_mode(msg) -> bool:
return lnav_mode return lnav_mode
def tcas_operational(msg) -> bool: def tcas_operational(msg) -> None | bool:
"""Decode TCAS/ACAS operational. """Decode TCAS/ACAS operational.
Args: Args:
@ -400,7 +462,9 @@ def tcas_operational(msg) -> bool:
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
@ -413,6 +477,7 @@ def tcas_operational(msg) -> bool:
return tcas return tcas
def tcas_ra(msg) -> bool: def tcas_ra(msg) -> bool:
"""Decode TCAS/ACAS Resolution advisory. """Decode TCAS/ACAS Resolution advisory.
@ -425,14 +490,19 @@ def tcas_ra(msg) -> bool:
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 1: if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain TCAS/ACAS RA" % msg) raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain TCAS/ACAS RA"
% msg
)
tcas_ra = True if int(mb[52]) == 1 else False tcas_ra = True if int(mb[52]) == 1 else False
@ -462,13 +532,18 @@ def emergency_status(msg) -> int:
""" """
if common.typecode(msg) != 29: if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg) raise RuntimeError(
"%s: Not a target state and status message, expecting TC=29" % msg
)
mb = common.hex2bin(msg)[32:] mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7]) subtype = common.bin2int(mb[5:7])
if subtype == 1: if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain emergency status" % msg) raise RuntimeError(
"%s: ADS-B version 2 target state and status message does not contain emergency status"
% msg
)
return common.bin2int(mb[53:56]) return common.bin2int(mb[53:56])

View File

@ -30,6 +30,6 @@ def BDS(msg):
def icao(msg): def icao(msg):
from pyModeS.decoder.common import icao from . import common
return icao(msg) return common.icao(msg)

View File

@ -4,7 +4,7 @@ import numpy as np
import pyModeS as pms import pyModeS as pms
try: try:
import rtlsdr import rtlsdr # type: ignore
except: except:
print("------------------------------------------------------------------------") print("------------------------------------------------------------------------")
print("! Warning: pyrtlsdr not installed (required for using RTL-SDR devices) !") print("! Warning: pyrtlsdr not installed (required for using RTL-SDR devices) !")