-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpytanker.py
executable file
·276 lines (225 loc) · 11.4 KB
/
pytanker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#!/usr/bin/env python3.10
# vim: lw=-c\ pytanker.yaml
import sys
import argparse
import logging
import logging.handlers
import yaml
import asyncio
from yaml.composer import Composer
from yaml.constructor import SafeConstructor
from yaml.parser import Parser
from yaml.reader import Reader
from yaml.resolver import BaseResolver
from yaml.scanner import Scanner
import re
import time
import os
import json
from datetime import datetime, timedelta
_version_ = "0.0.1"
_author_ = "Artem Illarionov <[email protected]>"
def checkcond_time(condition: dict) -> bool:
#TODO: check if start is preore stop, duration < 1d + call check_cnd_time function and check for exceptions
now = datetime.now()
def srtstp2tddt(timestr):
match timestr.count(':'):
case 1:
return(datetime.combine(now.date(), datetime.strptime(timestr, "%H:%M").time()))
case 2:
return(datetime.combine(now.date(), datetime.strptime(timestr, "%H:%M:%S").time()))
raise ValueError
if 'stop' in condition and now > srtstp2tddt(condition['stop']): # stop time is set and we already passed it
return(False)
start = srtstp2tddt(condition['start'])
if 'duration' in condition:
duration = condition['duration'].lower()
hours = minutes = seconds = 0
if 'h' in duration:
hours, duration = duration.split('h')
if 'm' in duration:
minutes, duration = duration.split('m')
if 's' in duration:
seconds, duration = duration.split('s')
duration = timedelta(hours=int(hours), minutes=int(minutes), seconds=int(seconds))
if (start + duration).day <= now.day: # check if job ends today
if now > start + duration: # check if we already passed end time
return(False)
else: # job will end tomorrow
if start + duration - timedelta(days=1) < now < start: # check if we already passed the remainig part of the end time
return(False) # or did't reached start time yet
else: # we are still withing the remainig part of the end time
return(True) # return True now, as we are still withing the remaining part
if now < start: # did not reached start time yet
return(False)
return(True)
def checkcond_state(condition: str) -> bool:
    """Placeholder for state-based condition checks; currently always passes."""
    return True
def checkcond_power(condition: str) -> bool:
    """Placeholder for power-based condition checks; currently always passes."""
    return True
def checkcond(condition: str) -> bool:
match condition['type']:
case 'time':
return(checkcond_time(condition))
case 'state':
return(checkcond_state(condition))
case 'power':
return(checkcond_power(condition))
# unknown -> inactive -> scheduled -> pending -> active
async def task_loop(jobs: dict, statedb: dict):
    """Re-evaluate every job state's conditions roughly once per second and
    advance each state through: unknown -> inactive -> scheduled -> pending -> active.

    Mutates *statedb* in place; runs forever as an asyncio task.
    """
    log = logging.getLogger("__main__")
    log.info("Entering job event loop..")
    while True:
        nextrun_uts = int(time.time()) + 1  # Save round second for the next cycle to be run
        state_update = False
        for job in jobs:
            for state in jobs[job]['states']:
                if state['name'] == 'default':  # Skip default state
                    continue
                status = 'active'
                for condition in state['conditions']:  # Cycle through all conditions for the current state
                    if not checkcond(condition):  # Check if current condition failed
                        status = 'inactive'
                        break  # Stop checking conditions on first failure
                if statedb[job][state['name']] != status:
                    if status == 'active':
                        # NOTE(review): this break skips the remaining states of
                        # the current job — confirm that is intentional.
                        if statedb[job][state['name']] in ['scheduled', 'pending']:
                            break
                        else:
                            status = 'scheduled'
                    log.debug(f"Changing {job} state '{state['name']}': {statedb[job][state['name']]} -> {status}")
                    statedb[job][state['name']] = status
                    state_update = True
            # Check if default state is present and should be activated for current job
            if 'default' in statedb[job]:
                default = True
                for name in statedb[job]:
                    if name != 'default' and statedb[job][name] in ['scheduled', 'pending', 'active']:
                        default = False
                        break
                # BUGFIX: the original membership lists below were missing a comma
                # ('pending' 'active' concatenated to the single string
                # 'pendingactive'), so 'pending' and 'active' never matched.
                if default:
                    if statedb[job]['default'] not in ['scheduled', 'pending', 'active']:
                        log.debug(f"Changing {job} state 'default': {statedb[job]['default']} -> scheduled")
                        statedb[job]['default'] = 'scheduled'
                else:
                    if statedb[job]['default'] in ['scheduled', 'pending', 'active']:
                        log.debug(f"Changing {job} state 'default': {statedb[job]['default']} -> inactive")
                        statedb[job]['default'] = 'inactive'
        # print(json.dumps(statedb, indent=2, sort_keys=True))
        if state_update:
            log.debug("State update is scheduled")
        if not state_update:
            await asyncio.sleep(nextrun_uts - time.time())  # Wait if no state updates scheduled or till upcoming second
async def state_loop():
    """Placeholder state-application loop; currently idles forever.

    Runs as an asyncio task alongside task_loop.
    """
    log = logging.getLogger("__main__")
    log.info("Entering state event loop..")
    while True:
        await asyncio.sleep(0.5)
async def main_loop(jobs: dict, statedb: dict):
    """Run the scheduler and state loops concurrently until cancelled."""
    loops = (task_loop(jobs, statedb), state_loop())
    await asyncio.gather(*loops)
def main():
parser = argparse.ArgumentParser(add_help=True, description="Aquarium scheduler and queue manager daemon.")
parser.add_argument('-c', nargs='?', required=True, metavar='file', help="Scheduler configuration file in YAML format", dest='config')
#TODO: Реализовать опцию проверки конфигурации
parser.add_argument('-t', nargs='?', metavar='test', help="Test devices and jobs according to specified configuration", dest='test')
args = parser.parse_args()
"""Load configuration from YAML"""
try:
with open(args.config) as f:
config = yaml.safe_load(f)
except OSError as e:
sys.exit(f"pytanker: (C) Failed to load config: {e.strerror} : '{e.filename}'")
except yaml.YAMLError as e:
sys.exit(f"pytanker: (C) Failed to parse config: {e}")
"""Setup logging"""
def setLogDestination(dst):
match dst:
case 'console':
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(fmt='%(asctime)s.%(msecs)03d pytanker: (%(levelname).1s) %(message)s', datefmt="%H:%M:%S"))
case 'syslog':
handler = logging.handlers.SysLogHandler(facility=logging.handlers.SysLogHandler.LOG_DAEMON, address = '/dev/log')
handler.setFormatter(logging.Formatter(fmt='pytanker[%(process)d]: (%(levelname).1s) %(message)s'))
case _:
raise ValueError
log.handlers.clear()
log.addHandler(handler)
# Configure default logger
log = logging.getLogger(__name__)
setLogDestination('syslog')
log.setLevel(logging.INFO)
try:
setLogDestination(config['log']['destination'].lower())
except KeyError:
log.error("Failed to configure log: Destination is undefined. Failing over to syslog.")
except ValueError:
log.error(f"Failed to configure log: Unknown destination: '{config['log']['destination']}'. Failing over to syslog.")
try:
log.setLevel(config['log']['level'].upper())
except KeyError:
log.error("Failed to configure log: Log level is undefined. Failing over to info.")
except ValueError:
log.error(f"Failed to configure log: Unknown level: '{config['log']['level']}'. Failing over to info.")
log.info(f"Starting pytanker v{_version_} ..")
log.debug(f"Log level set to: {logging.getLevelName(log.level)}")
"""Configure custom resolver to treat various true/false string combinations as booleans"""
class CustomResolver(BaseResolver):
pass
CustomResolver.add_implicit_resolver(
u'tag:yaml.org,2002:bool',
re.compile(u'''^(?:true|True|TRUE|false|False|FALSE)$''', re.X),
list(u'tTfF'))
class CustomLoader(Reader, Scanner, Parser, Composer, SafeConstructor, CustomResolver):
def __init__(self, stream):
Reader.__init__(self, stream)
Scanner.__init__(self)
Parser.__init__(self)
Composer.__init__(self)
SafeConstructor.__init__(self)
CustomResolver.__init__(self)
"""Load devices"""
devices = {}
for entry in os.scandir(config['devices']):
if entry.is_file() and (entry.name.endswith(".yaml") or entry.name.endswith(".yml")):
with open(entry.path) as f:
newdyaml = yaml.load(f, Loader=CustomLoader)
for newdev in newdyaml:
if newdev not in devices: # TODO: should be moved to pre check procedure
devices = {**devices, newdev: newdyaml[newdev]}
else:
log.error(f"Peripheral device: '{newdev}' already exist")
if not devices:
log.critical("No peripheral devices found, unable to continue")
sys.exit(1)
log.info(f"Found {len(devices)} peripheral device(s)")
"""Load jobs"""
jobs = {}
for entry in os.scandir(config['jobs']):
if entry.is_file() and (entry.name.endswith(".yaml") or entry.name.endswith(".yml")):
with open(entry.path) as f:
newjyaml = yaml.load(f, Loader=CustomLoader)
for newjob in newjyaml:
if newjob not in jobs: # TODO: should be moved to pre check procedure
jobs = {**jobs, newjob: newjyaml[newjob]}
else:
log.error(f"Job: '{newjob}' already exist")
if not jobs:
log.critical("No jobs found, unable to continue")
sys.exit(1)
log.info(f"Found {len(jobs)} job(s)")
"""Generate an empty state DB from all job states"""
statedb = {}
for job in jobs:
statedb[job] = {}
for state in jobs[job]['states']:
statedb[job][state['name']] = 'unknown'
if not statedb:
log.critical("Failed to generate state DB, unable to continue")
sys.exit(1)
log.info(f"Generated state DB for {len(statedb)} jobs")
# print(json.dumps(statedb, indent=2, sort_keys=True))
asyncio.run(main_loop(jobs, statedb))
log.info(f"Shutting down pytanker v{_version_} ..")
logging.shutdown()
# Script entry point: run the daemon only when executed directly.
if __name__ == "__main__":
    main()