This is a Kafka Connect sink connector that reads records from a Kafka topic and writes them to a FalkorDB instance.
Each record is expected to be a JSON array of objects. Every object in the array describes one command, and each command can target a different graph.
The format of the JSON is:
[
  {
    "graphName": "falkordb",
    "command": "GRAPH_QUERY",
    "cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p",
    "parameters": {
      "location_param": "Location 0",
      "age_param": 20,
      "name_param": "Person 0"
    }
  },
  {
    "graphName": "falkordb",
    "command": "GRAPH_QUERY",
    "cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p",
    "parameters": {
      "location_param": "Location 1",
      "age_param": 21,
      "name_param": "Person 1"
    }
  }
]
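As an illustration of the format above, here is a minimal sketch that assembles such a record value using only the Java standard library. The class `FalkorRecordBuilder` and its methods are hypothetical and are not part of this connector. The only `command` value shown in this README is `GRAPH_QUERY`, so the sketch hard-codes it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the connector: builds the JSON array
// that a single Kafka record value is expected to contain.
public class FalkorRecordBuilder {

    // Wraps a string in double quotes and escapes characters that are
    // not allowed to appear raw inside a JSON string.
    public static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Numbers and booleans are written bare, null becomes the JSON literal,
    // and everything else is written as a quoted string.
    public static String value(Object v) {
        if (v == null) return "null";
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        return quote(v.toString());
    }

    // One element of the array: a single command against a single graph.
    public static String command(String graphName, String cypher, Map<String, Object> params) {
        String body = params.entrySet().stream()
                .map(e -> quote(e.getKey()) + ":" + value(e.getValue()))
                .collect(Collectors.joining(","));
        return "{\"graphName\":" + quote(graphName)
                + ",\"command\":\"GRAPH_QUERY\""
                + ",\"cypherCommand\":" + quote(cypher)
                + ",\"parameters\":{" + body + "}}";
    }

    // The record value: a JSON array of command objects.
    public static String record(List<String> commands) {
        return "[" + String.join(",", commands) + "]";
    }

    public static void main(String[] args) {
        String cypher = "CREATE (p:Person {name: $name_param, age: $age_param, "
                + "location: $location_param}) RETURN p";
        List<String> commands = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            // LinkedHashMap keeps the parameters in insertion order.
            Map<String, Object> params = new LinkedHashMap<>();
            params.put("location_param", "Location " + i);
            params.put("age_param", 20 + i);
            params.put("name_param", "Person " + i);
            commands.add(command("falkordb", cypher, params));
        }
        System.out.println(record(commands));
    }
}
```

String concatenation is used here only to keep the sketch free of dependencies. In real code a JSON library would be the more robust choice.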
You can build the connector locally (explained below) or download the fat jar from the GitHub releases page.
This project uses SDKMAN! to manage multiple Java versions. You can install it using the following command:
curl -s "https://get.sdkman.io" | bash
Once it is installed, run sdk env in the terminal to select the right Java SDK for this project.
After that you can build the project using the following command:
./gradlew build
There are a few scripts that can be used to run the example. The scripts are located in the example directory.
First, run the download-kafka.sh script to download the Kafka binaries. Then run the run-kafka.sh script to start the Kafka server.
After Kafka is up and running, run the run-connector.sh script to build the fat jar locally and start the connector.
Then you can run FalkorDB locally using the Docker image:
docker run -p 6379:6379 -p 3000:3000 -it --rm falkordb/falkordb:latest
Lastly, run the send-to-topic.sh script to send the queries to the Kafka topic. To send the content of the queries file to the topic, use:
cat queries.json | ./send-to-topic.sh
The example/connectors folder contains the configuration file used to configure the connector.