From 5b66cd7a031e19fb68585939c4194144cb5505af Mon Sep 17 00:00:00 2001
From: mkaruza
Date: Sat, 27 Apr 2024 19:04:28 +0200
Subject: [PATCH] Fixed issue with column filtering and projection

* Fixed incorrect column id for query filtering
* Writing output vector now works with/without projection information
---
 include/quack/quack_heap_seq_scan.hpp | 11 +++++------
 src/quack_heap_scan.cpp               |  2 +-
 src/quack_heap_seq_scan.cpp           | 26 ++++++++++++++++----------
 src/quack_select.cpp                  |  1 -
 src/quack_types.cpp                   |  4 ++--
 5 files changed, 24 insertions(+), 20 deletions(-)

diff --git a/include/quack/quack_heap_seq_scan.hpp b/include/quack/quack_heap_seq_scan.hpp
index 4021ef33..1921d407 100644
--- a/include/quack/quack_heap_seq_scan.hpp
+++ b/include/quack/quack_heap_seq_scan.hpp
@@ -39,7 +39,7 @@ class PostgresHeapSeqParallelScanState {
 
 public:
 	PostgresHeapSeqParallelScanState()
-	    : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuple_only(false),
+	    : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuples_only(false),
 	      m_total_row_count(0), m_last_prefetch_block(0), m_strategy(nullptr) {
 	}
 	~PostgresHeapSeqParallelScanState() {
@@ -51,9 +51,9 @@ class PostgresHeapSeqParallelScanState {
 	std::mutex m_lock;
 	BlockNumber m_nblocks;
 	BlockNumber m_last_assigned_block_number;
-	bool m_count_tuple_only;
-	duckdb::map m_columns;
-	duckdb::map m_projections;
+	bool m_count_tuples_only;
+	duckdb::map m_columns;
+	duckdb::map m_projections;
 	duckdb::TableFilterSet *m_filters = nullptr;
 	std::atomic m_total_row_count;
 	BlockNumber m_last_prefetch_block;
@@ -71,8 +71,7 @@ class PostgresHeapSeqScan {
 	PostgresHeapSeqScan(PostgresHeapSeqScan &&other);
 
 public:
-	void InitParallelScanState(const duckdb::vector &columns,
-	                           const duckdb::vector &projections, duckdb::TableFilterSet *filters);
+	void InitParallelScanState( duckdb::TableFunctionInitInput &input);
 
 	void SetSnapshot(Snapshot snapshot) {
 		m_snapshot = snapshot;
diff --git a/src/quack_heap_scan.cpp b/src/quack_heap_scan.cpp
index 7da2a2ee..19d7a98a 100644
--- a/src/quack_heap_scan.cpp
+++ b/src/quack_heap_scan.cpp
@@ -31,7 +31,7 @@ PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() {
 PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation,
                                                          duckdb::TableFunctionInitInput &input) {
 	elog(DEBUG3, "-- (DuckDB/PostgresHeapScanGlobalState) Running %lu threads -- ", MaxThreads());
-	relation.InitParallelScanState(input.column_ids, input.projection_ids, input.filters.get());
+	relation.InitParallelScanState(input);
 }
 
 PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() {
diff --git a/src/quack_heap_seq_scan.cpp b/src/quack_heap_seq_scan.cpp
index 562498da..3d028632 100644
--- a/src/quack_heap_seq_scan.cpp
+++ b/src/quack_heap_seq_scan.cpp
@@ -56,27 +56,33 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn
 }
 
 void
-PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector &columns,
-                                           const duckdb::vector &projections,
-                                           duckdb::TableFilterSet *filters) {
+PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &input) {
 	m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel);
 
-	if (columns.size() == 1 && columns[0] == UINT64_MAX) {
-		m_parallel_scan_state.m_count_tuple_only = true;
+	/* SELECT COUNT(*) FROM */
+	if (input.column_ids.size() == 1 && input.column_ids[0] == UINT64_MAX) {
+		m_parallel_scan_state.m_count_tuples_only = true;
 		return;
 	}
 
 	/* We need ordered columns ids for tuple fetch */
-	for (duckdb::idx_t i = 0; i < columns.size(); i++) {
-		m_parallel_scan_state.m_columns[columns[i]] = i;
+	for (duckdb::idx_t i = 0; i < input.column_ids.size(); i++) {
+		m_parallel_scan_state.m_columns[input.column_ids[i]] = i;
 	}
 
-	for (duckdb::idx_t i = 0; i < columns.size(); i++) {
-		m_parallel_scan_state.m_projections[projections[i]] = columns[i];
+	if (input.CanRemoveFilterColumns()) {
+		for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) {
+			m_parallel_scan_state.m_projections[input.projection_ids[i]] = input.column_ids[i];
+		}
+	} else {
+		for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) {
+			m_parallel_scan_state.m_projections[i] = input.column_ids[i];
+		}
 	}
+
 	//m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
-	m_parallel_scan_state.m_filters = filters;
+	m_parallel_scan_state.m_filters = input.filters.get();
 }
 
 bool
diff --git a/src/quack_select.cpp b/src/quack_select.cpp
index d69557dd..bf879499 100644
--- a/src/quack_select.cpp
+++ b/src/quack_select.cpp
@@ -78,7 +78,6 @@ quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t co
 	column_count = res->ColumnCount();
 
 	while (true) {
-
 		auto chunk = res->Fetch();
 
 		if (!chunk || chunk->size() == 0) {
diff --git a/src/quack_types.cpp b/src/quack_types.cpp
index d6c8fc48..5c139436 100644
--- a/src/quack_types.cpp
+++ b/src/quack_types.cpp
@@ -239,7 +239,7 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t
                      PostgresHeapSeqParallelScanState &parallelScanState) {
 	HeapTupleReadState heapTupleReadState = {};
 
-	if (parallelScanState.m_count_tuple_only) {
+	if (parallelScanState.m_count_tuples_only) {
 		threadScanInfo.m_output_vector_size++;
 		return;
 	}
@@ -253,7 +253,7 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t
 		values[valueIdx] = HeapTupleFetchNextColumnDatum(threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple,
 		                                                 heapTupleReadState, columnIdx + 1, &nulls[valueIdx]);
 		if (parallelScanState.m_filters &&
-		    (parallelScanState.m_filters->filters.find(columnIdx) != parallelScanState.m_filters->filters.end())) {
+		    (parallelScanState.m_filters->filters.find(valueIdx) != parallelScanState.m_filters->filters.end())) {
 			auto &filter = parallelScanState.m_filters->filters[valueIdx];
 			validTuple = ApplyValueFilter(*filter, values[valueIdx], nulls[valueIdx],
 			                              threadScanInfo.m_tuple_desc->attrs[columnIdx].atttypid);
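
Note on the quack_types.cpp hunks (an illustrative aside, not part of the commit): the pre-patch code checked the filter set with one key, filters.find(columnIdx), but then read it with another, filters[valueIdx], so the check and the read could disagree for the same column; the fix makes the lookup use the same valueIdx key as the read on the following line. A minimal stand-alone sketch of that mismatch, with std::map standing in for duckdb::TableFilterSet and made-up index values (the names columnIdx/valueIdx mirror the patch, everything else is hypothetical):

#include <cassert>
#include <cstdint>
#include <map>

int main() {
	// Stand-in for the scan's filter set: one filter stored under the key the
	// scan code calls valueIdx.
	std::map<uint64_t, int> filters;
	filters[0] = 42;

	uint64_t columnIdx = 3; // a different index that refers to the same column
	uint64_t valueIdx = 0;

	// Pre-patch pattern: check one key, read another. The check misses the
	// entry, so the filter is silently skipped.
	assert(filters.find(columnIdx) == filters.end());

	// Patched pattern: the check uses the same key as the read.
	assert(filters.find(valueIdx) != filters.end());
	assert(filters[valueIdx] == 42);
	return 0;
}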