-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #242 from FalkorDB/241-sentinel-does-not-close-the…
…-connection-properly add disconnect to clients
- Loading branch information
Showing
8 changed files
with
451 additions
and
371 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,48 +1,76 @@ | ||
import { RedisCommandArgument } from "@redis/client/dist/lib/commands" | ||
import { QueryOptions } from "../commands" | ||
import { ConstraintType, EntityType, GraphReply } from "../graph" | ||
import FalkorDB from "../falkordb" | ||
import { SingleGraphConnection } from "./single" | ||
import { RedisCommandArgument } from "@redis/client/dist/lib/commands"; | ||
import { QueryOptions } from "../commands"; | ||
import { ConstraintType, EntityType, GraphReply } from "../graph"; | ||
import FalkorDB from "../falkordb"; | ||
import { SingleGraphConnection } from "./single"; | ||
|
||
// A generic client interface for Redis clients | ||
export interface Client { | ||
init(falkordb: FalkorDB): Promise<void>; | ||
|
||
init(falkordb: FalkorDB): Promise<void> | ||
list(): Promise<Array<string>>; | ||
|
||
list(): Promise<Array<string>> | ||
configGet( | ||
configKey: string | ||
): Promise<(string | number)[] | (string | number)[][]>; | ||
|
||
configGet(configKey: string): Promise<(string | number)[] | (string | number)[][]> | ||
configSet(configKey: string, value: number | string): Promise<void>; | ||
|
||
configSet(configKey: string, value: number | string): Promise<void> | ||
info(section?: string): Promise<(string | string[])[]>; | ||
|
||
info(section?: string): Promise<(string | string[])[]> | ||
query<T>( | ||
graph: string, | ||
query: RedisCommandArgument, | ||
options?: QueryOptions, | ||
compact?: boolean | ||
): Promise<any>; | ||
|
||
query<T>(graph: string, query: RedisCommandArgument,options?: QueryOptions, compact?: boolean): Promise<any> | ||
profile<T>(graph: string, query: RedisCommandArgument): Promise<any>; | ||
|
||
profile<T>(graph: string, query: RedisCommandArgument): Promise<any> | ||
roQuery<T>( | ||
graph: string, | ||
query: RedisCommandArgument, | ||
options?: QueryOptions, | ||
compact?: boolean | ||
): Promise<any>; | ||
|
||
roQuery<T>(graph: string, query: RedisCommandArgument, options?: QueryOptions, compact?: boolean): Promise<any> | ||
copy<T>(srcGraph: string, destGraph: string): Promise<any>; | ||
|
||
copy<T>(srcGraph: string, destGraph: string): Promise<any> | ||
delete(graph: string): Promise<void>; | ||
|
||
delete(graph: string): Promise<void> | ||
explain(graph: string, query: string): Promise<any>; | ||
|
||
explain(graph: string, query: string): Promise<any> | ||
slowLog(graph: string): Promise< | ||
{ | ||
timestamp: Date; | ||
command: string; | ||
query: string; | ||
took: number; | ||
}[] | ||
>; | ||
|
||
slowLog(graph: string) : Promise<{ | ||
timestamp: Date; | ||
command: string; | ||
query: string; | ||
took: number; | ||
}[]> | ||
constraintCreate( | ||
graph: string, | ||
constraintType: ConstraintType, | ||
entityType: EntityType, | ||
label: string, | ||
...properties: string[] | ||
): Promise<void>; | ||
|
||
constraintCreate(graph: string, constraintType: ConstraintType, entityType: EntityType, | ||
label: string, ...properties: string[]) : Promise<void> | ||
constraintDrop( | ||
graph: string, | ||
constraintType: ConstraintType, | ||
entityType: EntityType, | ||
label: string, | ||
...properties: string[] | ||
): Promise<void>; | ||
|
||
constraintDrop(graph: string, constraintType: ConstraintType, entityType: EntityType, | ||
label: string, ...properties: string[]) : Promise<void> | ||
/** | ||
* @deprecated Use `disconnect` instead | ||
*/ | ||
quit(): Promise<void>; | ||
|
||
quit(): Promise<void> | ||
disconnect(): Promise<void>; | ||
|
||
getConnection(): Promise<SingleGraphConnection>; | ||
getConnection(): Promise<SingleGraphConnection>; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,116 +1,154 @@ | ||
import { Client } from "./client"; | ||
import { ConstraintType, EntityType } from "../graph"; | ||
import { RedisCommandArgument, RedisFunctions, RedisScripts } from "@redis/client/dist/lib/commands"; | ||
import { | ||
RedisCommandArgument, | ||
RedisFunctions, | ||
RedisScripts, | ||
} from "@redis/client/dist/lib/commands"; | ||
import commands, { QueryOptions } from "../commands"; | ||
import { createCluster, RedisClusterType } from "@redis/client"; | ||
import FalkorDB, { TypedRedisClusterClientOptions } from "../falkordb"; | ||
import { SingleGraphConnection } from "./single"; | ||
import { RedisClusterClientOptions } from "@redis/client/dist/lib/cluster"; | ||
import * as lodash from 'lodash' | ||
export type ClusterGraphConnection = RedisClusterType<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>; | ||
import * as lodash from "lodash"; | ||
export type ClusterGraphConnection = RedisClusterType< | ||
{ falkordb: typeof commands }, | ||
RedisFunctions, | ||
RedisScripts | ||
>; | ||
|
||
/** | ||
* A client that connects to a Redis Cluster. | ||
*/ | ||
export class Cluster implements Client { | ||
|
||
#client: ClusterGraphConnection; | ||
|
||
constructor(client: SingleGraphConnection) { | ||
|
||
// Convert the single client options to a cluster client options | ||
const redisClusterOption = client.options as TypedRedisClusterClientOptions; | ||
redisClusterOption.rootNodes = [client.options as RedisClusterClientOptions]; | ||
|
||
// Remove the URL from the defaults so it won't override the dynamic cluster URLs | ||
const defaults = lodash.cloneDeep(client.options); | ||
defaults?.url && delete defaults.url; | ||
|
||
redisClusterOption.defaults = defaults; | ||
redisClusterOption.maxCommandRedirections = 100000; | ||
client.disconnect(); | ||
this.#client = createCluster<{ falkordb: typeof commands }, RedisFunctions, RedisScripts>(redisClusterOption) | ||
} | ||
|
||
async getConnection() { | ||
const connection = this.#client.nodeClient(this.#client.getRandomNode()); | ||
return connection instanceof Promise ? await connection : connection; | ||
} | ||
|
||
async init(falkordb: FalkorDB) { | ||
await this.#client | ||
.on('error', err => falkordb.emit('error', err)) // Forward errors | ||
.connect(); | ||
} | ||
|
||
async query<T>(graph: string, query: RedisCommandArgument, options?: QueryOptions, compact=true) { | ||
return this.#client.falkordb.query(graph, query, options, compact) | ||
} | ||
async roQuery<T>(graph: string, query: RedisCommandArgument, options?: QueryOptions, compact=true) { | ||
return this.#client.falkordb.roQuery(graph, query, options, compact) | ||
} | ||
|
||
async delete(graph: string) { | ||
const reply = this.#client.falkordb.delete(graph) | ||
return reply.then(() => { }) | ||
} | ||
|
||
async explain(graph: string, query: string) { | ||
return this.#client.falkordb.explain(graph, query) | ||
} | ||
|
||
async list(): Promise<Array<string>> { | ||
return this.#client.falkordb.list() | ||
} | ||
|
||
async configGet(configKey: string) { | ||
return this.#client.falkordb.configGet(configKey) | ||
} | ||
|
||
async configSet(configKey: string, value: number | string) { | ||
const reply = this.#client.falkordb.configSet(configKey, value) | ||
return reply.then(() => { }) | ||
} | ||
|
||
async info(section?: string) { | ||
return this.#client.falkordb.info(section) | ||
} | ||
|
||
async copy<T>(srcGraph: string, destGraph: string) { | ||
return this.#client.falkordb.copy(srcGraph, destGraph) | ||
} | ||
|
||
slowLog(graph: string) { | ||
return this.#client.falkordb.slowLog(graph) | ||
} | ||
async constraintCreate(graph: string, constraintType: ConstraintType, entityType: EntityType, label: string, ...properties: string[]) { | ||
const reply = this.#client.falkordb.constraintCreate( | ||
graph, | ||
constraintType, | ||
entityType, | ||
label, | ||
...properties | ||
) | ||
return reply.then(() => { }) | ||
} | ||
|
||
async constraintDrop(graph: string, constraintType: ConstraintType, entityType: EntityType, label: string, ...properties: string[]) { | ||
const reply = this.#client.falkordb.constraintDrop( | ||
graph, | ||
constraintType, | ||
entityType, | ||
label, | ||
...properties | ||
) | ||
return reply.then(() => { }) | ||
} | ||
|
||
async profile<T>(graph: string, query: string) { | ||
return this.#client.falkordb.profile( graph, query) | ||
} | ||
|
||
async quit() { | ||
const reply = this.#client.quit(); | ||
return reply.then(() => {}) | ||
} | ||
} | ||
#client: ClusterGraphConnection; | ||
|
||
constructor(client: SingleGraphConnection) { | ||
// Convert the single client options to a cluster client options | ||
const redisClusterOption = client.options as TypedRedisClusterClientOptions; | ||
redisClusterOption.rootNodes = [ | ||
client.options as RedisClusterClientOptions, | ||
]; | ||
|
||
// Remove the URL from the defaults so it won't override the dynamic cluster URLs | ||
const defaults = lodash.cloneDeep(client.options); | ||
defaults?.url && delete defaults.url; | ||
|
||
redisClusterOption.defaults = defaults; | ||
redisClusterOption.maxCommandRedirections = 100000; | ||
client.disconnect(); | ||
this.#client = createCluster< | ||
{ falkordb: typeof commands }, | ||
RedisFunctions, | ||
RedisScripts | ||
>(redisClusterOption); | ||
} | ||
|
||
async getConnection() { | ||
const connection = this.#client.nodeClient(this.#client.getRandomNode()); | ||
return connection instanceof Promise ? await connection : connection; | ||
} | ||
|
||
async init(falkordb: FalkorDB) { | ||
await this.#client | ||
.on("error", (err) => falkordb.emit("error", err)) // Forward errors | ||
.connect(); | ||
} | ||
|
||
async query<T>( | ||
graph: string, | ||
query: RedisCommandArgument, | ||
options?: QueryOptions, | ||
compact = true | ||
) { | ||
return this.#client.falkordb.query(graph, query, options, compact); | ||
} | ||
async roQuery<T>( | ||
graph: string, | ||
query: RedisCommandArgument, | ||
options?: QueryOptions, | ||
compact = true | ||
) { | ||
return this.#client.falkordb.roQuery(graph, query, options, compact); | ||
} | ||
|
||
async delete(graph: string) { | ||
const reply = this.#client.falkordb.delete(graph); | ||
return reply.then(() => {}); | ||
} | ||
|
||
async explain(graph: string, query: string) { | ||
return this.#client.falkordb.explain(graph, query); | ||
} | ||
|
||
async list(): Promise<Array<string>> { | ||
return this.#client.falkordb.list(); | ||
} | ||
|
||
async configGet(configKey: string) { | ||
return this.#client.falkordb.configGet(configKey); | ||
} | ||
|
||
async configSet(configKey: string, value: number | string) { | ||
const reply = this.#client.falkordb.configSet(configKey, value); | ||
return reply.then(() => {}); | ||
} | ||
|
||
async info(section?: string) { | ||
return this.#client.falkordb.info(section); | ||
} | ||
|
||
async copy<T>(srcGraph: string, destGraph: string) { | ||
return this.#client.falkordb.copy(srcGraph, destGraph); | ||
} | ||
|
||
slowLog(graph: string) { | ||
return this.#client.falkordb.slowLog(graph); | ||
} | ||
async constraintCreate( | ||
graph: string, | ||
constraintType: ConstraintType, | ||
entityType: EntityType, | ||
label: string, | ||
...properties: string[] | ||
) { | ||
const reply = this.#client.falkordb.constraintCreate( | ||
graph, | ||
constraintType, | ||
entityType, | ||
label, | ||
...properties | ||
); | ||
return reply.then(() => {}); | ||
} | ||
|
||
async constraintDrop( | ||
graph: string, | ||
constraintType: ConstraintType, | ||
entityType: EntityType, | ||
label: string, | ||
...properties: string[] | ||
) { | ||
const reply = this.#client.falkordb.constraintDrop( | ||
graph, | ||
constraintType, | ||
entityType, | ||
label, | ||
...properties | ||
); | ||
return reply.then(() => {}); | ||
} | ||
|
||
async profile<T>(graph: string, query: string) { | ||
return this.#client.falkordb.profile(graph, query); | ||
} | ||
|
||
async quit() { | ||
return this.disconnect(); | ||
} | ||
|
||
async disconnect(): Promise<void> { | ||
const reply = this.#client.disconnect(); | ||
return reply.then(() => {}); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.