Fixed issue with column filtering and projection
* Fixed incorrect column id for query filtering
* Writing output vector now works with/without projection information
mkaruza committed Apr 27, 2024
1 parent ebe5014 commit 5b66cd7
Showing 5 changed files with 24 additions and 20 deletions.
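For context on both fixes: in DuckDB's table-function interface, column_ids lists the table columns the scan has to read (filter-only columns included), projection_ids, when set, picks which of those positions actually reach the output chunk, and the pushed-down TableFilterSet is keyed by position within column_ids rather than by the table's own column number, which is what the filter fix below relies on. A minimal standalone sketch of that bookkeeping, with plain standard containers and a made-up three-column table rather than the extension's or DuckDB's types:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical query: SELECT b FROM t WHERE c > 5, over table t(a, b, c).
int main() {
    // Columns the scan has to read: b (table column 1) and c (table column 2).
    std::vector<uint64_t> column_ids = {1, 2};
    // Only position 0 of column_ids (column b) is emitted to the output chunk;
    // c is read solely to evaluate the filter.
    std::vector<uint64_t> projection_ids = {0};
    // Filters are keyed by position in column_ids: the filter on c lives at key 1.
    std::map<uint64_t, std::string> filters = {{1, "c > 5"}};

    for (uint64_t out_slot = 0; out_slot < projection_ids.size(); out_slot++) {
        std::cout << "output slot " << out_slot << " <- table column "
                  << column_ids[projection_ids[out_slot]] << "\n";
    }
    for (const auto &kv : filters) {
        std::cout << "filter on scan position " << kv.first << " (table column "
                  << column_ids[kv.first] << "): " << kv.second << "\n";
    }
    return 0;
}

Both bullet points in the commit message come down to using the right one of these two index spaces.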
11 changes: 5 additions & 6 deletions include/quack/quack_heap_seq_scan.hpp
@@ -39,7 +39,7 @@ class PostgresHeapSeqParallelScanState {
 
 public:
     PostgresHeapSeqParallelScanState()
-        : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuple_only(false),
+        : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuples_only(false),
           m_total_row_count(0), m_last_prefetch_block(0), m_strategy(nullptr) {
     }
     ~PostgresHeapSeqParallelScanState() {
@@ -51,9 +51,9 @@ class PostgresHeapSeqParallelScanState {
     std::mutex m_lock;
     BlockNumber m_nblocks;
     BlockNumber m_last_assigned_block_number;
-    bool m_count_tuple_only;
-    duckdb::map<duckdb::column_t, duckdb::idx_t> m_columns;
-    duckdb::map<duckdb::idx_t, duckdb::column_t> m_projections;
+    bool m_count_tuples_only;
+    duckdb::map<duckdb::idx_t, duckdb::idx_t> m_columns;
+    duckdb::map<duckdb::idx_t, duckdb::idx_t> m_projections;
     duckdb::TableFilterSet *m_filters = nullptr;
     std::atomic<std::uint32_t> m_total_row_count;
     BlockNumber m_last_prefetch_block;
@@ -71,8 +71,7 @@ class PostgresHeapSeqScan {
     PostgresHeapSeqScan(PostgresHeapSeqScan &&other);
 
 public:
-    void InitParallelScanState(const duckdb::vector<duckdb::column_t> &columns,
-                               const duckdb::vector<duckdb::idx_t> &projections, duckdb::TableFilterSet *filters);
+    void InitParallelScanState( duckdb::TableFunctionInitInput &input);
     void
     SetSnapshot(Snapshot snapshot) {
         m_snapshot = snapshot;
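With this header change both maps hold plain indexes: m_columns maps a table column id to its position in the scanned-column list, and m_projections maps an output slot to a table column id. The surviving "/* We need ordered columns ids for tuple fetch */" comment in the implementation relies on duckdb::map iterating its keys in ascending order; a small sketch of that property, assuming duckdb::map behaves like std::map:

#include <cstdint>
#include <iostream>
#include <map>

// Assuming duckdb::map is an ordered map, iterating m_columns visits table
// columns in ascending order, which is what a heap-tuple reader that walks
// attributes left to right needs.
int main() {
    std::map<uint64_t, uint64_t> m_columns;   // table column id -> position in column_ids
    // Populated as if column_ids were {2, 0}, i.e. columns requested out of order.
    m_columns[2] = 0;
    m_columns[0] = 1;
    for (const auto &entry : m_columns) {
        std::cout << "fetch table column " << entry.first
                  << " into value slot " << entry.second << "\n";
    }
    // Prints column 0 first, then column 2, regardless of insertion order.
    return 0;
}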
2 changes: 1 addition & 1 deletion src/quack_heap_scan.cpp
@@ -31,7 +31,7 @@ PostgresHeapScanFunctionData::~PostgresHeapScanFunctionData() {
 PostgresHeapScanGlobalState::PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation,
                                                          duckdb::TableFunctionInitInput &input) {
     elog(DEBUG3, "-- (DuckDB/PostgresHeapScanGlobalState) Running %lu threads -- ", MaxThreads());
-    relation.InitParallelScanState(input.column_ids, input.projection_ids, input.filters.get());
+    relation.InitParallelScanState(input);
 }
 
 PostgresHeapScanGlobalState::~PostgresHeapScanGlobalState() {
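Forwarding the whole TableFunctionInitInput, instead of unpacking column_ids, projection_ids, and filters at the call site, also lets the scan ask input.CanRemoveFilterColumns() when it builds its projection map. A hedged mock of roughly what that predicate answers; MockInitInput is hypothetical and only approximates the DuckDB class:

#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-in for the slice of duckdb::TableFunctionInitInput used by the
// scan; the real class carries more state, and its CanRemoveFilterColumns() may
// differ in detail.
struct MockInitInput {
    std::vector<uint64_t> column_ids;
    std::vector<uint64_t> projection_ids;

    // Assumed meaning: projection info is usable only when it is present and
    // actually drops at least one filter-only column.
    bool CanRemoveFilterColumns() const {
        return !projection_ids.empty() && projection_ids.size() < column_ids.size();
    }
};

int main() {
    MockInitInput with_prune{{1, 2}, {0}};   // read b and c, emit only b
    MockInitInput no_prune{{1, 2}, {}};      // every scanned column is emitted
    std::cout << with_prune.CanRemoveFilterColumns() << " "
              << no_prune.CanRemoveFilterColumns() << "\n";   // prints: 1 0
    return 0;
}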
26 changes: 16 additions & 10 deletions src/quack_heap_seq_scan.cpp
@@ -56,27 +56,33 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn
 }
 
 void
-PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector<duckdb::column_t> &columns,
-                                           const duckdb::vector<duckdb::idx_t> &projections,
-                                           duckdb::TableFilterSet *filters) {
+PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &input) {
     m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel);
 
-    if (columns.size() == 1 && columns[0] == UINT64_MAX) {
-        m_parallel_scan_state.m_count_tuple_only = true;
+    /* SELECT COUNT(*) FROM */
+    if (input.column_ids.size() == 1 && input.column_ids[0] == UINT64_MAX) {
+        m_parallel_scan_state.m_count_tuples_only = true;
         return;
     }
 
     /* We need ordered columns ids for tuple fetch */
-    for (duckdb::idx_t i = 0; i < columns.size(); i++) {
-        m_parallel_scan_state.m_columns[columns[i]] = i;
+    for (duckdb::idx_t i = 0; i < input.column_ids.size(); i++) {
+        m_parallel_scan_state.m_columns[input.column_ids[i]] = i;
     }
 
-    for (duckdb::idx_t i = 0; i < columns.size(); i++) {
-        m_parallel_scan_state.m_projections[projections[i]] = columns[i];
+    if (input.CanRemoveFilterColumns()) {
+        for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) {
+            m_parallel_scan_state.m_projections[input.projection_ids[i]] = input.column_ids[i];
+        }
+    } else {
+        for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) {
+            m_parallel_scan_state.m_projections[i] = input.column_ids[i];
+        }
+    }
     }
 
 
     //m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
-    m_parallel_scan_state.m_filters = filters;
+    m_parallel_scan_state.m_filters = input.filters.get();
 }
 
 bool
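The reworked InitParallelScanState builds the projection map differently depending on whether filter-only columns can be dropped. A standalone rendering of that bookkeeping with hypothetical data, not the extension's code; the no-pruning branch is simplified here to an identity mapping over the scanned columns:

#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

static std::map<uint64_t, uint64_t>
BuildProjectionMap(const std::vector<uint64_t> &column_ids,
                   const std::vector<uint64_t> &projection_ids, bool can_remove_filter_columns) {
    std::map<uint64_t, uint64_t> projections;
    if (can_remove_filter_columns) {
        // Filter-only columns are dropped from the output, as in the first branch above.
        for (uint64_t i = 0; i < projection_ids.size(); i++) {
            projections[projection_ids[i]] = column_ids[i];
        }
    } else {
        // Without usable projection info every scanned column is written out.
        for (uint64_t i = 0; i < column_ids.size(); i++) {
            projections[i] = column_ids[i];
        }
    }
    return projections;
}

int main() {
    // SELECT b FROM t WHERE c > 5 over t(a, b, c): scan b and c, emit only b.
    for (const auto &entry : BuildProjectionMap({1, 2}, {0}, true)) {
        std::cout << "pruned:   slot " << entry.first << " <- column " << entry.second << "\n";
    }
    // SELECT b, c FROM t WHERE c > 5: both scanned columns are emitted.
    for (const auto &entry : BuildProjectionMap({1, 2}, {}, false)) {
        std::cout << "unpruned: slot " << entry.first << " <- column " << entry.second << "\n";
    }
    return 0;
}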
1 change: 0 additions & 1 deletion src/quack_select.cpp
@@ -78,7 +78,6 @@ quack_execute_select(QueryDesc *query_desc, ScanDirection direction, uint64_t co
     column_count = res->ColumnCount();
 
     while (true) {
-
         auto chunk = res->Fetch();
 
         if (!chunk || chunk->size() == 0) {
4 changes: 2 additions & 2 deletions src/quack_types.cpp
@@ -239,7 +239,7 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t
                     PostgresHeapSeqParallelScanState &parallelScanState) {
     HeapTupleReadState heapTupleReadState = {};
 
-    if (parallelScanState.m_count_tuple_only) {
+    if (parallelScanState.m_count_tuples_only) {
         threadScanInfo.m_output_vector_size++;
         return;
     }
@@ -253,7 +253,7 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t
         values[valueIdx] = HeapTupleFetchNextColumnDatum(threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple,
                                                          heapTupleReadState, columnIdx + 1, &nulls[valueIdx]);
         if (parallelScanState.m_filters &&
-            (parallelScanState.m_filters->filters.find(columnIdx) != parallelScanState.m_filters->filters.end())) {
+            (parallelScanState.m_filters->filters.find(valueIdx) != parallelScanState.m_filters->filters.end())) {
            auto &filter = parallelScanState.m_filters->filters[valueIdx];
            validTuple = ApplyValueFilter(*filter, values[valueIdx], nulls[valueIdx],
                                          threadScanInfo.m_tuple_desc->attrs[columnIdx].atttypid);
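The quack_types.cpp change is the "incorrect column id for query filtering" fix from the commit message: the TableFilterSet lookup now uses valueIdx, the value's position among the scanned columns, which is also the key used on the very next line to fetch the filter, instead of columnIdx, the PostgreSQL attribute index. A small sketch of why the old key silently misses the filter, using the same made-up query as above:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// For SELECT b FROM t WHERE c > 5 over t(a, b, c), the filter on c is stored under
// key 1 (its position among the scanned columns), not under 2 (its table column).
int main() {
    std::map<uint64_t, std::string> filters = {{1, "c > 5"}};

    uint64_t column_idx = 2;  // table column index of c (what the old code used)
    uint64_t value_idx = 1;   // position of c in the scanned-column list (the fix)

    std::cout << "lookup by table column:  "
              << (filters.count(column_idx) ? "found" : "missed") << "\n";  // missed
    std::cout << "lookup by scan position: "
              << (filters.count(value_idx) ? "found" : "missed") << "\n";   // found
    return 0;
}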
