Tuple filtering at the page level
* Filter tuples at the page level
mkaruza committed Apr 27, 2024
1 parent 24c6c40 commit 28e46d3
Showing 10 changed files with 179 additions and 24 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -5,6 +5,7 @@ EXTENSION = quack
DATA = quack.control $(wildcard quack--*.sql)

SRCS = src/quack_detoast.cpp \
src/quack_filter.cpp \
src/quack_heap_scan.cpp \
src/quack_heap_seq_scan.cpp \
src/quack_hooks.cpp \
@@ -35,7 +36,7 @@ else
QUACK_BUILD_DUCKDB = release
endif

override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 ${QUACK_BUILD_CXX_FLAGS}
override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -Wno-sign-compare ${QUACK_BUILD_CXX_FLAGS}

SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -L$(PG_LIB) -lduckdb -Lthird_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src -lstdc++

11 changes: 11 additions & 0 deletions include/quack/quack_filter.hpp
@@ -0,0 +1,11 @@
#pragma once

#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
}

namespace quack {
bool ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid);
} // namespace quack
16 changes: 14 additions & 2 deletions include/quack/quack_heap_seq_scan.hpp
@@ -6,6 +6,7 @@ extern "C" {
#include "postgres.h"
#include "access/tableam.h"
#include "access/heapam.h"
#include "storage/bufmgr.h"
}

#include <mutex>
@@ -33,18 +34,29 @@ class PostgresHeapSeqScanThreadInfo {
};

class PostgresHeapSeqParallelScanState {
private:
static int const k_max_prefetch_block_number = 32;

public:
PostgresHeapSeqParallelScanState()
: m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_total_row_count(0) {
: m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_total_row_count(0),
m_last_prefetch_block(0), m_strategy(nullptr) {
}
~PostgresHeapSeqParallelScanState() {
if (m_strategy)
pfree(m_strategy);
}
BlockNumber AssignNextBlockNumber();
void PrefetchNextRelationPages(Relation rel);
std::mutex m_lock;
BlockNumber m_nblocks;
BlockNumber m_last_assigned_block_number;
duckdb::map<duckdb::column_t, duckdb::idx_t> m_columns;
duckdb::vector<duckdb::idx_t> m_projections;
duckdb::map<duckdb::idx_t, duckdb::column_t> m_projections;
duckdb::TableFilterSet *m_filters = nullptr;
std::atomic<std::uint32_t> m_total_row_count;
BlockNumber m_last_prefetch_block;
BufferAccessStrategy m_strategy;
};

class PostgresHeapSeqScan {
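For orientation on the header change above: m_projections was a plain vector of projection indexes and is now a map from an output-chunk slot to the table column that feeds it. A small illustration follows; the concrete values are invented, only the member types mirror the header.

#include "duckdb.hpp"

// Illustrative only: m_columns orders the table columns that have to be
// fetched from the heap tuple (including filter-only columns), while
// m_projections maps an output-chunk slot back to the table column it emits.
void ExampleProjectionMaps() {
	duckdb::map<duckdb::column_t, duckdb::idx_t> columns;      // table column -> fetch slot
	duckdb::map<duckdb::idx_t, duckdb::column_t> projections;  // output slot  -> table column
	columns[2] = 0;       // table column 2 is fetched into slot 0...
	projections[0] = 2;   // ...and is emitted as output column 0 after pruning
}
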
1 change: 0 additions & 1 deletion include/quack/quack_select.h
@@ -2,7 +2,6 @@

extern "C" {
#include "postgres.h"

#include "executor/executor.h"
}

6 changes: 4 additions & 2 deletions src/quack_detoast.cpp
@@ -2,6 +2,7 @@

extern "C" {
#include "postgres.h"
#include "pg_config.h"
#include "varatt.h"

#ifdef USE_LZ4
@@ -48,7 +49,6 @@ _pglz_decompress_datum(const struct varlena *value) {
struct varlena *
_lz4_decompress_datum(const struct varlena *value) {
#ifndef USE_LZ4
NO_LZ4_SUPPORT();
return NULL; /* keep compiler quiet */
#else
int32 rawsize;
@@ -69,7 +69,9 @@ _lz4_decompress_datum(const struct varlena *value) {

static struct varlena *
_toast_decompress_datum(struct varlena *attr) {
switch (TOAST_COMPRESS_METHOD(attr)) {
ToastCompressionId cmid;
cmid = (ToastCompressionId)TOAST_COMPRESS_METHOD(attr);
switch (cmid) {
case TOAST_PGLZ_COMPRESSION_ID:
return _pglz_decompress_datum(attr);
case TOAST_LZ4_COMPRESSION_ID:
88 changes: 88 additions & 0 deletions src/quack_filter.cpp
@@ -0,0 +1,88 @@
#include "duckdb.hpp"
#include "duckdb/planner/filter/constant_filter.hpp"

extern "C" {
#include "postgres.h"
#include "catalog/pg_type.h"
}

namespace quack {

template <class T, class OP>
bool
TemplatedFilterOperation(Datum &value, const duckdb::Value &constant) {
return OP::Operation((T)value, constant.GetValueUnsafe<T>());
}

template <class OP>
static bool
FilterOperationSwitch(Datum &value, duckdb::Value &constant, Oid typeOid) {
switch (typeOid) {
case BOOLOID:
return TemplatedFilterOperation<bool, OP>(value, constant);
break;
case CHAROID:
return TemplatedFilterOperation<uint8_t, OP>(value, constant);
break;
case INT2OID:
return TemplatedFilterOperation<int16_t, OP>(value, constant);
break;
case INT4OID:
return TemplatedFilterOperation<int32_t, OP>(value, constant);
break;
case INT8OID:
return TemplatedFilterOperation<int64_t, OP>(value, constant);
break;
default:
elog(ERROR, "Unsupported quack type: %d", typeOid);
}
}

bool
ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid) {
switch (filter.filter_type) {
case duckdb::TableFilterType::CONJUNCTION_AND: {
auto &conjunction = filter.Cast<duckdb::ConjunctionAndFilter>();
bool valueFilterResult = true;
for (auto &child_filter : conjunction.child_filters) {
valueFilterResult &= ApplyValueFilter(*child_filter, value, isNull, typeOid);
}
return valueFilterResult;
break;
}
case duckdb::TableFilterType::CONSTANT_COMPARISON: {
auto &constant_filter = filter.Cast<duckdb::ConstantFilter>();
switch (constant_filter.comparison_type) {
case duckdb::ExpressionType::COMPARE_EQUAL:
return FilterOperationSwitch<duckdb::Equals>(value, constant_filter.constant, typeOid);
break;
case duckdb::ExpressionType::COMPARE_LESSTHAN:
return FilterOperationSwitch<duckdb::LessThan>(value, constant_filter.constant, typeOid);
break;
case duckdb::ExpressionType::COMPARE_LESSTHANOREQUALTO:
return FilterOperationSwitch<duckdb::LessThanEquals>(value, constant_filter.constant, typeOid);
break;
case duckdb::ExpressionType::COMPARE_GREATERTHAN:
return FilterOperationSwitch<duckdb::GreaterThan>(value, constant_filter.constant, typeOid);
break;
case duckdb::ExpressionType::COMPARE_GREATERTHANOREQUALTO:
return FilterOperationSwitch<duckdb::GreaterThanEquals>(value, constant_filter.constant, typeOid);
break;
default:
D_ASSERT(0);
}
break;
}
case duckdb::TableFilterType::IS_NOT_NULL:
return isNull == false;
break;
case duckdb::TableFilterType::IS_NULL:
return isNull == true;
break;
default:
D_ASSERT(0);
break;
}
}

} // namespace quack
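
A hedged usage sketch of the new helper follows; the caller, the "col >= 10" filter, and the Datum are mine, not the commit's, while ApplyValueFilter and the DuckDB filter classes come from the code above.

#include "duckdb.hpp"
#include "duckdb/planner/filter/constant_filter.hpp"

extern "C" {
#include "postgres.h"
#include "catalog/pg_type.h"
}

#include "quack/quack_filter.hpp"

// Hypothetical caller: decide whether an int4 Datum survives a pushed-down
// "col >= 10" predicate before it is ever detoasted or converted.
static bool
KeepsTuple(Datum value, bool isNull) {
	duckdb::ConstantFilter ge10(duckdb::ExpressionType::COMPARE_GREATERTHANOREQUALTO,
	                            duckdb::Value::INTEGER(10));
	return quack::ApplyValueFilter(ge10, value, isNull, INT4OID);
}

// KeepsTuple(Int32GetDatum(42), false) -> true   (42 >= 10)
// KeepsTuple(Int32GetDatum(3), false)  -> false  (3 < 10, the tuple is skipped)
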
4 changes: 2 additions & 2 deletions src/quack_heap_scan.cpp
@@ -60,8 +60,8 @@ PostgresHeapScanFunction::PostgresHeapScanFunction()
named_parameters["table"] = duckdb::LogicalType::POINTER;
named_parameters["snapshot"] = duckdb::LogicalType::POINTER;
projection_pushdown = true;
// filter_pushdown = true;
// filter_prune = true;
filter_pushdown = true;
filter_prune = true;
}

duckdb::unique_ptr<duckdb::FunctionData>
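For context on the two flags switched on above (my reading of DuckDB's TableFunction API, so treat the details as an assumption): with filter_pushdown and filter_prune enabled, DuckDB no longer filters the chunks the scan emits; it hands the scan a TableFilterSet and a pruned column list, which quack forwards into PostgresHeapSeqScan::InitParallelScanState(). A minimal sketch with an invented function name and stub body:

#include "duckdb.hpp"

// Stub scan body, only so the sketch is self-contained; the real work happens
// in PostgresHeapScanFunction above.
static void DummyScan(duckdb::ClientContext &, duckdb::TableFunctionInput &, duckdb::DataChunk &) {
}

static duckdb::TableFunction SketchScanFunction() {
	duckdb::TableFunction scan("sketch_scan", {}, DummyScan);
	scan.projection_pushdown = true;
	scan.filter_pushdown = true;  // filters are delivered to the scan instead of applied by DuckDB
	scan.filter_prune = true;     // filter-only columns can be pruned from the output
	return scan;
}
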
33 changes: 27 additions & 6 deletions src/quack_heap_seq_scan.cpp
@@ -40,6 +40,7 @@ PostgresHeapSeqScan::IsValid() const {
}

TupleDesc

PostgresHeapSeqScan::GetTupleDesc() {
return RelationGetDescr(m_rel);
}
@@ -60,10 +61,13 @@ PostgresHeapSeqScan::InitParallelScanState(const duckdb::vector<duckdb::column_t
duckdb::TableFilterSet *filters) {
m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel);
/* We need ordered columns ids for tuple fetch */
for(duckdb::idx_t i = 0; i < columns.size(); i++) {
for (duckdb::idx_t i = 0; i < columns.size(); i++) {
m_parallel_scan_state.m_columns[columns[i]] = i;
}
m_parallel_scan_state.m_projections = projections;
for (duckdb::idx_t i = 0; i < columns.size(); i++) {
m_parallel_scan_state.m_projections[projections[i]] = columns[i];
}
//m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
m_parallel_scan_state.m_filters = filters;
}

@@ -90,17 +94,16 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
m_parallel_scan_state.m_lock.lock();
block = threadScanInfo.m_block_number;
threadScanInfo.m_buffer =
ReadBufferExtended(m_rel, MAIN_FORKNUM, block, RBM_NORMAL, GetAccessStrategy(BAS_BULKREAD));
ReadBufferExtended(m_rel, MAIN_FORKNUM, block, RBM_NORMAL, m_parallel_scan_state.m_strategy);
LockBuffer(threadScanInfo.m_buffer, BUFFER_LOCK_SHARE);
//m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
m_parallel_scan_state.m_lock.unlock();

page = PreparePageRead(threadScanInfo);
threadScanInfo.m_read_next_page = false;
}

for (; threadScanInfo.m_page_tuples_left > 0 && threadScanInfo.m_output_vector_size < STANDARD_VECTOR_SIZE;
threadScanInfo.m_page_tuples_left--, threadScanInfo.m_current_tuple_index++,
threadScanInfo.m_output_vector_size++) {
threadScanInfo.m_page_tuples_left--, threadScanInfo.m_current_tuple_index++) {
bool visible = true;
ItemId lpp = PageGetItemId(page, threadScanInfo.m_current_tuple_index);

@@ -168,6 +171,24 @@
return block_number;
}

void
PostgresHeapSeqParallelScanState::PrefetchNextRelationPages(Relation rel) {

BlockNumber last_batch_prefetch_block_num = m_last_prefetch_block + k_max_prefetch_block_number > m_nblocks
? m_nblocks
: m_last_prefetch_block + k_max_prefetch_block_number;

if (m_last_assigned_block_number != InvalidBlockNumber &&
(m_last_prefetch_block - m_last_assigned_block_number) > 8)
return;


for (BlockNumber i = m_last_prefetch_block; i < last_batch_prefetch_block_num; i++) {
PrefetchBuffer(rel, MAIN_FORKNUM, m_last_prefetch_block);
m_last_prefetch_block++;
}
}

PostgresHeapSeqScanThreadInfo::PostgresHeapSeqScanThreadInfo()
: m_tuple_desc(NULL), m_inited(false), m_read_next_page(true), m_block_number(InvalidBlockNumber),
m_buffer(InvalidBuffer), m_current_tuple_index(InvalidOffsetNumber), m_page_tuples_left(0) {
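PrefetchNextRelationPages() above implements windowed read-ahead, but its call sites are still commented out in this commit. A standalone sketch of the same idea, with invented names, using PostgreSQL's PrefetchBuffer():

extern "C" {
#include "postgres.h"
#include "access/heapam.h"
#include "storage/bufmgr.h"
}

// Prefetch up to 32 blocks ahead, but only when the prefetch position is not
// already more than 8 blocks past the last block handed out to a scan thread.
static void
PrefetchWindow(Relation rel, BlockNumber nblocks, BlockNumber last_assigned, BlockNumber *next_prefetch) {
	const BlockNumber window = 32;
	BlockNumber end = (*next_prefetch + window > nblocks) ? nblocks : *next_prefetch + window;

	if (last_assigned != InvalidBlockNumber && *next_prefetch - last_assigned > 8)
		return;

	for (; *next_prefetch < end; (*next_prefetch)++)
		PrefetchBuffer(rel, MAIN_FORKNUM, *next_prefetch);
}
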
4 changes: 3 additions & 1 deletion src/quack_select.cpp
@@ -25,7 +25,9 @@ namespace quack {
static duckdb::unique_ptr<duckdb::DuckDB>
quack_open_database() {
duckdb::DBConfig config;
// config.allocator = duckdb::make_uniq<duckdb::Allocator>(QuackAllocate, QuackFree, QuackReallocate, nullptr);
//config.SetOption("memory_limit", "2GB");
//config.SetOption("threads", "8");
//config.allocator = duckdb::make_uniq<duckdb::Allocator>(QuackAllocate, QuackFree, QuackReallocate, nullptr);
return duckdb::make_uniq<duckdb::DuckDB>(nullptr, &config);
}

37 changes: 28 additions & 9 deletions src/quack_types.cpp
@@ -9,6 +9,7 @@ extern "C" {
}

#include "quack/quack.h"
#include "quack/quack_filter.hpp"
#include "quack/quack_heap_seq_scan.hpp"
#include "quack/quack_detoast.hpp"

@@ -119,7 +120,6 @@ AppendString(duckdb::Vector &result, Datum value, idx_t offset) {

void
ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {

switch (result.GetType().id()) {
case duckdb::LogicalTypeId::BOOLEAN:
Append<bool>(result, DatumGetBool(value), offset);
@@ -242,28 +242,47 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t
Datum *values = (Datum *)duckdb_malloc(sizeof(Datum) * parallelScanState.m_columns.size());
bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * parallelScanState.m_columns.size());

bool validTuple = true;

for (auto const &[columnIdx, valueIdx] : parallelScanState.m_columns) {
values[valueIdx] = HeapTupleFetchNextColumnDatum(threadScanInfo.m_tuple_desc, &threadScanInfo.m_tuple,
heapTupleReadState, columnIdx + 1, &nulls[valueIdx]);
auto &result = output.data[valueIdx];
if (nulls[valueIdx]) {
if (parallelScanState.m_filters &&
(parallelScanState.m_filters->filters.find(columnIdx) != parallelScanState.m_filters->filters.end())) {
auto &filter = parallelScanState.m_filters->filters[valueIdx];
validTuple = ApplyValueFilter(*filter, values[valueIdx], nulls[valueIdx],
threadScanInfo.m_tuple_desc->attrs[columnIdx].atttypid);
}

if (!validTuple) {
break;
}
}

for (idx_t idx = 0; validTuple && idx < parallelScanState.m_projections.size(); idx++) {
auto &result = output.data[idx];
if (nulls[idx]) {
auto &array_mask = duckdb::FlatVector::Validity(result);
array_mask.SetInvalid(threadScanInfo.m_output_vector_size);
} else {
if (threadScanInfo.m_tuple_desc->attrs[columnIdx].attlen == -1) {
if (threadScanInfo.m_tuple_desc->attrs[parallelScanState.m_projections[idx]].attlen == -1) {
bool shouldFree = false;
values[valueIdx] =
DetoastPostgresDatum(reinterpret_cast<varlena *>(values[valueIdx]), parallelScanState.m_lock, &shouldFree);
ConvertPostgresToDuckValue(values[valueIdx] , result, threadScanInfo.m_output_vector_size);
values[idx] = DetoastPostgresDatum(reinterpret_cast<varlena *>(values[idx]), parallelScanState.m_lock,
&shouldFree);
ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size);
if (shouldFree) {
duckdb_free(reinterpret_cast<void*>(values[valueIdx]));
duckdb_free(reinterpret_cast<void *>(values[idx]));
}
} else {
ConvertPostgresToDuckValue(values[valueIdx], result, threadScanInfo.m_output_vector_size);
ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size);
}
}
}

if (validTuple) {
threadScanInfo.m_output_vector_size++;
}

parallelScanState.m_total_row_count++;

duckdb_free(values);
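A paraphrase of the per-tuple flow in InsertTupleIntoChunk() after this change (summary comments only, not additional code from the commit):

// 1. Fetch each scanned column's Datum into values[]/nulls[], in column order.
// 2. If a pushed-down filter exists for that column, run ApplyValueFilter();
//    on the first failing column the loop breaks and the tuple is rejected at
//    the page level, before any detoasting or type conversion happens.
// 3. Only for surviving tuples, walk m_projections, detoast variable-length
//    attributes, convert the projected Datums into the DuckDB output vectors,
//    and bump m_output_vector_size so the row is counted in the chunk.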
