From 5ad3580356f435eda12a7c524740a8d579ee0e48 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Fri, 3 May 2024 11:50:26 +0200 Subject: [PATCH] Quack node * Hook into postgres planner rather than on execution. Idea is to split DuckDB execution into prepare/execute phase. If DuckDB prepare is not valid will we fall back to normal postgres execution. * Another benefit of having planner hook is that custom RETURN TABLE FUNCTIONS can be used in PG syntax. This custom function should only be created to pass parsing phase. Wee can now use `SELECT * FROM read_parquet('...')` that will read parquet files through DuckDB. * Created quack node which will be used for duckdb exection. This quack nodes is defined as `Custom Scan Node`. EXPLAIN will work out of box with this approach - we'll output just explain plan from DuckDB execution. * Added `httpfs` extension to be build together with parquer extension. --- Makefile | 13 +-- include/quack/quack_filter.hpp | 2 + include/quack/quack_heap_scan.hpp | 5 +- include/quack/quack_heap_seq_scan.hpp | 2 + include/quack/quack_node.hpp | 9 ++ include/quack/quack_planner.hpp | 8 ++ include/quack/quack_select.h | 8 -- include/quack/quack_types.hpp | 3 + quack--0.0.1.sql | 8 ++ src/quack.cpp | 3 +- src/quack_heap_scan.cpp | 4 +- src/quack_heap_seq_scan.cpp | 28 +++-- src/quack_hooks.cpp | 28 ++--- src/quack_node.cpp | 148 ++++++++++++++++++++++++++ src/quack_planner.cpp | 130 ++++++++++++++++++++++ src/quack_select.cpp | 113 -------------------- src/quack_types.cpp | 30 +++++- 17 files changed, 386 insertions(+), 156 deletions(-) create mode 100644 include/quack/quack_node.hpp create mode 100644 include/quack/quack_planner.hpp delete mode 100644 include/quack/quack_select.h create mode 100644 src/quack_node.cpp create mode 100644 src/quack_planner.cpp delete mode 100644 src/quack_select.cpp diff --git a/Makefile b/Makefile index 214fffe2..ef9e4855 100644 --- a/Makefile +++ b/Makefile @@ -9,9 +9,10 @@ SRCS = src/quack_detoast.cpp \ src/quack_heap_scan.cpp \ src/quack_heap_seq_scan.cpp \ src/quack_hooks.cpp \ - src/quack_select.cpp \ - src/quack_types.cpp \ src/quack_memory_allocator.cpp \ + src/quack_node.cpp \ + src/quack_planner.cpp \ + src/quack_types.cpp \ src/quack.cpp OBJS = $(subst .cpp,.o, $(SRCS)) @@ -32,8 +33,8 @@ ifeq ($(QUACK_BUILD), Debug) QUACK_BUILD_CXX_FLAGS = -g -O0 QUACK_BUILD_DUCKDB = debug else - QUACK_BUILD_CXX_FLAGS = - QUACK_BUILD_DUCKDB = release + QUACK_BUILD_CXX_FLAGS = -g -O0 + QUACK_BUILD_DUCKDB = debug endif override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -Wno-sign-compare ${QUACK_BUILD_CXX_FLAGS} @@ -54,7 +55,7 @@ ifeq ($(UNAME_S),Linux) DUCKDB_LIB = libduckdb.so endif -all: duckdb $(OBJS) +all: duckdb $(OBJS) .depend include $(PGXS) @@ -64,7 +65,7 @@ third_party/duckdb/Makefile: git submodule update --init --recursive third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB): - $(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF CMAKE_EXPORT_COMPILE_COMMANDS=1 + $(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF BUILD_HTTPFS=1 CMAKE_EXPORT_COMPILE_COMMANDS=1 install_duckdb: $(install_bin) -m 755 third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB) $(DESTDIR)$(PG_LIB) diff --git a/include/quack/quack_filter.hpp b/include/quack/quack_filter.hpp index 456cdb7a..eff8ecc7 100644 --- a/include/quack/quack_filter.hpp +++ b/include/quack/quack_filter.hpp @@ -7,5 +7,7 @@ extern "C" { } namespace quack { + bool ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid); + } // namespace quack \ No newline at end of file diff --git a/include/quack/quack_heap_scan.hpp b/include/quack/quack_heap_scan.hpp index 9d7ecac3..92abe2cf 100644 --- a/include/quack/quack_heap_scan.hpp +++ b/include/quack/quack_heap_scan.hpp @@ -78,12 +78,13 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction { struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData { public: - PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) { + PostgresHeapReplacementScanData(Query *parse, const char *query) : m_parse(parse), m_query(query) { } ~PostgresHeapReplacementScanData() override {}; public: - QueryDesc *desc; + Query *m_parse; + std::string m_query; }; duckdb::unique_ptr PostgresHeapReplacementScan(duckdb::ClientContext &context, diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp index 1921d407..67ba1113 100644 --- a/include/quack/quack_heap_seq_scan.hpp +++ b/include/quack/quack_heap_seq_scan.hpp @@ -79,6 +79,7 @@ class PostgresHeapSeqScan { public: Relation GetRelation(); + void CloseRelation(); TupleDesc GetTupleDesc(); bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo); bool IsValid() const; @@ -87,6 +88,7 @@ class PostgresHeapSeqScan { Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo); private: + RangeTblEntry * m_tableEntry = nullptr; Relation m_rel = nullptr; Snapshot m_snapshot = nullptr; PostgresHeapSeqParallelScanState m_parallel_scan_state; diff --git a/include/quack/quack_node.hpp b/include/quack/quack_node.hpp new file mode 100644 index 00000000..25c30727 --- /dev/null +++ b/include/quack/quack_node.hpp @@ -0,0 +1,9 @@ +#pragma once + +extern "C" { +#include "postgres.h" +#include "nodes/extensible.h" +} + +extern CustomScanMethods quack_scan_scan_methods; +extern "C" void quack_init_node(void); \ No newline at end of file diff --git a/include/quack/quack_planner.hpp b/include/quack/quack_planner.hpp new file mode 100644 index 00000000..c2a70514 --- /dev/null +++ b/include/quack/quack_planner.hpp @@ -0,0 +1,8 @@ +#pragma once + +extern "C" { +#include "postgres.h" +#include "optimizer/planner.h" +} + +PlannedStmt *quack_plan_node(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); \ No newline at end of file diff --git a/include/quack/quack_select.h b/include/quack/quack_select.h deleted file mode 100644 index 7490bdcf..00000000 --- a/include/quack/quack_select.h +++ /dev/null @@ -1,8 +0,0 @@ -#pragma once - -extern "C" { -#include "postgres.h" -#include "executor/executor.h" -} - -extern "C" bool quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count); \ No newline at end of file diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp index eeb7e4a8..dd2f382b 100644 --- a/include/quack/quack_types.hpp +++ b/include/quack/quack_types.hpp @@ -10,9 +10,12 @@ extern "C" { #include "quack/quack_heap_seq_scan.hpp" namespace quack { + duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); +Oid GetPostgresDuckDBType(duckdb::LogicalTypeId type); void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo, PostgresHeapSeqParallelScanState ¶llelScanState); + } // namespace quack \ No newline at end of file diff --git a/quack--0.0.1.sql b/quack--0.0.1.sql index 97a4d362..e61f1782 100644 --- a/quack--0.0.1.sql +++ b/quack--0.0.1.sql @@ -1 +1,9 @@ LOAD 'quack'; + +CREATE OR REPLACE FUNCTION read_parquet(path text) +RETURNS SETOF record LANGUAGE 'plpgsql' AS +$func$ +BEGIN + RETURN QUERY EXECUTE 'SELECT 1'; +END; +$func$; diff --git a/src/quack.cpp b/src/quack.cpp index dd507911..69ff02b5 100644 --- a/src/quack.cpp +++ b/src/quack.cpp @@ -1,10 +1,10 @@ extern "C" { #include "postgres.h" - #include "utils/guc.h" } #include "quack/quack.h" +#include "quack/quack_node.hpp" static void quack_init_guc(void); @@ -17,6 +17,7 @@ void _PG_init(void) { quack_init_guc(); quack_init_hooks(); + quack_init_node(); } } diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 19d7a98a..031524ef 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -188,14 +188,14 @@ PostgresHeapReplacementScan(duckdb::ClientContext &context, const duckdb::string auto &scan_data = reinterpret_cast(*data); /* Check name against query rtable list and verify that it is heap table */ - auto table = FindMatchingHeapRelation(scan_data.desc->plannedstmt->rtable, table_name); + auto table = FindMatchingHeapRelation(scan_data.m_parse->rtable, table_name); if (!table) { return nullptr; } // Create POINTER values from the 'table' and 'snapshot' variables - auto children = CreateFunctionArguments(table, scan_data.desc->estate->es_snapshot); + auto children = CreateFunctionArguments(table, GetActiveSnapshot()); auto table_function = duckdb::make_uniq(); table_function->function = duckdb::make_uniq("postgres_heap_scan", std::move(children)); diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 3d028632..96470ae2 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -16,7 +16,7 @@ extern "C" { namespace quack { PostgresHeapSeqScan::PostgresHeapSeqScan(RangeTblEntry *table) - : m_rel(RelationIdGetRelation(table->relid)), m_snapshot(nullptr) { + : m_tableEntry(table), m_rel(nullptr), m_snapshot(nullptr) { } PostgresHeapSeqScan::~PostgresHeapSeqScan() { @@ -25,15 +25,28 @@ PostgresHeapSeqScan::~PostgresHeapSeqScan() { } } -PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other) : m_rel(other.m_rel) { - other.m_rel = nullptr; +PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other) + : m_tableEntry(other.m_tableEntry), m_rel(nullptr) { + other.CloseRelation(); + other.m_tableEntry = nullptr; } Relation PostgresHeapSeqScan::GetRelation() { + if (m_tableEntry && m_rel == nullptr) { + m_rel = RelationIdGetRelation(m_tableEntry->relid); + } return m_rel; } +void +PostgresHeapSeqScan::CloseRelation() { + if (IsValid()) { + RelationClose(m_rel); + } + m_rel = nullptr; +} + bool PostgresHeapSeqScan::IsValid() const { return RelationIsValid(m_rel); @@ -56,7 +69,8 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn } void -PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &input) { +PostgresHeapSeqScan::InitParallelScanState(duckdb::TableFunctionInitInput &input) { + (void) GetRelation(); m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel); /* SELECT COUNT(*) FROM */ @@ -80,8 +94,7 @@ PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &inpu } } - - //m_parallel_scan_state.PrefetchNextRelationPages(m_rel); + // m_parallel_scan_state.PrefetchNextRelationPages(m_rel); m_parallel_scan_state.m_filters = input.filters.get(); } @@ -110,7 +123,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc threadScanInfo.m_buffer = ReadBufferExtended(m_rel, MAIN_FORKNUM, block, RBM_NORMAL, m_parallel_scan_state.m_strategy); LockBuffer(threadScanInfo.m_buffer, BUFFER_LOCK_SHARE); - //m_parallel_scan_state.PrefetchNextRelationPages(m_rel); + // m_parallel_scan_state.PrefetchNextRelationPages(m_rel); m_parallel_scan_state.m_lock.unlock(); page = PreparePageRead(threadScanInfo); threadScanInfo.m_read_next_page = false; @@ -196,7 +209,6 @@ PostgresHeapSeqParallelScanState::PrefetchNextRelationPages(Relation rel) { (m_last_prefetch_block - m_last_assigned_block_number) > 8) return; - for (BlockNumber i = m_last_prefetch_block; i < last_batch_prefetch_block_num; i++) { PrefetchBuffer(rel, MAIN_FORKNUM, m_last_prefetch_block); m_last_prefetch_block++; diff --git a/src/quack_hooks.cpp b/src/quack_hooks.cpp index d280a157..1c95458e 100644 --- a/src/quack_hooks.cpp +++ b/src/quack_hooks.cpp @@ -6,9 +6,9 @@ extern "C" { } #include "quack/quack.h" -#include "quack/quack_select.h" +#include "quack/quack_planner.hpp" -static ExecutorRun_hook_type PrevExecutorRunHook = NULL; +static planner_hook_type PrevPlannerHook = NULL; static bool is_quack_extension_registered() { @@ -33,24 +33,24 @@ is_catalog_table(List *tables) { return false; } -static void -quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { - if (is_quack_extension_registered() && !is_catalog_table(queryDesc->plannedstmt->rtable) && - queryDesc->operation == CMD_SELECT) { - if (quack_execute_select(queryDesc, direction, count)) { - return; +static PlannedStmt * +quack_planner(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) { + if (is_quack_extension_registered() && !is_catalog_table(parse->rtable) && parse->commandType == CMD_SELECT) { + PlannedStmt * quackPlan = quack_plan_node(parse, query_string, cursorOptions, boundParams); + if (quackPlan) { + return quackPlan; } } - elog(DEBUG3, "quack_executor_run: Failing back to PG execution"); - - if (PrevExecutorRunHook) { - PrevExecutorRunHook(queryDesc, direction, count, execute_once); + if (PrevPlannerHook) { + return PrevPlannerHook(parse, query_string, cursorOptions, boundParams); + } else { + return standard_planner(parse, query_string, cursorOptions, boundParams); } } void quack_init_hooks(void) { - PrevExecutorRunHook = ExecutorRun_hook ? ExecutorRun_hook : standard_ExecutorRun; - ExecutorRun_hook = quack_executor_run; + PrevPlannerHook = planner_hook; + planner_hook = quack_planner; } \ No newline at end of file diff --git a/src/quack_node.cpp b/src/quack_node.cpp new file mode 100644 index 00000000..7cc4d3e4 --- /dev/null +++ b/src/quack_node.cpp @@ -0,0 +1,148 @@ + +#include "duckdb.hpp" + +#include "quack/quack_node.hpp" +#include "quack/quack_heap_scan.hpp" +#include "quack/quack_types.hpp" + +/* global variables */ +CustomScanMethods quack_scan_scan_methods; + +/* static variables */ +static CustomExecMethods quack_scan_exec_methods; + +typedef struct QuackScanState { + CustomScanState css; /* must be first field */ + duckdb::DuckDB *duckdb; + duckdb::PreparedStatement *preparedStatement; + bool is_executed; + bool fetch_next; + duckdb::unique_ptr queryResult; + duckdb::idx_t columnCount; + duckdb::unique_ptr currentDataChunk; + duckdb::idx_t currentRow; +} QuackScanState; + +/* static callbacks */ +static Node *Quack_CreateCustomScanState(CustomScan *cscan); +static void Quack_BeginCustomScan(CustomScanState *node, EState *estate, int eflags); +static TupleTableSlot *Quack_ExecCustomScan(CustomScanState *node); +static void Quack_EndCustomScan(CustomScanState *node); +static void Quack_ReScanCustomScan(CustomScanState *node); +static void Quack_ExplainCustomScan(CustomScanState *node, List *ancestors, ExplainState *es); + +static Node * +Quack_CreateCustomScanState(CustomScan *cscan) { + QuackScanState *quackScanState = (QuackScanState *)newNode(sizeof(QuackScanState), T_CustomScanState); + CustomScanState *customScanState = &quackScanState->css; + quackScanState->duckdb = (duckdb::DuckDB *)linitial(cscan->custom_private); + quackScanState->preparedStatement = (duckdb::PreparedStatement *)lsecond(cscan->custom_private); + quackScanState->is_executed = false; + quackScanState->fetch_next = true; + customScanState->methods = &quack_scan_exec_methods; + return (Node *)customScanState; +} + +void +Quack_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags) { + QuackScanState *quackScanState = (QuackScanState *)cscanstate; + quackScanState->css.ss.ps.ps_ResultTupleDesc = quackScanState->css.ss.ss_ScanTupleSlot->tts_tupleDescriptor; +} + +static TupleTableSlot * +Quack_ExecCustomScan(CustomScanState *node) { + QuackScanState *quackScanState = (QuackScanState *)node; + + TupleTableSlot *slot = quackScanState->css.ss.ss_ScanTupleSlot; + + if (!quackScanState->is_executed) { + quackScanState->queryResult = quackScanState->preparedStatement->Execute(); + quackScanState->columnCount = quackScanState->queryResult->ColumnCount(); + quackScanState->is_executed = true; + } + + if (quackScanState->fetch_next) { + quackScanState->currentDataChunk = quackScanState->queryResult->Fetch(); + quackScanState->currentRow = 0; + quackScanState->fetch_next = false; + if (!quackScanState->currentDataChunk || quackScanState->currentDataChunk->size() == 0) { + ExecClearTuple(slot); + return slot; + } + } + + ExecClearTuple(slot); + + for (idx_t col = 0; col < quackScanState->columnCount; col++) { + auto value = quackScanState->currentDataChunk->GetValue(col, quackScanState->currentRow); + if (value.IsNull()) { + slot->tts_isnull[col] = true; + } else { + slot->tts_isnull[col] = false; + quack::ConvertDuckToPostgresValue(slot, value, col); + } + } + + quackScanState->currentRow++; + if (quackScanState->currentRow >= quackScanState->currentDataChunk->size()) { + delete quackScanState->currentDataChunk.release(); + quackScanState->fetch_next = true; + } + + ExecStoreVirtualTuple(slot); + return slot; +} + +void +Quack_EndCustomScan(CustomScanState *node) { + QuackScanState *quackScanState = (QuackScanState *)node; + quackScanState->queryResult.reset(); + delete quackScanState->preparedStatement; + delete quackScanState->duckdb; +} + +void +Quack_ReScanCustomScan(CustomScanState *node) { +} + +void +Quack_ExplainCustomScan(CustomScanState *node, List *ancestors, ExplainState *es) { + QuackScanState *quackScanState = (QuackScanState *)node; + auto res = quackScanState->preparedStatement->Execute(); + std::string explainOutput = "\n\n"; + auto chunk = res->Fetch(); + if (!chunk || chunk->size() == 0) { + return; + } + /* Is it safe to hardcode this as result of DuckDB explain? */ + auto value = chunk->GetValue(1, 0); + explainOutput += value.GetValue(); + explainOutput += "\n"; + ExplainPropertyText("DuckDB Execution Plan", explainOutput.c_str(), es); +} + +extern "C" void +quack_init_node() { + /* setup scan methods */ + memset(&quack_scan_scan_methods, 0, sizeof(quack_scan_scan_methods)); + quack_scan_scan_methods.CustomName = "DuckDBScan"; + quack_scan_scan_methods.CreateCustomScanState = Quack_CreateCustomScanState; + RegisterCustomScanMethods(&quack_scan_scan_methods); + + /* setup exec methods */ + memset(&quack_scan_exec_methods, 0, sizeof(quack_scan_exec_methods)); + quack_scan_exec_methods.CustomName = "DuckDBScan"; + + quack_scan_exec_methods.BeginCustomScan = Quack_BeginCustomScan; + quack_scan_exec_methods.ExecCustomScan = Quack_ExecCustomScan; + quack_scan_exec_methods.EndCustomScan = Quack_EndCustomScan; + quack_scan_exec_methods.ReScanCustomScan = Quack_ReScanCustomScan; + + quack_scan_exec_methods.EstimateDSMCustomScan = NULL; + quack_scan_exec_methods.InitializeDSMCustomScan = NULL; + quack_scan_exec_methods.ReInitializeDSMCustomScan = NULL; + quack_scan_exec_methods.InitializeWorkerCustomScan = NULL; + quack_scan_exec_methods.ShutdownCustomScan = NULL; + + quack_scan_exec_methods.ExplainCustomScan = Quack_ExplainCustomScan; +} \ No newline at end of file diff --git a/src/quack_planner.cpp b/src/quack_planner.cpp new file mode 100644 index 00000000..f4baa3d0 --- /dev/null +++ b/src/quack_planner.cpp @@ -0,0 +1,130 @@ +#include "duckdb.hpp" +#include "duckdb/parser/parsed_data/create_table_function_info.hpp" + +extern "C" { +#include "postgres.h" +#include "catalog/pg_type.h" +#include "nodes/nodes.h" +#include "nodes/makefuncs.h" +#include "utils/syscache.h" +} + +#include "quack/quack_heap_scan.hpp" +#include "quack/quack_node.hpp" +#include "quack/quack_planner.hpp" +#include "quack/quack_types.hpp" + +namespace quack { + +static duckdb::DuckDB * +quack_open_database() { + duckdb::DBConfig config; + // config.SetOption("memory_limit", "2GB"); + // config.SetOption("threads", "8"); + // config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); + return new duckdb::DuckDB(nullptr, &config); +} + +} // namespace quack + +static Plan * +quack_create_plan(Query *parse, const char *query) { + auto db = quack::quack_open_database(); + + /* Add heap tables */ + db->instance->config.replacement_scans.emplace_back( + quack::PostgresHeapReplacementScan, + duckdb::make_uniq_base(parse, query)); + auto connection = duckdb::make_uniq(*db); + + // Add the postgres_scan inserted by the replacement scan + auto &context = *connection->context; + quack::PostgresHeapScanFunction heap_scan_fun; + duckdb::CreateTableFunctionInfo heap_scan_info(heap_scan_fun); + + auto &catalog = duckdb::Catalog::GetSystemCatalog(context); + context.transaction.BeginTransaction(); + catalog.CreateTableFunction(context, &heap_scan_info); + context.transaction.Commit(); + + auto preparedQuery = context.Prepare(query); + + if (preparedQuery->HasError()) { + elog(INFO, "(Quack) %s", preparedQuery->GetError().c_str()); + return nullptr; + } + + CustomScan *quackNode = makeNode(CustomScan); + + auto &preparedResultTypes = preparedQuery->GetTypes(); + + for (auto i = 0; i < preparedResultTypes.size(); i++) { + auto &column = preparedResultTypes[i]; + Oid postgresColumnOid = quack::GetPostgresDuckDBType(column.id()); + + if (OidIsValid(postgresColumnOid)) { + HeapTuple tp; + Form_pg_type typtup; + + tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(postgresColumnOid)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for type %u", postgresColumnOid); + + typtup = (Form_pg_type)GETSTRUCT(tp); + + Var *var = makeVar(INDEX_VAR, i + 1, postgresColumnOid, typtup->typtypmod, typtup->typcollation, 0); + + quackNode->custom_scan_tlist = + lappend(quackNode->custom_scan_tlist, + makeTargetEntry((Expr *)var, i + 1, (char *)preparedQuery->GetNames()[i].c_str(), false)); + + ReleaseSysCache(tp); + } + } + + quackNode->custom_private = list_make2(db, preparedQuery.release()); + quackNode->methods = &quack_scan_scan_methods; + + return (Plan *)quackNode; +} + +PlannedStmt * +quack_plan_node(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) { + + /* We need to check can we DuckDB create plan */ + Plan *quackPlan = (Plan *)castNode(CustomScan, quack_create_plan(parse, query_string)); + + if (!quackPlan) { + return nullptr; + } + + /* build the PlannedStmt result */ + PlannedStmt *result = makeNode(PlannedStmt); + + result->commandType = parse->commandType; + result->queryId = parse->queryId; + result->hasReturning = (parse->returningList != NIL); + result->hasModifyingCTE = parse->hasModifyingCTE; + result->canSetTag = parse->canSetTag; + result->transientPlan = false; + result->dependsOnRole = false; + result->parallelModeNeeded = false; + result->planTree = quackPlan; + result->rtable = NULL; + result->permInfos = NULL; + result->resultRelations = NULL; + result->appendRelations = NULL; + result->subplans = NIL; + result->rewindPlanIDs = NULL; + result->rowMarks = NIL; + result->relationOids = NIL; + result->invalItems = NIL; + result->paramExecTypes = NIL; + + /* utilityStmt should be null, but we might as well copy it */ + result->utilityStmt = parse->utilityStmt; + result->stmt_location = parse->stmt_location; + result->stmt_len = parse->stmt_len; + + return result; +} diff --git a/src/quack_select.cpp b/src/quack_select.cpp deleted file mode 100644 index bf879499..00000000 --- a/src/quack_select.cpp +++ /dev/null @@ -1,113 +0,0 @@ -#include "duckdb/parser/parsed_data/create_table_function_info.hpp" -#include "duckdb.hpp" - -extern "C" { -#include "postgres.h" -#include "fmgr.h" - -#include "access/genam.h" -#include "access/table.h" -#include "catalog/namespace.h" -#include "catalog/pg_proc.h" -#include "utils/lsyscache.h" -#include "utils/syscache.h" -#include "utils/rel.h" - -#include "quack/quack_select.h" -} - -#include "quack/quack_heap_scan.hpp" -#include "quack/quack_types.hpp" -#include "quack/quack_memory_allocator.hpp" - -namespace quack { - -static duckdb::unique_ptr -quack_open_database() { - duckdb::DBConfig config; - //config.SetOption("memory_limit", "2GB"); - //config.SetOption("threads", "8"); - //config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); - return duckdb::make_uniq(nullptr, &config); -} - -} // namespace quack - -extern "C" bool -quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count) { - auto db = quack::quack_open_database(); - - /* Add heap tables */ - db->instance->config.replacement_scans.emplace_back( - quack::PostgresHeapReplacementScan, - duckdb::make_uniq_base(query_desc)); - auto connection = duckdb::make_uniq(*db); - - // Add the postgres_scan inserted by the replacement scan - auto &context = *connection->context; - quack::PostgresHeapScanFunction heap_scan_fun; - duckdb::CreateTableFunctionInfo heap_scan_info(heap_scan_fun); - - auto &catalog = duckdb::Catalog::GetSystemCatalog(context); - context.transaction.BeginTransaction(); - catalog.CreateTableFunction(context, &heap_scan_info); - context.transaction.Commit(); - - idx_t column_count; - - CmdType operation; - DestReceiver *dest; - - TupleTableSlot *slot = NULL; - - // FIXME: try-catch ? - - duckdb::unique_ptr res = nullptr; - - res = connection->Query(query_desc->sourceText); - if (res->HasError()) { - return false; - } - - operation = query_desc->operation; - dest = query_desc->dest; - - dest->rStartup(dest, operation, query_desc->tupDesc); - - slot = MakeTupleTableSlot(query_desc->tupDesc, &TTSOpsHeapTuple); - column_count = res->ColumnCount(); - - while (true) { - auto chunk = res->Fetch(); - - if (!chunk || chunk->size() == 0) { - break; - } - - for (idx_t row = 0; row < chunk->size(); row++) { - ExecClearTuple(slot); - - for (idx_t col = 0; col < column_count; col++) { - auto value = chunk->GetValue(col, row); - if (value.IsNull()) { - slot->tts_isnull[col] = true; - } else { - slot->tts_isnull[col] = false; - quack::ConvertDuckToPostgresValue(slot, value, col); - } - } - - ExecStoreVirtualTuple(slot); - - dest->receiveSlot(slot, dest); - - for (idx_t i = 0; i < column_count; i++) { - if (slot->tts_tupleDescriptor->attrs[i].attbyval == false) { - pfree(DatumGetPointer(slot->tts_values[i])); - } - } - } - } - dest->rShutdown(dest); - return true; -} \ No newline at end of file diff --git a/src/quack_types.cpp b/src/quack_types.cpp index c3e29390..3c8cd6b3 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -71,7 +71,7 @@ ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col break; } default: - elog(ERROR, "Unsuported quack type: %d", oid); + elog(ERROR, "(DuckDB/ConvertDuckToPostgresValue) Unsuported quack type: %d", oid); } } @@ -97,7 +97,33 @@ ConvertPostgresToDuckColumnType(Oid type) { case TIMESTAMPOID: return duckdb::LogicalTypeId::TIMESTAMP; default: - elog(ERROR, "Unsupported quack type: %d", type); + elog(ERROR, "(DuckDB/ConvertPostgresToDuckColumnType) Unsupported quack type: %d", type); + } +} + +Oid +GetPostgresDuckDBType(duckdb::LogicalTypeId type) { + switch (type) { + case duckdb::LogicalTypeId::BOOLEAN: + return BOOLOID; + case duckdb::LogicalTypeId::TINYINT: + return CHAROID; + case duckdb::LogicalTypeId::SMALLINT: + return INT2OID; + case duckdb::LogicalTypeId::INTEGER: + return INT4OID; + case duckdb::LogicalTypeId::BIGINT: + return INT8OID; + case duckdb::LogicalTypeId::VARCHAR: + return VARCHAROID; + case duckdb::LogicalTypeId::DATE: + return DATEOID; + case duckdb::LogicalTypeId::TIMESTAMP: + return TIMESTAMPOID; + case duckdb::LogicalTypeId::DOUBLE: + return FLOAT8OID; + default: + elog(ERROR, "(DuckDB/GetPostgresDuckDBType) Unsupported quack type: %d", static_cast(type)); } }