Skip to content

Commit

Permalink
synchronize around prepared->Execute and around calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Tishj committed Jun 18, 2024
1 parent f75eb98 commit 65e4345
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 45 deletions.
33 changes: 33 additions & 0 deletions include/quack/quack_error.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include <mutex>

namespace quack {

struct Logger {
public:
static std::mutex &GetLock() {
static std::mutex lock;
return lock;
}
};

} // namespace quack

extern "C" {
#include "postgres.h"
};

#define elog_quack(elevel, ...) \
quack::Logger::GetLock().lock(); \
PG_TRY(); \
{ \
ereport(elevel, errmsg_internal(__VA_ARGS__)); \
} \
PG_CATCH(); \
{ \
quack::Logger::GetLock().unlock(); \
PG_RE_THROW(); \
} \
quack::Logger::GetLock().unlock(); \
PG_END_TRY(); \
5 changes: 3 additions & 2 deletions src/quack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ extern "C" {
#include "quack/quack.h"
#include "quack/quack_node.hpp"
#include "quack/quack_utils.hpp"
#include "quack/quack_error.hpp"

static void quack_init_guc(void);

Expand Down Expand Up @@ -35,12 +36,12 @@ quack_cloud_secret_check_hooks(char **newval, void **extra, GucSource source) {
}

if (tokens.size() != 4) {
elog(WARNING, "Incorrect quack.cloud_secret format.");
elog_quack(WARNING, "Incorrect quack.cloud_secret format.");
return false;
}

if (tokens[0].compare("S3")) {
elog(WARNING, "quack.cloud_secret supports only S3.");
elog_quack(WARNING, "quack.cloud_secret supports only S3.");
return false;
}

Expand Down
5 changes: 3 additions & 2 deletions src/quack_detoast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ extern "C" {
}

#include "quack/quack_types.hpp"
#include "quack/quack_error.hpp"
#include "quack/quack_detoast.hpp"

/*
Expand Down Expand Up @@ -77,7 +78,7 @@ _toast_decompress_datum(struct varlena *attr) {
case TOAST_LZ4_COMPRESSION_ID:
return _lz4_decompress_datum(attr);
default:
elog(ERROR, "invalid compression method id %d", TOAST_COMPRESS_METHOD(attr));
elog_quack(ERROR, "invalid compression method id %d", TOAST_COMPRESS_METHOD(attr));
return NULL; /* keep compiler quiet */
}
}
Expand All @@ -90,7 +91,7 @@ _toast_fetch_datum(struct varlena *attr, std::mutex &lock) {
int32 attrsize;

if (!VARATT_IS_EXTERNAL_ONDISK(attr))
elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");
elog_quack(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");

/* Must copy to access aligned fields */
VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
Expand Down
3 changes: 2 additions & 1 deletion src/quack_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ extern "C" {

#include "quack/quack_filter.hpp"
#include "quack/quack_types.hpp"
#include "quack/quack_error.hpp"

namespace quack {

Expand Down Expand Up @@ -53,7 +54,7 @@ FilterOperationSwitch(Datum &value, duckdb::Value &constant, Oid typeOid) {
break;
}
default:
elog(ERROR, "(DuckDB/FilterOperationSwitch) Unsupported quack type: %d", typeOid);
elog_quack(ERROR, "(DuckDB/FilterOperationSwitch) Unsupported quack type: %d", typeOid);
}
}

Expand Down
21 changes: 15 additions & 6 deletions src/quack_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ extern "C" {

#include "quack/quack_node.hpp"
#include "quack/quack_types.hpp"
#include "quack/quack_error.hpp"

#include <mutex>

/* global variables */
CustomScanMethods quack_scan_scan_methods;
Expand All @@ -19,6 +22,7 @@ typedef struct QuackScanState {
CustomScanState css; /* must be first field */
duckdb::Connection *duckdbConnection;
duckdb::PreparedStatement *preparedStatement;
std::mutex execution_lock;
bool is_executed;
bool fetch_next;
duckdb::unique_ptr<duckdb::QueryResult> queryResult;
Expand All @@ -41,9 +45,11 @@ Quack_CreateCustomScanState(CustomScan *cscan) {
CustomScanState *customScanState = &quackScanState->css;
quackScanState->duckdbConnection = (duckdb::Connection *)linitial(cscan->custom_private);
quackScanState->preparedStatement = (duckdb::PreparedStatement *)lsecond(cscan->custom_private);
customScanState->methods = &quack_scan_exec_methods;
quackScanState->is_executed = false;
quackScanState->fetch_next = true;
customScanState->methods = &quack_scan_exec_methods;
std::allocator<std::mutex> allocator;
allocator.construct(&quackScanState->execution_lock);
return (Node *)customScanState;
}

Expand All @@ -60,14 +66,17 @@ Quack_ExecCustomScan(CustomScanState *node) {
TupleTableSlot *slot = quackScanState->css.ss.ss_ScanTupleSlot;
MemoryContext oldContext;

if (!quackScanState->is_executed) {
quackScanState->queryResult = quackScanState->preparedStatement->Execute();
quackScanState->columnCount = quackScanState->queryResult->ColumnCount();
quackScanState->is_executed = true;
{
std::lock_guard<std::mutex> l(quackScanState->execution_lock);
if (!quackScanState->is_executed) {
quackScanState->queryResult = quackScanState->preparedStatement->Execute();
quackScanState->columnCount = quackScanState->queryResult->ColumnCount();
quackScanState->is_executed = true;
}
}

if (quackScanState->queryResult->HasError()) {
elog(ERROR, "Quack execute returned an error: %s", quackScanState->queryResult->GetError().c_str());
elog_quack(ERROR, "Quack execute returned an error: %s", quackScanState->queryResult->GetError().c_str());
}

if (quackScanState->fetch_next) {
Expand Down
7 changes: 4 additions & 3 deletions src/quack_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extern "C" {
#include "quack/quack_node.hpp"
#include "quack/quack_planner.hpp"
#include "quack/quack_types.hpp"
#include "quack/quack_error.hpp"
#include "quack/quack_utils.hpp"

static PlannerInfo *
Expand Down Expand Up @@ -58,7 +59,7 @@ quack_create_plan(Query *query, const char *queryString, ParamListInfo boundPara
auto preparedQuery = context->Prepare(queryString);

if (preparedQuery->HasError()) {
elog(WARNING, "(Quack) %s", preparedQuery->GetError().c_str());
elog_quack(WARNING, "(Quack) %s", preparedQuery->GetError().c_str());
return nullptr;
}

Expand All @@ -71,14 +72,14 @@ quack_create_plan(Query *query, const char *queryString, ParamListInfo boundPara
Oid postgresColumnOid = quack::GetPostgresDuckDBType(column);

if (!OidIsValid(postgresColumnOid)) {
elog(ERROR, "Could not convert DuckDB to Postgres type, likely because the postgres->duckdb conversion was not supported");
elog_quack(ERROR, "Could not convert DuckDB to Postgres type, likely because the postgres->duckdb conversion was not supported");
}
HeapTuple tp;
Form_pg_type typtup;

tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(postgresColumnOid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for type %u", postgresColumnOid);
elog_quack(ERROR, "cache lookup failed for type %u", postgresColumnOid);

typtup = (Form_pg_type)GETSTRUCT(tp);

Expand Down
33 changes: 17 additions & 16 deletions src/quack_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "duckdb/common/shared_ptr.hpp"
#include "duckdb/common/extra_type_info.hpp"
#include "duckdb/common/types/uuid.hpp"
#include "quack/quack_error.hpp"

extern "C" {
#include "postgres.h"
Expand Down Expand Up @@ -240,7 +241,7 @@ struct PostgresArrayAppendState {
expected_values *= to_append;
}
if (dimensions[dimension] != to_append) {
elog(ERROR, "Expected %d values in list at dimension %ld, found %ld instead", dimensions[dimension],
elog_quack(ERROR, "Expected %d values in list at dimension %ld, found %ld instead", dimensions[dimension],
dimension, to_append);
}

Expand All @@ -252,7 +253,7 @@ struct PostgresArrayAppendState {
if (child_val.IsNull()) {
// Postgres arrays can not contains nulls at the array level
// i.e {{1,2}, NULL, {3,4}} is not supported
elog(ERROR, "Returned LIST contains a NULL at an intermediate dimension (not the value level), "
elog_quack(ERROR, "Returned LIST contains a NULL at an intermediate dimension (not the value level), "
"which is not supported in Postgres");
}
AppendValueAtDimension(child_val, dimension + 1);
Expand Down Expand Up @@ -389,27 +390,27 @@ ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col
auto scale = duckdb::DecimalType::GetScale(value.type());
switch (physical_type) {
case duckdb::PhysicalType::INT16: {
elog(INFO, "SMALLINT");
elog_quack(INFO, "SMALLINT");
numeric_var = ConvertNumeric<int16_t>(value.GetValueUnsafe<int16_t>(), scale);
break;
}
case duckdb::PhysicalType::INT32: {
elog(INFO, "INTEGER");
elog_quack(INFO, "INTEGER");
numeric_var = ConvertNumeric<int32_t>(value.GetValueUnsafe<int32_t>(), scale);
break;
}
case duckdb::PhysicalType::INT64: {
elog(INFO, "BIGINT");
elog_quack(INFO, "BIGINT");
numeric_var = ConvertNumeric<int64_t>(value.GetValueUnsafe<int64_t>(), scale);
break;
}
case duckdb::PhysicalType::INT128: {
elog(INFO, "HUGEINT");
elog_quack(INFO, "HUGEINT");
numeric_var = ConvertNumeric<hugeint_t, DecimalConversionHugeint>(value.GetValueUnsafe<hugeint_t>(), scale);
break;
}
default: {
elog(ERROR, "Unrecognized physical type for DECIMAL value");
elog_quack(ERROR, "Unrecognized physical type for DECIMAL value");
break;
}
}
Expand Down Expand Up @@ -449,7 +450,7 @@ ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col
break;
}
default:
elog(ERROR, "(DuckDB/ConvertDuckToPostgresValue) Unsuported quack type: %d", oid);
elog_quack(ERROR, "(DuckDB/ConvertDuckToPostgresValue) Unsuported quack type: %d", oid);
}
}

Expand All @@ -475,7 +476,7 @@ ChildTypeFromArray(Oid array_type) {
case INT8ARRAYOID:
return duckdb::LogicalTypeId::BIGINT;
default:
elog(ERROR, "No child type set for Postgres OID %d", array_type);
elog_quack(ERROR, "No child type set for Postgres OID %d", array_type);
}
}

Expand Down Expand Up @@ -585,7 +586,7 @@ GetPostgresDuckDBType(duckdb::LogicalType type) {
case duckdb::LogicalTypeId::BIGINT:
return INT8ARRAYOID;
default:
elog(ERROR, "(DuckDB/GetPostgresDuckDBType) Unsupported quack type: %s", type.ToString().c_str());
elog_quack(ERROR, "(DuckDB/GetPostgresDuckDBType) Unsupported quack type: %s", type.ToString().c_str());
}
}
default: {
Expand Down Expand Up @@ -692,7 +693,7 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {
auto bpchar_data = VARDATA_ANY(value);

if (bpchar_length != 1) {
elog(ERROR, "Expected 1 length BPCHAR for TINYINT marked with IsBpChar at offset %lu", offset);
elog_quack(ERROR, "Expected 1 length BPCHAR for TINYINT marked with IsBpChar at offset %lu", offset);
}
Append<int8_t>(result, bpchar_data[0], offset);
} else {
Expand Down Expand Up @@ -760,7 +761,7 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {
break;
}
default: {
elog(ERROR, "Unrecognized physical type for DECIMAL value");
elog_quack(ERROR, "Unrecognized physical type for DECIMAL value");
break;
}
}
Expand Down Expand Up @@ -796,7 +797,7 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {
deconstruct_array(array, ARR_ELEMTYPE(array), typlen, typbyval, typalign, &elems, &nulls, &nelems);

if (ndims == -1) {
elog(ERROR, "Array type has an ndims of -1, so it's actually not an array??");
elog_quack(ERROR, "Array type has an ndims of -1, so it's actually not an array??");
}
// Set the list_entry_t metadata
duckdb::Vector *vec = &result;
Expand All @@ -806,7 +807,7 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {
auto dimension = dims[dim];
if (vec->GetType().id() != duckdb::LogicalTypeId::LIST) {
// TODO: provide a more detailed description of the error
elog(ERROR, "Dimensionality of the schema and the data does not match");
elog_quack(ERROR, "Dimensionality of the schema and the data does not match");
}
auto child_offset = duckdb::ListVector::GetListSize(*vec);
auto list_data = duckdb::FlatVector::GetData<duckdb::list_entry_t>(*vec);
Expand All @@ -831,7 +832,7 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {
if (vec->GetType().id() == duckdb::LogicalTypeId::LIST) {
// Same as before, but now the data has fewer dimensions than the schema
// TODO: provide a more detailed description of the error
elog(ERROR, "Dimensionality of the schema and the data does not match");
elog_quack(ERROR, "Dimensionality of the schema and the data does not match");
}

auto child_type = vec->GetType();
Expand All @@ -848,7 +849,7 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {
break;
}
default:
elog(ERROR, "(DuckDB/ConvertPostgresToDuckValue) Unsupported quack type: %s",
elog_quack(ERROR, "(DuckDB/ConvertPostgresToDuckValue) Unsupported quack type: %s",
result.GetType().ToString().c_str());
break;
}
Expand Down
11 changes: 6 additions & 5 deletions src/scan/index_scan_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "quack/scan/index_scan_utils.hpp"
#include "quack/quack_error.hpp"

namespace quack {

Expand Down Expand Up @@ -35,15 +36,15 @@ fix_indexqual_operand(Node *node, IndexOptInfo *index, int indexcol) {
result->varattno = indexcol + 1;
return (Node *)result;
} else
elog(ERROR, "index key does not match expected index column");
elog_quack(ERROR, "index key does not match expected index column");
}

/* It's an index expression, so find and cross-check the expression */
indexpr_item = list_head(index->indexprs);
for (pos = 0; pos < index->ncolumns; pos++) {
if (index->indexkeys[pos] == 0) {
if (indexpr_item == NULL)
elog(ERROR, "too few entries in indexprs list");
elog_quack(ERROR, "too few entries in indexprs list");
if (pos == indexcol) {
Node *indexkey;

Expand All @@ -55,14 +56,14 @@ fix_indexqual_operand(Node *node, IndexOptInfo *index, int indexcol) {
exprCollation((const Node *)lfirst(indexpr_item)), 0);
return (Node *)result;
} else
elog(ERROR, "index key does not match expected index column");
elog_quack(ERROR, "index key does not match expected index column");
}
indexpr_item = lnext(index->indexprs, indexpr_item);
}
}

/* Oops... */
elog(ERROR, "index key does not match expected index column");
elog_quack(ERROR, "index key does not match expected index column");
return NULL; /* keep compiler quiet */
}

Expand Down Expand Up @@ -105,7 +106,7 @@ fix_indexqual_clause(PlannerInfo *root, IndexOptInfo *index, int indexcol, Node
/* Replace the indexkey expression with an index Var. */
nt->arg = (Expr *)fix_indexqual_operand((Node *)(Node *)nt->arg, index, indexcol);
} else
elog(ERROR, "unsupported indexqual type: %d", (int)nodeTag(clause));
elog_quack(ERROR, "unsupported indexqual type: %d", (int)nodeTag(clause));

return clause;
}
Expand Down
Loading

0 comments on commit 65e4345

Please sign in to comment.