# bb_frontend.py
import bluebox, fec
import config, parser, tracker, ircreporter
import threading
import argparse
import time
import binascii
from datetime import datetime
class bb_frontend(threading.Thread):
    """Front end that configures a Bluebox radio and prints parsed beacons.

    The radio is configured from the ``radio_settings`` section of the loaded
    configuration. Depending on ``enable_tracking`` the object either listens
    continuously, or uses a ``tracker.Tracker`` to apply a Doppler offset
    while the tracker reports the target in range and to sleep until the
    next pass otherwise. With ``enable_auth`` set, every received packet is
    also forwarded, hex encoded, through an ``ircreporter.IRCReporter``.
    """

    LOGFILE = "log.txt"

    # Pause inserted after every radio configuration command.
    # NOTE(review): presumably gives the device time to apply each setting -
    # confirm against the Bluebox driver.
    _SETTLE_SECONDS = 0.01

    def __init__(self, qth=None, config_file=None, enable_tracking=False, enable_auth=False):
        """Load the configuration, open the radio and apply the settings.

        Args:
            qth: Ground station position handed to ``tracker.Tracker``; the
                script entry point passes a ``(lat, lon, alt)`` tuple. Only
                used when ``enable_tracking`` is true.
            config_file: Passed to ``config.Config``. When it is falsy this
                object also registers itself as an observer of the config,
                which is expected to call ``update()`` on changes.
            enable_tracking: Create a tracker and run in pass-tracking mode.
            enable_auth: Create an IRC reporter and forward received packets.
        """
        # Without this the object is not a usable Thread: start() would raise
        # RuntimeError("thread.__init__() not called").
        threading.Thread.__init__(self)

        c = config.Config(config_file)
        self.qth = qth
        self.config = c.get_config()
        self.center_freq = self.config['radio_settings']['frequency']
        self.enable_tracking = enable_tracking

        # Serialises every access to the radio.
        self.bb_lock = threading.Lock()
        with self.bb_lock:
            self.bluebox = bluebox.Bluebox()
        self.parser = parser.Parser()

        if self.enable_tracking:
            self.tle = '\n'.join(self.config['tle'])
            self.tracker = tracker.Tracker(self.qth, self.tle, self.center_freq)

        self.enable_auth = enable_auth
        if self.enable_auth:
            self.irc_reporter = ircreporter.IRCReporter()

        self.update(self.config)
        if not config_file:
            c.add_observer(self)

    def update(self, config):
        """Apply the ``radio_settings`` section of ``config`` to the radio.

        Args:
            config: Mapping with a ``radio_settings`` entry providing the
                keys ``frequency``, ``modindex``, ``bitrate``, ``power``,
                ``training`` and ``syncword`` (a hexadecimal string).
        """
        settings = config['radio_settings']
        self.center_freq = settings['frequency']

        # The tracker only exists in tracking mode; touching it
        # unconditionally crashed construction when tracking was disabled.
        if self.enable_tracking:
            self.tracker.set_center_frequncy(self.center_freq)

        # (setter, value) pairs in the order they are sent to the radio.
        radio_commands = (
            (self.bluebox.set_frequency, self.center_freq),
            (self.bluebox.set_modindex, settings['modindex']),
            (self.bluebox.set_bitrate, settings['bitrate']),
            (self.bluebox.set_power, settings['power']),
            (self.bluebox.set_training, settings['training']),
            (self.bluebox.set_syncword, int(settings['syncword'], 16)),
        )
        with self.bb_lock:
            for apply_setting, value in radio_commands:
                apply_setting(value)
                time.sleep(self._SETTLE_SECONDS)

    def run(self):
        """Run the pass-tracking loop or the plain receive loop; never returns."""
        if self.enable_tracking:
            self.__run_pass__()
        else:
            self.__receive_mode__()

    def receive(self):
        """Wait for one packet and return its parsed form.

        Returns:
            Whatever ``self.parser.parse_data`` returns for the packet, or
            ``None`` when nothing was received or parsing raised (the
            exception is printed, not propagated).
        """
        with self.bb_lock:
            # NOTE(review): 5000 is presumably a timeout in milliseconds -
            # confirm against the Bluebox driver.
            data, _rssi, _freq = self.bluebox.receive(5000)
        if not data:
            return None

        print("\n" + "#="*40 + "#\n")
        print("Received packet {}".format(datetime.now().isoformat(' ')))

        if self.enable_auth:
            hex_str = binascii.b2a_hex(data)
            if not isinstance(hex_str, str):
                # Python 3 returns bytes; decode so the message does not
                # contain a b'...' literal.
                hex_str = hex_str.decode('ascii')
            # The packet is reported in two halves; floor division keeps the
            # slice index an integer on Python 3 as well.
            half = len(hex_str) // 2
            self.irc_reporter.send("VERIFY,1,%s" % hex_str[:half])
            self.irc_reporter.send("VERIFY,2,%s" % hex_str[half:])

        # Parse data
        try:
            beacon_str = self.parser.parse_data(data)
        except Exception as e:
            # Deliberately best effort: a packet that cannot be parsed must
            # not stop the receive loop.
            print(e)
            return None
        return beacon_str

    def __run_pass__(self):
        """Receive with Doppler correction while in range, then sleep until the next pass."""
        while True:
            while self.tracker.in_range():
                # center_freq is kept current by update(), unlike the config
                # snapshot taken at construction time.
                freq = self.center_freq + self.tracker.get_doppler()
                print("Doppler freq: {}".format(freq))
                # Retune under the same lock as every other radio access.
                with self.bb_lock:
                    self.bluebox.set_frequency(freq)
                beacon = self.receive()
                if beacon:
                    print(beacon)

            next_pass, duration, max_elvation = self.tracker.next_pass()
            print("Next pass: {0}\n"
                  "Duration: {1:.0f} minutes\n"
                  "Max elevation: {2:.2f} degrees".format(
                      datetime.fromtimestamp(next_pass), duration / 60.0, max_elvation))
            # time.sleep() rejects negative values, which would occur if the
            # reported pass start is already in the past.
            time.sleep(max(0, next_pass - time.time()))

    def __receive_mode__(self):
        """Receive and print beacons forever without frequency correction."""
        while True:
            beacon = self.receive()
            if beacon:
                print(beacon)
# Script entry point: parse the command line, build the front end and run it
# in the calling thread (run() loops forever).
if __name__ == '__main__':
    args_parser = argparse.ArgumentParser(description='AAUSAT4 Bluebox based Beacon Parser')
    # Ground station position; all three values are needed for tracking.
    args_parser.add_argument('--lat', dest='lat', required=False, type=float, default=None,
                             help='Latitude of ground station (N), e.g. 55.6167')
    args_parser.add_argument('--lon', dest='lon', required=False, type=float, default=None,
                             help='Longitude of ground station (W), e.g. -12.6500')
    args_parser.add_argument('--alt', dest='alt', required=False, type=float, default=None,
                             help='Altitude of ground station (meters), e.g. 10')
    args_parser.add_argument('--disable-tracking', dest='disable_tracking',
                             action='store_true',
                             required=False, default=False,
                             help='Disables doppler correction and pass planning.')
    args_parser.add_argument('--enable-verification', dest='enable_verification',
                             action='store_true',
                             required=False,
                             help='Enables automatic reporting of received packets.')
    args_parser.add_argument('--config-file', dest='config_file', required=False, type=str, default=None,
                             help='Use a confguration file from the local disk instead of the one provided by AAUSAT (on github).')
    args = args_parser.parse_args()

    if args.disable_tracking:
        qth = None
    else:
        qth = (args.lat, args.lon, args.alt)
        if None in qth:
            # Report through argparse so the user gets the usage text and a
            # non-zero exit status instead of a traceback.
            args_parser.error("latitude longitude and altitude arguments are required for tracking")

    bb = bb_frontend(qth, args.config_file, not args.disable_tracking, args.enable_verification)
    bb.run()