Features:

- initial support for multiples tracks simulation

Enhancements:
- code profiling and optimization
 - faster lookup table based ppm + IQ generation
 - faster CRC24 computations
 - frame encoding

Bug fixes:
- callsign encoding
- typos
This commit is contained in:
Mathieu Peyréga 2022-03-15 20:23:40 +01:00
parent 5642c6564e
commit bc9687feb9
21 changed files with 645 additions and 292 deletions

View File

@ -1,23 +1,39 @@
""" This class is responsible for low level encoding of IQ sample
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
from CustomDecorators import *
import numpy
@Singleton
class ADSBLowLevelEncoder:
"""
Hamming and Manchester Encoding example
Author: Joel Addison
Date: March 2013
Functions to do (7,4) hamming encoding and decoding, including error detection
and correction.
Manchester encoding and decoding is also included, and by default will use
least bit ordering for the byte that is to be included in the array.
"""
def __init__(self):
self.adsb_frame_preamble = [0xA1,0x40]
self.adsb_frame_pause = [0]*70
self._adsb_frame_preamble = [0xA1,0x40]
self._adsb_frame_pause = [0]*4
# Build a manchester encoding lookup table
self._manchester_lookup = []
for i in range(256):
me = self.manchester_encode(i)
self._manchester_lookup.append([127*val for pair in zip(me, me) for val in pair])
# Build preamble and pause manchester encoded versions
me_bits = numpy.unpackbits(numpy.asarray(self._adsb_frame_preamble, dtype=numpy.uint8))
self._adsb_frame_preamble_IQ = [127*val for pair in zip(me_bits, me_bits) for val in pair]
self._adsb_frame_pause_IQ = self._adsb_frame_pause*16
self._len_pre_IQ = len(self._adsb_frame_preamble_IQ)
self._len_pause_IQ = len(self._adsb_frame_pause_IQ)
###############################################################
# Further work on fork
# Copyright (C) 2017 David Robinson
@ -38,65 +54,62 @@ class ADSBLowLevelEncoder:
# Encode byte
for i in range(7, -1, -1):
if self.extract_bit(byte, i):
manchester_encoded.extend([0,1])
else:
manchester_encoded.extend([1,0])
else:
manchester_encoded.extend([0,1])
return manchester_encoded
def frame_1090es_ppm_modulate(self, even, odd = []):
def frame_1090es_ppm_modulate_IQ(self, even, odd = []):
"""
Args:
even and odd: The data frames that need to be converted to PPM
Returns:
The bytearray of the PPM data
The bytearray of the IQ samples for PPM modulated data
"""
ppm = [ ]
length_even = len(even)
length_odd = len(odd)
if (length_even != 0 and length_odd == 0):
IQ = bytearray(32*length_even+2*self._len_pause_IQ+self._len_pre_IQ)
pos = self._len_pause_IQ
IQ[pos:pos+self._len_pre_IQ] = self._adsb_frame_preamble_IQ
pos += self._len_pre_IQ
if (length_even != 0):
ppm.extend(self.adsb_frame_pause) # pause
ppm.extend(self.adsb_frame_preamble) # preamble
tmp = []
for b in even:
tmp.extend(self._manchester_lookup[b])
IQ[pos:pos+32*length_even] = tmp
for i in range(length_even):
word16 = numpy.packbits(self.manchester_encode(~even[i]))
ppm.extend(word16[0:2])
ppm.extend(self.adsb_frame_pause) # pause
return IQ
if (length_odd != 0):
ppm.extend(self.adsb_frame_pause) # pause
ppm.extend(self.adsb_frame_preamble) # preamble
elif (length_even != 0 and length_odd != 0):
IQ = bytearray(32*(length_even+length_odd)+2*self._len_pre_IQ+3*self._len_pause_IQ)
pos = self._len_pause_IQ
IQ[pos:pos+self._len_pre_IQ] = self._adsb_frame_preamble_IQ
pos += self._len_pre_IQ
for i in range(length_odd):
word16 = numpy.packbits(self.manchester_encode(~odd[i]))
ppm.extend(word16[0:2])
tmp = []
for b in even:
tmp.extend(self._manchester_lookup[b])
length_even_IQ = 32*length_even
IQ[pos:pos+length_even_IQ] = tmp
pos += length_even_IQ
ppm.extend(self.adsb_frame_pause) # pause
pos += self._len_pause_IQ
IQ[pos:pos+self._len_pre_IQ] = self._adsb_frame_preamble_IQ
pos += self._len_pre_IQ
tmp = []
for b in odd:
tmp.extend(self._manchester_lookup[b])
IQ[pos:pos+32*length_odd] = tmp
return bytearray(ppm)
return IQ
def hackrf_raw_IQ_format(self, ppm):
"""
Args:
ppm: this is some data in ppm (pulse position modulation) which will be converted into
hackRF raw IQ sample format, ready to be broadcasted
Returns:
bytearray: containing the IQ data
"""
signal = []
bits = numpy.unpackbits(numpy.asarray(ppm, dtype=numpy.uint8))
for bit in bits:
if bit == 1:
I = 127
Q = 127
else:
I = 0
Q = 0
signal.append(I)
signal.append(Q)
else:
return bytearray(signal)
return None

View File

@ -21,10 +21,10 @@ You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
import time
import time, datetime
import abc
import threading
from CustomDecorators import *
from ModeS import ModeS
class AbstractTrajectorySimulatorBase(threading.Thread,abc.ABC):
@ -38,60 +38,120 @@ class AbstractTrajectorySimulatorBase(threading.Thread,abc.ABC):
self._do_stop = False
self._position_message_period_us = 150000 # max should be 0.2s
self._velocity_message_period_us = 1200000 # max should be 1.3s
self._identitification_message_period_us = 10000000 # max should be 15s
self._squawk_message_period_us = 800000 # TODO : specs says that interval should be randomized between [0.7s;0.9s] and max is 1.0s
self._startruntime = None
self._simstep = 0
# Thread core function
def run(self):
is_first_step = True
while not self._do_stop:
encoder_changed = False
# check if modeS encoder needs update
if self._aircraftinfos.icao_changed or self._aircraftinfos.capability_changed or is_first_step:
self._modeSencoder.icao = self._aircraftinfos.icao
self._modeSencoder.ca = self._aircraftinfos.capability
encoder_changed = True
now = datetime.datetime.now(datetime.timezone.utc)
if (not self._startruntime or (now - self._startruntime).total_seconds() >= self._simstep*self.refresh_delay()):
self._simstep += 1
encoder_changed = False
if encoder_changed or self._aircraftinfos.callsign_changed:
self.df_callsign = self._modeSencoder.callsign_encode(self._aircraftinfos.callsign)
self._broadcast_thread.replace_message("identification",self.df_callsign)
#self._broadcast_thread.getMutex().acquire()
# check if modeS encoder needs update
if self._aircraftinfos.icao_changed or self._aircraftinfos.capability_changed or not self._startruntime:
self._modeSencoder.icao = self._aircraftinfos.icao
self._modeSencoder.ca = self._aircraftinfos.capability
encoder_changed = True
if encoder_changed or self._aircraftinfos.squawk_changed:
self.frame_6116 = self._modeSencoder.modaA_encode(self._aircraftinfos.squawk)
self._broadcast_thread.replace_message("register_6116",self.frame_6116)
if encoder_changed or self._aircraftinfos.callsign_changed:
self.df_callsign = self._modeSencoder.callsign_encode(self._aircraftinfos.callsign)
self._broadcast_thread.replace_message("identification",self.df_callsign)
# message generation only if needed
if encoder_changed or self._aircraftinfos.on_surface_changed \
or self._aircraftinfos.lat_changed or self._aircraftinfos.lon_changed or self._aircraftinfos.alt_msl_changed \
or self._aircraftinfos.type_code_changed or self._aircraftinfos.surveillance_status_changed or self._aircraftinfos.nicsupb_changed \
or self._aircraftinfos.timesync_changed:
if not self._aircraftinfos.on_surface:
(self.df_pos_even, self.df_pos_odd) = self._modeSencoder.df_encode_airborne_position(self._aircraftinfos.lat_deg, self._aircraftinfos.lon_deg, self._aircraftinfos.alt_msl_ft, \
self._aircraftinfos.type_code, self._aircraftinfos.surveillance_status, self._aircraftinfos.nicsupb, self._aircraftinfos.timesync)
self._broadcast_thread.replace_message("airborne_position",self.df_pos_even,self.df_pos_odd)
self._broadcast_thread.replace_message("surface_position",[], [])
else:
(self.df_pos_even, self.df_pos_odd) = self._modeSencoder.df_encode_surface_position(self._aircraftinfos.lat_deg, self._aircraftinfos.lon_deg, self._aircraftinfos.alt_msl_ft, \
self._aircraftinfos.type_code, self._aircraftinfos.surveillance_status, self._aircraftinfos.nicsupb, self._aircraftinfos.timesync)
self._broadcast_thread.replace_message("surface_position",self.df_pos_even,self.df_pos_odd)
self._broadcast_thread.replace_message("airborne_position",[], [])
if encoder_changed or self._aircraftinfos.squawk_changed:
self.frame_6116 = self._modeSencoder.modaA_encode(self._aircraftinfos.squawk)
self._broadcast_thread.replace_message("register_6116",self.frame_6116)
if encoder_changed or self._aircraftinfos.speed_changed or self._aircraftinfos.track_angle_changed or self._aircraftinfos.vspeed_changed:
self.df_velocity = self._modeSencoder.df_encode_ground_velocity(self._aircraftinfos.speed_kt, self._aircraftinfos.track_angle_deg, self._aircraftinfos.vspeed_ftpmin)
self._broadcast_thread.replace_message("airborne_velocity",self.df_velocity)
# message generation only if needed
if encoder_changed or self._aircraftinfos.on_surface_changed \
or self._aircraftinfos.lat_changed or self._aircraftinfos.lon_changed or self._aircraftinfos.alt_msl_changed \
or self._aircraftinfos.type_code_changed or self._aircraftinfos.surveillance_status_changed or self._aircraftinfos.nicsupb_changed \
or self._aircraftinfos.timesync_changed:
if not self._aircraftinfos.on_surface:
(self.df_pos_even, self.df_pos_odd) = self._modeSencoder.df_encode_airborne_position(self._aircraftinfos.lat_deg, self._aircraftinfos.lon_deg, self._aircraftinfos.alt_msl_ft, \
self._aircraftinfos.type_code, self._aircraftinfos.surveillance_status, self._aircraftinfos.nicsupb, self._aircraftinfos.timesync)
self._broadcast_thread.replace_message("airborne_position",self.df_pos_even,self.df_pos_odd)
self._broadcast_thread.replace_message("surface_position",[], [])
else:
(self.df_pos_even, self.df_pos_odd) = self._modeSencoder.df_encode_surface_position(self._aircraftinfos.lat_deg, self._aircraftinfos.lon_deg, self._aircraftinfos.alt_msl_ft, \
self._aircraftinfos.type_code, self._aircraftinfos.surveillance_status, self._aircraftinfos.nicsupb, self._aircraftinfos.timesync)
self._broadcast_thread.replace_message("surface_position",self.df_pos_even,self.df_pos_odd)
self._broadcast_thread.replace_message("airborne_position",[], [])
is_first_step = False
self.update_aircraftinfos() # update_aircraftinfos() : abstract method that need to be implemented i nderived classes
time.sleep(self.refresh_delay()) # refresh_delay() : abstract method that need to be implemented i nderived classes
if encoder_changed or self._aircraftinfos.speed_changed or self._aircraftinfos.track_angle_changed or self._aircraftinfos.vspeed_changed:
self.df_velocity = self._modeSencoder.df_encode_ground_velocity(self._aircraftinfos.speed_kt, self._aircraftinfos.track_angle_deg, self._aircraftinfos.vspeed_ftpmin)
self._broadcast_thread.replace_message("airborne_velocity",self.df_velocity)
#self._broadcast_thread.getMutex().release()
if not self._startruntime:
self._startruntime = now
self.update_aircraftinfos() # update_aircraftinfos() : abstract method that need to be implemented in derived classes
#elapsed = (datetime.datetime.now(datetime.timezone.utc) - now).total_seconds()
#sleep_time = self.refresh_delay() - elapsed
#if sleep_time < 0.0:
# sleep_time = 0.0
time.sleep(0.05*self.refresh_delay()) # refresh_delay() : abstract method that need to be implemented in derived classes
# upon exit, reset _do_stop flag in case there is a new start
self._do_stop = False
def stop(self):
self._do_stop = True
@abc.abstractmethod
def refresh_delay(self):
...
@abc.abstractmethod
def update_aircraftinfos(self):
...
...
def stop(self):
self._do_stop = True
@property
def elapsed_sim_time(self):
return self._simstep*self.refresh_delay()
@property
def aircraftinfos(self):
return self._aircraftinfos
@property
def position_message_period_us(self):
return self._position_message_period_us
@position_message_period_us.setter
def position_message_period_us(self,value):
self._position_message_period_us = value
@property
def velocity_message_period_us(self):
return self._velocity_message_period_us
@velocity_message_period_us.setter
def velocity_message_period_us(self,value):
self._velocity_message_period_us = value
@property
def identitification_message_period_us(self):
return self._identitification_message_period_us
@identitification_message_period_us.setter
def identitification_message_period_us(self,value):
self._identitification_message_period_us = value
@property
def squawk_message_period_us(self):
return self._squawk_message_period_us
@squawk_message_period_us.setter
def squawk_message_period_us(self,value):
self._squawk_message_period_us = value

View File

@ -15,8 +15,38 @@ this program. If not, see <http://www.gnu.org/licenses/>.
"""
import math
import json
class AircraftInfos:
@classmethod
def from_json(cls,filepath):
json_file = open(filepath,'r')
state_dic = json.load(json_file)
json_file.close()
icao_aa = state_dic["icao_aa"]
callsign = state_dic["callsign"]
squawk = state_dic["squawk"]
alt_ft = state_dic["alt_ft"]
lat_deg = state_dic["lat_deg"]
lon_deg = state_dic["lon_deg"]
speed_kph = state_dic["speed_kph"]
vspeed_ftpmin = state_dic["vspeed_ftpmin"]
maxloadfactor = state_dic["maxloadfactor"]
track_angle_deg = state_dic["track_angle_deg"]
capability = state_dic["capability"]
type_code = state_dic["type_code"]
surveillance_status = state_dic["surveillance_status"]
timesync = state_dic["timesync"]
nicsup = state_dic["nicsup"]
on_surface = state_dic["on_surface"]
return AircraftInfos(icao_aa,callsign,squawk,
lat_deg,lon_deg,alt_ft,speed_kph,vspeed_ftpmin,maxloadfactor,track_angle_deg, \
timesync,capability,type_code,surveillance_status,nicsup,on_surface)
def __init__(self,icao,callsign,squawk,
lat_deg,lon_deg,alt_msl_ft,speed_kph,vspeed_ftpmin,maxloadfactor,track_angle_deg,
timesync,capability,type_code,surveillance_status,nicsupb,on_surface):

View File

@ -21,7 +21,7 @@ class FixedTrajectorySimulator(AbstractTrajectorySimulatorBase):
super().__init__(mutex,broadcast_thread,aircrafinfos)
def refresh_delay(self):
return 0.005
return 0.05
def update_aircraftinfos(self):
pass
pass

View File

@ -57,19 +57,14 @@ def hackrfTXCB(hackrf_transfer):
@Singleton
class HackRfBroadcastThread(threading.Thread):
def __init__(self,mutex,airborne_position_refresh_period = 150000):
def __init__(self,airborne_position_refresh_period = 150000):
super().__init__()
self._mutex = mutex
self._mutex = threading.Lock()
self._lowlevelencoder = ADSBLowLevelEncoder()
self._messages = {}
# key : "name of message" value : ["data to be broadcasted", datetime of last broadcast, delay_between 2 messages of this type]
self._messages["identification"] = [None, None, 10000000] # max should be 15s
self._messages["register_6116"] = [None, None, 800000] # TODO : specs says that interval should be randomized between [0.7s;0.9s] and max is 1.0s
self._messages["airborne_position"] = [None, None, airborne_position_refresh_period] # max should be 0.2s
self._messages["surface_position"] = [None, None, 150000] # max should be 0.2s
self._messages["airborne_velocity"] = [None, None, 1200000] # max should be 1.3s
self._messages_feed_threads = {}
# Initialize pyHackRF library
result = HackRF.initialize()
@ -86,6 +81,8 @@ class HackRfBroadcastThread(threading.Thread):
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
self._hackrf_broadcaster.setCrystalPPM(0)
result = self._hackrf_broadcaster.setSampleRate(2000000)
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
@ -95,7 +92,7 @@ class HackRfBroadcastThread(threading.Thread):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
#result = self.hackrf_broadcaster.setFrequency(868000000) # free frequency for over the air brodcast tests
result = self._hackrf_broadcaster.setFrequency(1090000000) # do not use 1090MHz for actual over the air broadcasting
result = self._hackrf_broadcaster.setFrequency(1090000000) # do not use 1090MHz for actual over the air broadcasting
# only if you use wire feed (you'll need attenuators in that case)
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
@ -125,53 +122,113 @@ class HackRfBroadcastThread(threading.Thread):
def stop(self):
self._do_stop = True
def getMutex(self):
return self._mutex
# updates the next data to be broadcaster for a given message type
#@Timed
def replace_message(self,type,frame_even,frame_odd = []):
frame_ppm = self._lowlevelencoder.frame_1090es_ppm_modulate(frame_even, frame_odd)
frame_IQ = self._lowlevelencoder.hackrf_raw_IQ_format(frame_ppm)
# this will usuallyy be called from another thread, so mutex lock mecanism is used during update
frame_IQ = self._lowlevelencoder.frame_1090es_ppm_modulate_IQ(frame_even, frame_odd)
# this will usually be called from another thread, so mutex lock mecanism is used during update
self._mutex.acquire()
self._messages[type][0] = frame_IQ
calling_thread = threading.current_thread()
if calling_thread in self._messages_feed_threads:
self._messages_feed_threads[calling_thread][type][0] = frame_IQ
self._mutex.release()
def broadcast_one_message(self,data):
self._tx_context.last_tx_pos = 0
self._mutex.acquire()
self._tx_context.buffer_length = len(data)
self._tx_context.buffer = (c_ubyte*self._tx_context.buffer_length).from_buffer_copy(data)
# TODO : need to evaluate if mutex protection is requiered during full broadcast or
# could be reduced to buffer filling (probably can be reduced)
# reduced version is when next line mutex.release() is uncommented and
# mutex release at the end of this method is commented
def register_track_simulation_thread(self,feeder_thread):
if feeder_thread in self._messages_feed_threads:
print(feeder_thread,"already registred as a feeder")
else:
self._messages_feed_threads[feeder_thread] = {}
self._mutex.release()
# key : "name of message" value : ["data to be broadcasted", datetime of last broadcast, delay_between 2 messages of this type]
self._messages_feed_threads[feeder_thread]["identification"] = [None, None, feeder_thread.identitification_message_period_us]
self._messages_feed_threads[feeder_thread]["register_6116"] = [None, None, feeder_thread.squawk_message_period_us]
self._messages_feed_threads[feeder_thread]["airborne_position"] = [None, None, feeder_thread.position_message_period_us]
self._messages_feed_threads[feeder_thread]["surface_position"] = [None, None, feeder_thread.position_message_period_us]
self._messages_feed_threads[feeder_thread]["airborne_velocity"] = [None, None, feeder_thread.velocity_message_period_us]
result = self._hackrf_broadcaster.startTX(hackrfTXCB,self._tx_context)
def broadcast_data(self,data):
length = len(data)
if length != 0:
sleep_time = length*0.50e-6*(1.0+1e-6*self._hackrf_broadcaster.getCrystalPPM())
self._tx_context.last_tx_pos = 0
self._mutex.acquire()
self._tx_context.buffer_length = length
self._tx_context.buffer = (c_ubyte*self._tx_context.buffer_length).from_buffer_copy(data)
# TODO : need to evaluate if mutex protection is requiered during full broadcast or
# could be reduced to buffer filling (probably can be reduced)
# reduced version is when next line mutex.release() is uncommented and
# mutex release at the end of this method is commented
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
self._mutex.release()
while self._hackrf_broadcaster.isStreaming():
time.sleep(0.00001)
result = self._hackrf_broadcaster.startTX(hackrfTXCB,self._tx_context)
result = self._hackrf_broadcaster.stopTX()
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
#self.mutex.release()
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
while self._hackrf_broadcaster.isStreaming():
time.sleep(sleep_time)
result = self._hackrf_broadcaster.stopTX()
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
#self._mutex.release()
def run(self):
while not self._do_stop:
for k,v in self._messages.items():
now = datetime.datetime.now(datetime.timezone.utc)
# Time throttling : messages are broadcasted only at provided time intervall
# TODO : implement UTC syncing mecanism (requiered that the actual host clock is UTC synced)
# which can be implemented to some accuracy level with ntp or GPS + PPS mecanisms
if (v[0] != None and len(v[0]) > 0) and (v[1] == None or (now - v[1]) >= datetime.timedelta(seconds=v[2] // 1000000,microseconds=v[2] % 1000000)):
self.broadcast_one_message(v[0])
v[1] = now
time.sleep(0.0001) # this loop will run at 10 kHz max
#self._mutex.acquire()
now = datetime.datetime.now(datetime.timezone.utc)
plane_messages = bytearray()
sleep_time = 10.0
for thread_broadcast_schedule in self._messages_feed_threads.values():
for v in thread_broadcast_schedule.values():
#now = datetime.datetime.now(datetime.timezone.utc)
v2_sec = v[2]*1e-6
if v[1] != None:
remaining = v2_sec - (now - v[1]).total_seconds()
else:
remaining = -float('inf')
sleep_time = 0.0
# Time throttling : messages are broadcasted only at provided time intervall
# TODO : implement UTC syncing mecanism (requiered that the actual host clock is UTC synced) ?
# which may be implemented to some accuracy level with ntp or GPS + PPS mecanisms ? in Python ?
if (v[0] != None and len(v[0]) > 0) and remaining <= 0.0:
plane_messages.extend(v[0])
v[1] = now
elif remaining > 0.0:
remaining = math.fmod(remaining,v2_sec)
if remaining < sleep_time:
sleep_time = remaining
#print("sleep_time1",sleep_time)
bc_length = len(plane_messages)
if (bc_length > 0):
self.broadcast_data(plane_messages)
elasped = (datetime.datetime.now(datetime.timezone.utc) - now).total_seconds()
sleep_time -= elasped
if sleep_time < 0.0:
sleep_time = 0.0
elif sleep_time < 0.5:
sleep_time *= 0.1
else:
sleep_time = 0.5
time.sleep(0.1*sleep_time)
else:
time.sleep(0.000001)
#self._mutex.release()
# upon exit, reset _do_stop flag in case there is a new start
self._do_stop = False

213
ModeS.py
View File

@ -1,6 +1,8 @@
from ModeSLocation import ModeSLocation
import math
import numpy
from CustomDecorators import *
###############################################################
# Further work on fork
# Copyright (C) 2017 David Robinson
@ -9,6 +11,9 @@ class ModeS:
"""
def __init__(self,df,icao,ca):
self._location = ModeSLocation()
self.df = df # as far as I understand specification, this should be :
# 17 if the broadcast source is an aircraft
# 18 if the broadcast source is some other ADSB facility (tower)
@ -17,30 +22,42 @@ class ModeS:
self.ca = ca # capability see §3.1.2.5.2.2.1
# (will usually be 5 for level 2 transponder and airborne)
self._charmap = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
self._crc_poly = 0x01FFF409
self._crc_lookup_T0 = [0]*256
crc = 0x1000000
i = 1
while(i < 256):
if crc & 0x1000000:
crc <<= 1
crc = crc ^ self._crc_poly
else:
crc <<= 1
for j in range(i):
self._crc_lookup_T0[i ^ j] = (crc ^ self._crc_lookup_T0[j])
i <<= 1
def df_frame_start(self):
"""
This will build the usual df frame start
This will build the usual df frame start and reserve a full frame with "0" for the reste of message
"""
frame = []
frame.append((self.df << 3) | self.ca)
frame.append((self.icao >> 16) & 0xff)
frame.append((self.icao >> 8) & 0xff)
frame.append((self.icao) & 0xff)
frame = bytearray(14)
frame[0] = (self.df << 3) | self.ca
frame[1] = ((self.icao >> 16) & 0xff)
frame[2] = ((self.icao >> 8) & 0xff)
frame[3] = ((self.icao) & 0xff)
return frame
def df_frame_append_crc(self,frame):
frame_str = "{0:02x}{1:02x}{2:02x}{3:02x}{4:02x}{5:02x}{6:02x}{7:02x}{8:02x}{9:02x}{10:02x}".format(*frame[0:11])
frame_crc = self.bin2int(self.modes_crc(frame_str + "000000", encode=True))
frame.append((frame_crc >> 16) & 0xff)
frame.append((frame_crc >> 8) & 0xff)
frame.append((frame_crc) & 0xff)
# Ref :
# ICAO Annex 10 : Aeronautical Telecommunications
# Volume IV : Surveillance and Collision Avoidance Systems
# Figure C-1. Extended Squitter Airborne Position
# "Register 05_16"
#@Timed
def df_encode_airborne_position(self, lat, lon, alt, tc, ss, nicsb, timesync):
"""
This will encode even and odd frames from airborne position extended squitter message
@ -52,38 +69,38 @@ class ModeS:
nicsb = NIC supplement-B (§C.2.3.2.5)
"""
location = ModeSLocation()
enc_alt = location.encode_alt_modes(alt, False)
#encode height information
enc_alt = self._location.encode_alt_modes(alt, False)
#encode that position
(evenenclat, evenenclon) = location.cpr_encode(lat, lon, False, False)
(oddenclat, oddenclon) = location.cpr_encode(lat, lon, True, False)
(evenenclat, evenenclon) = self._location.cpr_encode(lat, lon, False, False)
(oddenclat, oddenclon) = self._location.cpr_encode(lat, lon, True, False)
ff = 0
df_frame_even_bytes = self.df_frame_start()
# data
df_frame_even_bytes.append((tc<<3) | (ss<<1) | nicsb)
df_frame_even_bytes.append((enc_alt>>4) & 0xff)
df_frame_even_bytes.append((enc_alt & 0xf) << 4 | (timesync<<3) | (ff<<2) | (evenenclat>>15))
df_frame_even_bytes.append((evenenclat>>7) & 0xff)
df_frame_even_bytes.append(((evenenclat & 0x7f) << 1) | (evenenclon>>16))
df_frame_even_bytes.append((evenenclon>>8) & 0xff)
df_frame_even_bytes.append((evenenclon ) & 0xff)
df_frame_even_bytes[4] = (tc<<3) | (ss<<1) | nicsb
df_frame_even_bytes[5] = (enc_alt>>4) & 0xff
df_frame_even_bytes[6] = (enc_alt & 0xf) << 4 | (timesync<<3) | (ff<<2) | (evenenclat>>15)
df_frame_even_bytes[7] = (evenenclat>>7) & 0xff
df_frame_even_bytes[8] = ((evenenclat & 0x7f) << 1) | (evenenclon>>16)
df_frame_even_bytes[9] = (evenenclon>>8) & 0xff
df_frame_even_bytes[10] = (evenenclon ) & 0xff
self.df_frame_append_crc(df_frame_even_bytes)
self.df_frame_write_crc(df_frame_even_bytes)
ff = 1
df_frame_odd_bytes = self.df_frame_start()
# data
df_frame_odd_bytes.append((tc<<3) | (ss<<1) | nicsb)
df_frame_odd_bytes.append((enc_alt>>4) & 0xff)
df_frame_odd_bytes.append((enc_alt & 0xf) << 4 | (timesync<<3) | (ff<<2) | (oddenclat>>15))
df_frame_odd_bytes.append((oddenclat>>7) & 0xff)
df_frame_odd_bytes.append(((oddenclat & 0x7f) << 1) | (oddenclon>>16))
df_frame_odd_bytes.append((oddenclon>>8) & 0xff)
df_frame_odd_bytes.append((oddenclon ) & 0xff)
df_frame_odd_bytes[4] = (tc<<3) | (ss<<1) | nicsb
df_frame_odd_bytes[5] = (enc_alt>>4) & 0xff
df_frame_odd_bytes[6] = (enc_alt & 0xf) << 4 | (timesync<<3) | (ff<<2) | (oddenclat>>15)
df_frame_odd_bytes[7] = (oddenclat>>7) & 0xff
df_frame_odd_bytes[8] = ((oddenclat & 0x7f) << 1) | (oddenclon>>16)
df_frame_odd_bytes[9] = (oddenclon>>8) & 0xff
df_frame_odd_bytes[10] = (oddenclon ) & 0xff
self.df_frame_append_crc(df_frame_odd_bytes)
self.df_frame_write_crc(df_frame_odd_bytes)
return (df_frame_even_bytes, df_frame_odd_bytes)
@ -101,16 +118,17 @@ class ModeS:
# Volume IV : Surveillance and Collision Avoidance Systems
# Figure C-3. Extended Squitter Status
# "Register 07_16"
#@Timed
def df_encode_extended_squitter_status(self, trs = 0x0, ats = 0x0):
df_frame = self.df_frame_start()
df_frame.append((trs << 6) & 0x3 | (ats << 5) & 0x1)
df_frame.extend([0]*6)
df_frame[4] = ((trs << 6) & 0x3 | (ats << 5) & 0x1)
self.df_frame_append_crc(df_frame)
self.df_frame_write_crc(df_frame)
return df_frame
#From https://github.com/jaywilhelm/ADSB-Out_Python on 2019-08-18
#@Timed
def df_encode_ground_velocity(self, ground_velocity_kt, track_angle_deg, vertical_rate):
#1-5 downlink format
@ -161,15 +179,15 @@ class ModeS:
dfvel = self.df_frame_start()
# data
dfvel.append((tc << 3) | st)
dfvel.append((ic << 7) | (resv_a << 6) | (NAC << 3) | (S_EW << 2) | ((V_EW >> 8) & 0x03))
dfvel.append(0xFF & V_EW)
dfvel.append((S_NS << 7) | ((V_NS >> 3))) #& 0x7F))
dfvel.append(((V_NS << 5) & 0xE0) | (VrSrc << 4) | (S_Vr << 3) | ((Vr >> 6) & 0x03))
dfvel.append(((Vr << 2) & 0xFC) | (RESV_B))
dfvel.append((S_Dif << 7) | (Dif))
dfvel[4] = ((tc << 3) | st)
dfvel[5] = ((ic << 7) | (resv_a << 6) | (NAC << 3) | (S_EW << 2) | ((V_EW >> 8) & 0x03))
dfvel[6] = (0xFF & V_EW)
dfvel[7] = ((S_NS << 7) | ((V_NS >> 3))) #& 0x7F))
dfvel[8] = (((V_NS << 5) & 0xE0) | (VrSrc << 4) | (S_Vr << 3) | ((Vr >> 6) & 0x03))
dfvel[9] = (((Vr << 2) & 0xFC) | (RESV_B))
dfvel[10] = ((S_Dif << 7) | (Dif))
self.df_frame_append_crc(dfvel)
self.df_frame_write_crc(dfvel)
return dfvel
@ -186,19 +204,17 @@ class ModeS:
tc = 1 # §C.2.3.4
ec = 1 # §C.2.3.4
map = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######"
dfname = self.df_frame_start()
# data
dfname.append((tc << 3) | (ec))
dfname.append((0xFC & (int(map.find(csname[0])) << 2)) | (0x03 & (int(map.find(csname[1])) >> 6)))
dfname.append((0xF0 & (int(map.find(csname[1])) << 4)) | (0x0F & (int(map.find(csname[2])) >> 2)))
dfname.append((0xF0 & (int(map.find(csname[2])) << 6)) | (0x3F & (int(map.find(csname[3])) >> 0)))
dfname.append((0xFC & (int(map.find(csname[4])) << 2)) | (0x03 & (int(map.find(csname[5])) >> 4)))
dfname.append((0xF0 & (int(map.find(csname[5])) << 4)) | (0x0F & (int(map.find(csname[6])) >> 2)))
dfname.append((0xF0 & (int(map.find(csname[6])) << 6)) | (0x3F & (int(map.find(csname[7])) >> 0)))
dfname[4] = (tc << 3) | (ec)
dfname[5] = (0xFC & (int(self._charmap.find(csname[0])) << 2)) | (0x03 & (int(self._charmap.find(csname[1])) >> 4))
dfname[6] = (0xF0 & (int(self._charmap.find(csname[1])) << 4)) | (0x0F & (int(self._charmap.find(csname[2])) >> 2))
dfname[7] = (0xF0 & (int(self._charmap.find(csname[2])) << 6)) | (0x3F & (int(self._charmap.find(csname[3])) >> 0))
dfname[8] = (0xFC & (int(self._charmap.find(csname[4])) << 2)) | (0x03 & (int(self._charmap.find(csname[5])) >> 4))
dfname[9] = (0xF0 & (int(self._charmap.find(csname[5])) << 4)) | (0x0F & (int(self._charmap.find(csname[6])) >> 2))
dfname[10] = (0xF0 & (int(self._charmap.find(csname[6])) << 6)) | (0x3F & (int(self._charmap.find(csname[7])) >> 0))
self.df_frame_append_crc(dfname)
self.df_frame_write_crc(dfname)
return dfname
@ -208,6 +224,7 @@ class ModeS:
# Figure C-8a. Extended Squitter Aircraft Status
# "Register 61_16"
#@Timed
def modaA_encode(self,modeA_4096_code = "7000", emergency_state = 0x0):
frame = self.df_frame_start()
# data
@ -216,7 +233,7 @@ class ModeS:
# 1 : Emergency/Priority Status and Mode A Code
# 2 : TCAS/ACAS RA Broadcast -> Figure C-8b : fields have different meaning
# 3-7 : reserved
frame.append((format_tc << 3) | st)
frame[4] = ((format_tc << 3) | st)
# Encode Squawk
# ABCD (A:0-7, B:0-7, C:0-7, D:0-7)
@ -277,67 +294,39 @@ class ModeS:
elif squawk_str == "7500":
emergency = 0x5
frame.append(emergency << 5 | squawk_bits >> 8)
frame.append(squawk_bits & 0xFF)
frame[5] = (emergency << 5 | squawk_bits >> 8)
frame[6] = (squawk_bits & 0xFF)
frame.extend([0]*4)
self.df_frame_append_crc(frame)
self.df_frame_write_crc(frame)
return frame
###############################################################
# Copyright (C) 2015 Junzi Sun (TU Delft)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###############################################################
#@Timed
def df_frame_write_crc(self,frame):
crc = self.crc24_lu(frame[0:11])
frame[11] = crc >> 16
frame[12] = (crc >> 8) & 0xFF
frame[13] = crc & 0xFF
# the polynominal generattor code for CRC
def modes_crc(self, msg, encode=False):
"""Mode-S Cyclic Redundancy Check
Detect if bit error occurs in the Mode-S message
Args:
msg (string): 28 bytes hexadecimal message string
encode (bool): True to encode the date only and return the checksum
Returns:
string: message checksum, or partity bits (encoder)
"""
#@Timed
# CRC24 without lookup table
def crc24(self,msg):
crc = 0x0
for b in msg:
crc ^= (b << 16)
for i in range(8):
crc <<= 1
if crc & 0x1000000: crc ^= self._crc_poly
GENERATOR = "1111111111111010000001001" # polynomial coefficients
msgbin = list(self.hex2bin(msg))
return crc & 0xFFFFFF
if encode:
msgbin[-24:] = ['0'] * 24
#@Timed
# CRC24 with lookup table (about 3x faster than non lookup version)
def crc24_lu(self,msg):
#print("len:",len(msg))
crc = 0x0
for b in msg:
tmp = (crc ^ (b << 16)) >> 16
crc = ((crc << 8) ^ self._crc_lookup_T0[tmp & 0xff])
# loop all bits, except last 24 piraty bits
for i in range(len(msgbin)-24):
# if 1, perform modulo 2 multiplication,
if msgbin[i] == '1':
for j in range(len(GENERATOR)):
# modulo 2 multiplication = XOR
msgbin[i+j] = str((int(msgbin[i+j]) ^ int(GENERATOR[j])))
# last 24 bits
reminder = ''.join(msgbin[-24:])
return reminder
def hex2bin(self, hexstr):
"""Convert a hexdecimal string to binary string, with zero fillings. """
scale = 16
num_of_bits = len(hexstr) * math.log(scale, 2)
binstr = bin(int(hexstr, scale))[2:].zfill(int(num_of_bits))
return binstr
def bin2int(self, binstr):
"""Convert a binary string to integer. """
return int(binstr, 2)
return crc & 0xFFFFFF

View File

@ -1,4 +1,5 @@
import math
from CustomDecorators import *
class ModeSLocation:
"""This class does ModeS/ADSB Location calulations"""
@ -89,6 +90,7 @@ class ModeSLocation:
# encode CPR position
# https://adsb-decode-guide.readthedocs.io/en/latest/content/cpr.html
# compact position reporting
#@Timed
def cpr_encode(self, lat, lon, ctype, surface):
if surface is True:
scalar = 2.**19

View File

@ -20,32 +20,29 @@ from AbstractTrajectorySimulatorBase import AbstractTrajectorySimulatorBase
class PseudoCircleTrajectorySimulator(AbstractTrajectorySimulatorBase):
def __init__(self,mutex,broadcast_thread,aircrafinfos):
super().__init__(mutex,broadcast_thread,aircrafinfos)
self._starttime = datetime.datetime.now(datetime.timezone.utc)
self._lasttime = self._starttime
self._lat0 = aircrafinfos.lat_deg
self._lon0 = aircrafinfos.lon_deg
def refresh_delay(self):
return 0.005
return 0.02
def update_aircraftinfos(self):
now = datetime.datetime.now(datetime.timezone.utc)
elapsed = (now - self._lasttime).total_seconds()
turn_rate = self.getTurnRate()
R = self.getTurnRadius()
Rlat = (R/6371000.0)*(180.0/math.pi)
ta = self._aircraftinfos.track_angle_deg - (turn_rate*elapsed)*(180.0/math.pi)
ta = self._aircraftinfos.track_angle_deg - (turn_rate*self.refresh_delay())*(180.0/math.pi)
ta = math.fmod(ta,360.0)
self._aircraftinfos.track_angle_deg = ta
self._aircraftinfos.track_angle_deg = ta # intermediate value and single assignment needed at time because of
# setter and change detection mecanism in AircraftInfo (somehox shitty)
# TODO : implement better mecanism
self._aircraftinfos.lat_deg = self._lat0 - Rlat*math.sin(self._aircraftinfos.track_angle_deg*math.pi/180.0)
self._aircraftinfos.lon_deg = self._lon0 + Rlat/math.cos(self._aircraftinfos.lat_deg*math.pi/180.0)*math.cos(self._aircraftinfos.track_angle_deg*math.pi/180.0)
self._lasttime = now
#self._lasttime = now
def getTurnRate(self):
tr = (9.81/self._aircraftinfos.speed_mps)*math.sqrt(self._aircraftinfos.maxloadfactor**2.0 - 1.0)
return tr
def getTurnRadius(self):
return self._aircraftinfos.speed_mps/self.getTurnRate()
return self._aircraftinfos.speed_mps/self.getTurnRate()

View File

@ -53,7 +53,7 @@ The hardware setup I'm using is pictured below. Please note the RF attenuators (
The extra 1090MHz filter is probably not requiered as the flight aware dongle already features 1090 MHz filtering.
My HackRF is fitted with a 0.5 ppm TCXO
![test setup image](./test-setup.jpg "test setup")
![test setup image](./images/test-setup.jpg "test setup")
The default hackrf settings in repo are :
- 1090 MHz
@ -72,11 +72,47 @@ Those forged broadcasts may be used to spoof ATC, trigger TCAS or other maliciou
## Command line examples
####*Command line switches can be displayed with*
```
mathieu@devbox:~/Dev/matioupi/realtime-adsb-out$ ./realtime-adsb-out.py -h
Usage: ./realtime-adsb-out.py [options]
-h | --help Display help message.
--scenario <opt> Scenario mode with a provided scenario filepath
--icao <opt> Callsign in hex, Default:0x508035
--callsign <opt> Callsign (8 chars max), Default:DEADBEEF
--squawk <opt> 4-digits 4096 code squawk, Default:7000
--trajectorytype <opt> Type of simulated trajectory amongst :
fixed : steady aircraft
circle : pseudo circular flight
random : random positions inside circle area
waypoints : fly long flight path
Default:fixed
--lat <opt> Latitude for the plane in decimal degrees, Default:50.44994
--long <opt> Longitude for the place in decimal degrees. Default:30.5211
--altitude <opt> Altitude in decimal feet, Default:1500.0
--speed <opt> Airspeed in decimal kph, Default:300.0
--vspeed <opt> Vertical speed en ft/min, positive up, Default:0
--maxloadfactor Specify the max load factor for aircraft simulation. Default:1.45
--trackangle <opt> Track angle in decimal degrees. Default:0
--timesync <opt> 0/1, 0 indicates time not synchronous with UTC, Default:0
--capability <opt> Capability, Default:5
--typecode <opt> ADS-B message type, Default:11
--sstatus <opt> Surveillance status, Default:0
--nicsupplementb <opt> NIC supplement-B, Default:0
--surface Aircraft located on ground, Default:False
--waypoints <opt> Waypoints file for waypoints trajectory
--posrate <opt> position frame broadcast period in µs, Default: 150000
```
####*Single plane scenarii can be achieved with command line switches*
`./realtime-adsb-out.py --callsign 'FCKPUTIN' --alt 4500 --speed 600 --trajectorytype circle --maxloadfactor 1.03`
will generate a pseudo circular trajectory, flown at 4500 ft, 600 km/h and a load factor of 1.03.
![circle mode example image](./adsb-out-circle.png "circle mode example")
![circle mode example image](./images/adsb-out-circle.png "circle mode example")
`./realtime-adsb-out.py --callsign 'FCKPUTIN' --alt 4500 --trajectorytype random`
@ -84,13 +120,22 @@ will generate a random trajectory in a ~30s at specified (here default) speed ar
track angle is randomized, speed is randomized, altitude is randomized. The default position frame broadcast period can be lowered in order to
produce a large numer of tracks in a given area
![random mode example image](./adsb-out-random.png "random mode example")
![random mode example image](./images/adsb-out-random.png "random mode example")
####*More complex scenarii with multiple planes can be achieved though json configuration files*
`./realtime-adsb-out.py --scenario ./examples/scenario3.json`
![4 planes scenario example](./images/adsb-out-scenario3.png "4 planes scenario example")
The maximum number of planes that can be simulated has not been evaluated yet. It will depends on the refresh rate of each message type, etc.
Tests have been performed on a laptop computer, but with not too many tracks, it should be possible to run on lighter platforms such as Raspberry Pi.
## Reference documentation
All reference documentation from the repositories mentionned in the foreword.
https://mode-s.org/
[https://mode-s.org/](https://mode-s.org/)
*ICAO Annex 10, Aeronautical Telecommunications, Volume IV - Surveillance Radar and Collision Avoidance Systems* which at time of writing can be retrieved here:
- english version https://www.bazl.admin.ch/bazl/en/home/specialists/regulations-and-guidelines/legislation-and-directives/anhaenge-zur-konvention-der-internationalen-zivilluftfahrtorgani.html
@ -99,5 +144,8 @@ https://mode-s.org/
*ICAO doc 9871 edition 1* which can be retrieved here (There is an edition 2 of this document but all seems to be behing paywalls):
- [ICAO doc 9871 edition 1](http://www.aviationchief.com/uploads/9/2/0/9/92098238/icao_doc_9871_-_technical_provisions_for_mode_s_-_advanced_edition_1.pdf)
Ghost in the Air(Traffic): On insecurity of ADS-B protocol and practical attacks on ADS-B devices (Andrei Costin, Aurélien Francillon):
[publication PDF hosted at eurocom.fr](https://www.s3.eurecom.fr/docs/bh12us_costin.pdf)
A DEFCON 20 video on Youtube that already highlighted some ADS-B weaknesses (and at this time, there was no HackRF):
[DEFCON 20 (2012) - RenderMan - Hacker + Airplanes = No Good Can Come Of This](https://www.youtube.com/watch?v=mY2uiLfXmaI)

18
examples/A400M-094.json Normal file
View File

@ -0,0 +1,18 @@
{
"icao_aa":"4B8214",
"callsign":"180094",
"squawk" : "7000",
"alt_ft": 35000.0,
"lat_deg": 50.24994,
"lon_deg": 30.0211,
"speed_kph": 780.0,
"vspeed_ftpmin": 0.0,
"maxloadfactor": 1.01,
"track_angle_deg": 0.0,
"capability": 5,
"type_code": 11,
"surveillance_status": 0,
"timesync": 0,
"nicsup": 0,
"on_surface": false
}

18
examples/A400M-110.json Normal file
View File

@ -0,0 +1,18 @@
{
"icao_aa":"3B7761",
"callsign":"FRBAR",
"squawk" : "7000",
"alt_ft": 15500.0,
"lat_deg": 50.24994,
"lon_deg": 30.5211,
"speed_kph": 750.0,
"vspeed_ftpmin": 0.0,
"maxloadfactor": 1.06,
"track_angle_deg": 0.0,
"capability": 5,
"type_code": 11,
"surveillance_status": 0,
"timesync": 0,
"nicsup": 0,
"on_surface": false
}

18
examples/AN-IL78MP.json Normal file
View File

@ -0,0 +1,18 @@
{
"icao_aa":"0x50FF5E",
"callsign":"R10002",
"squawk" : "7000",
"alt_ft": 6500.0,
"lat_deg": 50.44994,
"lon_deg": 30.8211,
"speed_kph": 500.0,
"vspeed_ftpmin": 0.0,
"maxloadfactor": 1.06,
"track_angle_deg": 0.0,
"capability": 5,
"type_code": 11,
"surveillance_status": 0,
"timesync": 0,
"nicsup": 0,
"on_surface": false
}

18
examples/MRYIA.json Normal file
View File

@ -0,0 +1,18 @@
{
"icao_aa":"0x508035",
"callsign":"MRYIA",
"squawk": "7000",
"alt_ft": 4500.0,
"lat_deg": 50.44994,
"lon_deg": 30.5211,
"speed_kph": 600.0,
"vspeed_ftpmin": 0.0,
"maxloadfactor": 1.03,
"track_angle_deg": 0.0,
"capability": 5,
"type_code": 11,
"surveillance_status": 0,
"timesync": 0,
"nicsup": 0,
"on_surface": false
}

7
examples/scenario.json Normal file
View File

@ -0,0 +1,7 @@
{
"plane1":
{
"filename":"./examples/MRYIA.json",
"trajectory_type":"circle"
}
}

12
examples/scenario2.json Normal file
View File

@ -0,0 +1,12 @@
{
"plane1":
{
"filename":"./examples/MRYIA.json",
"trajectory_type":"circle"
},
"plane2":
{
"filename":"./examples/AN-IL78MP.json",
"trajectory_type":"circle"
}
}

22
examples/scenario3.json Normal file
View File

@ -0,0 +1,22 @@
{
"plane1":
{
"filename":"./examples/MRYIA.json",
"trajectory_type":"circle"
},
"plane2":
{
"filename":"./examples/AN-IL78MP.json",
"trajectory_type":"circle"
},
"plane3":
{
"filename":"./examples/A400M-094.json",
"trajectory_type":"circle"
},
"plane4":
{
"filename":"./examples/A400M-110.json",
"trajectory_type":"circle"
}
}

View File

Before

Width:  |  Height:  |  Size: 1.2 MiB

After

Width:  |  Height:  |  Size: 1.2 MiB

View File

Before

Width:  |  Height:  |  Size: 1.6 MiB

After

Width:  |  Height:  |  Size: 1.6 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 MiB

View File

Before

Width:  |  Height:  |  Size: 93 KiB

After

Width:  |  Height:  |  Size: 93 KiB

View File

@ -14,8 +14,9 @@ You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
import sys, time, math
import threading
import sys, time, math, os
import threading, json
import traceback
from AircraftInfos import AircraftInfos
from FixedTrajectorySimulator import FixedTrajectorySimulator
@ -30,6 +31,7 @@ def usage(msg=False):
if msg:print(msg)
print("Usage: %s [options]\n" % sys.argv[0])
print("-h | --help Display help message.")
print("--scenario <opt> Scenario mode with a provided scenario filepath")
print("--icao <opt> Callsign in hex, Default:0x508035")
print("--callsign <opt> Callsign (8 chars max), Default:DEADBEEF")
print("--squawk <opt> 4-digits 4096 code squawk, Default:7000")
@ -59,6 +61,21 @@ def usage(msg=False):
sys.exit(2)
def getTrackSimulationThread(trajectory_type,brodcast_thread,aircraftinfos,waypoints_file = None):
if trajectory_type == 'fixed':
return FixedTrajectorySimulator(brodcast_thread.getMutex(),brodcast_thread,aircraftinfos)
elif trajectory_type == 'circle':
return PseudoCircleTrajectorySimulator(brodcast_thread.getMutex(),brodcast_thread,aircraftinfos)
elif trajectory_type == 'random':
return RandomTrajectorySimulator(brodcast_thread.getMutex(),brodcast_thread,aircraftinfos)
elif trajectory_type == 'waypoints':
print("WaypointsTrajectorySimulator not implemented yet")
exit(-1)
return WaypointsTrajectorySimulator(brodcast_thread.getMutex(),brodcast_thread,aircraftinfos,waypoints_file)
else:
return None
def main():
# Default values
@ -82,10 +99,10 @@ def main():
trajectory_type = 'fixed'
waypoints_file = None
posrate = 150000
scenariofile = None
try:
(opts, args) = getopt(sys.argv[1:], 'h', \
['help','icao=','callsign=','squawk=','trajectorytype=','lat=','long=','altitude=','speed=','vspeed=','maxloadfactor=','trackangle=',
['help','scenario=','icao=','callsign=','squawk=','trajectorytype=','lat=','long=','altitude=','speed=','vspeed=','maxloadfactor=','trackangle=',
'timesync=','capability=','typecode=','sstatus=','nicsupplementb=','surface','posrate='
])
except GetoptError as err:
@ -94,6 +111,7 @@ def main():
if len(opts) != 0:
for (opt, arg) in opts:
if opt in ('-h', '--help'):usage()
elif opt in ('--scenario'):scenariofile = arg
elif opt in ('--icao'):icao_aa = arg
elif opt in ('--callsign'):callsign = arg
elif opt in ('--squawk'):squawk = arg
@ -117,38 +135,64 @@ def main():
elif opt in ('--posrate'):posrate = int(arg)
else:usage("Unknown option %s\n" % opt)
aircraftinfos = AircraftInfos(icao_aa,callsign,squawk, \
lat_deg,lon_deg,alt_ft,speed_kph,vspeed_ftpmin,maxloadfactor,track_angle_deg, \
timesync,capability,type_code,surveillance_status,nicsup,on_surface)
track_simulators = []
broadcast_thread = HackRfBroadcastThread(posrate) # posrate would usally be used with random mode to generate load of tracks
# TODO : the mutex would better be implemented as an object attribute in broadcast thread
mutex = threading.Lock()
if scenariofile == None:
print("Going to run in single plane from command line mode")
aircraftinfos = AircraftInfos(icao_aa,callsign,squawk, \
lat_deg,lon_deg,alt_ft,speed_kph,vspeed_ftpmin,maxloadfactor,track_angle_deg, \
timesync,capability,type_code,surveillance_status,nicsup,on_surface)
brodcast_thread = HackRfBroadcastThread(mutex,posrate) # posrate would usally be used with random mode to generate load of tracks
track_simulation = getTrackSimulationThread(trajectory_type,broadcast_thread,aircraftinfos,waypoints_file)
if trajectory_type == 'fixed':
trajectory_simulator = FixedTrajectorySimulator(mutex,brodcast_thread,aircraftinfos)
elif trajectory_type == 'circle':
trajectory_simulator = PseudoCircleTrajectorySimulator(mutex,brodcast_thread,aircraftinfos)
elif trajectory_type == 'random':
trajectory_simulator = RandomTrajectorySimulator(mutex,brodcast_thread,aircraftinfos)
elif trajectory_type == 'waypoints':
print("WaypointsTrajectorySimulator not implemented yet")
exit(-1)
trajectory_simulator = WaypointsTrajectorySimulator(mutex,brodcast_thread,aircraftinfos,waypoints_file)
track_simulators.append(track_simulation)
else:
print("Going to run in json scenario mode from file "+os.path.abspath(scenariofile))
with open(scenariofile,'r') as json_file:
scenario = json.load(json_file)
for plane in scenario.values():
plane_info = AircraftInfos.from_json(plane["filename"])
if "waypoints_file" in plane:
waypoints_file = plane["waypoints_file"]
track_simulation = getTrackSimulationThread(plane["trajectory_type"],broadcast_thread,plane_info,waypoints_file)
track_simulators.append(track_simulation)
print("scenario contains tracks simulation instructions for "+str(len(track_simulators))+" planes:")
for tsim in track_simulators:
print("callsign: "+tsim.aircraftinfos.callsign.ljust(9,' ')+"MSL altitude: "+"{:7.1f}".format(tsim.aircraftinfos.alt_msl_ft)+" ft")
for tsim in track_simulators:
broadcast_thread.register_track_simulation_thread(tsim)
while(val:=input("Type \'s + Enter\' to start the adsb-out simulation, and type \'s + Enter\' again to stop it:\n") != 's'):
time.sleep(0.05)
trajectory_simulator.start()
brodcast_thread.start()
# start all threads
for tsim in track_simulators:
tsim.start()
broadcast_thread.start()
# user input loop. Todo : implement other commands ? (in that case don't forget to check if mutex protection is needed)
while(val:=input("") != 's'):
time.sleep(0.05)
trajectory_simulator.stop()
brodcast_thread.stop()
trajectory_simulator.join()
brodcast_thread.join()
# stop all threads
for tsim in track_simulators:
tsim.stop()
broadcast_thread.stop()
# wait for all threads to terminate
for tsim in track_simulators:
tsim.join()
broadcast_thread.join()
print("reatime-adsb-out simulation is finished")