diff --git a/Makefile b/Makefile index 214fffe2..bc69710f 100644 --- a/Makefile +++ b/Makefile @@ -9,9 +9,10 @@ SRCS = src/quack_detoast.cpp \ src/quack_heap_scan.cpp \ src/quack_heap_seq_scan.cpp \ src/quack_hooks.cpp \ - src/quack_select.cpp \ - src/quack_types.cpp \ src/quack_memory_allocator.cpp \ + src/quack_node.cpp \ + src/quack_planner.cpp \ + src/quack_types.cpp \ src/quack.cpp OBJS = $(subst .cpp,.o, $(SRCS)) @@ -54,7 +55,7 @@ ifeq ($(UNAME_S),Linux) DUCKDB_LIB = libduckdb.so endif -all: duckdb $(OBJS) +all: duckdb $(OBJS) .depend include $(PGXS) @@ -64,7 +65,7 @@ third_party/duckdb/Makefile: git submodule update --init --recursive third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB): - $(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF CMAKE_EXPORT_COMPILE_COMMANDS=1 + $(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF BUILD_HTTPFS=1 CMAKE_EXPORT_COMPILE_COMMANDS=1 install_duckdb: $(install_bin) -m 755 third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB) $(DESTDIR)$(PG_LIB) diff --git a/expected/basic.out b/expected/basic.out index 7cdb86c5..cd677f0c 100644 --- a/expected/basic.out +++ b/expected/basic.out @@ -4,41 +4,45 @@ INSERT INTO t SELECT g % 10 from generate_series(1,1000000) g; SET client_min_messages to 'DEBUG3'; SELECT COUNT(*) FROM t; DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 1 threads -- - count ---------- - 1000000 + count_star() +-------------- + 1000000 (1 row) SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 1 threads -- - a | count ----+-------- - 6 | 100000 - 7 | 100000 - 8 | 100000 - 9 | 100000 + a | count_star() +---+-------------- + 6 | 100000 + 7 | 100000 + 8 | 100000 + 9 | 100000 (4 rows) SET quack.max_threads_per_query to 4; SELECT COUNT(*) FROM t; DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 4 threads -- - count ---------- - 1000000 + count_star() +-------------- + 1000000 (1 row) SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- +DEBUG: -- (DuckDB/PostgresHeapBind) Column name: a, Type: INTEGER -- DEBUG: -- (DuckDB/PostgresHeapScanGlobalState) Running 4 threads -- - a | count ----+-------- - 6 | 100000 - 7 | 100000 - 8 | 100000 - 9 | 100000 + a | count_star() +---+-------------- + 6 | 100000 + 7 | 100000 + 8 | 100000 + 9 | 100000 (4 rows) SET quack.max_threads_per_query TO default; diff --git a/include/quack/quack_filter.hpp b/include/quack/quack_filter.hpp index 456cdb7a..eff8ecc7 100644 --- a/include/quack/quack_filter.hpp +++ b/include/quack/quack_filter.hpp @@ -7,5 +7,7 @@ extern "C" { } namespace quack { + bool ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid); + } // namespace quack \ No newline at end of file diff --git a/include/quack/quack_heap_scan.hpp b/include/quack/quack_heap_scan.hpp index 9d7ecac3..92abe2cf 100644 --- a/include/quack/quack_heap_scan.hpp +++ b/include/quack/quack_heap_scan.hpp @@ -78,12 +78,13 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction { struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData { public: - PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) { + PostgresHeapReplacementScanData(Query *parse, const char *query) : m_parse(parse), m_query(query) { } ~PostgresHeapReplacementScanData() override {}; public: - QueryDesc *desc; + Query *m_parse; + std::string m_query; }; duckdb::unique_ptr PostgresHeapReplacementScan(duckdb::ClientContext &context, diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp index 1921d407..67ba1113 100644 --- a/include/quack/quack_heap_seq_scan.hpp +++ b/include/quack/quack_heap_seq_scan.hpp @@ -79,6 +79,7 @@ class PostgresHeapSeqScan { public: Relation GetRelation(); + void CloseRelation(); TupleDesc GetTupleDesc(); bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo); bool IsValid() const; @@ -87,6 +88,7 @@ class PostgresHeapSeqScan { Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo); private: + RangeTblEntry * m_tableEntry = nullptr; Relation m_rel = nullptr; Snapshot m_snapshot = nullptr; PostgresHeapSeqParallelScanState m_parallel_scan_state; diff --git a/include/quack/quack_node.hpp b/include/quack/quack_node.hpp new file mode 100644 index 00000000..25c30727 --- /dev/null +++ b/include/quack/quack_node.hpp @@ -0,0 +1,9 @@ +#pragma once + +extern "C" { +#include "postgres.h" +#include "nodes/extensible.h" +} + +extern CustomScanMethods quack_scan_scan_methods; +extern "C" void quack_init_node(void); \ No newline at end of file diff --git a/include/quack/quack_planner.hpp b/include/quack/quack_planner.hpp new file mode 100644 index 00000000..c2a70514 --- /dev/null +++ b/include/quack/quack_planner.hpp @@ -0,0 +1,8 @@ +#pragma once + +extern "C" { +#include "postgres.h" +#include "optimizer/planner.h" +} + +PlannedStmt *quack_plan_node(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams); \ No newline at end of file diff --git a/include/quack/quack_select.h b/include/quack/quack_select.h deleted file mode 100644 index 7490bdcf..00000000 --- a/include/quack/quack_select.h +++ /dev/null @@ -1,8 +0,0 @@ -#pragma once - -extern "C" { -#include "postgres.h" -#include "executor/executor.h" -} - -extern "C" bool quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count); \ No newline at end of file diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp index eeb7e4a8..bb98cb5f 100644 --- a/include/quack/quack_types.hpp +++ b/include/quack/quack_types.hpp @@ -10,9 +10,16 @@ extern "C" { #include "quack/quack_heap_seq_scan.hpp" namespace quack { + +// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000 +constexpr int32_t QUACK_DUCK_DATE_OFFSET = 10957; +constexpr int64_t QUACK_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER_DAY; + duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); +Oid GetPostgresDuckDBType(duckdb::LogicalTypeId type); void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo, PostgresHeapSeqParallelScanState ¶llelScanState); + } // namespace quack \ No newline at end of file diff --git a/quack--0.0.1.sql b/quack--0.0.1.sql index 97a4d362..e61f1782 100644 --- a/quack--0.0.1.sql +++ b/quack--0.0.1.sql @@ -1 +1,9 @@ LOAD 'quack'; + +CREATE OR REPLACE FUNCTION read_parquet(path text) +RETURNS SETOF record LANGUAGE 'plpgsql' AS +$func$ +BEGIN + RETURN QUERY EXECUTE 'SELECT 1'; +END; +$func$; diff --git a/src/quack.cpp b/src/quack.cpp index dd507911..69ff02b5 100644 --- a/src/quack.cpp +++ b/src/quack.cpp @@ -1,10 +1,10 @@ extern "C" { #include "postgres.h" - #include "utils/guc.h" } #include "quack/quack.h" +#include "quack/quack_node.hpp" static void quack_init_guc(void); @@ -17,6 +17,7 @@ void _PG_init(void) { quack_init_guc(); quack_init_hooks(); + quack_init_node(); } } diff --git a/src/quack_filter.cpp b/src/quack_filter.cpp index 946b9720..099cdb23 100644 --- a/src/quack_filter.cpp +++ b/src/quack_filter.cpp @@ -6,6 +6,9 @@ extern "C" { #include "catalog/pg_type.h" } +#include "quack/quack_filter.hpp" +#include "quack/quack_types.hpp" + namespace quack { template @@ -33,8 +36,24 @@ FilterOperationSwitch(Datum &value, duckdb::Value &constant, Oid typeOid) { case INT8OID: return TemplatedFilterOperation(value, constant); break; + case FLOAT4OID: + return TemplatedFilterOperation(value, constant); + break; + case FLOAT8OID: + return TemplatedFilterOperation(value, constant); + break; + case DATEOID: { + Datum dateDatum = static_cast(value + quack::QUACK_DUCK_DATE_OFFSET); + return TemplatedFilterOperation(dateDatum, constant); + break; + } + case TIMESTAMPOID: { + Datum timeStampDatum = static_cast(value + quack::QUACK_DUCK_TIMESTAMP_OFFSET); + return TemplatedFilterOperation(timeStampDatum, constant); + break; + } default: - elog(ERROR, "Unsupported quack type: %d", typeOid); + elog(ERROR, "(DuckDB/FilterOperationSwitch) Unsupported quack type: %d", typeOid); } } diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 19d7a98a..031524ef 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -188,14 +188,14 @@ PostgresHeapReplacementScan(duckdb::ClientContext &context, const duckdb::string auto &scan_data = reinterpret_cast(*data); /* Check name against query rtable list and verify that it is heap table */ - auto table = FindMatchingHeapRelation(scan_data.desc->plannedstmt->rtable, table_name); + auto table = FindMatchingHeapRelation(scan_data.m_parse->rtable, table_name); if (!table) { return nullptr; } // Create POINTER values from the 'table' and 'snapshot' variables - auto children = CreateFunctionArguments(table, scan_data.desc->estate->es_snapshot); + auto children = CreateFunctionArguments(table, GetActiveSnapshot()); auto table_function = duckdb::make_uniq(); table_function->function = duckdb::make_uniq("postgres_heap_scan", std::move(children)); diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 3d028632..0c7cf37a 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -14,9 +14,8 @@ extern "C" { #include namespace quack { - PostgresHeapSeqScan::PostgresHeapSeqScan(RangeTblEntry *table) - : m_rel(RelationIdGetRelation(table->relid)), m_snapshot(nullptr) { + : m_tableEntry(table), m_rel(nullptr), m_snapshot(nullptr) { } PostgresHeapSeqScan::~PostgresHeapSeqScan() { @@ -25,15 +24,28 @@ PostgresHeapSeqScan::~PostgresHeapSeqScan() { } } -PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other) : m_rel(other.m_rel) { - other.m_rel = nullptr; +PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other) + : m_tableEntry(other.m_tableEntry), m_rel(nullptr) { + other.CloseRelation(); + other.m_tableEntry = nullptr; } Relation PostgresHeapSeqScan::GetRelation() { + if (m_tableEntry && m_rel == nullptr) { + m_rel = RelationIdGetRelation(m_tableEntry->relid); + } return m_rel; } +void +PostgresHeapSeqScan::CloseRelation() { + if (IsValid()) { + RelationClose(m_rel); + } + m_rel = nullptr; +} + bool PostgresHeapSeqScan::IsValid() const { return RelationIsValid(m_rel); @@ -56,7 +68,8 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn } void -PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &input) { +PostgresHeapSeqScan::InitParallelScanState(duckdb::TableFunctionInitInput &input) { + (void) GetRelation(); m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel); /* SELECT COUNT(*) FROM */ @@ -72,7 +85,7 @@ PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &inpu if (input.CanRemoveFilterColumns()) { for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) { - m_parallel_scan_state.m_projections[input.projection_ids[i]] = input.column_ids[i]; + m_parallel_scan_state.m_projections[i] = input.column_ids[input.projection_ids[i]]; } } else { for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) { @@ -80,8 +93,7 @@ PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &inpu } } - - //m_parallel_scan_state.PrefetchNextRelationPages(m_rel); + // m_parallel_scan_state.PrefetchNextRelationPages(m_rel); m_parallel_scan_state.m_filters = input.filters.get(); } @@ -110,7 +122,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc threadScanInfo.m_buffer = ReadBufferExtended(m_rel, MAIN_FORKNUM, block, RBM_NORMAL, m_parallel_scan_state.m_strategy); LockBuffer(threadScanInfo.m_buffer, BUFFER_LOCK_SHARE); - //m_parallel_scan_state.PrefetchNextRelationPages(m_rel); + // m_parallel_scan_state.PrefetchNextRelationPages(m_rel); m_parallel_scan_state.m_lock.unlock(); page = PreparePageRead(threadScanInfo); threadScanInfo.m_read_next_page = false; @@ -196,7 +208,6 @@ PostgresHeapSeqParallelScanState::PrefetchNextRelationPages(Relation rel) { (m_last_prefetch_block - m_last_assigned_block_number) > 8) return; - for (BlockNumber i = m_last_prefetch_block; i < last_batch_prefetch_block_num; i++) { PrefetchBuffer(rel, MAIN_FORKNUM, m_last_prefetch_block); m_last_prefetch_block++; diff --git a/src/quack_hooks.cpp b/src/quack_hooks.cpp index d280a157..0cb95467 100644 --- a/src/quack_hooks.cpp +++ b/src/quack_hooks.cpp @@ -6,9 +6,9 @@ extern "C" { } #include "quack/quack.h" -#include "quack/quack_select.h" +#include "quack/quack_planner.hpp" -static ExecutorRun_hook_type PrevExecutorRunHook = NULL; +static planner_hook_type PrevPlannerHook = NULL; static bool is_quack_extension_registered() { @@ -20,6 +20,12 @@ is_catalog_table(List *tables) { ListCell *lc; foreach (lc, tables) { RangeTblEntry *table = (RangeTblEntry *)lfirst(lc); + if (table->rtekind == RTE_SUBQUERY) { + /* Check Subquery rtable list if any table is from PG catalog */ + if (is_catalog_table(table->subquery->rtable)) { + return true; + }; + } if (table->relid) { auto rel = RelationIdGetRelation(table->relid); auto namespaceOid = RelationGetNamespace(rel); @@ -33,24 +39,24 @@ is_catalog_table(List *tables) { return false; } -static void -quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { - if (is_quack_extension_registered() && !is_catalog_table(queryDesc->plannedstmt->rtable) && - queryDesc->operation == CMD_SELECT) { - if (quack_execute_select(queryDesc, direction, count)) { - return; +static PlannedStmt * +quack_planner(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) { + if (is_quack_extension_registered() && !is_catalog_table(parse->rtable) && parse->commandType == CMD_SELECT) { + PlannedStmt * quackPlan = quack_plan_node(parse, query_string, cursorOptions, boundParams); + if (quackPlan) { + return quackPlan; } } - elog(DEBUG3, "quack_executor_run: Failing back to PG execution"); - - if (PrevExecutorRunHook) { - PrevExecutorRunHook(queryDesc, direction, count, execute_once); + if (PrevPlannerHook) { + return PrevPlannerHook(parse, query_string, cursorOptions, boundParams); + } else { + return standard_planner(parse, query_string, cursorOptions, boundParams); } } void quack_init_hooks(void) { - PrevExecutorRunHook = ExecutorRun_hook ? ExecutorRun_hook : standard_ExecutorRun; - ExecutorRun_hook = quack_executor_run; + PrevPlannerHook = planner_hook; + planner_hook = quack_planner; } \ No newline at end of file diff --git a/src/quack_node.cpp b/src/quack_node.cpp new file mode 100644 index 00000000..ccb32e16 --- /dev/null +++ b/src/quack_node.cpp @@ -0,0 +1,156 @@ + +#include "duckdb.hpp" + +#include "quack/quack_node.hpp" +#include "quack/quack_heap_scan.hpp" +#include "quack/quack_types.hpp" + +/* global variables */ +CustomScanMethods quack_scan_scan_methods; + +/* static variables */ +static CustomExecMethods quack_scan_exec_methods; + +typedef struct QuackScanState { + CustomScanState css; /* must be first field */ + duckdb::DuckDB *duckdb; + duckdb::PreparedStatement *preparedStatement; + bool is_executed; + bool fetch_next; + duckdb::unique_ptr queryResult; + duckdb::idx_t columnCount; + duckdb::unique_ptr currentDataChunk; + duckdb::idx_t currentRow; +} QuackScanState; + +/* static callbacks */ +static Node *Quack_CreateCustomScanState(CustomScan *cscan); +static void Quack_BeginCustomScan(CustomScanState *node, EState *estate, int eflags); +static TupleTableSlot *Quack_ExecCustomScan(CustomScanState *node); +static void Quack_EndCustomScan(CustomScanState *node); +static void Quack_ReScanCustomScan(CustomScanState *node); +static void Quack_ExplainCustomScan(CustomScanState *node, List *ancestors, ExplainState *es); + +static Node * +Quack_CreateCustomScanState(CustomScan *cscan) { + QuackScanState *quackScanState = (QuackScanState *)newNode(sizeof(QuackScanState), T_CustomScanState); + CustomScanState *customScanState = &quackScanState->css; + quackScanState->duckdb = (duckdb::DuckDB *)linitial(cscan->custom_private); + quackScanState->preparedStatement = (duckdb::PreparedStatement *)lsecond(cscan->custom_private); + quackScanState->is_executed = false; + quackScanState->fetch_next = true; + customScanState->methods = &quack_scan_exec_methods; + return (Node *)customScanState; +} + +void +Quack_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags) { + QuackScanState *quackScanState = (QuackScanState *)cscanstate; + quackScanState->css.ss.ps.ps_ResultTupleDesc = quackScanState->css.ss.ss_ScanTupleSlot->tts_tupleDescriptor; +} + +static TupleTableSlot * +Quack_ExecCustomScan(CustomScanState *node) { + QuackScanState *quackScanState = (QuackScanState *)node; + TupleTableSlot *slot = quackScanState->css.ss.ss_ScanTupleSlot; + MemoryContext oldContext; + + + if (!quackScanState->is_executed) { + quackScanState->queryResult = quackScanState->preparedStatement->Execute(); + quackScanState->columnCount = quackScanState->queryResult->ColumnCount(); + quackScanState->is_executed = true; + } + + if (quackScanState->fetch_next) { + quackScanState->currentDataChunk = quackScanState->queryResult->Fetch(); + quackScanState->currentRow = 0; + quackScanState->fetch_next = false; + if (!quackScanState->currentDataChunk || quackScanState->currentDataChunk->size() == 0) { + MemoryContextReset(quackScanState->css.ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + ExecClearTuple(slot); + return slot; + } + } + + MemoryContextReset(quackScanState->css.ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + ExecClearTuple(slot); + + /* MemoryContext used for allocation */ + oldContext = MemoryContextSwitchTo(quackScanState->css.ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + + for (idx_t col = 0; col < quackScanState->columnCount; col++) { + auto value = quackScanState->currentDataChunk->GetValue(col, quackScanState->currentRow); + if (value.IsNull()) { + slot->tts_isnull[col] = true; + } else { + slot->tts_isnull[col] = false; + quack::ConvertDuckToPostgresValue(slot, value, col); + } + } + + MemoryContextSwitchTo(oldContext); + + quackScanState->currentRow++; + if (quackScanState->currentRow >= quackScanState->currentDataChunk->size()) { + delete quackScanState->currentDataChunk.release(); + quackScanState->fetch_next = true; + } + + ExecStoreVirtualTuple(slot); + return slot; +} + +void +Quack_EndCustomScan(CustomScanState *node) { + QuackScanState *quackScanState = (QuackScanState *)node; + quackScanState->queryResult.reset(); + delete quackScanState->preparedStatement; + delete quackScanState->duckdb; +} + +void +Quack_ReScanCustomScan(CustomScanState *node) { +} + +void +Quack_ExplainCustomScan(CustomScanState *node, List *ancestors, ExplainState *es) { + QuackScanState *quackScanState = (QuackScanState *)node; + auto res = quackScanState->preparedStatement->Execute(); + std::string explainOutput = "\n\n"; + auto chunk = res->Fetch(); + if (!chunk || chunk->size() == 0) { + return; + } + /* Is it safe to hardcode this as result of DuckDB explain? */ + auto value = chunk->GetValue(1, 0); + explainOutput += value.GetValue(); + explainOutput += "\n"; + ExplainPropertyText("DuckDB Execution Plan", explainOutput.c_str(), es); +} + +extern "C" void +quack_init_node() { + /* setup scan methods */ + memset(&quack_scan_scan_methods, 0, sizeof(quack_scan_scan_methods)); + quack_scan_scan_methods.CustomName = "DuckDBScan"; + quack_scan_scan_methods.CreateCustomScanState = Quack_CreateCustomScanState; + RegisterCustomScanMethods(&quack_scan_scan_methods); + + /* setup exec methods */ + memset(&quack_scan_exec_methods, 0, sizeof(quack_scan_exec_methods)); + quack_scan_exec_methods.CustomName = "DuckDBScan"; + + quack_scan_exec_methods.BeginCustomScan = Quack_BeginCustomScan; + quack_scan_exec_methods.ExecCustomScan = Quack_ExecCustomScan; + quack_scan_exec_methods.EndCustomScan = Quack_EndCustomScan; + quack_scan_exec_methods.ReScanCustomScan = Quack_ReScanCustomScan; + + quack_scan_exec_methods.EstimateDSMCustomScan = NULL; + quack_scan_exec_methods.InitializeDSMCustomScan = NULL; + quack_scan_exec_methods.ReInitializeDSMCustomScan = NULL; + quack_scan_exec_methods.InitializeWorkerCustomScan = NULL; + quack_scan_exec_methods.ShutdownCustomScan = NULL; + + quack_scan_exec_methods.ExplainCustomScan = Quack_ExplainCustomScan; +} \ No newline at end of file diff --git a/src/quack_planner.cpp b/src/quack_planner.cpp new file mode 100644 index 00000000..780b9612 --- /dev/null +++ b/src/quack_planner.cpp @@ -0,0 +1,130 @@ +#include "duckdb.hpp" +#include "duckdb/parser/parsed_data/create_table_function_info.hpp" + +extern "C" { +#include "postgres.h" +#include "catalog/pg_type.h" +#include "nodes/nodes.h" +#include "nodes/makefuncs.h" +#include "utils/syscache.h" +} + +#include "quack/quack_heap_scan.hpp" +#include "quack/quack_node.hpp" +#include "quack/quack_planner.hpp" +#include "quack/quack_types.hpp" + +namespace quack { + +static duckdb::unique_ptr +quack_open_database() { + duckdb::DBConfig config; + // config.SetOption("memory_limit", "2GB"); + // config.SetOption("threads", "8"); + // config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); + return duckdb::make_uniq(nullptr, &config); +} + +} // namespace quack + +static Plan * +quack_create_plan(Query *parse, const char *query) { + auto db = quack::quack_open_database(); + + /* Add heap tables */ + db->instance->config.replacement_scans.emplace_back( + quack::PostgresHeapReplacementScan, + duckdb::make_uniq_base(parse, query)); + auto connection = duckdb::make_uniq(*db); + + // Add the postgres_scan inserted by the replacement scan + auto &context = *connection->context; + quack::PostgresHeapScanFunction heap_scan_fun; + duckdb::CreateTableFunctionInfo heap_scan_info(heap_scan_fun); + + auto &catalog = duckdb::Catalog::GetSystemCatalog(context); + context.transaction.BeginTransaction(); + catalog.CreateTableFunction(context, &heap_scan_info); + context.transaction.Commit(); + + auto preparedQuery = context.Prepare(query); + + if (preparedQuery->HasError()) { + elog(INFO, "(Quack) %s", preparedQuery->GetError().c_str()); + return nullptr; + } + + CustomScan *quackNode = makeNode(CustomScan); + + auto &preparedResultTypes = preparedQuery->GetTypes(); + + for (auto i = 0; i < preparedResultTypes.size(); i++) { + auto &column = preparedResultTypes[i]; + Oid postgresColumnOid = quack::GetPostgresDuckDBType(column.id()); + + if (OidIsValid(postgresColumnOid)) { + HeapTuple tp; + Form_pg_type typtup; + + tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(postgresColumnOid)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for type %u", postgresColumnOid); + + typtup = (Form_pg_type)GETSTRUCT(tp); + + Var *var = makeVar(INDEX_VAR, i + 1, postgresColumnOid, typtup->typtypmod, typtup->typcollation, 0); + + quackNode->custom_scan_tlist = + lappend(quackNode->custom_scan_tlist, + makeTargetEntry((Expr *)var, i + 1, (char *)preparedQuery->GetNames()[i].c_str(), false)); + + ReleaseSysCache(tp); + } + } + + quackNode->custom_private = list_make2(db.release(), preparedQuery.release()); + quackNode->methods = &quack_scan_scan_methods; + + return (Plan *)quackNode; +} + +PlannedStmt * +quack_plan_node(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) { + + /* We need to check can we DuckDB create plan */ + Plan *quackPlan = (Plan *)castNode(CustomScan, quack_create_plan(parse, query_string)); + + if (!quackPlan) { + return nullptr; + } + + /* build the PlannedStmt result */ + PlannedStmt *result = makeNode(PlannedStmt); + + result->commandType = parse->commandType; + result->queryId = parse->queryId; + result->hasReturning = (parse->returningList != NIL); + result->hasModifyingCTE = parse->hasModifyingCTE; + result->canSetTag = parse->canSetTag; + result->transientPlan = false; + result->dependsOnRole = false; + result->parallelModeNeeded = false; + result->planTree = quackPlan; + result->rtable = NULL; + result->permInfos = NULL; + result->resultRelations = NULL; + result->appendRelations = NULL; + result->subplans = NIL; + result->rewindPlanIDs = NULL; + result->rowMarks = NIL; + result->relationOids = NIL; + result->invalItems = NIL; + result->paramExecTypes = NIL; + + /* utilityStmt should be null, but we might as well copy it */ + result->utilityStmt = parse->utilityStmt; + result->stmt_location = parse->stmt_location; + result->stmt_len = parse->stmt_len; + + return result; +} diff --git a/src/quack_select.cpp b/src/quack_select.cpp deleted file mode 100644 index bf879499..00000000 --- a/src/quack_select.cpp +++ /dev/null @@ -1,113 +0,0 @@ -#include "duckdb/parser/parsed_data/create_table_function_info.hpp" -#include "duckdb.hpp" - -extern "C" { -#include "postgres.h" -#include "fmgr.h" - -#include "access/genam.h" -#include "access/table.h" -#include "catalog/namespace.h" -#include "catalog/pg_proc.h" -#include "utils/lsyscache.h" -#include "utils/syscache.h" -#include "utils/rel.h" - -#include "quack/quack_select.h" -} - -#include "quack/quack_heap_scan.hpp" -#include "quack/quack_types.hpp" -#include "quack/quack_memory_allocator.hpp" - -namespace quack { - -static duckdb::unique_ptr -quack_open_database() { - duckdb::DBConfig config; - //config.SetOption("memory_limit", "2GB"); - //config.SetOption("threads", "8"); - //config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); - return duckdb::make_uniq(nullptr, &config); -} - -} // namespace quack - -extern "C" bool -quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t count) { - auto db = quack::quack_open_database(); - - /* Add heap tables */ - db->instance->config.replacement_scans.emplace_back( - quack::PostgresHeapReplacementScan, - duckdb::make_uniq_base(query_desc)); - auto connection = duckdb::make_uniq(*db); - - // Add the postgres_scan inserted by the replacement scan - auto &context = *connection->context; - quack::PostgresHeapScanFunction heap_scan_fun; - duckdb::CreateTableFunctionInfo heap_scan_info(heap_scan_fun); - - auto &catalog = duckdb::Catalog::GetSystemCatalog(context); - context.transaction.BeginTransaction(); - catalog.CreateTableFunction(context, &heap_scan_info); - context.transaction.Commit(); - - idx_t column_count; - - CmdType operation; - DestReceiver *dest; - - TupleTableSlot *slot = NULL; - - // FIXME: try-catch ? - - duckdb::unique_ptr res = nullptr; - - res = connection->Query(query_desc->sourceText); - if (res->HasError()) { - return false; - } - - operation = query_desc->operation; - dest = query_desc->dest; - - dest->rStartup(dest, operation, query_desc->tupDesc); - - slot = MakeTupleTableSlot(query_desc->tupDesc, &TTSOpsHeapTuple); - column_count = res->ColumnCount(); - - while (true) { - auto chunk = res->Fetch(); - - if (!chunk || chunk->size() == 0) { - break; - } - - for (idx_t row = 0; row < chunk->size(); row++) { - ExecClearTuple(slot); - - for (idx_t col = 0; col < column_count; col++) { - auto value = chunk->GetValue(col, row); - if (value.IsNull()) { - slot->tts_isnull[col] = true; - } else { - slot->tts_isnull[col] = false; - quack::ConvertDuckToPostgresValue(slot, value, col); - } - } - - ExecStoreVirtualTuple(slot); - - dest->receiveSlot(slot, dest); - - for (idx_t i = 0; i < column_count; i++) { - if (slot->tts_tupleDescriptor->attrs[i].attbyval == false) { - pfree(DatumGetPointer(slot->tts_values[i])); - } - } - } - } - dest->rShutdown(dest); - return true; -} \ No newline at end of file diff --git a/src/quack_types.cpp b/src/quack_types.cpp index c3e29390..34ca106a 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -12,13 +12,10 @@ extern "C" { #include "quack/quack_filter.hpp" #include "quack/quack_heap_seq_scan.hpp" #include "quack/quack_detoast.hpp" +#include "quack/quack_types.hpp" namespace quack { -// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000 -constexpr int32_t QUACK_DUCK_DATE_OFFSET = 10957; -constexpr int64_t QUACK_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER_DAY; - void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col) { Oid oid = slot->tts_tupleDescriptor->attrs[col].atttypid; @@ -54,12 +51,12 @@ ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col } case DATEOID: { duckdb::date_t date = value.GetValue(); - slot->tts_values[col] = date.days - QUACK_DUCK_DATE_OFFSET; + slot->tts_values[col] = date.days - quack::QUACK_DUCK_DATE_OFFSET; break; } case TIMESTAMPOID: { duckdb::dtime_t timestamp = value.GetValue(); - slot->tts_values[col] = timestamp.micros - QUACK_DUCK_TIMESTAMP_OFFSET; + slot->tts_values[col] = timestamp.micros - quack::QUACK_DUCK_TIMESTAMP_OFFSET; break; } case FLOAT8OID: @@ -71,7 +68,7 @@ ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col break; } default: - elog(ERROR, "Unsuported quack type: %d", oid); + elog(ERROR, "(DuckDB/ConvertDuckToPostgresValue) Unsuported quack type: %d", oid); } } @@ -97,7 +94,35 @@ ConvertPostgresToDuckColumnType(Oid type) { case TIMESTAMPOID: return duckdb::LogicalTypeId::TIMESTAMP; default: - elog(ERROR, "Unsupported quack type: %d", type); + elog(ERROR, "(DuckDB/ConvertPostgresToDuckColumnType) Unsupported quack type: %d", type); + } +} + +Oid +GetPostgresDuckDBType(duckdb::LogicalTypeId type) { + switch (type) { + case duckdb::LogicalTypeId::BOOLEAN: + return BOOLOID; + case duckdb::LogicalTypeId::TINYINT: + return CHAROID; + case duckdb::LogicalTypeId::SMALLINT: + return INT2OID; + case duckdb::LogicalTypeId::INTEGER: + return INT4OID; + case duckdb::LogicalTypeId::BIGINT: + return INT8OID; + case duckdb::LogicalTypeId::HUGEINT: + return NUMERICOID; + case duckdb::LogicalTypeId::VARCHAR: + return VARCHAROID; + case duckdb::LogicalTypeId::DATE: + return DATEOID; + case duckdb::LogicalTypeId::TIMESTAMP: + return TIMESTAMPOID; + case duckdb::LogicalTypeId::DOUBLE: + return FLOAT8OID; + default: + elog(ERROR, "(DuckDB/GetPostgresDuckDBType) Unsupported quack type: %d", static_cast(type)); } } @@ -147,7 +172,8 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { offset); break; default: - elog(ERROR, "Unsupported quack type: %d", static_cast(result.GetType().id())); + elog(ERROR, "(DuckDB/ConvertPostgresToDuckValue) Unsupported quack type: %d", + static_cast(result.GetType().id())); break; } } @@ -271,16 +297,18 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t auto &array_mask = duckdb::FlatVector::Validity(result); array_mask.SetInvalid(threadScanInfo.m_output_vector_size); } else { + idx_t projectionColumnIdx = parallelScanState.m_columns[parallelScanState.m_projections[idx]]; if (threadScanInfo.m_tuple_desc->attrs[parallelScanState.m_projections[idx]].attlen == -1) { bool shouldFree = false; - values[idx] = DetoastPostgresDatum(reinterpret_cast(values[idx]), parallelScanState.m_lock, - &shouldFree); - ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size); + values[projectionColumnIdx] = + DetoastPostgresDatum(reinterpret_cast(values[projectionColumnIdx]), + parallelScanState.m_lock, &shouldFree); + ConvertPostgresToDuckValue(values[projectionColumnIdx], result, threadScanInfo.m_output_vector_size); if (shouldFree) { - duckdb_free(reinterpret_cast(values[idx])); + duckdb_free(reinterpret_cast(values[projectionColumnIdx])); } } else { - ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size); + ConvertPostgresToDuckValue(values[projectionColumnIdx], result, threadScanInfo.m_output_vector_size); } } }