Skip to content

Commit

Permalink
Fixed query projection / Added filter for DATE
Browse files Browse the repository at this point in the history
  • Loading branch information
mkaruza committed May 8, 2024
1 parent 991a52d commit 2ae51dc
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 15 deletions.
4 changes: 4 additions & 0 deletions include/quack/quack_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ extern "C" {

namespace quack {

// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000
constexpr int32_t QUACK_DUCK_DATE_OFFSET = 10957;
constexpr int64_t QUACK_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER_DAY;

duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type);
Oid GetPostgresDuckDBType(duckdb::LogicalTypeId type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
Expand Down
16 changes: 15 additions & 1 deletion src/quack_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ extern "C" {
#include "catalog/pg_type.h"
}

#include "quack/quack_filter.hpp"
#include "quack/quack_types.hpp"

namespace quack {

template <class T, class OP>
Expand Down Expand Up @@ -33,8 +36,19 @@ FilterOperationSwitch(Datum &value, duckdb::Value &constant, Oid typeOid) {
case INT8OID:
return TemplatedFilterOperation<int64_t, OP>(value, constant);
break;
case FLOAT4OID:
return TemplatedFilterOperation<float, OP>(value, constant);
break;
case FLOAT8OID:
return TemplatedFilterOperation<double, OP>(value, constant);
break;
case DATEOID: {
Datum dateDatum = static_cast<int32_t>(value + quack::QUACK_DUCK_DATE_OFFSET);
return TemplatedFilterOperation<int32_t, OP>(dateDatum, constant);
break;
}
default:
elog(ERROR, "Unsupported quack type: %d", typeOid);
elog(ERROR, "(DuckDB/FilterOperationSwitch) Unsupported quack type: %d", typeOid);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/quack_heap_seq_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ PostgresHeapSeqScan::InitParallelScanState(duckdb::TableFunctionInitInput &input

if (input.CanRemoveFilterColumns()) {
for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) {
m_parallel_scan_state.m_projections[input.projection_ids[i]] = input.column_ids[i];
m_parallel_scan_state.m_projections[i] = input.column_ids[input.projection_ids[i]];
}
} else {
for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) {
Expand Down
26 changes: 13 additions & 13 deletions src/quack_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@ extern "C" {
#include "quack/quack_filter.hpp"
#include "quack/quack_heap_seq_scan.hpp"
#include "quack/quack_detoast.hpp"
#include "quack/quack_types.hpp"

namespace quack {

// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000
constexpr int32_t QUACK_DUCK_DATE_OFFSET = 10957;
constexpr int64_t QUACK_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER_DAY;

void
ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col) {
Oid oid = slot->tts_tupleDescriptor->attrs[col].atttypid;
Expand Down Expand Up @@ -54,12 +51,12 @@ ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col
}
case DATEOID: {
duckdb::date_t date = value.GetValue<duckdb::date_t>();
slot->tts_values[col] = date.days - QUACK_DUCK_DATE_OFFSET;
slot->tts_values[col] = date.days - quack::QUACK_DUCK_DATE_OFFSET;
break;
}
case TIMESTAMPOID: {
duckdb::dtime_t timestamp = value.GetValue<duckdb::dtime_t>();
slot->tts_values[col] = timestamp.micros - QUACK_DUCK_TIMESTAMP_OFFSET;
slot->tts_values[col] = timestamp.micros - quack::QUACK_DUCK_TIMESTAMP_OFFSET;
break;
}
case FLOAT8OID:
Expand Down Expand Up @@ -123,7 +120,7 @@ GetPostgresDuckDBType(duckdb::LogicalTypeId type) {
case duckdb::LogicalTypeId::DOUBLE:
return FLOAT8OID;
default:
elog(ERROR, "(DuckDB/GetPostgresDuckDBType) Unsupported quack type: %d", static_cast<int>(type));
elog(ERROR, "(DuckDB/GetPostgresDuckDBType) Unsupported quack type: %d", static_cast<int>(type));
}
}

Expand Down Expand Up @@ -173,7 +170,8 @@ ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) {
offset);
break;
default:
elog(ERROR, "Unsupported quack type: %d", static_cast<int>(result.GetType().id()));
elog(ERROR, "(DuckDB/ConvertPostgresToDuckValue) Unsupported quack type: %d",
static_cast<int>(result.GetType().id()));
break;
}
}
Expand Down Expand Up @@ -297,16 +295,18 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &t
auto &array_mask = duckdb::FlatVector::Validity(result);
array_mask.SetInvalid(threadScanInfo.m_output_vector_size);
} else {
idx_t projectionColumnIdx = parallelScanState.m_columns[parallelScanState.m_projections[idx]];
if (threadScanInfo.m_tuple_desc->attrs[parallelScanState.m_projections[idx]].attlen == -1) {
bool shouldFree = false;
values[idx] = DetoastPostgresDatum(reinterpret_cast<varlena *>(values[idx]), parallelScanState.m_lock,
&shouldFree);
ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size);
values[projectionColumnIdx] =
DetoastPostgresDatum(reinterpret_cast<varlena *>(values[projectionColumnIdx]),
parallelScanState.m_lock, &shouldFree);
ConvertPostgresToDuckValue(values[projectionColumnIdx], result, threadScanInfo.m_output_vector_size);
if (shouldFree) {
duckdb_free(reinterpret_cast<void *>(values[idx]));
duckdb_free(reinterpret_cast<void *>(values[projectionColumnIdx]));
}
} else {
ConvertPostgresToDuckValue(values[idx], result, threadScanInfo.m_output_vector_size);
ConvertPostgresToDuckValue(values[projectionColumnIdx], result, threadScanInfo.m_output_vector_size);
}
}
}
Expand Down

0 comments on commit 2ae51dc

Please sign in to comment.