forked from BraydenNeale/DT-SNMP
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
122 lines (99 loc) · 3.45 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
import logging
from queue import Queue
from threading import Thread
from pprint import pprint
from dtsnmp.snmpv2_mib import SNMPv2MIB
from dtsnmp.avodaq_mib import AvodaqProcessMIB
"""
Test script designed to match the flow of custom_snmp_base_plugin_remote.py
Used to test snmp classes without requiring a build/package of the extension
Usage: python test.py
"""
def test_query():
    """Run one end-to-end poll using the settings in properties.json.

    Loads the JSON configuration, builds the device and authentication
    dicts, polls the SNMPv2MIB properties as a connection check and prints
    them, then polls every MIB in the list on its own thread and prints the
    metrics that were queued.

    Raises:
        Exception: if polling the system properties fails. The message
            carries the underlying error text and the original exception
            is chained as the cause.
        KeyError: if the polled properties have no 'sysObjectID' entry.
    """
    with open('properties.json') as fp:
        config = json.load(fp)

    device = _validate_device(config)
    authentication = _validate_authentication(config)

    # Connection check and system properties
    snmpv2_mib = SNMPv2MIB(device, authentication)
    try:
        property_dict = snmpv2_mib.poll_properties()
    except Exception as e:
        # Just report the underlying exception back to the end user
        info = 'Device connection issue: check snmp access'
        raise Exception('{} - {}'.format(info, str(e))) from e

    _display_properties(property_dict)

    # VENDOR/DEVICE SPECIFIC POLLING
    device_object_id = property_dict['sysObjectID']
    F5_OBJECT_ID = '1.3.6.1.4.1.3375'
    CISCO_OBJECT_ID = '1.3.6.1.4.1.9'

    # Match on a whole-arc boundary: a plain startswith() on '1.3.6.1.4.1.9'
    # would also accept unrelated enterprises such as '1.3.6.1.4.1.9148'.
    if _oid_in_subtree(device_object_id, CISCO_OBJECT_ID):
        print('CISCO')
    elif _oid_in_subtree(device_object_id, F5_OBJECT_ID):
        print('F5')
    else:
        print('OTHER')

    # NOTE(review): the Avodaq MIB is polled for every device type here;
    # confirm it was not meant to be limited to the 'OTHER' branch above.
    mib_list = [AvodaqProcessMIB(device, authentication)]

    metric_queue = Queue()

    def _poll_into_queue(results, mib):
        # Worker body: push one MIB's polled metrics onto the shared queue.
        results.put(mib.poll_metrics())

    # Poll each MIB on its own thread, then wait for all of them to finish
    # before draining the queue.
    thread_list = []
    for mib in mib_list:
        worker = Thread(target=_poll_into_queue, args=(metric_queue, mib))
        worker.start()
        thread_list.append(worker)

    for worker in thread_list:
        worker.join()

    _display_metrics(metric_queue)


def _oid_in_subtree(object_id, root):
    """Return True if dotted OID string object_id is root or lies beneath it."""
    return object_id == root or object_id.startswith(root + '.')
def _display_metrics(metric_queue):
    """Drain metric_queue and print one line per metric.

    Each queued item is treated as a mapping of endpoint -> iterable of
    metric dicts, from which the 'value', 'is_absolute_number' and
    'dimension' entries are printed.
    """
    line_template = 'Key = {}, Value = {}, Absolute? = {}, Dimension = {}'
    while True:
        if metric_queue.empty():
            break
        polled = metric_queue.get()
        for endpoint, metrics in polled.items():
            for metric in metrics:
                value = metric['value']
                is_absolute = metric['is_absolute_number']
                dimension = metric['dimension']
                print(line_template.format(endpoint, value, is_absolute, dimension))
def _display_properties(property_dict):
    """Print every entry of property_dict as a 'key = ..., Value = ...' line."""
    line_template = 'key = {}, Value = {}'
    for entry in property_dict.items():
        # entry is a (key, value) pair; unpack it straight into the template
        print(line_template.format(*entry))
def _validate_device(config):
    """Build the device dict ({'host': ..., 'port': ...}) from config.

    Reads config['hostname'], which may be a bare host or 'host:port'.
    When no port is given the default of 161 is used.

    Args:
        config: mapping holding at least a 'hostname' entry.

    Returns:
        dict with 'host' (str) and 'port' (int).

    Raises:
        ValueError: if the hostname is missing, not a string or empty, or
            if the port is not an integer in the range 1-65535.
    """
    hostname = config.get('hostname')
    if not isinstance(hostname, str) or not hostname:
        raise ValueError("'hostname' is required and must be a non-empty string")

    # Default port
    port = 161

    # If entered as 127.0.0.1:1234, extract the ip and the port
    split_host = hostname.split(':')
    if len(split_host) > 1:
        hostname, port = split_host[0], split_host[1]

    if not hostname:
        raise ValueError("'hostname' has no host part before the ':'")

    try:
        port = int(port)
    except ValueError:
        raise ValueError(
            'Invalid port {!r}: expected an integer'.format(port)
        ) from None

    if not 1 <= port <= 65535:
        raise ValueError('Invalid port {}: must be between 1 and 65535'.format(port))

    return {
        'host': hostname,
        'port': port,
    }
def _validate_authentication(config):
    """Build the authentication dict from config.

    'snmp_version' is converted with int(), so a missing or non-numeric
    value makes that conversion raise. 'snmp_user' and the auth/priv
    protocol and key entries are copied through, with None for any that
    are absent.

    Returns:
        dict with 'version', 'user', 'auth' and 'priv' entries, where
        'auth' and 'priv' are each a dict with 'protocol' and 'key'.
    """
    lookup = config.get
    return {
        'version': int(lookup('snmp_version')),
        'user': lookup('snmp_user'),
        'auth': {
            'protocol': lookup('auth_protocol'),
            'key': lookup('auth_key'),
        },
        'priv': {
            'protocol': lookup('priv_protocol'),
            'key': lookup('priv_key'),
        },
    }
if __name__ == '__main__':
    # When run as a script: log at DEBUG level to test.log, then run the
    # query flow.
    logging.basicConfig(level=logging.DEBUG, filename='test.log')
    test_query()