-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontrol_node.js
executable file
·187 lines (172 loc) · 6.23 KB
/
control_node.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/env -S deno run --allow-net --allow-run=namadan-1.0.0,pkill,pgrep --allow-env=HOST,PORT,NAMADA,CHAIN_ID,NODE_OUT --allow-read=/home/namada/.local/share/namada --allow-write=/home/namada/.local/share/namada
// This service runs the node. In order for the indexer to have time to fetch all data
// before epoched data is pruned, this service parses the log output of the node, and
// when the epoch has incremented it tells the outgoing proxy to cut off outgoing
// connections from the node. Once the indexer is done with the current epoch, it tells
// the outgoing service to resume.
import { initialize, environment, api } from './lib.js'
import { TextLineStream } from './deps.js'
import { Service } from './services.js'
// Only run when this module is the program entry point (not when imported).
// main() is scheduled instead of being called directly, which lets the class
// declarations further down in this module (which main uses) finish evaluating
// before main runs.
if (import.meta.main) {
  setTimeout(main, 0)
}
/**
 * Entry point of the node controller.
 *
 * Reads configuration from the environment, creates the NamadaService,
 * wires up the 'synced' and 'request-pause' event handlers, exposes the
 * HTTP+WS API and finally starts the node.
 */
function main () {
  // Initialize and configure
  initialize()
  const { HOST, PORT, NAMADA, CHAIN_ID, NODE_OUT } = environment({
    HOST: "0.0.0.0",
    PORT: "25551",
    NAMADA: "0=namadan-1.0.0",
    CHAIN_ID: "namada-dryrun.abaaeaf7b78cb3ac",
    NODE_OUT: "http://sync-proxy:25552"
  })
  // Namada node service manager
  const service = new NamadaService(NAMADA, CHAIN_ID)
  // When the node log contains block height and epoch, do the magic
  service.events.addEventListener('synced', async ({ detail }) => {
    // Block and epoch arrive as decimal strings captured from the node log
    const block = BigInt(detail.block)
    const epoch = BigInt(detail.epoch)
    // Switch to next version of Namada node if hardfork has occurred
    const namada = selectNamadaVersion(service.namadas, block)
    // If the next version is different to the current one, launch it
    if (namada !== service.namada) {
      await service.pause()
      service.namada = namada
      await service.start()
    }
    // Pause if epoch has incremented
    if (epoch > service.epoch) {
      service.epoch = epoch
      console.log('🟠 Epoch has increased to', epoch)
      service.events.dispatchEvent(new RequestPauseEvent())
    }
  })
  // When pause is requested, tell the sync-proxy to disconnect.
  // The undexer will tell it to reenable connections when ready to continue.
  service.events.addEventListener('request-pause', async () => {
    let canConnect = true
    // Keep asking until the proxy reports that connections are cut off
    while (canConnect) {
      console.log('🟠 Requesting pause until indexer catches up.')
      const response = await fetch(`${NODE_OUT}/pause`)
      const responseJson = await response.json()
      console.log('🟠 Pause response:', responseJson)
      canConnect = responseJson.canConnect
      await new Promise(resolve=>setTimeout(resolve, 100))
    }
  })
  // Listener registered per connection, remembered so that onClose can remove
  // the very same function object (removeEventListener matches by identity).
  // NOTE(review): assumes api() passes the same `send` function to onOpen and
  // onClose of a given connection - confirm against lib.js.
  const syncListeners = new Map()
  // Run HTTP+WS API server
  api('Node', HOST, PORT, service.routes(), {
    // Notify undexer of sync progress
    onOpen: ({ send }) => {
      const listener = event => send(event)
      syncListeners.set(send, listener)
      service.events.addEventListener('synced', listener)
    },
    // Stop trying to notify undexer of sync progress on disconnect
    onClose: ({ send }) => {
      const listener = syncListeners.get(send)
      if (listener) {
        service.events.removeEventListener('synced', listener)
        syncListeners.delete(send)
      }
    },
    // Respond to resync command from undexer
    onMessage: async ({ event }) => {
      const data = JSON.parse(event.data)
      if (data.restart) {
        console.log('🚨 Restarting sync from beginning...')
        await service.pause()
        await service.deleteData()
        await service.start()
      }
    }
  })
  // And away we go!
  service.start()
}

/**
 * Pick the node binary that should be running at the given block height.
 *
 * The result is the entry with the highest hard-fork height that is strictly
 * below `block`; if there is none, the entry for height 0 is returned.
 * Heights are compared numerically, independent of key enumeration order.
 *
 * @param {Object<string, string>} namadas - Map of hard-fork height to binary name.
 * @param {bigint} block - Block height reported by the node.
 * @returns {string} Name of the binary to run.
 */
function selectNamadaVersion (namadas, block) {
  let selected = namadas[0n]
  let selectedHeight = 0n
  for (const [key, bin] of Object.entries(namadas)) {
    const height = BigInt(key)
    if (height < block && height > selectedHeight) {
      selected = bin
      selectedHeight = height
    }
  }
  return selected
}
/**
 * Service wrapper around the Namada node binary.
 *
 * Keeps a table of node versions keyed by the block height from which each
 * one should run, parses the node's output line by line, and emits a
 * 'synced' event on `this.events` for every log line that reports a block
 * height and epoch.
 */
export class NamadaService extends Service {
  /**
   * @param {string} namadas - Comma-separated `height=binary` pairs.
   *   An entry for height 0 is required.
   * @param {string} chainId - Chain ID; used to build paths below
   *   /home/namada/.local/share/namada.
   * @throws {Error} If no binary is specified for height 0.
   */
  constructor (namadas = "0=namadan-0.45.1,182000=namadan-0.46.0", chainId) {
    // Multiple versions of Namada to support hard forks.
    // BigInt keys are stored as their decimal string representation.
    namadas = Object.fromEntries(namadas
      .split(',')
      .map(x=>x.split('='))
      .map(([block, bin])=>[BigInt(block), bin])
    )
    const namada = namadas[0n]
    if (!namada) {
      throw new Error('NAMADA format: 0=...[,HardForkHeight=...]')
    }
    // Start with 1st version of Namada node
    super('Namada', namada, 'ledger', 'run')
    // Which version to run starting from which block
    this.namadas = namadas
    // Currently selected version
    this.namada = namada
    // Used to find config file
    this.chainId = chainId
    // Match block increment in log output
    this.regex = new RegExp('Block height: (\\d+).+epoch: (\\d+)')
    // Brokers events asynchronously
    this.events = new EventTarget()
    // Current epoch (FIXME: need to persist this!)
    this.epoch = 0n
  }

  /** Print config before launching node. */
  async start () {
    await this.printConfig()
    return super.start()
  }

  /** Log every line of the chain's config.toml that mentions `persistent_peers`. */
  async printConfig () {
    const configPath = `/home/namada/.local/share/namada/${this.chainId}/config.toml`
    const config = (await Deno.readTextFile(configPath)).split('\n')
    for (const line of config.filter(line=>line.includes('persistent_peers'))) {
      console.log('ℹ️ Config:', line)
    }
  }

  /**
   * Decode an output stream of the service, split it into lines and pass
   * each line to onChunk. The promise returned by pipeTo is not awaited.
   *
   * @param {ReadableStream} stream - Byte stream to consume.
   * @param {*} _kind - Unused.
   */
  pipe (stream, _kind) {
    stream
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new TextLineStream())
      .pipeTo(new WritableStream({ write: (chunk, _) => this.onChunk(chunk) }))
  }

  /**
   * Handle one line of node output: echo it unless muted, and dispatch a
   * SyncEvent when it reports block height and epoch.
   *
   * @param {string} chunk - A single line of output.
   */
  onChunk (chunk) {
    // `muted` is not set in this class - presumably managed by Service.
    if (!this.muted) {
      console.log(chunk)
    }
    const match = chunk.match(this.regex)
    if (match) {
      // Report block and epoch progress (both are decimal strings here)
      const [block, epoch] = match.slice(1)
      console.log(`🟢 Sync: block ${block} of epoch ${epoch}`)
      this.events.dispatchEvent(new SyncEvent({ block, epoch }))
    }
  }

  /** Delete node state, allowing the sync to start from scratch.
    * This is invoked by the indexer when it finds that it is more
    * than 2 epochs ahead of the sync.
    *
    * Directories that do not exist count as already deleted. Any other
    * failure is logged and only the failed directories are retried, after
    * a delay, until all of them are gone.
    *
    * @param {number} [retryDelayMs=1000] - Pause between retry rounds. */
  async deleteData (retryDelayMs = 1000) {
    const baseDir = `/home/namada/.local/share/namada/${this.chainId}`
    let pending = ['db', 'cometbft', 'tx_wasm_cache', 'vp_wasm_cache']
    while (pending.length > 0) {
      console.log('Deleting node data...')
      // Each removal handles its own error so that the name of the failing
      // directory is available for the warning and for the retry list.
      const outcomes = await Promise.all(pending.map(async path => {
        try {
          await Deno.remove(`${baseDir}/${path}`, { recursive: true })
          return null
        } catch (e) {
          if (e instanceof Deno.errors.NotFound) {
            return null
          }
          console.warn(`Failed to remove ${path} (${e.message})`)
          return path
        }
      }))
      pending = outcomes.filter(path => path !== null)
      if (pending.length > 0) {
        // Avoid spinning in a hot loop while the failure persists
        await new Promise(resolve => setTimeout(resolve, retryDelayMs))
      }
    }
  }
}
class SyncEvent extends CustomEvent {
constructor (detail) {
super('synced', { detail })
}
}
class RequestPauseEvent extends CustomEvent {
constructor () {
super('request-pause')
}
}