-
Notifications
You must be signed in to change notification settings - Fork 225
/
Copy pathocr_rpc_client.go
234 lines (195 loc) · 5.86 KB
/
ocr_rpc_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package ocrworker
import (
"encoding/json"
"fmt"
"time"
"github.com/nu7hatch/gouuid"
"github.com/couchbaselabs/logg"
"github.com/streadway/amqp"
)
const (
RPC_RESPONSE_TIMEOUT = time.Second * 120
)
type OcrRpcClient struct {
rabbitConfig RabbitConfig
connection *amqp.Connection
channel *amqp.Channel
}
type OcrResult struct {
Text string
}
func NewOcrRpcClient(rc RabbitConfig) (*OcrRpcClient, error) {
ocrRpcClient := &OcrRpcClient{
rabbitConfig: rc,
}
return ocrRpcClient, nil
}
func (c *OcrRpcClient) DecodeImage(ocrRequest OcrRequest) (OcrResult, error) {
var err error
correlationUuidRaw, err := uuid.NewV4()
if err != nil {
return OcrResult{}, err
}
correlationUuid := correlationUuidRaw.String()
logg.LogTo("OCR_CLIENT", "dialing %q", c.rabbitConfig.AmqpURI)
c.connection, err = amqp.Dial(c.rabbitConfig.AmqpURI)
if err != nil {
return OcrResult{}, err
}
defer c.connection.Close()
c.channel, err = c.connection.Channel()
if err != nil {
return OcrResult{}, err
}
if err := c.channel.ExchangeDeclare(
c.rabbitConfig.Exchange, // name
c.rabbitConfig.ExchangeType, // type
true, // durable
false, // auto-deleted
false, // internal
false, // noWait
nil, // arguments
); err != nil {
return OcrResult{}, err
}
rpcResponseChan := make(chan OcrResult)
callbackQueue, err := c.subscribeCallbackQueue(correlationUuid, rpcResponseChan)
if err != nil {
return OcrResult{}, err
}
// Reliable publisher confirms require confirm.select support from the
// connection.
if c.rabbitConfig.Reliable {
if err := c.channel.Confirm(false); err != nil {
return OcrResult{}, err
}
ack, nack := c.channel.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
defer confirmDelivery(ack, nack)
}
// TODO: we only need to download image url if there are
// any preprocessors. if rabbitmq isn't in same data center
// as open-ocr, it will be expensive in terms of bandwidth
// to have image binary in messages
if ocrRequest.ImgBytes == nil {
// if we do not have bytes use base 64 file by converting it to bytes
if ocrRequest.hasBase64() {
logg.LogTo("OCR_CLIENT", "OCR request has base 64 convert it to bytes")
err = ocrRequest.decodeBase64()
if err != nil {
logg.LogTo("OCR_CLIENT", "Error decoding base64: %v", err)
return OcrResult{}, err
}
} else {
// if we do not have base 64 or bytes download the file
err = ocrRequest.downloadImgUrl()
if err != nil {
logg.LogTo("OCR_CLIENT", "Error downloading img url: %v", err)
return OcrResult{}, err
}
}
}
logg.LogTo("OCR_CLIENT", "ocrRequest before: %v", ocrRequest)
routingKey := ocrRequest.nextPreprocessor(c.rabbitConfig.RoutingKey)
logg.LogTo("OCR_CLIENT", "publishing with routing key %q", routingKey)
logg.LogTo("OCR_CLIENT", "ocrRequest after: %v", ocrRequest)
ocrRequestJson, err := json.Marshal(ocrRequest)
if err != nil {
return OcrResult{}, err
}
if err = c.channel.Publish(
c.rabbitConfig.Exchange, // publish to an exchange
routingKey,
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "application/json",
ContentEncoding: "",
Body: []byte(ocrRequestJson),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
ReplyTo: callbackQueue.Name,
CorrelationId: correlationUuid,
// a bunch of application/implementation-specific fields
},
); err != nil {
return OcrResult{}, nil
}
select {
case ocrResult := <-rpcResponseChan:
return ocrResult, nil
case <-time.After(RPC_RESPONSE_TIMEOUT):
return OcrResult{}, fmt.Errorf("Timeout waiting for RPC response")
}
}
func (c OcrRpcClient) subscribeCallbackQueue(correlationUuid string, rpcResponseChan chan OcrResult) (amqp.Queue, error) {
// declare a callback queue where we will receive rpc responses
callbackQueue, err := c.channel.QueueDeclare(
"", // name -- let rabbit generate a random one
false, // durable
true, // delete when usused
true, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
return amqp.Queue{}, err
}
// bind the callback queue to an exchange + routing key
if err = c.channel.QueueBind(
callbackQueue.Name, // name of the queue
callbackQueue.Name, // bindingKey
c.rabbitConfig.Exchange, // sourceExchange
false, // noWait
nil, // arguments
); err != nil {
return amqp.Queue{}, err
}
logg.LogTo("OCR_CLIENT", "callbackQueue name: %v", callbackQueue.Name)
deliveries, err := c.channel.Consume(
callbackQueue.Name, // name
tag, // consumerTag,
true, // noAck
true, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
if err != nil {
return amqp.Queue{}, err
}
go c.handleRpcResponse(deliveries, correlationUuid, rpcResponseChan)
return callbackQueue, nil
}
func (c OcrRpcClient) handleRpcResponse(deliveries <-chan amqp.Delivery, correlationUuid string, rpcResponseChan chan OcrResult) {
logg.LogTo("OCR_CLIENT", "looping over deliveries..")
for d := range deliveries {
if d.CorrelationId == correlationUuid {
logg.LogTo(
"OCR_CLIENT",
"got %dB delivery: [%v] %q. Reply to: %v",
len(d.Body),
d.DeliveryTag,
d.Body,
d.ReplyTo,
)
ocrResult := OcrResult{
Text: string(d.Body),
}
logg.LogTo("OCR_CLIENT", "send result to rpcResponseChan")
rpcResponseChan <- ocrResult
logg.LogTo("OCR_CLIENT", "sent result to rpcResponseChan")
return
} else {
logg.LogTo("OCR_CLIENT", "ignoring delivery w/ correlation id: %v", d.CorrelationId)
}
}
}
func confirmDelivery(ack, nack chan uint64) {
select {
case tag := <-ack:
logg.LogTo("OCR_CLIENT", "confirmed delivery, tag: %v", tag)
case tag := <-nack:
logg.LogTo("OCR_CLIENT", "failed to confirm delivery: %v", tag)
}
}