events.go
// Copyright (c) 2013 Project Iris. All rights reserved.
//
// The current language binding is an official support library of the Iris
// cloud messaging framework, and as such, the same licensing terms apply.
// For details please see http://iris.karalabe.com/downloads#License

// Event handlers for relay side messages.
package iris

import (
	"errors"
	"sync/atomic"
	"time"
)

// Schedules an application broadcast message for the service handler to process.
func (c *Connection) handleBroadcast(message []byte) {
	id := int(atomic.AddUint64(&c.bcastIdx, 1))
	c.Log.Debug("scheduling arrived broadcast", "broadcast", id, "data", logLazyBlob(message))

	// Make sure there is enough memory for the message
	used := int(atomic.LoadInt32(&c.bcastUsed)) // Safe, since only 1 thread increments!
	if used+len(message) <= c.limits.BroadcastMemory {
		// Increment the memory usage of the queue and schedule the broadcast
		atomic.AddInt32(&c.bcastUsed, int32(len(message)))
		c.bcastPool.Schedule(func() {
			// Start the processing by decrementing the memory usage
			atomic.AddInt32(&c.bcastUsed, -int32(len(message)))
			c.Log.Debug("handling scheduled broadcast", "broadcast", id)
			c.handler.HandleBroadcast(message)
		})
		return
	}
	// Not enough memory in the broadcast queue
	c.Log.Error("broadcast exceeded memory allowance", "broadcast", id, "limit", c.limits.BroadcastMemory, "used", used, "size", len(message))
}

// Schedules an application request for the service handler to process.
func (c *Connection) handleRequest(id uint64, request []byte, timeout time.Duration) {
	logger := c.Log.New("remote_request", id)
	logger.Debug("scheduling arrived request", "data", logLazyBlob(request), "timeout", timeout)

	// Make sure there is enough memory for the request
	used := int(atomic.LoadInt32(&c.reqUsed)) // Safe, since only 1 thread increments!
	if used+len(request) <= c.limits.RequestMemory {
		// Increment the memory usage of the queue
		atomic.AddInt32(&c.reqUsed, int32(len(request)))

		// Create the expiration timer and schedule the request
		expiration := time.After(timeout)
		c.reqPool.Schedule(func() {
			// Start the processing by decrementing the memory usage
			atomic.AddInt32(&c.reqUsed, -int32(len(request)))

			// Make sure the request didn't expire while enqueued
			select {
			case expired := <-expiration:
				exp := time.Since(expired)
				logger.Error("dumping expired scheduled request", "scheduled", exp+timeout, "timeout", timeout, "expired", exp)
				return
			default:
				// All ok, continue
			}
			// Handle the request and return a reply
			logger.Debug("handling scheduled request")
			reply, err := c.handler.HandleRequest(request)

			fault := ""
			if err != nil {
				fault = err.Error()
			}
			logger.Debug("replying to handled request", "data", logLazyBlob(reply), "error", err)
			if err := c.sendReply(id, reply, fault); err != nil {
				logger.Error("failed to send reply", "reason", err)
			}
		})
		return
	}
	// Not enough memory in the request queue
	logger.Error("request exceeded memory allowance", "limit", c.limits.RequestMemory, "used", used, "size", len(request))
}
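
// The two schedulers above share a bounded-memory admission pattern: only the
// relay reader goroutine increments the usage counter, so a plain atomic load
// is enough to decide admission, and each scheduled task decrements the counter
// before running. A minimal standalone sketch of the same idea follows; the
// names (used, limit, trySchedule) are hypothetical and not part of this binding:
//
//	var used int32
//	const limit = 64 * 1024 * 1024
//
//	func trySchedule(schedule func(func()), message []byte, run func([]byte)) bool {
//		// Only one goroutine adds to the counter, so a bare load is race-free here
//		if int(atomic.LoadInt32(&used))+len(message) > limit {
//			return false // over the memory allowance, caller logs and drops
//		}
//		atomic.AddInt32(&used, int32(len(message)))
//		schedule(func() {
//			atomic.AddInt32(&used, -int32(len(message)))
//			run(message)
//		})
//		return true
//	}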

// Looks up a pending request and delivers the result.
func (c *Connection) handleReply(id uint64, reply []byte, fault string) {
	c.reqLock.RLock()
	defer c.reqLock.RUnlock()

	if reply == nil && len(fault) == 0 {
		c.reqErrs[id] <- ErrTimeout
	} else if reply == nil {
		c.reqErrs[id] <- &RemoteError{errors.New(fault)}
	} else {
		c.reqReps[id] <- reply
	}
}

// Forwards a topic publish event to the topic subscription.
func (c *Connection) handlePublish(topic string, event []byte) {
	// Fetch the handler and release the lock fast
	c.subLock.RLock()
	top, ok := c.subLive[topic]
	c.subLock.RUnlock()

	// Make sure the subscription is still live
	if ok {
		top.handlePublish(event)
	} else {
		c.Log.Warn("stale publish arrived", "topic", topic)
	}
}

// Notifies the application of the relay link going down.
func (c *Connection) handleClose(reason error) {
	// Notify the client of the drop if premature
	if reason != nil {
		c.Log.Crit("connection dropped", "reason", reason)

		// Only server connections have registered handlers
		if c.handler != nil {
			c.handler.HandleDrop(reason)
		}
	}
	// Close all open tunnels
	c.tunLock.Lock()
	for _, tun := range c.tunLive {
		tun.handleClose("connection dropped")
	}
	c.tunLive = nil
	c.tunLock.Unlock()
}

// Opens a new local tunnel endpoint and binds it to the remote side.
func (c *Connection) handleTunnelInit(id uint64, chunkLimit int) {
	go func() {
		if tun, err := c.acceptTunnel(id, chunkLimit); err == nil {
			c.handler.HandleTunnel(tun)
		}
		// Else: failure already logged by the acceptor
	}()
}

// Forwards the tunnel construction result to the requested tunnel.
func (c *Connection) handleTunnelResult(id uint64, chunkLimit int) {
	// Retrieve the tunnel
	c.tunLock.RLock()
	tun := c.tunLive[id]
	c.tunLock.RUnlock()

	// Finalize initialization
	tun.handleInitResult(chunkLimit)
}

// Forwards a tunnel data allowance to the requested tunnel.
func (c *Connection) handleTunnelAllowance(id uint64, space int) {
	// Retrieve the tunnel
	c.tunLock.RLock()
	tun, ok := c.tunLive[id]
	c.tunLock.RUnlock()

	// Notify it of the granted data allowance
	if ok {
		tun.handleAllowance(space)
	}
}

// Forwards a message chunk transfer to the requested tunnel.
func (c *Connection) handleTunnelTransfer(id uint64, size int, chunk []byte) {
	// Retrieve the tunnel
	c.tunLock.RLock()
	tun, ok := c.tunLive[id]
	c.tunLock.RUnlock()

	// Notify it of the arrived message chunk
	if ok {
		tun.handleTransfer(size, chunk)
	}
}

// Terminates a tunnel, stopping all data transfers.
func (c *Connection) handleTunnelClose(id uint64, reason string) {
	c.tunLock.Lock()
	defer c.tunLock.Unlock()

	// Make sure the tunnel is still alive
	if tun, ok := c.tunLive[id]; ok {
		tun.handleClose(reason)
		delete(c.tunLive, id)
	}
}
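
// Illustrative only: a minimal service handler whose methods match the
// callbacks invoked throughout this file (HandleBroadcast, HandleRequest,
// HandleTunnel, HandleDrop). The concrete handler interface and registration
// API live elsewhere in the binding, so treat this as a hedged sketch rather
// than the canonical definition.
//
//	type echoHandler struct{}
//
//	func (h *echoHandler) HandleBroadcast(message []byte) {
//		// React to a fire-and-forget broadcast delivered by handleBroadcast.
//	}
//
//	func (h *echoHandler) HandleRequest(request []byte) ([]byte, error) {
//		// Return a reply; a non-nil error reaches the caller as a RemoteError.
//		return request, nil
//	}
//
//	func (h *echoHandler) HandleTunnel(tun *Tunnel) {
//		// Take ownership of the inbound tunnel accepted by handleTunnelInit.
//	}
//
//	func (h *echoHandler) HandleDrop(reason error) {
//		// The relay link failed; handleClose reports the reason here.
//	}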