Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: data frame with lazy relation AltrepDataFrameRelation #960

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
8 changes: 8 additions & 0 deletions R/cpp11.R
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,18 @@ rapi_rel_filter <- function(rel, exprs) {
.Call(`_duckdb_rapi_rel_filter`, rel, exprs)
}

# Thin R-side binding: passes `df`, `con` and `exprs` unchanged to the native
# routine `_duckdb_rapi_rel_filter2` and returns whatever that routine returns.
rapi_rel_filter2 <- function(df, con, exprs) {
.Call(`_duckdb_rapi_rel_filter2`, df, con, exprs)
}

# Thin R-side binding: passes `rel` and `exprs` unchanged to the native
# routine `_duckdb_rapi_rel_project` and returns whatever that routine returns.
rapi_rel_project <- function(rel, exprs) {
.Call(`_duckdb_rapi_rel_project`, rel, exprs)
}

# Thin R-side binding: passes `df`, `con` and `exprs` unchanged to the native
# routine `_duckdb_rapi_rel_project2` and returns whatever that routine returns.
rapi_rel_project2 <- function(df, con, exprs) {
.Call(`_duckdb_rapi_rel_project2`, df, con, exprs)
}

# Thin R-side binding: passes `rel`, `groups` and `aggregates` unchanged to the
# native routine `_duckdb_rapi_rel_aggregate` and returns its result.
rapi_rel_aggregate <- function(rel, groups, aggregates) {
.Call(`_duckdb_rapi_rel_aggregate`, rel, groups, aggregates)
}
Expand Down
29 changes: 28 additions & 1 deletion R/relational.R
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ rel_project <- function(rel, exprs) {
rethrow_rapi_rel_project(rel, exprs)
}

#' Lazily project a data frame through DuckDB
#'
#' Variant of `rel_project()` that takes a data frame and a connection
#' instead of a relation object.
#' @param df the data frame to project
#' @param con a DuckDB connection object; its `conn_ref` slot is what gets
#'   passed on to the native routine
#' @param exprs a list of DuckDB expressions to project
#' @return the value produced by the native `rapi_rel_project2()` routine
#'   (presumably a lazily evaluated data frame -- TODO confirm)
#' @noRd
#' @examples
#' con <- DBI::dbConnect(duckdb())
#' df2 <- rel_project2(mtcars, con, list(expr_reference("cyl"), expr_reference("disp")))
rel_project2 <- function(df, con, exprs) {
  rethrow_rapi_rel_project2(df, con@conn_ref, exprs)
}

#' Lazily filter a DuckDB relation object
#' @param rel the DuckDB relation object
#' @param exprs a list of DuckDB expressions to filter by
Expand All @@ -147,6 +160,20 @@ rel_filter <- function(rel, exprs) {
rethrow_rapi_rel_filter(rel, exprs)
}

#' Lazily filter a data frame through DuckDB
#'
#' Variant of `rel_filter()` that takes a data frame and a connection
#' instead of a relation object.
#' @param df the data frame to filter
#' @param con a DuckDB connection object; its `conn_ref` slot is what gets
#'   passed on to the native routine
#' @param exprs a list of DuckDB expressions to filter by
#' @return the value produced by the native `rapi_rel_filter2()` routine
#'   (presumably a lazily evaluated data frame -- TODO confirm)
#' @noRd
#' @examples
#' con <- DBI::dbConnect(duckdb())
#' DBI::dbExecute(con, "CREATE OR REPLACE MACRO gt(a, b) AS a > b")
#' df2 <- rel_filter2(mtcars, con, list(expr_function("gt", list(expr_reference("cyl"), expr_constant("6")))))
rel_filter2 <- function(df, con, exprs) {
  rethrow_rapi_rel_filter2(df, con@conn_ref, exprs)
}

#' Lazily aggregate a DuckDB relation object
#' @param rel the DuckDB relation object
#' @param groups a list of DuckDB expressions to group by
Expand Down Expand Up @@ -412,7 +439,7 @@ rel_set_alias <- function(rel, alias) {
#' con <- DBI::dbConnect(duckdb())
#' rel <- rel_from_df(con, mtcars)
#' print(rel_to_altrep(rel))
rel_to_altrep <- function(rel, allow_materialization = TRUE) {
rel_to_altrep <- function(rel, con, allow_materialization = TRUE) {
rethrow_rapi_rel_to_altrep(rel, allow_materialization)
}

Expand Down
20 changes: 20 additions & 0 deletions R/rethrow-gen.R
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,15 @@ rethrow_rapi_rel_filter <- function(rel, exprs, call = parent.frame(2)) {
)
}

# Calls `rapi_rel_filter2()` and routes any error it raises through
# `rethrow_error_from_rapi()`, together with `call` (by default the frame two
# levels up from this wrapper).
rethrow_rapi_rel_filter2 <- function(df, con, exprs, call = parent.frame(2)) {
  on_error <- function(e) {
    rethrow_error_from_rapi(e, call)
  }
  rlang::try_fetch(rapi_rel_filter2(df, con, exprs), error = on_error)
}

rethrow_rapi_rel_project <- function(rel, exprs, call = parent.frame(2)) {
rlang::try_fetch(
rapi_rel_project(rel, exprs),
Expand All @@ -189,6 +198,15 @@ rethrow_rapi_rel_project <- function(rel, exprs, call = parent.frame(2)) {
)
}

# Calls `rapi_rel_project2()` and routes any error it raises through
# `rethrow_error_from_rapi()`, together with `call` (by default the frame two
# levels up from this wrapper).
rethrow_rapi_rel_project2 <- function(df, con, exprs, call = parent.frame(2)) {
  on_error <- function(e) {
    rethrow_error_from_rapi(e, call)
  }
  rlang::try_fetch(rapi_rel_project2(df, con, exprs), error = on_error)
}

rethrow_rapi_rel_aggregate <- function(rel, groups, aggregates, call = parent.frame(2)) {
rlang::try_fetch(
rapi_rel_aggregate(rel, groups, aggregates),
Expand Down Expand Up @@ -579,7 +597,9 @@ rethrow_restore <- function() {
rethrow_rapi_expr_tostring <<- rapi_expr_tostring
rethrow_rapi_rel_from_df <<- rapi_rel_from_df
rethrow_rapi_rel_filter <<- rapi_rel_filter
rethrow_rapi_rel_filter2 <<- rapi_rel_filter2
rethrow_rapi_rel_project <<- rapi_rel_project
rethrow_rapi_rel_project2 <<- rapi_rel_project2
rethrow_rapi_rel_aggregate <<- rapi_rel_aggregate
rethrow_rapi_rel_order <<- rapi_rel_order
rethrow_rapi_expr_window <<- rapi_expr_window
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ add_library(
register.cpp
relational.cpp
reltoaltrep.cpp
altrepdataframe_relation.cpp
scan.cpp
statement.cpp
transform.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/Makevars
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ include Makevars.duckdb

CXX_STD = CXX17
PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -DBROTLI_ENCODER_CLEANUP_ON_OOM -Iduckdb/src/include -Iduckdb/third_party/concurrentqueue -Iduckdb/third_party/fast_float -Iduckdb/third_party/fastpforlib -Iduckdb/third_party/fmt/include -Iduckdb/third_party/fsst -Iduckdb/third_party/httplib -Iduckdb/third_party/hyperloglog -Iduckdb/third_party/jaro_winkler -Iduckdb/third_party/jaro_winkler/details -Iduckdb/third_party/libpg_query -Iduckdb/third_party/libpg_query/include -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/mbedtls/library -Iduckdb/third_party/miniz -Iduckdb/third_party/pcg -Iduckdb/third_party/re2 -Iduckdb/third_party/skiplist -Iduckdb/third_party/tdigest -Iduckdb/third_party/utf8proc -Iduckdb/third_party/utf8proc/include -Iduckdb/third_party/yyjson/include -Iduckdb/third_party/zstd/include -Iduckdb/extension/parquet/include -Iduckdb/third_party/parquet -Iduckdb/third_party/thrift -Iduckdb/third_party/lz4 -Iduckdb/third_party/brotli/include -Iduckdb/third_party/brotli/common -Iduckdb/third_party/brotli/dec -Iduckdb/third_party/brotli/enc -Iduckdb/third_party/snappy -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/zstd/include -Iduckdb/extension/core_functions/include -I../inst/include -Iduckdb -DDUCKDB_EXTENSION_PARQUET_LINKED -DDUCKDB_BUILD_LIBRARY -DDUCKDB_EXTENSION_CORE_FUNCTIONS_LINKED -DDUCKDB_BUILD_LIBRARY
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
OBJECTS=rfuns.o database.o connection.o statement.o register.o relational.o scan.o signal.o transform.o utils.o reltoaltrep.o altrepdataframe_relation.o types.o cpp11.o $(SOURCES)
60 changes: 60 additions & 0 deletions src/altrepdataframe_relation.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#include "altrepdataframe_relation.hpp"

#include "R_ext/Random.h"

namespace duckdb {

// Builds a relation of type EXTENSION_RELATION that shares the client context
// of `p` and keeps hold of the R data frame `df`, the connection `con`, the
// ALTREP wrapper `altrep` and the wrapped relation `p` itself.
AltrepDataFrameRelation::AltrepDataFrameRelation(duckdb::shared_ptr<Relation> p, cpp11::sexp df,
                                                 duckdb::conn_eptr_t con,
                                                 duckdb::shared_ptr<AltrepRelationWrapper> altrep)
    // The base class is initialised before any member, so `p->context` is read
    // before `p` is moved into `parent`.
    : Relation(p->context, RelationType::EXTENSION_RELATION),
      // Initialisers are listed in the order the members are declared in the
      // class (dataframe, connection, altrep, parent). That is the order the
      // compiler uses regardless of how the list is written, and matching it
      // avoids -Wreorder warnings.
      dataframe(df), connection(std::move(con)), altrep(std::move(altrep)), parent(std::move(p)) {
	// Binds the relation with `columns` as the output argument; `columns` is
	// what Columns() hands back afterwards.
	TryBindRelation(columns);
}

const vector<ColumnDefinition> &AltrepDataFrameRelation::Columns() {
return columns;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Can this still be parent->...() ?

}

// Renders the relation currently selected by GetParent() at the given depth.
string AltrepDataFrameRelation::ToString(idx_t depth) {
	Relation &delegate = GetParent();
	return delegate.ToString(depth);
}

bool AltrepDataFrameRelation::IsReadOnly() {
return GetParent().IsReadOnly();
}

// Produces the query node of the relation currently selected by GetParent().
unique_ptr<QueryNode> AltrepDataFrameRelation::GetQueryNode() {
	Relation &delegate = GetParent();
	return delegate.GetQueryNode();
}

// Picks the relation that ToString(), IsReadOnly() and GetQueryNode() forward
// to: the wrapped `parent` while the ALTREP wrapper has no query result yet,
// and the table-function relation once it has one.
Relation &AltrepDataFrameRelation::GetParent() {
	if (!altrep->HasQueryResult()) {
		return *parent;
	}
	// Author's note carried over from the original: the context mutex is
	// locked on this path.
	return GetTableRelation();
}

void AltrepDataFrameRelation::BuildTableRelation() {
if (!table_function_relation) {
named_parameter_map_t other_params;
other_params["experimental"] = Value::BOOLEAN(false);
auto alias = StringUtil::Format("dataframe_%d_%d", (uintptr_t)(SEXP)dataframe,
(int32_t)(NumericLimits<int32_t>::Maximum() * unif_rand()));

table_function_relation = connection->conn->TableFunction("r_dataframe_scan", {Value::POINTER((uintptr_t)(SEXP)dataframe)}, other_params)->Alias(alias);
}
}

// Returns the relation built by BuildTableRelation(). If it has not been built
// yet, throws RebuildRelationException carrying a pointer to this object.
Relation &AltrepDataFrameRelation::GetTableRelation() {
	if (table_function_relation) {
		return *table_function_relation;
	}
	throw RebuildRelationException(this);
}

}
16 changes: 16 additions & 0 deletions src/cpp11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,27 @@ extern "C" SEXP _duckdb_rapi_rel_filter(SEXP rel, SEXP exprs) {
END_CPP11
}
// relational.cpp
// Forward declaration of the implementation that the entry point below calls.
SEXP rapi_rel_filter2(data_frame df, duckdb::conn_eptr_t con, list exprs);
// C entry point: converts each SEXP argument with cpp11::as_cpp, calls
// rapi_rel_filter2() and converts its result back with cpp11::as_sexp.
extern "C" SEXP _duckdb_rapi_rel_filter2(SEXP df, SEXP con, SEXP exprs) {
// NOTE(review): BEGIN_CPP11 / END_CPP11 are cpp11 macros; presumably they turn
// C++ exceptions into R errors -- confirm against the cpp11 documentation.
BEGIN_CPP11
return cpp11::as_sexp(rapi_rel_filter2(cpp11::as_cpp<cpp11::decay_t<data_frame>>(df), cpp11::as_cpp<cpp11::decay_t<duckdb::conn_eptr_t>>(con), cpp11::as_cpp<cpp11::decay_t<list>>(exprs)));
END_CPP11
}
// relational.cpp
// Forward declaration of the implementation that the entry point below calls.
SEXP rapi_rel_project(duckdb::rel_extptr_t rel, list exprs);
// C entry point: converts each SEXP argument with cpp11::as_cpp, calls
// rapi_rel_project() and converts its result back with cpp11::as_sexp.
extern "C" SEXP _duckdb_rapi_rel_project(SEXP rel, SEXP exprs) {
// NOTE(review): BEGIN_CPP11 / END_CPP11 are cpp11 macros; presumably they turn
// C++ exceptions into R errors -- confirm against the cpp11 documentation.
BEGIN_CPP11
return cpp11::as_sexp(rapi_rel_project(cpp11::as_cpp<cpp11::decay_t<duckdb::rel_extptr_t>>(rel), cpp11::as_cpp<cpp11::decay_t<list>>(exprs)));
END_CPP11
}
// relational.cpp
// Forward declaration of the implementation that the entry point below calls.
SEXP rapi_rel_project2(data_frame df, duckdb::conn_eptr_t con, list exprs);
// C entry point: converts each SEXP argument with cpp11::as_cpp, calls
// rapi_rel_project2() and converts its result back with cpp11::as_sexp.
extern "C" SEXP _duckdb_rapi_rel_project2(SEXP df, SEXP con, SEXP exprs) {
// NOTE(review): BEGIN_CPP11 / END_CPP11 are cpp11 macros; presumably they turn
// C++ exceptions into R errors -- confirm against the cpp11 documentation.
BEGIN_CPP11
return cpp11::as_sexp(rapi_rel_project2(cpp11::as_cpp<cpp11::decay_t<data_frame>>(df), cpp11::as_cpp<cpp11::decay_t<duckdb::conn_eptr_t>>(con), cpp11::as_cpp<cpp11::decay_t<list>>(exprs)));
END_CPP11
}
// relational.cpp
SEXP rapi_rel_aggregate(duckdb::rel_extptr_t rel, list groups, list aggregates);
extern "C" SEXP _duckdb_rapi_rel_aggregate(SEXP rel, SEXP groups, SEXP aggregates) {
BEGIN_CPP11
Expand Down Expand Up @@ -488,6 +502,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_duckdb_rapi_rel_distinct", (DL_FUNC) &_duckdb_rapi_rel_distinct, 1},
{"_duckdb_rapi_rel_explain", (DL_FUNC) &_duckdb_rapi_rel_explain, 3},
{"_duckdb_rapi_rel_filter", (DL_FUNC) &_duckdb_rapi_rel_filter, 2},
{"_duckdb_rapi_rel_filter2", (DL_FUNC) &_duckdb_rapi_rel_filter2, 3},
{"_duckdb_rapi_rel_from_altrep_df", (DL_FUNC) &_duckdb_rapi_rel_from_altrep_df, 3},
{"_duckdb_rapi_rel_from_df", (DL_FUNC) &_duckdb_rapi_rel_from_df, 3},
{"_duckdb_rapi_rel_from_sql", (DL_FUNC) &_duckdb_rapi_rel_from_sql, 2},
Expand All @@ -499,6 +514,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_duckdb_rapi_rel_names", (DL_FUNC) &_duckdb_rapi_rel_names, 1},
{"_duckdb_rapi_rel_order", (DL_FUNC) &_duckdb_rapi_rel_order, 3},
{"_duckdb_rapi_rel_project", (DL_FUNC) &_duckdb_rapi_rel_project, 2},
{"_duckdb_rapi_rel_project2", (DL_FUNC) &_duckdb_rapi_rel_project2, 3},
{"_duckdb_rapi_rel_set_alias", (DL_FUNC) &_duckdb_rapi_rel_set_alias, 2},
{"_duckdb_rapi_rel_set_diff", (DL_FUNC) &_duckdb_rapi_rel_set_diff, 2},
{"_duckdb_rapi_rel_set_intersect", (DL_FUNC) &_duckdb_rapi_rel_set_intersect, 2},
Expand Down
7 changes: 4 additions & 3 deletions src/duckdb/src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3034,19 +3034,20 @@ const StringUtil::EnumStringLiteral *GetRelationTypeValues() {
{ static_cast<uint32_t>(RelationType::VIEW_RELATION), "VIEW_RELATION" },
{ static_cast<uint32_t>(RelationType::QUERY_RELATION), "QUERY_RELATION" },
{ static_cast<uint32_t>(RelationType::DELIM_JOIN_RELATION), "DELIM_JOIN_RELATION" },
{ static_cast<uint32_t>(RelationType::DELIM_GET_RELATION), "DELIM_GET_RELATION" }
{ static_cast<uint32_t>(RelationType::DELIM_GET_RELATION), "DELIM_GET_RELATION" },
{ static_cast<uint32_t>(RelationType::EXTENSION_RELATION), "EXTENSION_RELATION" }
};
return values;
}

template<>
const char* EnumUtil::ToChars<RelationType>(RelationType value) {
return StringUtil::EnumToString(GetRelationTypeValues(), 28, "RelationType", static_cast<uint32_t>(value));
return StringUtil::EnumToString(GetRelationTypeValues(), 29, "RelationType", static_cast<uint32_t>(value));
}

template<>
RelationType EnumUtil::FromString<RelationType>(const char *value) {
return static_cast<RelationType>(StringUtil::StringToEnum(GetRelationTypeValues(), 28, "RelationType", value));
return static_cast<RelationType>(StringUtil::StringToEnum(GetRelationTypeValues(), 29, "RelationType", value));
}

const StringUtil::EnumStringLiteral *GetRenderModeValues() {
Expand Down
2 changes: 2 additions & 0 deletions src/duckdb/src/common/enums/relation_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ string RelationTypeToString(RelationType type) {
return "VIEW_RELATION";
case RelationType::QUERY_RELATION:
return "QUERY_RELATION";
case RelationType::EXTENSION_RELATION:
return "EXTENSION_RELATION";
case RelationType::INVALID_RELATION:
break;
}
Expand Down
3 changes: 2 additions & 1 deletion src/duckdb/src/include/duckdb/common/enums/relation_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ enum class RelationType : uint8_t {
VIEW_RELATION,
QUERY_RELATION,
DELIM_JOIN_RELATION,
DELIM_GET_RELATION
DELIM_GET_RELATION,
EXTENSION_RELATION = 255
};

string RelationTypeToString(RelationType type);
Expand Down
41 changes: 41 additions & 0 deletions src/include/altrepdataframe_relation.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#include "duckdb/main/relation.hpp"
#include "rapi.hpp"
#include "reltoaltrep.hpp"

namespace duckdb {

// Relation that stands in front of another relation and an R data frame.
// ToString(), IsReadOnly() and GetQueryNode() forward either to the wrapped
// `parent` relation or, once the ALTREP wrapper reports a query result, to a
// table-function relation over the data frame.
class AltrepDataFrameRelation final : public Relation {
public:
// Stores the wrapped relation `p`, the data frame `df`, the connection `con`
// and the ALTREP wrapper `altrep`.
AltrepDataFrameRelation(duckdb::shared_ptr<Relation> p, cpp11::sexp df, duckdb::conn_eptr_t con, duckdb::shared_ptr<AltrepRelationWrapper> altrep);

// NOTE: members are initialised in the order they are declared below, not in
// the order a constructor's initialiser list names them.

// Relation created by BuildTableRelation(); empty until that has been called.
shared_ptr<Relation> table_function_relation;
// The R data frame; its address is handed to the "r_dataframe_scan" table function.
cpp11::sexp dataframe;
// Connection used to create the table-function relation.
duckdb::conn_eptr_t connection;
// ALTREP wrapper whose HasQueryResult() decides which relation calls are forwarded to.
duckdb::shared_ptr<AltrepRelationWrapper> altrep;
// Relation that calls are forwarded to while the wrapper has no query result.
duckdb::shared_ptr<Relation> parent;
// Column definitions passed to TryBindRelation() in the constructor and returned by Columns().
vector<ColumnDefinition> columns;
public:
// Forwards to the relation currently selected by GetParent().
unique_ptr<QueryNode> GetQueryNode() override;

// Returns the stored `columns`.
const vector<ColumnDefinition> &Columns() override;
// Forwards to the relation currently selected by GetParent().
string ToString(idx_t depth) override;
// Forwards to the relation currently selected by GetParent().
bool IsReadOnly() override;

// Creates `table_function_relation` if it does not exist yet.
void BuildTableRelation();

private:
// Returns `table_function_relation`, or throws RebuildRelationException if it has not been built.
Relation& GetTableRelation();

// Chooses between `parent` and the table-function relation (see class comment).
Relation& GetParent();
};

// Exception thrown by AltrepDataFrameRelation::GetTableRelation() when the
// table-function relation has not been built; `target` is the relation that
// raised it.
class RebuildRelationException : public std::exception {
public:
	AltrepDataFrameRelation *target;

	RebuildRelationException(AltrepDataFrameRelation *target_) : target(target_) {
	}
};

} // namespace duckdb
2 changes: 2 additions & 0 deletions src/include/rapi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ SEXP rapi_record_batch(duckdb::rqry_eptr_t, int);

cpp11::r_string rapi_ptr_to_str(SEXP extptr);

SEXP rapi_rel_from_df(duckdb::conn_eptr_t con, cpp11::data_frame df, bool experimental);

void duckdb_r_transform(duckdb::Vector &src_vec, SEXP dest, duckdb::idx_t dest_offset, duckdb::idx_t n, bool integer64);
SEXP duckdb_r_allocate(const duckdb::LogicalType &type, duckdb::idx_t nrows);
void duckdb_r_decorate(const duckdb::LogicalType &type, SEXP dest, bool integer64);
Expand Down
26 changes: 26 additions & 0 deletions src/include/reltoaltrep.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,26 @@

#include "rapi.hpp"

namespace duckdb {

// Wrapper type declared here so that other translation units can hold it and
// query it. Only the declaration is visible in this header; the member notes
// below are inferred from names and types and should be confirmed against the
// definition.
struct AltrepRelationWrapper {
AltrepRelationWrapper(rel_extptr_t rel_, bool allow_materialization_);

// NOTE(review): presumably retrieves the wrapper attached to the R object `x` -- confirm.
static AltrepRelationWrapper *Get(SEXP x);

// NOTE(review): presumably true once `res` holds a result -- confirm.
bool HasQueryResult() const;

// NOTE(review): presumably materializes the result on demand, subject to
// `allow_materialization` -- confirm.
MaterializedQueryResult *GetQueryResult();

bool allow_materialization;

rel_extptr_t rel_eptr;
duckdb::shared_ptr<Relation> rel;
duckdb::unique_ptr<QueryResult> res;
};

}

struct RelToAltrep {
static void Initialize(DllInfo *dll);
static R_xlen_t RownamesLength(SEXP x);
Expand All @@ -27,3 +47,9 @@ struct RelToAltrep {
static R_altrep_class_t list_class;
#endif
};

SEXP rapi_rel_from_altrep_df(SEXP df, bool strict, bool allow_materialized);

SEXP rapi_rel_from_any_df(duckdb::conn_eptr_t con, SEXP df, bool allow_materialized);

SEXP rapi_rel_to_altrep2(duckdb::rel_extptr_t rel, duckdb::conn_eptr_t con, bool allow_materialization);
Loading
Loading