Skip to content

Commit

Permalink
start of the catalog extension - next step is that we need an Extensi…
Browse files Browse the repository at this point in the history
…on to be able to have a CatalogExtension
  • Loading branch information
Tishj committed Aug 1, 2024
1 parent 14c9ca8 commit 92d9de4
Show file tree
Hide file tree
Showing 10 changed files with 533 additions and 8 deletions.
10 changes: 7 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ SRCS = src/scan/heap_reader.cpp \
src/pgduckdb_options.cpp \
src/pgduckdb_planner.cpp \
src/pgduckdb_types.cpp \
src/pgduckdb.cpp
src/pgduckdb.cpp \
src/catalog/pgduckdb_storage.cpp \
src/catalog/pgduckdb_schema.cpp \
src/catalog/pgduckdb_table.cpp \
src/catalog/pgduckdb_catalog.cpp

OBJS = $(subst .cpp,.o, $(SRCS))

DUCKDB_BUILD_CXX_FLAGS=
DUCKDB_BUILD_TYPE=
DUCKDB_BUILD_CXX_FLAGS:=
DUCKDB_BUILD_TYPE:=

ifeq ($(DUCKDB_BUILD), Debug)
DUCKDB_BUILD_CXX_FLAGS = -g -O0
Expand Down
72 changes: 72 additions & 0 deletions include/pgduckdb/catalog/pgduckdb_catalog.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#pragma once

#include "duckdb/storage/storage_extension.hpp"
#include "duckdb/catalog/catalog.hpp"
#include "pgduckdb/catalog/pgduckdb_schema.hpp"

extern "C" {
#include "postgres.h"
#include "miscadmin.h"
#include "utils/snapshot.h"
}

namespace pgduckdb {

using duckdb::optional_ptr;
using duckdb::case_insensitive_map_t;
using duckdb::unique_ptr;
using duckdb::string;
using duckdb::CatalogEntry;
using duckdb::TableCatalogEntry;
using duckdb::SchemaCatalogEntry;
using duckdb::Catalog;
using duckdb::AttachedDatabase;
using duckdb::AccessMode;
using duckdb::AttachInfo;
using duckdb::StorageExtensionInfo;
using duckdb::ClientContext;
using duckdb::CreateSchemaInfo;
using duckdb::CatalogTransaction;
using duckdb::PhysicalOperator;
using duckdb::LogicalOperator;
using duckdb::CreateStatement;
using duckdb::LogicalInsert;
using duckdb::LogicalDelete;
using duckdb::LogicalUpdate;
using duckdb::LogicalCreateTable;
using duckdb::OnEntryNotFound;
using duckdb::QueryErrorContext;
using duckdb::Binder;
using duckdb::DropInfo;
using duckdb::DatabaseSize;

class PostgresCatalog : public Catalog {
public:
PostgresCatalog(AttachedDatabase &db, const string &connection_string, AccessMode access_mode);
public:
static unique_ptr<Catalog> Attach(StorageExtensionInfo *storage_info, ClientContext &context, AttachedDatabase &db, const string &name, AttachInfo &info, AccessMode access_mode);
public:
string path;
AccessMode access_mode;
public:
// -- Catalog API --
void Initialize(bool load_builtin) override;
string GetCatalogType() override;
optional_ptr<CatalogEntry> CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) override;
optional_ptr<SchemaCatalogEntry> GetSchema(CatalogTransaction transaction, const string &schema_name, OnEntryNotFound if_not_found, QueryErrorContext error_context = QueryErrorContext()) override;
void ScanSchemas(ClientContext &context, std::function<void(SchemaCatalogEntry &)> callback) override;
unique_ptr<PhysicalOperator> PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, unique_ptr<PhysicalOperator> plan) override;
unique_ptr<PhysicalOperator> PlanInsert(ClientContext &context, LogicalInsert &op, unique_ptr<PhysicalOperator> plan) override;
unique_ptr<PhysicalOperator> PlanDelete(ClientContext &context, LogicalDelete &op, unique_ptr<PhysicalOperator> plan) override;
unique_ptr<PhysicalOperator> PlanUpdate(ClientContext &context, LogicalUpdate &op, unique_ptr<PhysicalOperator> plan) override;
unique_ptr<LogicalOperator> BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, unique_ptr<LogicalOperator> plan) override;
DatabaseSize GetDatabaseSize(ClientContext &context) override;
bool InMemory() override;
string GetDBPath() override;
void DropSchema(ClientContext &context, DropInfo &info) override;
private:
case_insensitive_map_t<unique_ptr<PostgresSchema>> schemas;
Snapshot snapshot;
};

} // namespace pgduckdb
67 changes: 67 additions & 0 deletions include/pgduckdb/catalog/pgduckdb_schema.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
#include "pgduckdb/catalog/pgduckdb_table.hpp"

extern "C" {
#include "postgres.h"
#include "miscadmin.h"
#include "utils/snapshot.h"
}

namespace pgduckdb {

using duckdb::optional_ptr;
using duckdb::unique_ptr;
using duckdb::string;
using duckdb::case_insensitive_map_t;
using duckdb::CatalogType;
using duckdb::CatalogEntry;
using duckdb::TableCatalogEntry;
using duckdb::SchemaCatalogEntry;
using duckdb::Catalog;
using duckdb::AttachedDatabase;
using duckdb::AccessMode;
using duckdb::AlterInfo;
using duckdb::ClientContext;
using duckdb::CreateSchemaInfo;
using duckdb::CreateTableFunctionInfo;
using duckdb::CreateSequenceInfo;
using duckdb::CreateViewInfo;
using duckdb::CreateFunctionInfo;
using duckdb::CreateTypeInfo;
using duckdb::CreateIndexInfo;
using duckdb::CreateCollationInfo;
using duckdb::CreateCopyFunctionInfo;
using duckdb::CreatePragmaFunctionInfo;
using duckdb::BoundCreateTableInfo;
using duckdb::CatalogTransaction;
using duckdb::DropInfo;

class PostgresSchema : public SchemaCatalogEntry {
public:
PostgresSchema(Catalog &catalog, CreateSchemaInfo &info, Snapshot snapshot);
public:
// -- Schema API --
void Scan(ClientContext &context, CatalogType type, const std::function<void(CatalogEntry &)> &callback) override;
void Scan(CatalogType type, const std::function<void(CatalogEntry &)> &callback) override;
optional_ptr<CatalogEntry> CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, TableCatalogEntry &table) override;
optional_ptr<CatalogEntry> CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) override;
optional_ptr<CatalogEntry> CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) override;
optional_ptr<CatalogEntry> CreateView(CatalogTransaction transaction, CreateViewInfo &info) override;
optional_ptr<CatalogEntry> CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) override;
optional_ptr<CatalogEntry> CreateTableFunction(CatalogTransaction transaction, CreateTableFunctionInfo &info) override;
optional_ptr<CatalogEntry> CreateCopyFunction(CatalogTransaction transaction, CreateCopyFunctionInfo &info) override;
optional_ptr<CatalogEntry> CreatePragmaFunction(CatalogTransaction transaction, CreatePragmaFunctionInfo &info) override;
optional_ptr<CatalogEntry> CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) override;
optional_ptr<CatalogEntry> CreateType(CatalogTransaction transaction, CreateTypeInfo &info) override;
optional_ptr<CatalogEntry> GetEntry(CatalogTransaction transaction, CatalogType type, const string &name) override;
void DropEntry(ClientContext &context, DropInfo &info) override;
void Alter(CatalogTransaction transaction, AlterInfo &info) override;
private:
case_insensitive_map_t<unique_ptr<PostgresTable>> tables;
Snapshot snapshot;
Catalog &catalog;
};

} // namespace pgduckdb
24 changes: 24 additions & 0 deletions include/pgduckdb/catalog/pgduckdb_storage.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include "duckdb/storage/storage_extension.hpp"
extern "C" {
#include "postgres.h"
#include "miscadmin.h"
#include "utils/snapshot.h"
}

namespace pgduckdb {

class PostgresStorageExtensionInfo : public duckdb::StorageExtensionInfo {
public:
PostgresStorageExtensionInfo(Snapshot snapshot) : snapshot(snapshot) {}
public:
Snapshot snapshot;
};

class PostgresStorageExtension : public duckdb::StorageExtension {
public:
PostgresStorageExtension(Snapshot snapshot);
};

} // namespace pgduckdb
44 changes: 44 additions & 0 deletions include/pgduckdb/catalog/pgduckdb_table.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#pragma once

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/storage/table_storage_info.hpp"

extern "C" {
#include "postgres.h"
#include "utils/snapshot.h"
}

namespace pgduckdb {

using duckdb::optional_ptr;
using duckdb::unique_ptr;
using duckdb::string;
using duckdb::CatalogType;
using duckdb::CatalogEntry;
using duckdb::TableCatalogEntry;
using duckdb::SchemaCatalogEntry;
using duckdb::Catalog;
using duckdb::ClientContext;
using duckdb::TableStorageInfo;
using duckdb::FunctionData;
using duckdb::TableFunction;
using duckdb::column_t;
using duckdb::CreateTableInfo;
using duckdb::BaseStatistics;

class PostgresTable : public TableCatalogEntry {
public:
PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Oid oid, Snapshot snapshot);
public:
static bool PopulateColumns(CreateTableInfo &info, Oid relid, Snapshot snapshot);
public:
// -- Table API --
unique_ptr<BaseStatistics> GetStatistics(ClientContext &context, column_t column_id) override;
TableFunction GetScanFunction(ClientContext &context, unique_ptr<FunctionData> &bind_data) override;
TableStorageInfo GetStorageInfo(ClientContext &context) override;
private:
Oid oid;
Snapshot snapshot;
};

} // namespace pgduckdb
135 changes: 135 additions & 0 deletions src/catalog/pgduckdb_catalog.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#include "pgduckdb/catalog/pgduckdb_catalog.hpp"
#include "duckdb/parser/parsed_data/attach_info.hpp"
#include "duckdb/parser/parsed_data/create_schema_info.hpp"

extern "C" {
#include "postgres.h"
#include "fmgr.h"
#include "catalog/pg_namespace.h"
#include "utils/syscache.h"
#include "utils/builtins.h"
#include "utils/rel.h"
#include "utils/snapshot.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "catalog/indexing.h"
#include "access/genam.h"
#include "access/xact.h"
}

namespace pgduckdb {

PostgresCatalog::PostgresCatalog(duckdb::AttachedDatabase &db, const duckdb::string &connection_string, duckdb::AccessMode access_mode)
: duckdb::Catalog(db), path(connection_string), access_mode(access_mode) {
}

duckdb::unique_ptr<duckdb::Catalog> PostgresCatalog::Attach(duckdb::StorageExtensionInfo *storage_info, duckdb::ClientContext &context, duckdb::AttachedDatabase &db, const duckdb::string &name, duckdb::AttachInfo &info, duckdb::AccessMode access_mode) {
duckdb::string connection_string = info.path;

if (!storage_info) {
throw duckdb::InternalException("PostgresCatalog should always have access to the PostgresStorageExtensionInfo");
}
return duckdb::make_uniq<PostgresCatalog>(db, connection_string, access_mode);
}

// ------------------ Catalog API ---------------------

void PostgresCatalog::Initialize(bool load_builtin) {
return;
}

string PostgresCatalog::GetCatalogType() {
return "pgduckdb";
}

optional_ptr<CatalogEntry> PostgresCatalog::CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) {
throw duckdb::NotImplementedException("CreateSchema not supported yet");
}

static Oid LookupSchema(const string &schema_name, Snapshot snapshot) {
auto rel = table_open(NamespaceRelationId, AccessShareLock);

ScanKeyData key;
ScanKeyInit(&key,
Anum_pg_namespace_nspname,
BTEqualStrategyNumber,
NAMEOID,
CStringGetDatum(schema_name.c_str()));

auto scan = systable_beginscan(rel, NamespaceNameIndexId, true, snapshot, 1, &key);

auto tuple = systable_getnext(scan);
Oid nspoid = InvalidOid;
if (HeapTupleIsValid(tuple)) {
nspoid = ((Form_pg_namespace) GETSTRUCT(tuple))->oid;
}

systable_endscan(scan);
table_close(rel, AccessShareLock);

return nspoid;
}

optional_ptr<SchemaCatalogEntry> PostgresCatalog::GetSchema(CatalogTransaction transaction, const string &schema_name, OnEntryNotFound if_not_found, QueryErrorContext error_context) {
if (schema_name == DEFAULT_SCHEMA) {
return GetSchema(transaction, "public", if_not_found, error_context);
}

auto it = schemas.find(schema_name);
if (it != schemas.end()) {
return it->second.get();
}

auto oid = LookupSchema(schema_name, snapshot);
if (!OidIsValid(oid)) {
// Schema could not be found
return nullptr;
}

CreateSchemaInfo create_schema;
create_schema.schema = schema_name;
schemas[schema_name] = duckdb::make_uniq<PostgresSchema>(*this, create_schema, snapshot);
return schemas[schema_name].get();
}

void PostgresCatalog::ScanSchemas(ClientContext &context, std::function<void(SchemaCatalogEntry &)> callback) {
throw duckdb::NotImplementedException("ScanSchemas not supported yet");
}

unique_ptr<PhysicalOperator> PostgresCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, unique_ptr<PhysicalOperator> plan) {
throw duckdb::NotImplementedException("PlanCreateTableAs not supported yet");
}

unique_ptr<PhysicalOperator> PostgresCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, unique_ptr<PhysicalOperator> plan) {
throw duckdb::NotImplementedException("PlanInsert not supported yet");
}

unique_ptr<PhysicalOperator> PostgresCatalog::PlanDelete(ClientContext &context, LogicalDelete &op, unique_ptr<PhysicalOperator> plan) {
throw duckdb::NotImplementedException("PlanDelete not supported yet");
}

unique_ptr<PhysicalOperator> PostgresCatalog::PlanUpdate(ClientContext &context, LogicalUpdate &op, unique_ptr<PhysicalOperator> plan) {
throw duckdb::NotImplementedException("PlanUpdate not supported yet");
}

unique_ptr<LogicalOperator> PostgresCatalog::BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, unique_ptr<LogicalOperator> plan) {
throw duckdb::NotImplementedException("BindCreateIndex not supported yet");
}

DatabaseSize PostgresCatalog::GetDatabaseSize(ClientContext &context) {
throw duckdb::NotImplementedException("GetDatabaseSize not supported yet");
}

bool PostgresCatalog::InMemory() {
return false;
}

string PostgresCatalog::GetDBPath() {
return path;
}

void PostgresCatalog::DropSchema(ClientContext &context, DropInfo &info) {
throw duckdb::NotImplementedException("DropSchema not supported yet");
}

} // namespace pgduckdb
Loading

0 comments on commit 92d9de4

Please sign in to comment.