Skip to content

Commit

Permalink
add triplestore example
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Dec 3, 2023
1 parent b6b87cb commit 634fe27
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 3 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
workspaces: >
. -> target
examples/actix-kv -> target
examples/actix-triplestore -> target
- name: Build
run: cargo build -v
- name: Format
Expand Down
5 changes: 2 additions & 3 deletions examples/actix-kv/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use actix_web::{
App, HttpResponse, HttpServer,
};
use error::MyResult;
use log::info;
use lsm_tree::{Config, Tree};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
Expand Down Expand Up @@ -137,13 +136,13 @@ async fn main() -> std::io::Result<()> {
let port = port.parse::<u16>().expect("invalid port");

let data_folder = std::env::var("DATA_FOLDER").unwrap_or(".data".into());
info!("Opening database at {data_folder}");
log::info!("Opening database at {data_folder}");
let db = Config::new(&data_folder)
.block_cache_size(25_600) // 100 MB
.open()
.expect("failed to open db");

info!("Starting on port {port}");
log::info!("Starting on port {port}");

HttpServer::new(move || {
App::new()
Expand Down
3 changes: 3 additions & 0 deletions examples/actix-triplestore/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/target
.data

14 changes: 14 additions & 0 deletions examples/actix-triplestore/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "actix-triplestore"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix-web = "4"
log = { version = "0.4", features = ["release_max_level_info"] }
env_logger = "0.10.0"
lsm_tree = { path = "../../" }
serde = { version = "1.0.164", features = ["derive"] }
serde_json = "1.0.99"
37 changes: 37 additions & 0 deletions examples/actix-triplestore/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# actix-triplestore

This example uses `lsm-tree`, `actix_web` and `serde_json` to provide a simple triplestore with a JSON REST API.

## REST API

### `POST /{subject}`

Upserts a subject.

```json
{
"item": {
"name": "John"
}
}
```

### `POST /{subject}/{verb}/{object}`

Upserts a relation between a subject and an object.

```json
{
"item": {
"created_at": 12345
}
}
```

### `GET /{subject}`

Returns a subject if it exists.

### `GET /{subject}/{verb}?limit=1234`

Returns a list of edges, their data and the corresponding nodes.
24 changes: 24 additions & 0 deletions examples/actix-triplestore/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use actix_web::{HttpResponse, ResponseError};

#[derive(Debug)]
pub struct MyError(lsm_tree::Error);

impl std::fmt::Display for MyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.0)
}
}

impl ResponseError for MyError {
fn error_response(&self) -> HttpResponse {
HttpResponse::InternalServerError().body("Internal Server Error")
}
}

impl From<lsm_tree::Error> for MyError {
fn from(value: lsm_tree::Error) -> Self {
Self(value)
}
}

pub type MyResult<T> = Result<T, MyError>;
196 changes: 196 additions & 0 deletions examples/actix-triplestore/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
mod error;

use actix_web::{
get,
middleware::Logger,
post,
web::{self},
App, HttpResponse, HttpServer,
};
use error::MyResult;
use lsm_tree::{Config, Tree};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Deserialize)]
struct PrefixQueryParams {
limit: Option<u32>,
}

// This struct represents state
struct AppState {
db: Tree,
}

#[derive(Serialize, Deserialize)]
struct InsertBody {
item: Value,
}

#[derive(Serialize, Deserialize)]
struct BulkBody {
upsert: Option<Vec<(String, String, String, Value)>>,
remove: Option<Vec<(String, String, String)>>,
}

#[post("/{subject}")]
async fn insert_subject(
data: web::Data<AppState>,
body: web::Json<InsertBody>,
path: web::Path<String>,
) -> MyResult<HttpResponse> {
eprintln!("INSERT SUBJECT");

let before = std::time::Instant::now();

let subject_key = path.into_inner();

data.db.insert(
format!("s:{subject_key}"),
serde_json::to_string(&body.item).unwrap(),
)?;

data.db.flush()?;

Ok(HttpResponse::Ok()
.append_header(("x-took-ms", before.elapsed().as_millis().to_string()))
.body("OK"))
}

#[post("/{subject}/{verb}/{object}")]
async fn insert_relation(
data: web::Data<AppState>,
body: web::Json<InsertBody>,
path: web::Path<(String, String, String)>,
) -> MyResult<HttpResponse> {
eprintln!("INSERT RELATION");

let before = std::time::Instant::now();

let (subject_key, verb_key, object_key) = path.into_inner();

data.db.insert(
format!("v:s:{subject_key}:v:{verb_key}:o:{object_key}"),
serde_json::to_string(&body.item).unwrap(),
)?;

data.db.flush()?;

Ok(HttpResponse::Ok()
.append_header(("x-took-ms", before.elapsed().as_millis().to_string()))
.body("OK"))
}

#[get("/{subject}")]
async fn get_subject(path: web::Path<String>, data: web::Data<AppState>) -> MyResult<HttpResponse> {
let before = std::time::Instant::now();

let subject_key = path.into_inner();
let key = format!("s:{subject_key}");

match data.db.get(key)? {
Some(item) => Ok(HttpResponse::Ok()
.append_header(("x-took-ms", before.elapsed().as_millis().to_string()))
.content_type("application/json; utf-8")
.body(item.value)),
None => Ok(HttpResponse::NotFound()
.append_header(("x-took-ms", before.elapsed().as_millis().to_string()))
.content_type("text:html; utf-8")
.body("Not found")),
}
}

#[get("/{subject}/{verb}")]
async fn list_by_verb(
path: web::Path<(String, String)>,
data: web::Data<AppState>,
query: web::Query<PrefixQueryParams>,
) -> MyResult<HttpResponse> {
let (subject, verb) = path.into_inner();

let before = std::time::Instant::now();

let all = data
.db
.prefix(format!("v:s:{subject}:v:{verb}:"))?
.into_iter()
.take(query.limit.unwrap_or(10_000) as usize)
.collect::<Vec<_>>();

// let mut joined = String::with_capacity(100);

let count = all.len();

let mut edges = vec![];

for item in all {
let item = item?;

let key = std::str::from_utf8(&item.key).unwrap();
let verb_key = key.split(':').nth(4).unwrap();
let object_key = key.split(':').nth(6).unwrap();
let relation_data =
serde_json::from_str::<serde_json::Value>(std::str::from_utf8(&item.value).unwrap())
.unwrap();

let object_data = data
.db
.get(format!("s:{object_key}"))?
.map(|x| {
serde_json::from_str::<serde_json::Value>(std::str::from_utf8(&x.value).unwrap())
.unwrap()
})
.unwrap_or(serde_json::Value::Null);

edges.push(serde_json::json!({
"key": verb_key,
"data": relation_data,
"node": {
"key": object_key,
"data": object_data,
},
}));
}

let body = serde_json::json!({
"edges": edges,
});

Ok(HttpResponse::Ok()
.append_header(("x-count", count.to_string()))
.append_header(("x-took-ms", before.elapsed().as_millis().to_string()))
.content_type("application/json; utf-8")
.body(serde_json::to_string(&body).unwrap()))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::Builder::new()
.filter_level(log::LevelFilter::Info)
.init();

let port = std::env::var("PORT").unwrap_or("8000".into());
let port = port.parse::<u16>().expect("invalid port");

let data_folder = std::env::var("DATA_FOLDER").unwrap_or(".data".into());
log::info!("Opening database at {data_folder}");
let db = Config::new(&data_folder)
.block_cache_size(25_600) // 100 MB
.open()
.expect("failed to open db");

log::info!("Starting on port {port}");

HttpServer::new(move || {
App::new()
.wrap(Logger::new("%r %s - %{User-Agent}i"))
.app_data(web::Data::new(AppState { db: db.clone() }))
.service(insert_subject)
.service(insert_relation)
.service(get_subject)
.service(list_by_verb)
})
.bind(("127.0.0.1", port))?
.run()
.await
}

0 comments on commit 634fe27

Please sign in to comment.