gr-air-modes/apps/modes_gui

337 lines
12 KiB
Python
Executable File

#!/usr/bin/python
import os, sys, time, threading
from PyQt4 import QtCore,QtGui
from gnuradio import gr, gru, optfir, eng_notation, blks2
import gnuradio.gr.gr_threading as _threading
import air_modes
from air_modes.modes_exceptions import *
from air_modes.modes_rx_ui import Ui_MainWindow
import csv
import sqlite3
class mainwindow(QtGui.QMainWindow):
def __init__(self):
QtGui.QMainWindow.__init__(self)
self.ui = Ui_MainWindow()
self.ui.setupUi(self)
#set defaults
#add file, RTL, UHD sources
self.ui.combo_source.addItems(["UHD device", "RTL-SDR", "File"])
self.ui.combo_source.setCurrentIndex(0)
#populate antenna, rate combo boxes based on source
self.populate_source_options()
#should round to actual achieved gain
self.ui.line_gain.insert("30")
#default to 3dB
self.ui.line_threshold.insert("3")
self.ui.combo_ant.setCurrentIndex(self.ui.combo_ant.findText("RX2"))
#check KML by default, leave the rest unchecked.
self.ui.check_sbs1.setCheckState(QtCore.Qt.Unchecked)
self.ui.check_raw.setCheckState(QtCore.Qt.Unchecked)
self.ui.check_fgfs.setCheckState(QtCore.Qt.Unchecked)
self.ui.check_kml.setCheckState(QtCore.Qt.Checked)
self.ui.line_sbs1port.insert("30003")
self.ui.line_rawport.insert("9988")
self.ui.line_fgfsport.insert("5500")
self.ui.line_kmlfilename.insert("modes.kml")
#disable by default
self.ui.check_adsbonly.setCheckState(QtCore.Qt.Unchecked)
self.queue = gr.msg_queue(10)
self.runner = None
self.fg = None
self.outputs = []
self.updates = []
self.output_handler = None
self.kmlgen = None #necessary bc we stop its thread in shutdown
self.dbinput = None
self.dbname = 'air_modes.db'
self.datamodel = modes_datamodel(self.dbname)
self.ui.list_aircraft.setModel(self.datamodel)
self.ui.list_aircraft.show() #TODO remove
#goes and gets valid antenna, sample rate options from the device and grays out appropriate things
def populate_source_options(self):
sourceid = self.ui.combo_source.currentText()
self.rates = []
self.ratetext = []
self.antennas = []
if sourceid == "UHD device":
try:
from gnuradio import uhd
self.src = uhd.single_usrp_source("", uhd.io_type_t.COMPLEX_FLOAT32, 1)
self.rates = [rate.start() for rate in self.src.get_samp_rates()]
self.antennas = self.src.get_antennas()
self.src = None #deconstruct UHD source for now
self.ui.combo_ant.setEnabled(True)
self.ui.combo_rate.setEnabled(True)
self.ui.stack_source.setCurrentIndex(0)
except:
self.rates = []
self.antennas = []
self.ui.combo_ant.setEnabled(False)
self.ui.combo_rate.setEnabled(False)
self.ui.stack_source.setCurrentIndex(0)
elif sourceid == "RTL-SDR":
self.rates = [2.4e6]
self.antennas = ["RX"]
self.ui.combo_ant.setEnabled(False)
self.ui.combo_rate.setEnabled(False)
self.ui.stack_source.setCurrentIndex(0)
elif sourceid == "File":
self.rates = [2e6, 4e6, 6e6, 8e6, 10e6]
self.antennas = ["None"]
self.ui.combo_ant.setEnabled(False)
self.ui.combo_rate.setEnabled(True)
self.ui.stack_source.setCurrentIndex(1)
self.ui.combo_rate.clear()
self.ratetext = ["%.3f" % (rate / 1.e6) for rate in self.rates]
for rate, text in zip(self.rates, self.ratetext):
self.ui.combo_rate.addItem(text, rate)
self.ui.combo_ant.clear()
self.ui.combo_ant.addItems(self.antennas)
if 4e6 in self.rates:
self.ui.combo_rate.setCurrentIndex(self.rates.index(4e6))
def on_combo_source_currentIndexChanged(self, index):
self.populate_source_options()
def on_button_start_released(self):
#if we're already running, kill it!
if self.runner is not None:
self.output_handler.done = True
self.output_handler = None
self.outputs = []
self.updates = []
self.fg.stop()
self.runner = None
self.fg = None
if self.kmlgen is not None:
self.kmlgen.done = True
#TODO FIXME need a way to kill kmlgen safely without delay
#self.kmlgen.join()
#self.kmlgen = None
if self.dbinput is not None: self.dbinput.close()
self.dbinput = None
self.ui.button_start.setText("Start")
else: #we aren't already running, let's get this party started
options = {}
options["source"] = str(self.ui.combo_source.currentText())
options["rate"] = int(self.ui.combo_rate.currentIndex())
options["antenna"] = str(self.ui.combo_ant.currentText())
options["gain"] = float(self.ui.line_gain.text())
options["threshold"] = float(self.ui.line_threshold.text())
options["filename"] = str(self.ui.line_inputfile.text())
self.fg = adsb_rx_block(options, self.queue) #create top RX block
self.runner = top_block_runner(self.fg) #spawn new thread to do RX
try:
my_position = [float(self.ui.line_my_lat.text()), float(self.ui.line_my_lon.text())]
except:
my_position = None
self.outputs = []
self.updates = []
#output options to populate outputs, updates
if self.ui.check_kml.checkState():
#we spawn a thread to run every 30 seconds (or whatever) to generate KML
self.kmlgen = air_modes.modes_kml(self.ui.line_kmlfilename.text(), self.dbname, my_position) #create a KML generating thread
if self.ui.check_sbs1.checkState():
sbs1port = int(self.ui.line_sbs1port.text())
sbs1out = air_modes.modes_output_sbs1(my_position, sbs1port)
self.outputs.append(sbs1out.output)
self.updates.append(sbs1out.add_pending_conns)
if self.ui.check_fgfs.checkState():
fghost = "127.0.0.1" #TODO FIXME
fgport = self.ui.line_fgfsport.currentText()
fgout = air_modes.modes_flightgear(my_position, fghost, int(fgport))
self.outputs.append(fgout.output)
if self.ui.check_raw.checkState():
rawport = air_modes.modes_raw_server(int(self.ui.line_raw.text()))
self.outputs.append(rawport.output)
self.updates.append(rawport.add_pending_conns)
self.livedata = air_modes.modes_output_print(my_position)
#add output for live data box
self.outputs.append(self.output_live_data)
#create SQL database for KML and dashboard displays
self.dbwriter = air_modes.modes_output_sql(my_position, self.dbname)
self.outputs.append(self.dbwriter.output) #now the db will update itself
#create output handler thread
self.output_handler = output_handler(self.outputs, self.updates, self.queue)
self.ui.button_start.setText("Stop")
def output_live_data(self, msg):
msgstr = self.livedata.parse(msg)
if msgstr is not None:
self.ui.text_livedata.append(msgstr)
self.ui.text_livedata.verticalScrollBar().setSliderPosition(self.ui.text_livedata.verticalScrollBar().maximum())
def on_list_aircraft_clicked(self, index):
icao = long(str(index.data().toString()), 16)
#ok now let's fetch the info for this icao and update the display
print icao
#eventually: set up a QTimer or whatever and have it self-update and emit dataChanged()
#better yet just have the SQL interface yell and say hey that line changed.
#on selected aircraft or on update for visible aircraft, emit signal to update current dashboard display.
class modes_datamodel(QtCore.QAbstractListModel):
def __init__(self, dbname):
QtCore.QAbstractListModel.__init__(self)
self.db = sqlite3.connect(dbname)
def rowCount(self, parent=QtCore.QModelIndex()):
icaoquery = "select count(distinct icao) from positions"
cursor = self.db.cursor()
cursor.execute(icaoquery)
icaolist = cursor.fetchall()
cursor.close()
return icaolist[0][0]
def data(self, index, role=QtCore.Qt.DisplayRole):
if not index.isValid():
return QtCore.QVariant()
if index.row() >= self.rowCount():
return QtCore.QVariant()
if role != QtCore.Qt.DisplayRole:
return QtCore.QVariant()
icaoquery = "select distinct icao from positions order by icao"
cursor = self.db.cursor()
cursor.execute(icaoquery)
icaolist = cursor.fetchall()
cursor.close()
return "%06x" % icaolist[index.row()][0]
class output_handler(threading.Thread):
def __init__(self, outputs, updates, queue):
threading.Thread.__init__(self)
self.setDaemon(1)
self.outputs = outputs
self.updates = updates
self.queue = queue
self.done = False
self.start()
def run(self):
while self.done is False:
for update in self.updates:
update()
if not self.queue.empty_p():
msg = self.queue.delete_head()
for output in self.outputs:
try:
output(msg.to_string())
except ADSBError:
pass
time.sleep(0.1)
self.done = True
self.outputs = None
self.updates = None
self.queue = None
class top_block_runner(_threading.Thread):
def __init__(self, tb):
_threading.Thread.__init__(self)
self.setDaemon(1)
self.tb = tb
self.done = False
self.start()
def run(self):
self.tb.run()
self.done = True
#Top block for ADSB receiver. If you define a standard interface you
#can make this common code between the GUI app and the cmdline app
class adsb_rx_block (gr.top_block):
def __init__(self, options, queue):
gr.top_block.__init__(self)
self.options = options
rate = options["rate"]
use_resampler = False
freq = 1090e6
if options["source"] == "UHD device":
from gnuradio import uhd
self.u = uhd.single_usrp_source("", uhd.io_type_t.COMPLEX_FLOAT32, 1)
time_spec = uhd.time_spec(0.0)
self.u.set_time_now(time_spec)
self.u.set_antenna(options["antenna"])
self.u.set_samp_rate(rate)
rate = self.u.get_samp_rate()
self.u.set_gain(int(options["gain"]))
self.u.set_center_freq(freq, 0)
elif options["source"] == "RTL-SDR":
import osmosdr
self.u = osmosdr.source_c()
self.u.set_sample_rate(2.4e6) #fixed for RTL dongles
rate = int(4e6)
self.u.set_gain_mode(0) #manual gain mode
self.u.set_gain(int(options["gain"]))
self.u.set_center_freq(freq, 0)
use_resampler = True
elif options["source"] == "File":
self.u = gr.file_source(gr.sizeof_gr_complex, options["filename"])
else:
raise NotImplementedError
self.demod = gr.complex_to_mag()
self.avg = gr.moving_average_ff(100, 1.0/100, 400)
self.preamble = air_modes.modes_preamble(int(rate), float(options["threshold"]))
self.slicer = air_modes.modes_slicer(int(rate), queue)
if use_resampler:
self.lpfiltcoeffs = gr.firdes.low_pass(1, 5*2.4e6, 1.2e6, 300e3)
self.resample = blks2.rational_resampler_ccf(interpolation=5, decimation=3, taps=self.lpfiltcoeffs)
self.connect(self.u, self.resample, self.demod)
else:
self.connect(self.u, self.demod)
self.connect(self.demod, self.avg)
self.connect(self.demod, (self.preamble, 0))
self.connect(self.avg, (self.preamble, 1))
self.connect(self.preamble, self.slicer)
if __name__ == '__main__':
app = QtGui.QApplication(sys.argv)
window = mainwindow()
window.setWindowTitle("Mode S/ADS-B receiver")
window.show()
sys.exit(app.exec_())