-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathriemann_encoder.go
168 lines (146 loc) · 4.53 KB
/
riemann_encoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012-2014
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Rob Miller ([email protected])
# Mike Trinkala ([email protected])
#
# ***** END LICENSE BLOCK *****/
package heka_rieman_encoder
import (
"encoding/binary"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/gogo/protobuf/proto"
"github.com/mozilla-services/heka/message"
"github.com/mozilla-services/heka/pipeline"
"github.com/cspenceiv/heka-riemann-encoder/riemenc"
rproto "github.com/cspenceiv/heka-riemann-encoder/riemenc/proto"
)
// RiemannEncoder is a Heka encoder plugin that serializes a message as a
// Riemann protobuf Msg holding a single event, prefixed with its length (see
// Encode). It also keeps the counters that ReportMsg publishes.
type RiemannEncoder struct {
// NOTE: the first two counters are accessed with sync/atomic. Keep them at
// the start of the struct so they remain 64-bit aligned on 32-bit platforms.

// Number of Encode calls; incremented atomically by Encode.
processMessageCount int64
// Published by ReportMsg as "ProcessMessageFailures"; read atomically.
processMessageFailures int64
// Number of Encode calls that were timed; guarded by reportLock.
processMessageSamples int64
// Total duration, in nanoseconds, of the timed Encode calls; guarded by
// reportLock.
processMessageDuration int64
// Pipeline configuration handed over via SetPipelineConfig; read by Init.
pConfig *pipeline.PipelineConfig
// Guards processMessageSamples and processMessageDuration.
reportLock sync.Mutex
// Whether the next Encode call is timed. Set to true by Init and re-rolled
// at the end of each successful Encode.
sample bool
// Copied from pConfig.Globals.SampleDenominator by Init. The next Encode
// call is timed when rand.Intn(sampleDenominator) yields 0.
sampleDenominator int
}
// SetPipelineConfig stores the pipeline configuration on the encoder so that
// Init can read the global settings from it. Heka will call this before
// calling any other methods.
func (re *RiemannEncoder) SetPipelineConfig(pConfig *pipeline.PipelineConfig) {
re.pConfig = pConfig
}
// Init prepares the encoder for use. The config argument is ignored: the
// only setting used is the sample denominator taken from the pipeline
// globals, so SetPipelineConfig must have been called first. It always
// returns nil.
func (re *RiemannEncoder) Init(config interface{}) error {
	// Always time the very first message; later ones are sampled at random.
	re.sample = true

	globals := re.pConfig.Globals
	re.sampleDenominator = globals.SampleDenominator
	return nil
}
// Encode converts the Heka message carried by pack into a Riemann protobuf
// Msg containing a single event, and returns the marshalled bytes prefixed
// with their length as a 4-byte big-endian integer.
//
// Field mapping:
//
//	Timestamp / 1e9        -> Time
//	Severity (0-7)         -> State (syslog level name; left empty otherwise)
//	Logger                 -> Service
//	Hostname               -> Host
//	Payload                -> Description
//	first "Metric" field   -> Metric
//
// Tags, Ttl and Attributes are not populated yet.
//
// Every call increments the processed-message counter. If the event cannot
// be converted or marshalled, the failure counter is incremented and the
// error is returned together with a nil slice. A subset of calls is timed so
// that ReportMsg can publish an average duration.
func (re *RiemannEncoder) Encode(pack *pipeline.PipelinePack) ([]byte, error) {
	// Size of the length header that precedes the protobuf payload.
	const lengthPrefixSize = 4

	atomic.AddInt64(&re.processMessageCount, 1)

	var startTime time.Time
	if re.sample {
		startTime = time.Now()
	}

	// Gather the pack data into a Riemann event. The generated getters are
	// used instead of dereferencing the optional fields directly, so an
	// unset field yields its zero value rather than a nil-pointer panic.
	hekaMsg := pack.Message
	event := &riemenc.Event{}
	event.Time = hekaMsg.GetTimestamp() / 1e9

	// Severity is checked for nil explicitly (instead of via GetSeverity) so
	// that an unset severity leaves State empty rather than picking up the
	// getter's default value.
	if hekaMsg.Severity != nil {
		switch *hekaMsg.Severity {
		case 0:
			event.State = "Emergency"
		case 1:
			event.State = "Alert"
		case 2:
			event.State = "Critical"
		case 3:
			event.State = "Error"
		case 4:
			event.State = "Warning"
		case 5:
			event.State = "Notice"
		case 6:
			event.State = "Informational"
		case 7:
			event.State = "Debug"
		}
	}

	event.Service = hekaMsg.GetLogger() // Consider the use of Logger and Type
	event.Host = hekaMsg.GetHostname()
	event.Description = hekaMsg.GetPayload()
	if metric, ok := hekaMsg.GetFieldValue("Metric"); ok {
		event.Metric = metric
	}

	pbEvent, err := riemenc.EventToPbEvent(event)
	if err != nil {
		atomic.AddInt64(&re.processMessageFailures, 1)
		return nil, err
	}

	riemannMsg := &rproto.Msg{}
	riemannMsg.Events = append(riemannMsg.Events, pbEvent)
	payload, err := proto.Marshal(riemannMsg)
	if err != nil {
		atomic.AddInt64(&re.processMessageFailures, 1)
		return nil, err
	}

	// Prefix the payload with its length for Riemann. The buffer is
	// allocated at its final size so the payload is copied exactly once.
	output := make([]byte, lengthPrefixSize+len(payload))
	binary.BigEndian.PutUint32(output, uint32(len(payload)))
	copy(output[lengthPrefixSize:], payload)

	if re.sample {
		elapsed := time.Since(startTime).Nanoseconds()
		re.reportLock.Lock()
		re.processMessageDuration += elapsed
		re.processMessageSamples++
		re.reportLock.Unlock()
	}

	// Decide whether the next call is timed. rand.Intn panics when its
	// argument is not positive, so a non-positive denominator turns sampling
	// off instead of crashing the encoder.
	re.sample = re.sampleDenominator > 0 && rand.Intn(re.sampleDenominator) == 0
	return output, nil
}
// Stop does nothing; it performs no cleanup and changes no encoder state.
func (re *RiemannEncoder) Stop() {}
// ReportMsg appends this encoder's statistics to msg as int64 fields, in
// this order: ProcessMessageCount, ProcessMessageFailures,
// ProcessMessageSamples (all with representation "count") and
// ProcessMessageAvgDuration (representation "ns"). The average is the total
// sampled duration divided by the number of samples, or 0 when nothing has
// been sampled yet. It always returns nil.
func (re *RiemannEncoder) ReportMsg(msg *message.Message) error {
	// reportLock guards the sample count and duration that Encode updates.
	re.reportLock.Lock()
	defer re.reportLock.Unlock()

	processed := atomic.LoadInt64(&re.processMessageCount)
	failed := atomic.LoadInt64(&re.processMessageFailures)
	samples := re.processMessageSamples

	// Avoid dividing by zero before the first sample has been recorded.
	var avgDuration int64
	if samples > 0 {
		avgDuration = re.processMessageDuration / samples
	}

	message.NewInt64Field(msg, "ProcessMessageCount", processed, "count")
	message.NewInt64Field(msg, "ProcessMessageFailures", failed, "count")
	message.NewInt64Field(msg, "ProcessMessageSamples", samples, "count")
	message.NewInt64Field(msg, "ProcessMessageAvgDuration", avgDuration, "ns")
	return nil
}
func init() {
pipeline.RegisterPlugin("RiemannEncoder", func() interface{} {
return new(RiemannEncoder)
})
}