Skip to content

Commit

Permalink
feat: LazyResultSet implemenation, allowing one-by-one parsing (#14)
Browse files Browse the repository at this point in the history
* no lock

* Works now

* Works, added example, fixed docs

* Move to use try_fold

* Remove time testing
  • Loading branch information
EmilyMatt authored Jun 9, 2024
1 parent 840257f commit 8fbb689
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 124 deletions.
46 changes: 46 additions & 0 deletions examples/basic_usage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright FalkorDB Ltd. 2023 - present
* Licensed under the Server Side Public License v1 (SSPLv1).
*/

use falkordb::{FalkorClientBuilder, FalkorResult};

fn main() -> FalkorResult<()> {
let client = FalkorClientBuilder::new()
.with_connection_info("falkor://127.0.0.1:6379".try_into()?)
.build()?;

// Dataset is available in the 'resources' directory
let mut graph = client.select_graph("imdb");

let mut cloned_graph = client.copy_graph("imdb", "imdb_clone")?;

let mut res = graph.query("MATCH (a:actor) return a").execute()?;
let mut clone_graph_res = cloned_graph.query("MATCH (a:actor) return a").execute()?;

// Parses them one by one, to avoid unneeded performance hits
assert_eq!(res.data.len(), 1317);
assert_eq!(clone_graph_res.data.len(), 1317);
if let (Some(orig), Some(cloned)) = (res.data.next(), clone_graph_res.data.next()) {
println!("Original one: {orig:?}, Cloned one: {cloned:?}");
assert_eq!(orig, cloned);
}

// We have already parsed one result
assert_eq!(res.data.len(), 1316);
assert_eq!(clone_graph_res.data.len(), 1316);

// more iterator usage:
for (orig, cloned) in res.data.zip(clone_graph_res.data) {
println!("Original one: {orig:?}, Cloned one: {cloned:?}");
assert_eq!(orig, cloned);
}

cloned_graph.delete()?;

let res_again = graph.query("MATCH (a:actor) return a").execute()?;
let as_vec = res_again.data.collect::<Vec<_>>();
assert_eq!(as_vec.len(), 1317);

Ok(())
}
6 changes: 4 additions & 2 deletions src/client/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ mod tests {
.execute()
.expect("Could not get actors from unmodified graph");

assert_eq!(res.data.len(), 1317);
assert_eq!(res.data.collect::<Vec<_>>().len(), 1317);
}

#[test]
Expand All @@ -374,12 +374,14 @@ mod tests {
.query("MATCH (a:actor) RETURN a")
.execute()
.expect("Could not get actors from unmodified graph")
.data,
.data
.collect::<Vec<_>>(),
original_graph
.query("MATCH (a:actor) RETURN a")
.execute()
.expect("Could not get actors from unmodified graph")
.data
.collect::<Vec<_>>()
)
}

Expand Down
63 changes: 29 additions & 34 deletions src/graph/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

use crate::{
client::blocking::FalkorSyncClientInner, Constraint, ConstraintType, EntityType, ExecutionPlan,
FalkorIndex, FalkorResponse, FalkorResult, FalkorValue, GraphSchema, IndexType,
ProcedureQueryBuilder, QueryBuilder, ResultSet, SlowlogEntry,
FalkorIndex, FalkorResponse, FalkorResult, FalkorValue, GraphSchema, IndexType, LazyResultSet,
ProcedureQueryBuilder, QueryBuilder, SlowlogEntry,
};
use std::{collections::HashMap, fmt::Display, sync::Arc};

Expand Down Expand Up @@ -78,7 +78,7 @@ impl SyncGraph {
}

/// Creates a [`QueryBuilder`] for this graph, in an attempt to profile a specific query
/// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
/// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
///
/// # Arguments
/// * `query_string`: The query to profile
Expand All @@ -88,12 +88,12 @@ impl SyncGraph {
pub fn profile<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan> {
) -> QueryBuilder<ExecutionPlan, &str> {
QueryBuilder::<'a>::new(self, "GRAPH.PROFILE", query_string)
}

/// Creates a [`QueryBuilder`] for this graph, in an attempt to explain a specific query
/// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
/// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
///
/// # Arguments
/// * `query_string`: The query to explain the process for
Expand All @@ -103,27 +103,27 @@ impl SyncGraph {
pub fn explain<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<ExecutionPlan> {
) -> QueryBuilder<ExecutionPlan, &str> {
QueryBuilder::new(self, "GRAPH.EXPLAIN", query_string)
}

/// Creates a [`QueryBuilder`] for this graph
/// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
/// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
///
/// # Arguments
/// * `query_string`: The query to run
///
/// # Returns
/// A [`QueryBuilder`] object, which when performed will return a [`FalkorResponse<FalkorResultSet>`]
pub fn query<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<FalkorResponse<ResultSet>> {
pub fn query<T: Display>(
&mut self,
query_string: T,
) -> QueryBuilder<FalkorResponse<LazyResultSet>, T> {
QueryBuilder::new(self, "GRAPH.QUERY", query_string)
}

/// Creates a [`QueryBuilder`] for this graph, for a readonly query
/// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
/// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
/// Read-only queries are more limited with the operations they are allowed to perform.
///
/// # Arguments
Expand All @@ -134,12 +134,12 @@ impl SyncGraph {
pub fn ro_query<'a>(
&'a mut self,
query_string: &'a str,
) -> QueryBuilder<FalkorResponse<ResultSet>> {
) -> QueryBuilder<FalkorResponse<LazyResultSet>, &str> {
QueryBuilder::new(self, "GRAPH.QUERY_RO", query_string)
}

/// Creates a [`ProcedureQueryBuilder`] for this graph
/// This [`ProcedureQueryBuilder`] has to be dropped or ran using [`ProcedureQueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
/// This [`ProcedureQueryBuilder`] has to be dropped or ran using [`ProcedureQueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
/// Read-only queries are more limited with the operations they are allowed to perform.
///
/// # Arguments
Expand All @@ -155,7 +155,7 @@ impl SyncGraph {
}

/// Creates a [`ProcedureQueryBuilder`] for this graph, for a readonly procedure
/// This [`ProcedureQueryBuilder`] has to be dropped or ran using [`ProcedureQueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
/// This [`ProcedureQueryBuilder`] has to be dropped or ran using [`ProcedureQueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists
/// Read-only procedures are more limited with the operations they are allowed to perform.
///
/// # Arguments
Expand Down Expand Up @@ -188,15 +188,15 @@ impl SyncGraph {
/// * `options`:
///
/// # Returns
/// A [`ResultSet`] containing information on the created index
/// A [`LazyResultSet`] containing information on the created index
pub fn create_index<P: Display>(
&mut self,
index_field_type: IndexType,
entity_type: EntityType,
label: &str,
properties: &[P],
options: Option<&HashMap<String, String>>,
) -> FalkorResult<FalkorResponse<ResultSet>> {
) -> FalkorResult<FalkorResponse<LazyResultSet>> {
// Create index from these properties
let properties_string = properties
.iter()
Expand Down Expand Up @@ -226,14 +226,12 @@ impl SyncGraph {
.map(|options_string| format!(" OPTIONS {{ {} }}", options_string))
.unwrap_or_default();

self.query(
format!(
"CREATE {idx_type}INDEX FOR {pattern} ON ({}){}",
properties_string, options_string
)
.as_str(),
)
.execute()
let query_str = format!(
"CREATE {idx_type}INDEX FOR {pattern} ON ({}){}",
properties_string, options_string
);
QueryBuilder::<FalkorResponse<LazyResultSet>, String>::new(self, "GRAPH.QUERY", query_str)
.execute()
}

/// Drop an existing index, by specifying its type, entity, label and specific properties
Expand All @@ -246,7 +244,7 @@ impl SyncGraph {
entity_type: EntityType,
label: L,
properties: &[P],
) -> FalkorResult<FalkorResponse<ResultSet>> {
) -> FalkorResult<FalkorResponse<LazyResultSet>> {
let properties_string = properties
.iter()
.map(|element| format!("e.{}", element.to_string()))
Expand All @@ -265,14 +263,11 @@ impl SyncGraph {
}
.to_string();

self.query(
format!(
"DROP {idx_type} INDEX for {pattern} ON ({})",
properties_string
)
.as_str(),
)
.execute()
let query_str = format!(
"DROP {idx_type} INDEX for {pattern} ON ({})",
properties_string
);
self.query(query_str).execute()
}

/// Calls the DB.CONSTRAINTS procedure on the graph, returning an array of the graph's constraints
Expand Down
38 changes: 23 additions & 15 deletions src/graph/query_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
*/

use crate::{
connection::blocking::BorrowedSyncConnection, parser::utils::parse_result_set, Constraint,
ExecutionPlan, FalkorDBError, FalkorIndex, FalkorParsable, FalkorResponse, FalkorResult,
FalkorValue, ResultSet, SyncGraph,
connection::blocking::BorrowedSyncConnection, Constraint, ExecutionPlan, FalkorDBError,
FalkorIndex, FalkorParsable, FalkorResponse, FalkorResult, FalkorValue, LazyResultSet,
SyncGraph,
};
use std::{collections::HashMap, fmt::Display, marker::PhantomData, ops::Not};

Expand All @@ -32,20 +32,20 @@ pub(crate) fn construct_query<Q: Display, T: Display, Z: Display>(
}

/// A Builder-pattern struct that allows creating and executing queries on a graph
pub struct QueryBuilder<'a, Output> {
pub struct QueryBuilder<'a, Output, T: Display> {
_unused: PhantomData<Output>,
graph: &'a mut SyncGraph,
command: &'a str,
query_string: &'a str,
query_string: T,
params: Option<&'a HashMap<String, String>>,
timeout: Option<i64>,
}

impl<'a, Output> QueryBuilder<'a, Output> {
impl<'a, Output, T: Display> QueryBuilder<'a, Output, T> {
pub(crate) fn new(
graph: &'a mut SyncGraph,
command: &'a str,
query_string: &'a str,
query_string: T,
) -> Self {
Self {
_unused: PhantomData,
Expand Down Expand Up @@ -90,7 +90,7 @@ impl<'a, Output> QueryBuilder<'a, Output> {
.graph
.client
.borrow_connection(self.graph.client.clone())?;
let query = construct_query(self.query_string, self.params);
let query = construct_query(&self.query_string, self.params);

let timeout = self.timeout.map(|timeout| format!("timeout {timeout}"));
let mut params = vec![query.as_str(), "--compact"];
Expand All @@ -105,9 +105,9 @@ impl<'a, Output> QueryBuilder<'a, Output> {
}
}

impl<'a> QueryBuilder<'a, FalkorResponse<ResultSet>> {
/// Executes the query, retuning a [`FalkorResponse`], with a [`ResultSet`] as its `data` member
pub fn execute(mut self) -> FalkorResult<FalkorResponse<ResultSet>> {
impl<'a, T: Display> QueryBuilder<'a, FalkorResponse<LazyResultSet<'a>>, T> {
/// Executes the query, retuning a [`FalkorResponse`], with a [`LazyResultSet`] as its `data` member
pub fn execute(mut self) -> FalkorResult<FalkorResponse<LazyResultSet<'a>>> {
let res = self.common_execute_steps()?.into_vec()?;

match res.len() {
Expand All @@ -118,7 +118,11 @@ impl<'a> QueryBuilder<'a, FalkorResponse<ResultSet>> {
),
)?;

FalkorResponse::from_response(None, Vec::default(), stats)
FalkorResponse::from_response(
None,
LazyResultSet::new(Default::default(), &mut self.graph.graph_schema),
stats,
)
}
2 => {
let [header, stats]: [FalkorValue; 2] = res.try_into().map_err(|_| {
Expand All @@ -127,7 +131,11 @@ impl<'a> QueryBuilder<'a, FalkorResponse<ResultSet>> {
)
})?;

FalkorResponse::from_response(Some(header), Vec::default(), stats)
FalkorResponse::from_response(
Some(header),
LazyResultSet::new(Default::default(), &mut self.graph.graph_schema),
stats,
)
}
3 => {
let [header, data, stats]: [FalkorValue; 3] = res.try_into().map_err(|_| {
Expand All @@ -138,7 +146,7 @@ impl<'a> QueryBuilder<'a, FalkorResponse<ResultSet>> {

FalkorResponse::from_response(
Some(header),
parse_result_set(data, &mut self.graph.graph_schema)?,
LazyResultSet::new(data.into_vec()?, &mut self.graph.graph_schema),
stats,
)
}
Expand All @@ -149,7 +157,7 @@ impl<'a> QueryBuilder<'a, FalkorResponse<ResultSet>> {
}
}

impl<'a> QueryBuilder<'a, ExecutionPlan> {
impl<'a, T: Display> QueryBuilder<'a, ExecutionPlan, T> {
/// Executes the query, returning an [`ExecutionPlan`] from the data returned
pub fn execute(mut self) -> FalkorResult<ExecutionPlan> {
let res = self.common_execute_steps()?;
Expand Down
20 changes: 10 additions & 10 deletions src/graph_schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,12 @@ impl GraphSchema {
raw_ids: Vec<FalkorValue>,
schema_type: SchemaType,
) -> FalkorResult<Vec<String>> {
let mut out_vec = Vec::with_capacity(raw_ids.len());
for raw_id in raw_ids {
let id = raw_id.to_i64().ok_or(FalkorDBError::ParsingI64)?;
out_vec.push(
match self
let raw_ids_len = raw_ids.len();
raw_ids
.into_iter()
.try_fold(Vec::with_capacity(raw_ids_len), |mut acc, raw_id| {
let id = raw_id.to_i64().ok_or(FalkorDBError::ParsingI64)?;
let value = match self
.get_id_map_by_schema_type(schema_type)
.get(&id)
.cloned()
Expand All @@ -206,11 +207,10 @@ impl GraphSchema {
.ok_or(FalkorDBError::MissingSchemaId(schema_type))?
}
Some(exists) => exists,
},
);
}

Ok(out_vec)
};
acc.push(value);
Ok(acc)
})
}

pub(crate) fn parse_properties_map(
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

#![deny(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
#![doc = include_str!("../README.md")]

#[cfg(not(feature = "redis"))]
Expand Down Expand Up @@ -38,8 +39,9 @@ pub use response::{
constraint::{Constraint, ConstraintStatus, ConstraintType},
execution_plan::ExecutionPlan,
index::{FalkorIndex, IndexStatus, IndexType},
lazy_result_set::LazyResultSet,
slowlog_entry::SlowlogEntry,
FalkorResponse, ResultSet,
FalkorResponse,
};
pub use value::{
config::ConfigValue,
Expand Down
Loading

0 comments on commit 8fbb689

Please sign in to comment.