-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
114 lines (85 loc) · 2.89 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
const { spawnSync } = require(`child_process`)
const hyperdrive = require(`hyperdrive`)
const hyperswarm = require(`hyperswarm`)
const cronitor = require(`cronitor-caller`)(process.env.CRONITOR_KEY)
const moment = require(`moment`)
const fetch = require(`node-fetch`)
const debug = require(`debug`)(`qnzl:aw:sync-client`)
// Delay, in milliseconds, before heartbeat schedules its next run.
// NOTE(review): the name says ten minutes but 5 * 60 * 1000 ms is five minutes —
// confirm which one is intended before changing either the name or the value.
const TEN_MINUTES = 5 * 60 * 1000
// Monitor identifier handed to cronitor.run / cronitor.fail; taken from the environment.
const CRONITOR_AW_ID = process.env.CRONITOR_AW_ID
// Cut-off used by heartbeat when filtering events; while it is null, filterData
// returns every event unfiltered. heartbeat replaces it with a moment instance.
let lastUploadMarker = null
/**
 * Split a list into consecutive chunks so it can be uploaded piece by piece.
 *
 * @param {Array} data - Items to partition; their order is preserved.
 * @param {number} [sliceSize=300] - Maximum number of items per chunk.
 * @returns {Array<Array>} Chunks in original order; only the last one may be
 *   shorter than `sliceSize`. An empty input yields an empty list.
 * @throws {RangeError} If `sliceSize` is not a positive integer.
 */
const sliceData = (data, sliceSize = 300) => {
  // A zero or negative step would never advance the loop below
  if (!Number.isInteger(sliceSize) || sliceSize <= 0) {
    throw new RangeError(`sliceSize must be a positive integer, got ${sliceSize}`)
  }

  debug(`slice data of len ${data.length}`)

  const dataSlices = []
  for (let sliceBegin = 0; sliceBegin < data.length; sliceBegin += sliceSize) {
    // Clamp so the logged end index matches what slice() actually returns
    const sliceEnd = Math.min(sliceBegin + sliceSize, data.length)
    debug(`slice begin: ${sliceBegin}\nSlice end: ${sliceEnd}`)
    dataSlices.push(data.slice(sliceBegin, sliceEnd))
  }

  return dataSlices
}
/**
 * Keep only the events whose timestamp is strictly after the given marker.
 *
 * @param {Array<{timestamp: *}>} data - Events; each `timestamp` is handed to moment().
 * @param {*} marker - Cut-off handed to moment(); any falsy value disables filtering.
 * @returns {Array} `data` itself when there is no marker, otherwise a new
 *   array with the events newer than the marker, in their original order.
 */
const filterData = (data, marker) => {
  if (!marker) return data

  // The marker is the same for every event, so parse it once up front
  const markerMoment = moment(marker)

  return data.filter(({ timestamp }) => markerMoment.isBefore(moment(timestamp)))
}
/**
 * Run one sync pass: list the buckets on the local server at localhost:5600,
 * keep each bucket's events that are newer than `lastUploadMarker`, and POST
 * them in slices to `${AW_SYNC_SERVER}/api/v1/add/<bucket id>`.
 *
 * `lastUploadMarker` only advances once every upload of the pass has
 * succeeded, so a failed pass is retried in full by the next one.
 *
 * When the process was started with `once` as its first argument it exits
 * after the pass (code 0 on success, 1 on failure). Otherwise the next pass is
 * scheduled TEN_MINUTES milliseconds later, whether or not this one succeeded.
 *
 * @returns {Promise<void>} Settles when the pass is over; failures are logged
 *   rather than thrown.
 */
const heartbeat = async () => {
  debug(`activity watch sync heartbeat`)

  const runOnce = process.argv[2] === `once`
  let succeeded = false

  try {
    // Taken before any events are read: whatever is recorded while uploads are
    // in flight is then picked up by the next pass. An event landing in that
    // window may be sent twice, but none are skipped.
    const passStartedAt = moment()

    const bucketsResponse = await fetch(`http://localhost:5600/api/0/buckets`)
    const localBuckets = await bucketsResponse.json()

    cronitor.run(CRONITOR_AW_ID)

    debug(`got buckets`)

    await Promise.all(Object.values(localBuckets).map(async ({ id }) => {
      debug(`get events for bucket ${id}`)

      const eventsResponse = await fetch(`http://localhost:5600/api/0/buckets/${id}/events`)
      const events = filterData(await eventsResponse.json(), lastUploadMarker)

      debug(`got ${events.length} events for ${id}`)

      const dataSlices = sliceData(events)

      debug(`created ${dataSlices.length} slices to upload`)

      // map + Promise.all rather than forEach(async ...): forEach throws away
      // the promises its callback returns, so nothing would wait for the uploads
      await Promise.all(dataSlices.map(async (slice, index) => {
        debug(`upload slice #${index} for bucket ${id}`)

        const response = await fetch(`${process.env.AW_SYNC_SERVER}/api/v1/add/${id}`, {
          method: `POST`,
          headers: {
            Authorization: `Bearer ${process.env.AW_AUTHORIZATION_TOKEN}`,
            'Content-Type': 'application/json'
          },
          body: JSON.stringify({
            data: slice,
            timestamp: new Date()
          })
        })

        // fetch resolves on 4xx/5xx responses; treat them as failures so the
        // marker is not advanced past events the server rejected
        if (!response.ok) {
          throw new Error(`upload of slice #${index} for bucket ${id} failed with HTTP ${response.status}`)
        }
      }))
    }))

    lastUploadMarker = passStartedAt
    succeeded = true
  } catch (err) {
    // Keep the process alive: one failed pass must not stop future passes
    console.error(`activity watch sync failed:`, err)
  }

  if (runOnce) {
    // Only exit after the uploads have settled, otherwise they are cut off
    process.exit(succeeded ? 0 : 1)
  } else {
    debug(`create next heartbeat timer`)

    setTimeout(() => {
      heartbeat()
    }, TEN_MINUTES)
  }
}
// On SIGINT in long-running mode: report the monitor as failed, run one final
// blocking sync in a child process, then exit with code 10 so the `exit`
// handler does not run the final sync a second time.
process.on(`SIGINT`, () => {
  // The `once` child itself must not spawn further children
  if (process.argv[2] === `once`) return

  // NOTE(review): spawnSync blocks the event loop and process.exit follows
  // immediately, so if cronitor.fail sends its request asynchronously it may
  // never go out — confirm against the cronitor-caller implementation.
  cronitor.fail(CRONITOR_AW_ID)

  // spawnSync takes the executable and its arguments separately. Passing
  // `node <file>` as one string makes it look for an executable with that
  // whole name, which fails with ENOENT and the final sync never runs.
  spawnSync(process.execPath, [ __filename, `once` ], { encoding: `utf8` })

  process.exit(10)
})
// On any other exit of the long-running process, run one final blocking sync
// in a child process before the process goes away.
process.on(`exit`, (exitCode) => {
  debug(`closing local aw sync`)

  // The `once` child itself must not spawn further children
  if (process.argv[2] === `once`) return

  // Exit code 10 is used by the SIGINT handler, which has already run the final sync
  if (exitCode === 10) return

  // spawnSync takes the executable and its arguments separately. Passing
  // `node <file>` as one string makes it look for an executable with that
  // whole name, which fails with ENOENT and the final sync never runs.
  spawnSync(process.execPath, [ __filename, `once` ], { encoding: `utf8` })
})
// Start the first sync pass as soon as the script loads; unless started with
// `once`, heartbeat schedules its own follow-up runs.
heartbeat()