diff --git a/examples/basic_usage.rs b/examples/basic_usage.rs new file mode 100644 index 0000000..3227890 --- /dev/null +++ b/examples/basic_usage.rs @@ -0,0 +1,46 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). + */ + +use falkordb::{FalkorClientBuilder, FalkorResult}; + +fn main() -> FalkorResult<()> { + let client = FalkorClientBuilder::new() + .with_connection_info("falkor://127.0.0.1:6379".try_into()?) + .build()?; + + // Dataset is available in the 'resources' directory + let mut graph = client.select_graph("imdb"); + + let mut cloned_graph = client.copy_graph("imdb", "imdb_clone")?; + + let mut res = graph.query("MATCH (a:actor) return a").execute()?; + let mut clone_graph_res = cloned_graph.query("MATCH (a:actor) return a").execute()?; + + // Parses them one by one, to avoid unneeded performance hits + assert_eq!(res.data.len(), 1317); + assert_eq!(clone_graph_res.data.len(), 1317); + if let (Some(orig), Some(cloned)) = (res.data.next(), clone_graph_res.data.next()) { + println!("Original one: {orig:?}, Cloned one: {cloned:?}"); + assert_eq!(orig, cloned); + } + + // We have already parsed one result + assert_eq!(res.data.len(), 1316); + assert_eq!(clone_graph_res.data.len(), 1316); + + // more iterator usage: + for (orig, cloned) in res.data.zip(clone_graph_res.data) { + println!("Original one: {orig:?}, Cloned one: {cloned:?}"); + assert_eq!(orig, cloned); + } + + cloned_graph.delete()?; + + let res_again = graph.query("MATCH (a:actor) return a").execute()?; + let as_vec = res_again.data.collect::>(); + assert_eq!(as_vec.len(), 1317); + + Ok(()) +} diff --git a/src/client/blocking.rs b/src/client/blocking.rs index 70c0ad2..614eb3f 100644 --- a/src/client/blocking.rs +++ b/src/client/blocking.rs @@ -350,7 +350,7 @@ mod tests { .execute() .expect("Could not get actors from unmodified graph"); - assert_eq!(res.data.len(), 1317); + assert_eq!(res.data.collect::>().len(), 1317); } #[test] @@ -374,12 +374,14 @@ mod tests { .query("MATCH (a:actor) RETURN a") .execute() .expect("Could not get actors from unmodified graph") - .data, + .data + .collect::>(), original_graph .query("MATCH (a:actor) RETURN a") .execute() .expect("Could not get actors from unmodified graph") .data + .collect::>() ) } diff --git a/src/graph/blocking.rs b/src/graph/blocking.rs index 48b8548..579809f 100644 --- a/src/graph/blocking.rs +++ b/src/graph/blocking.rs @@ -5,8 +5,8 @@ use crate::{ client::blocking::FalkorSyncClientInner, Constraint, ConstraintType, EntityType, ExecutionPlan, - FalkorIndex, FalkorResponse, FalkorResult, FalkorValue, GraphSchema, IndexType, - ProcedureQueryBuilder, QueryBuilder, ResultSet, SlowlogEntry, + FalkorIndex, FalkorResponse, FalkorResult, FalkorValue, GraphSchema, IndexType, LazyResultSet, + ProcedureQueryBuilder, QueryBuilder, SlowlogEntry, }; use std::{collections::HashMap, fmt::Display, sync::Arc}; @@ -78,7 +78,7 @@ impl SyncGraph { } /// Creates a [`QueryBuilder`] for this graph, in an attempt to profile a specific query - /// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists + /// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists /// /// # Arguments /// * `query_string`: The query to profile @@ -88,12 +88,12 @@ impl SyncGraph { pub fn profile<'a>( &'a mut self, query_string: &'a str, - ) -> QueryBuilder { + ) -> QueryBuilder { QueryBuilder::<'a>::new(self, "GRAPH.PROFILE", query_string) } /// Creates a [`QueryBuilder`] for this graph, in an attempt to explain a specific query - /// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists + /// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists /// /// # Arguments /// * `query_string`: The query to explain the process for @@ -103,27 +103,27 @@ impl SyncGraph { pub fn explain<'a>( &'a mut self, query_string: &'a str, - ) -> QueryBuilder { + ) -> QueryBuilder { QueryBuilder::new(self, "GRAPH.EXPLAIN", query_string) } /// Creates a [`QueryBuilder`] for this graph - /// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists + /// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists /// /// # Arguments /// * `query_string`: The query to run /// /// # Returns /// A [`QueryBuilder`] object, which when performed will return a [`FalkorResponse`] - pub fn query<'a>( - &'a mut self, - query_string: &'a str, - ) -> QueryBuilder> { + pub fn query( + &mut self, + query_string: T, + ) -> QueryBuilder, T> { QueryBuilder::new(self, "GRAPH.QUERY", query_string) } /// Creates a [`QueryBuilder`] for this graph, for a readonly query - /// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists + /// This [`QueryBuilder`] has to be dropped or ran using [`QueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists /// Read-only queries are more limited with the operations they are allowed to perform. /// /// # Arguments @@ -134,12 +134,12 @@ impl SyncGraph { pub fn ro_query<'a>( &'a mut self, query_string: &'a str, - ) -> QueryBuilder> { + ) -> QueryBuilder, &str> { QueryBuilder::new(self, "GRAPH.QUERY_RO", query_string) } /// Creates a [`ProcedureQueryBuilder`] for this graph - /// This [`ProcedureQueryBuilder`] has to be dropped or ran using [`ProcedureQueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists + /// This [`ProcedureQueryBuilder`] has to be dropped or ran using [`ProcedureQueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists /// Read-only queries are more limited with the operations they are allowed to perform. /// /// # Arguments @@ -155,7 +155,7 @@ impl SyncGraph { } /// Creates a [`ProcedureQueryBuilder`] for this graph, for a readonly procedure - /// This [`ProcedureQueryBuilder`] has to be dropped or ran using [`ProcedureQueryBuilder::perform`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists + /// This [`ProcedureQueryBuilder`] has to be dropped or ran using [`ProcedureQueryBuilder::execute`], before reusing the graph, as it takes a mutable reference to the graph for as long as it exists /// Read-only procedures are more limited with the operations they are allowed to perform. /// /// # Arguments @@ -188,7 +188,7 @@ impl SyncGraph { /// * `options`: /// /// # Returns - /// A [`ResultSet`] containing information on the created index + /// A [`LazyResultSet`] containing information on the created index pub fn create_index( &mut self, index_field_type: IndexType, @@ -196,7 +196,7 @@ impl SyncGraph { label: &str, properties: &[P], options: Option<&HashMap>, - ) -> FalkorResult> { + ) -> FalkorResult> { // Create index from these properties let properties_string = properties .iter() @@ -226,14 +226,12 @@ impl SyncGraph { .map(|options_string| format!(" OPTIONS {{ {} }}", options_string)) .unwrap_or_default(); - self.query( - format!( - "CREATE {idx_type}INDEX FOR {pattern} ON ({}){}", - properties_string, options_string - ) - .as_str(), - ) - .execute() + let query_str = format!( + "CREATE {idx_type}INDEX FOR {pattern} ON ({}){}", + properties_string, options_string + ); + QueryBuilder::, String>::new(self, "GRAPH.QUERY", query_str) + .execute() } /// Drop an existing index, by specifying its type, entity, label and specific properties @@ -246,7 +244,7 @@ impl SyncGraph { entity_type: EntityType, label: L, properties: &[P], - ) -> FalkorResult> { + ) -> FalkorResult> { let properties_string = properties .iter() .map(|element| format!("e.{}", element.to_string())) @@ -265,14 +263,11 @@ impl SyncGraph { } .to_string(); - self.query( - format!( - "DROP {idx_type} INDEX for {pattern} ON ({})", - properties_string - ) - .as_str(), - ) - .execute() + let query_str = format!( + "DROP {idx_type} INDEX for {pattern} ON ({})", + properties_string + ); + self.query(query_str).execute() } /// Calls the DB.CONSTRAINTS procedure on the graph, returning an array of the graph's constraints diff --git a/src/graph/query_builder.rs b/src/graph/query_builder.rs index 96a6e21..a8cf51a 100644 --- a/src/graph/query_builder.rs +++ b/src/graph/query_builder.rs @@ -4,9 +4,9 @@ */ use crate::{ - connection::blocking::BorrowedSyncConnection, parser::utils::parse_result_set, Constraint, - ExecutionPlan, FalkorDBError, FalkorIndex, FalkorParsable, FalkorResponse, FalkorResult, - FalkorValue, ResultSet, SyncGraph, + connection::blocking::BorrowedSyncConnection, Constraint, ExecutionPlan, FalkorDBError, + FalkorIndex, FalkorParsable, FalkorResponse, FalkorResult, FalkorValue, LazyResultSet, + SyncGraph, }; use std::{collections::HashMap, fmt::Display, marker::PhantomData, ops::Not}; @@ -32,20 +32,20 @@ pub(crate) fn construct_query( } /// A Builder-pattern struct that allows creating and executing queries on a graph -pub struct QueryBuilder<'a, Output> { +pub struct QueryBuilder<'a, Output, T: Display> { _unused: PhantomData, graph: &'a mut SyncGraph, command: &'a str, - query_string: &'a str, + query_string: T, params: Option<&'a HashMap>, timeout: Option, } -impl<'a, Output> QueryBuilder<'a, Output> { +impl<'a, Output, T: Display> QueryBuilder<'a, Output, T> { pub(crate) fn new( graph: &'a mut SyncGraph, command: &'a str, - query_string: &'a str, + query_string: T, ) -> Self { Self { _unused: PhantomData, @@ -90,7 +90,7 @@ impl<'a, Output> QueryBuilder<'a, Output> { .graph .client .borrow_connection(self.graph.client.clone())?; - let query = construct_query(self.query_string, self.params); + let query = construct_query(&self.query_string, self.params); let timeout = self.timeout.map(|timeout| format!("timeout {timeout}")); let mut params = vec![query.as_str(), "--compact"]; @@ -105,9 +105,9 @@ impl<'a, Output> QueryBuilder<'a, Output> { } } -impl<'a> QueryBuilder<'a, FalkorResponse> { - /// Executes the query, retuning a [`FalkorResponse`], with a [`ResultSet`] as its `data` member - pub fn execute(mut self) -> FalkorResult> { +impl<'a, T: Display> QueryBuilder<'a, FalkorResponse>, T> { + /// Executes the query, retuning a [`FalkorResponse`], with a [`LazyResultSet`] as its `data` member + pub fn execute(mut self) -> FalkorResult>> { let res = self.common_execute_steps()?.into_vec()?; match res.len() { @@ -118,7 +118,11 @@ impl<'a> QueryBuilder<'a, FalkorResponse> { ), )?; - FalkorResponse::from_response(None, Vec::default(), stats) + FalkorResponse::from_response( + None, + LazyResultSet::new(Default::default(), &mut self.graph.graph_schema), + stats, + ) } 2 => { let [header, stats]: [FalkorValue; 2] = res.try_into().map_err(|_| { @@ -127,7 +131,11 @@ impl<'a> QueryBuilder<'a, FalkorResponse> { ) })?; - FalkorResponse::from_response(Some(header), Vec::default(), stats) + FalkorResponse::from_response( + Some(header), + LazyResultSet::new(Default::default(), &mut self.graph.graph_schema), + stats, + ) } 3 => { let [header, data, stats]: [FalkorValue; 3] = res.try_into().map_err(|_| { @@ -138,7 +146,7 @@ impl<'a> QueryBuilder<'a, FalkorResponse> { FalkorResponse::from_response( Some(header), - parse_result_set(data, &mut self.graph.graph_schema)?, + LazyResultSet::new(data.into_vec()?, &mut self.graph.graph_schema), stats, ) } @@ -149,7 +157,7 @@ impl<'a> QueryBuilder<'a, FalkorResponse> { } } -impl<'a> QueryBuilder<'a, ExecutionPlan> { +impl<'a, T: Display> QueryBuilder<'a, ExecutionPlan, T> { /// Executes the query, returning an [`ExecutionPlan`] from the data returned pub fn execute(mut self) -> FalkorResult { let res = self.common_execute_steps()?; diff --git a/src/graph_schema/mod.rs b/src/graph_schema/mod.rs index 82950cf..7c618a3 100644 --- a/src/graph_schema/mod.rs +++ b/src/graph_schema/mod.rs @@ -189,11 +189,12 @@ impl GraphSchema { raw_ids: Vec, schema_type: SchemaType, ) -> FalkorResult> { - let mut out_vec = Vec::with_capacity(raw_ids.len()); - for raw_id in raw_ids { - let id = raw_id.to_i64().ok_or(FalkorDBError::ParsingI64)?; - out_vec.push( - match self + let raw_ids_len = raw_ids.len(); + raw_ids + .into_iter() + .try_fold(Vec::with_capacity(raw_ids_len), |mut acc, raw_id| { + let id = raw_id.to_i64().ok_or(FalkorDBError::ParsingI64)?; + let value = match self .get_id_map_by_schema_type(schema_type) .get(&id) .cloned() @@ -206,11 +207,10 @@ impl GraphSchema { .ok_or(FalkorDBError::MissingSchemaId(schema_type))? } Some(exists) => exists, - }, - ); - } - - Ok(out_vec) + }; + acc.push(value); + Ok(acc) + }) } pub(crate) fn parse_properties_map( diff --git a/src/lib.rs b/src/lib.rs index 736d3f7..e4acb0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ */ #![deny(missing_docs)] +#![deny(rustdoc::broken_intra_doc_links)] #![doc = include_str!("../README.md")] #[cfg(not(feature = "redis"))] @@ -38,8 +39,9 @@ pub use response::{ constraint::{Constraint, ConstraintStatus, ConstraintType}, execution_plan::ExecutionPlan, index::{FalkorIndex, IndexStatus, IndexType}, + lazy_result_set::LazyResultSet, slowlog_entry::SlowlogEntry, - FalkorResponse, ResultSet, + FalkorResponse, }; pub use value::{ config::ConfigValue, diff --git a/src/parser/utils.rs b/src/parser/utils.rs index 0ba5610..f02b6fc 100644 --- a/src/parser/utils.rs +++ b/src/parser/utils.rs @@ -3,9 +3,7 @@ * Licensed under the Server Side Public License v1 (SSPLv1). */ -use crate::{ - value::utils::parse_type, FalkorDBError, FalkorResult, FalkorValue, GraphSchema, ResultSet, -}; +use crate::{FalkorDBError, FalkorResult, FalkorValue}; pub(crate) fn string_vec_from_val(value: FalkorValue) -> FalkorResult> { value.into_vec().map(|value_as_vec| { @@ -18,46 +16,33 @@ pub(crate) fn string_vec_from_val(value: FalkorValue) -> FalkorResult FalkorResult> { let in_vec = header.into_vec()?; + let in_vec_len = in_vec.len(); + in_vec + .into_iter() + .try_fold(Vec::with_capacity(in_vec_len), |mut acc, item| { + let item_vec = item.into_vec()?; - let mut out_vec = Vec::with_capacity(in_vec.len()); - for item in in_vec { - let item_vec = item.into_vec()?; - - out_vec.push( - if item_vec.len() == 2 { - let [_, key]: [FalkorValue; 2] = item_vec.try_into().map_err(|_| { - FalkorDBError::ParsingHeader( - "Could not get 2-sized array despite there being 2 elements".to_string(), - ) - })?; - key - } else { - item_vec - .into_iter() - .next() - .ok_or(FalkorDBError::ParsingHeader( - "Expected at least one item in header vector".to_string(), - ))? - } - .into_string()?, - ) - } - - Ok(out_vec) -} - -pub(crate) fn parse_result_set( - data: FalkorValue, - graph_schema: &mut GraphSchema, -) -> FalkorResult { - let data_vec = data.into_vec()?; - - let mut out_vec = Vec::with_capacity(data_vec.len()); - for column in data_vec { - out_vec.push(parse_type(6, column, graph_schema)?.into_vec()?); - } - - Ok(out_vec) + acc.push( + if item_vec.len() == 2 { + let [_, key]: [FalkorValue; 2] = item_vec.try_into().map_err(|_| { + FalkorDBError::ParsingHeader( + "Could not get 2-sized array despite there being 2 elements" + .to_string(), + ) + })?; + key + } else { + item_vec + .into_iter() + .next() + .ok_or(FalkorDBError::ParsingHeader( + "Expected at least one item in header vector".to_string(), + ))? + } + .into_string()?, + ); + Ok(acc) + }) } #[cfg(test)] diff --git a/src/response/execution_plan.rs b/src/response/execution_plan.rs index 4d3f646..d410f78 100644 --- a/src/response/execution_plan.rs +++ b/src/response/execution_plan.rs @@ -137,18 +137,20 @@ impl ExecutionPlan { .map_err(|_| FalkorDBError::RefCountBooBoo)? .into_inner(); - let mut out_vec = Vec::with_capacity(current_op.children.len()); - for child in current_op.children.into_iter() { - out_vec.push(Self::finalize_operation(child)?); - } - + let children_count = current_op.children.len(); Ok(Rc::new(Operation { name: current_op.name, args: current_op.args, records_produced: current_op.records_produced, execution_time: current_op.execution_time, depth: current_op.depth, - children: out_vec, + children: current_op.children.into_iter().try_fold( + Vec::with_capacity(children_count), + |mut acc, child| { + acc.push(Self::finalize_operation(child)?); + Result::<_, FalkorDBError>::Ok(acc) + }, + )?, })) } diff --git a/src/response/lazy_result_set.rs b/src/response/lazy_result_set.rs new file mode 100644 index 0000000..6f13667 --- /dev/null +++ b/src/response/lazy_result_set.rs @@ -0,0 +1,48 @@ +/* + * Copyright FalkorDB Ltd. 2023 - present + * Licensed under the Server Side Public License v1 (SSPLv1). + */ + +use crate::{value::utils::parse_type, FalkorValue, GraphSchema}; +use std::collections::VecDeque; + +/// A wrapper around the returned raw data, allowing parsing on demand of each result +/// This implements Iterator, so can simply be collect()'ed into any desired container +pub struct LazyResultSet<'a> { + data: VecDeque, + graph_schema: &'a mut GraphSchema, +} + +impl<'a> LazyResultSet<'a> { + pub(crate) fn new( + data: Vec, + graph_schema: &'a mut GraphSchema, + ) -> Self { + Self { + data: data.into(), + graph_schema, + } + } + + /// Returns the remaining rows in the result set. + pub fn len(&self) -> usize { + self.data.len() + } + + /// Returns whether this result set is empty or depleted + pub fn is_empty(&self) -> bool { + self.data.is_empty() + } +} + +impl<'a> Iterator for LazyResultSet<'a> { + type Item = Vec; + + fn next(&mut self) -> Option { + self.data.pop_front().map(|current_result| { + parse_type(6, current_result, self.graph_schema) + .and_then(|parsed_result| parsed_result.into_vec()) + .unwrap_or(vec![FalkorValue::Unparseable]) + }) + } +} diff --git a/src/response/mod.rs b/src/response/mod.rs index 5ac983b..031a2f3 100644 --- a/src/response/mod.rs +++ b/src/response/mod.rs @@ -11,11 +11,9 @@ use crate::{ pub(crate) mod constraint; pub(crate) mod execution_plan; pub(crate) mod index; +pub(crate) mod lazy_result_set; pub(crate) mod slowlog_entry; -/// A [`Vec`], representing a table of other [`Vec`]s, representing columns, containing [`FalkorValue`]s -pub type ResultSet = Vec>; - /// A response struct which also contains the returned header and stats data #[derive(Clone, Debug, Default)] pub struct FalkorResponse { diff --git a/src/value/mod.rs b/src/value/mod.rs index 5e9c5e0..8527b4f 100644 --- a/src/value/mod.rs +++ b/src/value/mod.rs @@ -41,6 +41,8 @@ pub enum FalkorValue { Path(Path), /// A NULL type None, + /// Failed parsing this value + Unparseable, } macro_rules! impl_to_falkordb_value { diff --git a/src/value/utils.rs b/src/value/utils.rs index 2a8e68b..cefd39d 100644 --- a/src/value/utils.rs +++ b/src/value/utils.rs @@ -27,17 +27,15 @@ pub(crate) fn parse_type( 3 => FalkorValue::I64(val.to_i64().ok_or(FalkorDBError::ParsingI64)?), 4 => FalkorValue::Bool(val.to_bool().ok_or(FalkorDBError::ParsingBool)?), 5 => FalkorValue::F64(val.try_into()?), - 6 => FalkorValue::Array({ - let val_vec = val.into_vec()?; - - let mut out_vec = Vec::with_capacity(val_vec.len()); - for item in val_vec { - let (type_marker, val) = type_val_from_value(item)?; - out_vec.push(parse_type(type_marker, val, graph_schema)?) - } - - out_vec - }), + 6 => FalkorValue::Array( + val.into_vec()? + .into_iter() + .flat_map(|item| { + type_val_from_value(item) + .and_then(|(type_marker, val)| parse_type(type_marker, val, graph_schema)) + }) + .collect(), + ), // The following types are sent as an array and require specific parsing functions 7 => FalkorValue::Edge(FalkorParsable::from_falkor_value(val, graph_schema)?), 8 => FalkorValue::Node(FalkorParsable::from_falkor_value(val, graph_schema)?),