Compare commits

...

11 Commits
master ... acas

Author SHA1 Message Date
Xavier Olive
ff19b54868 fix typing 2022-12-29 19:22:35 +01:00
Xavier Olive
a30565b4ef restore missing file 2022-12-29 19:15:39 +01:00
Junzi Sun
64923438a1 Merge branch 'master' into acas 2022-12-26 18:33:11 +01:00
Junzi Sun
f9225bf375 merge main 2022-12-26 18:27:23 +01:00
Junzi Sun
67ced74c0a fix spaces 2022-12-26 18:22:24 +01:00
Junzi Sun
46ffb79234 add VDS check in ACAS decoding 2020-12-02 22:29:42 +01:00
Junzi Sun
2e9833148b update ACAS and BDS61 structures and functions 2020-05-23 23:40:49 +02:00
Junzi Sun
c25c9d6b96 Merge branch 'master' into pr-71/tuftedocelot/acas 2020-05-23 21:53:51 +02:00
tuftedocelot
13f2071bf0 Add indicators for do not pass
What to do with ARA though? Really need actual ACAS messages for
reference versus reading the spec only
2020-05-20 20:44:47 -05:00
tuftedocelot
3d0c4f0240 Add previously existing work 2020-05-20 19:56:07 -05:00
tuftedocelot
643f14e725 beginnings of acas 2020-05-20 19:51:58 -05:00
8 changed files with 303 additions and 13 deletions

3
.gitignore vendored
View File

@ -5,9 +5,6 @@ __pycache__/
*.py[cod]
.pytest_cache/
#cython
*.c
# C extensions
*.so

View File

@ -10,6 +10,7 @@ except Exception:
from .decoder import tell
from .decoder import adsb
from .decoder import acas
from .decoder import commb
from .decoder import allcall
from .decoder import surv

View File

@ -1,5 +1,158 @@
"""
Decoding Air-Air Surveillance (ACAS) DF=0/16
[To be implemented]
"""
from __future__ import annotations
from .. import common
import warnings
warnings.simplefilter("always", UserWarning)
def isACAS(msg: str) -> bool:
"""Check if the message is an ACAS coordination message.
:param msg: 28 hexdigits string
:return: if VDS is 3,1
"""
mv = common.hex2bin(common.data(msg))
vds = mv[0:8]
if vds == "00110000":
return True
else:
return False
def rac(msg: str) -> None | str:
"""Resolution Advisory Complement.
:param msg: 28 hexdigits string
:return: RACs
"""
if not isACAS(msg):
warnings.warn("Not an ACAS coordination message.")
return None
RAC = []
mv = common.hex2bin(common.data(msg))
if mv[22] == "1":
RAC.append("do not pass below")
if mv[23] == "1":
RAC.append("do not pass above")
if mv[24] == "1":
RAC.append("do not pass left")
if mv[25] == "1":
RAC.append("do not pass right")
return "; ".join(RAC)
def rat(msg: str) -> None | int:
"""RA terminated indicator
Mode S transponder is still required to report RA 18 seconds after
it is terminated by ACAS. Hence, the RAT filed is used.
:param msg: 28 hexdigits string
:return: if RA has been terminated
"""
if not isACAS(msg):
warnings.warn("Not an ACAS coordination message.")
return None
mv = common.hex2bin(common.data(msg))
mte = int(mv[26])
return mte
def mte(msg: str) -> None | int:
"""Multiple threat encounter.
:param msg: 28 hexdigits string
:return: if there are multiple threats
"""
if not isACAS(msg):
warnings.warn("Not an ACAS coordination message.")
return None
mv = common.hex2bin(common.data(msg))
mte = int(mv[27])
return mte
def ara(msg: str) -> None | str:
"""Decode active resolution advisory.
:param msg: 28 bytes hexadecimal message string
:return: RA charactristics
"""
if not isACAS(msg):
warnings.warn("Not an ACAS coordination message.")
return None
mv = common.hex2bin(common.data(msg))
mte = int(mv[27])
ara_b1 = int(mv[8])
ara_b2 = int(mv[9])
ara_b3 = int(mv[10])
ara_b4 = int(mv[11])
ara_b5 = int(mv[12])
ara_b6 = int(mv[14])
ara_b7 = int(mv[15])
# ACAS III are bits 15-22
RA = []
if ara_b1 == 1:
if ara_b2:
RA.append("corrective")
else:
RA.append("preventive")
if ara_b3:
RA.append("downward sense")
else:
RA.append("upward sense")
if ara_b4:
RA.append("increased rate")
if ara_b5:
RA.append("sense reversal")
if ara_b6:
RA.append("altitude crossing")
if ara_b7:
RA.append("positive")
else:
RA.append("vertical speed limit")
if ara_b1 == 0 and mte == 1:
if ara_b2:
RA.append("requires a correction in the upward sense")
if ara_b3:
RA.append("requires a positive climb")
if ara_b4:
RA.append("requires a correction in downward sense")
if ara_b5:
RA.append("requires a positive descent")
if ara_b6:
RA.append("requires a crossing")
if ara_b7:
RA.append("requires a sense reversal")
return "; ".join(RA)

View File

@ -25,7 +25,7 @@ from .bds.bds06 import (
)
from .bds.bds08 import callsign, category
from .bds.bds09 import airborne_velocity, altitude_diff
from .bds.bds61 import emergency_squawk, emergency_state, is_emergency
from .bds.bds61_st1 import emergency_squawk, emergency_state, is_emergency
from .bds.bds62 import (
altitude_hold_mode,
approach_mode,
@ -161,9 +161,7 @@ def position(
raise RuntimeError("Incorrect or inconsistent message types")
def position_with_ref(
msg: str, lat_ref: float, lon_ref: float
) -> tuple[float, float]:
def position_with_ref(msg: str, lat_ref: float, lon_ref: float) -> tuple[float, float]:
"""Decode position with only one message.
A reference position is required, which can be previously

View File

@ -34,14 +34,12 @@ from . import ( # noqa: F401
bds45,
bds50,
bds60,
bds61,
bds61_st1,
bds62,
)
def is50or60(
msg: str, spd_ref: float, trk_ref: float, alt_ref: float
) -> Optional[str]:
def is50or60(msg: str, spd_ref: float, trk_ref: float, alt_ref: float) -> Optional[str]:
"""Use reference ground speed and trk to determine BDS50 and DBS60.
Args:

View File

@ -2,6 +2,7 @@
# BDS 6,1
# ADS-B TC=28
# Aircraft Airborne status
# (Subtype 1)
# ------------------------------------------
from ... import common

View File

@ -0,0 +1,102 @@
# ------------------------------------------
# BDS 6,1
# ADS-B TC=28
# Aircraft Airborne status
# (Subtype 1)
# ------------------------------------------
from __future__ import annotations
from ... import common
from .. import acas
def threat_type(msg: str) -> int:
"""Determine the threat type indicator.
Value Meaning
----- ---------------------------------------
0 No identity data in TID
1 TID has Mode S address (ICAO)
2 TID has altitude, range, and bearing
3 Not assigned
:param msg: 28 hexdigits string
:return: indicator of threat type
"""
mb = common.hex2bin(common.data(msg))
tti = common.bin2int(mb[28:30])
return tti
def threat_identity(msg: str) -> str:
mb = common.hex2bin(common.data(msg))
tti = threat_type(msg)
# The ICAO of the threat is announced
if tti == 1:
return common.icao(mb[30:55])
else:
raise RuntimeError("%s: Missing threat identity (ICAO)")
def threat_location(msg: str) -> None | tuple[int, int, int]:
"""Get the altitude, range, and bearing of the threat.
Altitude is the Mode C altitude
:param msg: 28 hexdigits string
:return: tuple of the Mode C altitude, range, and bearing
"""
mb = common.hex2bin(common.data(msg))
tti = threat_type(msg)
# Altitude, range, and bearing of threat
if tti == 2:
altitude = common.altitude(mb[31:44])
distance = common.bin2int(mb[44:51])
bearing = common.bin2int(mb[51:57])
return altitude, distance, bearing
return None
def has_multiple_threats(msg: str) -> bool:
"""Indicate if the ACAS is processing multiple threats simultaneously.
:param msg: 28 hexdigits string
:return: if there are multiple threats
"""
return acas.mte(msg) == 1
def active_resolution_advisories(msg: str) -> None | str:
"""Decode active resolution advisory.
Uses ARA decoding function from ACAS module.
:param msg: 28 bytes hexadecimal message string
:return: RA charactristics
"""
return acas.ara(msg)
def is_ra_terminated(msg: str) -> bool:
"""Indicate if the threat is still being generated.
Mode S transponder is still required to report RA 18 seconds after
it is terminated by ACAS. Hence, the RAT filed is used.
:param msg: 28 hexdigits string
:return: if the threat is terminated
"""
return acas.rat(msg) == 1
def ra_complement(msg: str) -> None | str:
"""Resolution Advisory Complement.
:param msg: 28 hexdigits string
:return: RACs
"""
return acas.rac(msg)

40
tests/test_acas.py Normal file
View File

@ -0,0 +1,40 @@
import pyModeS as pms
msgs = [
"80E1983958C392E8710C642D0BAD",
"8609F8E19E90653083FDE096BE26",
"80011B41F40017D0000012E06B45",
"808182365813667446EB715E253D",
"80E1953058AB029BE8F4E60D989E",
"8667ECFA8FE06B30E59A124D5AEF",
"80030AA000042C0AD05D6205DB2C",
"8631A80000373C463B6E7A00008B",
"80E19131588B146108DE703F3C47",
"825CC4A0001595C4600030A40000",
"808182365813667438EB710EB6D8",
"80A18393581D3655E90354A664B2",
"8081823658136309B4F22BCB5F8D",
"80E1953058AB06516E8602756DD8",
"80E1991058C9063AB6A3E4744DC3",
"8073EC91840AFCED8F300BE765C5",
"8571F54AA814BF8130066A19A31D",
"864070E1990D3B1E78048BAE6987",
"80E1991058C9063AB6A3E4744DC3",
"80818298581581F7CAF8C3C1EEA4",
"80004E98BC90000FF01010951298",
"8526D57E4D963C92CDEE1B6C7C49",
"802A613C93F65A7FF803A51B5ADB",
"85B6C54279B67BE2A0001998FFDA",
"851944F15881648338D1AF4B7A27",
"8321014858208000787905B0E800",
"866DD078EDEBD330404FFFAE9BA5",
"80E196905AB503260E835D849E35",
]
for msg in msgs:
print("-" * 80)
print(msg)
print("ARA", pms.acas.ara(msg))
print("RAC", pms.acas.rac(msg))
print("MTE", pms.acas.mte(msg))
print("RAT", pms.acas.rat(msg))