Skip to content

Commit

Permalink
Merge pull request #31 from FalkorDB/sentinel
Browse files Browse the repository at this point in the history
fix #38 Auto discover masters when using sentinel
  • Loading branch information
gkorland authored Jun 4, 2024
2 parents 157a519 + 5a9cbcd commit b9ca8e4
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 3 deletions.
24 changes: 24 additions & 0 deletions examples/connect.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { FalkorDB } from 'falkordb';

const db = await FalkorDB.connect({
// username: 'myUsername',
// password: 'myPassword',
socket: {
host: 'localhost',
port: 26379
}
})
db.on('error', console.error)

console.log('Connected to FalkorDB')

const graph = db.selectGraph('myGraph')
const result = await graph.query('MATCH (n) RETURN n')

console.log(result)

console.log(await db.list())
console.log(await db.info())
console.log(await db.connection.info())

db.close()
7 changes: 7 additions & 0 deletions src/commands/SENTINEL_MASTER.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export const IS_READ_ONLY = true;

export function transformArguments(dbname: string): Array<string> {
return ['SENTINEL', 'MASTER', dbname];
}

export declare function transformReply(): Array<string>;
7 changes: 7 additions & 0 deletions src/commands/SENTINEL_MASTERS.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export const IS_READ_ONLY = true;

export function transformArguments(): Array<string> {
return ['SENTINEL', 'MASTERS'];
}

export declare function transformReply(): Array<Array<string>>;
8 changes: 7 additions & 1 deletion src/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import * as SLOWLOG from './SLOWLOG';
import * as CONSTRAINT_CREATE from './CONSTRAINT_CREATE';
import * as CONSTRAINT_DROP from './CONSTRAINT_DROP';
import * as COPY from './COPY';
import * as SENTINEL_MASTER from './SENTINEL_MASTER';
import * as SENTINEL_MASTERS from './SENTINEL_MASTERS';

import { RedisCommandArgument, RedisCommandArguments } from '@redis/client/dist/lib/commands';

Expand Down Expand Up @@ -40,7 +42,11 @@ export default {
CONSTRAINT_DROP,
constraintDrop: CONSTRAINT_DROP,
COPY,
copy: COPY
copy: COPY,
SENTINEL_MASTER,
sentinelMaster: SENTINEL_MASTER,
SENTINEL_MASTERS,
sentinelMasters: SENTINEL_MASTERS
};

type QueryParam = null | string | number | boolean | QueryParams | Array<QueryParam>;
Expand Down
80 changes: 78 additions & 2 deletions src/falkordb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ import * as tls from 'tls';
import * as net from 'net';
import { EventEmitter } from 'events';

import { RedisClientOptions, RedisFunctions, RedisScripts, createClient } from 'redis';
import { RedisClientOptions, RedisDefaultModules, RedisFunctions, RedisScripts, createClient } from 'redis';

import Graph, { GraphConnection } from './graph';
import commands from './commands';
import { RedisClientType } from '@redis/client';

type NetSocketOptions = Partial<net.SocketConnectOpts> & {
tls?: false;
};

type TypedRedisClientOptions = RedisClientOptions<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>;
type TypedRedisClientType = RedisClientType<RedisDefaultModules & { falkordb: typeof commands }, RedisFunctions, RedisScripts>

interface TlsSocketOptions extends tls.ConnectionOptions {
tls: true;
}
Expand Down Expand Up @@ -91,17 +95,82 @@ export interface FalkorDBOptions {
clientInfoTag?: string;
}

function extractDetails(masters: Array<Array<string>>) {
const allDetails: Record<string, string>[] = [];
for (const master of masters) {
const details: Record<string, string> = {};
for (let i = 0; i < master.length; i += 2) {
details[master[i]] = master[i + 1];
}
allDetails.push(details);
}
return allDetails;
}

export default class FalkorDB extends EventEmitter {

#client: GraphConnection;
#sentinel?: GraphConnection;

private constructor(client: GraphConnection) {
super();
this.#client = client;
}

private async connectServer(client: TypedRedisClientType, redisOption: TypedRedisClientOptions) {

// If not connected to sentinel, throws an error on missing command
const masters = await client.falkordb.sentinelMasters();
const details = extractDetails(masters);

if (details.length > 1) {
throw new Error('Multiple masters are not supported');
}

// Connect to the server with the details from sentinel
const socketOptions: tls.ConnectionOptions = {
...redisOption.socket,
host: details[0]['ip'] as string,
port: parseInt(details[0]['port'])
};
const serverOptions: TypedRedisClientOptions = {
...redisOption,
socket: socketOptions
};
const realClient = createClient<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>(serverOptions)

// Set original client as sentinel and server client as client
this.#sentinel = client;
this.#client = realClient;

await realClient
.on('error', async err => {

console.debug('Error on server connection', err)

// Disconnect the client to avoid further errors and retries
realClient.disconnect();

// If error occurs on previous server connection, no need to reconnect
if (this.#client !== realClient) {
return;
}

try {
await this.connectServer(client, redisOption)
console.debug('Connected to server')
} catch (e) {
console.debug('Error on server reconnect', e)

// Forward errors if reconnection fails
this.emit('error', err)
}
})
.connect();
}

static async connect(options?: FalkorDBOptions) {
const redisOption = (options ?? {}) as RedisClientOptions<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>;
const redisOption = (options ?? {}) as TypedRedisClientOptions;

// If the URL is provided, and the protocol is `falkor` replaces it with `redis` for the underline redis client
// e.g. falkor://localhost:6379 -> redis://localhost:6379
Expand All @@ -114,12 +183,19 @@ export default class FalkorDB extends EventEmitter {
}

const client = createClient<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>(redisOption)

const falkordb = new FalkorDB(client);

await client
.on('error', err => falkordb.emit('error', err)) // Forward errors
.connect();

try {
await falkordb.connectServer(client, redisOption)
} catch (e) {
console.debug('Error in connecting to sentinel, connecting to server directly');
}

return falkordb
}

Expand Down

0 comments on commit b9ca8e4

Please sign in to comment.