From d3e3d5796541e70e39c68047e5cdcb2f0e63e737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vil=C3=A9m=20Ra=C5=A1ka?= Date: Fri, 24 Jan 2025 21:05:23 +0100 Subject: [PATCH] feat: initial working implementation of sondehub library finished --- GAPP/apps/gapp-server/src/main.ts | 24 +--- GAPP/libs/sondehub/src/lib/uploader.ts | 182 ++++++++++++++++++------- 2 files changed, 136 insertions(+), 70 deletions(-) diff --git a/GAPP/apps/gapp-server/src/main.ts b/GAPP/apps/gapp-server/src/main.ts index 47b956d..de2833c 100644 --- a/GAPP/apps/gapp-server/src/main.ts +++ b/GAPP/apps/gapp-server/src/main.ts @@ -1,25 +1,6 @@ import Fastify from 'fastify'; import { app } from './app/app'; import { getConfig } from './config'; -import { Uploader } from '@gapp/sondehub'; - -const uploader = new Uploader({ uploader_callsign: 'node-sondehub' }); - -uploader.addTelemetry({ - payload_callsign: 'cesilko-payload', - datetime: new Date().toISOString(), - lat: 50, - lon: 15, - alt: 2000, -}); - -uploader.uploadTelemetry(); - -uploader.uploadStationPosition({ - uploader_callsign: 'cesilko-car', - uploader_position: [50.01, 15.01, 185], - mobile: true -}); const config = getConfig(process.env); @@ -35,3 +16,8 @@ server.listen({ port: config.PORT, host: '0.0.0.0' }, (err) => { process.exit(1); } }); + +process.on('SIGINT', async () => { + await server.close(); + server.log.info('Server stopped'); +}); diff --git a/GAPP/libs/sondehub/src/lib/uploader.ts b/GAPP/libs/sondehub/src/lib/uploader.ts index f0b06ef..4cc8738 100644 --- a/GAPP/libs/sondehub/src/lib/uploader.ts +++ b/GAPP/libs/sondehub/src/lib/uploader.ts @@ -1,7 +1,8 @@ import axios from 'axios'; -import { gzipSync } from 'node:zlib'; +import { gzip } from 'node:zlib'; + +type JSONValue = string | number | boolean | { [x: string]: JSONValue } | Array; -// Base interface containing common properties interface BasePacket { software_name: string; software_version: string; @@ -10,12 +11,11 @@ interface BasePacket { uploader_antenna?: string; } -// Telemetry packet interface interface TelemetryPacket extends Partial { dev?: string; - time_received?: string; // ISO 8601 timestamp + time_received?: string; payload_callsign: string; - datetime: string; // ISO 8601 timestamp + datetime: string; lat: number; lon: number; alt: number; @@ -32,13 +32,12 @@ interface TelemetryPacket extends Partial { rssi?: number; telemetry_hidden?: boolean; historical?: boolean; - upload_time?: string; // ISO 8601 timestamp + upload_time?: string; } type StationBasePayload = Partial> & Required>; -// Station position packet interface interface StationPositionPacket extends StationBasePayload { uploader_radio?: string; uploader_contact_email?: string; @@ -46,18 +45,33 @@ interface StationPositionPacket extends StationBasePayload { } interface UploaderConfig extends BasePacket { + /** @description how often packets will be sent to sondehub (in ms) */ uploadRate: number; uploadTimeout: number; + /** @todo implement */ uploadRetries: number; dev: boolean; } type MinimalUploaderConfig = Partial> & Pick; +/** + * A class for uploading telemetry and station position data to SondeHub. + * This class handles queuing, compression, batching and periodic uploading of telemetry packets + * and station position information to the SondeHub Amateur API. + * + * @example + * const uploader = new Uploader({ + * uploader_callsign: 'MYCALL', + * software_name: 'my-software', + * software_version: '1.0.0' + * }); + */ export class Uploader { + private timeoutId?: NodeJS.Timeout; private uploaderConfig: UploaderConfig = { uploader_callsign: '', - uploadRate: 2, + uploadRate: 5_000, uploadTimeout: 20_000, uploadRetries: 5, dev: false, @@ -65,7 +79,7 @@ export class Uploader { software_version: '0.0.1', }; - private inputQueue: TelemetryPacket[] = []; + private telemetryQueue: TelemetryPacket[] = []; public static readonly SONDEHUB_AMATEUR_URL = 'https://api.v2.sondehub.org/amateur/telemetry'; public static readonly SONDEHUB_AMATEUR_STATION_POSITION_URL = 'https://api.v2.sondehub.org/amateur/listeners'; @@ -75,60 +89,63 @@ export class Uploader { ...this.uploaderConfig, ...options, }; - } - - private logDebug(message: string): void { - console.debug(`Sondehub Uploader: ${message}`); - } - - private logInfo(message: string): void { - console.info(`Sondehub Uploader: ${message}`); - } - private logError(message: string): void { - console.error(`Sondehub Uploader: ${message}`); + this.timeoutId = setTimeout(() => this.processQueue(), this.uploaderConfig.uploadRate); } + /** + * Adds a telemetry packet to the upload queue. + * The packet will be enhanced with default values and uploaded in the next upload cycle. + * + * @param {TelemetryPacket} packet - The telemetry packet to be queued for upload + * @example + * uploader.addTelemetry({ + * payload_callsign: 'SONDE-1', + * datetime: '2023-01-01T12:00:00Z', + * lat: 51.5074, + * lon: -0.1278, + * alt: 1000 + * }); + */ public addTelemetry(packet: TelemetryPacket): void { const enhancedPacket = this.enhanceTelemetryPacket(packet); - this.inputQueue.push(enhancedPacket); + this.telemetryQueue.push(enhancedPacket); this.logDebug('Telemetry packet added to queue.'); } - public async uploadTelemetry(): Promise { - if (this.inputQueue.length === 0) { - this.logDebug('No telemetry packets to upload.'); - return; - } - - const packets = [...this.inputQueue]; - this.inputQueue = []; - - try { - const compressedPayload = gzipSync(JSON.stringify(packets)); - const headers = { - 'User-Agent': `${this.uploaderConfig.software_name}-${this.uploaderConfig.software_version}`, - 'Content-Encoding': 'gzip', - 'Content-Type': 'application/json', - }; - - const response = await axios.put(Uploader.SONDEHUB_AMATEUR_URL, compressedPayload, { - headers, - timeout: this.uploaderConfig.uploadTimeout, - }); + /** + * Cleanly shuts down the uploader by clearing the upload timer and + * uploading any remaining packets in the queue. + * Should be called before the application exits. + * + * @returns {Promise} A promise that resolves when all pending uploads are complete + * @example + * await uploader.deinit(); + */ + public async deinit(): Promise { + this.timeoutId.unref(); - if (response.status === 200) { - this.logInfo(`Uploaded ${packets.length} telemetry packets.`); - } else { - this.logError(`Failed to upload telemetry. Status: ${response.status}, Message: ${response.statusText}`); - } - } catch (error) { - console.log(error); - this.logError(`Error uploading telemetry: ${error}`); + if (this.telemetryQueue.length) { + await this.uploadTelemetryPackets(this.telemetryQueue); } } + /** + * Uploads a station position update to SondeHub. + * This method can be used to update the receiver's location and configuration. + * Car will be shown on map when `mobile=true` + * + * @param {StationPositionPacket} stationPacket - The station position information to upload + * @returns {Promise} A promise that resolves when the upload is complete + * @example + * await uploader.uploadStationPosition({ + * uploader_callsign: 'MYCALL', + * uploader_position: [51.5074, -0.1278, 100], + * uploader_antenna: 'Diamond X-50', + * mobile: false + * }); + */ public async uploadStationPosition(stationPacket: StationPositionPacket): Promise { const payload = { software_name: this.uploaderConfig.software_name, @@ -143,7 +160,7 @@ export class Uploader { }; try { - const compressedPayload = gzipSync(JSON.stringify(payload)); + const compressedPayload = await this.compress(payload); const headers = { 'User-Agent': `${this.uploaderConfig.software_name}-${this.uploaderConfig.software_version}`, 'Content-Encoding': 'gzip', @@ -164,6 +181,46 @@ export class Uploader { } } + private async processQueue() { + if (!this.telemetryQueue.length) { + this.timeoutId = setTimeout(() => this.processQueue(), this.uploaderConfig.uploadRate); + return; + } + + const queue = [...this.telemetryQueue]; + this.telemetryQueue = []; + + await this.uploadTelemetryPackets(queue); + + this.timeoutId.unref(); + this.timeoutId = setTimeout(() => this.processQueue(), this.uploaderConfig.uploadRate); + } + + private async uploadTelemetryPackets(packets: TelemetryPacket[]): Promise { + try { + const compressedPayload = await this.compress(packets); + const headers = { + 'User-Agent': `${this.uploaderConfig.software_name}-${this.uploaderConfig.software_version}`, + 'Content-Encoding': 'gzip', + 'Content-Type': 'application/json', + }; + + const response = await axios.put(Uploader.SONDEHUB_AMATEUR_URL, compressedPayload, { + headers, + timeout: this.uploaderConfig.uploadTimeout, + }); + + if (response.status === 200) { + this.logInfo(`Uploaded ${packets.length} telemetry packets.`); + } else { + this.logError(`Failed to upload telemetry. Status: ${response.status}, Message: ${response.statusText}`); + } + } catch (error) { + console.log(error); + this.logError(`Error uploading telemetry: ${error}`); + } + } + private enhanceTelemetryPacket(packet: TelemetryPacket): TelemetryPacket { const enhancedPacket = { ...packet }; enhancedPacket.software_name = this.uploaderConfig.software_name; @@ -183,4 +240,27 @@ export class Uploader { return enhancedPacket; } + + private compress(data: JSONValue | TelemetryPacket[] | StationPositionPacket): Promise { + return new Promise((resolve) => { + gzip(JSON.stringify(data), (error, compressedData) => { + if (error) { + this.logError(error.message); + } + resolve(compressedData); + }); + }); + } + + private logDebug(message: string): void { + console.debug(`Sondehub Uploader: ${message}`); + } + + private logInfo(message: string): void { + console.info(`Sondehub Uploader: ${message}`); + } + + private logError(message: string): void { + console.error(`Sondehub Uploader: ${message}`); + } }