realtime-adsb-out/HackRfBroadcastThread.py

234 lines
11 KiB
Python
Raw Permalink Normal View History

2022-03-11 04:25:48 +08:00
""" This class holds the aircraft states from the ADS-B point of view
It is refreshed by the simulation thread (or sensor feed thread) and will
be used to provide broadcasted informations
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
#
# This class overrides threading.Thread and provides service to broacast
# ADS-B message though a HackRF device
# message updates are performed from a separate thread which will
# update/push messages thanks to the replace_message method
# thread loop will pump and broacast updated message (soft realtime)
#
# mutex protection mecanism is implemented in
# replace_message() which is call from other thread
# broadcast_one_message() which is called from this thread
# in order to prevent concurrent access to broadcasted data buffers
import time, datetime, math
import threading
from CustomDecorators import *
from ADSBLowLevelEncoder import ADSBLowLevelEncoder
from pyhackrf import *
from ctypes import *
class hackrf_tx_context(Structure):
_fields_ = [("buffer", POINTER(c_ubyte)),
("last_tx_pos", c_int),
("buffer_length", c_int) ]
def hackrfTXCB(hackrf_transfer):
user_tx_context = cast(hackrf_transfer.contents.tx_ctx, POINTER(hackrf_tx_context))
tx_buffer_length = hackrf_transfer.contents.valid_length
left = user_tx_context.contents.buffer_length - user_tx_context.contents.last_tx_pos
addr_dest = addressof(hackrf_transfer.contents.buffer.contents)
addr_src = addressof(user_tx_context.contents.buffer.contents)
if (left > tx_buffer_length):
memmove(addr_dest,addr_src,tx_buffer_length)
user_tx_context.contents.last_tx_pos += tx_buffer_length
return 0
else:
memmove(addr_dest,addr_src,left)
memset(addr_dest+left,0,tx_buffer_length-left)
return -1
@Singleton
class HackRfBroadcastThread(threading.Thread):
def __init__(self,airborne_position_refresh_period = 150000):
2022-03-11 04:25:48 +08:00
super().__init__()
self._mutex = threading.Lock()
2022-03-11 04:25:48 +08:00
self._lowlevelencoder = ADSBLowLevelEncoder()
self._messages_feed_threads = {}
2022-03-11 04:25:48 +08:00
# Initialize pyHackRF library
result = HackRF.initialize()
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
# Initialize HackRF instance (could pass board serial or index if specific board is needed)
self._hackrf_broadcaster = HackRF()
# Do requiered settings
# so far hard-coded e.g. gain and disabled amp are specific to hardware test setup
# with hackrf feeding a flight aware dongle through cable + attenuators (-50dB)
result = self._hackrf_broadcaster.open()
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
self._hackrf_broadcaster.setCrystalPPM(0)
2022-03-11 04:25:48 +08:00
result = self._hackrf_broadcaster.setSampleRate(2000000)
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
result = self._hackrf_broadcaster.setBasebandFilterBandwidth(HackRF.computeBaseBandFilterBw(2000000))
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
#result = self.hackrf_broadcaster.setFrequency(868000000) # free frequency for over the air brodcast tests
result = self._hackrf_broadcaster.setFrequency(1090000000) # do not use 1090MHz for actual over the air broadcasting
2022-03-11 04:25:48 +08:00
# only if you use wire feed (you'll need attenuators in that case)
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
result = self._hackrf_broadcaster.setTXVGAGain(4) # week gain (used for wire feed + attenuators)
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
result = self._hackrf_broadcaster.setAmplifierMode(LibHackRfHwMode.HW_MODE_OFF)
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
self._tx_context = hackrf_tx_context()
self._do_stop = False
# do hackRF lib and instance cleanup at object destruction time
def __del__(self):
result = self._hackrf_broadcaster.close()
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
result = HackRF.deinitialize()
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
def stop(self):
self._do_stop = True
def getMutex(self):
return self._mutex
2022-03-11 04:25:48 +08:00
# updates the next data to be broadcaster for a given message type
#@Timed
2022-03-11 04:25:48 +08:00
def replace_message(self,type,frame_even,frame_odd = []):
frame_IQ = self._lowlevelencoder.frame_1090es_ppm_modulate_IQ(frame_even, frame_odd)
2022-03-11 04:25:48 +08:00
# this will usually be called from another thread, so mutex lock mecanism is used during update
2022-03-11 04:25:48 +08:00
self._mutex.acquire()
calling_thread = threading.current_thread()
if calling_thread in self._messages_feed_threads:
self._messages_feed_threads[calling_thread][type][0] = frame_IQ
2022-03-11 04:25:48 +08:00
self._mutex.release()
def register_track_simulation_thread(self,feeder_thread):
if feeder_thread in self._messages_feed_threads:
print(feeder_thread,"already registred as a feeder")
else:
self._messages_feed_threads[feeder_thread] = {}
# key : "name of message" value : ["data to be broadcasted", datetime of last broadcast, delay_between 2 messages of this type]
self._messages_feed_threads[feeder_thread]["identification"] = [None, None, feeder_thread.identitification_message_period_us]
self._messages_feed_threads[feeder_thread]["register_6116"] = [None, None, feeder_thread.squawk_message_period_us]
self._messages_feed_threads[feeder_thread]["airborne_position"] = [None, None, feeder_thread.position_message_period_us]
self._messages_feed_threads[feeder_thread]["surface_position"] = [None, None, feeder_thread.position_message_period_us]
self._messages_feed_threads[feeder_thread]["airborne_velocity"] = [None, None, feeder_thread.velocity_message_period_us]
def broadcast_data(self,data):
length = len(data)
if length != 0:
sleep_time = length*0.50e-6*(1.0+1e-6*self._hackrf_broadcaster.getCrystalPPM())
self._tx_context.last_tx_pos = 0
self._mutex.acquire()
self._tx_context.buffer_length = length
self._tx_context.buffer = (c_ubyte*self._tx_context.buffer_length).from_buffer_copy(data)
# TODO : need to evaluate if mutex protection is requiered during full broadcast or
# could be reduced to buffer filling (probably can be reduced)
# reduced version is when next line mutex.release() is uncommented and
# mutex release at the end of this method is commented
self._mutex.release()
result = self._hackrf_broadcaster.startTX(hackrfTXCB,self._tx_context)
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
while self._hackrf_broadcaster.isStreaming():
time.sleep(sleep_time)
result = self._hackrf_broadcaster.stopTX()
if (result != LibHackRfReturnCode.HACKRF_SUCCESS):
print("Error :",result, ",", HackRF.getHackRfErrorCodeName(result))
#self._mutex.release()
2022-03-11 04:25:48 +08:00
def run(self):
2022-03-11 04:25:48 +08:00
while not self._do_stop:
#self._mutex.acquire()
now = datetime.datetime.now(datetime.timezone.utc)
plane_messages = bytearray()
sleep_time = 10.0
for thread_broadcast_schedule in self._messages_feed_threads.values():
for v in thread_broadcast_schedule.values():
#now = datetime.datetime.now(datetime.timezone.utc)
v2_sec = v[2]*1e-6
if v[1] != None:
remaining = v2_sec - (now - v[1]).total_seconds()
else:
remaining = -float('inf')
sleep_time = 0.0
# Time throttling : messages are broadcasted only at provided time intervall
# TODO : implement UTC syncing mecanism (requiered that the actual host clock is UTC synced) ?
# which may be implemented to some accuracy level with ntp or GPS + PPS mecanisms ? in Python ?
if (v[0] != None and len(v[0]) > 0) and remaining <= 0.0:
plane_messages.extend(v[0])
v[1] = now
elif remaining > 0.0:
remaining = math.fmod(remaining,v2_sec)
if remaining < sleep_time:
sleep_time = remaining
#print("sleep_time1",sleep_time)
bc_length = len(plane_messages)
if (bc_length > 0):
self.broadcast_data(plane_messages)
elasped = (datetime.datetime.now(datetime.timezone.utc) - now).total_seconds()
sleep_time -= elasped
if sleep_time < 0.0:
sleep_time = 0.0
elif sleep_time < 0.5:
sleep_time *= 0.1
else:
sleep_time = 0.5
time.sleep(0.1*sleep_time)
else:
time.sleep(0.000001)
#self._mutex.release()
2022-03-11 04:25:48 +08:00
# upon exit, reset _do_stop flag in case there is a new start
self._do_stop = False