Skip to content

Commit

Permalink
feat: initial working implementation of sondehub library finished
Browse files Browse the repository at this point in the history
  • Loading branch information
VilemRaska committed Jan 24, 2025
1 parent 627c2df commit d3e3d57
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 70 deletions.
24 changes: 5 additions & 19 deletions GAPP/apps/gapp-server/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,6 @@
import Fastify from 'fastify';
import { app } from './app/app';
import { getConfig } from './config';
import { Uploader } from '@gapp/sondehub';

const uploader = new Uploader({ uploader_callsign: 'node-sondehub' });

uploader.addTelemetry({
payload_callsign: 'cesilko-payload',
datetime: new Date().toISOString(),
lat: 50,
lon: 15,
alt: 2000,
});

uploader.uploadTelemetry();

uploader.uploadStationPosition({
uploader_callsign: 'cesilko-car',
uploader_position: [50.01, 15.01, 185],
mobile: true
});

const config = getConfig(process.env);

Expand All @@ -35,3 +16,8 @@ server.listen({ port: config.PORT, host: '0.0.0.0' }, (err) => {
process.exit(1);
}
});

process.on('SIGINT', async () => {
await server.close();
server.log.info('Server stopped');
});
182 changes: 131 additions & 51 deletions GAPP/libs/sondehub/src/lib/uploader.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import axios from 'axios';
import { gzipSync } from 'node:zlib';
import { gzip } from 'node:zlib';

type JSONValue = string | number | boolean | { [x: string]: JSONValue } | Array<JSONValue>;

// Base interface containing common properties
interface BasePacket {
software_name: string;
software_version: string;
Expand All @@ -10,12 +11,11 @@ interface BasePacket {
uploader_antenna?: string;
}

// Telemetry packet interface
interface TelemetryPacket extends Partial<BasePacket> {
dev?: string;
time_received?: string; // ISO 8601 timestamp
time_received?: string;
payload_callsign: string;
datetime: string; // ISO 8601 timestamp
datetime: string;
lat: number;
lon: number;
alt: number;
Expand All @@ -32,40 +32,54 @@ interface TelemetryPacket extends Partial<BasePacket> {
rssi?: number;
telemetry_hidden?: boolean;
historical?: boolean;
upload_time?: string; // ISO 8601 timestamp
upload_time?: string;
}

type StationBasePayload = Partial<Omit<BasePacket, 'uploader_callsign' | 'uploader_position'>> &
Required<Pick<BasePacket, 'uploader_callsign' | 'uploader_position'>>;

// Station position packet interface
interface StationPositionPacket extends StationBasePayload {
uploader_radio?: string;
uploader_contact_email?: string;
mobile?: boolean;
}

interface UploaderConfig extends BasePacket {
/** @description how often packets will be sent to sondehub (in ms) */
uploadRate: number;
uploadTimeout: number;
/** @todo implement */
uploadRetries: number;
dev: boolean;
}

type MinimalUploaderConfig = Partial<Omit<UploaderConfig, 'uploader_Callsign'>> & Pick<UploaderConfig, 'uploader_callsign'>;

/**
* A class for uploading telemetry and station position data to SondeHub.
* This class handles queuing, compression, batching and periodic uploading of telemetry packets
* and station position information to the SondeHub Amateur API.
*
* @example
* const uploader = new Uploader({
* uploader_callsign: 'MYCALL',
* software_name: 'my-software',
* software_version: '1.0.0'
* });
*/
export class Uploader {
private timeoutId?: NodeJS.Timeout;
private uploaderConfig: UploaderConfig = {
uploader_callsign: '',
uploadRate: 2,
uploadRate: 5_000,
uploadTimeout: 20_000,
uploadRetries: 5,
dev: false,
software_name: 'node-sondehub',
software_version: '0.0.1',
};

private inputQueue: TelemetryPacket[] = [];
private telemetryQueue: TelemetryPacket[] = [];

public static readonly SONDEHUB_AMATEUR_URL = 'https://api.v2.sondehub.org/amateur/telemetry';
public static readonly SONDEHUB_AMATEUR_STATION_POSITION_URL = 'https://api.v2.sondehub.org/amateur/listeners';
Expand All @@ -75,60 +89,63 @@ export class Uploader {
...this.uploaderConfig,
...options,
};
}

private logDebug(message: string): void {
console.debug(`Sondehub Uploader: ${message}`);
}

private logInfo(message: string): void {
console.info(`Sondehub Uploader: ${message}`);
}

private logError(message: string): void {
console.error(`Sondehub Uploader: ${message}`);
this.timeoutId = setTimeout(() => this.processQueue(), this.uploaderConfig.uploadRate);
}

/**
* Adds a telemetry packet to the upload queue.
* The packet will be enhanced with default values and uploaded in the next upload cycle.
*
* @param {TelemetryPacket} packet - The telemetry packet to be queued for upload
* @example
* uploader.addTelemetry({
* payload_callsign: 'SONDE-1',
* datetime: '2023-01-01T12:00:00Z',
* lat: 51.5074,
* lon: -0.1278,
* alt: 1000
* });
*/
public addTelemetry(packet: TelemetryPacket): void {
const enhancedPacket = this.enhanceTelemetryPacket(packet);

this.inputQueue.push(enhancedPacket);
this.telemetryQueue.push(enhancedPacket);
this.logDebug('Telemetry packet added to queue.');
}

public async uploadTelemetry(): Promise<void> {
if (this.inputQueue.length === 0) {
this.logDebug('No telemetry packets to upload.');
return;
}

const packets = [...this.inputQueue];
this.inputQueue = [];

try {
const compressedPayload = gzipSync(JSON.stringify(packets));
const headers = {
'User-Agent': `${this.uploaderConfig.software_name}-${this.uploaderConfig.software_version}`,
'Content-Encoding': 'gzip',
'Content-Type': 'application/json',
};

const response = await axios.put(Uploader.SONDEHUB_AMATEUR_URL, compressedPayload, {
headers,
timeout: this.uploaderConfig.uploadTimeout,
});
/**
* Cleanly shuts down the uploader by clearing the upload timer and
* uploading any remaining packets in the queue.
* Should be called before the application exits.
*
* @returns {Promise<void>} A promise that resolves when all pending uploads are complete
* @example
* await uploader.deinit();
*/
public async deinit(): Promise<void> {
this.timeoutId.unref();

if (response.status === 200) {
this.logInfo(`Uploaded ${packets.length} telemetry packets.`);
} else {
this.logError(`Failed to upload telemetry. Status: ${response.status}, Message: ${response.statusText}`);
}
} catch (error) {
console.log(error);
this.logError(`Error uploading telemetry: ${error}`);
if (this.telemetryQueue.length) {
await this.uploadTelemetryPackets(this.telemetryQueue);
}
}

/**
* Uploads a station position update to SondeHub.
* This method can be used to update the receiver's location and configuration.
* Car will be shown on map when `mobile=true`
*
* @param {StationPositionPacket} stationPacket - The station position information to upload
* @returns {Promise<void>} A promise that resolves when the upload is complete
* @example
* await uploader.uploadStationPosition({
* uploader_callsign: 'MYCALL',
* uploader_position: [51.5074, -0.1278, 100],
* uploader_antenna: 'Diamond X-50',
* mobile: false
* });
*/
public async uploadStationPosition(stationPacket: StationPositionPacket): Promise<void> {
const payload = {
software_name: this.uploaderConfig.software_name,
Expand All @@ -143,7 +160,7 @@ export class Uploader {
};

try {
const compressedPayload = gzipSync(JSON.stringify(payload));
const compressedPayload = await this.compress(payload);
const headers = {
'User-Agent': `${this.uploaderConfig.software_name}-${this.uploaderConfig.software_version}`,
'Content-Encoding': 'gzip',
Expand All @@ -164,6 +181,46 @@ export class Uploader {
}
}

private async processQueue() {
if (!this.telemetryQueue.length) {
this.timeoutId = setTimeout(() => this.processQueue(), this.uploaderConfig.uploadRate);
return;
}

const queue = [...this.telemetryQueue];
this.telemetryQueue = [];

await this.uploadTelemetryPackets(queue);

this.timeoutId.unref();
this.timeoutId = setTimeout(() => this.processQueue(), this.uploaderConfig.uploadRate);
}

private async uploadTelemetryPackets(packets: TelemetryPacket[]): Promise<void> {
try {
const compressedPayload = await this.compress(packets);
const headers = {
'User-Agent': `${this.uploaderConfig.software_name}-${this.uploaderConfig.software_version}`,
'Content-Encoding': 'gzip',
'Content-Type': 'application/json',
};

const response = await axios.put(Uploader.SONDEHUB_AMATEUR_URL, compressedPayload, {
headers,
timeout: this.uploaderConfig.uploadTimeout,
});

if (response.status === 200) {
this.logInfo(`Uploaded ${packets.length} telemetry packets.`);
} else {
this.logError(`Failed to upload telemetry. Status: ${response.status}, Message: ${response.statusText}`);
}
} catch (error) {
console.log(error);
this.logError(`Error uploading telemetry: ${error}`);
}
}

private enhanceTelemetryPacket(packet: TelemetryPacket): TelemetryPacket {
const enhancedPacket = { ...packet };
enhancedPacket.software_name = this.uploaderConfig.software_name;
Expand All @@ -183,4 +240,27 @@ export class Uploader {

return enhancedPacket;
}

private compress(data: JSONValue | TelemetryPacket[] | StationPositionPacket): Promise<Buffer> {
return new Promise((resolve) => {
gzip(JSON.stringify(data), (error, compressedData) => {
if (error) {
this.logError(error.message);
}
resolve(compressedData);
});
});
}

private logDebug(message: string): void {
console.debug(`Sondehub Uploader: ${message}`);
}

private logInfo(message: string): void {
console.info(`Sondehub Uploader: ${message}`);
}

private logError(message: string): void {
console.error(`Sondehub Uploader: ${message}`);
}
}

0 comments on commit d3e3d57

Please sign in to comment.