-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathec_978.py
executable file
·1860 lines (1504 loc) · 66.7 KB
/
ec_978.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
ec_978 - Error Correct FIS-B and ADS-B messages.
================================================
Accepts packets (either FIS-B or ADS-B), usually sent from
``demod-978``, into standard input.
The input is sent as two messages, the first is a 30 byte character
string which provides information about the data packet to follow.
This is then followed by a datapacket as bytes suitable as a numpy
I32 array. Two sizes of packets are sent (and indicated as such in the
preceding 30 byte attribute packet): ``PACKET_LENGTH_FISB`` (35340)
for FIS-B packets and ``PACKET_LENGTH_ADSB`` (3084) for ADS-B packets.
Note that while there are two types of ADS-B packets, short and long,
the length is set for long packets. That allows for long and short packets
to be error corrected (you can sort of, but not absolutely, distinguish
between the two at the time they are sent).
After the packets are received, they are packed together into bytes
and error corrected using the Reed-Solomon parity bits. If the initial error
correction is not successful, the bits before and after each bit are used
in a set of weighted averages to come up with a new set of bits. These bits
are then tried. The concept is that since we are sampling at the
Nyquist frequency, the bits actually sampled are usually not at optimum
sampling points. By using their 'bit buddies' (neighbor bits) we can
come up with more optimum sampling points. Empirically, this is
very effective.
If FIS-B packets are not correctly decoded using the above method, other methods
such as supplying known fixed bits in block 0, or detecting runs of zeroes
at the ends of blocks are tried.
Successful corrections are passed to standard output.
"""
import sys
import os
import os.path
import numpy as np
import pyreedsolomon as rs
import argparse
import glob
import time
from datetime import timezone, datetime, timedelta
import shutil
from argparse import RawTextHelpFormatter
# Shift amounts tried, in order, when error correcting a message.
#
# A positive value blends in the samples taken before each bit; a negative
# value blends in the samples taken after each bit (see ``tryShiftBits()``).
# The entries are ordered from most to least likely to produce a match
# (empirically determined).
#
# This list was created from a large number of samples. We found the amount
# that would error correct the largest percentage of packets, and then removed
# those packets from contention. The process was then repeated. It was also
# found that a granularity of less than 5% wasn't useful.
SHIFT_BY_PROBABILITY = [0, -0.75, 0.75, -0.50, 0.50, -0.25, \
0.25, -0.85, 0.40, 0.65, -0.30, 0.80, -.05, 0.05, -0.90, 0.90, \
-0.10, 0.10, 0.85, -0.15, 0.15, -0.80, -0.65, -0.35, 0.35, \
-0.70, 0.70, 0.30, -0.40, -0.60, 0.60, -0.20, 0.20, -0.45, \
0.45, -0.55, 0.55]
# Table that maps the uplink feedback code to the number of packets
# received by a particular channel in the last 32 seconds. These are
# the number of FIS-B packets received by an aircraft over a
# particular channel. The channel is rotated every second.
# The list index is the feedback code (0-7).
UPLINK_FEEDBACK_TABLE = [ '0', '1-13', '14-21', '22-25', '26-28', \
'29-30', '31', '32']
# Maps UAT data channel to the power of the station and TIS-B site id.
# The table has 32 entries, laid out as four rows of eight.
FISB_DATA_CHANNEL = [ \
'15H','14H','13H','12M','11M','10M','08L','05L/03S', \
'15H','14H','13H','12M','11M','09L','07L','06L/02S', \
'15H','14H','13H','12M','10M','09L','07L','06L/01S', \
'15H','14H','13H','11M','10M','08L','06L','04S']
# Base 40 table for decoding the call signs of ADS-B packets.
# Indexes 0-9 are digits, 10-35 are the letters A-Z and 36 is a space.
# Index 37 is an empty string and 38/39 are 'x' and 'y'.
# NOTE(review): 37-39 are presumably placeholders for unassigned codes -
# confirm against the UAT specification.
BASE40_TABLE = ['0','1','2','3','4','5','6','7','8','9', \
'A','B','C','D','E','F','G','H','I','J','K','L','M', \
'N','O','P','Q','R','S','T','U','V','W','X','Y','Z', \
' ','','x','y']
# Number of bits in a raw FIS-B data block (with RS bits)
BITS_PER_BLOCK = 4416
# Number of bytes in a raw FIS-B data block (with RS bits)
BYTES_PER_BLOCK = BITS_PER_BLOCK // 8
#: Length of attribute string sent with each packet.
#: NOTE(review): the module docstring describes this string as 30 bytes;
#: confirm which value matches what the demodulator currently sends.
ATTRIBUTE_LEN = 36
#: Size of a FIS-B packet in bytes.
#: Derived from ((4416 * 2) + 3) * 4.
#: A FIS-B packet has 4416 bits * 2 for 2 samples/bit, + 3 extra samples
#: (one before the actual data and two after). Multiplied by 4 since each
#: sample is 4 bytes long.
PACKET_LENGTH_FISB = 35340
#: Size of an ADS-B packet in bytes.
#: Derived from ((384 * 2) + 3) * 4.
#: An ADS-B long packet has 384 bits * 2 for 2 samples/bit, + 3 extra samples
#: (one before the actual data and two after). Multiplied by 4 since each
#: sample is 4 bytes long. We consider all ADS-B packets as long
#: since this will capture all cases (long and short ADS-B).
PACKET_LENGTH_ADSB = 3084
# If True, show information about failed FIS-B packets as a comment.
show_failed_fisb = False
# If True, show information about failed ADS-B packets as a comment.
show_failed_adsb = False
# If True, do an ADS-B partial decode. Turned on by --apd flag.
adsb_partial_decode = False
# If True, show extra FIS-B timing information in FIS-B comment.
fisb_extra_timing = False
# If True, transform output to mimic dump978 or dump978-fa format
output_d978 = False
output_d978fa = False
# Directory and flag for writing error files.
dir_out_errors = None
writingErrorFiles = False
# Flag to show lowest usable signal levels. Set by --ll.
show_lowest_levels = False
# Flag to repair block zero fixed (not changing) bits. Defaults to on.
# Set by --bzfb flag.
# NOTE(review): the ``blockZeroTricks()`` docstring names the flag
# ``--nobzfb``; confirm the actual command line spelling.
block_zero_fixed_bits = True
# Set to True by --saveraw argument. If true, the data we get from
# demod_978 is saved to a disk file, one file per packet. Used for
# evaluation, such as eye diagrams.
save_raw_data_to_disk = False
# Repair blocks that end in trailing zeros.
fix_trailing_zeros = True
# Set to True if replacing first 6 byte values. Set by --f6b.
replace_f6b = False
# For --f6b flag, holds number of 1st 6 byte values to check.
f6bArrayLen = 0
# For --f6b flag, contains contents of 1st 6 byte values.
f6bArray = None
# Reed-Solomon error correction parameters. The first value on each line is
# for FIS-B Ground Uplink blocks; the values in parentheses are for ADS-B.
# Symbol Size: 8
# Message Symbols: 72 (ADS-B short: 18 = 144 bits, ADS-B long: 34 = 272 bits)
# Message + ECC symbols: 92 (ADS-B short: 30 = 240 bits, ADS-B long: 48 = 384 bits)
# Galois field generator polynomial coefficients: 0x187
# fcr (First Consecutive Root): 120
# Primitive element: 1
# Number of roots: 20 (ADS-B short: 12, ADS-B long: 14)
# (same as the number of parity symbols, i.e. parity bytes)
#
# The constructor arguments below follow the same order as the list above.
rsAdsbS = rs.Reed_Solomon(8,18,30,0x187,120,1,12)
rsAdsbL = rs.Reed_Solomon(8,34,48,0x187,120,1,14)
rsFisb = rs.Reed_Solomon(8,72,92,0x187,120,1,20)
def block0ThoroughCheck(hexBlocks):
    """
    Detect a packet that ends early and zero-fill the blocks that follow.

    Starting with block 0, all consecutive decoded blocks are joined and the
    UAT frames they contain are walked. If a frame with a length of zero is
    found, the packet has ended and the rest of it is zero-filled.
    In that case ``hexBlocks`` is altered in place and every block after the
    one holding the zero length is set to 72 zero bytes.

    Args:
        hexBlocks (list): List of 6 hex blocks, each containing a
            string of hex values if already error corrected, or ``None``
            if not error corrected.

    Returns:
        tuple: Tuple containing:

        * ``True`` if this packet matched. Else ``False``.
          ``True`` indicates the packet (all 6 blocks) is entirely error
          corrected.
        * If this was a successful error correction, ``hexBlocks`` is altered
          to contain values for all 6 blocks. Otherwise, the value that
          ``hexBlocks`` contained when the function was called is
          returned.
    """
    # If we don't have a block 0, we can't continue.
    if hexBlocks[0] is None:
        return False, hexBlocks

    # Gather block 0 and the decoded blocks that directly follow it
    # (blocks 1-4), stopping at the first block that is missing.
    consecutive = [hexBlocks[0]]
    for blockHex in hexBlocks[1:5]:
        if blockHex is None:
            break
        consecutive.append(blockHex)

    # Turn hex string back to bytes.
    dataBytes = bytearray.fromhex(''.join(consecutive))
    dataBytesLen = len(dataBytes)

    # Jump through UAT frames, looking for an empty frame. The first 8 bytes
    # are skipped, so the first frame starts at byte 8.
    bytePtr = 8

    # Both bytes holding the frame length must be inside the data we have.
    while bytePtr + 1 < dataBytesLen:
        # Frame length is 9 bits: all of one byte plus the top bit of the next.
        uatFrameLen = (dataBytes[bytePtr] << 1) | (dataBytes[bytePtr + 1] >> 7)

        if uatFrameLen == 0:
            # Found end of frames, so we matched. Each block holds 72 bytes.
            currentBlock = (bytePtr + 1) // 72

            # Set all future blocks to zeroes.
            for i in range(currentBlock + 1, 6):
                hexBlocks[i] = '00' * 72
            return True, hexBlocks

        # Have a next frame, try to proceed to it (2 bytes of frame header
        # plus the frame contents).
        bytePtr += uatFrameLen + 2

    # Didn't find anything.
    return False, hexBlocks
def packAndTest(rs, bits, f6b = False):
    """
    Pack sample values into bytes and run Reed-Solomon error correction.

    Every sample greater than zero becomes a 1 bit, every other sample
    becomes a 0 bit. The bits are packed into bytes and handed to the
    ``decode`` method of ``rs``.

    Args:
        rs (reed-solomon object): Reed-Solomon object to use.
            This object is different for FIS-B,
            ADS-B short, and ADS-B long.
        bits (nparray): Array containing one sample value per bit
            for a single FIS-B block or an entire ADS-B message.
        f6b (bool): ``True`` if we are forcing the first 6 bytes
            of FIS-B block 0 to be a predetermined value. Each entry of the
            module level ``f6bArray`` (``f6bArrayLen`` entries) is tried in
            turn. Otherwise ``False``.

    Returns:
        tuple: Tuple containing:

        * String of hex if the error correction was successful, else ``None``.
        * Error count reported by the decoder. Zero or positive on success,
          negative on failure. If ``f6b`` is ``True`` and no forced value
          decoded, ``-74`` is returned.
    """
    # Samples above zero are ones; pack 8 of them per byte.
    packed = np.packbits(bits > 0)

    if f6b:
        # Overwrite the first 6 bytes with each candidate and retry.
        for idx in range(f6bArrayLen):
            packed[0:6] = f6bArray[idx][0:6]
            corrected, errCount = rs.decode(packed)
            if errCount >= 0:
                return corrected.tobytes().hex(), errCount
        # None of the forced values decoded.
        return None, -74

    # Plain decode of the packed bytes.
    corrected, errCount = rs.decode(packed)
    if errCount >= 0:
        return corrected.tobytes().hex(), errCount
    return None, errCount
def fisbExtractBlockBits(samples, offset, blockNumber):
    """
    Pull the samples of one FIS-B block out of an interleaved packet.

    Three arrays are produced: the samples for the block itself, the samples
    one position before each of them, and the samples one position after
    each of them. Applies to FIS-B only.

    Args:
        samples (nparray): Demodulated samples (int32).
        offset (int): This is the offset into ``samples`` where we consider
            the 'starting' bit to be. It is usually one, although if we don't
            error correct with 1, 2 will be used later. 1 gets the packet that
            we error corrected the sync word to. 2 will get the packet as the
            set of bits following the packet that we got the sync word for.
        blockNumber (int): Block number we are decoding (0-5).

    Returns:
        tuple: Tuple containing:

        * Array of integers (nparray i32, 736 items) holding the samples of
          the block.
        * Array of integers (nparray i32) containing the sample 1 position
          before each item of the first returned result.
        * Array of integers (nparray i32) containing the sample 1 position
          after each item of the first returned result.
    """
    # First sample of this block: each block number moves us forward by one
    # byte (8 bits at 2 samples per bit).
    blockStart = offset + ((8 * blockNumber) * 2)

    def gather(delta):
        # Collect the 92 bytes of the block, starting 'delta' samples away
        # from the nominal sampling point.
        rows = np.empty((92, 8), dtype=np.int32)
        start = blockStart + delta
        for row in range(92):
            # Every other sample is one bit; 16 samples make one byte.
            rows[row] = samples[start:start + 16:2]
            # Data is interleaved. Skipping (5 * 8 * 2) + 16 samples lands on
            # the next byte of the same block.
            start += 96
        # Flatten the 92 x 8 table into a single run of 736 values.
        return rows.reshape(-1)

    return gather(0), gather(-1), gather(1)
def adsbExtractBlockBits(samples, offset, isShort):
    """
    Extract the samples of an ADS-B packet, along with the samples one
    position before and one position after each of them. Applies to ADS-B
    only.

    ADS-B packets come in two flavors, short and long. Most are
    long, but short packets have the first 5 bits set to zero. This gives
    a good first indication of which one to try first, but because of errors,
    both will be tried. There is about a 10:1 ratio of long to short packets.

    The end of every slice is computed relative to its own start, so the
    full number of bits (384 for long, 240 for short) is returned for any
    ``offset``. Using a fixed end position would drop the final bit whenever
    ``offset`` is 2.

    Args:
        samples (nparray): Demodulated samples (int32).
        offset (int): This is the offset into ``samples`` where we consider
            the 'starting' bit to be. It is usually one, although if we don't
            error correct with 1, 2 will be used later. 1 gets the packet
            that we extracted the sync word from. 2 will get the packet as
            the set of bits following the packet that we got the sync word
            for.
        isShort (bool): ``True`` if we are attempting to error correct a
            short ADS-B packet, or ``False`` if this is a long packet attempt.

    Returns:
        tuple: Tuple containing:

        * Array of integers (nparray i32) holding the samples of the packet.
        * Array of integers (nparray i32) containing the sample 1 position
          before each item of the first returned result.
        * Array of integers (nparray i32) containing the sample 1 position
          after each item of the first returned result.
    """
    # Short packet is 30 bytes, long packet is 48 bytes.
    numBytes = 30 if isShort else 48

    # Number of samples spanned by the packet: 8 bits per byte at
    # 2 samples per bit.
    span = numBytes * 16

    # Every other sample is one bit. Anchor each slice's end to its start so
    # all three arrays always hold numBytes * 8 items.
    bits = samples[offset:offset + span:2]
    bitsBefore = samples[offset - 1:offset - 1 + span:2]
    bitsAfter = samples[offset + 1:offset + 1 + span:2]

    return bits, bitsBefore, bitsAfter
def shiftBits(bits, neighborBits, shiftAmount):
    """
    Blend each packet sample with a weighted share of its neighbor sample.

    This is the magic behind better error correction. Since we are
    sampling at the Nyquist rate, we can either be sampling
    near the optimum sampling points, or the worst sampling
    points. Mixing in a neighbor sample moves the effective sampling
    point and gives an alternate set of values to try to error correct.

    For really strong signals, this doesn't have much effect,
    most packets will error correct well. But for weaker signals,
    this can provide close to a doubling of packets error corrected.

    Shift amounts are found in ``SHIFT_BY_PROBABILITY`` and
    are ordered by what shift values will produce the quickest
    error corrections (using lots of empirical samples). The first shift
    is always 0, since this is most likely to match. Well over 99%
    of all matches will match within two tries.

    You can also get a good first approximation by determining
    zero-crossing values, but using the shift table is
    essentially just as fast.

    Args:
        bits (nparray): Sample integers array. Int32.
        neighborBits (nparray): Sample integers array of samples either
            before (``bitsBefore``) or after (``bitsAfter``) the sample.
            Int32.
        shiftAmount (float): Fraction of each neighbor sample that is
            added to the matching sample in ``bits``.

    Returns:
        nparray: ``bits`` array after shifting.
    """
    # Samples are positive and negative numbers, so adding a share of the
    # neighbor either raises or lowers the sample point.
    weightedNeighbors = neighborBits * shiftAmount

    # An alternative formula is:
    #
    #   ((neighborBits - bits) * shiftAmount) + bits
    #
    # It runs at about the same speed and produces near identical results.
    # In one test against 110,000 errors it gave 188 more decodes (5772 to
    # 5564) and was only 5 seconds slower over about an 11 minute run. In a
    # regular run of normal packets it gets the same to a slightly lesser
    # number of packets. Not sure which is the absolute best one to use.
    return (bits + weightedNeighbors) / 2
def tryShiftBits(rs, bits, bitsBefore, bitsAfter, tryFirst,
        f6b = False, block0FixedBits = False):
    """
    Try to error correct a set of samples using a series of shifts.

    The samples represent a single block of a FIS-B message,
    or the entire ADS-B message. Each shift blends ``bits`` with either
    ``bitsBefore`` (positive shift) or ``bitsAfter`` (negative shift) and
    the result is handed to ``packAndTest()``.

    Args:
        rs (reed-solomon object): Reed-Solomon object to use.
            One of three instances are passed dependent on the
            type of packet being error corrected (FIS-B, ADS-B short,
            ADS-B long).
        bits (nparray): Integers representing the current packet.
        bitsBefore (nparray): Integers representing the bits
            before the current packet.
        bitsAfter (nparray): Integers representing the bits
            after the current packet.
        tryFirst (float): Try this shift first. Is ignored if -1.
            This is used for FIS-B packets to start with the shift that
            worked for a previous block.
        f6b (bool): ``True`` if we are to replace the first 6 bytes of a
            block 0 packet with a set of fixed values. If this is ``True``,
            it is assumed that ``bits`` is from a block 0 packet.
        block0FixedBits (bool): ``True`` if we are to force a set of constant
            bits for block 0. If ``True``, it is assumed that ``bits`` are
            from a block 0 packet. The forcing is applied to the shifts taken
            from ``SHIFT_BY_PROBABILITY``, not to the ``tryFirst`` attempt.

    Returns:
        tuple: Tuple containing:

        * ``True`` if there was a successful error correction, else ``False``.
        * String of hex if the error correction was successful, else ``None``.
        * Number of errors found. Will be 10 or less for successful
          FIS-B error correction (6 for ADS-B short, 7 for ADS-B long)
          or 98 for a failed correction.
        * Shift value that was successful, or -1 if not successful. Used as
          ``tryFirst`` on the next run.
    """
    def applyShift(amount):
        # Zero means use the samples untouched (the same array object, not a
        # copy). Positive leans on the samples before, negative on the
        # samples after.
        if amount == 0:
            return bits
        if amount > 0:
            return shiftBits(bits, bitsBefore, amount)
        return shiftBits(bits, bitsAfter, -amount)

    # Fixed block 0 bits as (bit index, forced sample value) pairs.
    # Note: This does not include 'position valid' in byte 6
    # (bit 47). This is always zero in reality, but standard states it
    # should be one.
    forcedBits = (
        (48, +10000),  # UTC coupled
        (49, -10000),  # Reserved
        (50, +10000),  # App Data Valid
        (60, -10000),  # Reserved
        (61, -10000),  # Reserved
        (62, -10000),  # Reserved
        (63, -10000),  # Reserved
        (73, -10000),  # Reserved (UAT Frame byte 2)
        (74, -10000),  # Reserved (UAT Frame byte 2)
        (75, -10000),  # Reserved (UAT Frame byte 2)
    )

    # If a previous packet was decoded with a shift, use that shift as the
    # first attempt. It will almost always be successful.
    if tryFirst != -1:
        hexResult, errCount = packAndTest(rs, applyShift(tryFirst), f6b)
        if errCount >= 0:
            return True, hexResult, errCount, tryFirst

    # Work through the table of shifts, most likely first.
    for shift in SHIFT_BY_PROBABILITY:
        # Don't try a shift we already tried.
        if shift == tryFirst:
            continue

        candidate = applyShift(shift)

        if block0FixedBits:
            # Force the bits that never change in block zero.
            # NOTE(review): for a shift of 0 'candidate' is 'bits' itself, so
            # this also changes the caller's array in place.
            for position, value in forcedBits:
                candidate[position] = value

        hexResult, errCount = packAndTest(rs, candidate, f6b)
        if errCount >= 0:
            return True, hexResult, errCount, shift

    # Every shift failed.
    return False, None, 98, -1
def blockZeroTricks(bits, bitsBefore, bitsAfter, hexBlocks):
    """
    Retry a block zero decode with known values forced into the samples.

    Depending on the module settings this will force the first 6 bytes to
    be a particular value, and/or set a number of bits to values that will
    always be true.

    The first 6 bytes of block zero contain the latitude and longitude of the
    ground station. With the receiver at a fixed location, only one value
    will usually be received. These can be forced into ``bits``. This is set
    with the ``--f6b`` flag. It is possible to provide more than one set of
    6 bytes with the ``--f6b`` flag. If provided, all will be tried.

    Block 0 fixed bits are a set of bits always set (either 1 or 0) in
    block 0. This option is normally on, but can be turned off with the
    ``--nobzfb`` flag.

    Note that this routine is normally called after a normal decode of a
    packet is tried.

    Args:
        bits (nparray): Integers representing the current packet.
        bitsBefore (nparray): Integers representing the bits
            before the current packet.
        bitsAfter (nparray): Integers representing the bits
            after the current packet.
        hexBlocks (list): 6 item list containing 1 hex string for each
            decoded hex block, or ``None``.

    Returns:
        tuple: Tuple containing:

        * ``True`` if there was a successful error correction, else ``False``.
        * The ``hexBlocks`` list that was passed into the routine. It will be
          unchanged if the decode was not a success, otherwise
          ``hexBlocks[0]`` will contain the hex characters for block 0.
        * Number of errors found. Will be 10 or less for successful
          error correction or 98 for a failed correction.
    """
    # Using this in a file of 110951 all cause errors resulted in
    # this additional number of complete packet decodes:
    #   1st 6 byte fix only: 4908 (4.4%)
    #   block zero fix only:  756 (0.7%)
    #   combined:            5564 (5.0%)

    # No preferred shift (-1); which tricks are applied is decided by the
    # module level settings.
    decoded, correctedHex, errCount, _ = tryShiftBits(
        rsFisb, bits, bitsBefore, bitsAfter, -1,
        replace_f6b, block_zero_fixed_bits)

    if not decoded:
        return False, hexBlocks, 98

    # Store the repaired block zero in the caller's list.
    hexBlocks[0] = correctedHex
    return True, hexBlocks, errCount
def computeAverage1(bits):
    """
    Compute the average 'one' sample value of a single FIS-B block.

    Only the first 8 bytes and the last 20 bytes of the block are looked at.
    Samples below zero are ignored; the remaining samples are averaged.
    Used as part of the process for finding runs of zero values.

    Args:
        bits (nparray): Numpy array of samples for one block.

    Returns:
        float: Calculated average 1 value of the first 8 bytes and last 20
        bytes.
    """
    # First 8 bytes (64 bits) and last 20 bytes (160 bits) of the block.
    header = bits[0:64]
    parity = bits[576:736]

    # Keep everything that is not below zero.
    headerOnes = header[~(header < 0)]
    parityOnes = parity[~(parity < 0)]

    # Sum of the kept samples divided by how many were kept.
    sampleCount = headerOnes.size + parityOnes.size
    return (np.sum(headerOnes) + np.sum(parityOnes)) / sampleCount
def computeAverage0(bits):
    """
    Compute the average of all sample values that are not above zero.

    Args:
        bits (nparray): Numpy array of samples to check.

    Returns:
        float: Calculated average zero value.
    """
    # Drop every sample greater than zero, then average what is left.
    zeroSamples = bits[~(bits > 0)]
    return np.average(zeroSamples)
def computePercentAboveAveOne(bits, aveOne, adjFactor):
    """
    Compute the fraction of samples at or above an adjusted average one
    value.

    The threshold is ``aveOne`` times ``adjFactor`` (usually 1.10, or 110%).

    Args:
        bits (nparray): Set of samples that the fraction will be computed
            from. This is a subset of a FIS-B block, usually 128 bits taken
            from the part of a complete FIS-B block that isn't the first 8
            bytes, or last 20 parity bytes.
        aveOne (float): Average 1 bit derived from the first 8 bytes and last
            20 bytes of a FIS-B block.
        adjFactor (float): ``aveOne`` is multiplied by this value to obtain a
            threshold value. Samples at or above this threshold are counted.

    Returns:
        float: Fraction (0.0 to 1.0) of samples that are greater than or
        equal to ``aveOne`` * ``adjFactor``.
    """
    threshold = aveOne * adjFactor

    # Count the samples at or above the threshold.
    countAbove = (bits >= threshold).sum()

    return countAbove / bits.size
def fixZeros(block):
    """
    Find a run of trailing zeros at the end of a FIS-B block and flatten it.

    If a run is found, the samples in the run are set to the average
    zero value of the entire block. This is called if a FIS-B block failed
    its first attempts at decoding.

    We first compute the average one value of the first 8 bytes of the block
    and the last 20 bytes of the block (parity bits). These bits will have a
    number of one bits.

    Then we divide the rest of the block (bits 64 to 576) into 4 equal
    sections of 128 bits. Starting from the back of the block, we check the
    fraction of samples in the section that are at least 110% of the average
    one value. If the fraction is > 2%, we assume the section is not a
    section of zeros and stop.

    If it is a section of zeros, we continue backwards looking at each
    section and computing if it is a section of zeros or not.

    As a last step, unless all 4 sections were zeros, we take the last
    section that we think doesn't have zeros, and again starting at the end
    of the section, go back sample by sample until we find a sample that is
    greater than 87% of the average one value. When we find such a sample,
    we move 8 bits forward, and then potentially more bits forward, to align
    to a byte boundary. The result is never allowed to move past the start of
    the zero sections already found.

    At this point, we will have a starting point where the block has zeros.
    We then set those samples to the average zero value for the entire block.

    All of the somewhat arbitrary appearing numbers (110%, 87%, 2%) were
    obtained empirically using a large set of error packets.

    Args:
        block (nparray): Set of samples representing a single FIS-B block.
            It is modified in place when a run of zeros is found.

    Returns:
        tuple: Tuple containing:

        * ``True`` if we found a run of zeros and set them to a single value.
          Else ``False``.
        * The ``block`` argument we were called with. Will have
          the zero run set to the average zero value of the block. If the
          first tuple item was ``False``, the contents of the ``block``
          argument is returned unchanged.
    """
    # Out of 110951 errors, this repaired 15156 or 13.7%.
    # Ran at 228/sec.
    aveOne = computeAverage1(block)

    # Index where the run of zeros starts. 576 (the first parity bit) means
    # no zero section has been found yet.
    startValue = 576
    foundAnyZeros = False

    # Walk the four 128 bit sections from the back: 448-576, 320-448,
    # 192-320 and 64-192.
    for sectionEnd in range(576, 64, -128):
        sectionStart = sectionEnd - 128
        percentAboveAveOne = computePercentAboveAveOne(
            block[sectionStart:sectionEnd], aveOne, 1.10)

        # Too many strong ones: this section holds data, stop here.
        if percentAboveAveOne > 0.02:
            break

        # Section looks like zeros, so the run starts no later than here.
        startValue = sectionStart
        foundAnyZeros = True

    # Try to get finer grain by going backwards through the last section
    # holding data and looking for the 1st sample above 87% of the average
    # one value.
    newStartValue = startValue
    if startValue != 64:
        threshold = aveOne * 0.87   # empirically derived

        for i in range(startValue - 1, startValue - 128, -1):
            if block[i] > threshold:
                # Leave a byte of room after the last strong one.
                newStartValue = i + 8

                # Round up to the next byte boundary.
                rem = newStartValue % 8
                if rem != 0:
                    newStartValue += 8 - rem

                if newStartValue > startValue:
                    # Nothing gained; stay at the section boundary.
                    newStartValue = startValue
                else:
                    foundAnyZeros = True
                break

    # We have zeros in the block, so set them to the average zero in the
    # block.
    if foundAnyZeros:
        block[newStartValue:576] = computeAverage0(block)

    return foundAnyZeros, block
def fisbDecode(samples, offset, hexBlocks, hexErrs):
    """
    Given a FIS-B raw message, attempt to error correct all blocks.

    We loop over each block and determine the bits, bits before, and
    bits after. We then attempt to error correct using
    ``tryShiftBits()`` which will try different combinations.

    We also employ various procedures such as seeing if a packet
    ends early (like an empty packet) where we do not need to error correct
    further blocks. Further processing will also be attempted if the initial
    attempt does not decode: block zero tricks (block 0 only, when enabled)
    and repair of trailing zeros (when enabled). Processing stops at the
    first block that cannot be decoded.

    Args:
        samples (nparray): Int 32 array of samples containing the bit
            before the actual sample and two bits after the normal sample.
        offset (int): The normal value is 1, which uses the starting bit
            (``samples[1]``) of the normal sample. Can also be 2, which looks
            at the set of bits after the current sample (``samples[2]``).
            Using this can result in a small, but significant increase, in
            error corrections.
        hexBlocks (list): 6 item list containing a hex string for each
            block, or ``None``. If ``None`` is passed instead of a list (first
            call), a list of 6 ``None`` values is created. Blocks that
            already hold a value are skipped. The list is updated in place.
        hexErrs (list): List of 6 items containing integers representing
            the error count for each FIS-B block. If ``None`` is passed
            instead of a list (first call), a list of 6 ``99`` values is
            created. ``99`` indicates no attempt to decode a block has been
            made. The list is updated in place.

    Returns:
        tuple: Tuple containing:

        * ``True`` if there was a successful error correction, else ``False``.
        * Updated version of ``hexBlocks`` which will be a 6 item list
          containing ``None`` for blocks that did not error correct, or a
          hex string with the error corrected value.
        * Updated version of ``hexErrs`` which will be a 6 item list
          with each element being number of errors found in the block
          (0-10), or ``98`` for a block that failed to error correct, and
          ``99`` if the block was not checked for errors.
    """
    # Create hexBlocks if first time call.
    if hexBlocks == None:
        hexBlocks = [None, None, None, None, None, None]
    # Create hexErrs if first time call.
    if hexErrs == None:
        hexErrs = [99, 99, 99, 99, 99, 99]
    # Shift that decoded the previous block; -1 means none yet.
    shiftThatWorked = -1
    # Loop and try to error correct each block
    for block in range(0, 6):
        # Skip blocks that were already decoded (by an earlier call).
        if hexBlocks[block] != None:
            continue
        bits, bitsBefore, bitsAfter = fisbExtractBlockBits(samples, offset, \
            block)
        # Shift bits
        status, errCorrectedHex, hexErrs[block], shift = tryShiftBits(rsFisb, \
            bits, bitsBefore, bitsAfter, shiftThatWorked)
        if status:
            # Start next block with the shift that worked.
            shiftThatWorked = shift
            hexBlocks[block] = errCorrectedHex
            # For block0, see if empty frame. We put this check here since
            # 'tryShiftBits' is the most likely opportunity to error correct a
            # block zero, and this prevents us from even trying to error correct
            # other frames. Empty frames are very common.
            if block == 0:
                foundEmptyFrame, hexBlocks = block0ThoroughCheck(hexBlocks)
                if foundEmptyFrame:
                    return True, hexBlocks, hexErrs
            continue
        # Extra checks for block 0
        if block == 0:
            if replace_f6b or block_zero_fixed_bits:
                # On success blockZeroTricks stores the result in hexBlocks[0].
                status, hexBlocks, hexErrs[block] = blockZeroTricks(bits, \
                    bitsBefore, bitsAfter, hexBlocks)
                if status:
                    foundEmptyFrame, hexBlocks = block0ThoroughCheck(hexBlocks)
                    if foundEmptyFrame:
                        return True, hexBlocks, hexErrs
                    continue
        # Look for blocks with trailing zeros
        if fix_trailing_zeros:
            foundAnyZeros, bits = fixZeros(bits)
            if foundAnyZeros:
                # Retry with the zero run flattened. The result goes straight
                # into hexBlocks[block] (None again if the retry fails).
                # Note that 'shiftThatWorked' is not updated on this path.
                status, hexBlocks[block], hexErrs[block], shift = tryShiftBits(rsFisb, \
                    bits, bitsBefore, bitsAfter, shiftThatWorked)
                if status:
                    # See if the packet ends within the blocks decoded so far.
                    foundEmptyFrame, hexBlocks = block0ThoroughCheck(hexBlocks)
                    if foundEmptyFrame:
                        return True, hexBlocks, hexErrs
                    continue
        # There are still blocks with errors
        # Nothing worked, abandon and try block0ThoroughCheck
        # Don't process other blocks.
        if not status:
            break
    # If there are None values in hexBlocks, nothing worked.
    if None in hexBlocks:
        # Do final check for early ending packet.
        status, hexBlocks = block0ThoroughCheck(hexBlocks)
        if status:
            return True, hexBlocks, hexErrs
        return False, hexBlocks, hexErrs
    # Otherwise, all blocks were error corrected.
    return True, hexBlocks, hexErrs
def adsbDecode(samples, offset, isShort):
    """
    Try to error correct a single raw ADS-B message and sanity check it.

    The bits for the message are extracted from ``samples`` and handed to
    ``tryShiftBits`` with the Reed-Solomon instance matching the requested
    packet size. A block that error corrects is only accepted if its payload
    type code (upper 5 bits of the first byte) agrees with the length of
    the decoded hex string.

    Args:
        samples (nparray): int32 array of samples containing the bit
            before the normal sample and two bits after the normal sample.
        offset (int): The normal value is 1, which uses the starting bit
            (``samples[1]``) of the normal sample. Can also be 2, which looks
            at the set of bits after the current sample (``samples[2]``).
        isShort (bool): ``True`` if we are trying to match a short ADS-B
            packet, otherwise ``False``.

    Returns:
        tuple: Tuple containing:

        * ``True`` if the message error corrected and passed the payload
          type / length check, else ``False``.
        * Hex string representing the error corrected message (``None``
          on failure).
        * Number of errors reported by ``tryShiftBits`` for the message,
          or ``98`` for a message that failed.
    """
    # Short and long ADS-B each have their own Reed-Solomon instance.
    rs = rsAdsbS if isShort else rsAdsbL

    # Pull out the message bits along with the neighboring bits used for
    # shifting.
    bits, bitsBefore, bitsAfter = adsbExtractBlockBits(samples, offset, isShort)

    # Attempt error correction (-1: no previously successful shift).
    corrected, hexBlock, errs, _ = tryShiftBits(rs, bits, bitsBefore, \
        bitsAfter, -1)

    if not corrected:
        return False, None, 98

    # A successful Reed-Solomon decode is not always a real message, so
    # cross-check the payload type code against the hex string length.
    hexBlockLen = len(hexBlock)
    firstByte = bytes.fromhex(hexBlock[0:2])[0]
    payloadTypeCode = firstByte >> 3

    # 36 hex characters is accepted only with payload type 0 or 12.
    looksShort = (hexBlockLen == 36) and (payloadTypeCode in (0, 12))

    # 68 hex characters is accepted only with payload types 1-6, 11, 13, 14.
    looksLong = (hexBlockLen == 68) \
        and (payloadTypeCode in (1, 2, 3, 4, 5, 6, 11, 13, 14))

    if looksShort or looksLong:
        return True, hexBlock, errs

    return False, None, 98
def fisbHexErrsToStr(hexErrs):
    """
    Build the display string for the per-block FIS-B error counts.

    Each of the first six entries is rendered as a number zero padded to
    two digits, and the entries are separated by colons
    (e.g. ``00:01:98:99:99:99``). The result is used as a comment in either
    the result string, or the failed error message.

    Args:
        hexErrs (list): List of 6 items, one for each FIS-B block. Each entry
            will be the number of errors in the message (0-10), or 98 for a
            packet that failed, and 99 for a packet that wasn't tried.

    Returns:
        str: String containing display string for error messages.
    """
    # Index explicitly so exactly six entries are read, no more, no less.
    return ':'.join(f'{hexErrs[blockIdx]:02}' for blockIdx in range(6))
def fisbHexBlocksToHexString(hexBlocks):
    """
    Join the six error corrected FIS-B hex blocks into one hex string.

    The result is the bare hex string with a '+' prepended. No time,
    signal strength, error count, or other comment data is attached.
    Used only for FIS-B.

    Args:
        hexBlocks (list): List of 6 strings containing a hex string for each
            of the 6 FIS-B hexblocks. Only called if all blocks error
            corrected successfully (i.e. there should be no ``None`` items).

    Returns:
        str: String with all hex data and a '+' prepended at the front.
    """
    hexString = '+'

    # Append the blocks in order; exactly six blocks are read.
    for blockIdx in range(6):
        hexString = hexString + hexBlocks[blockIdx]

    return hexString
def fisbTimingDecode(hexBlock0, timeStr):
    """
    Produce the timing comment string for a FIS-B message.

    Uses the first hex block of the message and its time of arrival to
    derive the slot-id, MSO, expected transmit time, message delay,
    TIS-B site-id and data channel.

    Args:
        hexBlock0 (str): String of hex bytes for block 0.
        timeStr (str): Epoch UTC time the message arrived.

    Returns:
        str: String of the form
        ``/<slot-id + 1>:<mso>:<transmit ms>/<delay ms>/<site-id>/<data channel>``.
    """
    headerBytes = bytes.fromhex(hexBlock0)

    # Slot id is the low 5 bits of byte 6; TIS-B site id is the high
    # nibble of byte 7.
    slotId = headerBytes[6] & 0x1f
    tisbSiteId = (headerBytes[7] >> 4) & 0x0f

    # Slots are 5.5ms apart and the first one starts 6ms into the second.
    xmtMs = 6 + (slotId * 5.5)

    # Every slot spans 22 MSOs.
    mso = slotId * 22

    # Delay is how far into the second the packet arrived (in ms), less the
    # time at which it should have been transmitted.
    arrivalSecs = float(timeStr)
    fractionOfSecond = arrivalSecs - int(arrivalSecs)
    xmtDelay = int((fractionOfSecond * 1000) - xmtMs)

    # The data channel depends on the whole seconds past UTC midnight,
    # modulo 32.
    arrivalUtc = datetime.fromtimestamp(arrivalSecs, tz=timezone.utc)
    secsPastMidnight = (arrivalUtc.hour * 3600) + (arrivalUtc.minute * 60) \
        + arrivalUtc.second
    secsPastMidnightMod32 = secsPastMidnight % 32

    # Both operands are in 0..31, so the modulo wraps a negative difference
    # back into 0..31. The reported data channel is 1-based.
    dataChannel = ((slotId - secsPastMidnightMod32) % 32) + 1

    return f'/{slotId+1}:{mso}:{xmtMs}/{xmtDelay}/{tisbSiteId}/{dataChannel}'
def fisbHexBlocksFormatted(hexBlocks, signalStrengthStr, timeStr, hexErrs, \
syncErrors):
"""
Takes error corrected FIS-B message and produces final output string.
Will concatenate all the hex blocks and add trailing error, signal strength
and time information. Should be called only for a successfully error corrected
FIS-B packet.