Compare commits

...

11 Commits
master ... acas

Author SHA1 Message Date
Xavier Olive
ff19b54868 fix typing 2022-12-29 19:22:35 +01:00
Xavier Olive
a30565b4ef restore missing file 2022-12-29 19:15:39 +01:00
Junzi Sun
64923438a1 Merge branch 'master' into acas 2022-12-26 18:33:11 +01:00
Junzi Sun
f9225bf375 merge main 2022-12-26 18:27:23 +01:00
Junzi Sun
67ced74c0a fix spaces 2022-12-26 18:22:24 +01:00
Junzi Sun
46ffb79234 add VDS check in ACAS decoding 2020-12-02 22:29:42 +01:00
Junzi Sun
2e9833148b update ACAS and BDS61 structures and functions 2020-05-23 23:40:49 +02:00
Junzi Sun
c25c9d6b96 Merge branch 'master' into pr-71/tuftedocelot/acas 2020-05-23 21:53:51 +02:00
tuftedocelot
13f2071bf0 Add indicators for do not pass
What to do with ARA though? Really need actual ACAS messages for
reference versus reading the spec only
2020-05-20 20:44:47 -05:00
tuftedocelot
3d0c4f0240 Add previously existing work 2020-05-20 19:56:07 -05:00
tuftedocelot
643f14e725 beginnings of acas 2020-05-20 19:51:58 -05:00
8 changed files with 303 additions and 13 deletions

3
.gitignore vendored
View File

@ -5,9 +5,6 @@ __pycache__/
*.py[cod] *.py[cod]
.pytest_cache/ .pytest_cache/
#cython
*.c
# C extensions # C extensions
*.so *.so

View File

@ -10,6 +10,7 @@ except Exception:
from .decoder import tell from .decoder import tell
from .decoder import adsb from .decoder import adsb
from .decoder import acas
from .decoder import commb from .decoder import commb
from .decoder import allcall from .decoder import allcall
from .decoder import surv from .decoder import surv

View File

@ -1,5 +1,158 @@
""" """
Decoding Air-Air Surveillance (ACAS) DF=0/16 Decoding Air-Air Surveillance (ACAS) DF=0/16
[To be implemented]
""" """
from __future__ import annotations
from .. import common
import warnings
warnings.simplefilter("always", UserWarning)
def isACAS(msg: str) -> bool:
"""Check if the message is an ACAS coordination message.
:param msg: 28 hexdigits string
:return: if VDS is 3,1
"""
mv = common.hex2bin(common.data(msg))
vds = mv[0:8]
if vds == "00110000":
return True
else:
return False
def rac(msg: str) -> None | str:
"""Resolution Advisory Complement.
:param msg: 28 hexdigits string
:return: RACs
"""
if not isACAS(msg):
warnings.warn("Not an ACAS coordination message.")
return None
RAC = []
mv = common.hex2bin(common.data(msg))
if mv[22] == "1":
RAC.append("do not pass below")
if mv[23] == "1":
RAC.append("do not pass above")
if mv[24] == "1":
RAC.append("do not pass left")
if mv[25] == "1":
RAC.append("do not pass right")
return "; ".join(RAC)
def rat(msg: str) -> None | int:
"""RA terminated indicator
Mode S transponder is still required to report RA 18 seconds after
it is terminated by ACAS. Hence, the RAT filed is used.
:param msg: 28 hexdigits string
:return: if RA has been terminated
"""
if not isACAS(msg):
warnings.warn("Not an ACAS coordination message.")
return None
mv = common.hex2bin(common.data(msg))
mte = int(mv[26])
return mte
def mte(msg: str) -> None | int:
"""Multiple threat encounter.
:param msg: 28 hexdigits string
:return: if there are multiple threats
"""
if not isACAS(msg):
warnings.warn("Not an ACAS coordination message.")
return None
mv = common.hex2bin(common.data(msg))
mte = int(mv[27])
return mte
def ara(msg: str) -> None | str:
"""Decode active resolution advisory.
:param msg: 28 bytes hexadecimal message string
:return: RA charactristics
"""
if not isACAS(msg):
warnings.warn("Not an ACAS coordination message.")
return None
mv = common.hex2bin(common.data(msg))
mte = int(mv[27])
ara_b1 = int(mv[8])
ara_b2 = int(mv[9])
ara_b3 = int(mv[10])
ara_b4 = int(mv[11])
ara_b5 = int(mv[12])
ara_b6 = int(mv[14])
ara_b7 = int(mv[15])
# ACAS III are bits 15-22
RA = []
if ara_b1 == 1:
if ara_b2:
RA.append("corrective")
else:
RA.append("preventive")
if ara_b3:
RA.append("downward sense")
else:
RA.append("upward sense")
if ara_b4:
RA.append("increased rate")
if ara_b5:
RA.append("sense reversal")
if ara_b6:
RA.append("altitude crossing")
if ara_b7:
RA.append("positive")
else:
RA.append("vertical speed limit")
if ara_b1 == 0 and mte == 1:
if ara_b2:
RA.append("requires a correction in the upward sense")
if ara_b3:
RA.append("requires a positive climb")
if ara_b4:
RA.append("requires a correction in downward sense")
if ara_b5:
RA.append("requires a positive descent")
if ara_b6:
RA.append("requires a crossing")
if ara_b7:
RA.append("requires a sense reversal")
return "; ".join(RA)

View File

@ -25,7 +25,7 @@ from .bds.bds06 import (
) )
from .bds.bds08 import callsign, category from .bds.bds08 import callsign, category
from .bds.bds09 import airborne_velocity, altitude_diff from .bds.bds09 import airborne_velocity, altitude_diff
from .bds.bds61 import emergency_squawk, emergency_state, is_emergency from .bds.bds61_st1 import emergency_squawk, emergency_state, is_emergency
from .bds.bds62 import ( from .bds.bds62 import (
altitude_hold_mode, altitude_hold_mode,
approach_mode, approach_mode,
@ -161,9 +161,7 @@ def position(
raise RuntimeError("Incorrect or inconsistent message types") raise RuntimeError("Incorrect or inconsistent message types")
def position_with_ref( def position_with_ref(msg: str, lat_ref: float, lon_ref: float) -> tuple[float, float]:
msg: str, lat_ref: float, lon_ref: float
) -> tuple[float, float]:
"""Decode position with only one message. """Decode position with only one message.
A reference position is required, which can be previously A reference position is required, which can be previously

View File

@ -34,14 +34,12 @@ from . import ( # noqa: F401
bds45, bds45,
bds50, bds50,
bds60, bds60,
bds61, bds61_st1,
bds62, bds62,
) )
def is50or60( def is50or60(msg: str, spd_ref: float, trk_ref: float, alt_ref: float) -> Optional[str]:
msg: str, spd_ref: float, trk_ref: float, alt_ref: float
) -> Optional[str]:
"""Use reference ground speed and trk to determine BDS50 and DBS60. """Use reference ground speed and trk to determine BDS50 and DBS60.
Args: Args:

View File

@ -2,6 +2,7 @@
# BDS 6,1 # BDS 6,1
# ADS-B TC=28 # ADS-B TC=28
# Aircraft Airborne status # Aircraft Airborne status
# (Subtype 1)
# ------------------------------------------ # ------------------------------------------
from ... import common from ... import common

View File

@ -0,0 +1,102 @@
# ------------------------------------------
# BDS 6,1
# ADS-B TC=28
# Aircraft Airborne status
# (Subtype 1)
# ------------------------------------------
from __future__ import annotations
from ... import common
from .. import acas
def threat_type(msg: str) -> int:
"""Determine the threat type indicator.
Value Meaning
----- ---------------------------------------
0 No identity data in TID
1 TID has Mode S address (ICAO)
2 TID has altitude, range, and bearing
3 Not assigned
:param msg: 28 hexdigits string
:return: indicator of threat type
"""
mb = common.hex2bin(common.data(msg))
tti = common.bin2int(mb[28:30])
return tti
def threat_identity(msg: str) -> str:
mb = common.hex2bin(common.data(msg))
tti = threat_type(msg)
# The ICAO of the threat is announced
if tti == 1:
return common.icao(mb[30:55])
else:
raise RuntimeError("%s: Missing threat identity (ICAO)")
def threat_location(msg: str) -> None | tuple[int, int, int]:
"""Get the altitude, range, and bearing of the threat.
Altitude is the Mode C altitude
:param msg: 28 hexdigits string
:return: tuple of the Mode C altitude, range, and bearing
"""
mb = common.hex2bin(common.data(msg))
tti = threat_type(msg)
# Altitude, range, and bearing of threat
if tti == 2:
altitude = common.altitude(mb[31:44])
distance = common.bin2int(mb[44:51])
bearing = common.bin2int(mb[51:57])
return altitude, distance, bearing
return None
def has_multiple_threats(msg: str) -> bool:
"""Indicate if the ACAS is processing multiple threats simultaneously.
:param msg: 28 hexdigits string
:return: if there are multiple threats
"""
return acas.mte(msg) == 1
def active_resolution_advisories(msg: str) -> None | str:
"""Decode active resolution advisory.
Uses ARA decoding function from ACAS module.
:param msg: 28 bytes hexadecimal message string
:return: RA charactristics
"""
return acas.ara(msg)
def is_ra_terminated(msg: str) -> bool:
"""Indicate if the threat is still being generated.
Mode S transponder is still required to report RA 18 seconds after
it is terminated by ACAS. Hence, the RAT filed is used.
:param msg: 28 hexdigits string
:return: if the threat is terminated
"""
return acas.rat(msg) == 1
def ra_complement(msg: str) -> None | str:
"""Resolution Advisory Complement.
:param msg: 28 hexdigits string
:return: RACs
"""
return acas.rac(msg)

40
tests/test_acas.py Normal file
View File

@ -0,0 +1,40 @@
import pyModeS as pms
msgs = [
"80E1983958C392E8710C642D0BAD",
"8609F8E19E90653083FDE096BE26",
"80011B41F40017D0000012E06B45",
"808182365813667446EB715E253D",
"80E1953058AB029BE8F4E60D989E",
"8667ECFA8FE06B30E59A124D5AEF",
"80030AA000042C0AD05D6205DB2C",
"8631A80000373C463B6E7A00008B",
"80E19131588B146108DE703F3C47",
"825CC4A0001595C4600030A40000",
"808182365813667438EB710EB6D8",
"80A18393581D3655E90354A664B2",
"8081823658136309B4F22BCB5F8D",
"80E1953058AB06516E8602756DD8",
"80E1991058C9063AB6A3E4744DC3",
"8073EC91840AFCED8F300BE765C5",
"8571F54AA814BF8130066A19A31D",
"864070E1990D3B1E78048BAE6987",
"80E1991058C9063AB6A3E4744DC3",
"80818298581581F7CAF8C3C1EEA4",
"80004E98BC90000FF01010951298",
"8526D57E4D963C92CDEE1B6C7C49",
"802A613C93F65A7FF803A51B5ADB",
"85B6C54279B67BE2A0001998FFDA",
"851944F15881648338D1AF4B7A27",
"8321014858208000787905B0E800",
"866DD078EDEBD330404FFFAE9BA5",
"80E196905AB503260E835D849E35",
]
for msg in msgs:
print("-" * 80)
print(msg)
print("ARA", pms.acas.ara(msg))
print("RAC", pms.acas.rac(msg))
print("MTE", pms.acas.mte(msg))
print("RAT", pms.acas.rat(msg))