From 991a52d95c3b7b9e9f2f2db8af63bd49ba141491 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Fri, 3 May 2024 11:50:26 +0200 Subject: [PATCH 1/5] Quack node * Hook into postgres planner rather than on execution. The idea is to split DuckDB execution into prepare/execute phases. If the DuckDB prepare is not valid, we will fall back to normal postgres execution. * Another benefit of having a planner hook is that custom RETURN TABLE FUNCTIONS can be used in PG syntax. This custom function should only be created to pass the parsing phase. We can now use `SELECT * FROM read_parquet('...')` that will read parquet files through DuckDB. * Created quack node which will be used for duckdb execution. This quack node is defined as `Custom Scan Node`. EXPLAIN will work out of the box with this approach - we'll output just the explain plan from DuckDB execution. * Added `httpfs` extension to be built together with the parquet extension. --- Makefile | 9 +- include/quack/quack_filter.hpp | 2 + include/quack/quack_heap_scan.hpp | 5 +- include/quack/quack_heap_seq_scan.hpp | 2 + include/quack/quack_node.hpp | 9 ++ include/quack/quack_planner.hpp | 8 ++ include/quack/quack_select.h | 8 -- include/quack/quack_types.hpp | 3 + quack--0.0.1.sql | 8 ++ src/quack.cpp | 3 +- src/quack_heap_scan.cpp | 4 +- src/quack_heap_seq_scan.cpp | 28 +++-- src/quack_hooks.cpp | 28 ++--- src/quack_node.cpp | 148 ++++++++++++++++++++++++++ src/quack_planner.cpp | 130 ++++++++++++++++++++++ src/quack_select.cpp | 113 -------------------- src/quack_types.cpp | 30 +++++- 17 files changed, 384 insertions(+), 154 deletions(-) create mode 100644 include/quack/quack_node.hpp create mode 100644 include/quack/quack_planner.hpp delete mode 100644 include/quack/quack_select.h create mode 100644 src/quack_node.cpp create mode 100644 src/quack_planner.cpp delete mode 100644 src/quack_select.cpp diff --git a/Makefile b/Makefile index 214fffe2..bc69710f 100644 --- a/Makefile +++ b/Makefile @@ -9,9 +9,10 @@ SRCS = src/quack_detoast.cpp \ 
src/quack_heap_scan.cpp \ src/quack_heap_seq_scan.cpp \ src/quack_hooks.cpp \ - src/quack_select.cpp \ - src/quack_types.cpp \ src/quack_memory_allocator.cpp \ + src/quack_node.cpp \ + src/quack_planner.cpp \ + src/quack_types.cpp \ src/quack.cpp OBJS = $(subst .cpp,.o, $(SRCS)) @@ -54,7 +55,7 @@ ifeq ($(UNAME_S),Linux) DUCKDB_LIB = libduckdb.so endif -all: duckdb $(OBJS) +all: duckdb $(OBJS) .depend include $(PGXS) @@ -64,7 +65,7 @@ third_party/duckdb/Makefile: git submodule update --init --recursive third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB): - $(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF CMAKE_EXPORT_COMPILE_COMMANDS=1 + $(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF BUILD_HTTPFS=1 CMAKE_EXPORT_COMPILE_COMMANDS=1 install_duckdb: $(install_bin) -m 755 third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB) $(DESTDIR)$(PG_LIB) diff --git a/include/quack/quack_filter.hpp b/include/quack/quack_filter.hpp index 456cdb7a..eff8ecc7 100644 --- a/include/quack/quack_filter.hpp +++ b/include/quack/quack_filter.hpp @@ -7,5 +7,7 @@ extern "C" { } namespace quack { + bool ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid); + } // namespace quack \ No newline at end of file diff --git a/include/quack/quack_heap_scan.hpp b/include/quack/quack_heap_scan.hpp index 9d7ecac3..92abe2cf 100644 --- a/include/quack/quack_heap_scan.hpp +++ b/include/quack/quack_heap_scan.hpp @@ -78,12 +78,13 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction { struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData { public: - PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) { + PostgresHeapReplacementScanData(Query *parse, const char *query) : m_parse(parse), m_query(query) { } ~PostgresHeapReplacementScanData() override {}; public: - QueryDesc *desc; + Query *m_parse; 
+ std::string m_query; }; duckdb::unique_ptr PostgresHeapReplacementScan(duckdb::ClientContext &context, diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp index 1921d407..67ba1113 100644 --- a/include/quack/quack_heap_seq_scan.hpp +++ b/include/quack/quack_heap_seq_scan.hpp @@ -79,6 +79,7 @@ class PostgresHeapSeqScan { public: Relation GetRelation(); + void CloseRelation(); TupleDesc GetTupleDesc(); bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo); bool IsValid() const; @@ -87,6 +88,7 @@ class PostgresHeapSeqScan { Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo); private: + RangeTblEntry * m_tableEntry = nullptr; Relation m_rel = nullptr; Snapshot m_snapshot = nullptr; PostgresHeapSeqParallelScanState m_parallel_scan_state; diff --git a/include/quack/quack_node.hpp b/include/quack/quack_node.hpp new file mode 100644 index 00000000..25c30727 --- /dev/null +++ b/include/quack/quack_node.hpp @@ -0,0 +1,9 @@ +#pragma once + +extern "C" { +#include "postgres.h" +#include "nodes/extensible.h" +} + +extern CustomScanMethods quack_scan_scan_methods; +extern "C" void quack_init_node(void); \ No newline at end of file diff --git a/include/quack/quack_planner.hpp b/include/quack/quack_planner.hpp new file mode 100644 index 00000000..c2a70514 --- /dev/null +++ b/include/quack/quack_planner.hpp @@ -0,0 +1,8 @@ +#pragma once + +extern "C" { +#include "postgres.h" +#include "optimizer/planner.h" +} + +PlannedStmt *quack_plan_node(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); \ No newline at end of file diff --git a/include/quack/quack_select.h b/include/quack/quack_select.h deleted file mode 100644 index 7490bdcf..00000000 --- a/include/quack/quack_select.h +++ /dev/null @@ -1,8 +0,0 @@ -#pragma once - -extern "C" { -#include "postgres.h" -#include "executor/executor.h" -} - -extern "C" bool quack_execute_select(QueryDesc *query_desc, 
 ScanDirection direction, uint64_t count); \ No newline at end of file diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp index eeb7e4a8..dd2f382b 100644 --- a/include/quack/quack_types.hpp +++ b/include/quack/quack_types.hpp @@ -10,9 +10,12 @@ extern "C" { #include "quack/quack_heap_seq_scan.hpp" namespace quack { + duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); +Oid GetPostgresDuckDBType(duckdb::LogicalTypeId type); void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo, PostgresHeapSeqParallelScanState &parallelScanState); + } // namespace quack \ No newline at end of file diff --git a/quack--0.0.1.sql b/quack--0.0.1.sql index 97a4d362..e61f1782 100644 --- a/quack--0.0.1.sql +++ b/quack--0.0.1.sql @@ -1 +1,9 @@ LOAD 'quack'; + +CREATE OR REPLACE FUNCTION read_parquet(path text) +RETURNS SETOF record LANGUAGE 'plpgsql' AS +$func$ +BEGIN + RETURN QUERY EXECUTE 'SELECT 1'; +END; +$func$; diff --git a/src/quack.cpp b/src/quack.cpp index dd507911..69ff02b5 100644 --- a/src/quack.cpp +++ b/src/quack.cpp @@ -1,10 +1,10 @@ extern "C" { #include "postgres.h" - #include "utils/guc.h" } #include "quack/quack.h" +#include "quack/quack_node.hpp" static void quack_init_guc(void); @@ -17,6 +17,7 @@ void _PG_init(void) { quack_init_guc(); quack_init_hooks(); + quack_init_node(); } } diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 19d7a98a..031524ef 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -188,14 +188,14 @@ PostgresHeapReplacementScan(duckdb::ClientContext &context, const duckdb::string auto &scan_data = reinterpret_cast(*data); /* Check name against query rtable list and verify that it is heap table */ - auto table = FindMatchingHeapRelation(scan_data.desc->plannedstmt->rtable, 
table_name); + auto table = FindMatchingHeapRelation(scan_data.m_parse->rtable, table_name); if (!table) { return nullptr; } // Create POINTER values from the 'table' and 'snapshot' variables - auto children = CreateFunctionArguments(table, scan_data.desc->estate->es_snapshot); + auto children = CreateFunctionArguments(table, GetActiveSnapshot()); auto table_function = duckdb::make_uniq(); table_function->function = duckdb::make_uniq("postgres_heap_scan", std::move(children)); diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 3d028632..96470ae2 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -16,7 +16,7 @@ extern "C" { namespace quack { PostgresHeapSeqScan::PostgresHeapSeqScan(RangeTblEntry *table) - : m_rel(RelationIdGetRelation(table->relid)), m_snapshot(nullptr) { + : m_tableEntry(table), m_rel(nullptr), m_snapshot(nullptr) { } PostgresHeapSeqScan::~PostgresHeapSeqScan() { @@ -25,15 +25,28 @@ PostgresHeapSeqScan::~PostgresHeapSeqScan() { } } -PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other) : m_rel(other.m_rel) { - other.m_rel = nullptr; +PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other) + : m_tableEntry(other.m_tableEntry), m_rel(nullptr) { + other.CloseRelation(); + other.m_tableEntry = nullptr; } Relation PostgresHeapSeqScan::GetRelation() { + if (m_tableEntry && m_rel == nullptr) { + m_rel = RelationIdGetRelation(m_tableEntry->relid); + } return m_rel; } +void +PostgresHeapSeqScan::CloseRelation() { + if (IsValid()) { + RelationClose(m_rel); + } + m_rel = nullptr; +} + bool PostgresHeapSeqScan::IsValid() const { return RelationIsValid(m_rel); @@ -56,7 +69,8 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn } void -PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &input) { +PostgresHeapSeqScan::InitParallelScanState(duckdb::TableFunctionInitInput &input) { + (void) GetRelation(); 
m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel); /* SELECT COUNT(*) FROM */ @@ -80,8 +94,7 @@ PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &inpu } } - - //m_parallel_scan_state.PrefetchNextRelationPages(m_rel); + // m_parallel_scan_state.PrefetchNextRelationPages(m_rel); m_parallel_scan_state.m_filters = input.filters.get(); } @@ -110,7 +123,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc threadScanInfo.m_buffer = ReadBufferExtended(m_rel, MAIN_FORKNUM, block, RBM_NORMAL, m_parallel_scan_state.m_strategy); LockBuffer(threadScanInfo.m_buffer, BUFFER_LOCK_SHARE); - //m_parallel_scan_state.PrefetchNextRelationPages(m_rel); + // m_parallel_scan_state.PrefetchNextRelationPages(m_rel); m_parallel_scan_state.m_lock.unlock(); page = PreparePageRead(threadScanInfo); threadScanInfo.m_read_next_page = false; @@ -196,7 +209,6 @@ PostgresHeapSeqParallelScanState::PrefetchNextRelationPages(Relation rel) { (m_last_prefetch_block - m_last_assigned_block_number) > 8) return; - for (BlockNumber i = m_last_prefetch_block; i < last_batch_prefetch_block_num; i++) { PrefetchBuffer(rel, MAIN_FORKNUM, m_last_prefetch_block); m_last_prefetch_block++; diff --git a/src/quack_hooks.cpp b/src/quack_hooks.cpp index d280a157..1c95458e 100644 --- a/src/quack_hooks.cpp +++ b/src/quack_hooks.cpp @@ -6,9 +6,9 @@ extern "C" { } #include "quack/quack.h" -#include "quack/quack_select.h" +#include "quack/quack_planner.hpp" -static ExecutorRun_hook_type PrevExecutorRunHook = NULL; +static planner_hook_type PrevPlannerHook = NULL; static bool is_quack_extension_registered() { @@ -33,24 +33,24 @@ is_catalog_table(List *tables) { return false; } -static void -quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { - if (is_quack_extension_registered() && !is_catalog_table(queryDesc->plannedstmt->rtable) && - queryDesc->operation == CMD_SELECT) { - if 
(quack_execute_select(queryDesc, direction, count)) { - return; +static PlannedStmt * +quack_planner(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) { + if (is_quack_extension_registered() && !is_catalog_table(parse->rtable) && parse->commandType == CMD_SELECT) { + PlannedStmt * quackPlan = quack_plan_node(parse, query_string, cursorOptions, boundParams); + if (quackPlan) { + return quackPlan; } } - elog(DEBUG3, "quack_executor_run: Failing back to PG execution"); - - if (PrevExecutorRunHook) { - PrevExecutorRunHook(queryDesc, direction, count, execute_once); + if (PrevPlannerHook) { + return PrevPlannerHook(parse, query_string, cursorOptions, boundParams); + } else { + return standard_planner(parse, query_string, cursorOptions, boundParams); } } void quack_init_hooks(void) { - PrevExecutorRunHook = ExecutorRun_hook ? ExecutorRun_hook : standard_ExecutorRun; - ExecutorRun_hook = quack_executor_run; + PrevPlannerHook = planner_hook; + planner_hook = quack_planner; } \ No newline at end of file diff --git a/src/quack_node.cpp b/src/quack_node.cpp new file mode 100644 index 00000000..7cc4d3e4 --- /dev/null +++ b/src/quack_node.cpp @@ -0,0 +1,148 @@ + +#include "duckdb.hpp" + +#include "quack/quack_node.hpp" +#include "quack/quack_heap_scan.hpp" +#include "quack/quack_types.hpp" + +/* global variables */ +CustomScanMethods quack_scan_scan_methods; + +/* static variables */ +static CustomExecMethods quack_scan_exec_methods; + +typedef struct QuackScanState { + CustomScanState css; /* must be first field */ + duckdb::DuckDB *duckdb; + duckdb::PreparedStatement *preparedStatement; + bool is_executed; + bool fetch_next; + duckdb::unique_ptr queryResult; + duckdb::idx_t columnCount; + duckdb::unique_ptr currentDataChunk; + duckdb::idx_t currentRow; +} QuackScanState; + +/* static callbacks */ +static Node *Quack_CreateCustomScanState(CustomScan *cscan); +static void Quack_BeginCustomScan(CustomScanState *node, EState *estate, int 
eflags); +static TupleTableSlot *Quack_ExecCustomScan(CustomScanState *node); +static void Quack_EndCustomScan(CustomScanState *node); +static void Quack_ReScanCustomScan(CustomScanState *node); +static void Quack_ExplainCustomScan(CustomScanState *node, List *ancestors, ExplainState *es); + +static Node * +Quack_CreateCustomScanState(CustomScan *cscan) { + QuackScanState *quackScanState = (QuackScanState *)newNode(sizeof(QuackScanState), T_CustomScanState); + CustomScanState *customScanState = &quackScanState->css; + quackScanState->duckdb = (duckdb::DuckDB *)linitial(cscan->custom_private); + quackScanState->preparedStatement = (duckdb::PreparedStatement *)lsecond(cscan->custom_private); + quackScanState->is_executed = false; + quackScanState->fetch_next = true; + customScanState->methods = &quack_scan_exec_methods; + return (Node *)customScanState; +} + +void +Quack_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags) { + QuackScanState *quackScanState = (QuackScanState *)cscanstate; + quackScanState->css.ss.ps.ps_ResultTupleDesc = quackScanState->css.ss.ss_ScanTupleSlot->tts_tupleDescriptor; +} + +static TupleTableSlot * +Quack_ExecCustomScan(CustomScanState *node) { + QuackScanState *quackScanState = (QuackScanState *)node; + + TupleTableSlot *slot = quackScanState->css.ss.ss_ScanTupleSlot; + + if (!quackScanState->is_executed) { + quackScanState->queryResult = quackScanState->preparedStatement->Execute(); + quackScanState->columnCount = quackScanState->queryResult->ColumnCount(); + quackScanState->is_executed = true; + } + + if (quackScanState->fetch_next) { + quackScanState->currentDataChunk = quackScanState->queryResult->Fetch(); + quackScanState->currentRow = 0; + quackScanState->fetch_next = false; + if (!quackScanState->currentDataChunk || quackScanState->currentDataChunk->size() == 0) { + ExecClearTuple(slot); + return slot; + } + } + + ExecClearTuple(slot); + + for (idx_t col = 0; col < quackScanState->columnCount; col++) { + auto 
value = quackScanState->currentDataChunk->GetValue(col, quackScanState->currentRow); + if (value.IsNull()) { + slot->tts_isnull[col] = true; + } else { + slot->tts_isnull[col] = false; + quack::ConvertDuckToPostgresValue(slot, value, col); + } + } + + quackScanState->currentRow++; + if (quackScanState->currentRow >= quackScanState->currentDataChunk->size()) { + delete quackScanState->currentDataChunk.release(); + quackScanState->fetch_next = true; + } + + ExecStoreVirtualTuple(slot); + return slot; +} + +void +Quack_EndCustomScan(CustomScanState *node) { + QuackScanState *quackScanState = (QuackScanState *)node; + quackScanState->queryResult.reset(); + delete quackScanState->preparedStatement; + delete quackScanState->duckdb; +} + +void +Quack_ReScanCustomScan(CustomScanState *node) { +} + +void +Quack_ExplainCustomScan(CustomScanState *node, List *ancestors, ExplainState *es) { + QuackScanState *quackScanState = (QuackScanState *)node; + auto res = quackScanState->preparedStatement->Execute(); + std::string explainOutput = "\n\n"; + auto chunk = res->Fetch(); + if (!chunk || chunk->size() == 0) { + return; + } + /* Is it safe to hardcode this as result of DuckDB explain? 
*/ + auto value = chunk->GetValue(1, 0); + explainOutput += value.GetValue(); + explainOutput += "\n"; + ExplainPropertyText("DuckDB Execution Plan", explainOutput.c_str(), es); +} + +extern "C" void +quack_init_node() { + /* setup scan methods */ + memset(&quack_scan_scan_methods, 0, sizeof(quack_scan_scan_methods)); + quack_scan_scan_methods.CustomName = "DuckDBScan"; + quack_scan_scan_methods.CreateCustomScanState = Quack_CreateCustomScanState; + RegisterCustomScanMethods(&quack_scan_scan_methods); + + /* setup exec methods */ + memset(&quack_scan_exec_methods, 0, sizeof(quack_scan_exec_methods)); + quack_scan_exec_methods.CustomName = "DuckDBScan"; + + quack_scan_exec_methods.BeginCustomScan = Quack_BeginCustomScan; + quack_scan_exec_methods.ExecCustomScan = Quack_ExecCustomScan; + quack_scan_exec_methods.EndCustomScan = Quack_EndCustomScan; + quack_scan_exec_methods.ReScanCustomScan = Quack_ReScanCustomScan; + + quack_scan_exec_methods.EstimateDSMCustomScan = NULL; + quack_scan_exec_methods.InitializeDSMCustomScan = NULL; + quack_scan_exec_methods.ReInitializeDSMCustomScan = NULL; + quack_scan_exec_methods.InitializeWorkerCustomScan = NULL; + quack_scan_exec_methods.ShutdownCustomScan = NULL; + + quack_scan_exec_methods.ExplainCustomScan = Quack_ExplainCustomScan; +} \ No newline at end of file diff --git a/src/quack_planner.cpp b/src/quack_planner.cpp new file mode 100644 index 00000000..f4baa3d0 --- /dev/null +++ b/src/quack_planner.cpp @@ -0,0 +1,130 @@ +#include "duckdb.hpp" +#include "duckdb/parser/parsed_data/create_table_function_info.hpp" + +extern "C" { +#include "postgres.h" +#include "catalog/pg_type.h" +#include "nodes/nodes.h" +#include "nodes/makefuncs.h" +#include "utils/syscache.h" +} + +#include "quack/quack_heap_scan.hpp" +#include "quack/quack_node.hpp" +#include "quack/quack_planner.hpp" +#include "quack/quack_types.hpp" + +namespace quack { + +static duckdb::DuckDB * +quack_open_database() { + duckdb::DBConfig config; + // 
config.SetOption("memory_limit", "2GB"); + // config.SetOption("threads", "8"); + // config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); + return new duckdb::DuckDB(nullptr, &config); +} + +} // namespace quack + +static Plan * +quack_create_plan(Query *parse, const char *query) { + auto db = quack::quack_open_database(); + + /* Add heap tables */ + db->instance->config.replacement_scans.emplace_back( + quack::PostgresHeapReplacementScan, + duckdb::make_uniq_base(parse, query)); + auto connection = duckdb::make_uniq(*db); + + // Add the postgres_scan inserted by the replacement scan + auto &context = *connection->context; + quack::PostgresHeapScanFunction heap_scan_fun; + duckdb::CreateTableFunctionInfo heap_scan_info(heap_scan_fun); + + auto &catalog = duckdb::Catalog::GetSystemCatalog(context); + context.transaction.BeginTransaction(); + catalog.CreateTableFunction(context, &heap_scan_info); + context.transaction.Commit(); + + auto preparedQuery = context.Prepare(query); + + if (preparedQuery->HasError()) { + elog(INFO, "(Quack) %s", preparedQuery->GetError().c_str()); + return nullptr; + } + + CustomScan *quackNode = makeNode(CustomScan); + + auto &preparedResultTypes = preparedQuery->GetTypes(); + + for (auto i = 0; i < preparedResultTypes.size(); i++) { + auto &column = preparedResultTypes[i]; + Oid postgresColumnOid = quack::GetPostgresDuckDBType(column.id()); + + if (OidIsValid(postgresColumnOid)) { + HeapTuple tp; + Form_pg_type typtup; + + tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(postgresColumnOid)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for type %u", postgresColumnOid); + + typtup = (Form_pg_type)GETSTRUCT(tp); + + Var *var = makeVar(INDEX_VAR, i + 1, postgresColumnOid, typtup->typtypmod, typtup->typcollation, 0); + + quackNode->custom_scan_tlist = + lappend(quackNode->custom_scan_tlist, + makeTargetEntry((Expr *)var, i + 1, (char *)preparedQuery->GetNames()[i].c_str(), false)); + + 
ReleaseSysCache(tp); + } + } + + quackNode->custom_private = list_make2(db, preparedQuery.release()); + quackNode->methods = &quack_scan_scan_methods; + + return (Plan *)quackNode; +} + +PlannedStmt * +quack_plan_node(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) { + + /* We need to check can we DuckDB create plan */ + Plan *quackPlan = (Plan *)castNode(CustomScan, quack_create_plan(parse, query_string)); + + if (!quackPlan) { + return nullptr; + } + + /* build the PlannedStmt result */ + PlannedStmt *result = makeNode(PlannedStmt); + + result->commandType = parse->commandType; + result->queryId = parse->queryId; + result->hasReturning = (parse->returningList != NIL); + result->hasModifyingCTE = parse->hasModifyingCTE; + result->canSetTag = parse->canSetTag; + result->transientPlan = false; + result->dependsOnRole = false; + result->parallelModeNeeded = false; + result->planTree = quackPlan; + result->rtable = NULL; + result->permInfos = NULL; + result->resultRelations = NULL; + result->appendRelations = NULL; + result->subplans = NIL; + result->rewindPlanIDs = NULL; + result->rowMarks = NIL; + result->relationOids = NIL; + result->invalItems = NIL; + result->paramExecTypes = NIL; + + /* utilityStmt should be null, but we might as well copy it */ + result->utilityStmt = parse->utilityStmt; + result->stmt_location = parse->stmt_location; + result->stmt_len = parse->stmt_len; + + return result; +} diff --git a/src/quack_select.cpp b/src/quack_select.cpp deleted file mode 100644 index bf879499..00000000 --- a/src/quack_select.cpp +++ /dev/null @@ -1,113 +0,0 @@ -#include "duckdb/parser/parsed_data/create_table_function_info.hpp" -#include "duckdb.hpp" - -extern "C" { -#include "postgres.h" -#include "fmgr.h" - -#include "access/genam.h" -#include "access/table.h" -#include "catalog/namespace.h" -#include "catalog/pg_proc.h" -#include "utils/lsyscache.h" -#include "utils/syscache.h" -#include "utils/rel.h" - -#include 
"quack/quack_select.h" -} - -#include "quack/quack_heap_scan.hpp" -#include "quack/quack_types.hpp" -#include "quack/quack_memory_allocator.hpp" - -namespace quack { - -static duckdb::unique_ptr -quack_open_database() { - duckdb::DBConfig config; - //config.SetOption("memory_limit", "2GB"); - //config.SetOption("threads", "8"); - //config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); - return duckdb::make_uniq(nullptr, &config); -} - -} // namespace quack - -extern "C" bool -quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count) { - auto db = quack::quack_open_database(); - - /* Add heap tables */ - db->instance->config.replacement_scans.emplace_back( - quack::PostgresHeapReplacementScan, - duckdb::make_uniq_base(query_desc)); - auto connection = duckdb::make_uniq(*db); - - // Add the postgres_scan inserted by the replacement scan - auto &context = *connection->context; - quack::PostgresHeapScanFunction heap_scan_fun; - duckdb::CreateTableFunctionInfo heap_scan_info(heap_scan_fun); - - auto &catalog = duckdb::Catalog::GetSystemCatalog(context); - context.transaction.BeginTransaction(); - catalog.CreateTableFunction(context, &heap_scan_info); - context.transaction.Commit(); - - idx_t column_count; - - CmdType operation; - DestReceiver *dest; - - TupleTableSlot *slot = NULL; - - // FIXME: try-catch ? 
- - duckdb::unique_ptr res = nullptr; - - res = connection->Query(query_desc->sourceText); - if (res->HasError()) { - return false; - } - - operation = query_desc->operation; - dest = query_desc->dest; - - dest->rStartup(dest, operation, query_desc->tupDesc); - - slot = MakeTupleTableSlot(query_desc->tupDesc, &TTSOpsHeapTuple); - column_count = res->ColumnCount(); - - while (true) { - auto chunk = res->Fetch(); - - if (!chunk || chunk->size() == 0) { - break; - } - - for (idx_t row = 0; row < chunk->size(); row++) { - ExecClearTuple(slot); - - for (idx_t col = 0; col < column_count; col++) { - auto value = chunk->GetValue(col, row); - if (value.IsNull()) { - slot->tts_isnull[col] = true; - } else { - slot->tts_isnull[col] = false; - quack::ConvertDuckToPostgresValue(slot, value, col); - } - } - - ExecStoreVirtualTuple(slot); - - dest->receiveSlot(slot, dest); - - for (idx_t i = 0; i < column_count; i++) { - if (slot->tts_tupleDescriptor->attrs[i].attbyval == false) { - pfree(DatumGetPointer(slot->tts_values[i])); - } - } - } - } - dest->rShutdown(dest); - return true; -} \ No newline at end of file diff --git a/src/quack_types.cpp b/src/quack_types.cpp index c3e29390..3c8cd6b3 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -71,7 +71,7 @@ ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col break; } default: - elog(ERROR, "Unsuported quack type: %d", oid); + elog(ERROR, "(DuckDB/ConvertDuckToPostgresValue) Unsuported quack type: %d", oid); } } @@ -97,7 +97,33 @@ ConvertPostgresToDuckColumnType(Oid type) { case TIMESTAMPOID: return duckdb::LogicalTypeId::TIMESTAMP; default: - elog(ERROR, "Unsupported quack type: %d", type); + elog(ERROR, "(DuckDB/ConvertPostgresToDuckColumnType) Unsupported quack type: %d", type); + } +} + +Oid +GetPostgresDuckDBType(duckdb::LogicalTypeId type) { + switch (type) { + case duckdb::LogicalTypeId::BOOLEAN: + return BOOLOID; + case duckdb::LogicalTypeId::TINYINT: + return CHAROID; + case 
duckdb::LogicalTypeId::SMALLINT: + return INT2OID; + case duckdb::LogicalTypeId::INTEGER: + return INT4OID; + case duckdb::LogicalTypeId::BIGINT: + return INT8OID; + case duckdb::LogicalTypeId::VARCHAR: + return VARCHAROID; + case duckdb::LogicalTypeId::DATE: + return DATEOID; + case duckdb::LogicalTypeId::TIMESTAMP: + return TIMESTAMPOID; + case duckdb::LogicalTypeId::DOUBLE: + return FLOAT8OID; + default: + elog(ERROR, "(DuckDB/GetPostgresDuckDBType) Unsupported quack type: %d", static_cast(type)); } } From 2ae51dc87467be264062e6b587c4aacddf80294c Mon Sep 17 00:00:00 2001 From: mkaruza Date: Mon, 6 May 2024 11:26:10 +0200 Subject: [PATCH 2/5] Fixed query projection / Added filter for DATE --- include/quack/quack_types.hpp | 4 ++++ src/quack_filter.cpp | 16 +++++++++++++++- src/quack_heap_seq_scan.cpp | 2 +- src/quack_types.cpp | 26 +++++++++++++------------- 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp index dd2f382b..bb98cb5f 100644 --- a/include/quack/quack_types.hpp +++ b/include/quack/quack_types.hpp @@ -11,6 +11,10 @@ extern "C" { namespace quack { +// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000 +constexpr int32_t QUACK_DUCK_DATE_OFFSET = 10957; +constexpr int64_t QUACK_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER_DAY; + duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); Oid GetPostgresDuckDBType(duckdb::LogicalTypeId type); void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); diff --git a/src/quack_filter.cpp b/src/quack_filter.cpp index 946b9720..a0a85cb1 100644 --- a/src/quack_filter.cpp +++ b/src/quack_filter.cpp @@ -6,6 +6,9 @@ extern "C" { #include "catalog/pg_type.h" } +#include "quack/quack_filter.hpp" +#include "quack/quack_types.hpp" + namespace quack { template @@ -33,8 +36,19 @@ FilterOperationSwitch(Datum &value, duckdb::Value &constant, Oid typeOid) { case INT8OID: return 
TemplatedFilterOperation(value, constant); break; + case FLOAT4OID: + return TemplatedFilterOperation(value, constant); + break; + case FLOAT8OID: + return TemplatedFilterOperation(value, constant); + break; + case DATEOID: { + Datum dateDatum = static_cast(value + quack::QUACK_DUCK_DATE_OFFSET); + return TemplatedFilterOperation(dateDatum, constant); + break; + } default: - elog(ERROR, "Unsupported quack type: %d", typeOid); + elog(ERROR, "(DuckDB/FilterOperationSwitch) Unsupported quack type: %d", typeOid); } } diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 96470ae2..58536334 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -86,7 +86,7 @@ PostgresHeapSeqScan::InitParallelScanState(duckdb::TableFunctionInitInput &input if (input.CanRemoveFilterColumns()) { for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) { - m_parallel_scan_state.m_projections[input.projection_ids[i]] = input.column_ids[i]; + m_parallel_scan_state.m_projections[i] = input.column_ids[input.projection_ids[i]]; } } else { for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) { diff --git a/src/quack_types.cpp b/src/quack_types.cpp index 3c8cd6b3..bacb3773 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -12,13 +12,10 @@ extern "C" { #include "quack/quack_filter.hpp" #include "quack/quack_heap_seq_scan.hpp" #include "quack/quack_detoast.hpp" +#include "quack/quack_types.hpp" namespace quack { -// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000 -constexpr int32_t QUACK_DUCK_DATE_OFFSET = 10957; -constexpr int64_t QUACK_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER_DAY; - void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col) { Oid oid = slot->tts_tupleDescriptor->attrs[col].atttypid; @@ -54,12 +51,12 @@ ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col } case DATEOID: { duckdb::date_t date = value.GetValue(); - 
slot->tts_values[col] = date.days - QUACK_DUCK_DATE_OFFSET; + slot->tts_values[col] = date.days - quack::QUACK_DUCK_DATE_OFFSET; break; } case TIMESTAMPOID: { duckdb::dtime_t timestamp = value.GetValue(); - slot->tts_values[col] = timestamp.micros - QUACK_DUCK_TIMESTAMP_OFFSET; + slot->tts_values[col] = timestamp.micros - quack::QUACK_DUCK_TIMESTAMP_OFFSET; break; } case FLOAT8OID: @@ -123,7 +120,7 @@ GetPostgresDuckDBType(duckdb::LogicalTypeId type) { case duckdb::LogicalTypeId::DOUBLE: return FLOAT8OID; default: - elog(ERROR, "(DuckDB/GetPostgresDuckDBType) Unsupported quack type: %d", static_cast(type)); + elog(ERROR, "(DuckDB/GetPostgresDuckDBType) Unsupported quack type: %d", static_cast(type)); } } @@ -173,7 +170,8 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { offset); break; default: - elog(ERROR, "Unsupported quack type: %d", static_cast(result.GetType().id())); + elog(ERROR, "(DuckDB/ConvertPostgresToDuckValue) Unsupported quack type: %d", + static_cast(result.GetType().id())); break; } } @@ -297,16 +295,18 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t auto &array_mask = duckdb::FlatVector::Validity(result); array_mask.SetInvalid(threadScanInfo.m_output_vector_size); } else { + idx_t projectionColumnIdx = parallelScanState.m_columns[parallelScanState.m_projections[idx]]; if (threadScanInfo.m_tuple_desc->attrs[parallelScanState.m_projections[idx]].attlen == -1) { bool shouldFree = false; - values[idx] = DetoastPostgresDatum(reinterpret_cast(values[idx]), parallelScanState.m_lock, - &shouldFree); - ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size); + values[projectionColumnIdx] = + DetoastPostgresDatum(reinterpret_cast(values[projectionColumnIdx]), + parallelScanState.m_lock, &shouldFree); + ConvertPostgresToDuckValue(values[projectionColumnIdx], result, threadScanInfo.m_output_vector_size); if (shouldFree) { - 
duckdb_free(reinterpret_cast(values[idx])); + duckdb_free(reinterpret_cast(values[projectionColumnIdx])); } } else { - ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size); + ConvertPostgresToDuckValue(values[projectionColumnIdx], result, threadScanInfo.m_output_vector_size); } } } From 9e092f5e03a9157435438e33f26a10481e5f1dd2 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Wed, 8 May 2024 09:36:54 +0200 Subject: [PATCH 3/5] Release memory that was allocated in result tuple / TIMESTAMP filter * Memory allocated for columns needs to be released for each result tuple * Filter for TIMESTAMP --- src/quack_filter.cpp | 5 +++++ src/quack_node.cpp | 10 +++++++++- src/quack_types.cpp | 2 ++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/quack_filter.cpp b/src/quack_filter.cpp index a0a85cb1..099cdb23 100644 --- a/src/quack_filter.cpp +++ b/src/quack_filter.cpp @@ -47,6 +47,11 @@ FilterOperationSwitch(Datum &value, duckdb::Value &constant, Oid typeOid) { return TemplatedFilterOperation(dateDatum, constant); break; } + case TIMESTAMPOID: { + Datum timeStampDatum = static_cast(value + quack::QUACK_DUCK_TIMESTAMP_OFFSET); + return TemplatedFilterOperation(timeStampDatum, constant); + break; + } default: elog(ERROR, "(DuckDB/FilterOperationSwitch) Unsupported quack type: %d", typeOid); } diff --git a/src/quack_node.cpp b/src/quack_node.cpp index 7cc4d3e4..ccb32e16 100644 --- a/src/quack_node.cpp +++ b/src/quack_node.cpp @@ -52,8 +52,9 @@ Quack_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags) { static TupleTableSlot * Quack_ExecCustomScan(CustomScanState *node) { QuackScanState *quackScanState = (QuackScanState *)node; - TupleTableSlot *slot = quackScanState->css.ss.ss_ScanTupleSlot; + MemoryContext oldContext; + if (!quackScanState->is_executed) { quackScanState->queryResult = quackScanState->preparedStatement->Execute(); @@ -66,13 +67,18 @@ Quack_ExecCustomScan(CustomScanState *node) {
quackScanState->currentRow = 0; quackScanState->fetch_next = false; if (!quackScanState->currentDataChunk || quackScanState->currentDataChunk->size() == 0) { + MemoryContextReset(quackScanState->css.ss.ps.ps_ExprContext->ecxt_per_tuple_memory); ExecClearTuple(slot); return slot; } } + MemoryContextReset(quackScanState->css.ss.ps.ps_ExprContext->ecxt_per_tuple_memory); ExecClearTuple(slot); + /* MemoryContext used for allocation */ + oldContext = MemoryContextSwitchTo(quackScanState->css.ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + for (idx_t col = 0; col < quackScanState->columnCount; col++) { auto value = quackScanState->currentDataChunk->GetValue(col, quackScanState->currentRow); if (value.IsNull()) { @@ -83,6 +89,8 @@ Quack_ExecCustomScan(CustomScanState *node) { } } + MemoryContextSwitchTo(oldContext); + quackScanState->currentRow++; if (quackScanState->currentRow >= quackScanState->currentDataChunk->size()) { delete quackScanState->currentDataChunk.release(); diff --git a/src/quack_types.cpp b/src/quack_types.cpp index bacb3773..34ca106a 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -111,6 +111,8 @@ GetPostgresDuckDBType(duckdb::LogicalTypeId type) { return INT4OID; case duckdb::LogicalTypeId::BIGINT: return INT8OID; + case duckdb::LogicalTypeId::HUGEINT: + return NUMERICOID; case duckdb::LogicalTypeId::VARCHAR: return VARCHAROID; case duckdb::LogicalTypeId::DATE: From c40d80540b82a82f0a14229eed09d3f806e5f712 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Mon, 13 May 2024 08:52:30 +0200 Subject: [PATCH 4/5] Check if catalog table is in RTE_SUBQUERY --- src/quack_heap_seq_scan.cpp | 1 - src/quack_hooks.cpp | 6 ++++++ src/quack_planner.cpp | 6 +++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 58536334..0c7cf37a 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -14,7 +14,6 @@ extern "C" { #include namespace quack { - 
PostgresHeapSeqScan::PostgresHeapSeqScan(RangeTblEntry *table) : m_tableEntry(table), m_rel(nullptr), m_snapshot(nullptr) { } diff --git a/src/quack_hooks.cpp b/src/quack_hooks.cpp index 1c95458e..0cb95467 100644 --- a/src/quack_hooks.cpp +++ b/src/quack_hooks.cpp @@ -20,6 +20,12 @@ is_catalog_table(List *tables) { ListCell *lc; foreach (lc, tables) { RangeTblEntry *table = (RangeTblEntry *)lfirst(lc); + if (table->rtekind == RTE_SUBQUERY) { + /* Check Subquery rtable list if any table is from PG catalog */ + if (is_catalog_table(table->subquery->rtable)) { + return true; + }; + } if (table->relid) { auto rel = RelationIdGetRelation(table->relid); auto namespaceOid = RelationGetNamespace(rel); diff --git a/src/quack_planner.cpp b/src/quack_planner.cpp index f4baa3d0..780b9612 100644 --- a/src/quack_planner.cpp +++ b/src/quack_planner.cpp @@ -16,13 +16,13 @@ extern "C" { namespace quack { -static duckdb::DuckDB * +static duckdb::unique_ptr quack_open_database() { duckdb::DBConfig config; // config.SetOption("memory_limit", "2GB"); // config.SetOption("threads", "8"); // config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); - return new duckdb::DuckDB(nullptr, &config); + return duckdb::make_uniq(nullptr, &config); } } // namespace quack @@ -82,7 +82,7 @@ quack_create_plan(Query *parse, const char *query) { } } - quackNode->custom_private = list_make2(db, preparedQuery.release()); + quackNode->custom_private = list_make2(db.release(), preparedQuery.release()); quackNode->methods = &quack_scan_scan_methods; return (Plan *)quackNode; From cd4417ae73046e792707bf9b23962b69f6562f89 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Mon, 13 May 2024 08:56:50 +0200 Subject: [PATCH 5/5] Rerecord regression test --- expected/basic.out | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/expected/basic.out b/expected/basic.out index 7cdb86c5..cd677f0c 100644 --- a/expected/basic.out +++ 
b/expected/basic.out @@ -4,41 +4,45 @@ INSERT INTO t SELECT g % 10 from generate_series(1,1000000) g; SET client_min_messages to 'DEBUG3'; SELECT COUNT(*) FROM t; DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 1 threads -- - count ---------- - 1000000 + count_star() +-------------- + 1000000 (1 row) SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 1 threads -- - a | count ----+-------- - 6 | 100000 - 7 | 100000 - 8 | 100000 - 9 | 100000 + a | count_star() +---+-------------- + 6 | 100000 + 7 | 100000 + 8 | 100000 + 9 | 100000 (4 rows) SET quack.max_threads_per_query to 4; SELECT COUNT(*) FROM t; DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 4 threads -- - count ---------- - 1000000 + count_star() +-------------- + 1000000 (1 row) SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 4 threads -- - a | count ----+-------- - 6 | 100000 - 7 | 100000 - 8 | 100000 - 9 | 100000 + a | count_star() +---+-------------- + 6 | 100000 + 7 | 100000 + 8 | 100000 + 9 | 100000 (4 rows) SET quack.max_threads_per_query TO default;