-
Notifications
You must be signed in to change notification settings - Fork 2
/
system_stats.py
executable file
·689 lines (590 loc) · 29.8 KB
/
system_stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim: set expandtab tabstop=4 shiftwidth=4 :
# system_stats.py - Module that implements all of the system statistic
# collection and reporting parts of system_bot.py.
# By: The Doctor <drwho at virtadpt dot net>
# 0x807B17C1 / 7960 1CDC 85C9 0B63 8D9F DD89 3BD8 FF2B 807B 17C1
# License: GPLv3
# v4.5 - Changed how free memory is calculated because the old method wasn't
# really cross-platform. In other words, I finally have a work laptop that
# I can mess around with a little.
# - Tore out the OpenWRT stuff because System Script now exists.
# - Rounded off the system load values so they're easier to read.
# - Rounded off some of the temperature values returned. I did
# Fahrenheit, but forgot Centigrade.
# - Made temperature checking conditional.
# - Made deletion of the loopback interface conditional. Also added OSX's lo0.
# v4.4 - Added some code to skip any file system mounts specified in the config
# file.
# v4.3 - Fixed a bug in Fahrenheit to Centigrade conversion. Oops.
# - Added a utility function get_disk_space(), which returns the amount
# of disk space used by a mount point.
# - Fixed a bug in which disk space usage alerts didn't get rounded off.
# v4.2 - Added support for getting the local date and time.
# - Added OpenWRT support for local date and time.
# v4.1 - Added support for OpenWRT with a separate module.
# v4.0 - Ported to Python 3.
# v3.3 - Added a Centigrade-to-Fahrenheit utility function.
# - Added a function that periodically checks the current temperature
# of every sensor on the system and sends an alert to the user if
# the temperature reaches what the driver considers a dangerous or
# critical point.
# - Renamed disk_usage() to get_disk_usage() because it conflicted
# with a variable name elsewhere.
# - Made check_memory_utilization() configurable.
# v3.2 - Changed "disk free" to "disk used," so it's more like the output of
# `df`.
# v3.1 - Added function to get the local IP address of the host.
# v3.0 - Added real statistics support.
# v2.2 - Added function to get public IP address of host.
# - Added function that gets network traffic stats.
# v2.1 - Added system uptime.
# v2.0 - Refactoring code to split it out into separate modules.
# v1.0 - Initial release.
# TO-DO:
# - Optimize the temperature monitoring loop for the general case.
# Load modules.
import logging
import math
import os
import psutil
import requests
import statistics
import sys
import time
from datetime import timedelta
import globals
# Variables global to this module.
# Running lists of system load averages, one list per averaging window.
# check_sysload() appends the newest sample to each list on every call and
# drops the oldest sample once the configured maximum length is reached.
one_minute_average = []
five_minute_average = []
fifteen_minute_average = []
# Running lists of device temperatures in case the drivers don't have a sense
# of high or critical temperatures. Keyed by sensor label; each value is a
# list of readings in degrees Centigrade that check_hardware_temperatures()
# appends to and trims.
device_temperatures = {}
# Functions.
# sysload(): Captures the system's current load averages. Takes no
#   arguments. Returns a hash table with the keys "one_minute",
#   "five_minute", and "fifteen_minute", each holding the matching load
#   average rounded to two decimal places.
def sysload():
    one, five, fifteen = os.getloadavg()
    return {
        "one_minute": round(one, 2),
        "five_minute": round(five, 2),
        "fifteen_minute": round(fifteen, 2),
    }
# check_sysload(): Samples the current system load, records it in the
#   module's running load histories, and alerts the user if the standard
#   deviation of any history exceeds the configured limit.
#   Arguments:
#     sysload_counter: time accumulated since the last alert was sent.
#     time_between_alerts: minimum time between alerts; 0 disables alerting.
#     status_polling: amount added to the counter on each call.
#     std_devs: standard deviation above which a history counts as a spike.
#     sys_avg_min_len: minimum number of samples needed before analyzing.
#     sys_avg_max_len: history length at which the oldest sample is dropped.
#     send_message_to_user: callable that takes the alert text.
#   Returns the updated value for sysload_counter (0 after an alert is sent
#   or when alerting is disabled and a spike was seen).
def check_sysload(sysload_counter, time_between_alerts, status_polling,
        std_devs, sys_avg_min_len, sys_avg_max_len, send_message_to_user):
    message = ""
    current_load_avg = sysload()
    logging.debug("Value of sys_avg_min_len: " + str(sys_avg_min_len))
    logging.debug("Value of sys_avg_max_len: " + str(sys_avg_max_len))
    logging.debug("Current system load averages: " + str(current_load_avg))

    # One row per averaging window: (history list, key into the sysload()
    # hash, name used in debugging output, alert text before the value,
    # alert text after the value).
    windows = (
        (one_minute_average, "one_minute", "one minute",
            "WARNING: The current system load has spiked to ", ".\n"),
        (five_minute_average, "five_minute", "five minute",
            "WARNING: The five minute system load has spiked to ",
            ". What could be running that's doing this?\n"),
        (fifteen_minute_average, "fifteen_minute", "fifteen minute",
            "WARNING: The fifteen minute system load has spiked to ",
            ". I think something's dreadfully wrong.\n"),
    )

    # Copy the load averages into the appropriate running lists.
    for history, key, _, _, _ in windows:
        history.append(current_load_avg[key])
    for history, key, _, _, _ in windows:
        logging.debug("Length of " + key + "_average: " + str(len(history)))

    # Pop the oldest values out of the lists to keep them at a manageable
    # size. The three lists always grow and shrink together, so checking
    # the first one is enough.
    if len(one_minute_average) >= int(sys_avg_max_len):
        logging.debug("Removing oldest system load values.")
        for history, _, _, _, _ in windows:
            history.pop(0)

    # statistics.stdev() raises StatisticsError when given fewer than two
    # values, so never analyze fewer than two samples no matter how small
    # sys_avg_min_len is configured.
    if len(one_minute_average) < max(2, int(sys_avg_min_len)):
        logging.debug("Need more than " + str(sys_avg_min_len) + " samples of system load. Waiting.")
        return sysload_counter

    # Calculate the standard deviations of the three system loads and build
    # an alert line for each one that has spiked.
    for history, key, window, prefix, suffix in windows:
        std_dev = statistics.stdev(history)
        logging.debug("Standard deviation of " + window + " system load: " + str(std_dev))
        if std_dev > float(std_devs):
            message = message + prefix + str(current_load_avg[key]) + suffix

    # If a message has been constructed, check to see if enough time has
    # passed since the last one. If so, send it and reset the counter.
    if message:
        # A time_between_alerts of zero means alerting has been disabled.
        if time_between_alerts == 0:
            logging.debug("System load alerting disabled.")
            return 0
        if sysload_counter >= time_between_alerts:
            send_message_to_user(message)
            return 0

    # Not enough time has passed (or nothing to report): advance the counter.
    return sysload_counter + status_polling
# uname(): Wraps os.uname() and picks out the fields the bot reports.
#   Meant to be called on request (or at startup) rather than on every pass
#   through the main loop. Takes no arguments. Returns a hash table with
#   the keys "hostname", "version", "buildinfo", and "arch", taken from
#   elements 1 through 4 of the os.uname() result.
def uname():
    fields = os.uname()
    labels = ("hostname", "version", "buildinfo", "arch")
    return dict(zip(labels, fields[1:5]))
# cpus(): Takes no arguments. Returns whatever psutil.cpu_count() reports
#   as the number of CPUs on the system.
def cpus():
    cpu_total = psutil.cpu_count()
    return cpu_total
# cpu_idle_time(): Takes no arguments. Returns the percentage of runtime the
#   CPUs are idle as a floating point number.
def cpu_idle_time():
    # Read the field by name rather than by position: the order of the
    # fields psutil returns differs between platforms, so index 3 is not
    # guaranteed to be the idle time everywhere.
    return psutil.cpu_times_percent().idle
# check_cpu_idle_time(): Watches how idle the CPU(s) are and alerts the
#   bot's owner when the idle percentage drops below 15.0.
#   Arguments:
#     cpu_idle_time_counter: time accumulated since the last alert.
#     time_between_alerts: minimum time between alerts; 0 disables alerting.
#     status_polling: amount added to the counter on each call.
#     send_message_to_user: callable that takes the alert text.
#   Returns the updated value for cpu_idle_time_counter.
def check_cpu_idle_time(cpu_idle_time_counter, time_between_alerts,
        status_polling, send_message_to_user):
    percent_idle = cpu_idle_time()

    # Only a too-busy CPU can trigger an alert or a counter reset.
    if percent_idle < 15.0:
        # A time_between_alerts of zero means alerting has been disabled.
        if time_between_alerts == 0:
            logging.debug("CPU idle time alerting disabled.")
            return 0

        # Enough time has passed since the last alert: send a new one and
        # start counting again.
        if cpu_idle_time_counter >= time_between_alerts:
            send_message_to_user("WARNING: The current CPU idle time is sitting at " + str(percent_idle) + ". What's keeping it so busy?")
            return 0

    # Nothing was sent, so just advance the counter.
    return cpu_idle_time_counter + status_polling
# get_disk_usage(): Takes no arguments. Returns a hash table with one entry
#   per mounted partition, keyed by mount point. The value is the
#   percentage of the file system in use, or an empty string if the usage
#   could not be read. Mount points that contain any of the strings in
#   globals.ignored_mountpoints are left out.
def get_disk_usage():
    # Prime the hash with the names of the mounted disk partitions.
    disk_used = {
        partition.mountpoint: ""
        for partition in psutil.disk_partitions()
    }

    # Now delete every mountpoint that matches the ignore list. The ignore
    # list is optional, so tolerate it being missing as well as empty.
    ignored_mountpoints = getattr(globals, "ignored_mountpoints", None)
    if ignored_mountpoints:
        for ignored in ignored_mountpoints:
            logging.debug("Looking for ignored mountpoint %s in disk mounts." % ignored)
            # Iterate over a snapshot of the keys because entries are
            # deleted from the hash table inside the loop.
            for mountpoint in list(disk_used):
                if ignored in mountpoint:
                    logging.debug("Mountpoint to ignore %s matched %s. Deleting." % (ignored, mountpoint))
                    del disk_used[mountpoint]

    # Look up the percentage used for each remaining mount point. Only
    # existing keys are reassigned, so iterating the hash directly is safe.
    for mountpoint in disk_used:
        try:
            disk_used[mountpoint] = psutil.disk_usage(mountpoint).percent
        except Exception:
            # Docker causes this to not work with permissions problems.
            # The entry keeps its empty-string placeholder.
            logging.debug("Skipping disk device " + mountpoint + " due to restrictive permissions.")
    return disk_used
# get_disk_space(): Takes a string corresponding to a mountpoint ("/home").
#   Looks up the total amount of disk space, the amount of disk space used,
#   and the amount of disk space free. Returns those values as a hash table
#   with the keys "total", "used", and "free", or None if the lookup failed.
def get_disk_space(device):
    logging.debug("Entered system_stats.get_disk_space().")
    try:
        disk_stats = psutil.disk_usage(device)
    except Exception:
        # Best effort: any failure to read the mount point is reported as
        # None. Exception (rather than a bare except) lets Ctrl-C and
        # interpreter shutdown through.
        logging.debug("Unable to get disk stats for " + str(device) + ".")
        return None

    disk_space = {
        "total": disk_stats.total,
        "used": disk_stats.used,
        "free": disk_stats.free,
    }
    logging.debug("Value of disk_space: " + str(disk_space))
    return disk_space
# check_disk_usage(): Pulls the percentage of storage used on each mounted
#   file system and alerts the bot's owner if any of them is too full.
#   Arguments:
#     disk_usage_counter: time accumulated since the last alert.
#     time_between_alerts: minimum time between alerts; 0 disables alerting.
#     status_polling: amount added to the counter on each call.
#     disk_usage: percentage used above which a disk counts as too full.
#     send_message_to_user: callable that takes the alert text.
#   Returns the updated value for disk_usage_counter.
def check_disk_usage(disk_usage_counter, time_between_alerts, status_polling,
        disk_usage, send_message_to_user):
    message = ""

    # get_disk_usage() reports the percentage *used* per mount point.
    percent_used = get_disk_usage()

    # For each disk that's running low on space construct a line of the
    # message.
    for disk, used in percent_used.items():
        # An empty string means the usage couldn't be read. A disk that is
        # 0% full is skipped here too, which is harmless.
        if not used:
            logging.debug("disk_space_free[disk] value " + str(used) + " isn't usable. Forget it.")
            continue
        try:
            if used > disk_usage:
                message = message + "WARNING: Disk device " + disk + " has " + str(round(100.0 - used, 2)) + "% of its capacity left.\n"
        except Exception:
            # The values couldn't be compared. Report it instead of
            # crashing, but let Ctrl-C and interpreter shutdown through.
            message = message + "NOTICE: Storage usage monitoring is not yet implemented for storage device " + disk + ".\n"

    # If a message has been constructed, check how much time has passed since
    # the last message was sent. If enough time has, send the bot's owner
    # the message.
    if message:
        # A time_between_alerts of zero means alerting has been disabled.
        if time_between_alerts == 0:
            logging.debug("Disk usage alerting disabled.")
            return 0
        if disk_usage_counter >= time_between_alerts:
            send_message_to_user(message)
            return 0

    # Not enough time has passed. Increment the counter and move on.
    return disk_usage_counter + status_polling
# memory_utilization(): Takes no arguments. Returns the snapshot of memory
#   statistics produced by psutil.virtual_memory().
def memory_utilization():
    snapshot = psutil.virtual_memory()
    return snapshot
# check_memory_utilization(): Works out what percentage of system memory is
#   still available and alerts the bot's owner when it is at or below the
#   configured limit.
#   Arguments:
#     memory_free_counter: time accumulated since the last alert.
#     time_between_alerts: minimum time between alerts; 0 disables alerting.
#     status_polling: amount added to the counter on each call.
#     memory_remaining: percentage of free memory at or below which to alert.
#     send_message_to_user: callable that takes the alert text.
#   Returns the updated value for memory_free_counter.
def check_memory_utilization(memory_free_counter, time_between_alerts,
        status_polling, memory_remaining, send_message_to_user):
    memory_stats = memory_utilization()
    available_bytes = memory_stats.available
    logging.debug("Calculated free memory: %s" % convert_bytes(available_bytes))

    # Express the available memory as a percentage of the total, rounded
    # off to two decimal places.
    percent_free = round((available_bytes / memory_stats.total) * 100.0, 2)
    logging.debug("Percentage of free memory: %s" % str(percent_free))

    # Only a low-memory condition can trigger an alert or a counter reset.
    if percent_free <= memory_remaining:
        # A time_between_alerts of zero means alerting has been disabled.
        if time_between_alerts == 0:
            logging.debug("Memory utilization alerting disabled.")
            return 0

        # Enough time has passed since the last alert: send another one and
        # start counting again.
        if memory_free_counter >= time_between_alerts:
            send_message_to_user("WARNING: The amount of free memory has reached the critical point of " + str(percent_free) + "% free. You'll want to see to this before the OOM killer starts reaping processes.")
            return 0

    # Nothing was sent, so just advance the counter.
    return memory_free_counter + status_polling
# uptime(): Function that returns the length of time the system has been
#   online, read from the first field of /proc/uptime. Takes no arguments.
#   Returns the uptime as a string (the str() of a timedelta), or None if
#   /proc/uptime can't be read or parsed.
def uptime():
    try:
        # The context manager closes the file even if parsing blows up.
        with open("/proc/uptime", "r") as proc_uptime:
            uptime_seconds = float(proc_uptime.readline().split()[0])
    except (OSError, ValueError, IndexError):
        # OSError: the file is missing or unreadable (not a Linux system).
        # ValueError/IndexError: the contents weren't what we expected.
        return None
    return str(timedelta(seconds=uptime_seconds))
# current_ip_address(): Function that returns the current public (that is,
#   non-RFC 1918) IP address of the system using an external HTTP(S) service
#   or REST API. Takes one argument, a string containing the URL to the
#   service. Returns the body of the service's response as a string, or
#   None if the request failed or returned anything other than HTTP 200.
def current_ip_address(ip_addr_service):
    # Attempt to make an HTTP(S) request to the service that returns the
    # public IP of the host. requests signals connection failures by
    # raising, not by returning None, so catch its base exception. The
    # timeout keeps an unresponsive service from hanging the bot forever.
    try:
        request = requests.get(ip_addr_service, timeout=30)
    except requests.exceptions.RequestException as error:
        logging.error("Failed to contact HTTP(S) service " + str(ip_addr_service) + " to get host's IP address: " + str(error))
        return None

    # Handle HTTP error codes.
    if request.status_code != requests.codes.ok:
        logging.error("HTTP(S) request to IP address service " + str(ip_addr_service) + " failed, returned HTTP error code " + str(request.status_code) + ".")
        return None

    # Got the host's public IP address. Explicitly cast to a string to make
    # life easier for other modules.
    logging.debug("Got current IP address of host: " + str(request.text))
    return str(request.text)
# local_ip_address(): Function that returns the local IP address of the
#   system by querying its network interfaces. Takes no arguments.
#   Returns the IPv4 address as a string. Returns None if there are no
#   interfaces other than the loopback, or the string
#   "unknown. Something went wrong" if no suitable address was found.
def local_ip_address():
    # Imported here so this function doesn't depend on the file's import
    # block being changed.
    import socket

    nics = psutil.net_if_addrs()
    primary_nic = None

    # Remove the loopback interface from the hash. Linux calls it "lo",
    # OSX calls it "lo0".
    for loopback in ("lo", "lo0"):
        if nics.pop(loopback, None) is None:
            logging.debug("I guess this system doesn't call its loopback interface '" + loopback + "'.")
    if not nics:
        logging.debug("No network interfaces found. That's weird.")
        return None

    # Search the hash for the primary NIC.
    for nic, addresses in nics.items():
        # Make sure we filter out VPN interfaces.
        if "tun" in nic:
            continue
        for addr in addresses:
            # We want an IPv4 address that has a broadcast address. If
            # several qualify, the last one examined wins.
            if addr.family == socket.AF_INET and addr.broadcast:
                primary_nic = addr

    # Return the IP address if we have one, an error message if not.
    if primary_nic:
        logging.debug("Got primary IP address of system: " + primary_nic.address)
        return primary_nic.address
    logging.error("Unable to get primary IP address. Something went wrong.")
    return "unknown. Something went wrong"
# convert_bytes(): Function that takes an arbitrary number of bytes and
#   converts them to kilobytes, megabytes, gigabytes... Originally based on
#   https://stackoverflow.com/questions/5194057/better-way-to-convert-file-sizes-in-python
#   (written by user James Sapam). Returns a string made of the value
#   rounded to two decimal places and the appropriate suffix, or "0B" for
#   zero. Negative values keep their sign, and anything too large for the
#   table is expressed in the largest unit (YB).
def convert_bytes(bytes):
    size_name = ("Bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")

    # Catch the inactive interface case.
    if bytes == 0:
        return "0B"

    # Find the largest unit that doesn't exceed the value. This is done by
    # comparing against exact integer powers of 1024 instead of taking a
    # floating point logarithm, which can land on the wrong side of a
    # boundary, fails on negative numbers, and runs off the end of the table
    # for very large or fractional values.
    magnitude = abs(bytes)
    i = 0
    while i < len(size_name) - 1 and magnitude >= 1024 ** (i + 1):
        i += 1

    # Scale the value into that unit.
    s = round(bytes / math.pow(1024, i), 2)

    # Return the number and the appropriate label for the number.
    return "%s %s" % (s, size_name[i])
# network_traffic(): Function that uses the psutil module to extract stats for
#   every network interface on the system (except for the loopback). Takes
#   no arguments. Returns a hash table keyed by interface name; each value
#   is a hash table with the keys "sent" and "received" holding
#   human-readable traffic totals.
def network_traffic():
    stats = {}
    nics = psutil.net_io_counters(pernic=True)

    # Remove the loopback interface from the hash. Linux calls it "lo",
    # OSX calls it "lo0".
    for loopback in ("lo", "lo0"):
        if nics.pop(loopback, None) is None:
            logging.debug("I guess this system doesn't call its loopback interface '" + loopback + "'.")

    # For each remaining network interface, convert bytes_sent and
    # bytes_recv into human-readable strings.
    for nic, counters in nics.items():
        stats[nic] = {
            "sent": convert_bytes(counters.bytes_sent),
            "received": convert_bytes(counters.bytes_recv),
        }
        logging.debug("Traffic volume to date for " + nic + ": " + str(stats[nic]))
    return stats
# centigrade_to_fahrenheit(): Converts a temperature. Takes a floating
#   point value in degrees Celsius and returns the equivalent floating point
#   value in degrees Fahrenheit. Logs both values at debug level.
def centigrade_to_fahrenheit(celsius):
    logging.debug("Entered system_stats.centigrade_to_fahrenheit().")
    logging.debug("Temperature in Centigrade: " + str(celsius))

    # F = C * 9/5 + 32, evaluated left to right.
    fahrenheit = (celsius * 9.0) / 5.0 + 32.0

    logging.debug("Temperature in Fahrenheit: " + str(fahrenheit))
    return fahrenheit
# get_hardware_temperatures(): Function that polls the temperature
#   monitoring sensors available in the system. Takes no arguments.
#   Returns whatever psutil.sensors_temperatures() returns, or None if that
#   call fails (for example because psutil doesn't offer it on this
#   platform).
def get_hardware_temperatures():
    try:
        return psutil.sensors_temperatures()
    except Exception:
        # Best effort: treat any failure as "no sensors", but let Ctrl-C and
        # interpreter shutdown through.
        logging.debug("Unable to poll temperature sensors. They might not be supported on this platform.")
        return None
# _temperature_text(): Private helper that renders a Centigrade reading as
#   "<C> degrees Centigrade (<F> degrees Fahrenheit)", both rounded to two
#   decimal places.
def _temperature_text(celsius):
    fahrenheit = centigrade_to_fahrenheit(celsius)
    return (str(round(celsius, 2)) + " degrees Centigrade ("
        + str(round(fahrenheit, 2)) + " degrees Fahrenheit)")


# check_hardware_temperatures(): Function that analyzes the values of the
#   hardware temperatures and alerts the user if one of them has reached the
#   high or critical threshold reported by its driver. Sensors that lack
#   one of those thresholds fall back on a statistical check of their
#   recent history, kept in device_temperatures.
#   Arguments:
#     temperature_counter: time accumulated since the last spike alert.
#     time_between_alerts: minimum time between spike alerts; 0 disables all
#       temperature alerting.
#     status_polling: amount added to the counter on each call.
#     std_devs: standard deviation above which a history counts as a spike.
#     sys_avg_min_len: minimum number of samples needed before analyzing.
#     sys_avg_max_len: history length at which the oldest sample is dropped.
#     send_message_to_user: callable that takes the alert text.
#   Returns an updated value for temperature_counter (0 if no sensor data
#   is available or after a spike alert is sent).
#   Threshold alerts are sent immediately, once per call, without being
#   throttled. Only the statistical spike alerts use the counter.
def check_hardware_temperatures(temperature_counter, time_between_alerts,
        status_polling, std_devs, sys_avg_min_len, sys_avg_max_len,
        send_message_to_user):
    logging.debug("Entered function system_stats.check_hardware_temperatures().")

    # Accumulates only the statistical spike alerts. Threshold alerts are
    # sent on the spot and deliberately kept out of this string so they are
    # not sent a second time by the throttled send at the bottom.
    message = ""
    temperatures = get_hardware_temperatures()

    # If we're running in a virtual machine, we'll get an empty hash table.
    if not temperatures:
        logging.debug("Running on a virtual machine or not supported on this platform. Bouncing.")
        return 0

    alerting_disabled = (time_between_alerts == 0)

    # statistics.stdev() raises StatisticsError when given fewer than two
    # values, so never analyze fewer than two samples.
    samples_needed = max(2, int(sys_avg_min_len))

    # If we've made it this far, we're probably running on real hardware with
    # at least one hardware sensor.
    for temp_sensor in temperatures:
        # Temperature readings take the form of lists of tuples:
        #   0: Internal label (can be blank)
        #   1: Current temperature (in Centigrade)
        #   2: Temperature the driver considers too high (can be None)
        #   3: Temperature the driver considers dangerously high (can be
        #      None)
        for reading in temperatures[temp_sensor]:
            # Some sensors have internal names, some don't. Fall back on
            # the device name for each unnamed reading rather than reusing
            # the previous reading's label.
            label = reading[0] or temp_sensor
            current = reading[1]
            high = reading[2]
            critical = reading[3]
            logging.debug("Name of sensor: " + label)

            # Check to see if the critical point is set and has been reached.
            if critical:
                if current >= critical:
                    if alerting_disabled:
                        logging.debug("System temperature alerting disabled.")
                        continue
                    send_message_to_user("DANGER: Temperature sensor " + label + " is now reading " + _temperature_text(current) + ". Critical temperature reached! Investigate immediately!")
                    continue
            else:
                logging.debug("Sensor " + label + " does not have a critical point defined.")

            # Check to see if the too high point is set and has been reached.
            # We only want one of these.
            if high:
                if current >= high:
                    if alerting_disabled:
                        logging.debug("System temperature alerting disabled.")
                        continue
                    send_message_to_user("WARNING: Temperature sensor " + label + " is now reading " + _temperature_text(current) + ". This is alarmingly high!")
                    continue
            else:
                logging.debug("Sensor " + label + " does not have a high point defined.")

            # Both thresholds are defined and neither was reached: this
            # reading is fine and needs no statistical analysis.
            if high and critical:
                continue

            # At least one threshold is missing, so fall back on a
            # statistical analysis of temperature history.
            if label not in device_temperatures:
                logging.debug("Creating temperature history for device " + label + ".")
                device_temperatures[label] = []
            history = device_temperatures[label]

            # Make sure the temperature makes sense.
            if current <= 0.0:
                logging.debug("Temperature for device " + label + " is negative. This makes no sense. Skipping.")
                continue

            # Store the current temperature in Centigrade.
            history.append(current)
            logging.debug("Length of device_temperatures[" + label + "]: " + str(len(history)))

            # Pop the oldest value out of the list to keep it at a
            # manageable size.
            if len(history) >= int(sys_avg_max_len):
                logging.debug("Removing oldest temperature for device_temperatures[" + label + "].")
                history.pop(0)

            # To calculate the standard deviation of a group of values,
            # there need to be several available.
            if len(history) < samples_needed:
                logging.debug("Need more than " + str(sys_avg_min_len) + " temperature samples. Waiting.")
                continue

            # Calculate the standard deviation of this sensor's history and
            # queue an alert if there's been a spike.
            std_dev = statistics.stdev(history)
            logging.debug("Standard deviation of temperature of sensor " + label + ": " + str(std_dev))
            if std_dev > float(std_devs):
                if alerting_disabled:
                    logging.debug("System temperature alerting disabled.")
                    continue
                message = message + "WARNING: The temperature of sensor " + label + " has spiked to " + _temperature_text(current) + "! Investigate immediately!\n"

    # If spike alerts were queued, check to see if enough time has passed
    # since the last one was sent. If so, send them and reset the counter.
    if message:
        if temperature_counter >= time_between_alerts:
            send_message_to_user(message)
            logging.debug("Resetting time between alerts counter to 0.")
            return 0

    # If enough time between alerts hasn't passed yet, just increment the
    # counter.
    temperature_counter = temperature_counter + status_polling
    logging.debug("Incrementing time between alerts counter.")
    return temperature_counter
# local_datetime(): Utility function which gets the current date and time
#   from the system. Takes no arguments. Returns a string made of the
#   time.asctime() rendering of the local time, a space, and the name of
#   the time zone currently in effect.
def local_datetime():
    logging.debug("Entered system_stats.local_datetime().")

    # Take one snapshot so the timestamp and the DST decision agree.
    now = time.localtime()

    # Account for daylight savings time. tm_isdst says whether DST is in
    # effect for this moment. time.daylight only says whether the local
    # zone defines DST at all, which would pick the summer name year-round.
    if now.tm_isdst > 0:
        zone_name = time.tzname[1]
    else:
        zone_name = time.tzname[0]
    return time.asctime(now) + " " + zone_name
# This module is meant to be imported by the bot. Running it directly only
# prints a placeholder because there is no self-test yet.
if __name__ == "__main__":
    print("No tests yet.")