-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFlightGazer.py
3230 lines (2957 loc) · 136 KB
/
FlightGazer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# _/_/_/_/ _/_/ _/ _/ _/ _/_/_/
# _/ _/ _/_/_/ _/_/_/ _/_/_/_/ _/ _/_/_/ _/_/_/_/ _/_/ _/ _/_/
# _/_/_/ _/ _/ _/ _/ _/ _/ _/ _/ _/_/ _/ _/ _/ _/_/_/_/ _/_/
# _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/
# _/ _/_/ _/ _/_/_/ _/ _/ _/_/ _/_/_/ _/_/_/ _/_/_/_/ _/_/_/ _/
# _/ by: WeegeeNumbuh1
# _/_/
"""
A program heavily inspired by https://github.com/ColinWaddell/its-a-plane-python, but supplements flight information of
nearby planes with real-time ADS-B and UAT data from dump1090 and dump978. Uses the FlightAware API instead of FlightRadar24.
"""
# =============== Imports ==================
# ==========================================
import time
START_TIME: float = time.monotonic()
import datetime
STARTED_DATE: datetime = datetime.datetime.now()
VERSION: str = 'v.2.4.1 --- 2025-02-01'
import os
os.environ["PYTHONUNBUFFERED"] = "1"
import argparse
import sys
import math
from pathlib import Path
from contextlib import closing
from urllib.request import urlopen, Request
import urllib.parse
import json
import signal
import threading
import asyncio
from collections import deque
from string import Formatter
import random
from getpass import getuser
import socket
import logging
# external imports
import requests
from pydispatch import dispatcher # pip install pydispatcher *not* pip install pydispatch
import schedule
import psutil
from suntime import Sun, SunTimeException
# utilities
import utilities.flags as flags
import utilities.registrations as registrations
from utilities.animator import Animator
from setup import frames
argflags = argparse.ArgumentParser(
description="FlightGazer, a program to show dump1090 info to an RGB-Matrix display.",
epilog="Protip: Ensure your location is set in your dump1090 configuration!"
)
argflags.add_argument('-i', '--interactive',
action='store_true',
help="Print program output to console. If this flag is not used, this program runs silently."
)
argflags.add_argument('-e', '--emulate',
action='store_true',
help="Run the display in emulator mode via RGBMatrixEmulator."
)
argflags.add_argument('-d', '--nodisplay',
action='store_true',
help="Only show console output and do not use the display. Implies Interactive mode."
)
argflags.add_argument('-f', '--nofilter',
action='store_true',
help="Disable filtering and show all planes detected by dump1090.\n\
Disables API fetching and Display remains as a clock.\n\
Implies Interactive mode."
)
argflags.add_argument('-v', '--verbose',
action='store_true',
help="Log/display more detailed messages.\n\
This flag is useful for debugging.")
args = argflags.parse_args()
if args.interactive:
INTERACTIVE: bool = True
else:
INTERACTIVE = False
if args.emulate:
EMULATE_DISPLAY: bool = True
else:
EMULATE_DISPLAY = False
if args.nodisplay:
NODISPLAY_MODE: bool = True
INTERACTIVE = True
else:
NODISPLAY_MODE = False
if args.nofilter:
NOFILTER_MODE: bool = True
INTERACTIVE = True
else:
NOFILTER_MODE = False
if args.verbose:
VERBOSE_MODE: bool = True
else:
VERBOSE_MODE = False
FORGOT_TO_SET_INTERACTIVE: bool = False
# =========== Initialization I =============
# ==========================================
if __name__ != '__main__':
print("FlightGazer cannot be imported as a module.")
sys.exit(1)
# setup logging
main_logger = logging.getLogger("FlightGazer")
CURRENT_DIR = Path(__file__).resolve().parent
CURRENT_USER = getuser()
LOGFILE = Path(f"{CURRENT_DIR}/FlightGazer-log.log")
LOGFILE.touch(mode=0o777, exist_ok=True)
with open(LOGFILE, 'a') as f:
f.write("\n") # append a newline at the start of logging
logging_format = logging.Formatter(
fmt='%(asctime)s.%(msecs)03d - %(threadName)s | %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
# set root logger to write out to file but not stdout
logging.basicConfig(
filename=LOGFILE,
format='%(asctime)s.%(msecs)03d - %(name)s %(threadName)s | %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
encoding='utf-8',
level=logging.DEBUG if VERBOSE_MODE else logging.INFO,
)
# add a stdout stream for the logger that we can disable if we run interactively
# NB: in `main_logger.handlers` this handler will be in index 0 (the default one we set above does not add a handler),
# so to stop the stdout stream, use main_logger.removeHandler()
stdout_stream = logging.StreamHandler(sys.stdout)
stdout_stream.setLevel(logging.NOTSET)
stdout_stream.setFormatter(logging_format)
main_logger.addHandler(stdout_stream)
main_logger.info("==============================================================")
main_logger.info("=== Welcome to FlightGazer! ===")
main_logger.info("==============================================================")
main_logger.info(f"FlightGazer Version: {VERSION}")
main_logger.info(f"Script started: {STARTED_DATE.replace(microsecond=0)}")
main_logger.info(f"We are running in \'{CURRENT_DIR}\'")
main_logger.info(f"Using: \'{sys.executable}\' as \'{CURRENT_USER}\' with PID: {os.getpid()}")
FLYBY_STATS_FILE = Path(f"{CURRENT_DIR}/flybys.csv")
CONFIG_FILE = Path(f"{CURRENT_DIR}/config.yaml")
API_URL: str = "https://aeroapi.flightaware.com/aeroapi/"
USER_AGENT: dict = {'User-Agent': "Wget/1.21.3"}
""" Use Wget user-agent for our requests """
DUMP1090_IS_AVAILABLE: bool = False
""" If we fail to load dump1090, set to False and continue. We assume it isn't loaded at first. """
LOOP_INTERVAL: float = 2
""" in seconds. Affects how often we poll `dump1090`'s json (which itself atomically updates every second).
Affects how often other processing threads handle data as they are triggered on every update.
Should be left at 2 (or slower) """
if not VERBOSE_MODE:
sys.tracebacklimit = 0
else:
main_logger.debug("Verbose mode enabled.")
# load in all the display-related modules
DISPLAY_IS_VALID: bool = True
if not NODISPLAY_MODE:
try:
try:
if EMULATE_DISPLAY: raise Exception
from rgbmatrix import graphics
from rgbmatrix import RGBMatrix, RGBMatrixOptions
except:
# this is for debugging display output outside of physical hardware
from RGBMatrixEmulator import graphics
from RGBMatrixEmulator import RGBMatrix, RGBMatrixOptions
# these modules depend on the above, so they should load successfully at this point,
# but if they break somehow, we can still catch it
from setup import colors, fonts
if 'RGBMatrixEmulator' in sys.modules:
# INTERACTIVE = True
EMULATE_DISPLAY = True
except:
DISPLAY_IS_VALID = False
main_logger.error("Cannot load display modules. There will be no display output!")
main_logger.warning(">>> This script will still function as a basic flight parser and stat generator,")
main_logger.warning(">>> if the environment allows.")
main_logger.warning(">>> If you're sure you don't want to use any display output,")
main_logger.warning(">>> use the \'-d\' flag to suppress this warning.")
time.sleep(2)
else:
DISPLAY_IS_VALID = False
main_logger.info("Display output disabled. Running in console-only mode.")
# If we invoked this script by terminal and we forgot to set any flags, set this flag.
# This affects how to handle our exit signals (previously)
if not INTERACTIVE:
if sys.__stdin__.isatty(): FORGOT_TO_SET_INTERACTIVE = True
# make additional use for psutil
this_process = psutil.Process()
this_process_cpu = this_process.cpu_percent(interval=None)
CORE_COUNT = os.cpu_count()
if CORE_COUNT is None:
CORE_COUNT = 1
# =========== Settings Load-in =============
# ==========================================
# Define our settings and initalize to defaults
FLYBY_STATS_ENABLED: bool = False
HEIGHT_LIMIT: float = 15000
RANGE: float = 2
API_KEY: str|None = ""
API_DAILY_LIMIT: int|None = None
CLOCK_24HR: bool = True
CUSTOM_DUMP1090_LOCATION: str = ""
CUSTOM_DUMP978_LOCATION: str = ""
BRIGHTNESS: int = 100
GPIO_SLOWDOWN: int = 2
HAT_PWM_ENABLED: bool = False
RGB_ROWS: int = 32
RGB_COLS: int = 64
LED_PWM_BITS: int = 8
UNITS: int = 0
FLYBY_STALENESS: int = 60
ENHANCED_READOUT: bool = False
DISPLAY_SUNRISE_SUNSET: bool = False
DISPLAY_RECEIVER_STATS: bool = False
ENABLE_TWO_BRIGHTNESS: bool = True
BRIGHTNESS_2: int = 50
BRIGHTNESS_SWITCH_TIME: dict = {"Sunrise":"06:00","Sunset":"18:00"}
USE_SUNRISE_SUNSET: bool = True
ACTIVE_PLANE_DISPLAY_BRIGHTNESS: int|None = None
LOCATION_TIMEOUT: int = 60
''' Programmer's notes for settings that are dicts:
Don't change key names or extend the dict. You're stuck with them once baked into this script.
Why? The settings migrator can't handle migrating dicts that have different keys.
ex: SETTING = {'key1':val1, 'key2':val2} (user's settings)
SETTING = {'key1':val10, 'key2':val20, 'key3':val3} (some hypothetical extension for SETTING in new config)
* settings migration *
SETTING = {'key1':val1, 'key2':val2} (migrated settings) '''
# Create our settings as a dict
# NB: if we don't want to load certain settings,
# we can simply remove elements from this dictionary
# but be cautious of leaving out keys that are used elsewhere
settings_values: dict = {
"FLYBY_STATS_ENABLED": FLYBY_STATS_ENABLED,
"HEIGHT_LIMIT": HEIGHT_LIMIT,
"RANGE": RANGE,
"API_KEY": API_KEY,
"API_DAILY_LIMIT": API_DAILY_LIMIT,
"CLOCK_24HR": CLOCK_24HR,
"CUSTOM_DUMP1090_LOCATION": CUSTOM_DUMP1090_LOCATION,
"CUSTOM_DUMP978_LOCATION": CUSTOM_DUMP978_LOCATION,
"BRIGHTNESS": BRIGHTNESS,
"GPIO_SLOWDOWN": GPIO_SLOWDOWN,
"HAT_PWM_ENABLED": HAT_PWM_ENABLED,
"RGB_ROWS": RGB_ROWS,
"RGB_COLS": RGB_COLS,
"LED_PWM_BITS": LED_PWM_BITS,
"UNITS": UNITS,
"FLYBY_STALENESS": FLYBY_STALENESS,
"ENHANCED_READOUT": ENHANCED_READOUT,
"DISPLAY_SUNRISE_SUNSET": DISPLAY_SUNRISE_SUNSET,
"DISPLAY_RECEIVER_STATS": DISPLAY_RECEIVER_STATS,
"ENABLE_TWO_BRIGHTNESS": ENABLE_TWO_BRIGHTNESS,
"BRIGHTNESS_2": BRIGHTNESS_2,
"BRIGHTNESS_SWITCH_TIME": BRIGHTNESS_SWITCH_TIME,
"USE_SUNRISE_SUNSET": USE_SUNRISE_SUNSET,
"ACTIVE_PLANE_DISPLAY_BRIGHTNESS": ACTIVE_PLANE_DISPLAY_BRIGHTNESS,
"LOCATION_TIMEOUT": LOCATION_TIMEOUT,
}
""" Dict of default settings """
CONFIG_MISSING: bool = False
main_logger.info("Loading configuration...")
try:
from ruamel.yaml import YAML
yaml = YAML()
except:
main_logger.warning("Failed to load required module \'ruamel.yaml\'. Configuration file cannot be loaded.")
main_logger.info(">>> Using default settings.")
CONFIG_MISSING = True
if not CONFIG_MISSING:
try:
config = yaml.load(open(CONFIG_FILE, 'r'))
except:
main_logger.warning(f"Cannot find configuration file \'config.yaml\' in \'{CURRENT_DIR}\'")
main_logger.info(">>> Using default settings.")
CONFIG_MISSING = True
if not CONFIG_MISSING:
try:
config_version = config['CONFIG_VERSION']
except KeyError:
main_logger.warning("Warning: Cannot determine configuration version. This may not be a valid FlightGazer config file.")
main_logger.info(">>> Using default settings.")
CONFIG_MISSING = True
''' We do the next block to enable backward compatibility for older config versions.
In the future, additional settings could be defined, which older config files
will not have, so we attempt to load what we can and handle cases when the setting value is missing.
This shouldn't be an issue when FlightGazer is updated with the update script, but we still have to import the settings. '''
if not CONFIG_MISSING:
for setting_key in settings_values:
try:
globals()[f"{setting_key}"] = config[setting_key] # match setting key from config file with expected keys
except:
# ensure we can always revert to default values
globals()[f"{setting_key}"] = settings_values[setting_key]
main_logger.warning(f"{setting_key} missing, using default value")
else:
main_logger.info(f"Loaded settings from configuration file. Version: {config_version}")
main_logger.info("Checking settings configuration...")
# =========== Global Variables =============
# ==========================================
general_stats: dict = {'Tracking':0, 'Range':0}
""" General dump1090 stats (updated per loop).
`general_stats` = {`Tracking`, `Range`} """
receiver_stats: dict = {'Gain':None, 'Noise':None, 'Strong':None}
""" Receiver stats (if available). None values for keys if data is unavailable.
`receiver_stats` = {`Gain`: float, `Noise`: float (negative), `Strong`: percentage} """
# active plane stuff
relevant_planes: list = []
""" List of planes and associated stats found inside area of interest (refer to `main_loop_generator.dump1090_loop()` for keys) """
focus_plane: str = ""
""" Current plane in focus, selected by `AirplaneParser.plane_selector()`. Defaults to an empty string when no active plane is selected. """
focus_plane_stats: dict = {}
""" Extracted stats for `focus_plane` from `relevant_planes` """
focus_plane_iter: int = 0
""" Variable that increments per loop when `AirplaneParser` is active """
focus_plane_ids_scratch = set()
""" Scratchpad of currently tracked planes (all IDs in `relevant_planes` at current loop).
Elements can be removed if plane count > 1 due to selector algorithm """
focus_plane_ids_discard = set()
""" Scratchpad of previously tracked plane IDs during the duration of `AirplaneParser`'s execution """
last_plane_count: int = 0
""" Count of planes in `relevant_planes` from the previous loop """
plane_latch_times: list = [
int(30 // LOOP_INTERVAL),
int(20 // LOOP_INTERVAL),
int(15 // LOOP_INTERVAL),
]
""" Precomputed table of latch times (loops) for plane selection algorithm. [2 planes, 3 planes, 4+ planes] """
focus_plane_api_results = deque([None] * 25, maxlen=25)
""" Additional API-derived information for `focus_plane` and previously tracked planes from FlightAware API.
Valid keys are {`ID`, `Flight`, `Origin`, `Destination`, `Departure`} """
unique_planes_seen: list = []
""" List of nested dictionaries that tracks unique hex IDs of all plane flybys in a day.
Keys are {`ID`, `Time`} """
# display stuff
idle_data: dict = {'Flybys': "0", 'Track': "0", 'Range': "0"}
""" Formatted dict for our Display driver.
`idle_data` = {`Flybys`, `Track`, `Range`} """
idle_data_2: dict = {'SunriseSunset': "", 'ReceiverStats': ""}
""" Additional formatted dict for our Display driver.
`idle_data_2` = {`SunriseSunset`, `ReceiverStats`} """
active_data: dict = {}
""" Formatted dict for our Display driver.
`active_data` = {
`Callsign`, `Origin`, `Destination`, `FlightTime`,
`Altitude`, `Speed`, `Distance`, `Country`,
`Latitude`, `Longitude`, `Track`, `VertSpeed`, `RSSI`
} or {} """
active_plane_display: bool = False
""" Which scene to put on the display. False = clock/idle, True = active plane """
current_brightness: int = BRIGHTNESS
""" Commanded brightness level for the display; may be changed depending on settings """
# location stuff
rlat: float | None = None
""" Our location latitude """
rlon: float | None = None
""" Our location longitude """
sunset_sunrise: dict = {"Sunrise": None, "Sunset": None}
""" Sunrise and sunset times for our location in datetime format.
Updated every day at midnight via the scheduler. Defaults to None if times cannot be determined. """
CURRENT_IP = ""
""" IP address of device running this script """
# runtime stuff
process_time: list = [0,0,0,0]
""" For debug; [json parse, filter data, API response, format data] ms """
api_hits: list = [0,0,0,0]
""" [successful API returns, failed API returns, no data returned, cache hits] """
flyby_stats_present: bool = False
""" Flag to check if we can write to `FLYBY_STATS_FILE`, initialized to False """
dump1090_failures: int = 0
""" Track amount of times we fail to read dump1090 data. """
watchdog_triggers: int = 0
""" Track amount of times the watchdog is triggered. If this amount exceeds
a certain threshold, permanently disable watching dump1090 for this session. """
selection_events: int = 0
""" Track amount of times the plane selector is triggered. (this is just a verbose stat) """
# hashable objects for our cross-thread signaling
DATA_UPDATED: str = "updated-data"
PLANE_SELECTED: str = "plane-in-range"
DISPLAY_SWITCH: str = "reset-scene"
END_THREADS: str = "terminate"
KICK_DUMP1090_WATCHDOG: str = "kick-watchdog"
# define our units and multiplication factors (based on aeronautical units)
distance_unit: str = "nmi"
altitude_unit: str = "ft"
speed_unit: str = "kt"
distance_multiplier: float = 1
altitude_multiplier: float = 1
speed_multiplier: float = 1
if UNITS == 1: # metric
distance_unit = "km"
altitude_unit = "m"
speed_unit = "km/h"
distance_multiplier = 1.852
altitude_multiplier = 0.3048
speed_multiplier = 1.85184
main_logger.info("Using metric units (km, m, km/h)")
elif UNITS == 2: # imperial
distance_unit = "mi"
speed_unit = "mph"
distance_multiplier = 1.150779
speed_multiplier = 1.150783
main_logger.info("Using imperial units (mi, ft, mph)")
else:
main_logger.info("Using default aeronautical units (nmi, ft, kt)")
# =========== Program Setup I ==============
# =============( Utilities )================
def has_key(book, key):
return (key in book)
def sigterm_handler(signum, frame):
""" Shutdown worker threads and exit this program. """
signal.signal(signum, signal.SIG_IGN) # ignore additional signals
exit_time = datetime.datetime.now()
end_time = round(time.monotonic() - START_TIME, 3)
dispatcher.send(message='', signal=END_THREADS, sender=sigterm_handler)
os.write(sys.stdout.fileno(), str.encode(f"\n- Exit signal commanded at {exit_time}\n"))
os.write(sys.stdout.fileno(), str.encode(f" Script ran for {timedelta_clean(end_time)}\n"))
os.write(sys.stdout.fileno(), str.encode(f"Shutting down... "))
# write the above message to the log file
main_logger.info(f"- Exit signal commanded at {exit_time}")
main_logger.info(f" Script ran for {timedelta_clean(end_time)}")
flyby_stats()
os.write(sys.stdout.fileno(), b"Done.\n")
main_logger.info("FlightGazer is shutdown.")
sys.exit(0)
def register_signal_handler(loop, handler, signal, sender) -> None:
""" Thread communication enabler. """
def dispatcher_receive(message):
loop.call_soon_threadsafe(handler, message)
dispatcher.connect(dispatcher_receive, signal=signal, sender=sender, weak=False)
def schedule_thread() -> None:
""" Our schedule runner """
while True:
schedule.run_pending()
time.sleep(1)
def cls() -> None:
""" Clear the console when using a terminal """
os.system('cls' if os.name=='nt' else 'clear')
def timedelta_clean(timeinput: datetime) -> str:
""" Cleans up time deltas without the microseconds. """
delta_time = datetime.timedelta(seconds=timeinput)
return str(delta_time).split(".")[0]
def strfdelta(tdelta, fmt='{D:02}d {H:02}h {M:02}m {S:02}s', inputtype='timedelta') -> str:
"""Convert a datetime.timedelta object or a regular number to a custom-
formatted string, just like the stftime() method does for datetime.datetime
objects. Sourced from https://stackoverflow.com/a/42320260
The fmt argument allows custom formatting to be specified. Fields can
include seconds, minutes, hours, days, and weeks. Each field is optional.
Some examples:
'{D:02}d {H:02}h {M:02}m {S:02}s' --> '05d 08h 04m 02s' (default)
'{W}w {D}d {H}:{M:02}:{S:02}' --> '4w 5d 8:04:02'
'{D:2}d {H:2}:{M:02}:{S:02}' --> ' 5d 8:04:02'
'{H}h {S}s' --> '72h 800s'
The inputtype argument allows tdelta to be a regular number instead of the
default, which is a datetime.timedelta object. Valid inputtype strings:
's', 'seconds',
'm', 'minutes',
'h', 'hours',
'd', 'days',
'w', 'weeks'
"""
# Convert tdelta to integer seconds.
if inputtype == 'timedelta':
remainder = int(tdelta.total_seconds())
elif inputtype in ['s', 'seconds']:
remainder = int(tdelta)
elif inputtype in ['m', 'minutes']:
remainder = int(tdelta)*60
elif inputtype in ['h', 'hours']:
remainder = int(tdelta)*3600
elif inputtype in ['d', 'days']:
remainder = int(tdelta)*86400
elif inputtype in ['w', 'weeks']:
remainder = int(tdelta)*604800
f = Formatter()
desired_fields = [field_tuple[1] for field_tuple in f.parse(fmt)]
possible_fields = ('W', 'D', 'H', 'M', 'S')
constants = {'W': 604800, 'D': 86400, 'H': 3600, 'M': 60, 'S': 1}
values = {}
for field in possible_fields:
if field in desired_fields and field in constants:
values[field], remainder = divmod(remainder, constants[field])
return f.format(fmt, **values)
def reset_unique_tracks() -> None:
""" Resets the tracked planes set and daily accumulators (schedule this) """
time.sleep(2) # wait for hourly events to complete
global unique_planes_seen, api_hits, selection_events
main_logger.info(f"DAILY STATS: {len(unique_planes_seen)} flybys. {selection_events} selection events. \
{api_hits[0]}/{api_hits[0]+api_hits[1]} successful API calls, of which {api_hits[2]} returned no data.")
with threading.Lock():
unique_planes_seen.clear()
for i in range(len(api_hits)):
api_hits[i] = 0
selection_events = 0
return
def match_commandline(command_search: str, process_name: str) -> list:
""" Find all processes associated with a command line and process name that matches the given inputs.
Returns a list of dictionaries of matching processes.
Perfect for making sure only a single running instance of this script is allowed. """
list_of_processes = []
# iterate over all running processes
for proc in psutil.process_iter():
try:
pinfo = proc.as_dict(attrs=['pid', 'name', 'create_time'])
cmdline = proc.cmdline()
# check if process name contains the given string in its command line
if any(command_search in position for position in cmdline) and process_name in pinfo['name']:
list_of_processes.append(pinfo)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return list_of_processes
def get_ip() -> str:
''' Gets us our local IP. Modified from my other project `UNRAID_Status_Screen`.
Modifies the global `CURRENT_IP` '''
global CURRENT_IP
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
s.connect(('10.254.254.254', 1)) # doesn't even need to connect
IP = s.getsockname()[0]
except Exception:
IP = ""
finally:
s.close()
CURRENT_IP = IP
# =========== Program Setup II =============
# ========( Initialization Tools )==========
def probe1090() -> tuple[str, str] | None:
""" Determines which json exists on the system. Returns `JSON1090_LOCATION` and its base `URL` """
locations = iter(
[CUSTOM_DUMP1090_LOCATION,
"http://localhost/tar1090",
"http://localhost/skyaware",
"http://localhost/dump1090-fa",
"http://localhost:8080",]
)
while True:
json_1090 = next(locations, "nothing")
if json_1090 == "nothing":
return None, None
try:
test1 = requests.get(json_1090 + '/data/aircraft.json', headers=USER_AGENT, timeout=0.5)
test1.raise_for_status()
return json_1090 + '/data/aircraft.json', json_1090
except:
pass
def probe978() -> str | None:
""" Check if dump978 exists and returns its `URL` or None if not found. """
locations = iter(
["http://localhost:8978",
CUSTOM_DUMP978_LOCATION]
)
while True:
json_978 = next(locations, "nothing")
if json_978 == "nothing": break
try:
test1 = requests.get(json_978 + '/data/aircraft.json', headers=USER_AGENT, timeout=0.5)
test1.raise_for_status()
main_logger.info(f"dump978 detected as well, at \'{json_978}\'")
return json_978 + '/data/aircraft.json'
except:
pass
return None
def dump1090_check() -> None:
""" Checks what dump1090 we have available upon startup. If we can't find it, just become a clock. """
global DUMP1090_JSON, URL, DUMP978_JSON, DUMP1090_IS_AVAILABLE
main_logger.info("Searching for dump1090...")
for wait in range(3):
tries = 3 - wait
DUMP1090_JSON, URL = probe1090()
if DUMP1090_JSON is not None:
main_logger.info(f"Found dump1090 at \'{DUMP1090_JSON}\'")
DUMP1090_IS_AVAILABLE = True
break
else:
main_logger.info(f"Could not find dump1090.json. dump1090 may not be loaded yet. Waiting 10 seconds and trying {tries} more time(s).")
time.sleep(10)
else: # try it again one last time
DUMP1090_JSON, URL = probe1090()
if DUMP1090_JSON is None:
DUMP1090_IS_AVAILABLE = False
if DISPLAY_IS_VALID:
main_logger.error("dump1090 not found. This will just be a cool-looking clock until this program is restarted.")
else:
main_logger.error("dump1090 not found. Additionally, screen resources are missing.")
main_logger.error(">>> This script may not be useful until these issues are corrected.")
DUMP978_JSON = probe978() # we don't wait for this one as it's usually not present
def read_1090_config() -> None:
""" Gets us our location, if it is configured in dump1090. """
global rlat, rlon, DISPLAY_SUNRISE_SUNSET
if not DUMP1090_IS_AVAILABLE: return
try:
req = Request(URL + '/data/receiver.json', data=None, headers=USER_AGENT)
with closing(urlopen(req, None, LOOP_INTERVAL * 0.75)) as receiver_file:
receiver = json.load(receiver_file)
with threading.Lock():
if has_key(receiver,'lat'): #if location is set
rlat_last = rlat
rlon_last = rlon
if receiver['lat'] != rlat_last or receiver['lon'] != rlon_last:
rlat = float(receiver['lat'])
rlon = float(receiver['lon'])
main_logger.info(f"Location updated.")
main_logger.debug(f">>> ({rlat}, {rlon})") # do not write to file unless verbose mode
else:
rlat = rlon = None
main_logger.warning("Location has not been set! This program will not be able to determine any nearby planes or calculate range!")
main_logger.warning(">>> Please set location in dump1090 to disable this message.")
if DISPLAY_SUNRISE_SUNSET:
main_logger.warning("Sunrise and sunset times will not be displayed.")
DISPLAY_SUNRISE_SUNSET = False
except:
main_logger.error("Cannot load receiver config.")
return
def probe_API() -> tuple[int, float] | None:
""" Checks if the provided API Key is valid, and if it is, pulls stats from the last 30 days.
This specific query doesn't use API credits according to the API reference. If the call fails, returns None. """
if API_KEY is None or not API_KEY: return None, None
if NOFILTER_MODE or ENHANCED_READOUT: return None, None
api_calls = 0
api_cost = 0
date_now = datetime.datetime.now()
time_delta_last_month = date_now - datetime.timedelta(days=30)
date_month_iso = time_delta_last_month.astimezone().replace(microsecond=0).isoformat()
auth_header = {'x-apikey':API_KEY, 'Accept':"application/json; charset=UTF-8"}
query_string = (API_URL
+ "account/usage"
+ "?start=" + urllib.parse.quote(date_month_iso)
)
try:
response = requests.get(query_string, headers=auth_header, timeout=10)
response.raise_for_status()
if response.status_code == 200:
response_json = response.json()
api_calls = response_json['total_calls']
api_cost = response_json['total_cost']
return api_calls, api_cost
else:
return None, None
except:
return None, None
def configuration_check() -> None:
""" Basic configuration checker """
global RANGE, HEIGHT_LIMIT, FLYBY_STATS_ENABLED, FLYBY_STALENESS, LOCATION_TIMEOUT
global API_KEY, API_DAILY_LIMIT
global BRIGHTNESS, BRIGHTNESS_2, ACTIVE_PLANE_DISPLAY_BRIGHTNESS
global RGB_COLS, RGB_ROWS
valid_rgb_sizes = [16, 32, 64]
if (not isinstance(RGB_ROWS, int) or not isinstance(RGB_ROWS, int)) or\
(RGB_ROWS not in valid_rgb_sizes or RGB_COLS not in valid_rgb_sizes):
main_logger.warning(f"Selected RGB dimensions is not a valid size.")
main_logger.info(f">>> Setting values to default. ({settings_values['RGB_ROWS']}x{settings_values['RGB_COLS']})")
RGB_ROWS = settings_values['RGB_ROWS']
RGB_COLS = settings_values['RGB_COLS']
if not isinstance(LOCATION_TIMEOUT, int) or\
(LOCATION_TIMEOUT < 15 or LOCATION_TIMEOUT > 60):
main_logger.warning(f"LOCATION TIMEOUT is out of bounds or not an integer.")
main_logger.info(f">>> Setting to default ({settings_values['LOCATION_TIMEOUT']})")
LOCATION_TIMEOUT = settings_values['LOCATION_TIMEOUT']
else:
if LOCATION_TIMEOUT == 60:
main_logger.info("Location timeout set to 60 seconds. This will match to dump1090's behavior.")
else:
main_logger.info(f"Location timeout set to {LOCATION_TIMEOUT} seconds.")
if not NOFILTER_MODE:
if not isinstance(RANGE, (int, float)):
main_logger.warning(f"RANGE is not a number. Setting to default value ({settings_values['RANGE']}).")
globals()['RANGE'] = settings_values['RANGE']
if not isinstance(HEIGHT_LIMIT, int):
main_logger.warning(f"HEIGHT_LIMIT is not an integer. Setting to default value ({settings_values['HEIGHT_LIMIT']}).")
globals()['HEIGHT_LIMIT'] = settings_values['HEIGHT_LIMIT']
# set hard limits for range
if RANGE > (20 * distance_multiplier):
main_logger.warning(f"Desired range ({RANGE}{distance_unit}) is out of bounds. Limiting to {20 * distance_multiplier}{distance_unit}.")
main_logger.info(">>> If you would like to see more planes, consider \'No Filter\' mode. Use the \'-f\' flag.")
RANGE = (20 * distance_multiplier)
elif RANGE < (0.2 * distance_multiplier):
main_logger.warning(f"Desired range ({RANGE}{distance_unit}) is too low. Limiting to {0.2 * distance_multiplier}{distance_unit}.")
RANGE = (0.2 * distance_multiplier)
height_warning = f"Warning: Desired height cutoff ({HEIGHT_LIMIT}{altitude_unit}) is"
if HEIGHT_LIMIT >= (275000 * altitude_multiplier):
main_logger.warning(f"{height_warning} beyond the theoretical limit for flight.")
main_logger.info(f">>> Setting to a reasonable value:{75000 * altitude_multiplier}{altitude_unit}")
HEIGHT_LIMIT = (75000 * altitude_multiplier)
elif HEIGHT_LIMIT > (75000 * altitude_multiplier) and HEIGHT_LIMIT < (275000 * altitude_multiplier):
main_logger.warning(f"{height_warning} beyond typical aviation flight levels.")
main_logger.info(f">>> Limiting to {75000 * altitude_multiplier}{altitude_unit}.")
HEIGHT_LIMIT = (75000 * altitude_multiplier)
elif HEIGHT_LIMIT <= 0:
main_logger.warning(f"{height_warning} ground level or underground.")
main_logger.warning("Planes won't be doing the thing planes do at that point (flying).")
main_logger.info(f">>> Setting to a reasonable value: {5000 * altitude_multiplier}{altitude_unit}.")
HEIGHT_LIMIT = (5000 * altitude_multiplier)
elif HEIGHT_LIMIT > 0 and HEIGHT_LIMIT < (200 * altitude_multiplier):
main_logger.warning(f"{height_warning} too low. Are planes landing on your house?")
main_logger.info(f">>> Setting to a reasonable value: {5000 * altitude_multiplier}{altitude_unit}.")
del height_warning
else:
RANGE = 10000
HEIGHT_LIMIT = 275000
if not isinstance(FLYBY_STALENESS, int) or (FLYBY_STALENESS < 1 or FLYBY_STALENESS >= 1440):
main_logger.warning(f"Desired flyby staleness is out of bounds.")
main_logger.info(f">>> Setting to default ({settings_values['FLYBY_STALENESS']})")
FLYBY_STALENESS = settings_values['FLYBY_STALENESS']
if not FLYBY_STATS_ENABLED:
main_logger.info("Flyby stats will not be written.")
if DISPLAY_SUNRISE_SUNSET and DISPLAY_RECEIVER_STATS:
main_logger.warning("Display option for sunrise and sunset times is enabled, however, receiver stats will be displayed instead.")
elif DISPLAY_SUNRISE_SUNSET and not DISPLAY_RECEIVER_STATS:
main_logger.info("Sunrise and sunset times will be displayed.")
elif not DISPLAY_SUNRISE_SUNSET and DISPLAY_RECEIVER_STATS:
main_logger.info("Receiver stats will be displayed.")
brightness_list = ["BRIGHTNESS", "BRIGHTNESS_2", "ACTIVE_PLANE_DISPLAY_BRIGHTNESS"]
for setting_entry in brightness_list:
try:
imported_value = globals()[f"{setting_entry}"] # get current imported setting value
if setting_entry == "ACTIVE_PLANE_DISPLAY_BRIGHTNESS" and imported_value is None:
continue
if not isinstance(imported_value, int) or (imported_value < 0 or imported_value > 100):
main_logger.warning(f"{setting_entry} is out of bounds or not an integer.")
main_logger.info(f">>> Using default value ({settings_values[setting_entry]}).")
globals()[f"{setting_entry}"] = settings_values[setting_entry]
except KeyError:
pass
main_logger.info("Settings check complete.")
# check the API config
main_logger.info("Checking API settings...")
if not isinstance(API_KEY, str):
main_logger.warning("API key is invalid.")
API_KEY = ""
if (API_KEY and (
API_DAILY_LIMIT is not None
and not isinstance(API_DAILY_LIMIT, int)
)) or (
isinstance(API_DAILY_LIMIT, int)
and API_DAILY_LIMIT < 0):
main_logger.warning("API_DAILY_LIMIT is invalid. Refusing to use API to prevent accidental overcharges.")
API_DAILY_LIMIT = None
API_KEY = ""
if API_KEY is not None and API_KEY:
api_use = None
api_cost = None
api_use, api_cost = probe_API()
if api_use is None:
main_logger.warning("Provided API Key failed to return a valid response.")
API_KEY = ""
else:
main_logger.info(f"API Key \'***{API_KEY[-5:]}\' is valid.")
main_logger.info(f">>> Stats from the past 30 days: {api_use} total calls, costing ${api_cost}.")
if API_KEY is not None and API_KEY: # test again
if ENHANCED_READOUT:
main_logger.info("ENHANCED_READOUT setting is enabled. API will not be used.")
else:
if API_DAILY_LIMIT is None:
main_logger.info("No limit set for API calls.")
else:
main_logger.info(f"Limiting API calls to {API_DAILY_LIMIT} per day.")
else:
if not ENHANCED_READOUT:
main_logger.info("API will not be used. Additional airplane info will not be available.")
if DISPLAY_IS_VALID:
main_logger.info(">>> Setting ENHANCED_READOUT to \'true\' is recommended.")
elif ENHANCED_READOUT and DISPLAY_IS_VALID:
main_logger.info("API will not be used. Instead, additional dump1090 info will be substituted.")
main_logger.info("API check complete.")
def read_receiver_stats() -> None:
""" Poll receiver stats from dump1090. Writes to `receiver_stats`.
Needs to run on its own thread as its timing does not depend on `LOOP_INTERVAL`. """
if not DUMP1090_IS_AVAILABLE: return
global receiver_stats
while True:
gain_now = None
noise_now = None
loud_percentage = None
try:
req = Request(URL + '/data/stats.json', data=None, headers=USER_AGENT)
with closing(urlopen(req, None, 5)) as stats_file:
stats = json.load(stats_file)
if has_key(stats, 'last1min'):
try:
noise_now = stats['last1min']['local']['noise']
except KeyError:
noise_now = None
try:
messages1min = stats['last1min']['messages']
loud_messages = stats['last1min']['local']['strong_signals']
if messages1min == 0:
loud_percentage = 0
else:
loud_percentage = (loud_messages / messages1min) * 100
except KeyError:
loud_percentage = None
else:
noise_now = None
loud_percentage = None
if has_key(stats, 'gain_db'):
gain_now = stats['gain_db']
else:
gain_now = None
except:
pass
with threading.Lock():
receiver_stats['Gain'] = gain_now
receiver_stats['Noise'] = noise_now
receiver_stats['Strong'] = loud_percentage
time.sleep(5) # don't need to poll too often
def suntimes() -> None:
""" Update sunrise and sunset times """
global sunset_sunrise
if rlat is not None and rlon is not None:
sun = Sun(rlat, rlon)
time_now = datetime.datetime.now().astimezone()
try:
sunset_sunrise['Sunrise'] = sun.get_sunrise_time(time_now).astimezone()
sunset_sunrise['Sunset'] = sun.get_sunset_time(time_now).astimezone()
except SunTimeException:
sunset_sunrise['Sunrise'] = None
sunset_sunrise['Sunset'] = None
else:
sunset_sunrise['Sunrise'] = None
sunset_sunrise['Sunset'] = None
main_logger.debug(f"Sunrise: {sunset_sunrise['Sunrise']}, Sunset: {sunset_sunrise['Sunset']}")
# =========== Program Setup III ============
# ===========( Core Functions )=============
def flyby_tracker(input_ID: str) -> None:
""" Adds given plane ID to `unique_planes_seen` list. """
global unique_planes_seen
def add_entry() -> None:
with threading.Lock():
unique_planes_seen.append(
{"ID": input_ID,
"Time": time.monotonic()
}
)
entry_count = len(unique_planes_seen)
# limit search to the following amount for speed reasons (<0.5ms);
# it's assumed that if a previously seen plane appears again and
# there have already been these many flybys, it's already stale
limit_count = 500
stale_age = FLYBY_STALENESS * 60 # seconds
if entry_count > limit_count: entry_count = limit_count
# special case when there aren't any entries yet
if len(unique_planes_seen) == 0:
add_entry()
return
for a in range(entry_count):
# search backwards through list
if unique_planes_seen[-a-1]['ID'] == input_ID:
if unique_planes_seen[-a-1]['Time'] - time.monotonic() > stale_age:
add_entry()
return
else: # if we recently have seen this plane
return
else: # finally, if we don't find the entry, add a new one
add_entry()
return
def flyby_stats() -> None:
"""
If `FLYBY_STATS_ENABLED` is true, write the gathered stats from our flybys to a csv file.
When this is run for the first time, it will check if `FLYBY_STATS_FILE` exists and sets appropriate flags.
If `FLYBY_STATS_FILE` is valid, subsequent calls to this function will append data to the end of it.
This function assumes it will be called hourly to keep track of stats thoughout the day.
Written values are accumulative and are reset at midnight.
"""
global flyby_stats_present, FLYBY_STATS_ENABLED
if not FLYBY_STATS_ENABLED:
return
header = "Date,Number of flybys,API calls (successful),API calls (failed),API calls (empty)\n"
if FLYBY_STATS_FILE.is_file() and not flyby_stats_present:
with open(FLYBY_STATS_FILE, 'r') as stats: # check if the file has a valid header
head = next(stats)
if head == header:
flyby_stats_present = True
main_logger.info(f"Flyby stats file \'{FLYBY_STATS_FILE}\' is present.")
else:
main_logger.warning(f"Header in \'{FLYBY_STATS_FILE}\' is incorrect or has been modifed. Stats will not be saved.")
FLYBY_STATS_ENABLED = False
# load in last line of stats file, check if today's the current date, and re-populate our running stats
if flyby_stats_present:
with open(FLYBY_STATS_FILE, 'rb') as f:
try: # catch OSError in case of a one line file
f.seek(-2, os.SEEK_END)
while f.read(1) != b'\n':
f.seek(-2, os.SEEK_CUR)
except OSError:
f.seek(0)
last_line = f.readline().decode()
date_now_str = datetime.datetime.now().strftime('%Y-%m-%d')
last_date = (last_line.split(",")[0]).split(" ")[0] # splitting strftime('%Y-%m-%d %H:%M')
if date_now_str == last_date:
global api_hits, unique_planes_seen
planes_seen = int(last_line.split(",")[1])
api_hits[0] = int(last_line.split(",")[2])
api_hits[1] = int(last_line.split(",")[3])
api_hits[2] = int(last_line.split(",")[4])
for i in range(planes_seen): # fill the set with filler values, we don't recall the last contents of `unique_planes_seen`
unique_planes_seen.append(
{"ID":i+1,
"Time":time.monotonic(),
}
)
main_logger.info(f"Successfully reloaded last written data for {date_now_str}. Flybys: {planes_seen}.")
return
elif not FLYBY_STATS_FILE.is_file():
try:
if os.name=='posix':
os.mknod(FLYBY_STATS_FILE)
os.chmod(FLYBY_STATS_FILE, 0o777)
with open(FLYBY_STATS_FILE, 'w') as stats:
stats.write(header)
main_logger.info(f"No Flyby stats file was found. A new flyby stats file \'{FLYBY_STATS_FILE}\' was created.")
flyby_stats_present = True
return