From 24c6c4056a89247e2da6debc078d27800258aa6a Mon Sep 17 00:00:00 2001 From: mkaruza Date: Mon, 22 Apr 2024 12:35:20 +0200 Subject: [PATCH 1/5] Column query / thread-safe detoasting * Read columns that are needed for query * Skip query if any table is from PG_CATALOG_NAMESPACE or PG_TOAST_NAMESPACE * Thread safe detoasting --- Makefile | 3 +- include/quack/quack_detoast.hpp | 15 +++ include/quack/quack_heap_scan.hpp | 7 +- include/quack/quack_heap_seq_scan.hpp | 31 +++-- include/quack/quack_types.hpp | 5 +- src/quack_detoast.cpp | 163 ++++++++++++++++++++++++++ src/quack_heap_scan.cpp | 16 ++- src/quack_heap_seq_scan.cpp | 16 ++- src/quack_hooks.cpp | 25 +++- src/quack_select.cpp | 7 +- src/quack_types.cpp | 67 +++++++---- 11 files changed, 299 insertions(+), 56 deletions(-) create mode 100644 include/quack/quack_detoast.hpp create mode 100644 src/quack_detoast.cpp diff --git a/Makefile b/Makefile index 72aee371..0d10d2f5 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,9 @@ MODULE_big = quack EXTENSION = quack DATA = quack.control $(wildcard quack--*.sql) -SRCS = src/quack_heap_seq_scan.cpp \ +SRCS = src/quack_detoast.cpp \ src/quack_heap_scan.cpp \ + src/quack_heap_seq_scan.cpp \ src/quack_hooks.cpp \ src/quack_select.cpp \ src/quack_types.cpp \ diff --git a/include/quack/quack_detoast.hpp b/include/quack/quack_detoast.hpp new file mode 100644 index 00000000..792a70e7 --- /dev/null +++ b/include/quack/quack_detoast.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +} + +#include + +namespace quack { + +Datum DetoastPostgresDatum(struct varlena *value, std::mutex &lock, bool *shouldFree); + +} // namespace quack \ No newline at end of file diff --git a/include/quack/quack_heap_scan.hpp b/include/quack/quack_heap_scan.hpp index 5aa4ff32..9d7ecac3 100644 --- a/include/quack/quack_heap_scan.hpp +++ b/include/quack/quack_heap_scan.hpp @@ -14,7 +14,6 @@ extern "C" { // Postgres Relation - namespace quack { struct 
PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { @@ -23,7 +22,7 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { ~PostgresHeapScanLocalState() override; public: - PostgresHeapSeqScan & m_rel; + PostgresHeapSeqScan &m_rel; PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info; bool m_exhausted_scan = false; }; @@ -31,7 +30,7 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState { // Global State struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState { - explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation); + explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, duckdb::TableFunctionInitInput &input); ~PostgresHeapScanGlobalState(); idx_t MaxThreads() const override { @@ -67,7 +66,7 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction { // LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext // &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate); static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p, - duckdb::DataChunk &output); + duckdb::DataChunk &output); // static unique_ptr PostgresCardinality(ClientContext &context, const FunctionData *bind_data); // static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p, // LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp index 4a30a9a8..dfeb9780 100644 --- a/include/quack/quack_heap_seq_scan.hpp +++ b/include/quack/quack_heap_seq_scan.hpp @@ -9,6 +9,7 @@ extern "C" { } #include +#include namespace quack { @@ -31,18 +32,23 @@ class PostgresHeapSeqScanThreadInfo { HeapTupleData m_tuple; }; +class PostgresHeapSeqParallelScanState { +public: + PostgresHeapSeqParallelScanState() + : 
m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_total_row_count(0) { + } + BlockNumber AssignNextBlockNumber(); + std::mutex m_lock; + BlockNumber m_nblocks; + BlockNumber m_last_assigned_block_number; + duckdb::map m_columns; + duckdb::vector m_projections; + duckdb::TableFilterSet *m_filters = nullptr; + std::atomic m_total_row_count; +}; + class PostgresHeapSeqScan { private: - class ParallelScanState { - public: - ParallelScanState() : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber) { - } - BlockNumber AssignNextBlockNumber(); - std::mutex m_lock; - BlockNumber m_nblocks; - BlockNumber m_last_assigned_block_number; - }; - public: PostgresHeapSeqScan(RangeTblEntry *table); ~PostgresHeapSeqScan(); @@ -52,7 +58,8 @@ class PostgresHeapSeqScan { PostgresHeapSeqScan(PostgresHeapSeqScan &&other); public: - void InitParallelScanState(); + void InitParallelScanState(const duckdb::vector &columns, + const duckdb::vector &projections, duckdb::TableFilterSet *filters); void SetSnapshot(Snapshot snapshot) { m_snapshot = snapshot; @@ -70,7 +77,7 @@ class PostgresHeapSeqScan { private: Relation m_rel = nullptr; Snapshot m_snapshot = nullptr; - ParallelScanState m_parallel_scan_state; + PostgresHeapSeqParallelScanState m_parallel_scan_state; }; } // namespace quack \ No newline at end of file diff --git a/include/quack/quack_types.hpp b/include/quack/quack_types.hpp index 05f38603..eeb7e4a8 100644 --- a/include/quack/quack_types.hpp +++ b/include/quack/quack_types.hpp @@ -7,9 +7,12 @@ extern "C" { #include "executor/tuptable.h" } +#include "quack/quack_heap_seq_scan.hpp" + namespace quack { duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type); void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); -void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, 
idx_t offset); +void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo, + PostgresHeapSeqParallelScanState &parallelScanState); } // namespace quack \ No newline at end of file diff --git a/src/quack_detoast.cpp b/src/quack_detoast.cpp new file mode 100644 index 00000000..c3d39663 --- /dev/null +++ b/src/quack_detoast.cpp @@ -0,0 +1,163 @@ +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +#include "varatt.h" + +#ifdef USE_LZ4 +#include <lz4.h> +#endif + +#include "access/detoast.h" +#include "access/table.h" +#include "access/tableam.h" +#include "access/toast_internals.h" +#include "common/pg_lzcompress.h" +#include "utils/expandeddatum.h" +} + +#include "quack/quack_types.hpp" +#include "quack/quack_detoast.hpp" + +/* + * The following functions mirror logic found in the Postgres code, but for DuckDB execution they need to be thread + * safe. Functions such as palloc/pfree are replaced with duckdb_malloc/duckdb_free. Access to the toast table is protected with + * a lock, also for thread-safety reasons. This is an initial implementation and should be revisited in the future for better + * performance.
+ */ + +namespace quack { + +struct varlena * +_pglz_decompress_datum(const struct varlena *value) { + struct varlena *result; + int32 rawsize; + + result = (struct varlena *)duckdb_malloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ); + + rawsize = pglz_decompress((char *)value + VARHDRSZ_COMPRESSED, VARSIZE(value) - VARHDRSZ_COMPRESSED, + VARDATA(result), VARDATA_COMPRESSED_GET_EXTSIZE(value), true); + if (rawsize < 0) + ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed pglz data is corrupt"))); + + SET_VARSIZE(result, rawsize + VARHDRSZ); + + return result; +} + +struct varlena * +_lz4_decompress_datum(const struct varlena *value) { +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); + return NULL; /* keep compiler quiet */ +#else + int32 rawsize; + struct varlena *result; + + result = (struct varlena *)duckdb_malloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ); + + rawsize = LZ4_decompress_safe((char *)value + VARHDRSZ_COMPRESSED, VARDATA(result), + VARSIZE(value) - VARHDRSZ_COMPRESSED, VARDATA_COMPRESSED_GET_EXTSIZE(value)); + if (rawsize < 0) + ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed lz4 data is corrupt"))); + + SET_VARSIZE(result, rawsize + VARHDRSZ); + + return result; +#endif +} + +static struct varlena * +_toast_decompress_datum(struct varlena *attr) { + switch (TOAST_COMPRESS_METHOD(attr)) { + case TOAST_PGLZ_COMPRESSION_ID: + return _pglz_decompress_datum(attr); + case TOAST_LZ4_COMPRESSION_ID: + return _lz4_decompress_datum(attr); + default: + elog(ERROR, "invalid compression method id %d", TOAST_COMPRESS_METHOD(attr)); + return NULL; /* keep compiler quiet */ + } +} + +static struct varlena * +_toast_fetch_datum(struct varlena *attr, std::mutex &lock) { + Relation toastrel; + struct varlena *result; + struct varatt_external toast_pointer; + int32 attrsize; + + if (!VARATT_IS_EXTERNAL_ONDISK(attr)) + elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums"); + + /* Must copy to 
access aligned fields */ + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + + attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer); + + result = (struct varlena *)duckdb_malloc(attrsize + VARHDRSZ); + + if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) { + SET_VARSIZE_COMPRESSED(result, attrsize + VARHDRSZ); + } else { + SET_VARSIZE(result, attrsize + VARHDRSZ); + } + + if (attrsize == 0) + return result; + + lock.lock(); + toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock); + table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid, attrsize, 0, attrsize, result); + table_close(toastrel, AccessShareLock); + lock.unlock(); + + return result; +} + +Datum +DetoastPostgresDatum(struct varlena *attr, std::mutex &lock, bool *shouldFree) { + struct varlena *toastedValue = nullptr; + *shouldFree = true; + if (VARATT_IS_EXTERNAL_ONDISK(attr)) { + toastedValue = _toast_fetch_datum(attr, lock); + if (VARATT_IS_COMPRESSED(toastedValue)) { + struct varlena *tmp = toastedValue; + toastedValue = _toast_decompress_datum(tmp); + duckdb_free(tmp); + } + } else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) { + struct varatt_indirect redirect; + VARATT_EXTERNAL_GET_POINTER(redirect, attr); + toastedValue = (struct varlena *)redirect.pointer; + toastedValue = reinterpret_cast(DetoastPostgresDatum(attr, lock, shouldFree)); + if (attr == (struct varlena *)redirect.pointer) { + struct varlena *result; + result = (struct varlena *)(VARSIZE_ANY(attr)); + memcpy(result, attr, VARSIZE_ANY(attr)); + toastedValue = result; + } + } else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) { + ExpandedObjectHeader *eoh; + Size resultsize; + eoh = DatumGetEOHP(PointerGetDatum(attr)); + resultsize = EOH_get_flat_size(eoh); + toastedValue = (struct varlena *)duckdb_malloc(resultsize); + EOH_flatten_into(eoh, (void *)toastedValue, resultsize); + } else if (VARATT_IS_COMPRESSED(attr)) { + toastedValue = _toast_decompress_datum(attr); + } else if (VARATT_IS_SHORT(attr)) { + Size data_size = 
VARSIZE_SHORT(attr) - VARHDRSZ_SHORT; + Size new_size = data_size + VARHDRSZ; + toastedValue = (struct varlena *)duckdb_malloc(new_size); + SET_VARSIZE(toastedValue, new_size); + memcpy(VARDATA(toastedValue), VARDATA_SHORT(attr), data_size); + } else { + toastedValue = attr; + *shouldFree = false; + } + + return reinterpret_cast(toastedValue); +} + +} // namespace quack \ No newline at end of file diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 09d06065..98ab7d16 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -28,9 +28,10 @@ PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() { // PostgresHeapScanGlobalState // -PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation) { - relation.InitParallelScanState(); +PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, + duckdb::TableFunctionInitInput &input) { elog(DEBUG3, "-- (DuckDB/PostgresHeapScanGlobalState) Running %lu threads -- ", MaxThreads()); + relation.InitParallelScanState(input.column_ids, input.projection_ids, input.filters.get()); } PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() { @@ -58,7 +59,9 @@ PostgresHeapScanFunction::PostgresHeapScanFunction() PostgresHeapInitLocal) { named_parameters["table"] = duckdb::LogicalType::POINTER; named_parameters["snapshot"] = duckdb::LogicalType::POINTER; - // projection_pushdown = true; + projection_pushdown = true; + // filter_pushdown = true; + // filter_prune = true; } duckdb::unique_ptr @@ -94,7 +97,7 @@ duckdb::unique_ptr PostgresHeapScanFunction::PostgresHeapInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input) { auto &bind_data = input.bind_data->CastNoConst(); - return duckdb::make_uniq(bind_data.m_relation); + return duckdb::make_uniq(bind_data.m_relation, input); } duckdb::unique_ptr @@ -154,9 +157,10 @@ FindMatchingHeapRelation(List *tables, const duckdb::string &to_find) { /* This doesn't have an 
access method handler, we cant read from this */ RelationClose(rel); return nullptr; + } else { + RelationClose(rel); + return table; } - RelationClose(rel); - return table; } RelationClose(rel); } diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 3cf90879..38184202 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -55,8 +55,16 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn } void -PostgresHeapSeqScan::InitParallelScanState() { +PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector &columns, + const duckdb::vector &projections, + duckdb::TableFilterSet *filters) { m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel); + /* We need ordered columns ids for tuple fetch */ + for(duckdb::idx_t i = 0; i < columns.size(); i++) { + m_parallel_scan_state.m_columns[columns[i]] = i; + } + m_parallel_scan_state.m_projections = projections; + m_parallel_scan_state.m_filters = filters; } bool @@ -113,9 +121,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc } pgstat_count_heap_getnext(m_rel); - - InsertTupleIntoChunk(output, threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple, - threadScanInfo.m_output_vector_size); + InsertTupleIntoChunk(output, threadScanInfo, m_parallel_scan_state); } /* No more items on current page */ @@ -150,7 +156,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc } BlockNumber -PostgresHeapSeqScan::ParallelScanState::AssignNextBlockNumber() { +PostgresHeapSeqParallelScanState::AssignNextBlockNumber() { m_lock.lock(); BlockNumber block_number = InvalidBlockNumber; if (m_last_assigned_block_number == InvalidBlockNumber) { diff --git a/src/quack_hooks.cpp b/src/quack_hooks.cpp index 93706a58..d280a157 100644 --- a/src/quack_hooks.cpp +++ b/src/quack_hooks.cpp @@ -1,6 +1,8 @@ extern "C" { #include "postgres.h" +#include "catalog/pg_namespace.h" #include "commands/extension.h" 
+#include "utils/rel.h" } #include "quack/quack.h" @@ -13,14 +15,35 @@ is_quack_extension_registered() { return get_extension_oid("quack", true) != InvalidOid; } +static bool +is_catalog_table(List *tables) { + ListCell *lc; + foreach (lc, tables) { + RangeTblEntry *table = (RangeTblEntry *)lfirst(lc); + if (table->relid) { + auto rel = RelationIdGetRelation(table->relid); + auto namespaceOid = RelationGetNamespace(rel); + if (namespaceOid == PG_CATALOG_NAMESPACE || namespaceOid == PG_TOAST_NAMESPACE) { + RelationClose(rel); + return true; + } + RelationClose(rel); + } + } + return false; +} + static void quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) { - if (is_quack_extension_registered() && queryDesc->operation == CMD_SELECT) { + if (is_quack_extension_registered() && !is_catalog_table(queryDesc->plannedstmt->rtable) && + queryDesc->operation == CMD_SELECT) { if (quack_execute_select(queryDesc, direction, count)) { return; } } + elog(DEBUG3, "quack_executor_run: Failing back to PG execution"); + if (PrevExecutorRunHook) { PrevExecutorRunHook(queryDesc, direction, count, execute_once); } diff --git a/src/quack_select.cpp b/src/quack_select.cpp index 53b2c4d8..c607e6ad 100644 --- a/src/quack_select.cpp +++ b/src/quack_select.cpp @@ -25,7 +25,7 @@ namespace quack { static duckdb::unique_ptr quack_open_database() { duckdb::DBConfig config; - //config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); + // config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); return duckdb::make_uniq(nullptr, &config); } @@ -59,7 +59,10 @@ quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t co TupleTableSlot *slot = NULL; // FIXME: try-catch ? 
- auto res = connection->Query(query_desc->sourceText); + + duckdb::unique_ptr res = nullptr; + + res = connection->Query(query_desc->sourceText); if (res->HasError()) { return false; } diff --git a/src/quack_types.cpp b/src/quack_types.cpp index 11c9ac34..d469e81d 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -2,12 +2,15 @@ extern "C" { #include "postgres.h" +#include "fmgr.h" #include "miscadmin.h" #include "catalog/pg_type.h" #include "executor/tuptable.h" } #include "quack/quack.h" +#include "quack/quack_heap_seq_scan.hpp" +#include "quack/quack_detoast.hpp" namespace quack { @@ -149,15 +152,15 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { } } -typedef struct HeapTuplePageReadState { +typedef struct HeapTupleReadState { bool m_slow = 0; - int m_nvalid = 0; - uint32 m_offset = 0; -} HeapTuplePageReadState; + int m_last_tuple_att = 0; + uint32 m_page_tuple_offset = 0; +} HeapTupleReadState; static Datum -HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePageReadState &heapTupleReadState, - int natts, bool *isNull) { +HeapTupleFetchNextColumnDatum(TupleDesc tupleDesc, HeapTuple tuple, HeapTupleReadState &heapTupleReadState, int attNum, + bool *isNull) { HeapTupleHeader tup = tuple->t_data; bool hasnulls = HeapTupleHasNulls(tuple); @@ -168,23 +171,21 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage bool slow = false; Datum value = (Datum)0; - /* We can only fetch as many attributes as the tuple has. 
*/ - natts = Min(HeapTupleHeaderGetNatts(tuple->t_data), natts); + attnum = heapTupleReadState.m_last_tuple_att; - attnum = heapTupleReadState.m_nvalid; if (attnum == 0) { /* Start from the first attribute */ off = 0; heapTupleReadState.m_slow = false; } else { /* Restore state from previous execution */ - off = heapTupleReadState.m_offset; + off = heapTupleReadState.m_page_tuple_offset; slow = heapTupleReadState.m_slow; } tp = (char *)tup + tup->t_hoff; - for (; attnum < natts; attnum++) { + for (; attnum < attNum; attnum++) { Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum); if (hasnulls && att_isnull(attnum, bp)) { @@ -199,7 +200,6 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage if (!slow && thisatt->attcacheoff >= 0) { off = thisatt->attcacheoff; } else if (thisatt->attlen == -1) { - if (!slow && off == att_align_nominal(off, thisatt->attalign)) { thisatt->attcacheoff = off; } else { @@ -208,7 +208,6 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage } } else { off = att_align_nominal(off, thisatt->attalign); - if (!slow) { thisatt->attcacheoff = off; } @@ -223,8 +222,8 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage } } - heapTupleReadState.m_nvalid = attnum; - heapTupleReadState.m_offset = off; + heapTupleReadState.m_last_tuple_att = attNum; + heapTupleReadState.m_page_tuple_offset = off; if (slow) { heapTupleReadState.m_slow = true; @@ -236,19 +235,39 @@ HeapTupleFetchNextDatumValue(TupleDesc tupleDesc, HeapTuple tuple, HeapTuplePage } void -InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tupleDesc, HeapTupleData *slot, idx_t offset) { - HeapTuplePageReadState heapTupleReadState = {}; - for (int i = 0; i < tupleDesc->natts; i++) { - auto &result = output.data[i]; - bool isNull = false; - Datum value = HeapTupleFetchNextDatumValue(tupleDesc, slot, heapTupleReadState, i + 1, &isNull); - if (isNull) { 
+InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo, + PostgresHeapSeqParallelScanState &parallelScanState) { + HeapTupleReadState heapTupleReadState = {}; + + Datum *values = (Datum *)duckdb_malloc(sizeof(Datum) * parallelScanState.m_columns.size()); + bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * parallelScanState.m_columns.size()); + + for (auto const &[columnIdx, valueIdx] : parallelScanState.m_columns) { + values[valueIdx] = HeapTupleFetchNextColumnDatum(threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple, + heapTupleReadState, columnIdx + 1, &nulls[valueIdx]); + auto &result = output.data[valueIdx]; + if (nulls[valueIdx]) { auto &array_mask = duckdb::FlatVector::Validity(result); - array_mask.SetInvalid(offset); + array_mask.SetInvalid(threadScanInfo.m_output_vector_size); } else { - ConvertPostgresToDuckValue(value, result, offset); + if (threadScanInfo.m_tuple_desc->attrs[columnIdx].attlen == -1) { + bool shouldFree = false; + values[valueIdx] = + DetoastPostgresDatum(reinterpret_cast(values[valueIdx]), parallelScanState.m_lock, &shouldFree); + ConvertPostgresToDuckValue(values[valueIdx] , result, threadScanInfo.m_output_vector_size); + if (shouldFree) { + duckdb_free(reinterpret_cast(values[valueIdx])); + } + } else { + ConvertPostgresToDuckValue(values[valueIdx], result, threadScanInfo.m_output_vector_size); + } } } + + parallelScanState.m_total_row_count++; + + duckdb_free(values); + duckdb_free(nulls); } } // namespace quack From 28e46d332af788ee6329e5f547fb1cf034b52123 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Fri, 26 Apr 2024 12:54:21 +0200 Subject: [PATCH 2/5] Tuple filtering on page level * Filter tuple on page level --- Makefile | 3 +- include/quack/quack_filter.hpp | 11 ++++ include/quack/quack_heap_seq_scan.hpp | 16 ++++- include/quack/quack_select.h | 1 - src/quack_detoast.cpp | 6 +- src/quack_filter.cpp | 88 +++++++++++++++++++++++++++ src/quack_heap_scan.cpp | 4 +- src/quack_heap_seq_scan.cpp | 
33 ++++++++-- src/quack_select.cpp | 4 +- src/quack_types.cpp | 37 ++++++++--- 10 files changed, 179 insertions(+), 24 deletions(-) create mode 100644 include/quack/quack_filter.hpp create mode 100644 src/quack_filter.cpp diff --git a/Makefile b/Makefile index 0d10d2f5..fce4a546 100644 --- a/Makefile +++ b/Makefile @@ -5,6 +5,7 @@ EXTENSION = quack DATA = quack.control $(wildcard quack--*.sql) SRCS = src/quack_detoast.cpp \ + src/quack_filter.cpp \ src/quack_heap_scan.cpp \ src/quack_heap_seq_scan.cpp \ src/quack_hooks.cpp \ @@ -35,7 +36,7 @@ else QUACK_BUILD_DUCKDB = release endif -override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 ${QUACK_BUILD_CXX_FLAGS} +override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -Wno-sign-compare ${QUACK_BUILD_CXX_FLAGS} SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -L$(PG_LIB) -lduckdb -Lthird_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src -lstdc++ diff --git a/include/quack/quack_filter.hpp b/include/quack/quack_filter.hpp new file mode 100644 index 00000000..456cdb7a --- /dev/null +++ b/include/quack/quack_filter.hpp @@ -0,0 +1,11 @@ +#pragma once + +#include "duckdb.hpp" + +extern "C" { +#include "postgres.h" +} + +namespace quack { +bool ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid); +} // namespace quack \ No newline at end of file diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp index dfeb9780..a209702d 100644 --- a/include/quack/quack_heap_seq_scan.hpp +++ b/include/quack/quack_heap_seq_scan.hpp @@ -6,6 +6,7 @@ extern "C" { #include "postgres.h" #include "access/tableam.h" #include "access/heapam.h" +#include "storage/bufmgr.h" } #include @@ -33,18 +34,29 @@ class PostgresHeapSeqScanThreadInfo { }; class PostgresHeapSeqParallelScanState { +private: + static int const k_max_prefetch_block_number = 32; + public: PostgresHeapSeqParallelScanState() - : m_nblocks(InvalidBlockNumber), 
m_last_assigned_block_number(InvalidBlockNumber), m_total_row_count(0) { + : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_total_row_count(0), + m_last_prefetch_block(0), m_strategy(nullptr) { + } + ~PostgresHeapSeqParallelScanState() { + if (m_strategy) + pfree(m_strategy); } BlockNumber AssignNextBlockNumber(); + void PrefetchNextRelationPages(Relation rel); std::mutex m_lock; BlockNumber m_nblocks; BlockNumber m_last_assigned_block_number; duckdb::map m_columns; - duckdb::vector m_projections; + duckdb::map m_projections; duckdb::TableFilterSet *m_filters = nullptr; std::atomic m_total_row_count; + BlockNumber m_last_prefetch_block; + BufferAccessStrategy m_strategy; }; class PostgresHeapSeqScan { diff --git a/include/quack/quack_select.h b/include/quack/quack_select.h index 9ea8b574..7490bdcf 100644 --- a/include/quack/quack_select.h +++ b/include/quack/quack_select.h @@ -2,7 +2,6 @@ extern "C" { #include "postgres.h" - #include "executor/executor.h" } diff --git a/src/quack_detoast.cpp b/src/quack_detoast.cpp index c3d39663..40bc1a9c 100644 --- a/src/quack_detoast.cpp +++ b/src/quack_detoast.cpp @@ -2,6 +2,7 @@ extern "C" { #include "postgres.h" +#include "pg_config.h" #include "varatt.h" #ifdef USE_LZ4 @@ -48,7 +49,6 @@ _pglz_decompress_datum(const struct varlena *value) { struct varlena * _lz4_decompress_datum(const struct varlena *value) { #ifndef USE_LZ4 - NO_LZ4_SUPPORT(); return NULL; /* keep compiler quiet */ #else int32 rawsize; @@ -69,7 +69,9 @@ _lz4_decompress_datum(const struct varlena *value) { static struct varlena * _toast_decompress_datum(struct varlena *attr) { - switch (TOAST_COMPRESS_METHOD(attr)) { + ToastCompressionId cmid; + cmid = (ToastCompressionId)TOAST_COMPRESS_METHOD(attr); + switch (cmid) { case TOAST_PGLZ_COMPRESSION_ID: return _pglz_decompress_datum(attr); case TOAST_LZ4_COMPRESSION_ID: diff --git a/src/quack_filter.cpp b/src/quack_filter.cpp new file mode 100644 index 00000000..946b9720 --- 
/dev/null +++ b/src/quack_filter.cpp @@ -0,0 +1,88 @@ +#include "duckdb.hpp" +#include "duckdb/planner/filter/constant_filter.hpp" + +extern "C" { +#include "postgres.h" +#include "catalog/pg_type.h" +} + +namespace quack { + +template +bool +TemplatedFilterOperation(Datum &value, const duckdb::Value &constant) { + return OP::Operation((T)value, constant.GetValueUnsafe()); +} + +template +static bool +FilterOperationSwitch(Datum &value, duckdb::Value &constant, Oid typeOid) { + switch (typeOid) { + case BOOLOID: + return TemplatedFilterOperation(value, constant); + break; + case CHAROID: + return TemplatedFilterOperation(value, constant); + break; + case INT2OID: + return TemplatedFilterOperation(value, constant); + break; + case INT4OID: + return TemplatedFilterOperation(value, constant); + break; + case INT8OID: + return TemplatedFilterOperation(value, constant); + break; + default: + elog(ERROR, "Unsupported quack type: %d", typeOid); + } +} + +bool +ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid) { + switch (filter.filter_type) { + case duckdb::TableFilterType::CONJUNCTION_AND: { + auto &conjunction = filter.Cast(); + bool valueFilterResult = true; + for (auto &child_filter : conjunction.child_filters) { + valueFilterResult &= ApplyValueFilter(*child_filter, value, isNull, typeOid); + } + return valueFilterResult; + break; + } + case duckdb::TableFilterType::CONSTANT_COMPARISON: { + auto &constant_filter = filter.Cast(); + switch (constant_filter.comparison_type) { + case duckdb::ExpressionType::COMPARE_EQUAL: + return FilterOperationSwitch(value, constant_filter.constant, typeOid); + break; + case duckdb::ExpressionType::COMPARE_LESSTHAN: + return FilterOperationSwitch(value, constant_filter.constant, typeOid); + break; + case duckdb::ExpressionType::COMPARE_LESSTHANOREQUALTO: + return FilterOperationSwitch(value, constant_filter.constant, typeOid); + break; + case duckdb::ExpressionType::COMPARE_GREATERTHAN: + return 
FilterOperationSwitch(value, constant_filter.constant, typeOid); + break; + case duckdb::ExpressionType::COMPARE_GREATERTHANOREQUALTO: + return FilterOperationSwitch(value, constant_filter.constant, typeOid); + break; + default: + D_ASSERT(0); + } + break; + } + case duckdb::TableFilterType::IS_NOT_NULL: + return isNull == false; + break; + case duckdb::TableFilterType::IS_NULL: + return isNull == true; + break; + default: + D_ASSERT(0); + break; + } +} + +} // namespace quack \ No newline at end of file diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 98ab7d16..7da2a2ee 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -60,8 +60,8 @@ PostgresHeapScanFunction::PostgresHeapScanFunction() named_parameters["table"] = duckdb::LogicalType::POINTER; named_parameters["snapshot"] = duckdb::LogicalType::POINTER; projection_pushdown = true; - // filter_pushdown = true; - // filter_prune = true; + filter_pushdown = true; + filter_prune = true; } duckdb::unique_ptr diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 38184202..dd79f8f7 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -40,6 +40,7 @@ PostgresHeapSeqScan::IsValid() const { } TupleDesc + PostgresHeapSeqScan::GetTupleDesc() { return RelationGetDescr(m_rel); } @@ -60,10 +61,13 @@ PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector 0 && threadScanInfo.m_output_vector_size < STANDARD_VECTOR_SIZE; - threadScanInfo.m_page_tuples_left--, threadScanInfo.m_current_tuple_index++, - threadScanInfo.m_output_vector_size++) { + threadScanInfo.m_page_tuples_left--, threadScanInfo.m_current_tuple_index++) { bool visible = true; ItemId lpp = PageGetItemId(page, threadScanInfo.m_current_tuple_index); @@ -168,6 +171,24 @@ PostgresHeapSeqParallelScanState::AssignNextBlockNumber() { return block_number; } +void +PostgresHeapSeqParallelScanState::PrefetchNextRelationPages(Relation rel) { + + BlockNumber last_batch_prefetch_block_num 
= m_last_prefetch_block + k_max_prefetch_block_number > m_nblocks + ? m_nblocks + : m_last_prefetch_block + k_max_prefetch_block_number; + + if (m_last_assigned_block_number != InvalidBlockNumber && + (m_last_prefetch_block - m_last_assigned_block_number) > 8) + return; + + + for (BlockNumber i = m_last_prefetch_block; i < last_batch_prefetch_block_num; i++) { + PrefetchBuffer(rel, MAIN_FORKNUM, m_last_prefetch_block); + m_last_prefetch_block++; + } +} + PostgresHeapSeqScanThreadInfo::PostgresHeapSeqScanThreadInfo() : m_tuple_desc(NULL), m_inited(false), m_read_next_page(true), m_block_number(InvalidBlockNumber), m_buffer(InvalidBuffer), m_current_tuple_index(InvalidOffsetNumber), m_page_tuples_left(0) { diff --git a/src/quack_select.cpp b/src/quack_select.cpp index c607e6ad..d69557dd 100644 --- a/src/quack_select.cpp +++ b/src/quack_select.cpp @@ -25,7 +25,9 @@ namespace quack { static duckdb::unique_ptr quack_open_database() { duckdb::DBConfig config; - // config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); + //config.SetOption("memory_limit", "2GB"); + //config.SetOption("threads", "8"); + //config.allocator = duckdb::make_uniq(QuackAllocate, QuackFree, QuackReallocate, nullptr); return duckdb::make_uniq(nullptr, &config); } diff --git a/src/quack_types.cpp b/src/quack_types.cpp index d469e81d..f8190284 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -9,6 +9,7 @@ extern "C" { } #include "quack/quack.h" +#include "quack/quack_filter.hpp" #include "quack/quack_heap_seq_scan.hpp" #include "quack/quack_detoast.hpp" @@ -119,7 +120,6 @@ AppendString(duckdb::Vector &result, Datum value, idx_t offset) { void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { - switch (result.GetType().id()) { case duckdb::LogicalTypeId::BOOLEAN: Append(result, DatumGetBool(value), offset); @@ -242,28 +242,47 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t Datum *values = 
(Datum *)duckdb_malloc(sizeof(Datum) * parallelScanState.m_columns.size()); bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * parallelScanState.m_columns.size()); + bool validTuple = true; + for (auto const &[columnIdx, valueIdx] : parallelScanState.m_columns) { values[valueIdx] = HeapTupleFetchNextColumnDatum(threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple, heapTupleReadState, columnIdx + 1, &nulls[valueIdx]); - auto &result = output.data[valueIdx]; - if (nulls[valueIdx]) { + if (parallelScanState.m_filters && + (parallelScanState.m_filters->filters.find(columnIdx) != parallelScanState.m_filters->filters.end())) { + auto &filter = parallelScanState.m_filters->filters[valueIdx]; + validTuple = ApplyValueFilter(*filter, values[valueIdx], nulls[valueIdx], + threadScanInfo.m_tuple_desc->attrs[columnIdx].atttypid); + } + + if (!validTuple) { + break; + } + } + + for (idx_t idx = 0; validTuple && idx < parallelScanState.m_projections.size(); idx++) { + auto &result = output.data[idx]; + if (nulls[idx]) { auto &array_mask = duckdb::FlatVector::Validity(result); array_mask.SetInvalid(threadScanInfo.m_output_vector_size); } else { - if (threadScanInfo.m_tuple_desc->attrs[columnIdx].attlen == -1) { + if (threadScanInfo.m_tuple_desc->attrs[parallelScanState.m_projections[idx]].attlen == -1) { bool shouldFree = false; - values[valueIdx] = - DetoastPostgresDatum(reinterpret_cast(values[valueIdx]), parallelScanState.m_lock, &shouldFree); - ConvertPostgresToDuckValue(values[valueIdx] , result, threadScanInfo.m_output_vector_size); + values[idx] = DetoastPostgresDatum(reinterpret_cast(values[idx]), parallelScanState.m_lock, + &shouldFree); + ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size); if (shouldFree) { - duckdb_free(reinterpret_cast(values[valueIdx])); + duckdb_free(reinterpret_cast(values[idx])); } } else { - ConvertPostgresToDuckValue(values[valueIdx], result, threadScanInfo.m_output_vector_size); + 
ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size); } } } + if (validTuple) { + threadScanInfo.m_output_vector_size++; + } + parallelScanState.m_total_row_count++; duckdb_free(values); From ebe5014a21e40f6d93a39085b6d38826e0c283a0 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Sat, 27 Apr 2024 16:22:05 +0200 Subject: [PATCH 3/5] Query COUNT(*) * COUNT(*) doesn't require any columns to be retrieved so we only count tuples that pass visibility without fetching. --- include/quack/quack_heap_seq_scan.hpp | 5 +++-- src/quack_heap_seq_scan.cpp | 8 ++++++++ src/quack_types.cpp | 5 +++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp index a209702d..4021ef33 100644 --- a/include/quack/quack_heap_seq_scan.hpp +++ b/include/quack/quack_heap_seq_scan.hpp @@ -39,8 +39,8 @@ class PostgresHeapSeqParallelScanState { public: PostgresHeapSeqParallelScanState() - : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_total_row_count(0), - m_last_prefetch_block(0), m_strategy(nullptr) { + : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuple_only(false), + m_total_row_count(0), m_last_prefetch_block(0), m_strategy(nullptr) { } ~PostgresHeapSeqParallelScanState() { if (m_strategy) @@ -51,6 +51,7 @@ class PostgresHeapSeqParallelScanState { std::mutex m_lock; BlockNumber m_nblocks; BlockNumber m_last_assigned_block_number; + bool m_count_tuple_only; duckdb::map m_columns; duckdb::map m_projections; duckdb::TableFilterSet *m_filters = nullptr; diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index dd79f8f7..562498da 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ -60,13 +60,21 @@ PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector &projections, duckdb::TableFilterSet *filters) { m_parallel_scan_state.m_nblocks = 
RelationGetNumberOfBlocks(m_rel); + + if (columns.size() == 1 && columns[0] == UINT64_MAX) { + m_parallel_scan_state.m_count_tuple_only = true; + return; + } + /* We need ordered columns ids for tuple fetch */ for (duckdb::idx_t i = 0; i < columns.size(); i++) { m_parallel_scan_state.m_columns[columns[i]] = i; } + for (duckdb::idx_t i = 0; i < columns.size(); i++) { m_parallel_scan_state.m_projections[projections[i]] = columns[i]; } + //m_parallel_scan_state.PrefetchNextRelationPages(m_rel); m_parallel_scan_state.m_filters = filters; } diff --git a/src/quack_types.cpp b/src/quack_types.cpp index f8190284..d6c8fc48 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -239,6 +239,11 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t PostgresHeapSeqParallelScanState &parallelScanState) { HeapTupleReadState heapTupleReadState = {}; + if (parallelScanState.m_count_tuple_only) { + threadScanInfo.m_output_vector_size++; + return; + } + Datum *values = (Datum *)duckdb_malloc(sizeof(Datum) * parallelScanState.m_columns.size()); bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * parallelScanState.m_columns.size()); From 5b66cd7a031e19fb68585939c4194144cb5505af Mon Sep 17 00:00:00 2001 From: mkaruza Date: Sat, 27 Apr 2024 19:04:28 +0200 Subject: [PATCH 4/5] Fixed issue with column filtering and projection * Fixed incorrect column id for query filtering * Writing output vector now works with/without projection information --- include/quack/quack_heap_seq_scan.hpp | 11 +++++------ src/quack_heap_scan.cpp | 2 +- src/quack_heap_seq_scan.cpp | 26 ++++++++++++++++---------- src/quack_select.cpp | 1 - src/quack_types.cpp | 4 ++-- 5 files changed, 24 insertions(+), 20 deletions(-) diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp index 4021ef33..1921d407 100644 --- a/include/quack/quack_heap_seq_scan.hpp +++ b/include/quack/quack_heap_seq_scan.hpp @@ -39,7 +39,7 @@ class PostgresHeapSeqParallelScanState {
public: PostgresHeapSeqParallelScanState() - : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuple_only(false), + : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuples_only(false), m_total_row_count(0), m_last_prefetch_block(0), m_strategy(nullptr) { } ~PostgresHeapSeqParallelScanState() { @@ -51,9 +51,9 @@ class PostgresHeapSeqParallelScanState { std::mutex m_lock; BlockNumber m_nblocks; BlockNumber m_last_assigned_block_number; - bool m_count_tuple_only; - duckdb::map m_columns; - duckdb::map m_projections; + bool m_count_tuples_only; + duckdb::map m_columns; + duckdb::map m_projections; duckdb::TableFilterSet *m_filters = nullptr; std::atomic m_total_row_count; BlockNumber m_last_prefetch_block; @@ -71,8 +71,7 @@ class PostgresHeapSeqScan { PostgresHeapSeqScan(PostgresHeapSeqScan &&other); public: - void InitParallelScanState(const duckdb::vector &columns, - const duckdb::vector &projections, duckdb::TableFilterSet *filters); + void InitParallelScanState( duckdb::TableFunctionInitInput &input); void SetSnapshot(Snapshot snapshot) { m_snapshot = snapshot; diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp index 7da2a2ee..19d7a98a 100644 --- a/src/quack_heap_scan.cpp +++ b/src/quack_heap_scan.cpp @@ -31,7 +31,7 @@ PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() { PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, duckdb::TableFunctionInitInput &input) { elog(DEBUG3, "-- (DuckDB/PostgresHeapScanGlobalState) Running %lu threads -- ", MaxThreads()); - relation.InitParallelScanState(input.column_ids, input.projection_ids, input.filters.get()); + relation.InitParallelScanState(input); } PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() { diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp index 562498da..3d028632 100644 --- a/src/quack_heap_seq_scan.cpp +++ b/src/quack_heap_seq_scan.cpp @@ 
-56,27 +56,33 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn } void -PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector &columns, - const duckdb::vector &projections, - duckdb::TableFilterSet *filters) { +PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &input) { m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel); - if (columns.size() == 1 && columns[0] == UINT64_MAX) { - m_parallel_scan_state.m_count_tuple_only = true; + /* SELECT COUNT(*) FROM */ + if (input.column_ids.size() == 1 && input.column_ids[0] == UINT64_MAX) { + m_parallel_scan_state.m_count_tuples_only = true; return; } /* We need ordered columns ids for tuple fetch */ - for (duckdb::idx_t i = 0; i < columns.size(); i++) { - m_parallel_scan_state.m_columns[columns[i]] = i; + for (duckdb::idx_t i = 0; i < input.column_ids.size(); i++) { + m_parallel_scan_state.m_columns[input.column_ids[i]] = i; } - for (duckdb::idx_t i = 0; i < columns.size(); i++) { - m_parallel_scan_state.m_projections[projections[i]] = columns[i]; + if (input.CanRemoveFilterColumns()) { + for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) { + m_parallel_scan_state.m_projections[input.projection_ids[i]] = input.column_ids[i]; + } + } else { + for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) { + m_parallel_scan_state.m_projections[i] = input.column_ids[i]; + } } + //m_parallel_scan_state.PrefetchNextRelationPages(m_rel); - m_parallel_scan_state.m_filters = filters; + m_parallel_scan_state.m_filters = input.filters.get(); } bool diff --git a/src/quack_select.cpp b/src/quack_select.cpp index d69557dd..bf879499 100644 --- a/src/quack_select.cpp +++ b/src/quack_select.cpp @@ -78,7 +78,6 @@ quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t co column_count = res->ColumnCount(); while (true) { - auto chunk = res->Fetch(); if (!chunk || chunk->size() == 0) { diff --git a/src/quack_types.cpp 
b/src/quack_types.cpp index d6c8fc48..5c139436 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -239,7 +239,7 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t PostgresHeapSeqParallelScanState &parallelScanState) { HeapTupleReadState heapTupleReadState = {}; - if (parallelScanState.m_count_tuple_only) { + if (parallelScanState.m_count_tuples_only) { threadScanInfo.m_output_vector_size++; return; } @@ -253,7 +253,7 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t values[valueIdx] = HeapTupleFetchNextColumnDatum(threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple, heapTupleReadState, columnIdx + 1, &nulls[valueIdx]); if (parallelScanState.m_filters && - (parallelScanState.m_filters->filters.find(columnIdx) != parallelScanState.m_filters->filters.end())) { + (parallelScanState.m_filters->filters.find(valueIdx) != parallelScanState.m_filters->filters.end())) { auto &filter = parallelScanState.m_filters->filters[valueIdx]; validTuple = ApplyValueFilter(*filter, values[valueIdx], nulls[valueIdx], threadScanInfo.m_tuple_desc->attrs[columnIdx].atttypid); From a6cb22de846c2e28ad54b62305dadb66bfd70b8c Mon Sep 17 00:00:00 2001 From: mkaruza Date: Thu, 2 May 2024 08:46:56 +0200 Subject: [PATCH 5/5] Add FIXME reminder for duckdb_malloc/duckdb_free calls --- src/quack_types.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/quack_types.cpp b/src/quack_types.cpp index 5c139436..c3e29390 100644 --- a/src/quack_types.cpp +++ b/src/quack_types.cpp @@ -244,6 +244,7 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t return; } + /* FIXME: all calls to duckdb_malloc/duckdb_free should be changed in future */ Datum *values = (Datum *)duckdb_malloc(sizeof(Datum) * parallelScanState.m_columns.size()); bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * parallelScanState.m_columns.size());