Skip to content

Commit

Permalink
change rapi_rel_to_altrep to handle df
Browse files Browse the repository at this point in the history
  • Loading branch information
Antonov548 committed Jan 9, 2025
1 parent 32a0eb8 commit 5a89610
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 78 deletions.
39 changes: 33 additions & 6 deletions src/altrepdataframe_relation.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
#include "altrepdataframe_relation.hpp"

#include "R_ext/Random.h"

namespace duckdb {

AltrepDataFrameRelation::AltrepDataFrameRelation(shared_ptr<Relation> parent)
// TODO: which RelationType should be used?
: Relation(parent->context, RelationType::AGGREGATE_RELATION), parent(std::move(parent)) {
AltrepDataFrameRelation::AltrepDataFrameRelation(duckdb::shared_ptr<Relation> p, cpp11::sexp df, duckdb::conn_eptr_t con, duckdb::shared_ptr<AltrepRelationWrapper> altrep)
: Relation(p->context, RelationType::EXTENSION_RELATION)
, altrep(std::move(altrep))
, dataframe(df)
, connection(std::move(con))
, parent(std::move(p)) {
TryBindRelation(columns);
}

Expand All @@ -13,15 +18,37 @@ const vector<ColumnDefinition> &AltrepDataFrameRelation::Columns() {
}

// Returns the string rendering of this relation by forwarding ToString(depth)
// to the relation it wraps; `depth` is passed through unchanged.
string AltrepDataFrameRelation::ToString(idx_t depth) {
return parent->ToString(depth);
return GetParent().ToString(depth);
}

// Reports read-only status by forwarding IsReadOnly() to the relation this
// object wraps; no state of its own is consulted.
bool AltrepDataFrameRelation::IsReadOnly() {
return parent->IsReadOnly();
return GetParent().IsReadOnly();
}

// Builds the query node for this relation by forwarding GetQueryNode() to the
// relation this object wraps and returning its result as-is.
unique_ptr<QueryNode> AltrepDataFrameRelation::GetQueryNode() {
return parent->GetQueryNode();
return GetParent().GetQueryNode();
}

// Picks the relation that the forwarding methods should delegate to.
//
// While the ALTREP wrapper holds no query result, the original `parent`
// relation is used. Once a result exists, the relation returned by
// GetTableRelation() (a scan over the stored data frame) is used instead.
Relation& AltrepDataFrameRelation::GetParent() {
	if (!altrep->HasQueryResult()) {
		return *parent;
	}

	// Original author's note: the context mutex is locked at this point.
	return GetTableRelation();
}

// Returns the table-function relation that scans the stored data frame,
// creating and caching it in `table_function_relation` on first use.
//
// The relation is built with the "r_dataframe_scan" table function, which is
// given the data frame's address as a pointer value and `experimental = false`.
// Its alias has the form "dataframe_<address>_<random int32>".
Relation& AltrepDataFrameRelation::GetTableRelation() {
	if (table_function_relation) {
		return *table_function_relation;
	}

	const auto df_address = reinterpret_cast<uintptr_t>(static_cast<SEXP>(dataframe));

	// NOTE(review): R's API documentation asks for GetRNGstate()/PutRNGstate()
	// around unif_rand(); no such calls are visible here -- confirm whether a
	// caller takes care of it.
	const auto random_suffix = static_cast<int32_t>(NumericLimits<int32_t>::Maximum() * unif_rand());
	auto alias = StringUtil::Format("dataframe_%d_%d", df_address, random_suffix);

	named_parameter_map_t other_params;
	other_params["experimental"] = Value::BOOLEAN(false);

	auto scan = connection->conn->TableFunction("r_dataframe_scan", {Value::POINTER(df_address)}, other_params);
	table_function_relation = scan->Alias(alias);

	return *table_function_relation;
}

}
8 changes: 4 additions & 4 deletions src/cpp11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,10 @@ extern "C" SEXP _duckdb_rapi_rel_insert(SEXP rel, SEXP schema_name, SEXP table_n
END_CPP11
}
// reltoaltrep.cpp
SEXP rapi_rel_to_altrep(duckdb::rel_extptr_t rel, bool allow_materialization);
extern "C" SEXP _duckdb_rapi_rel_to_altrep(SEXP rel, SEXP allow_materialization) {
SEXP rapi_rel_to_altrep(duckdb::rel_extptr_t rel, duckdb::conn_eptr_t con, bool allow_materialization);
extern "C" SEXP _duckdb_rapi_rel_to_altrep(SEXP rel, SEXP con, SEXP allow_materialization) {
BEGIN_CPP11
return cpp11::as_sexp(rapi_rel_to_altrep(cpp11::as_cpp<cpp11::decay_t<duckdb::rel_extptr_t>>(rel), cpp11::as_cpp<cpp11::decay_t<bool>>(allow_materialization)));
return cpp11::as_sexp(rapi_rel_to_altrep(cpp11::as_cpp<cpp11::decay_t<duckdb::rel_extptr_t>>(rel), cpp11::as_cpp<cpp11::decay_t<duckdb::conn_eptr_t>>(con), cpp11::as_cpp<cpp11::decay_t<bool>>(allow_materialization)));
END_CPP11
}
// reltoaltrep.cpp
Expand Down Expand Up @@ -520,7 +520,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_duckdb_rapi_rel_set_intersect", (DL_FUNC) &_duckdb_rapi_rel_set_intersect, 2},
{"_duckdb_rapi_rel_set_symdiff", (DL_FUNC) &_duckdb_rapi_rel_set_symdiff, 2},
{"_duckdb_rapi_rel_sql", (DL_FUNC) &_duckdb_rapi_rel_sql, 2},
{"_duckdb_rapi_rel_to_altrep", (DL_FUNC) &_duckdb_rapi_rel_to_altrep, 2},
{"_duckdb_rapi_rel_to_altrep", (DL_FUNC) &_duckdb_rapi_rel_to_altrep, 3},
{"_duckdb_rapi_rel_to_csv", (DL_FUNC) &_duckdb_rapi_rel_to_csv, 3},
{"_duckdb_rapi_rel_to_df", (DL_FUNC) &_duckdb_rapi_rel_to_df, 1},
{"_duckdb_rapi_rel_to_parquet", (DL_FUNC) &_duckdb_rapi_rel_to_parquet, 3},
Expand Down
15 changes: 13 additions & 2 deletions src/include/altrepdataframe_relation.hpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
#include "duckdb/main/relation.hpp"
#include "rapi.hpp"
#include "reltoaltrep.hpp"

namespace duckdb {

// Relation that forwards ToString(), IsReadOnly() and GetQueryNode() to another
// relation. GetParent() selects that relation: either `parent`, or a lazily
// created table-function relation over `dataframe` once `altrep` reports that
// it has a query result.
class AltrepDataFrameRelation final : public Relation {
public:
AltrepDataFrameRelation(shared_ptr<Relation> parent);
// p: relation to wrap; df: R data frame; con: connection used for the
// data-frame scan; altrep: wrapper consulted for the presence of a query result.
AltrepDataFrameRelation(duckdb::shared_ptr<Relation> p, cpp11::sexp df, duckdb::conn_eptr_t con, duckdb::shared_ptr<AltrepRelationWrapper> altrep);

shared_ptr<Relation> parent;
// Cached result of GetTableRelation(); empty until that method first runs.
shared_ptr<Relation> table_function_relation;
// R data frame whose address is handed to the "r_dataframe_scan" table function.
cpp11::sexp dataframe;
// Connection on which the table-function relation is created.
duckdb::conn_eptr_t connection;
// Wrapper whose HasQueryResult() decides which relation GetParent() returns.
duckdb::shared_ptr<AltrepRelationWrapper> altrep;
// Relation used by GetParent() while `altrep` has no query result.
duckdb::shared_ptr<Relation> parent;
// Passed to TryBindRelation() in the constructor.
vector<ColumnDefinition> columns;
public:
unique_ptr<QueryNode> GetQueryNode() override;

const vector<ColumnDefinition> &Columns() override;
string ToString(idx_t depth) override;
bool IsReadOnly() override;

private:
// Returns (creating on first call) the relation that scans `dataframe`.
Relation& GetTableRelation();

// Returns the relation the public forwarding methods delegate to.
Relation& GetParent();
};

}
22 changes: 22 additions & 0 deletions src/include/reltoaltrep.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,26 @@

#include "rapi.hpp"

namespace duckdb {

// Holds a relation together with its (lazily produced) query result.
struct AltrepRelationWrapper {
// Stores the flag and the external pointer, and keeps a shared reference to
// the relation held by `rel_` (rel_->rel).
AltrepRelationWrapper(rel_extptr_t rel_, bool allow_materialization_);

// Retrieves the wrapper from the R object `x` via GetFromExternalPtr.
static AltrepRelationWrapper *Get(SEXP x);

// True once `res` has been set, i.e. a query result is present.
bool HasQueryResult() const;

// Returns the stored result, executing `rel` first when no result exists yet.
// Raises an R error via cpp11::stop if a result is needed but
// `allow_materialization` is false, if execution is interrupted, or if the
// query result reports an error.
MaterializedQueryResult *GetQueryResult();

// When false, GetQueryResult() refuses to execute the relation.
bool allow_materialization;

// External pointer passed to the constructor; also handed to the optional
// materialize callback in GetQueryResult().
rel_extptr_t rel_eptr;
// Relation executed by GetQueryResult().
duckdb::shared_ptr<Relation> rel;
// Result of executing `rel`; empty until GetQueryResult() has run.
duckdb::unique_ptr<QueryResult> res;
};

}

struct RelToAltrep {
static void Initialize(DllInfo *dll);
static R_xlen_t RownamesLength(SEXP x);
Expand Down Expand Up @@ -31,3 +51,5 @@ struct RelToAltrep {
SEXP rapi_rel_from_altrep_df(SEXP df, bool strict, bool allow_materialized);

SEXP rapi_rel_from_any_df(duckdb::conn_eptr_t con, SEXP df, bool allow_materialized);

SEXP rapi_rel_to_altrep(duckdb::rel_extptr_t rel, duckdb::conn_eptr_t con, bool allow_materialization);
8 changes: 2 additions & 6 deletions src/relational.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,9 @@ SEXP rapi_rel_from_any_df(duckdb::conn_eptr_t con, SEXP df, bool allow_materiali

auto filter = make_shared_ptr<FilterRelation>(rel->rel, std::move(filter_expr));

auto res = make_shared_ptr<AltrepDataFrameRelation>(filter);

cpp11::writable::list prot = {rel};

return make_external_prot<RelationWrapper>("duckdb_relation", prot, res);
return rapi_rel_to_altrep(make_external_prot<RelationWrapper>("duckdb_relation", prot, filter), con, true);
}

[[cpp11::register]] SEXP rapi_rel_project(duckdb::rel_extptr_t rel, list exprs) {
Expand Down Expand Up @@ -235,11 +233,9 @@ SEXP rapi_rel_from_any_df(duckdb::conn_eptr_t con, SEXP df, bool allow_materiali

auto projection = make_shared_ptr<ProjectionRelation>(rel->rel, std::move(projections), std::move(aliases));

auto res = make_shared_ptr<AltrepDataFrameRelation>(projection);

cpp11::writable::list prot = {rel};

return make_external_prot<RelationWrapper>("duckdb_relation", prot, res);
return rapi_rel_to_altrep(make_external_prot<RelationWrapper>("duckdb_relation", prot, projection), con, true);
}

[[cpp11::register]] SEXP rapi_rel_aggregate(duckdb::rel_extptr_t rel, list groups, list aggregates) {
Expand Down
113 changes: 53 additions & 60 deletions src/reltoaltrep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "reltoaltrep.hpp"
#include "signal.hpp"
#include "cpp11/declarations.hpp"
#include "altrepdataframe_relation.hpp"

#include "httplib.hpp"
#include <cinttypes>
Expand Down Expand Up @@ -87,78 +88,69 @@ static T *GetFromExternalPtr(SEXP x) {
return wrapper;
}

struct AltrepRelationWrapper {

static AltrepRelationWrapper *Get(SEXP x) {
return GetFromExternalPtr<AltrepRelationWrapper>(x);
}
// Looks up the AltrepRelationWrapper behind the R object `x` by delegating to
// the GetFromExternalPtr helper, and returns the pointer it yields.
AltrepRelationWrapper *AltrepRelationWrapper::Get(SEXP x) {
	auto *wrapper = GetFromExternalPtr<AltrepRelationWrapper>(x);
	return wrapper;
}

AltrepRelationWrapper(rel_extptr_t rel_, bool allow_materialization_)
: allow_materialization(allow_materialization_), rel_eptr(rel_), rel(rel_->rel) {
}
// Captures the materialization flag, the external pointer itself, and a shared
// reference to the relation that the external pointer holds. No query is run
// here; `res` stays empty.
AltrepRelationWrapper::AltrepRelationWrapper(rel_extptr_t rel_, bool allow_materialization_)
    : allow_materialization(allow_materialization_),
      rel_eptr(rel_),
      rel(rel_->rel) {
}

bool HasQueryResult() const {
return (bool)res;
}
// Tells whether a query result is currently stored, i.e. whether `res` is
// non-empty.
bool AltrepRelationWrapper::HasQueryResult() const {
	return static_cast<bool>(res);
}

MaterializedQueryResult *GetQueryResult() {
if (!res) {
if (!allow_materialization) {
cpp11::stop("Materialization is disabled, use collect() or as_tibble() to materialize");
}
MaterializedQueryResult *AltrepRelationWrapper::GetQueryResult() {
if (!res) {
if (!allow_materialization) {
cpp11::stop("Materialization is disabled, use collect() or as_tibble() to materialize");
}

auto materialize_callback = Rf_GetOption(RStrings::get().materialize_callback_sym, R_BaseEnv);
if (Rf_isFunction(materialize_callback)) {
sexp call = Rf_lang2(materialize_callback, rel_eptr);
Rf_eval(call, R_BaseEnv);
}
auto materialize_callback = Rf_GetOption(RStrings::get().materialize_callback_sym, R_BaseEnv);
if (Rf_isFunction(materialize_callback)) {
sexp call = Rf_lang2(materialize_callback, rel_eptr);
Rf_eval(call, R_BaseEnv);
}

auto materialize_message = Rf_GetOption(RStrings::get().materialize_message_sym, R_BaseEnv);
if (Rf_isLogical(materialize_message) && Rf_length(materialize_message) == 1 && LOGICAL_ELT(materialize_message, 0) == true) {
// Legacy
Rprintf("duckplyr: materializing\n");
}
auto materialize_message = Rf_GetOption(RStrings::get().materialize_message_sym, R_BaseEnv);
if (Rf_isLogical(materialize_message) && Rf_length(materialize_message) == 1 && LOGICAL_ELT(materialize_message, 0) == true) {
// Legacy
Rprintf("duckplyr: materializing\n");
}

ScopedInterruptHandler signal_handler(rel->context->GetContext());
ScopedInterruptHandler signal_handler(rel->context->GetContext());

// We need to temporarily allow a deeper execution stack
// https://github.com/duckdb/duckdb-r/issues/101
auto old_depth = rel->context->GetContext()->config.max_expression_depth;
rel->context->GetContext()->config.max_expression_depth = old_depth * 2;
duckdb_httplib::detail::scope_exit reset_max_expression_depth(
[&]() { rel->context->GetContext()->config.max_expression_depth = old_depth; });
// We need to temporarily allow a deeper execution stack
// https://github.com/duckdb/duckdb-r/issues/101
auto old_depth = rel->context->GetContext()->config.max_expression_depth;
rel->context->GetContext()->config.max_expression_depth = old_depth * 2;
duckdb_httplib::detail::scope_exit reset_max_expression_depth(
[&]() { rel->context->GetContext()->config.max_expression_depth = old_depth; });

res = rel->Execute();
res = rel->Execute();

// FIXME: Use std::experimental::scope_exit
if (rel->context->GetContext()->config.max_expression_depth != old_depth * 2) {
Rprintf("Internal error: max_expression_depth was changed from %" PRIu64 " to %" PRIu64 "\n",
old_depth * 2, rel->context->GetContext()->config.max_expression_depth);
}
rel->context->GetContext()->config.max_expression_depth = old_depth;
reset_max_expression_depth.release();
// FIXME: Use std::experimental::scope_exit
if (rel->context->GetContext()->config.max_expression_depth != old_depth * 2) {
Rprintf("Internal error: max_expression_depth was changed from %" PRIu64 " to %" PRIu64 "\n",
old_depth * 2, rel->context->GetContext()->config.max_expression_depth);
}
rel->context->GetContext()->config.max_expression_depth = old_depth;
reset_max_expression_depth.release();

if (signal_handler.HandleInterrupt()) {
cpp11::stop("Query execution was interrupted");
}
if (signal_handler.HandleInterrupt()) {
cpp11::stop("Query execution was interrupted");
}

signal_handler.Disable();
signal_handler.Disable();

if (res->HasError()) {
cpp11::stop("Error evaluating duckdb query: %s", res->GetError().c_str());
}
D_ASSERT(res->type == QueryResultType::MATERIALIZED_RESULT);
if (res->HasError()) {
cpp11::stop("Error evaluating duckdb query: %s", res->GetError().c_str());
}
D_ASSERT(res);
return (MaterializedQueryResult *)res.get();
D_ASSERT(res->type == QueryResultType::MATERIALIZED_RESULT);
}

bool allow_materialization;

rel_extptr_t rel_eptr;
duckdb::shared_ptr<Relation> rel;
duckdb::unique_ptr<QueryResult> res;
};
D_ASSERT(res);
return (MaterializedQueryResult *)res.get();
}

struct AltrepRownamesWrapper {

Expand Down Expand Up @@ -348,7 +340,7 @@ static R_altrep_class_t LogicalTypeToAltrepType(const LogicalType &type) {
}
}

[[cpp11::register]] SEXP rapi_rel_to_altrep(duckdb::rel_extptr_t rel, bool allow_materialization) {
[[cpp11::register]] SEXP rapi_rel_to_altrep(duckdb::rel_extptr_t rel, duckdb::conn_eptr_t con, bool allow_materialization) {
D_ASSERT(rel && rel->rel);
auto drel = rel->rel;
auto ncols = drel->Columns().size();
Expand Down Expand Up @@ -381,7 +373,8 @@ static R_altrep_class_t LogicalTypeToAltrepType(const LogicalType &type) {
// Row names
cpp11::external_pointer<AltrepRownamesWrapper> ptr(new AltrepRownamesWrapper(relation_wrapper));
R_SetExternalPtrTag(ptr, RStrings::get().duckdb_row_names_sym);
cpp11::sexp row_names_sexp = R_new_altrep(RelToAltrep::rownames_class, ptr, rel);
cpp11::sexp row_names_sexp = R_new_altrep(RelToAltrep::rownames_class, ptr, make_external<RelationWrapper>("duckdb_relation",
make_shared_ptr<duckdb::AltrepDataFrameRelation>(rel->rel, data_frame, con, relation_wrapper)));
install_new_attrib(data_frame, R_RowNamesSymbol, row_names_sexp);

// Class
Expand Down

0 comments on commit 5a89610

Please sign in to comment.