-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathegnode.py
465 lines (355 loc) · 17.1 KB
/
egnode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# Authors: Shreyas M, Ritesh G, Tanvi P
import copy
import datetime
import math
import threading
import xmlrpc.client
from random import sample
import utilities.Constants as Constants
from handlers.SynGossipDigest import *
from handlers.SynVerbHandler import *
from handlers.AckVerbHandler import *
from handlers.Ack2VerbHandler import *
from utilities.utils import *
# XML-RPC proxy for the monitor service; nodes push their fault vectors to it
# through updateSuspectMatrix().
monitor_client = xmlrpc.client.ServerProxy(
    ''.join(('http://', Constants.MONITOR_ADDRESS, '/RPC2')),
    allow_none=True,
)
# XML-RPC proxy for the provider service; getMapping() is requested from it by
# the binary and sequence-check round-robin protocols.
provider_node = xmlrpc.client.ServerProxy(
    ''.join(('http://', Constants.PROVIDER_ADDRESS, '/RPC2')),
    allow_none=True,
)
class Node(object):
def __init__(self, host, port, id):
    """
    Author: Ritesh G
    Initialization of Data Structures maintained at every node.

    :param host: host part of this node's address
    :param port: port part of this node's address
    :param id: accepted for interface compatibility; not used by this constructor
    """
    # "host:port" is the key under which this node appears in every map below.
    self.ip = str(host) + ':' + str(port)
    self.heart_beat_state = {"heartBeatValue": Constants.INITIAL_HEARTBEAT, "generation": getCurrentGeneration()}
    self.app_state = {"IP_Port": str(host)+':'+str(port) , "App_version": Constants.APP_VERSION_DEFAULT, "App_status": Constants.STATUS_NORMAL}
    self.endpoint_state_map = {self.app_state["IP_Port"]: {'heartBeat': self.heart_beat_state, 'appState':self.app_state, 'last_updated_time': getTimeStamp()}}
    # Digest entry layout: [App_version, generation, heartBeatValue].
    self.gDigestList = {self.app_state['IP_Port'] : [self.app_state['App_version'], self.heart_beat_state['generation'], self.heart_beat_state['heartBeatValue']]}
    self.fault_vector = {self.ip:0}
    # A one-element list holding this node's address. list(self.ip) would
    # explode the string into single characters instead.
    self.live_nodes = [self.ip]
    self.dead_nodes = list()
    self.handshake_nodes = list()
    self.message_count = 0
    self.rr_index = 0
    self.rr_list = []
    self.gossip_version = 0
    self.rr_round = 0
    self.gossip_protocol= ""
    self.sc_index = ""  # used in sc round robin for current ip position
    self.lastReceived = {'ip': "", 'timestamp': ""}
def updateHearbeat(self):
    """
    Increment this node's heartbeat counter by one and mirror the new value
    into its own endpoint-state entry and its own gossip digest entry.
    """
    own_address = self.ip
    beat = self.heart_beat_state
    beat["heartBeatValue"] = beat["heartBeatValue"] + 1
    self.endpoint_state_map[own_address]['heartBeat'] = beat
    # Index 2 of a digest entry carries the heartbeat value.
    self.gDigestList[own_address][2] = beat['heartBeatValue']
def sendSYN(self, sendTo):
    """
    Author: Shreyas M
    First step of the handshake: wrap this node's gossip digest list in a
    SynGossipDigest and deliver it to the peer through its acceptSyn RPC.

    :param sendTo: "host:port" address of the node to start the handshake with.
    :return: None. Any failure while contacting the peer is silently ignored.
    """
    self.message_count = self.message_count + 1
    syn_payload = SynGossipDigest(Constants.DEFAULT_CLUSTER, self.gDigestList)
    try:
        peer = xmlrpc.client.ServerProxy('http://' + sendTo + '/RPC2')
        peer.acceptSyn(syn_payload, self.ip)
        print("SYN sent")
    except Exception:
        pass
def acceptSyn(self,synDigest, clientIp):
    """
    Author: Shreyas M
    Second step of the handshake: run SynVerbHandler.handleSync on the received
    SYN digest and send both results back to the sender through its acceptAck RPC.

    :param synDigest: SYN digest received from the sender
    :param clientIp: "host:port" address of the sender
    :return: None. Any failure while contacting the sender is silently ignored.
    """
    self.message_count = self.message_count + 1
    syn_handler = SynVerbHandler(self)
    delta_digest, delta_states = syn_handler.handleSync(synDigest)
    print('\nSyn handler completed')
    try:
        sender = xmlrpc.client.ServerProxy('http://' + clientIp + '/RPC2')
        sender.acceptAck(delta_digest, delta_states, self.app_state["IP_Port"])
        print('\nACK sent')
    except Exception:
        pass
def updateTimestamp(self, ip):
    """
    Record that `ip` was just heard from: refresh its last_updated_time (when an
    endpoint-state entry exists), clear its flag in the fault vector and push
    the fault vector to the monitor.

    :param ip: "host:port" address of the node to refresh
    :return: None. Errors from either step are silently ignored.
    """
    try:
        now = getTimeStamp()
        self.endpoint_state_map[ip]['last_updated_time'] = now
    except Exception:
        # e.g. no endpoint-state entry for this address yet
        pass
    self.fault_vector[ip] = 0
    try:
        generation = self.getGeneration(ip)
        monitor_client.updateSuspectMatrix(self.ip, self.fault_vector, generation)
    except Exception:
        pass
def updateAliveStatus(self, ip, clientIp):
    """
    Refresh the liveness timestamp of `ip` via updateTimestamp().

    Outside the SCRR protocol the refresh is unconditional. Under SCRR a node
    whose fault-vector flag is 1 is refreshed only when it is itself the
    sender (`ip == clientIp`); unflagged or unknown nodes are always refreshed.

    :param ip: address whose status should be refreshed
    :param clientIp: address of the node that sent the message being processed
    """
    if self.gossip_protocol != Constants.SCRR_GOSSIP:
        self.updateTimestamp(ip)
        return
    flagged = ip in self.fault_vector and self.fault_vector[ip] == 1
    if not flagged or ip == clientIp:
        self.updateTimestamp(ip)
def acceptAck(self, deltaGDigest, deltaEpStateMap, clientIp):
    """
    Author: Shreyas M
    Third step of the handshake: handle the ACK from `clientIp` and reply with ACK2.

    :param deltaGDigest: digest of the IPs whose endpoint state the sender asks for
    :param deltaEpStateMap: endpoint states supplied by the sender
    :param clientIp: "host:port" address of the sender
    :return: None. Failures talking to the monitor or the sender are silently ignored.
    """
    print('\nIn Accept ACK')
    self.message_count = self.message_count + 1
    handler = AckVerbHandler(self)
    # States the sender requested; they are shipped back in the ACK2 below.
    requested_states = handler.setEpStateMap(deltaGDigest)
    # Merge the states the sender supplied into this node's own view.
    handler.updateEpStateMap(deltaEpStateMap, clientIp)

    # The sender is evidently reachable: clear its flag and tell the monitor.
    self.fault_vector[clientIp] = 0
    try:
        monitor_client.updateSuspectMatrix(self.ip, self.fault_vector, self.getGeneration(clientIp))
        self.endpoint_state_map[clientIp]['last_updated_time'] = getTimeStamp()
    except Exception:
        pass

    if clientIp in self.endpoint_state_map:
        self.updateAliveStatus(clientIp, clientIp)
    print("\nACK handled... sending ACK2")

    if clientIp not in self.handshake_nodes:
        self.handshake_nodes.append(clientIp)
    if clientIp not in self.live_nodes:
        self.live_nodes = list(self.endpoint_state_map)

    try:
        sender = xmlrpc.client.ServerProxy('http://' + clientIp + '/RPC2')
        sender.acceptAck2(requested_states, self.app_state["IP_Port"])
        print("\nACK2 sent")
    except Exception:
        pass
def acceptAck2(self, deltaEpStateMap, clientIp):
    """
    Author: Shreyas M
    Final step of the handshake: merge the endpoint states sent by `clientIp`
    and mark the sender as handshaken and alive.

    :param deltaEpStateMap: endpoint states supplied by the sender
    :param clientIp: "host:port" address of the sender
    :return: None. Failures talking to the monitor are silently ignored.
    """
    print('\n in Accept Ack 2')
    self.message_count = self.message_count + 1
    Ack2VerbHandler(self).updateEpStateMap(deltaEpStateMap, clientIp)

    if clientIp not in self.handshake_nodes:
        self.handshake_nodes.append(clientIp)
    if clientIp not in self.live_nodes:
        self.live_nodes = list(self.endpoint_state_map)

    # The sender is evidently reachable: clear its flag and tell the monitor.
    self.fault_vector[clientIp] = 0
    try:
        monitor_client.updateSuspectMatrix(self.ip, self.fault_vector, self.getGeneration(clientIp))
        self.endpoint_state_map[clientIp]['last_updated_time'] = getTimeStamp()
    except Exception:
        pass

    self.updateAliveStatus(clientIp, clientIp)
    print('\n ACK2 processed... complete handshake')
def getGeneration(self, clientIp):
    """
    Return the generation recorded for `clientIp` in the endpoint state map,
    or the value of getCurrentGeneration() when the address is unknown.

    :param clientIp: "host:port" address to look up
    """
    if clientIp not in self.endpoint_state_map:
        return getCurrentGeneration()
    heartbeat = self.endpoint_state_map[clientIp]["heartBeat"]
    return heartbeat["generation"]
def isInHandshake(self, ip):
    """Return True when `ip` is listed in handshake_nodes, False otherwise."""
    return ip in self.handshake_nodes
def isInLivenodes(self, ip):
    """Return True when `ip` is listed in live_nodes, False otherwise."""
    return ip in self.live_nodes
def initiateRandomGossip(self):
    """
    Author: Tanvi P
    Pick one known peer at random and gossip with it.

    A peer that has already completed the handshake receives this node's
    digest list through receiveGossip; any other peer is sent a SYN to start
    the handshake. When no peer is known yet the call returns without doing
    anything.

    :params None
    :return: None. Failures while contacting the peer are silently ignored.
    """
    # Every known endpoint except this node. (dict.keys() - self.ip would
    # subtract the individual characters of the address string, not the
    # address itself.)
    peers = [peer for peer in self.gDigestList if peer != self.ip]
    if not peers:
        # Nothing to gossip with; sample() would raise ValueError on an
        # empty population.
        return
    # NOTE(review): the imported names are unused here; the import is kept
    # only in case loading gossip_server has side effects - confirm and drop.
    from gossip_server import scheduleGossip, scheduler
    self.message_count += 1
    ip = sample(peers, 1)[0]
    if self.isInHandshake(ip):
        try:
            client = xmlrpc.client.ServerProxy('http://' + ip + '/RPC2')
            client.receiveGossip(self.gDigestList, self.ip)
        except Exception:
            pass
    else:
        print('--------------------> sending syn'+ip)
        self.sendSYN(ip)
def initiateRRGossip(self):
    """
    Author: Shreyas M
    Implementation of Round Robin Gossip Algorithm.

    At the start of every cycle (rr_index == 0) rr_list is refreshed with a
    copy of the digest list minus this node. Each call gossips with the peer
    at position rr_index and then advances rr_index, wrapping at the end of
    the list. When no peer is known the call returns without gossiping and
    rr_index stays at 0, so the next call refreshes the list again.

    :return: None. Failures while contacting the peer are silently ignored.
    """
    if self.rr_index == 0:
        self.rr_list = copy.deepcopy(self.gDigestList)
        self.rr_list.pop(self.ip, None)
        print('------ Starting Round Robin Rounds -------')
    # Keys in insertion order. (keys() - self.ip would build a set, losing
    # that order, and would subtract characters rather than the address.)
    keyList = list(self.rr_list)
    if not keyList:
        # No peers: indexing and the modulo below would both fail.
        return
    # NOTE(review): the imported names are unused here; the import is kept
    # only in case loading gossip_server has side effects - confirm and drop.
    from gossip_server import scheduleGossip, scheduler
    self.message_count += 1
    ip = keyList[self.rr_index]
    if self.isInHandshake(ip):
        try:
            print("I'm gossiping to--> ", ip)
            client = xmlrpc.client.ServerProxy('http://' + ip + '/RPC2')
            client.receiveGossip(self.gDigestList, self.ip)
        except Exception:
            pass
    else:
        print('------> Initiate Handshake for: '+ip)
        self.sendSYN(ip)
    # Update to rr_index for the key list based on the round robin algorithm.
    self.rr_index = (self.rr_index + 1) % len(keyList)
def initiateBinaryRRGossip(self):
    """
    Author: Ritesh G
    Implementation of Binary Round Robin Gossip Algorithm.

    The node order is fetched from the provider when no order is held yet or
    when rr_round - 1 exceeds log2 of the list length; the order is otherwise
    left untouched. After each gossip the target index advances by
    2 ** rr_round (modulo the list length) and rr_round is incremented.
    A peer that already completed the handshake gets a normal gossip;
    any other peer gets a SYN first.
    """
    needs_mapping = len(self.rr_list) == 0 or self.rr_round - 1 > math.log2(len(self.rr_list))
    if needs_mapping:
        mapping = provider_node.getMapping()
        self.rr_list = mapping
        # Start with the slot right after this node's own position.
        self.rr_index = mapping.index(self.ip) + 1
        self.rr_round = 0
        print('------ Starting Binary Round Robin Rounds -------')
    self.message_count = self.message_count + 1  # used in monitoring node
    target = self.rr_list[self.rr_index % len(self.rr_list)]
    if not self.isInHandshake(target):
        print('------> Initiate Handshake for: '+target)
        self.sendSYN(target)
    else:
        try:
            print("I'm gossiping to--> ", target)
            peer = xmlrpc.client.ServerProxy('http://' + target + '/RPC2')
            peer.receiveGossip(self.gDigestList, self.ip)
        except Exception as err:
            print(err)
    stride = 2 ** self.rr_round
    self.rr_index = (self.rr_index + stride) % len(self.rr_list)
    self.rr_round = self.rr_round + 1
def initiateSCRRGossip(self):
    """
    Author: Shreyas M
    Sequence-check round robin gossip.

    The node order is fetched from the provider when no order is held yet or
    when rr_index has come back to this node's own position (sc_index).
    Each call gossips with the node at rr_index and then advances rr_index by
    one, wrapping at the end of the list. rr_round is set to the distance of
    rr_index from sc_index along the list.
    On the receive side a gossip is expected from a specific node; if it is
    missing, that node is marked as a potential failure and the monitor is told.
    """
    needs_mapping = len(self.rr_list) == 0 or self.rr_index == self.sc_index
    if needs_mapping:
        mapping = provider_node.getMapping()
        self.rr_list = mapping
        own_slot = mapping.index(self.ip)
        self.sc_index = own_slot
        self.rr_index = (own_slot + 1) % len(mapping)
    from gossip_server import scheduleGossip, scheduler
    # Update to rr_round for the rr list from provider based on the round robin algorithm.
    ring_size = len(self.rr_list)
    self.rr_round = (self.rr_index - self.sc_index + ring_size) % ring_size
    self.message_count = self.message_count + 1
    target = self.rr_list[self.rr_index]
    if not self.isInHandshake(target):
        print('------> Initiate Handshake for: '+target)
        self.sendSYN(target)
    else:
        try:
            print("I'm gossiping to--> ", target)
            peer = xmlrpc.client.ServerProxy('http://' + target + '/RPC2')
            peer.receiveGossip(self.gDigestList, self.ip)
        except Exception:
            pass
    self.rr_index = (self.rr_index + 1) % len(self.rr_list)
def startGossip(self, gossip_protocol):
    """
    Author: Tanvi P
    Run one gossip round of the protocol selected on the command line, so a
    single application can serve several protocols.

    :param gossip_protocol: one of the Constants.*_GOSSIP values; any other
        value results in no action
    :return: returns nothing
    """
    if gossip_protocol == Constants.RANDOM_GOSSIP:
        self.initiateRandomGossip()
        return
    if gossip_protocol == Constants.RR_GOSSIP:
        self.initiateRRGossip()
        return
    if gossip_protocol == Constants.BRR_GOSSIP:
        self.initiateBinaryRRGossip()
        return
    if gossip_protocol == Constants.SCRR_GOSSIP:
        self.initiateSCRRGossip()
# NOTE(review): leading indentation is missing in this copy of the file; the
# nesting of the statements below must be restored before this method can run.
def receiveGossip(self, digestList, clientIp):
"""
Authors: Ritesh G, Shreyas M
:param digestList: gossip digest list received from clientIP
:param clientIp: IP of sender node
RANDOM: Add the unknown IPs to its own digest list. Update the EndPoint State Map
for the IPs which are already in its EndPoint State Map. Update the last_updated_time
for the clientIp.
ROUND_ROBIN: Perform the same operation as that of RANDOM. In addition to this update the
last_updated_time for the IPs received in digestList which are already in its
EndPoint State Map for that round.
BINARY_ROUND_ROBIN: Same as ROUND_ROBIN.
SEQUENCE_CHECK: Same as ROUND_ROBIN. In addition to this, it checks if the it missed any gossip from
a IP in previous round. In this case, it is detected as FAIL.
"""
print('++++> Gossip received from--> ' + clientIp)
if self.gossip_protocol == Constants.SCRR_GOSSIP:
# checks for missed gossip in previous round.
# In this case, it is detected as FAIL
# The node one slot after the sender in rr_list is the one whose gossip
# should have been the previous message received.
indexOfClient = self.rr_list.index(clientIp)
lastReceivedIp = self.rr_list[(indexOfClient + 1)%len(self.rr_list)]
# This node never marks itself as failed.
if lastReceivedIp != self.ip:
# Fail when the previous sender was a different node, or when the
# previous message is older than Constants.WAIT_SECONDS_FAIL.
if(lastReceivedIp != self.lastReceived['ip']) or \
(getDiffInSeconds(self.lastReceived['timestamp']) > Constants.WAIT_SECONDS_FAIL):
print('Gossip message expected from: '+lastReceivedIp + ' in last round. Hence marking as FAIL')
self.fault_vector[lastReceivedIp] = 1
# NOTE(review): unlike the other updateSuspectMatrix call sites in this file,
# this RPC is not wrapped in try/except - confirm a monitor outage should
# abort gossip handling here.
monitor_client.updateSuspectMatrix(self.ip, self.fault_vector, self.getGeneration(lastReceivedIp))
# Remember the current sender for the check on the next received gossip.
self.lastReceived['ip'] = clientIp
self.lastReceived['timestamp'] = getTimeStamp()
# Work on a copy of the digest list; it replaces self.gDigestList at the end.
currenttList = copy.deepcopy(self.gDigestList)
# Addresses whose endpoint state was refreshed from this gossip (only printed).
updatedList = []
# Digest entry layout: [App_version, generation, heartBeatValue].
for ip, digest in digestList.items():
# Skip the sender's entry about this node itself.
if ip == self.ip:
continue
if ip in self.gDigestList:
# Sender reports a higher generation: take its whole digest entry.
if self.gDigestList[ip][1] < digest[1]:
currenttList[ip] = digest
try:
self.endpoint_state_map[ip]['appState']['App_version'] = digest[0]
self.endpoint_state_map[ip]['heartBeat']['generation'] = digest[1]
self.endpoint_state_map[ip]['heartBeat']['heartBeatValue'] = digest[2]
updatedList.append(ip)
except Exception as e:
print('passed the exception')
pass
# Same generation but a higher heartbeat: take only the heartbeat value.
elif self.gDigestList[ip][2] < digest[2] and self.gDigestList[ip][1] == digest[1]:
currenttList[ip][2] = digest[2]
try:
self.endpoint_state_map[ip]['heartBeat']['heartBeatValue'] = digest[2]
updatedList.append(ip)
except Exception as e:
print('passed the exception')
pass
# NOTE(review): gossip_version is set to 0 in __init__ and not assigned anywhere
# else in this file, while protocol selection elsewhere compares gossip_protocol
# with Constants.*_GOSSIP - confirm this is the intended attribute and constant.
if self.gossip_version == Constants.ROUND_ROBIN:
self.updateAliveStatus(ip, clientIp)
else:
# Address not in the local digest list yet: adopt the sender's entry as-is.
currenttList[ip] = digest
print('Status updated by: '+self.ip)
print('for: ', updatedList)
print('as directed by: ', clientIp)
self.gDigestList = copy.deepcopy(currenttList)
# updating timestamp for clientIp
if clientIp in self.endpoint_state_map:
self.updateAliveStatus(clientIp, clientIp)