Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: data frame with lazy relation AltrepDataFrameRelation #960

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
8 changes: 8 additions & 0 deletions R/cpp11.R
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,18 @@ rapi_rel_filter <- function(rel, exprs) {
.Call(`_duckdb_rapi_rel_filter`, rel, exprs)
}

# Thin binding: passes `df`, `con` and `exprs` unchanged to the registered
# native routine `_duckdb_rapi_rel_filter2` and returns its result.
rapi_rel_filter2 <- function(df, con, exprs) {
.Call(`_duckdb_rapi_rel_filter2`, df, con, exprs)
}

# Thin binding: passes `rel` and `exprs` unchanged to the registered native
# routine `_duckdb_rapi_rel_project` and returns its result.
rapi_rel_project <- function(rel, exprs) {
.Call(`_duckdb_rapi_rel_project`, rel, exprs)
}

# Thin binding: passes `df`, `con` and `exprs` unchanged to the registered
# native routine `_duckdb_rapi_rel_project2` and returns its result.
rapi_rel_project2 <- function(df, con, exprs) {
.Call(`_duckdb_rapi_rel_project2`, df, con, exprs)
}

# Thin binding: passes `rel`, `groups` and `aggregates` unchanged to the
# registered native routine `_duckdb_rapi_rel_aggregate` and returns its result.
rapi_rel_aggregate <- function(rel, groups, aggregates) {
.Call(`_duckdb_rapi_rel_aggregate`, rel, groups, aggregates)
}
Expand Down
27 changes: 27 additions & 0 deletions R/relational.R
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ rel_project <- function(rel, exprs) {
rethrow_rapi_rel_project(rel, exprs)
}

#' Lazily project a data frame through DuckDB
#'
#' Forwards `df`, the `conn_ref` slot of `con`, and `exprs` to the native
#' `rapi_rel_project2()` routine through its error-rethrowing wrapper.
#' @param df the data frame to project
#' @param con the DuckDB connection object; only its `conn_ref` slot is used
#' @param exprs a list of DuckDB expressions to project; the native routine
#'   raises "expected projection expressions" when the list is empty
#' @return the now projected `duckdb_relation` object
#' @noRd
#' @examples
#' con <- DBI::dbConnect(duckdb())
#' rel2 <- rel_project2(mtcars, con, list(expr_reference("cyl"), expr_reference("disp")))
rel_project2 <- function(df, con, exprs) {
rethrow_rapi_rel_project2(df, con@conn_ref, exprs)
}

#' Lazily filter a DuckDB relation object
#' @param rel the DuckDB relation object
#' @param exprs a list of DuckDB expressions to filter by
Expand All @@ -147,6 +160,20 @@ rel_filter <- function(rel, exprs) {
rethrow_rapi_rel_filter(rel, exprs)
}

#' Lazily filter a data frame through DuckDB
#'
#' Forwards `df`, the `conn_ref` slot of `con`, and `exprs` to the native
#' `rapi_rel_filter2()` routine through its error-rethrowing wrapper.
#' @param df the data frame to filter
#' @param con the DuckDB connection object; only its `conn_ref` slot is used
#' @param exprs a list of DuckDB expressions to filter by; the native routine
#'   raises "expected filter expressions" when the list is empty and combines
#'   several expressions with a logical AND
#' @return the now filtered `duckdb_relation` object
#' @noRd
#' @examples
#' con <- DBI::dbConnect(duckdb())
#' DBI::dbExecute(con, "CREATE OR REPLACE MACRO gt(a, b) AS a > b")
#' rel2 <- rel_filter2(mtcars, con, list(expr_function("gt", list(expr_reference("cyl"), expr_constant("6")))))
rel_filter2 <- function(df, con, exprs) {
rethrow_rapi_rel_filter2(df, con@conn_ref, exprs)
}

#' Lazily aggregate a DuckDB relation object
#' @param rel the DuckDB relation object
#' @param groups a list of DuckDB expressions to group by
Expand Down
20 changes: 20 additions & 0 deletions R/rethrow-gen.R
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,15 @@ rethrow_rapi_rel_filter <- function(rel, exprs, call = parent.frame(2)) {
)
}

# Runs the native `rapi_rel_filter2()` binding and hands any error it signals
# to `rethrow_error_from_rapi()` together with `call`.
# `call` defaults to `parent.frame(2)` and is only forced inside the handler.
rethrow_rapi_rel_filter2 <- function(df, con, exprs, call = parent.frame(2)) {
  rlang::try_fetch(
    rapi_rel_filter2(df, con, exprs),
    error = function(cnd) rethrow_error_from_rapi(cnd, call)
  )
}

rethrow_rapi_rel_project <- function(rel, exprs, call = parent.frame(2)) {
rlang::try_fetch(
rapi_rel_project(rel, exprs),
Expand All @@ -189,6 +198,15 @@ rethrow_rapi_rel_project <- function(rel, exprs, call = parent.frame(2)) {
)
}

# Runs the native `rapi_rel_project2()` binding and hands any error it signals
# to `rethrow_error_from_rapi()` together with `call`.
# `call` defaults to `parent.frame(2)` and is only forced inside the handler.
rethrow_rapi_rel_project2 <- function(df, con, exprs, call = parent.frame(2)) {
  rlang::try_fetch(
    rapi_rel_project2(df, con, exprs),
    error = function(cnd) rethrow_error_from_rapi(cnd, call)
  )
}

rethrow_rapi_rel_aggregate <- function(rel, groups, aggregates, call = parent.frame(2)) {
rlang::try_fetch(
rapi_rel_aggregate(rel, groups, aggregates),
Expand Down Expand Up @@ -579,7 +597,9 @@ rethrow_restore <- function() {
rethrow_rapi_expr_tostring <<- rapi_expr_tostring
rethrow_rapi_rel_from_df <<- rapi_rel_from_df
rethrow_rapi_rel_filter <<- rapi_rel_filter
rethrow_rapi_rel_filter2 <<- rapi_rel_filter2
rethrow_rapi_rel_project <<- rapi_rel_project
rethrow_rapi_rel_project2 <<- rapi_rel_project2
rethrow_rapi_rel_aggregate <<- rapi_rel_aggregate
rethrow_rapi_rel_order <<- rapi_rel_order
rethrow_rapi_expr_window <<- rapi_expr_window
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ add_library(
register.cpp
relational.cpp
reltoaltrep.cpp
altrepdataframe_relation.cpp
scan.cpp
statement.cpp
transform.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/Makevars
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ include Makevars.duckdb

CXX_STD = CXX17
PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -DBROTLI_ENCODER_CLEANUP_ON_OOM -Iduckdb/src/include -Iduckdb/third_party/concurrentqueue -Iduckdb/third_party/fast_float -Iduckdb/third_party/fastpforlib -Iduckdb/third_party/fmt/include -Iduckdb/third_party/fsst -Iduckdb/third_party/httplib -Iduckdb/third_party/hyperloglog -Iduckdb/third_party/jaro_winkler -Iduckdb/third_party/jaro_winkler/details -Iduckdb/third_party/libpg_query -Iduckdb/third_party/libpg_query/include -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/mbedtls/library -Iduckdb/third_party/miniz -Iduckdb/third_party/pcg -Iduckdb/third_party/re2 -Iduckdb/third_party/skiplist -Iduckdb/third_party/tdigest -Iduckdb/third_party/utf8proc -Iduckdb/third_party/utf8proc/include -Iduckdb/third_party/yyjson/include -Iduckdb/third_party/zstd/include -Iduckdb/extension/parquet/include -Iduckdb/third_party/parquet -Iduckdb/third_party/thrift -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/snappy -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/zstd/include -Iduckdb/extension/core_functions/include -I../inst/include -Iduckdb -DDUCKDB_EXTENSION_PARQUET_LINKED -DDUCKDB_BUILD_LIBRARY -DDUCKDB_EXTENSION_CORE_FUNCTIONS_LINKED -DDUCKDB_BUILD_LIBRARY
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o altrepdataframe_relation.o types.o cpp11.o $(SOURCES)
27 changes: 27 additions & 0 deletions src/altrepdataframe_relation.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include "altrepdataframe_relation.hpp"

namespace duckdb {

// Wraps `parent` in an AltrepDataFrameRelation.
// The Relation base is initialized from parent->context before the member
// initializer moves the argument into this->parent, so reading the context is
// safe. Inside the constructor body the bare name `parent` refers to the
// moved-from argument, not to the member.
AltrepDataFrameRelation::AltrepDataFrameRelation(shared_ptr<Relation> parent)
// TODO: which RelationType should be used?
: Relation(parent->context, RelationType::AGGREGATE_RELATION), parent(std::move(parent)) {
// NOTE(review): presumably binds this relation and fills `columns`, which
// Columns() later returns - confirm against Relation::TryBindRelation.
TryBindRelation(columns);
}

const vector<ColumnDefinition> &AltrepDataFrameRelation::Columns() {
return columns;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Can this still be parent->...() ?

}

// Returns the wrapped parent relation's string form for the given depth.
string AltrepDataFrameRelation::ToString(idx_t depth) {
return parent->ToString(depth);
}

// Reports whatever the wrapped parent relation reports for IsReadOnly().
bool AltrepDataFrameRelation::IsReadOnly() {
return parent->IsReadOnly();
}

// Returns the query node produced by the wrapped parent relation; this class
// adds nothing to the query itself.
unique_ptr<QueryNode> AltrepDataFrameRelation::GetQueryNode() {
return parent->GetQueryNode();
}

}
16 changes: 16 additions & 0 deletions src/cpp11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,27 @@ extern "C" SEXP _duckdb_rapi_rel_filter(SEXP rel, SEXP exprs) {
END_CPP11
}
// relational.cpp
// Declaration of the implementation, followed by the C entry point that R
// reaches through .Call(). Each SEXP argument is converted with
// cpp11::as_cpp and the result is converted back with cpp11::as_sexp.
// NOTE(review): BEGIN_CPP11/END_CPP11 presumably turn C++ exceptions into R
// errors - confirm in the cpp11 documentation.
SEXP rapi_rel_filter2(data_frame df, duckdb::conn_eptr_t con, list exprs);
extern "C" SEXP _duckdb_rapi_rel_filter2(SEXP df, SEXP con, SEXP exprs) {
BEGIN_CPP11
return cpp11::as_sexp(rapi_rel_filter2(cpp11::as_cpp<cpp11::decay_t<data_frame>>(df), cpp11::as_cpp<cpp11::decay_t<duckdb::conn_eptr_t>>(con), cpp11::as_cpp<cpp11::decay_t<list>>(exprs)));
END_CPP11
}
// relational.cpp
// Declaration of the implementation, followed by the C entry point that R
// reaches through .Call(). Each SEXP argument is converted with
// cpp11::as_cpp and the result is converted back with cpp11::as_sexp.
// NOTE(review): BEGIN_CPP11/END_CPP11 presumably turn C++ exceptions into R
// errors - confirm in the cpp11 documentation.
SEXP rapi_rel_project(duckdb::rel_extptr_t rel, list exprs);
extern "C" SEXP _duckdb_rapi_rel_project(SEXP rel, SEXP exprs) {
BEGIN_CPP11
return cpp11::as_sexp(rapi_rel_project(cpp11::as_cpp<cpp11::decay_t<duckdb::rel_extptr_t>>(rel), cpp11::as_cpp<cpp11::decay_t<list>>(exprs)));
END_CPP11
}
// relational.cpp
// Declaration of the implementation, followed by the C entry point that R
// reaches through .Call(). Each SEXP argument is converted with
// cpp11::as_cpp and the result is converted back with cpp11::as_sexp.
// NOTE(review): BEGIN_CPP11/END_CPP11 presumably turn C++ exceptions into R
// errors - confirm in the cpp11 documentation.
SEXP rapi_rel_project2(data_frame df, duckdb::conn_eptr_t con, list exprs);
extern "C" SEXP _duckdb_rapi_rel_project2(SEXP df, SEXP con, SEXP exprs) {
BEGIN_CPP11
return cpp11::as_sexp(rapi_rel_project2(cpp11::as_cpp<cpp11::decay_t<data_frame>>(df), cpp11::as_cpp<cpp11::decay_t<duckdb::conn_eptr_t>>(con), cpp11::as_cpp<cpp11::decay_t<list>>(exprs)));
END_CPP11
}
// relational.cpp
SEXP rapi_rel_aggregate(duckdb::rel_extptr_t rel, list groups, list aggregates);
extern "C" SEXP _duckdb_rapi_rel_aggregate(SEXP rel, SEXP groups, SEXP aggregates) {
BEGIN_CPP11
Expand Down Expand Up @@ -488,6 +502,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_duckdb_rapi_rel_distinct", (DL_FUNC) &_duckdb_rapi_rel_distinct, 1},
{"_duckdb_rapi_rel_explain", (DL_FUNC) &_duckdb_rapi_rel_explain, 3},
{"_duckdb_rapi_rel_filter", (DL_FUNC) &_duckdb_rapi_rel_filter, 2},
{"_duckdb_rapi_rel_filter2", (DL_FUNC) &_duckdb_rapi_rel_filter2, 3},
{"_duckdb_rapi_rel_from_altrep_df", (DL_FUNC) &_duckdb_rapi_rel_from_altrep_df, 3},
{"_duckdb_rapi_rel_from_df", (DL_FUNC) &_duckdb_rapi_rel_from_df, 3},
{"_duckdb_rapi_rel_from_sql", (DL_FUNC) &_duckdb_rapi_rel_from_sql, 2},
Expand All @@ -499,6 +514,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_duckdb_rapi_rel_names", (DL_FUNC) &_duckdb_rapi_rel_names, 1},
{"_duckdb_rapi_rel_order", (DL_FUNC) &_duckdb_rapi_rel_order, 3},
{"_duckdb_rapi_rel_project", (DL_FUNC) &_duckdb_rapi_rel_project, 2},
{"_duckdb_rapi_rel_project2", (DL_FUNC) &_duckdb_rapi_rel_project2, 3},
{"_duckdb_rapi_rel_set_alias", (DL_FUNC) &_duckdb_rapi_rel_set_alias, 2},
{"_duckdb_rapi_rel_set_diff", (DL_FUNC) &_duckdb_rapi_rel_set_diff, 2},
{"_duckdb_rapi_rel_set_intersect", (DL_FUNC) &_duckdb_rapi_rel_set_intersect, 2},
Expand Down
19 changes: 19 additions & 0 deletions src/include/altrepdataframe_relation.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

#include "duckdb/main/relation.hpp"

namespace duckdb {

//! Relation that wraps another relation (`parent`).
//! ToString(), IsReadOnly() and GetQueryNode() delegate to the wrapped
//! relation; Columns() returns this object's own `columns` vector.
class AltrepDataFrameRelation final : public Relation {
public:
	//! Wraps `parent`. Marked explicit so that a shared_ptr<Relation> is never
	//! converted into an AltrepDataFrameRelation implicitly.
	explicit AltrepDataFrameRelation(shared_ptr<Relation> parent);

	//! The wrapped relation. Declared before `columns` so that it is
	//! initialized first.
	shared_ptr<Relation> parent;
	//! Column definitions returned by Columns(); passed to TryBindRelation()
	//! in the constructor.
	vector<ColumnDefinition> columns;

	unique_ptr<QueryNode> GetQueryNode() override;

	const vector<ColumnDefinition> &Columns() override;
	string ToString(idx_t depth) override;
	bool IsReadOnly() override;
};

} // namespace duckdb
2 changes: 2 additions & 0 deletions src/include/rapi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ SEXP rapi_record_batch(duckdb::rqry_eptr_t, int);

cpp11::r_string rapi_ptr_to_str(SEXP extptr);

SEXP rapi_rel_from_df(duckdb::conn_eptr_t con, cpp11::data_frame df, bool experimental);

void duckdb_r_transform(duckdb::Vector &src_vec, SEXP dest, duckdb::idx_t dest_offset, duckdb::idx_t n, bool integer64);
SEXP duckdb_r_allocate(const duckdb::LogicalType &type, duckdb::idx_t nrows);
void duckdb_r_decorate(const duckdb::LogicalType &type, SEXP dest, bool integer64);
Expand Down
4 changes: 4 additions & 0 deletions src/include/reltoaltrep.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ struct RelToAltrep {
static R_altrep_class_t list_class;
#endif
};

SEXP rapi_rel_from_altrep_df(SEXP df, bool strict, bool allow_materialized);

SEXP rapi_rel_from_any_df(duckdb::conn_eptr_t con, SEXP df, bool allow_materialized);
59 changes: 59 additions & 0 deletions src/relational.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "rapi.hpp"
#include "signal.hpp"
#include "typesr.hpp"
#include "reltoaltrep.hpp"
#include "altrepdataframe_relation.hpp"

#include "R_ext/Random.h"

Expand Down Expand Up @@ -141,6 +143,15 @@ using namespace cpp11;
return res;
}

// Produces a relation for `df`.
// First asks rapi_rel_from_altrep_df() (strict = false, forwarding
// `allow_materialized`) for a relation. When that call yields R_NilValue, a
// new relation is created from the data frame on connection `con` via
// rapi_rel_from_df() with experimental = false.
SEXP rapi_rel_from_any_df(duckdb::conn_eptr_t con, SEXP df, bool allow_materialized) {
	SEXP altrep_rel = rapi_rel_from_altrep_df(df, false, allow_materialized);
	if (altrep_rel == R_NilValue) {
		return rapi_rel_from_df(con, df, false);
	}

	return altrep_rel;
}

[[cpp11::register]] SEXP rapi_rel_filter(duckdb::rel_extptr_t rel, list exprs) {
duckdb::unique_ptr<ParsedExpression> filter_expr;
if (exprs.size() == 0) { // nop
Expand All @@ -162,6 +173,30 @@ using namespace cpp11;
return make_external_prot<RelationWrapper>("duckdb_relation", prot, res);
}

[[cpp11::register]] SEXP rapi_rel_filter2(data_frame df, duckdb::conn_eptr_t con, list exprs) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move these to a separate .cpp file and keep them in the same order as here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still valid. Can you please follow up?

duckdb::unique_ptr<ParsedExpression> filter_expr;
if (exprs.size() == 0) { // nop
stop("expected filter expressions");
} else if (exprs.size() == 1) {
filter_expr = ((expr_extptr_t)exprs[0])->Copy();
} else {
vector<duckdb::unique_ptr<ParsedExpression>> filters;
for (expr_extptr_t expr : exprs) {
filters.push_back(expr->Copy());
}
filter_expr = make_uniq<ConjunctionExpression>(ExpressionType::CONJUNCTION_AND, std::move(filters));
}
duckdb::rel_extptr_t rel = cpp11::as_cpp<cpp11::decay_t<duckdb::rel_extptr_t>>(rapi_rel_from_any_df(con, df, true));

auto filter = make_shared_ptr<FilterRelation>(rel->rel, std::move(filter_expr));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be wrapped in an ALTREP data frame at this stage.


auto res = make_shared_ptr<AltrepDataFrameRelation>(filter);
Antonov548 marked this conversation as resolved.
Show resolved Hide resolved

cpp11::writable::list prot = {rel};

return make_external_prot<RelationWrapper>("duckdb_relation", prot, res);
}

[[cpp11::register]] SEXP rapi_rel_project(duckdb::rel_extptr_t rel, list exprs) {
if (exprs.size() == 0) {
warning("rel_project without projection expressions has no effect");
Expand All @@ -183,6 +218,30 @@ using namespace cpp11;
return make_external_prot<RelationWrapper>("duckdb_relation", prot, res);
}

// Projects the data frame `df` onto `exprs` and returns the result as a
// "duckdb_relation" external pointer.
//   df    - data frame to project; turned into a relation with
//           rapi_rel_from_any_df(con, df, true)
//   con   - connection handed to rapi_rel_from_any_df()
//   exprs - projection expressions; an empty list raises an error
// Every expression is copied and its name is used as the output alias. The
// ProjectionRelation is wrapped in an AltrepDataFrameRelation, and the source
// relation pointer is placed in the protection list passed to
// make_external_prot().
[[cpp11::register]] SEXP rapi_rel_project2(data_frame df, duckdb::conn_eptr_t con, list exprs) {
	if (exprs.size() == 0) {
		stop("expected projection expressions");
	}

	vector<duckdb::unique_ptr<ParsedExpression>> projection_exprs;
	vector<string> projection_names;
	for (expr_extptr_t expr_ptr : exprs) {
		auto copied = expr_ptr->Copy();
		projection_names.push_back(copied->GetName());
		projection_exprs.push_back(std::move(copied));
	}

	using rel_ptr_t = cpp11::decay_t<duckdb::rel_extptr_t>;
	duckdb::rel_extptr_t source_rel = cpp11::as_cpp<rel_ptr_t>(rapi_rel_from_any_df(con, df, true));

	auto projected =
	    make_shared_ptr<ProjectionRelation>(source_rel->rel, std::move(projection_exprs), std::move(projection_names));
	auto wrapped = make_shared_ptr<AltrepDataFrameRelation>(projected);

	cpp11::writable::list prot = {source_rel};
	return make_external_prot<RelationWrapper>("duckdb_relation", prot, wrapped);
}

[[cpp11::register]] SEXP rapi_rel_aggregate(duckdb::rel_extptr_t rel, list groups, list aggregates) {
vector<duckdb::unique_ptr<ParsedExpression>> res_groups, res_aggregates;

Expand Down
2 changes: 1 addition & 1 deletion src/reltoaltrep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ struct AltrepRelationWrapper {
}

auto materialize_message = Rf_GetOption(RStrings::get().materialize_message_sym, R_BaseEnv);
if (Rf_isLogical(materialize_message) && Rf_length(materialize_message) == 1 && LOGICAL_ELT(materialize_message, 0) == true) {
if (Rf_isLogical(materialize_message) && Rf_length(materialize_message) == 1 && LOGICAL_ELT(materialize_message, 0) == true) {
// Legacy
Rprintf("duckplyr: materializing\n");
}
Expand Down
43 changes: 43 additions & 0 deletions test-old.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Scratch reproduction using DuckDB's internal relational API: builds a
# projection ("mutate") on a one-column tibble, stacks a filter on top of it,
# and prints both relations before and after the projection is converted to
# an ALTREP data frame.
drv <- duckdb::duckdb()
con <- DBI::dbConnect(drv)
df1 <- tibble::tibble(a = 1)

"mutate"
#> [1] "mutate"
rel1 <- duckdb:::rel_from_df(con, df1)
"mutate"
#> [1] "mutate"
# Keep column `a` and add a constant column `b = 2`.
rel2 <- duckdb:::rel_project(
  rel1,
  list(
    {
      tmp_expr <- duckdb:::expr_reference("a")
      duckdb:::expr_set_alias(tmp_expr, "a")
      tmp_expr
    },
    {
      tmp_expr <- duckdb:::expr_constant(2)
      duckdb:::expr_set_alias(tmp_expr, "b")
      tmp_expr
    }
  )
)
"filter"
#> [1] "filter"
# Keep the rows where b == 2.
rel3 <- duckdb:::rel_filter(
  rel2,
  list(
    duckdb:::expr_comparison(
      "==",
      list(
        duckdb:::expr_reference("b"),
        duckdb:::expr_constant(2)
      )
    )
  )
)
rel2
rel3
duckdb:::rel_to_altrep(rel2)
rel2
rel3

# Close the connection and shut down the in-process database explicitly
# instead of leaving it open until the session ends.
DBI::dbDisconnect(con, shutdown = TRUE)
Loading
Loading