cythonize common

This commit is contained in:
Xavier Olive 2019-10-29 10:00:01 +01:00
parent b04a1bd49c
commit eb675d5ca3
4 changed files with 442 additions and 0 deletions

1
pyModeS/c_decoder/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.c

View File

View File

@ -0,0 +1,431 @@
# cython: language_level=3
cimport cython
from cpython cimport array
from cpython.bytes cimport PyBytes_GET_SIZE
from cpython.bytearray cimport PyByteArray_GET_SIZE
from libc.math cimport cos, acos, fabs, M_PI as pi, floor as c_floor
cdef int char_to_int(unsigned char binstr):
if 48 <= binstr <= 57: # 0 to 9
return binstr - 48
if 97 <= binstr <= 102: # a to f
return binstr - 97 + 10
if 65 <= binstr <= 70: # A to F
return binstr - 65 + 10
return 0
cdef unsigned char int_to_char(unsigned char i):
if i < 10:
return 48 + i # "0" + i
return 97 - 10 + i # "a" - 10 + i
@cython.boundscheck(False)
@cython.overflowcheck(False)
cpdef bytearray hex2bin(bytes hexstr):
"""Convert a hexdecimal string to binary string, with zero fillings."""
# num_of_bits = len(hexstr) * 4
cdef Py_ssize_t len_hexstr = PyBytes_GET_SIZE(hexstr)
# binstr = bin(int(hexstr, 16))[2:].zfill(int(num_of_bits))
cdef bytearray _binstr = bytearray(4 * len_hexstr)
cdef unsigned char[:] binstr = _binstr
cdef unsigned char int_
cdef Py_ssize_t i
for i in range(len_hexstr):
int_ = char_to_int(hexstr[i])
binstr[4*i] = int_to_char((int_ >> 3) & 1)
binstr[4*i+1] = int_to_char((int_ >> 2) & 1)
binstr[4*i+2] = int_to_char((int_ >> 1) & 1)
binstr[4*i+3] = int_to_char((int_) & 1)
return _binstr
@cython.boundscheck(False)
cpdef long bin2int(bytearray binstr):
"""Convert a binary string to integer."""
# return int(binstr, 2)
cdef Py_ssize_t len_ = PyByteArray_GET_SIZE(binstr)
cdef long cumul = 0
cdef unsigned char[:] v_binstr = binstr
for i in range(len_):
cumul = 2*cumul + char_to_int(v_binstr[i])
return cumul
@cython.boundscheck(False)
cpdef long hex2int(bytearray binstr):
"""Convert a binary string to integer."""
# return int(binstr, 2)
cdef Py_ssize_t len_ = PyByteArray_GET_SIZE(binstr)
cdef long cumul = 0
cdef unsigned char[:] v_binstr = binstr
for i in range(len_):
cumul = 16*cumul + char_to_int(v_binstr[i])
return cumul
@cython.boundscheck(False)
cpdef unsigned char df(bytes msg):
"""Decode Downlink Format vaule, bits 1 to 5."""
cdef bytearray dfbin = hex2bin(msg[:2])
# return min(bin2int(dfbin[0:5]), 24)
cdef long df = bin2int(dfbin[0:5])
if df > 24:
return 24
return df
# the CRC generator
# G = [int("11111111", 2), int("11111010", 2), int("00000100", 2), int("10000000", 2)]
cdef array.array _G = array.array('l', [0b11111111, 0b11111010, 0b00000100, 0b10000000])
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef long crc(bytes msg, bint encode=False):
"""Mode-S Cyclic Redundancy Check.
Detect if bit error occurs in the Mode-S message. When encode option is on,
the checksum is generated.
Args:
msg (string): 28 bytes hexadecimal message string
encode (bool): True to encode the date only and return the checksum
Returns:
int: message checksum, or partity bits (encoder)
"""
# the CRC generator
# G = [int("11111111", 2), int("11111010", 2), int("00000100", 2), int("10000000", 2)]
# cdef array.array _G = array.array('l', [0b11111111, 0b11111010, 0b00000100, 0b10000000])
cdef long[4] G = _G
# msgbin_split = wrap(msgbin, 8)
# mbytes = list(map(bin2int, msgbin_split))
cdef bytearray _msgbin = hex2bin(msg)
cdef unsigned char[:] msgbin = _msgbin
cdef Py_ssize_t len_msgbin = PyByteArray_GET_SIZE(_msgbin)
cdef Py_ssize_t len_mbytes = len_msgbin // 8
cdef Py_ssize_t i
if encode:
for i in range(len_msgbin - 24, len_msgbin):
msgbin[i] = 0
cdef array.array _mbytes = array.array(
'l', [bin2int(_msgbin[8*i:8*i+8]) for i in range(len_mbytes)]
)
cdef long[:] mbytes = _mbytes
cdef long bits, mask
cdef Py_ssize_t ibyte, ibit
for ibyte in range(len_mbytes - 3):
for ibit in range(8):
mask = 0x80 >> ibit
bits = mbytes[ibyte] & mask
if bits > 0:
mbytes[ibyte] = mbytes[ibyte] ^ (G[0] >> ibit)
mbytes[ibyte + 1] = mbytes[ibyte + 1] ^ (
0xFF & ((G[0] << 8 - ibit) | (G[1] >> ibit))
)
mbytes[ibyte + 2] = mbytes[ibyte + 2] ^ (
0xFF & ((G[1] << 8 - ibit) | (G[2] >> ibit))
)
mbytes[ibyte + 3] = mbytes[ibyte + 3] ^ (
0xFF & ((G[2] << 8 - ibit) | (G[3] >> ibit))
)
cdef long result = (mbytes[len_mbytes-3] << 16) | (mbytes[len_mbytes-2] << 8) | mbytes[len_mbytes-1]
return result
cpdef int floor(float x):
"""Mode-S floor function.
Defined as the greatest integer value k, such that k <= x
For example: floor(3.6) = 3 and floor(-3.6) = -4
"""
return <int> c_floor(x)
cpdef str icao(bytes msg):
"""Calculate the ICAO address from an Mode-S message.
Applicable only with DF4, DF5, DF20, DF21 messages.
Args:
msg (String): 28 bytes hexadecimal message string
Returns:
String: ICAO address in 6 bytes hexadecimal string
"""
cdef unsigned char DF = df(msg)
cdef long c0, c1
cdef bytearray bmsg = bytearray(msg)
if DF in (11, 17, 18):
addr = bmsg[2:8].decode()
elif DF in (0, 4, 5, 16, 20, 21):
c0 = crc(msg, encode=True)
c1 = hex2int(bmsg[-6:])
addr = "%06X" % (c0 ^ c1)
else:
addr = None
return addr
cpdef bint is_icao_assigned(bytes icao):
"""Check whether the ICAO address is assigned (Annex 10, Vol 3)."""
if (icao is None) or (not isinstance(icao, str)) or (len(icao) != 6):
return False
cdef long icaoint = hex2int(icao)
if 0x200000 < icaoint < 0x27FFFF:
return False # AFI
if 0x280000 < icaoint < 0x28FFFF:
return False # SAM
if 0x500000 < icaoint < 0x5FFFFF:
return False # EUR, NAT
if 0x600000 < icaoint < 0x67FFFF:
return False # MID
if 0x680000 < icaoint < 0x6F0000:
return False # ASIA
if 0x900000 < icaoint < 0x9FFFFF:
return False # NAM, PAC
if 0xB00000 < icaoint < 0xBFFFFF:
return False # CAR
if 0xD00000 < icaoint < 0xDFFFFF:
return False # future
if 0xF00000 < icaoint < 0xFFFFFF:
return False # future
return True
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int typecode(bytes msg):
"""Type code of ADS-B message
Args:
msg (string): 28 bytes hexadecimal message string
Returns:
int: type code number
"""
if df(msg) not in (17, 18):
return -1
# return None
cdef bytearray tcbin = hex2bin(msg[8:10])
return bin2int(tcbin[0:5])
@cython.cdivision(True)
cpdef int cprNL(int lat):
"""NL() function in CPR decoding."""
if lat == 0:
return 59
if lat == 87 or lat == -87:
return 2
if lat > 87 or lat < -87:
return 1
cdef int nz = 15
cdef float a = 1 - cos(pi / (2 * nz))
cdef float b = cos(pi / 180.0 * fabs(lat)) ** 2
cdef float nl = 2 * pi / (acos(1 - a / b))
NL = floor(nl)
return NL
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef str idcode(bytes msg):
"""Compute identity (squawk code).
Applicable only for DF5 or DF21 messages, bit 20-32.
credit: @fbyrkjeland
Args:
msg (String): 28 bytes hexadecimal message string
Returns:
string: squawk code
"""
if df(msg) not in [5, 21]:
raise RuntimeError("Message must be Downlink Format 5 or 21.")
cdef bytearray _mbin = hex2bin(msg)
cdef unsigned char[:] mbin = _mbin
cdef bytearray _idcode = bytearray(4)
cdef unsigned char[:] idcode = _idcode
cdef unsigned char C1 = mbin[19]
cdef unsigned char A1 = mbin[20]
cdef unsigned char C2 = mbin[21]
cdef unsigned char A2 = mbin[22]
cdef unsigned char C4 = mbin[23]
cdef unsigned char A4 = mbin[24]
# _ = mbin[25]
cdef unsigned char B1 = mbin[26]
cdef unsigned char D1 = mbin[27]
cdef unsigned char B2 = mbin[28]
cdef unsigned char D2 = mbin[29]
cdef unsigned char B4 = mbin[30]
cdef unsigned char D4 = mbin[31]
# byte1 = int(A4 + A2 + A1, 2)
# byte2 = int(B4 + B2 + B1, 2)
# byte3 = int(C4 + C2 + C1, 2)
# byte4 = int(D4 + D2 + D1, 2)
idcode[0] = int_to_char((char_to_int(A4)*2 + char_to_int(A2))*2 + char_to_int(A1))
idcode[1] = int_to_char((char_to_int(B4)*2 + char_to_int(B2))*2 + char_to_int(B1))
idcode[2] = int_to_char((char_to_int(C4)*2 + char_to_int(C2))*2 + char_to_int(C1))
idcode[3] = int_to_char((char_to_int(D4)*2 + char_to_int(D2))*2 + char_to_int(D1))
return _idcode.decode()
#return str(byte1) + str(byte2) + str(byte3) + str(byte4)
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int altcode(bytes msg):
"""Compute the altitude.
Applicable only for DF4 or DF20 message, bit 20-32.
credit: @fbyrkjeland
Args:
msg (String): 28 bytes hexadecimal message string
Returns:
int: altitude in ft
"""
if df(msg) not in [0, 4, 16, 20]:
raise RuntimeError("Message must be Downlink Format 0, 4, 16, or 20.")
# Altitude code, bit 20-32
cdef bytearray _mbin = hex2bin(msg)
cdef unsigned char[:] mbin = _mbin
cdef char mbit = mbin[25] # M bit: 26
cdef char qbit = mbin[27] # Q bit: 28
cdef int alt = 0
cdef bytearray vbin
cdef bytearray _graystr = bytearray(11)
cdef unsigned char[:] graystr = _graystr
if mbit == 48: # unit in ft, "0" -> 48
if qbit == 49: # 25ft interval, "1" -> 49
vbin = _mbin[19:25] + _mbin[26:27] + _mbin[28:32]
alt = bin2int(vbin) * 25 - 1000
if qbit == 48: # 100ft interval, above 50175ft, "0" -> 48
graystr[8] = mbin[19]
graystr[2] = mbin[20]
graystr[9] = mbin[21]
graystr[3] = mbin[22]
graystr[10] = mbin[23]
graystr[4] = mbin[24]
# _ = mbin[25]
graystr[5] = mbin[26]
# cdef char D1 = mbin[27] # always zero
graystr[6] = mbin[28]
graystr[0] = mbin[29]
graystr[7] = mbin[30]
graystr[1] = mbin[31]
# graystr = D2 + D4 + A1 + A2 + A4 + B1 + B2 + B4 + C1 + C2 + C4
alt = gray2alt(_graystr)
if mbit == 49: # unit in meter, "1" -> 49
vbin = _mbin[19:25] + _mbin[26:31]
alt = int(bin2int(vbin) * 3.28084) # convert to ft
return alt
cdef int gray2alt(bytearray codestr):
cdef bytearray gc500 = codestr[:8]
cdef int n500 = gray2int(gc500)
# in 100-ft step must be converted first
cdef bytearray gc100 = codestr[8:]
cdef int n100 = gray2int(gc100)
if n100 in [0, 5, 6]:
return -1
#return None
if n100 == 7:
n100 = 5
if n500 % 2:
n100 = 6 - n100
alt = (n500 * 500 + n100 * 100) - 1300
return alt
cdef int gray2int(bytearray graystr):
"""Convert greycode to binary."""
cdef int num = bin2int(graystr)
num ^= num >> 8
num ^= num >> 4
num ^= num >> 2
num ^= num >> 1
return num
cdef bytes data(bytes msg):
"""Return the data frame in the message, bytes 9 to 22."""
return msg[8:-6]
cpdef bint allzeros(bytes msg):
"""Check if the data bits are all zeros.
Args:
msg (String): 28 bytes hexadecimal message string
Returns:
bool: True or False
"""
d = hex2bin(data(msg))
if bin2int(d) > 0:
return False
else:
return True
def wrongstatus(data, sb, msb, lsb):
"""Check if the status bit and field bits are consistency.
This Function is used for checking BDS code versions.
"""
# status bit, most significant bit, least significant bit
status = int(data[sb - 1])
value = bin2int(data[msb - 1 : lsb])
if not status:
if value != 0:
return True
return False

View File

@ -15,6 +15,15 @@ Steps for deploying a new verison:
# Always prefer setuptools over distutils
from setuptools import setup, find_packages
# Compile some parts
from setuptools.extension import Extension
from Cython.Build import cythonize
extensions = [
Extension("pyModeS.c_decoder.common", ["pyModeS/c_decoder/common.pyx"])
]
# To use a consistent encoding
from codecs import open
from os import path
@ -57,6 +66,7 @@ setup(
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 3",
],
ext_modules=cythonize(extensions),
# What does your project relate to?
keywords="Mode-S ADS-B EHS ELS Comm-B",
# You can just specify the packages manually here if your project is