Move fgtools module into it's own subfolder, add scripts to fix ICAO's of apt.dat files (mainly for those pulled from the XPlane gateway) and to generate apt.dat files from OurAirports and OSM data

This commit is contained in:
TheFGFSEagle 2022-09-15 20:33:25 +02:00
parent eae92402bc
commit 875799b048
20 changed files with 1295 additions and 110 deletions

0
__init__.py → fgtools/__init__.py Executable file → Normal file
View File

386
fgtools/aptdat.py Normal file
View File

@ -0,0 +1,386 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import logging
import os
from fgtools.utils import files
from fgtools import geo
from fgtools.utils import unit_convert
from fgtools import utils
class Code:
def __init__(self, name, code):
self.name = name
self.code = code
def __str__(self):
return self.name
def __repr__(self):
return str(self.code)
def __int__(self):
return self.code
def __float__(self):
return float(self.code)
def __eq__(self, other):
if isinstance(other, Code):
return self.name == other.name
elif isinstance(other, str):
return self.name == other
elif isinstance(other, int):
return self.code == other
def __ne__(self, other):
if isinstance(other, Code):
return self.name != other.name
elif isinstance(other, str):
return self.name != other
elif isinstance(other, int):
return self.code != other
class CodeEnum:
def __init__(self, names, codes):
for name, code in zip(names, codes):
setattr(self, name, Code(name, code))
SurfaceCode = CodeEnum(
("Asphalt", "Concrete", "Grass", "Dirt", "Gravel", "DryLakebed", "Water", "SnowIce", "Transparent"),
list(range(1, 6)) + list(range(12, 16))
)
RunwayShoulderCode = CodeEnum(
("NoShoulder", "Asphalt", "Concrete"),
range(3)
)
RunwayMarkingCode = CodeEnum(
("NoMarkings", "Visual", "NonPrecision", "Precision", "UKNonPrecision", "UKPrecision"),
range(6)
)
ApproachLightsCode = CodeEnum(
("NoLights", "ALSF_I", "ALSF_II", "Calvert", "CalvertILS", "SSALR", "SSALF", "SALS", "MALSR", "MALSF", "MALS", "ODALS", "RAIL"),
range(13)
)
LineTypeCode = CodeEnum(
(
"NoLine", "TaxiwayCenter", "MiscBoundary", "TaxiwayEdge", "RunwayHold", "OtherHold", "ILSHold",
"TaxiwayCenterRunwaySafety", "AircraftQueueLane", "AircraftQueueLaneDouble",
"TaxiwayCenterLarge", "TaxiwayCenterRunwaySafetyLarge", "RunwayHoldLarge", "OtherHoldLarge", "ILSHoldLarge",
"TaxiwayCenterBordered", "MiscBoundaryBordered", "TaxiwayEdgeBordered", "RunwayHoldBordered",
"OtherHoldBordered", "ILSHoldBordered", "TaxiwayCenterRunwaySafetyBordered", "AircraftQueueLaneBordered",
"AircraftQueueLaneDoubleBordered", "TaxiwayCenterLargeBordered", "TaxiwayCenterRunwaySafetyLargeBordered",
"RunwayHoldLargeBordered", "OtherHoldLargeBordered", "ILSHoldLargeBordered",
"TaxiwayShoulder", "RoadwayEdge", "RoadwayEdgeAircraftMovingArea", "RoadwayCenter", "RoadwayEdgeBroken",
"RoadwayEdgeWide", "RoadwayCenterWide", "SolidRed", "DashedRed", "SolidRedWide", "SolidOrange",
"SolidBlue", "SolidGreen", "RoadwayEdgeBordered", "RoadwayEdgeAircraftMovingAreaBordered",
"RoadwayCenterBordered", "RoadwayEdgeBrokenBordered", "RoadwayEdgeWideBordered",
"RoadwayCenterWideBordered", "SolidRedBordered", "DashedRedBordered", "SolidRedWideBordered",
"SolidOrangeBordered", "SolidBlueBordered", "SolidGreenBordered"
),
tuple(range(15)) + tuple(range(51, 65)) + tuple(range(19, 26)) + (30, 31, 32) + (40, 41, 42) + \
tuple(range(70, 76)) + (80, 81, 82) + (90, 91, 92)
)
LineLightTypeCode = CodeEnum(
(
"TaxiwayCenter", "TaxiwayEdge", "Hold", "RunwayHold", "TaxiwayCenterRunwaySafety",
"TaxiwayEdgeDangerous", "TaxiwayLeadOff", "TaxiwayLeadOffAmber"
),
range(101, 109)
)
BeaconTypeCode = CodeEnum(
("NoBeacon", "CivilianAirport", "Seaport", "Heliport", "MilitaryAirport"),
range(5)
)
SignSizeCode = CodeEnum(
("TaxiwaySmall", "TaxiwayMedium", "TaxiwayLarge", "DistanceRemainingLarge", "DistanceRemainingSmall"),
range(1, 6)
)
LightingObjectCode = CodeEnum(
("VASI", "PAPI4L", "PAPI4R", "PAPISpaceShuttle", "TriColorVASI", "RunwayGuard"),
range(1, 7)
)
REILCode = CodeEnum(
("NoREIL", "OmnidirREIL", "UnidirREIL"),
range(3)
)
AirportType = CodeEnum(
("Land", "Sea", "Heli"),
(1, 16, 17)
)
class Helipad:
def __init__(self, id, lon, lat, heading, length, width, surface, shoulder=RunwayShoulderCode.NoShoulder,
smoothness=0.25, edge_lights=False):
self.id = id
self.lon = lon
self.lat = lat
self.heading = heading
self.length = length
self.width = width
self.surface = surface
self.shoulder = shoulder
self.smoothness = smoothness
self.edge_lights = edge_lights
def write(self, f):
f.write((f"102 {self.id} {float(self.lat)} {float(self.lon)} {float(self.heading)} {float(self.length):.2f}" +
f" {float(self.width):.2f} {int(self.surface)} {int(self.shoulder)} {float(self.smoothness):.2f} {int(self.edge_lights)}\n"))
class Runway:
def __init__(self, width, id1, lon1, lat1, id2, lon2, lat2):
self.width = width
self.id1 = id1
self.lon1 = lon1
self.lat1 = lat1
self.id2 = id2
self.lon2 = lon2
self.lat2 = lat2
def get_heading1(self):
return geo.get_bearing_deg(self.lon1, self.lat1, self.lon2, self.lat2)
def get_heading2(self):
return utils.wrap_period(self.get_heading1() + 180, 0, 360)
def get_length_m(self):
return geo.great_circle_distance_m(self.lon1, self.lat1, self.lon2, self.lat2)
def get_length_ft(self):
return unit_convert.m2ft(self.get_length_m())
class WaterRunway(Runway):
def __init__(self, width, id1, lon1, lat1, id2, lon2, lat2, perimeter_buoys=False):
Runway.__init__(self, width, id1, lon1, lat1, id2, lon2, lat2)
self.perimeter_buoys = perimeter_buoys
def write(self, f):
f.write((f"101 {float(self.width):.2f} {int(self.perimeter_buoys)} {self.id1} {float(self.lat1):.8f}" +
f" {float(self.lon1):.8f} {self.id2} {float(self.lat2):.8f} {float(self.lon2):.8f}\n"))
class LandRunway(Runway):
def __init__(self, width, surface, id1, lon1, lat1, id2, lon2, lat2,
smoothness=0.25, shoulder=RunwayShoulderCode.NoShoulder, center_lights=False,
edge_lights=False, distance_signs=False,
displ_thresh1=0, blastpad1=0, markings1=RunwayMarkingCode.Visual,
appr_lights1=ApproachLightsCode.NoLights, tdz_lights1=False, reil_type1=REILCode.NoREIL,
displ_thresh2=0, blastpad2=0, markings2=RunwayMarkingCode.Visual,
appr_lights2=ApproachLightsCode.NoLights, tdz_lights2=False, reil_type2=REILCode.NoREIL):
Runway.__init__(self, width, id1, lon1, lat1, id2, lon2, lat2)
self.surface = surface
self.shoulder = shoulder
self.smoothness = smoothness
self.center_lights = center_lights
self.edge_lights = edge_lights
self.distance_signs = distance_signs
self.displ_thresh1 = displ_thresh1
self.blastpad1 = blastpad1
self.markings1 = markings1
self.appr_lights1 = appr_lights1
self.tdz_lights1 = tdz_lights1
self.reil_type1 = reil_type1
self.displ_thresh2 = displ_thresh2
self.blastpad2 = blastpad2
self.markings2 = markings2
self.appr_lights2 = appr_lights2
self.tdz_lights2 = tdz_lights2
self.reil_type2 = reil_type2
def write(self, f):
f.write((f"100 {float(self.width):.2f} {int(self.surface)} {int(self.shoulder)} {self.smoothness} {int(self.center_lights)}" +
f" {int(self.edge_lights)} {int(self.distance_signs)}" +
f" {self.id1} {float(self.lat1):.8f} {float(self.lon1):.8f} {float(self.displ_thresh1):.2f} {float(self.blastpad1):.2f} {int(self.markings1)}" +
f" {int(self.appr_lights2)} {int(self.tdz_lights2)} {int(self.reil_type2)}" +
f" {self.id2} {float(self.lat2):.8f} {float(self.lon2):.8f} {float(self.displ_thresh2):.2f} {float(self.blastpad2):.2f} {int(self.markings2)}" +
f" {int(self.appr_lights2)} {int(self.tdz_lights2)} {int(self.reil_type2)}\n"))
class Airport:
def __init__(self, elev, icao, name, lon, lat, type=AirportType.Land):
self.runways = {}
self.helipads = {}
self.parkings = []
self.aprons = []
self.tower = None
self.windsocks = []
self.beacons = []
self.elev = elev
self.icao = icao
self.name = name
self.type = type
self.lon = lon
self.lat = lat
def add_runway(self, runway):
self.runways[runway.id1] = runway
def add_helipad(self, helipad):
self.helipads[helipad.id] = helipad
def write(self, f):
f.write(f"{repr(self.type)} {int(self.elev)} 0 0 {self.icao} {self.name}\n")
f.write(f"1302 datum_lat {self.lat}\n")
f.write(f"1302 datum_lon {self.lon}\n")
f.write(f"1302 icao_code {self.icao}\n")
for id in self.runways:
self.runways[id].write(f)
for id in self.helipads:
self.helipads[id].write(f)
"""for parking in self.parkings:
parking.write(f)
for apron in self.aprons:
apron.write(f)
if self._tower:
self.tower.write(f)
for windsock in self.windsocks:
windsock.write(f)
for beacon in self.beacons:
beacon.write(f)"""
class ReaderWriterAptDat:
def __init__(self, file_header="Generated by fgtools.aptdat.ReaderWriterAptDat"):
self._airports = []
self.file_header = file_header
def _get_airport_index(self, icao):
for i, airport in enumerate(self._airports):
if airport.icao == icao:
return i
return -1
def add_airport(self, airport):
if not airport in self._airports:
self._airports.append(airport)
def add_airports(self, airports):
for airport in airports:
self.add_airport(airport)
def get_airport(self, icao):
i = _get_airport_index(icao)
if i > -1:
return self._airports[i]
def get_airports(self, icaos):
for icao in icaos:
yield self.get_airport(icao)
def set_airport(self, airport):
i = self._get_airport_index(airport.icao)
if i > -1:
self._airports[i] = airport
else:
self.add_airport(airport)
return i
def set_airports(self, airports):
for airport in airports:
self.set_airport(airport)
def remove_airport(self, icao):
return self._airports.pop(self._get_airport_index(icao))
def remove_airports(self, icaos):
for icao in icaos:
yield self._airports.pop(self._get_airport_index(icao))
def read(self, path):
exists = files.check_exists(path, exit=False)
if exists == 1:
pass
elif exists == 2:
self.read_multiple(os.listdir(path))
else:
logging.fatal(f"Path {path} does not exist - exiting !")
def read_multiple(self, paths):
for path in paths:
self.read(path)
# Write apt.dat files into output
# @param output -> str Path to put apt.dat files into
# @param merge -> bool Whether to merge all airports into one apt.dat file or write one file per airport
# @param overwrite -> bool Whether to overwrite an apt.dat file when it already exists
# @param overwrite_func -> callable Function whose return value replces overwrite - will get passed
# output_path as positional argument, will be called only if path actually
# exists and is a file
# @return bool 0 on success, 1 if the apt.dat file already exists and overwrite == False
def write(self, output, merge=False, overwrite=False, overwrite_func=None):
if len(self._airports) == 0:
print("ReaderWriterAptDat has no airports - not writing anything !")
return 1
if merge:
exists = files.check_exists(output, type="file", exit=False)
if exists == 2:
output = os.path.join(output, "apt.dat")
exists = files.check_exists(output, type="file", exit=False)
if exists == 1:
if callable(overwrite_func):
overwrite = overwrite_func(path)
if not overwrite:
print(f"Output file {output} exists already - not writing any airports !")
return 1
elif exists == 2:
print(f"Output path {path} for airport is a directory - skipping", end=" " * 100 + "\n")
return 1
with open(output, "w") as f:
self._write_header(f)
i = 0
total = len(airports)
for airport in self._airports:
print(f"Writing airports … {i / total * 100}% ({i} of {total} airports done)", end="\r")
i += 1
airport.write(f)
print(f"Writing airports … {i / total * 100}% ({i} of {total} airports done)")
self._write_footer(f)
else:
files.check_exists(output, type="dir")
i = 0
total = len(self._airports)
skipped = 0
for airport in self._airports:
print(f"Writing airports … {i / total * 100}% ({i} of {total} airports done, {skipped} skipped)", end="\r")
i += 1
path = os.path.join(output, airport.icao + ".dat")
exists = files.check_exists(path, type="file", exit=False, create=True)
if exists == 1:
if callable(overwrite_func):
overwrite = overwrite_func(path)
if not overwrite:
print(f"Output file {path} for airport exists already - skipping", end=" " * 100 + "\n")
skipped += 1
continue
elif exists == 2:
print(f"Output path {path} for airport is a directory - skipping", end=" " * 100 + "\n")
continue
with open(path, "w") as f:
self._write_header(f)
airport.write(f)
self._write_footer(f)
print(f"Writing airports … {i / total * 100}% ({i} of {total} airports done, {skipped} skipped)", end=" " * 100 + "\n")
return 0
def _write_header(self, f):
f.write("I\n")
f.write(f"1130 {self.file_header}\n")
def _write_footer(self, f):
f.write("99\n")

0
dsf2stg_lookup.py → fgtools/dsf2stg_lookup.py Executable file → Normal file
View File

27
fgtools/fgelev.py Normal file
View File

@ -0,0 +1,27 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import subprocess
class Pipe:
def __init__(self, fgelev, fgscenery, fgdata):
print("Creating pipe to fgelev … ", end="")
sys.stdout.flush()
self.env = os.environ.copy()
self.env["FG_SCENERY"] = os.pathsep.join(fgscenery)
self.env["FG_ROOT"] = fgdata
self.pipe = subprocess.Popen(args=[fgelev, "--expire", "1"], env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.pipe.stdout.flush()
self.pipe.stdout.readline()
self.pipe.stdin.flush()
self.pipe.stdin.flush()
print("done")
def get_elevation(self, lon, lat):
elevpipe.stdin.write(f"_ {lon} {lat}\n".encode("utf-8"))
elevpipe.stdin.flush()
elevout = elevpipe.stdout.readline().split()
if len(elevout) == 2:
return float(elevout[1])
else

5
fgtools/geo/__init__.py Normal file
View File

@ -0,0 +1,5 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
EARTH_RADIUS = 6378138.12

55
fgtools/geo/coord.py Normal file
View File

@ -0,0 +1,55 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import math
from fgtools.utils import unit_convert, wrap_period
from fgtools.geo import EARTH_RADIUS
class Coord:
def __init__(self, lon, lat):
self.lon = lon
self.lat = lat
def distance_m(self, other):
lon1, lat1, lon2, lat2 = map(math.radians, (self.lon, self.lat, other.lon, other.lat))
return abs(EARTH_RADIUS * math.acos(round(math.sin(lat1) * math.sin(lat2) + math.cos(lat1) * math.cos(lat2) * math.cos(lon1 - lon2), 14)))
def distance_km(self, other):
return self.distance_m(other) / 1000
def distance_ft(self, other):
return unit_convert.m2ft(self.distance_m(other))
def angle(self, other):
dlon = (math.radians(other.lon) - math.radians(self.lon))
x = math.sin(dlon) * math.cos(math.radians(other.lat))
y = (math.cos(math.radians(self.lat)) * math.sin(math.radians(other.lat)) -
math.sin(math.radians(self.lat)) * math.cos(math.radians(other.lat)) * math.cos(dlon))
angle = math.degrees(math.fmod(math.atan2(x, y), 2 * math.pi))
return wrap_period(angle, 0, 360)
def apply_angle_distance_m(self, angle, distance):
lon = math.radians(self.lon)
lat = math.radians(self.lat)
heading = math.radians(angle)
distance /= EARTH_RADIUS
if distance < 0:
distance = abs(distance)
heading -= math.pi
lat = math.asin(math.sin(lat) * math.cos(distance) + math.cos(lat) * math.sin(distance) * math.cos(heading))
if math.cos(lat) > 1e-15:
lon = math.pi - math.fmod(math.pi - lon - math.asin(math.sin(heading) * math.sin(distance) / math.cos(lat)), (2 * math.pi))
lon = math.degrees(lon)
lat = math.degrees(lat)
if lon > 180:
lon -= 360
elif lon < -180:
lon += 360
return Coord(lon, lat)

27
fgtools/geo/rectangle.py Normal file
View File

@ -0,0 +1,27 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from fgtools.geo import coord
class Rectangle:
def __init__(self, ll, ur):
if not (isinstance(ll, coord.Coord) and isinstance(ur, coord.Coord)):
raise TypeError("loer left or upper right coordinate is not of type fgtools.geo.coord.Coord")
self.ll = ll
self.ur = ur
def midpoint(self):
return coord.Coord((self.ll.lon + self.ur.lon) / 2, (self.ll.lat + self.ur.lat) / 2)
def is_inside(self, coord):
return self.ll.lon <= coord.lon <= self.ur.lon and self.ll.lat <= coord.lat <= self.ur.lat
def diagonal_m(self):
return self.ll.distance_m(self.ur)
def length_m(self):
return self.ur.distance_m(Coord(self.ur.lon, self.ll.lat))
def width_m(self):
return self.ll.distance_m(Coord(self.ur.lon, self.ll.lat))

0
fgtools/math/__init__.py Normal file
View File

17
fgtools/math/coord.py Normal file
View File

@ -0,0 +1,17 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
class Coord:
def __init__(self, x, y):
self.x = x
self.y = y
def distance(self, other):
return math.sqrt((other.x - self.x) ** 2 + (other.y - self.y) ** 2)
def angle(self, other):
return math.degrees(math.atan2(other.x - self.x, other.y - self.y))
def apply_angle_distance(self, angle, distance):
angle_rad = math.pi / 2 - math.radians(angle)
return Coord(self.x + distance * math.cos(angle_rad), self.y + distance * math.sin(angle_rad))

27
fgtools/math/rectangle.py Normal file
View File

@ -0,0 +1,27 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from fgtools.math import coord
class Rectangle:
def __init__(self, ll, ur):
if not (isinstance(ll, coord.Coord) and isinstance(ur, coord.Coord):
raise TypeError("loer left or upper right coordinate is not of type fgtools.math.coord.Coord")
self.ll = ll
self.ur = ur
def midpoint(self):
return coord.Coord((self.ll.x + self.ur.x) / 2, (self.ll.y + self.ur.y) / 2)
- def is_inside(self, coord):
return self.ll.x <= coord.x <= self.ur.x and self.ll.y <= coord.y <= self.ur.y
def diagonal(self):
return self.ll.distance(self.ur)
def length(self):
return self.ur.distance(Coord(self.ur.x, self.ll.y))
def width(self):
return self.ll.distance(Coord(self.ur.x, self.ll.y))

View File

@ -2,9 +2,11 @@
#-*- coding:utf-8 -*- #-*- coding:utf-8 -*-
import os import os
import sys from appdirs import user_cache_dir
HOME = os.environ.get("HOME", os.path.expanduser("~")) HOME = os.environ.get("HOME", os.path.expanduser("~"))
CACHEDIR = os.environ.get("FGTOOLS_CACHEDIR", user_cache_dir("fgtools", "TheEagle"))
os.makedirs(CACHEDIR, exist_ok=True)
__version__ = (1, 0, 0) __version__ = (1, 0, 0)
__versionstr__ = ".".join(map(str, __version__)) __versionstr__ = ".".join(map(str, __version__))

68
fgtools/utils/files.py Normal file
View File

@ -0,0 +1,68 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import os
import logging
from fgtools.utils import isiterable
def find_input_files(paths, prefix="", suffix=""):
if not isiterable(paths):
if isinstance(paths, str):
paths = [paths]
else:
raise TypeError("paths is not iterable / not a string")
files = []
for path in paths:
if os.path.isfile(path) and os.path.split(path)[-1].startswith(prefix) and path.endswith(suffix):
files.append(path)
elif os.path.isdir(path):
files += find_input_files([os.path.join(path, s) for s in os.listdir(path)])
else:
print(f"Input file / directory {path} does not exist - skipping")
return files
def write_xml_header(f):
f.write('<?xml version="1.0" encoding="UTF-8"?>\n')
def check_exists(path, exit=True, type="file", create=True):
if type not in ("file", "dir"):
raise ValueError(f'check_exists() got an unrecognized value {type} for argument "type" - must be either "file" or "dir" !')
if not path:
raise ValueError(f'check_exists() got an empty path !')
action = ("skipping", "exiting")[exit]
# logging.fatal returns nothing so this will always succeed
func = (logging.warn, lambda s: logging.fatal(s) or sys.exit(1))[exit]
if os.path.isfile(path):
if type == "file":
return 1
else:
func(f"Path {path} is not a file - {action} !")
return 2
elif os.path.isdir(path):
if type == "dir":
return 1
else:
func(f"Path {path} is not a directory - {action} !")
return 2
else:
if create:
logging.info(f"Creating non-existent path {path}")
if type == "file":
parts = os.path.split(path)
if len(parts) > 1:
os.makedirs(os.path.join(*parts[:-1]), exist_ok=True)
else:
os.makedirs(".", exist_ok=True)
with open(path, "a") as f:
pass
else:
os.makedirs(path, exist_ok=True)
return 3
else:
func(f"Path {path} does not exist - {action} !")
return 0

View File

@ -8,7 +8,7 @@ import statistics
from fgtools.utils.files import find_input_files from fgtools.utils.files import find_input_files
from fgtools import utils from fgtools import utils
from fgtools.utils import geo from fgtools.geo import coord
from fgtools.utils import unit_convert from fgtools.utils import unit_convert
def format_coord(coord, lonlat): def format_coord(coord, lonlat):
@ -19,10 +19,7 @@ def format_coord(coord, lonlat):
return f"{prefix}{i} {f * 60:.8f}" return f"{prefix}{i} {f * 60:.8f}"
def get_icao_xml_path(icao, what): def get_icao_xml_path(icao, what):
if len(icao) == 3: return f"{icao[0]}/{icao[1]}/{icao[2]}/{icao}.{what}.xml"
return f"{icao[0]}/{icao[1]}/{icao}.{what}.xml"
else:
return f"{icao[0]}/{icao[1]}/{icao[2]}/{icao}.{what}.xml"
class Parking: class Parking:
def __init__(self, index, type, name, lon, lat, hdg, radius=7.5, pushback_route=-1, airline_codes=[]): def __init__(self, index, type, name, lon, lat, hdg, radius=7.5, pushback_route=-1, airline_codes=[]):
@ -42,7 +39,7 @@ class Parking:
def __repr__(self): def __repr__(self):
s = (f' <Parking index="{self.index}" type="{self.type}" name="{self.name}"' + s = (f' <Parking index="{self.index}" type="{self.type}" name="{self.name}"' +
f' lon="{format_coord(self.lon, "lon")}" lat="{format_coord(self.lat, "lat")}"' + f' lat="{format_coord(self.lat, "lat")}" lon="{format_coord(self.lon, "lon")}"' +
f' heading="{self.hdg}" radius="{self.radius}"') f' heading="{self.hdg}" radius="{self.radius}"')
if self.pushback_route > -1: if self.pushback_route > -1:
s += f' pushBackRoute="{self.pushback_route}"' s += f' pushBackRoute="{self.pushback_route}"'
@ -63,66 +60,69 @@ class TaxiNode:
return self.on_runway != None return self.on_runway != None
def __repr__(self): def __repr__(self):
return (f' <node index="{self.index}" lon="{format_coord(self.lon, "lon")}" ' + return (f' <node index="{self.index}" lat="{format_coord(self.lat, "lat")}"' +
f'lat="{format_coord(self.lat, "lat")}" isOnRunway="{int(self.on_runway)}" ' + f' lon="{format_coord(self.lon, "lon")}" isOnRunway="{int(self.on_runway)}"' +
f'holdPointType="{self.holdPointType}"/>\n') f' holdPointType="{self.holdPointType}"/>\n')
class TaxiEdge: class TaxiEdge:
def __init__(self, begin, end, is_on_runway, name): def __init__(self, begin, end, bidirectional, is_on_runway, name):
self.begin = begin self.begin = begin
self.end = end self.end = end
self.name = name self.name = name
self.bidirectional = bidirectional
self.is_on_runway = is_on_runway self.is_on_runway = is_on_runway
self.is_pushback_route = False self.is_pushback_route = False
def __bool__(self): def __bool__(self):
return self.is_pushback_route != None return self.is_pushback_route != None
def __contains__(self, node):
return node.index in (self.begin, self.end)
def __repr__(self): def __repr__(self):
return (f' <arc begin="{self.begin}" end="{self.end}" ' + s = (f' <arc begin="{self.begin}" end="{self.end}" ' +
f'isPushBackRoute="{int(self.is_pushback_route)}" name="{self.name}"/>\n') f'isPushBackRoute="{int(self.is_pushback_route)}" name="{self.name}"/>\n')
if self.bidirectional:
s += (f' <arc begin="{self.end}" end="{self.begin}" ' +
f'isPushBackRoute="{int(self.is_pushback_route)}" name="{self.name}"/>\n')
return s
class Runway: class Runway:
def __init__(self, id1, lon1, lat1, displ1, stopway1, id2, lon2, lat2, displ2, stopway2): def __init__(self, id1, lon1, lat1, displ1, stopway1, id2, lon2, lat2, displ2, stopway2):
self.lon1 = lon1 self.coord1 = coord.Coord(lon1, lat1)
self.lat1 = lat1
self.id1 = id1 self.id1 = id1
self.displ1 = displ1 self.displ1 = displ1
self.stopway1 = stopway1 self.stopway1 = stopway1
self.lon2 = lon2 self.coord2 = coord.Coord(lon2, lat2)
self.lat2 = lat2
self.id2 = id2 self.id2 = id2
self.displ2 = displ2 self.displ2 = displ2
self.stopway2 = stopway2 self.stopway2 = stopway2
def get_length_m(self): def get_length_m(self):
return geo.great_circle_distance_m(self.lon1, self.lat1, self.lon2, self.lat2) return self.coord1.distance_m(self.coord2)
def get_length_ft(self): def get_length_ft(self):
return unit_convert.m2ft(self.get_length_m()) return unit_convert.m2ft(self.get_length_m())
def get_heading1_deg(self): def get_heading1_deg(self):
return geo.get_bearing_deg(self.lon1, self.lat1, self.lon2, self.lat2) return self.coord2.angle(self.coord1)
def get_heading2_deg(self): def get_heading2_deg(self):
brg = self.get_heading1_deg() + 180 return self.coord1.angle(self.coord2)
while brg >= 360:
brg -= 360
return brg
def __repr__(self): def __repr__(self):
return f""" <runway> return f""" <runway>
<threshold> <threshold>
<lon>{self.lon1}</lon> <lat>{self.coord1.lat}</lat>
<lat>{self.lat1}</lat> <lon>{self.coord1.lon}</lon>
<rwy>{self.id1}</rwy> <rwy>{self.id1}</rwy>
<hdg-deg>{self.get_heading1_deg():.2f}</hdg-deg> <hdg-deg>{self.get_heading1_deg():.2f}</hdg-deg>
<displ-m>{self.displ1}</displ-m> <displ-m>{self.displ1}</displ-m>
<stopw-m>{self.stopway1}</stopw-m> <stopw-m>{self.stopway1}</stopw-m>
</threshold> </threshold>
<threshold> <threshold>
<lon>{self.lon2}</lon> <lat>{self.coord2.lat}</lat>
<lat>{self.lat2}</lat> <lon>{self.coord2.lon}</lon>
<rwy>{self.id2}</rwy> <rwy>{self.id2}</rwy>
<hdg-deg>{self.get_heading2_deg():.2f}</hdg-deg> <hdg-deg>{self.get_heading2_deg():.2f}</hdg-deg>
<displ-m>{self.displ2}</displ-m> <displ-m>{self.displ2}</displ-m>
@ -144,8 +144,8 @@ class Tower:
def __repr__(self): def __repr__(self):
return f""" <tower> return f""" <tower>
<twr> <twr>
<lon>{self.lon}</lon>
<lat>{self.lat}</lat> <lat>{self.lat}</lat>
<lon>{self.lon}</lon>
<elev-m>{self.agl}</elev-m> <elev-m>{self.agl}</elev-m>
</twr> </twr>
</tower> </tower>
@ -185,8 +185,8 @@ class ILS:
if None not in (self.lon1, self.lat1, self.rwy1, self.hdg1, self.elev1, self.ident1): if None not in (self.lon1, self.lat1, self.rwy1, self.hdg1, self.elev1, self.ident1):
s += f""" <ils> s += f""" <ils>
<lon>{self.lon1}</lon>
<lat>{self.lat1}</lat> <lat>{self.lat1}</lat>
<lon>{self.lon1}</lon>
<rwy>{self.rwy1}</rwy> <rwy>{self.rwy1}</rwy>
<hdg-deg>{self.hdg1:.2f}</hdg-deg> <hdg-deg>{self.hdg1:.2f}</hdg-deg>
<elev-m>{self.elev1}</elev-m> <elev-m>{self.elev1}</elev-m>
@ -195,8 +195,8 @@ class ILS:
""" """
if None not in (self.lon2, self.lat2, self.rwy2, self.hdg2, self.elev2, self.ident2): if None not in (self.lon2, self.lat2, self.rwy2, self.hdg2, self.elev2, self.ident2):
s += """ <ils> s += """ <ils>
<lon>{self.lon2}</lon>
<lat>{self.lat2}</lat> <lat>{self.lat2}</lat>
<lon>{self.lon2}</lon>
<rwy>{self.rwy2}</rwy> <rwy>{self.rwy2}</rwy>
<hdg-deg>{self.hdg2:.2f}</hdg-deg> <hdg-deg>{self.hdg2:.2f}</hdg-deg>
<elev-m>{self.elev2}</elev-m> <elev-m>{self.elev2}</elev-m>
@ -283,25 +283,26 @@ def parse_aptdat_files(files, nav_dat, print_runway_lengths):
elif line[0] == 14: elif line[0] == 14:
towers[icao] = Tower(float(line[2]), float(line[1]), float(line[3])) towers[icao] = Tower(float(line[2]), float(line[1]), float(line[3]))
elif line[0] == 1201: # taxi node elif line[0] == 1201: # taxi node
taxi_nodes[icao].append(TaxiNode(float(line[2]), float(line[1]), len(taxi_nodes[icao]))) taxi_nodes[icao].append(TaxiNode(float(line[2]), float(line[1]), int(line[4]) + len(parkings[icao])))
elif line[0] == 1202: # taxi edge elif line[0] == 1202: # taxi edge
if len(line) == 6: if len(line) == 6:
taxi_edges[icao].append(TaxiEdge(int(line[1]), int(line[2]), line[4] == "runway", line[5])) edge = TaxiEdge(int(line[1]) + len(parkings[icao]), int(line[2]) + len(parkings[icao]), line[3] == "twoway", line[4] == "runway", line[5])
if edge.begin != edge.end:
taxi_edges[icao].append(edge)
if not icao in towers and len(runways[icao]) > 0: if not icao in towers and len(runways[icao]) > 0:
runway_lons = [] runway_lons = []
runway_lats = [] runway_lats = []
runway_hdgs = [] runway_hdgs = []
for runway in runways[icao]: for runway in runways[icao]:
runway_lons += [runway.lon1, runway.lon2] runway_lons += [runway.coord1.lon, runway.coord2.lon]
runway_lats += [runway.lat1, runway.lat2] runway_lats += [runway.coord1.lat, runway.coord2.lat]
runway_hdgs.append(runway.get_heading1_deg()) runway_hdgs.append(runway.get_heading1_deg())
tower_lon, tower_lat = geo.apply_heading_distance(statistics.median(runway_lons), tower_pos = (coord.Coord(statistics.median(runway_lons), statistics.median(runway_lats))
statistics.median(runway_lats), .apply_angle_distance_m(statistics.median(runway_hdgs) + 90, 200))
statistics.median(runway_hdgs) + 90, 50) towers[icao] = Tower(tower_pos.lon, tower_pos.lat, 15)
towers[icao] = Tower(tower_lon, tower_lat, 15)
if not parkings[icao]: if not parkings[icao]:
del parkings[icao] del parkings[icao]
@ -352,10 +353,11 @@ def write_groundnet_files(parkings, taxi_nodes, taxi_edges, output, overwrite):
if len(taxi_nodes[icao]) > 0 and len(taxi_edges[icao]) > 0: if len(taxi_nodes[icao]) > 0 and len(taxi_edges[icao]) > 0:
f.write(" <TaxiNodes>\n") f.write(" <TaxiNodes>\n")
for edge in taxi_edges[icao]: for edge in taxi_edges[icao]:
taxi_nodes[icao][edge.begin].is_on_runway = edge.is_on_runway taxi_nodes[icao][edge.begin - len(parkings[icao])].is_on_runway = edge.is_on_runway
taxi_nodes[icao][edge.end].is_on_runway = edge.is_on_runway taxi_nodes[icao][edge.end - len(parkings[icao])].is_on_runway = edge.is_on_runway
for node in taxi_nodes[icao]: for node in taxi_nodes[icao]:
f.write(repr(node)) if any(node in edge for edge in taxi_edges[icao]):
f.write(repr(node))
f.write(" </TaxiNodes>\n") f.write(" </TaxiNodes>\n")
f.write(" <TaxiWaySegments>\n") f.write(" <TaxiWaySegments>\n")

149
scenery/fix-aptdat-icaos.py Normal file
View File

@ -0,0 +1,149 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import os
import sys
import csv
import requests
import argparse
import shutil
from fgtools.utils.files import find_input_files
from fgtools.geo.coord import Coord
from fgtools.utils import constants
def _get_ourairports_csv(what):
path = os.path.join(constants.CACHEDIR, what + ".csv")
if not os.path.isfile(path):
with open(path, "w") as f:
f.write(requests.get(f"https://davidmegginson.github.io/ourairports-data/{what}.csv").content.decode())
f = open(path, "r", newline="")
return list(csv.DictReader(f))[1:]
def get_ourairports_icao(airport, csv):
c = Coord(airport["lon"], airport["lat"])
matches = []
for line in csv:
d = c.distance_km(Coord(float(line["longitude_deg"]), float(line["latitude_deg"])))
if d <= 10:
matches.append({"distance": d, "icao": line["gps_code"] or line["local_code"] or line["ident"]})
matches.sort(key=lambda m: m["distance"])
if len(matches) and matches[0]["icao"]:
if len(airport["icao"]) != 4:
airport["newicao"] = matches[0]["icao"]
else:
print(f"No matching airport found for {airport['icao']} - skipping", end=" " * 100 + "\n")
return airport
def process(files, output):
csv = _get_ourairports_csv("airports")
i = 0
n = 0
skipped = 0
total = len(files)
files_d = {}
for p in files:
print(f"Parsing files … {i / total * 100:.1f}% ({i} of {total} done, found {n} airports)", end="\r")
i += 1
file_d = {"lines": [], "airports": {}}
with open(p, "r") as f:
file_d["lines"] = list(map(lambda l: list(filter(None, l)), map(lambda s: s.split(" "), filter(None, map(str.strip, f.readlines())))))
curicao = ""
for line in file_d["lines"]:
if line[0] in ("1", "16", "17"):
curicao = line[4]
skip = False
file_d["airports"][curicao] = {"icao": curicao}
n += 1
elif line[0] == "1302":
if line[1] == "datum_lon":
if len(line) < 3:
continue
file_d["airports"][curicao]["lon"] = float(line[2])
if line[1] == "datum_lat":
if len(line) < 3:
continue
file_d["airports"][curicao]["lat"] = float(line[2])
elif line[0] in ("100", "101", "102") and not ("lon" in file_d["airports"][curicao] and "lat" in file_d["airports"][curicao]):
# no datum_lon / datum_lat found, approximate airport position from first runway / helipad found
if line[0] == "100": # land runway
lon = (float(line[10]) + float(line[19])) / 2
lat = (float(line[9]) + float(line[18])) / 2
elif line[0] == "101": # water runway
lon = (float(line[5]) + float(line[8])) / 2
lat = (float(line[4]) + float(line[7])) / 2
else: # helipad
lon = float(line[3])
lat = float(line[2])
file_d["airports"][curicao]["lon"] = lon
file_d["airports"][curicao]["lat"] = lat
for icao in list(file_d["airports"].keys()):
if not ("lon" in file_d["airports"][icao] and "lat" in file_d["airports"][icao]):
print(f"Unable to get longitude / latitude of airport {curicao} in file {p} - skipping", end=" " * 100 + "\n")
del file_d["airports"][icao]
n -= 1
skipped += 1
files_d[p] = file_d
print(f"Parsing files … {i / total * 100:.1f}% ({i} of {total} done, found {n} airports, skipped {skipped})", end=" " * 100 + "\n")
i = 0
total = n
for p in files_d:
for icao in files_d[p]["airports"]:
print(f"Getting ICAOs for airports … {i / total * 100:.1f}% ({i} of {total} done)", end="\r")
i += 1
files_d[p]["airports"][icao] = get_ourairports_icao(files_d[p]["airports"][icao], csv)
print(f"Getting ICAOs for airports … {i / total * 100:.1f}% ({i} of {total} done)", end=" " * 100 + "\n")
i = 0
total = len(files_d)
for p in files_d:
print(f"Writing new apt.dat files … {i / total * 100:.1f}% ({i} of {total} done)", end="\r")
i += 1
if output == None:
outp = p
else:
outp = os.path.join(output, os.path.split(p)[-1])
parts = os.path.split(outp)
prefix, newname = os.path.join(*parts[:-1]), parts[-1]
if len(files_d[p]["airports"]) > 0 and newname != "apt.dat":
firsticao = list(files_d[p]["airports"].keys())[0]
if "newicao" in files_d[p]["airports"][firsticao]:
newname = files_d[p]["airports"][firsticao]["newicao"] + ".dat"
newoutp = os.path.join(prefix, newname)
with open(outp, "w") as f:
for line in files_d[p]["lines"]:
if line[0] in ("1", "16", "17") and line[4] in files_d[p]["airports"] and "newicao" in files_d[p]["airports"][line[4]]:
line[4] = files_d[p]["airports"][line[4]]["newicao"]
f.write(" ".join(line) + "\n")
if outp != newoutp:
print(f"Renaming file: {outp} -> {newoutp}", end=" " * 100 + "\n")
shutil.move(outp, newoutp)
print(f"Writing new apt.dat files … {i / total * 100:.1f}% ({i} of {total} done)", end=" " * 100 + "\n")
if __name__ == "__main__":
argp = argparse.ArgumentParser(description="Fix apt.dat ICAO's - some apt.dat files from the XPlane gateway have them of the form XAY0016 - this script gets the right ICAO from OurAirports data (if the airport is found there)")
argp.add_argument(
"-i", "--input",
help="Input apt.dat file(s) / folder(s) containing apt.dat files",
required=True,
nargs="+"
)
argp.add_argument(
"-o", "--output",
help="Output folder for the modified apt.dat files - omit to edit the files in-place",
default=None
)
args = argp.parse_args()
infiles = find_input_files(args.input)
process(infiles, args.output)

490
scenery/osm2aptdat.py Normal file
View File

@ -0,0 +1,490 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import os
import sys
import argparse
import requests
import csv
import re
import logging
import math
from OSMPythonTools import overpass
from fgtools.geo import coord, rectangle
from fgtools.utils import files
from fgtools import aptdat
from fgtools.utils import constants
from fgtools.utils import unit_convert
osmapi = overpass.Overpass()
def parse_runway_id(id):
which, heading = "", 0
if id[-1] in ("L", "C", "R"):
which = id[-1]
try:
if which:
heading = int(id[:-1])
else:
heading = int(id)
except ValueError:
heading = 0
return heading * 10, which
def parse_surface_type(surface):
if re.search("pem|mac|sealed|bit|asp(h)?(alt)?|tarmac", surface) or surface in ("b"):
surface = "Asphalt"
elif re.search("wood|cement|bri(ck)?|hard|paved|pad|psp|met|c[o0]n(c)?", surface):
surface = "Concrete"
elif re.search("rock|gvl|grvl|gravel|pi(ç|c)", surface):
surface = "Gravel"
elif re.search("tr(ea)?t(e)?d|san(d)?|ter|none|cor|so(ft|d|il)|earth|cop|com|per|ground|silt|cla(y)?|dirt|turf", surface):
surface = "Dirt"
elif re.search("gr(a*)?s|gre", surface) or surface in ("g"):
surface = "Grass"
elif re.search("wat(er)?", surface):
surface = "Water"
elif re.search("sno|ice", surface):
surface = "SnowIce"
else:
surface = "Unknown"
return surface
def _get_ourairports_csv(what):
path = os.path.join(constants.CACHEDIR, what + ".csv")
if not os.path.isfile(path):
with open(path, "w") as f:
f.write(requests.get(f"https://davidmegginson.github.io/ourairports-data/{what}.csv").content.decode())
f = open(path, "r", newline="")
return list(csv.DictReader(f))[1:]
def get_ourairports_airports(bbox=None, icaos=[]):
if not (bbox or icaos):
raise TypeError("both bbox and icaos are None")
csv = _get_ourairports_csv("airports")
airports = []
print("Creating airports from OurAirports data … ", end="")
for line in csv:
type = aptdat.AirportType.Land
if "sea" in line["type"]:
type = aptdat.AirportType.Sea
elif "heli" in line["type"]:
type = aptdat.AirportType.Heli
code = line["gps_code"] or line["local_code"] or line["ident"]
if not code:
continue
airport = {"ident": line["ident"], "airport": aptdat.Airport(int(line["elevation_ft"] or 0), code,
line["name"], float(line["longitude_deg"]),
float(line["latitude_deg"]), type=type)}
if code in icaos:
icaos.remove(code)
airports.append(airport)
elif bbox and bbox.is_inside(coord.Coord(airport["airport"].lon, airport["airport"].lat)):
airports.append(airport)
print(f"done - found {len(airports)} airports for the given bounding box / ICAO")
return airports
def get_osm_elements_near_airport(airport, what, query, element_type, radius=10000, max_retries=10):
left = coord.Coord(airport.lon, 0).apply_angle_distance_m(-90, radius).lon
right = coord.Coord(airport.lon, 0).apply_angle_distance_m(90, radius).lon
upper = coord.Coord(0, airport.lat).apply_angle_distance_m(0, radius).lat
lower = coord.Coord(0, airport.lat).apply_angle_distance_m(180, radius).lat
query = overpass.overpassQueryBuilder(bbox=[lower, left, upper, right], elementType=element_type,
selector=query, out="center")
result = -1
retries = 0
while result == -1 and retries < max_retries:
try:
if element_type == "node":
result = osmapi.query(query, timeout=60).nodes()
elif element_type == "way":
result = osmapi.query(query, timeout=60).ways()
elif element_type == "relations":
result = osmapi.query(query, timeout=60).relations()
else:
result = []
qresult = osmapi.query(query, timeout=100)
if not qresult:
result = None
break
for etype in element_type:
result += getattr(qresult, etype + "s")() or []
except Exception as e:
if "timeout" in str(e.args).lower():
result = -1
else:
raise e
retries += 1
if result == -1:
print(f"API query for OSM {what} data for airport {airport.icao} timed out {retries} times - won't retry", end=" " * 100 + "\n")
result = []
if result == None:
result = []
print(f"No OSM {what} data found for airport {airport.icao}", end=" " * 100 + "\n")
return result
def add_osm_runways(airport):
result = get_osm_elements_near_airport(airport["airport"], "runway", '"aeroway"="runway"', "way")
osmways = []
for way in result:
first, last = way.nodes()[0], way.nodes()[-1]
if first.id() == last.id():
print("Got a runway mapped as area from OSM - not supported yet", end=" " * 100 + "\n")
continue
first = coord.Coord(first.lon(), first.lat())
last = coord.Coord(last.lon(), last.lat())
heading = first.angle(last)
if heading > 180:
first, last = last, first
heading -= 180
center = rectangle.Rectangle(last, first).midpoint()
distance = coord.Coord(airport["airport"].lon, airport["airport"].lat).distance_m(center)
osmways.append({"distance": distance, "heading": heading, "way": way, "first": first, "last": last})
osmways.sort(key=lambda r: r["distance"])
for i, runway in enumerate(airport["runways"]):
osmways_filtered = []
for osmway in osmways:
if "ref" in osmway["way"].tags():
# water runways somtimes have N, NE, etc. as identifier instead of 36, 04, etc.
mapping = {"N": "36", "NE": "04", "E": "09", "SE": "13", "S": "18", "SW": "22", "W": "27", "NW": "31"}
if runway["le_ident"] in mapping:
runway["le_ident"] = mapping[runway["le_ident"]]
if runway["he_ident"] in mapping:
runway["he_ident"] = mapping[runway["he_ident"]]
if osmway["way"].tags()["ref"] == runway["le_ident"] + "/" + runway["he_ident"]:
osmways_filtered = [osmway]
break
else:
heading, which = parse_runway_id(runway["le_ident"])
if heading <= osmway["heading"] < heading + 10:
if not which:
osmways_filtered = [osmway]
break
else:
osmways_filtered.append(osmway)
if len(osmways_filtered) == 3:
break
if len(osmways_filtered) == 0:
print("No OSM data found for runway", runway["le_ident"], "at airport", airport["airport"].icao, end=" " * 100 + "\n")
elif len(osmways_filtered) == 1: # just one matching runway - nothing left to do
runway["osmway"] = osmways_filtered[0]
elif len(osmways_filtered) == 2: # two parallel runways - sort from left to right and pick the right one
center1 = coord.Coord(osmways_filtered[0].centerLon(), osmways_filtered[0].centerLat())
center2 = coord.Coord(osmways_filtered[1].centerLon(), osmways_filtered[1].centerLat())
heading, which = parse_runway_id(runway["le_ident"])
rel_bearing = center1.angle(center2) - heading
index = 0
if rel_bearing > 0:
index = 0 if which == "L" else 1
else:
index = 1 if which == "L" else 0
runway["osmway"] = osmways_filtered[index]
else: # three or more parallel runways - sort the first three from left to right and pick the right one
center1 = coord.Coord(osmways_filtered[0].centerLon(), osmways_filtered[0].centerLat())
center2 = coord.Coord(osmways_filtered[1].centerLon(), osmways_filtered[1].centerLat())
center3 = coord.Coord(osmways_filtered[2].centerLon(), osmways_filtered[2].centerLat())
heading, which = parse_runway_id(runway["le_ident"])
rel_bearing1 = center1.angle(center2) - heading
rel_bearing2 = center2.angle(center3) - heading
index = 0
if rel_bearing1 > 0 and rel_bearing2 > 0:
index = "LCR".find(which)
elif rel_bearing1 <= 0 and rel_bearing2 > 0:
index = "CLR".find(which)
elif rel_bearing1 > 0 and rel_bearing2 <= 0:
index = "LRC".find(which)
else:
index = "RCL".find(which)
runway["osmway"] = osmways_filtered[index]
if not runway["le_longitude_deg"] or not runway["he_longitude_deg"]:
if "osmway" in runway:
runway["le_heading_degT"] = runway["osmway"]["heading"]
runway["he_heading_degT"] = runway["osmway"]["heading"] + 180
runway["le_longitude_deg"] = runway["osmway"]["first"].lon
runway["le_latitude_deg"] = runway["osmway"]["first"].lat
runway["he_longitude_deg"] = runway["osmway"]["last"].lon
runway["he_latitude_deg"] = runway["osmway"]["last"].lat
else:
print("No threshold information found for runway", runway["le_ident"], "at", airport["airport"].icao, "- removing !", end=" " * 100 + "\n")
airport["runways"][i] = None
airport["runways"] = list(filter(None, airport["runways"]))
def add_osm_helipads(airport):
result = get_osm_elements_near_airport(airport["airport"], "helipad", '"aeroway"="helipad"', ["node", "way"])
osmhelipads = []
counter = 0
for element in result:
if element.type() == "node":
c = coord.Coord(element.lon(), element.lat())
radius = 0
else:
lon_sum = lat_sum = 0
divider = 0
for node in element.nodes():
lon_sum += node.lon()
lat_sum += node.lat()
divider += 1
c = coord.Coord(lon_sum / divider, lat_sum / divider)
dist_sum = 0
for node in element.nodes():
dist_sum += c.distance_m(coord.Coord(node.lon(), node.lat()))
radius = dist_sum / divider
surface = ""
if "surface" in element.tags():
surface = element.tags()["surface"]
lit = None
if "lit" in element.tags():
lit = element.tags()["lit"] == "yes"
id = f"H{counter}"
counter += 1
osmhelipads.append({"coord": c, "radius": radius, "surface": surface, "id": id, "lit": None})
with_lon_lat = []
without_lon_lat = []
while len(airport["helipads"]):
helipad = airport["helipads"].pop()
if helipad["le_longitude_deg"] and helipad["le_latitude_deg"]:
with_lon_lat.append(helipad)
else:
without_lon_lat.append(helipad)
for helipad in with_lon_lat:
if len(osmhelipads) > 0:
for osmhelipad in osmhelipads:
osmhelipad["distance"] = osmhelipad["coord"].distance_m(coord.Coord(float(helipad["le_longitude_deg"]),
float(helipad["le_latitude_deg"])))
osmhelipads.sort(key=lambda d: d["distance"])
helipad["osmhelipad"] = osmhelipads.pop(0)
else:
helipad["osmhelipad"] = {}
for osmhelipad in osmhelipads:
osmhelipad["distance"] = osmhelipad["coord"].distance_m(coord.Coord(airport["airport"].lon, airport["airport"].lat))
osmhelipads.sort(key=lambda d: d["distance"])
for i, helipad in enumerate(without_lon_lat):
if len(osmhelipads) > 0:
helipad["osmhelipad"] = osmhelipads.pop(0)
helipad["le_longitude_deg"] = helipad["osmhelipad"]["coord"].lon
helipad["le_latitude_deg"] = helipad["osmhelipad"]["coord"].lat
else:
without_lon_lat[i] = None
if None in without_lon_lat:
print(f"No position information found for {without_lon_lat.count(None)} helipad(s) at {airport['airport'].icao} - removing", end=" " * 100 + "\n")
without_lon_lat = list(filter(None, without_lon_lat))
airport["helipads"] = with_lon_lat + without_lon_lat
for osmhelipad in osmhelipads:
helipad = {"airport_ident": airport["airport"].icao, "le_longitude_deg": osmhelipad["coord"].lon,
"le_latitude_deg": osmhelipad["coord"].lat, "lighted": osmhelipad["lit"], "surface": osmhelipad["surface"],
"length_ft": 0, "width_ft": 0, "osmhelipad": osmhelipad}
def add_ourairports_runways(airports):
csv = _get_ourairports_csv("runways")
i = 0
total = len(airports)
for airport in airports:
print(f"Extracting runways from OurAirports data … {i / total * 100:.1f}% ({i} of {total} airports done)", end="\r")
i += 1
runways = []
helipads = []
for line in csv:
if line["airport_ident"] == airport["ident"]:
if re.match(line["le_ident"], "H[0-9]*"):
helipads.append(line)
else:
runways.append(line)
else:
if runways or helipads:
break
airport["runways"] = runways
airport["helipads"] = helipads
add_osm_runways(airport)
add_osm_helipads(airport)
if len(airport["runways"]) == 0 and len(airport["helipads"]) == 0:
print(f"Removing airport {airport['airport'].icao} since it has no runways / helipads !", end=" " * 100 + "\n")
airports[i - 1] = None
continue
for runway in airport["runways"]:
surface = parse_surface_type(runway["surface"].lower())
if surface == "Unknown":
if "osmway" in runway:
if "surface" in runway["osmway"]["way"].tags():
surface = parse_surface_type(runway["osmway"]["way"].tags()["surface"])
if surface == "Unknown":
if (int(runway["length_ft"] or 0) > 1500 and int(runway["width_ft"] or 0) > 30) or int(runway["lighted"]):
surface = "Asphalt"
else:
surface = "Dirt"
print("Unknown surface type:", runway["surface"], "for runway", runway["le_ident"], "at airport", airport["airport"].icao, "- falling back to", surface, end=" " * 100 + "\n")
runway["surface"] = surface
print(f"Extracting runways from OurAirports data … {i / total * 100:.1f}% ({i} of {total} airports done)", end="\r\n")
airports = list(filter(None, airports))
i = 0
total = len(airports)
for airport in airports:
print(f"Creating runways … {i / total * 100}% ({i} of {total} airports done)", end="\r")
i += 1
for runway in airport["runways"]:
width = runway["width_ft"]
if width != "":
width = float(width)
elif "osmway" in runway and "width" in runway["osmway"]["way"].tags():
width = round(float(runway["osmway"]["way"].tags()["width"]), 2)
else:
print(f"No width found for runway {runway['le_ident']} at airport {airport['airport'].icao} - guessing from length", end=" " * 100 + "\n")
width = math.sqrt(int(runway["length_ft"] or 0))
if runway["surface"] == "water":
runway = aptdat.WaterRunway(unit_convert.ft2m(width),
runway["le_ident"], float(runway["le_longitude_deg"]), float(runway["le_latitude_deg"]),
runway["he_ident"], float(runway["he_longitude_deg"]), float(runway["he_latitude_deg"]),
perimeter_buoys=True)
else:
center_lights = edge_lights = bool(runway["lighted"])
if center_lights and surface not in ("Asphalt", "Concrete"):
center_lights = edge_lights = False
distance_signs = int(runway["length_ft"] or 0) > 4000
tdz_lights = runway["surface"] in ("Asphalt", "Concrete")
markings = aptdat.RunwayMarkingCode.Visual
if runway["surface"] not in ("Asphalt", "Concrete"):
markings = aptdat.RunwayMarkingCode.NoMarkings
elif 4000 < int(runway["length_ft"] or 0) < 6000:
markings = aptdat.RunwayMarkingCode.NonPrecision
elif int(runway["length_ft"] or 0) >= 6000:
markings = aptdat.RunwayMarkingCode.Precision
reil_type = aptdat.REILCode.NoREIL
if markings == aptdat.RunwayMarkingCode.NonPrecision:
reil_type = aptdat.REILCode.UnidirREIL
runway = aptdat.LandRunway(unit_convert.ft2m(width), getattr(aptdat.SurfaceCode, runway["surface"]),
runway["le_ident"], float(runway["le_longitude_deg"]), float(runway["le_latitude_deg"]),
runway["he_ident"], float(runway["he_longitude_deg"]), float(runway["he_latitude_deg"]),
center_lights=center_lights, edge_lights=edge_lights, distance_signs=distance_signs,
displ_thresh1=float(runway["le_displaced_threshold_ft"] or 0), tdz_lights1=tdz_lights,
markings1=markings, reil_type1=reil_type,
displ_thresh2=float(runway["he_displaced_threshold_ft"] or 0), tdz_lights2=tdz_lights,
markings2=markings, reil_type2=reil_type)
airport["airport"].add_runway(runway)
print(f"Creating runways … {i / total * 100}% ({i} of {total} airports done)")
i = 0
for airport in airports:
print("Creating helipads … {i / total * 100}% ({i} of {total} airports done)", end="\r")
i += 1
for helipad in airport["helipads"]:
if helipad["width_ft"]:
width = round(float(helipad["width_ft"]), 2)
elif "radius" in helipad["osmhelipad"]:
width = radius * 2
else:
width = 50
print((f"Unable to get width for for helipad {helipad['le_ident']} at airport {airport['airport'].icao}" +
f" - setting to {width} ft"), end=" " * 100 + "\n")
if helipad["length_ft"]:
length = round(float(helipad["length_ft"]), 2)
elif "radius" in helipad["osmhelipad"]:
length = radius * 2
else:
length = 50
print((f"Unable to get length for for helipad {helipad['le_ident']} at airport {airport['airport'].icao}" +
f" - setting to {length} ft"), end=" " * 100 + "\n")
lighted = bool(int(helipad["lighted"]))
surface = parse_surface_type(helipad["surface"])
if surface == "Unknown" and "surface" in helipad["osmhelipad"]:
surface = parse_surface_type(helipad["osmhelipad"]["surface"])
if surface == "Unknown":
if lighted:
surface = "Asphalt"
else:
surface = "Grass"
print((f"Unknown surface type {helipad['surface']} for helipad {helipad['le_ident']} at" +
f" airport {airport['airport'].icao} - setting to {surface}"), end=" " * 100 + "\n")
if surface not in ("Concrete", "Asphalt"):
lighted = False
print(helipad)
helipad = aptdat.Helipad(helipad["id"], float(helipad["le_longitude_deg"]), float(helipad["le_latitude_deg"]), 0,
unit_convert.ft2m(length), unit_convert.ft2m(width), surface, edge_lights=lighted)
airport["airport"].add_helipad(helipad)
airports[i - 1] = airport["airport"]
return airports
def query_airports_by_icaos(icaos):
# remove doubles
icaos = list(set(icaos))
ourairports = get_ourairports_airports(icaos=icaos)
ourairports = add_ourairports_runways(ourairports)
return ourairports
def query_airports_by_bbox(left, lower, right, upper):
ourairports = get_ourairports_airports(bbox=rectangle.Rectangle(coord.Coord(left, lower), coord.Coord(right, upper)))
ourairports = add_ourairports_runways(ourairports)
return ourairports
def check_aptdat_written_by_this(path):
with open(path, "r") as f:
i = 0
while i < 2:
fl = f.readline()
if fl:
i += 1
if "osm2aptdat.py" in fl:
return True
return False
def write_aptdat_files(output, airports, merge=False):
writer = aptdat.ReaderWriterAptDat(file_header="Generated from OSM and OurAirports data by fgtools.osm2aptdat.py")
writer.add_airports(airports)
writer.write(output, merge=merge, overwrite_func=check_aptdat_written_by_this)
if __name__ == "__main__":
logging.getLogger("OSMPythonTools").setLevel(logging.FATAL)
argp = argparse.ArgumentParser(description="query airports from OSM and convert the results to apt.dat files")
bbox_icao_group = argp.add_mutually_exclusive_group(required=True)
bbox_icao_group.add_argument(
"-b", "--bbox",
help="GPS coordinates of the lower left and upper right corners of the bounding box within which all airports should be processed",
nargs=4,
metavar=("LL_LON", "LL_LAT", "UR_LON", "UR_LAT"),
type=float,
)
bbox_icao_group.add_argument(
"-i", "--icao",
help="ICAO code(s) of the airport(s) to process",
nargs="+"
)
argp.add_argument(
"-m", "--merge",
help="Merge all airports into one big apt.dat file instead of writing one file for each airport",
action="store_true"
)
argp.add_argument(
"-o", "--output",
help="directory to put apt.dat files into",
required=True
)
args = argp.parse_args()
if args.icao:
airports = query_airports_by_icaos(args.icao)
else:
airports = query_airports_by_bbox(left=args.bbox[0], lower=args.bbox[1], right=args.bbox[2], upper=args.bbox[3])
write_aptdat_files(args.output, airports, merge=args.merge)

View File

@ -1,28 +0,0 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import os
from fgtools.utils import isiterable
def find_input_files(paths, prefix="", suffix=""):
if not isiterable(paths):
if isinstance(paths, str):
paths = [paths]
else:
raise TypeError("paths is not iterable")
files = []
for path in paths:
if os.path.isfile(path) and os.path.split(path)[-1].startswith(prefix) and path.endswith(suffix):
files.append(path)
elif os.path.isdir(path):
files += find_input_files([os.path.join(path, s) for s in os.listdir(path)])
else:
print(f"Input file / directory {path} does not exist - skipping")
return files
def write_xml_header(f):
f.write('<?xml version="1.0" encoding="UTF-8"?>\n')

View File

@ -1,42 +0,0 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import math
from fgtools.utils import wrap_period
EARTH_RADIUS = 6378138.12
def great_circle_distance_m(lon1, lat1, lon2, lat2):
lon1, lat1, lon2, lat2 = map(math.radians, (lon1, lat1, lon2, lat2))
return abs(EARTH_RADIUS * math.acos(math.sin(lat1) * math.sin(lat2) + math.cos(lat1) * math.cos(lat2) * math.cos(lon1 - lon2)))
def great_circle_distance_km(lon1, lat1, lon2, lat2):
return great_circle_distance_m(lon1, lat1, lon2, lat2) / 1000
def get_bearing_deg(lon1, lat1, lon2, lat2):
dlon = (lon2 - lon1)
x = math.cos(math.radians(lat2)) * math.sin(math.radians(dlon))
y = math.cos(math.radians(lat1)) * math.sin(math.radians(lat2)) - math.sin(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.cos(math.radians(dlon))
brg = math.atan2(x, y)
brg = math.degrees(brg)
return wrap_period(brg, 0, 360)
def apply_heading_distance(lon, lat, heading, distance):
lon = math.radians(lon)
lat = math.radians(lat)
heading = math.radians(heading)
distance /= EARTH_RADIUS
if distance < 0:
distance = abs(distance)
heading -= math.pi
lat = math.asin(math.sin(lat) * math.cos(distance) + math.cos(lat) * math.sin(distance) * math.cos(heading))
if math.cos(lat) > 1e-15:
lon = math.pi - (math.pi - lon - math.asin(math.sin(heading) * math.sin(distance) / math.cos(lat)) % (2 * math.pi))
return wrap_period(math.degrees(lon), -180, 180), wrap_period(math.degrees(lat), -90, 90)