Skip to content

Commit

Permalink
Use cloud secret from quack.secrets table
Browse files Browse the repository at this point in the history
* Multiple cloud secrets can be provided. Secrets are added to the
  `quack.secrets` table, and every secret in that table is registered
  with the DuckDB connection used for execution.
  • Loading branch information
mkaruza committed Jun 20, 2024
1 parent 726db29 commit 93e81ee
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

MODULE_big = quack
EXTENSION = quack
DATA = quack.control $(wildcard quack--*.sql)
DATA = quack.control $(wildcard sql/quack--*.sql)

SRCS = src/utility/copy.cpp \
src/quack_detoast.cpp \
Expand Down
1 change: 0 additions & 1 deletion include/quack/quack.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// quack.c
extern bool quack_execution;
extern int quack_max_threads_per_query;
extern char *quack_secret;
extern "C" void _PG_init(void);

// quack_hooks.c
Expand Down
29 changes: 29 additions & 0 deletions quack--0.0.1.sql → sql/quack--0.0.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,32 @@ BEGIN
RAISE EXCEPTION 'Function `read_csv(TEXT[])` only works with Duckdb execution.';
END;
$func$;

-- Schema that holds the extension's own objects (the `secrets` table below).
CREATE SCHEMA quack;
-- Make `quack` the default creation schema so the unqualified objects
-- defined below are created inside it.
SET search_path TO quack;

-- Cloud storage credentials, one row per secret.
-- Only the S3, GCS and R2 provider types are accepted (see type_constraint).
CREATE TABLE secrets (
    type          TEXT NOT NULL,  -- provider kind: 'S3', 'GCS' or 'R2'
    id            TEXT NOT NULL,  -- access key id
    secret        TEXT NOT NULL,  -- secret access key
    region        TEXT,           -- optional region
    endpoint      TEXT,           -- optional endpoint
    r2_account_id TEXT,           -- account id; nullable here, R2 rows are checked by a trigger
    CONSTRAINT type_constraint CHECK (type IN ('S3', 'GCS', 'R2'))
);

-- Trigger function: rejects a row whose type is 'R2' but whose
-- r2_account_id is NULL; every other row is passed through unchanged.
CREATE OR REPLACE FUNCTION quack_secret_r2_check()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    IF NEW.r2_account_id IS NULL AND NEW.type = 'R2' THEN
        RAISE EXCEPTION '`R2` cloud type secret requires valid `r2_account_id` column value';
    END IF;
    RETURN NEW;
END;
$$;

-- Validate every inserted or updated row of `secrets` with
-- quack_secret_r2_check() before it is stored.
CREATE TRIGGER quack_secret_r2_tr
    BEFORE INSERT OR UPDATE ON secrets
    FOR EACH ROW
    EXECUTE PROCEDURE quack_secret_r2_check();

-- Drop the `SET search_path TO quack` override used while creating the objects above.
RESET search_path;
34 changes: 0 additions & 34 deletions src/quack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ static void quack_init_guc(void);

bool quack_execution = true;
int quack_max_threads_per_query = 1;
char *quack_secret = nullptr;

extern "C" {
PG_MODULE_MAGIC;
Expand All @@ -25,28 +24,6 @@ _PG_init(void) {

}

/*
 * GUC check hook for `quack.cloud_secret`.
 *
 * The value is split on '#'. A value that yields no tokens is accepted.
 * Otherwise exactly four tokens are required and the first one must be
 * "S3"; any other value is rejected with a WARNING.
 *
 * Returns true when the new value is acceptable, false otherwise.
 */
static bool
quack_cloud_secret_check_hooks(char **newval, void **extra, GucSource source) {
    const std::vector<std::string> parts = quack::tokenizeString(*newval, '#');

    /* Nothing to validate. */
    if (parts.empty()) {
        return true;
    }

    const bool hasAllFields = (parts.size() == 4);
    if (!hasAllFields) {
        elog(WARNING, "Incorrect quack.cloud_secret format.");
        return false;
    }

    const bool isS3 = (parts[0] == "S3");
    if (!isS3) {
        elog(WARNING, "quack.cloud_secret supports only S3.");
        return false;
    }

    return true;
}

/* clang-format off */
static void
quack_init_guc(void) {
Expand All @@ -73,15 +50,4 @@ quack_init_guc(void) {
NULL,
NULL,
NULL);

DefineCustomStringVariable("quack.cloud_secret",
"Quack (duckdb) cloud secret GUC. Format is TYPE#ID#SECRET#REGION",
NULL,
&quack_secret,
"",
PGC_USERSET,
0,
&quack_cloud_secret_check_hooks,
NULL,
NULL);
}
107 changes: 101 additions & 6 deletions src/quack_duckdb_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,94 @@
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
#include "duckdb/main/extension_util.hpp"

extern "C" {
#include "postgres.h"
#include "access/genam.h"
#include "catalog/namespace.h"
#include "utils/lsyscache.h"
}

#include "quack/quack_duckdb_connection.hpp"
#include "quack/quack_heap_scan.hpp"
#include "quack/quack_utils.hpp"

#include <string>

namespace quack {

/*
 * Column layout of the quack.secrets table.
 *
 * Natts_quack_secret is the number of columns; each Anum_* value is the
 * 1-based position of the column of the same name. Subtract 1 to index
 * the arrays filled by heap_deform_tuple().
 */
#define Natts_quack_secret              6
#define Anum_quack_secret_type          1
#define Anum_quack_secret_id            2
#define Anum_quack_secret_secret        3
#define Anum_quack_secret_region        4
#define Anum_quack_secret_endpoint      5
#define Anum_quack_secret_r2_account_id 6

/*
 * Plain value holder for one cloud secret, with one string field per
 * column of the quack.secrets table. Every field starts out empty.
 */
struct QuackSecret {
    std::string type;          /* provider kind */
    std::string id;            /* access key id */
    std::string secret;        /* secret access key */
    std::string region;        /* region, may be empty */
    std::string endpoint;      /* endpoint, may be empty */
    std::string r2_account_id; /* account id, may be empty */
};

/*
 * Look up the OID of the `quack` schema.
 * The lookup is done with missing_ok = false.
 */
static Oid
quack_get_namespace(void) {
    const bool missingOk = false;
    return get_namespace_oid("quack", missingOk);
}

/*
 * Look up the OID of the `secrets` relation inside the `quack` schema.
 */
static Oid
quack_secret_relation_id(void) {
    const Oid quackNamespaceId = quack_get_namespace();
    return get_relname_relid("secrets", quackNamespaceId);
}

/*
 * Convert a non-NULL `text` datum into a std::string.
 *
 * The payload of a text value is not NUL-terminated, so the string is
 * built from an explicit pointer + length pair. Treating the payload as
 * a C string would read past the end of the value.
 *
 * columnDatum: datum of a text column; the caller must have checked
 *              that it is not NULL.
 * Returns a copy of the column's bytes.
 */
static std::string
read_quack_secret_column(Datum columnDatum) {
    text *columnText = DatumGetTextPP(columnDatum);
    return std::string(VARDATA_ANY(columnText), VARSIZE_ANY_EXHDR(columnText));
}

/*
 * Read every row of the quack.secrets table into a vector of QuackSecret.
 *
 * The table is opened with AccessShareLock and scanned sequentially
 * (no index) under the active snapshot. The `type`, `id` and `secret`
 * columns are read unconditionally; the remaining columns are copied
 * only when they are not NULL, so the matching fields stay empty
 * otherwise.
 *
 * Returns one QuackSecret per row, in scan order.
 */
std::vector<QuackSecret>
read_quack_secrets() {
    HeapTuple tuple = NULL;
    Oid quackSecretRelationId = quack_secret_relation_id();
    Relation quackSecretRelation = table_open(quackSecretRelationId, AccessShareLock);
    SysScanDescData *scan = systable_beginscan(quackSecretRelation, InvalidOid, false, GetActiveSnapshot(), 0, NULL);
    std::vector<QuackSecret> quack_secrets;

    while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
        Datum datumArray[Natts_quack_secret];
        bool isNullArray[Natts_quack_secret];

        heap_deform_tuple(tuple, RelationGetDescr(quackSecretRelation), datumArray, isNullArray);
        QuackSecret secret;

        secret.type = read_quack_secret_column(datumArray[Anum_quack_secret_type - 1]);
        secret.id = read_quack_secret_column(datumArray[Anum_quack_secret_id - 1]);
        secret.secret = read_quack_secret_column(datumArray[Anum_quack_secret_secret - 1]);

        if (!isNullArray[Anum_quack_secret_region - 1])
            secret.region = read_quack_secret_column(datumArray[Anum_quack_secret_region - 1]);

        if (!isNullArray[Anum_quack_secret_endpoint - 1])
            secret.endpoint = read_quack_secret_column(datumArray[Anum_quack_secret_endpoint - 1]);

        if (!isNullArray[Anum_quack_secret_r2_account_id - 1]) {
            /*
             * Store the account id in its own field. It used to be written
             * into `endpoint`, which left `r2_account_id` permanently empty
             * and overwrote the real endpoint of non-R2 rows.
             */
            secret.r2_account_id = read_quack_secret_column(datumArray[Anum_quack_secret_r2_account_id - 1]);

            /*
             * Compatibility: the code that builds CREATE SECRET for R2 rows
             * reads ACCOUNT_ID from `endpoint`, so keep mirroring the value
             * there for R2 rows only.
             * TODO: drop this once that code reads `r2_account_id`.
             */
            if (secret.type.rfind("R2", 0) == 0)
                secret.endpoint = secret.r2_account_id;
        }

        quack_secrets.push_back(secret);
    }

    systable_endscan(scan);
    /* NoLock: the AccessShareLock taken by table_open() is not released here. */
    table_close(quackSecretRelation, NoLock);
    return quack_secrets;
}

static duckdb::unique_ptr<duckdb::DuckDB>
quack_open_database() {
duckdb::DBConfig config;
Expand All @@ -27,7 +109,6 @@ quack_create_duckdb_connection(List *tables, List *neededColumns, const char *qu
duckdb::make_uniq_base<duckdb::ReplacementScanData, quack::PostgresHeapReplacementScanData>(
tables, neededColumns, query));


auto connection = duckdb::make_uniq<duckdb::Connection>(*db);

// Add the postgres_scan inserted by the replacement scan
Expand All @@ -42,14 +123,28 @@ quack_create_duckdb_connection(List *tables, List *neededColumns, const char *qu
catalog.CreateTableFunction(context, &heap_scan_info);
context.transaction.Commit();

if (strlen(quack_secret) != 0) {
std::vector<std::string> quackSecret = quack::tokenizeString(quack_secret, '#');
auto quackSecrets = read_quack_secrets();

int secretId = 0;
for (auto &secret : quackSecrets) {
StringInfo s3SecretKey = makeStringInfo();
appendStringInfoString(s3SecretKey, "CREATE SECRET s3Secret ");
appendStringInfo(s3SecretKey, "(TYPE S3, KEY_ID '%s', SECRET '%s', REGION '%s');", quackSecret[1].c_str(),
quackSecret[2].c_str(), quackSecret[3].c_str());
bool isR2CloudSecret = (secret.type.rfind("R2", 0) == 0);
appendStringInfo(s3SecretKey, "CREATE SECRET quackSecret_%d ", secretId);
appendStringInfo(s3SecretKey, "(TYPE %s, KEY_ID '%s', SECRET '%s'", secret.type.c_str(),
secret.id.c_str(), secret.secret.c_str());
if (secret.region.length() && !isR2CloudSecret) {
appendStringInfo(s3SecretKey, ", REGION '%s'", secret.region.c_str());
}
if (secret.endpoint.length() && !isR2CloudSecret) {
appendStringInfo(s3SecretKey, ", ENDPOINT '%s'", secret.endpoint.c_str());
}
if (isR2CloudSecret) {
appendStringInfo(s3SecretKey, ", ACCOUNT_ID '%s'", secret.endpoint.c_str());
}
appendStringInfo(s3SecretKey, ");");
context.Query(s3SecretKey->data, false);
pfree(s3SecretKey->data);
secretId++;
}

return connection;
Expand Down
8 changes: 6 additions & 2 deletions src/utility/copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ extern "C" {
#include "quack/quack_duckdb_connection.hpp"

/* Filename prefixes checked by quack_copy(), one per supported cloud storage scheme. */
static constexpr char quackCopyS3FilenamePrefix[] = "s3://";
static constexpr char quackCopyGCSFilenamePrefix[] = "gs://";
static constexpr char quackCopyR2FilenamePrefix[] = "r2://";

static bool
create_relation_copy_parse_state(ParseState *pstate, const CopyStmt *stmt, List **vars, int stmt_location,
Expand Down Expand Up @@ -80,8 +82,10 @@ bool
quack_copy(PlannedStmt *pstmt, const char *queryString, struct QueryEnvironment *queryEnv, uint64 *processed) {
CopyStmt *copyStmt = (CopyStmt *)pstmt->utilityStmt;

/* Copy `filename` should start with S3 prefix */
if (duckdb::string(copyStmt->filename).rfind(quackCopyS3FilenamePrefix, 0)) {
/* Copy `filename` should start with S3/GS/R2 prefix */
if (duckdb::string(copyStmt->filename).rfind(quackCopyS3FilenamePrefix, 0) &&
duckdb::string(copyStmt->filename).rfind(quackCopyGCSFilenamePrefix, 0) &&
duckdb::string(copyStmt->filename).rfind(quackCopyR2FilenamePrefix, 0)) {
return false;
}

Expand Down

0 comments on commit 93e81ee

Please sign in to comment.