Skip to content

Commit

Permalink
Quack node
Browse files Browse the repository at this point in the history
* Hook into postgres planner rather than on execution. Idea is to split
  DuckDB execution into prepare/execute phase. If DuckDB prepare is not
  valid will we fall back to normal postgres execution.

* Another benefit of having planner hook is that custom RETURN TABLE
  FUNCTIONS can be used in PG syntax. This custom function should only
  be created to pass parsing phase. Wee can now use `SELECT * FROM
  read_parquet('...')` that will read parquet files through DuckDB.

* Created quack node which will be used for duckdb exection. This quack
  nodes is defined as `Custom Scan Node`. EXPLAIN will work out of box
  with this approach - we'll output just explain plan from DuckDB
  execution.

* Added `httpfs` extension to be build together with parquer extension.
  • Loading branch information
mkaruza committed May 6, 2024
1 parent e55721c commit 5ad3580
Show file tree
Hide file tree
Showing 17 changed files with 386 additions and 156 deletions.
13 changes: 7 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ SRCS = src/quack_detoast.cpp \
src/quack_heap_scan.cpp \
src/quack_heap_seq_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
src/quack_memory_allocator.cpp \
src/quack_node.cpp \
src/quack_planner.cpp \
src/quack_types.cpp \
src/quack.cpp

OBJS = $(subst .cpp,.o, $(SRCS))
Expand All @@ -32,8 +33,8 @@ ifeq ($(QUACK_BUILD), Debug)
QUACK_BUILD_CXX_FLAGS = -g -O0
QUACK_BUILD_DUCKDB = debug
else
QUACK_BUILD_CXX_FLAGS =
QUACK_BUILD_DUCKDB = release
QUACK_BUILD_CXX_FLAGS = -g -O0
QUACK_BUILD_DUCKDB = debug
endif

override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -Wno-sign-compare ${QUACK_BUILD_CXX_FLAGS}
Expand All @@ -54,7 +55,7 @@ ifeq ($(UNAME_S),Linux)
DUCKDB_LIB = libduckdb.so
endif

all: duckdb $(OBJS)
all: duckdb $(OBJS) .depend

include $(PGXS)

Expand All @@ -64,7 +65,7 @@ third_party/duckdb/Makefile:
git submodule update --init --recursive

third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB):
$(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF CMAKE_EXPORT_COMPILE_COMMANDS=1
$(MAKE) -C third_party/duckdb $(QUACK_BUILD_DUCKDB) DISABLE_SANITIZER=1 ENABLE_UBSAN=0 BUILD_UNITTESTS=OFF BUILD_HTTPFS=1 CMAKE_EXPORT_COMPILE_COMMANDS=1

install_duckdb:
$(install_bin) -m 755 third_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src/$(DUCKDB_LIB) $(DESTDIR)$(PG_LIB)
Expand Down
2 changes: 2 additions & 0 deletions include/quack/quack_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ extern "C" {
}

namespace quack {

bool ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid);

} // namespace quack
5 changes: 3 additions & 2 deletions include/quack/quack_heap_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,13 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction {

struct PostgresHeapReplacementScanData : public duckdb::ReplacementScanData {
public:
PostgresHeapReplacementScanData(QueryDesc *desc) : desc(desc) {
PostgresHeapReplacementScanData(Query *parse, const char *query) : m_parse(parse), m_query(query) {
}
~PostgresHeapReplacementScanData() override {};

public:
QueryDesc *desc;
Query *m_parse;
std::string m_query;
};

duckdb::unique_ptr<duckdb::TableRef> PostgresHeapReplacementScan(duckdb::ClientContext &context,
Expand Down
2 changes: 2 additions & 0 deletions include/quack/quack_heap_seq_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class PostgresHeapSeqScan {

public:
Relation GetRelation();
void CloseRelation();
TupleDesc GetTupleDesc();
bool ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo);
bool IsValid() const;
Expand All @@ -87,6 +88,7 @@ class PostgresHeapSeqScan {
Page PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanInfo);

private:
RangeTblEntry * m_tableEntry = nullptr;
Relation m_rel = nullptr;
Snapshot m_snapshot = nullptr;
PostgresHeapSeqParallelScanState m_parallel_scan_state;
Expand Down
9 changes: 9 additions & 0 deletions include/quack/quack_node.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

extern "C" {
#include "postgres.h"
#include "nodes/extensible.h"
}

extern CustomScanMethods quack_scan_scan_methods;
extern "C" void quack_init_node(void);
8 changes: 8 additions & 0 deletions include/quack/quack_planner.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once

extern "C" {
#include "postgres.h"
#include "optimizer/planner.h"
}

PlannedStmt *quack_plan_node(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams);
8 changes: 0 additions & 8 deletions include/quack/quack_select.h

This file was deleted.

3 changes: 3 additions & 0 deletions include/quack/quack_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ extern "C" {
#include "quack/quack_heap_seq_scan.hpp"

namespace quack {

duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type);
Oid GetPostgresDuckDBType(duckdb::LogicalTypeId type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo,
PostgresHeapSeqParallelScanState &parallelScanState);

} // namespace quack
8 changes: 8 additions & 0 deletions quack--0.0.1.sql
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
LOAD 'quack';

CREATE OR REPLACE FUNCTION read_parquet(path text)
RETURNS SETOF record LANGUAGE 'plpgsql' AS
$func$
BEGIN
RETURN QUERY EXECUTE 'SELECT 1';
END;
$func$;
3 changes: 2 additions & 1 deletion src/quack.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
extern "C" {
#include "postgres.h"

#include "utils/guc.h"
}

#include "quack/quack.h"
#include "quack/quack_node.hpp"

static void quack_init_guc(void);

Expand All @@ -17,6 +17,7 @@ void
_PG_init(void) {
quack_init_guc();
quack_init_hooks();
quack_init_node();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/quack_heap_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,14 @@ PostgresHeapReplacementScan(duckdb::ClientContext &context, const duckdb::string
auto &scan_data = reinterpret_cast<PostgresHeapReplacementScanData &>(*data);

/* Check name against query rtable list and verify that it is heap table */
auto table = FindMatchingHeapRelation(scan_data.desc->plannedstmt->rtable, table_name);
auto table = FindMatchingHeapRelation(scan_data.m_parse->rtable, table_name);

if (!table) {
return nullptr;
}

// Create POINTER values from the 'table' and 'snapshot' variables
auto children = CreateFunctionArguments(table, scan_data.desc->estate->es_snapshot);
auto children = CreateFunctionArguments(table, GetActiveSnapshot());
auto table_function = duckdb::make_uniq<duckdb::TableFunctionRef>();
table_function->function = duckdb::make_uniq<duckdb::FunctionExpression>("postgres_heap_scan", std::move(children));

Expand Down
28 changes: 20 additions & 8 deletions src/quack_heap_seq_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ extern "C" {
namespace quack {

PostgresHeapSeqScan::PostgresHeapSeqScan(RangeTblEntry *table)
: m_rel(RelationIdGetRelation(table->relid)), m_snapshot(nullptr) {
: m_tableEntry(table), m_rel(nullptr), m_snapshot(nullptr) {
}

PostgresHeapSeqScan::~PostgresHeapSeqScan() {
Expand All @@ -25,15 +25,28 @@ PostgresHeapSeqScan::~PostgresHeapSeqScan() {
}
}

PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other) : m_rel(other.m_rel) {
other.m_rel = nullptr;
PostgresHeapSeqScan::PostgresHeapSeqScan(PostgresHeapSeqScan &&other)
: m_tableEntry(other.m_tableEntry), m_rel(nullptr) {
other.CloseRelation();
other.m_tableEntry = nullptr;
}

Relation
PostgresHeapSeqScan::GetRelation() {
if (m_tableEntry && m_rel == nullptr) {
m_rel = RelationIdGetRelation(m_tableEntry->relid);
}
return m_rel;
}

void
PostgresHeapSeqScan::CloseRelation() {
if (IsValid()) {
RelationClose(m_rel);
}
m_rel = nullptr;
}

bool
PostgresHeapSeqScan::IsValid() const {
return RelationIsValid(m_rel);
Expand All @@ -56,7 +69,8 @@ PostgresHeapSeqScan::PreparePageRead(PostgresHeapSeqScanThreadInfo &threadScanIn
}

void
PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &input) {
PostgresHeapSeqScan::InitParallelScanState(duckdb::TableFunctionInitInput &input) {
(void) GetRelation();
m_parallel_scan_state.m_nblocks = RelationGetNumberOfBlocks(m_rel);

/* SELECT COUNT(*) FROM */
Expand All @@ -80,8 +94,7 @@ PostgresHeapSeqScan::InitParallelScanState( duckdb::TableFunctionInitInput &inpu
}
}


//m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
// m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
m_parallel_scan_state.m_filters = input.filters.get();
}

Expand Down Expand Up @@ -110,7 +123,7 @@ PostgresHeapSeqScan::ReadPageTuples(duckdb::DataChunk &output, PostgresHeapSeqSc
threadScanInfo.m_buffer =
ReadBufferExtended(m_rel, MAIN_FORKNUM, block, RBM_NORMAL, m_parallel_scan_state.m_strategy);
LockBuffer(threadScanInfo.m_buffer, BUFFER_LOCK_SHARE);
//m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
// m_parallel_scan_state.PrefetchNextRelationPages(m_rel);
m_parallel_scan_state.m_lock.unlock();
page = PreparePageRead(threadScanInfo);
threadScanInfo.m_read_next_page = false;
Expand Down Expand Up @@ -196,7 +209,6 @@ PostgresHeapSeqParallelScanState::PrefetchNextRelationPages(Relation rel) {
(m_last_prefetch_block - m_last_assigned_block_number) > 8)
return;


for (BlockNumber i = m_last_prefetch_block; i < last_batch_prefetch_block_num; i++) {
PrefetchBuffer(rel, MAIN_FORKNUM, m_last_prefetch_block);
m_last_prefetch_block++;
Expand Down
28 changes: 14 additions & 14 deletions src/quack_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ extern "C" {
}

#include "quack/quack.h"
#include "quack/quack_select.h"
#include "quack/quack_planner.hpp"

static ExecutorRun_hook_type PrevExecutorRunHook = NULL;
static planner_hook_type PrevPlannerHook = NULL;

static bool
is_quack_extension_registered() {
Expand All @@ -33,24 +33,24 @@ is_catalog_table(List *tables) {
return false;
}

static void
quack_executor_run(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once) {
if (is_quack_extension_registered() && !is_catalog_table(queryDesc->plannedstmt->rtable) &&
queryDesc->operation == CMD_SELECT) {
if (quack_execute_select(queryDesc, direction, count)) {
return;
static PlannedStmt *
quack_planner(Query *parse, const char *query_string, int cursorOptions, ParamListInfo boundParams) {
if (is_quack_extension_registered() && !is_catalog_table(parse->rtable) && parse->commandType == CMD_SELECT) {
PlannedStmt * quackPlan = quack_plan_node(parse, query_string, cursorOptions, boundParams);
if (quackPlan) {
return quackPlan;
}
}

elog(DEBUG3, "quack_executor_run: Failing back to PG execution");

if (PrevExecutorRunHook) {
PrevExecutorRunHook(queryDesc, direction, count, execute_once);
if (PrevPlannerHook) {
return PrevPlannerHook(parse, query_string, cursorOptions, boundParams);
} else {
return standard_planner(parse, query_string, cursorOptions, boundParams);
}
}

void
quack_init_hooks(void) {
PrevExecutorRunHook = ExecutorRun_hook ? ExecutorRun_hook : standard_ExecutorRun;
ExecutorRun_hook = quack_executor_run;
PrevPlannerHook = planner_hook;
planner_hook = quack_planner;
}
Loading

0 comments on commit 5ad3580

Please sign in to comment.