Update vendored DuckDB sources to 73b56c6
duckdblabs-bot committed Dec 7, 2024
1 parent 73b56c6 commit ed948a5
Showing 35 changed files with 636 additions and 110 deletions.
2 changes: 1 addition & 1 deletion src/duckdb/extension/parquet/column_reader.cpp
@@ -576,7 +576,7 @@ idx_t ColumnReader::Read(uint64_t num_values, parquet_filter_t &filter, data_ptr
ConvertDictToSelVec(reinterpret_cast<uint32_t *>(offset_buffer.ptr),
reinterpret_cast<uint8_t *>(define_out), filter, read_now, result_offset);
if (result_offset == 0) {
result.Slice(*dictionary, dictionary_selection_vector, read_now);
result.Dictionary(*dictionary, dictionary_size + 1, dictionary_selection_vector, read_now);
D_ASSERT(result.GetVectorType() == VectorType::DICTIONARY_VECTOR);
} else {
D_ASSERT(result.GetVectorType() == VectorType::FLAT_VECTOR);
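The replacement above swaps a plain Slice for the new Vector::Dictionary overload, which also records the dictionary size on the result so downstream operators can recognize a storage-backed, duplicate-eliminated dictionary. A minimal sketch of the call pattern, assuming DuckDB's internal vector headers (the helper name is ours, not DuckDB's):

```cpp
#include "duckdb/common/types/vector.hpp"

using namespace duckdb;

// Hypothetical helper: emit a dictionary-typed result with a known dictionary size.
void EmitDictionaryResult(Vector &result, Vector &dictionary, idx_t dictionary_size,
                          const SelectionVector &sel, idx_t count) {
	// reference the dictionary, apply the selection vector, and tag the size
	result.Dictionary(dictionary, dictionary_size, sel, count);
	D_ASSERT(result.GetVectorType() == VectorType::DICTIONARY_VECTOR);
}
```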
2 changes: 1 addition & 1 deletion src/duckdb/src/common/compressed_file_system.cpp
@@ -7,7 +7,7 @@ StreamWrapper::~StreamWrapper() {
}

CompressedFile::CompressedFile(CompressedFileSystem &fs, unique_ptr<FileHandle> child_handle_p, const string &path)
: FileHandle(fs, path), compressed_fs(fs), child_handle(std::move(child_handle_p)) {
: FileHandle(fs, path, child_handle_p->GetFlags()), compressed_fs(fs), child_handle(std::move(child_handle_p)) {
}

CompressedFile::~CompressedFile() {
19 changes: 19 additions & 0 deletions src/duckdb/src/common/enum_util.cpp
@@ -28,6 +28,7 @@
#include "duckdb/common/enums/file_compression_type.hpp"
#include "duckdb/common/enums/file_glob_options.hpp"
#include "duckdb/common/enums/filter_propagate_result.hpp"
#include "duckdb/common/enums/function_errors.hpp"
#include "duckdb/common/enums/index_constraint_type.hpp"
#include "duckdb/common/enums/join_type.hpp"
#include "duckdb/common/enums/joinref_type.hpp"
@@ -1702,6 +1703,24 @@ FunctionCollationHandling EnumUtil::FromString<FunctionCollationHandling>(const
return static_cast<FunctionCollationHandling>(StringUtil::StringToEnum(GetFunctionCollationHandlingValues(), 3, "FunctionCollationHandling", value));
}

const StringUtil::EnumStringLiteral *GetFunctionErrorsValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(FunctionErrors::CANNOT_ERROR), "CANNOT_ERROR" },
{ static_cast<uint32_t>(FunctionErrors::CAN_THROW_ERROR), "CAN_THROW_ERROR" }
};
return values;
}

template<>
const char* EnumUtil::ToChars<FunctionErrors>(FunctionErrors value) {
return StringUtil::EnumToString(GetFunctionErrorsValues(), 2, "FunctionErrors", static_cast<uint32_t>(value));
}

template<>
FunctionErrors EnumUtil::FromString<FunctionErrors>(const char *value) {
return static_cast<FunctionErrors>(StringUtil::StringToEnum(GetFunctionErrorsValues(), 2, "FunctionErrors", value));
}

const StringUtil::EnumStringLiteral *GetFunctionNullHandlingValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(FunctionNullHandling::DEFAULT_NULL_HANDLING), "DEFAULT_NULL_HANDLING" },
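These additions register the new FunctionErrors enum with EnumUtil so it can be converted to and from its string names. A quick round-trip sketch, assuming the headers added in this commit are on the include path:

```cpp
#include "duckdb/common/enum_util.hpp"
#include "duckdb/common/enums/function_errors.hpp"

using namespace duckdb;

void FunctionErrorsRoundTrip() {
	// enum -> string
	const char *name = EnumUtil::ToChars<FunctionErrors>(FunctionErrors::CAN_THROW_ERROR); // "CAN_THROW_ERROR"
	// string -> enum
	auto value = EnumUtil::FromString<FunctionErrors>(name);
	D_ASSERT(value == FunctionErrors::CAN_THROW_ERROR);
	(void)value;
}
```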
3 changes: 2 additions & 1 deletion src/duckdb/src/common/file_system.cpp
@@ -551,7 +551,8 @@ bool FileSystem::OnDiskFile(FileHandle &handle) {
}
// LCOV_EXCL_STOP

FileHandle::FileHandle(FileSystem &file_system, string path_p) : file_system(file_system), path(std::move(path_p)) {
FileHandle::FileHandle(FileSystem &file_system, string path_p, FileOpenFlags flags)
: file_system(file_system), path(std::move(path_p)), flags(flags) {
}

FileHandle::~FileHandle() {
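FileHandle now stores the FileOpenFlags it was opened with, so every subclass has to forward a flags value to the base constructor, which is what the CompressedFile, UnixFileHandle, WindowsFileHandle, and PipeFile changes in this commit do. A hedged sketch of the pattern for a wrapper handle (the class is illustrative, not part of DuckDB):

```cpp
#include "duckdb/common/file_system.hpp"

#include <utility>

// Illustrative wrapper that reuses its child's path and open flags, mirroring the
// CompressedFile and PipeFile constructors in this commit.
class WrapperFileHandle : public duckdb::FileHandle {
public:
	WrapperFileHandle(duckdb::FileSystem &fs, duckdb::unique_ptr<duckdb::FileHandle> child_p)
	    : FileHandle(fs, child_p->path, child_p->GetFlags()), child_handle(std::move(child_p)) {
	}
	void Close() override {
		child_handle->Close();
	}

private:
	duckdb::unique_ptr<duckdb::FileHandle> child_handle;
};
```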
11 changes: 6 additions & 5 deletions src/duckdb/src/common/local_file_system.cpp
@@ -129,7 +129,8 @@ bool LocalFileSystem::IsPipe(const string &filename, optional_ptr<FileOpener> op

struct UnixFileHandle : public FileHandle {
public:
UnixFileHandle(FileSystem &file_system, string path, int fd) : FileHandle(file_system, std::move(path)), fd(fd) {
UnixFileHandle(FileSystem &file_system, string path, int fd, FileOpenFlags flags)
: FileHandle(file_system, std::move(path), flags), fd(fd) {
}
~UnixFileHandle() override {
UnixFileHandle::Close();
@@ -417,7 +418,7 @@ unique_ptr<FileHandle> LocalFileSystem::OpenFile(const string &path_p, FileOpenF
}
}
}
return make_uniq<UnixFileHandle>(*this, path, fd);
return make_uniq<UnixFileHandle>(*this, path, fd, flags);
}

void LocalFileSystem::SetFilePointer(FileHandle &handle, idx_t location) {
@@ -716,8 +717,8 @@ std::string LocalFileSystem::GetLastErrorAsString() {

struct WindowsFileHandle : public FileHandle {
public:
WindowsFileHandle(FileSystem &file_system, string path, HANDLE fd)
: FileHandle(file_system, path), position(0), fd(fd) {
WindowsFileHandle(FileSystem &file_system, string path, HANDLE fd, FileOpenFlags flags)
: FileHandle(file_system, path, flags), position(0), fd(fd) {
}
~WindowsFileHandle() override {
Close();
@@ -855,7 +856,7 @@ unique_ptr<FileHandle> LocalFileSystem::OpenFile(const string &path_p, FileOpenF
throw IOException("Cannot open file \"%s\": %s", path.c_str(), error);
}
}
auto handle = make_uniq<WindowsFileHandle>(*this, path.c_str(), hFile);
auto handle = make_uniq<WindowsFileHandle>(*this, path.c_str(), hFile, flags);
if (flags.OpenForAppending()) {
auto file_size = GetFileSize(*handle);
SetFilePointer(*handle, file_size);
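Because the Unix and Windows handles are now constructed with the open flags, the flags used at OpenFile time travel with the handle. A small usage sketch; it assumes FileFlags::FILE_FLAGS_READ and FileHandle::GetFlags() behave as in current DuckDB:

```cpp
#include "duckdb/common/local_file_system.hpp"

void OpenAndInspectFlags() {
	duckdb::LocalFileSystem fs;
	// open read-only; the returned handle now remembers the flags it was opened with
	auto handle = fs.OpenFile("example.db", duckdb::FileFlags::FILE_FLAGS_READ);
	auto flags = handle->GetFlags();
	(void)flags; // e.g. flags.OpenForReading() is expected to hold
}
```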
8 changes: 4 additions & 4 deletions src/duckdb/src/common/pipe_file_system.cpp
@@ -7,8 +7,9 @@
namespace duckdb {
class PipeFile : public FileHandle {
public:
PipeFile(unique_ptr<FileHandle> child_handle_p, const string &path)
: FileHandle(pipe_fs, path), child_handle(std::move(child_handle_p)) {
explicit PipeFile(unique_ptr<FileHandle> child_handle_p)
: FileHandle(pipe_fs, child_handle_p->path, child_handle_p->GetFlags()),
child_handle(std::move(child_handle_p)) {
}

PipeFileSystem pipe_fs;
@@ -51,8 +52,7 @@ void PipeFileSystem::FileSync(FileHandle &handle) {
}

unique_ptr<FileHandle> PipeFileSystem::OpenPipe(unique_ptr<FileHandle> handle) {
auto path = handle->path;
return make_uniq<PipeFile>(std::move(handle), path);
return make_uniq<PipeFile>(std::move(handle));
}

} // namespace duckdb
20 changes: 20 additions & 0 deletions src/duckdb/src/common/types/vector.cpp
@@ -231,6 +231,7 @@ void Vector::Slice(const SelectionVector &sel, idx_t count) {
if (GetVectorType() == VectorType::DICTIONARY_VECTOR) {
// already a dictionary, slice the current dictionary
auto &current_sel = DictionaryVector::SelVector(*this);
auto dictionary_size = DictionaryVector::DictionarySize(*this);
auto sliced_dictionary = current_sel.Slice(sel, count);
buffer = make_buffer<DictionaryBuffer>(std::move(sliced_dictionary));
if (GetType().InternalType() == PhysicalType::STRUCT) {
@@ -240,6 +241,9 @@
new_child.auxiliary = make_buffer<VectorStructBuffer>(new_child, sel, count);
auxiliary = make_buffer<VectorChildBuffer>(std::move(new_child));
}
if (dictionary_size.IsValid()) {
this->buffer->Cast<DictionaryBuffer>().SetDictionarySize(dictionary_size.GetIndex());
}
return;
}

@@ -260,11 +264,24 @@
auxiliary = std::move(child_ref);
}

void Vector::Dictionary(idx_t dictionary_size, const SelectionVector &sel, idx_t count) {
Slice(sel, count);
if (GetVectorType() == VectorType::DICTIONARY_VECTOR) {
buffer->Cast<DictionaryBuffer>().SetDictionarySize(dictionary_size);
}
}

void Vector::Dictionary(const Vector &dict, idx_t dictionary_size, const SelectionVector &sel, idx_t count) {
Reference(dict);
Dictionary(dictionary_size, sel, count);
}

void Vector::Slice(const SelectionVector &sel, idx_t count, SelCache &cache) {
if (GetVectorType() == VectorType::DICTIONARY_VECTOR && GetType().InternalType() != PhysicalType::STRUCT) {
// dictionary vector: need to merge dictionaries
// check if we have a cached entry
auto &current_sel = DictionaryVector::SelVector(*this);
auto dictionary_size = DictionaryVector::DictionarySize(*this);
auto target_data = current_sel.data();
auto entry = cache.cache.find(target_data);
if (entry != cache.cache.end()) {
@@ -275,6 +292,9 @@ void Vector::Slice(const SelectionVector &sel, idx_t count, SelCache &cache) {
Slice(sel, count);
cache.cache[target_data] = this->buffer;
}
if (dictionary_size.IsValid()) {
this->buffer->Cast<DictionaryBuffer>().SetDictionarySize(dictionary_size.GetIndex());
}
} else {
Slice(sel, count);
}
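Together, the new Dictionary overloads and the size bookkeeping in both Slice paths mean that a dictionary size attached by the storage layer survives re-slicing. A minimal sketch, assuming DuckDB's internal vector headers (the function is illustrative):

```cpp
#include "duckdb/common/types/vector.hpp"

using namespace duckdb;

// Illustrative only: attach a dictionary size, then check that it survives a slice.
void DictionarySizeSurvivesSlice(Vector &dict, idx_t dict_size, const SelectionVector &sel, idx_t count,
                                 const SelectionVector &second_sel, idx_t second_count) {
	Vector result(dict.GetType());
	result.Dictionary(dict, dict_size, sel, count);
	D_ASSERT(DictionaryVector::DictionarySize(result).GetIndex() == dict_size);

	// re-slicing a dictionary vector now carries the recorded size over to the new buffer
	result.Slice(second_sel, second_count);
	D_ASSERT(DictionaryVector::DictionarySize(result).IsValid());
}
```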
185 changes: 170 additions & 15 deletions src/duckdb/src/execution/aggregate_hashtable.cpp
@@ -235,29 +235,164 @@ idx_t GroupedAggregateHashTable::AddChunk(DataChunk &groups, DataChunk &payload,
return AddChunk(groups, payload, aggregate_filter);
}

idx_t GroupedAggregateHashTable::AddChunk(DataChunk &groups, DataChunk &payload, const unsafe_vector<idx_t> &filter) {
Vector hashes(LogicalType::HASH);
groups.Hash(hashes);
GroupedAggregateHashTable::AggregateDictionaryState::AggregateDictionaryState()
: hashes(LogicalType::HASH), new_dictionary_pointers(LogicalType::POINTER), unique_entries(STANDARD_VECTOR_SIZE) {
}

optional_idx GroupedAggregateHashTable::TryAddDictionaryGroups(DataChunk &groups, DataChunk &payload,
const unsafe_vector<idx_t> &filter) {
static constexpr idx_t DICTIONARY_THRESHOLD = 2;
// dictionary vector - check if this is a duplicate eliminated dictionary from the storage
auto &dict_col = groups.data[0];
auto opt_dict_size = DictionaryVector::DictionarySize(dict_col);
if (!opt_dict_size.IsValid()) {
// dict size not known - this is not a dictionary that comes from the storage
return optional_idx();
}
idx_t dict_size = opt_dict_size.GetIndex();
if (dict_size >= groups.size() * DICTIONARY_THRESHOLD) {
// dictionary is too large - use regular aggregation
return optional_idx();
}
auto &dictionary_vector = DictionaryVector::Child(dict_col);
auto &offsets = DictionaryVector::SelVector(dict_col);
auto &dict_state = state.dict_state;
// initialize the index state
if (dict_size > dict_state.capacity) {
dict_state.dictionary_addresses = make_uniq<Vector>(LogicalType::POINTER, dict_size);
dict_state.found_entry = make_unsafe_uniq_array<bool>(dict_size);
dict_state.capacity = dict_size;
}
memset(dict_state.found_entry.get(), 0, dict_size * sizeof(bool));
dict_state.dictionary = dictionary_vector;

auto &found_entry = dict_state.found_entry;
auto &unique_entries = dict_state.unique_entries;
idx_t unique_count = 0;
// for each of the dictionary entries - check if we have already done a look-up into the hash table
// if we have, we can just use the cached group pointers
for (idx_t i = 0; i < groups.size(); i++) {
auto dict_idx = offsets.get_index(i);
unique_entries.set_index(unique_count, dict_idx);
unique_count += !found_entry[dict_idx];
found_entry[dict_idx] = true;
}
auto &new_dictionary_pointers = dict_state.new_dictionary_pointers;
idx_t new_group_count = 0;
if (unique_count > 0) {
auto &unique_values = dict_state.unique_values;
if (unique_values.ColumnCount() == 0) {
unique_values.InitializeEmpty(groups.GetTypes());
}
// slice the dictionary
unique_values.data[0].Slice(dictionary_vector, unique_entries, unique_count);
unique_values.SetCardinality(unique_count);
// now we know which entries we are going to add - hash them
auto &hashes = dict_state.hashes;
unique_values.Hash(hashes);

// add the dictionary groups to the hash table
new_group_count = FindOrCreateGroups(unique_values, hashes, new_dictionary_pointers, state.new_groups);
}
auto &aggregates = layout.GetAggregates();
if (aggregates.empty()) {
// early-out - no aggregates to update
return new_group_count;
}

return AddChunk(groups, hashes, payload, filter);
// set the addresses that we found for each of the unique groups in the main addresses vector
auto new_dict_addresses = FlatVector::GetData<uintptr_t>(new_dictionary_pointers);
// for each of the new groups, add them to the global (cached) list of addresses for the dictionary
auto &dictionary_addresses = *dict_state.dictionary_addresses;
auto dict_addresses = FlatVector::GetData<uintptr_t>(dictionary_addresses);
for (idx_t i = 0; i < unique_count; i++) {
auto dict_idx = unique_entries.get_index(i);
dict_addresses[dict_idx] = new_dict_addresses[i] + layout.GetAggrOffset();
}
// now set up the addresses for the aggregates
auto result_addresses = FlatVector::GetData<uintptr_t>(state.addresses);
for (idx_t i = 0; i < groups.size(); i++) {
auto dict_idx = offsets.get_index(i);
result_addresses[i] = dict_addresses[dict_idx];
}

// finally process the aggregates
UpdateAggregates(payload, filter);

return new_group_count;
}

idx_t GroupedAggregateHashTable::AddChunk(DataChunk &groups, Vector &group_hashes, DataChunk &payload,
const unsafe_vector<idx_t> &filter) {
if (groups.size() == 0) {
return 0;
optional_idx GroupedAggregateHashTable::TryAddConstantGroups(DataChunk &groups, DataChunk &payload,
const unsafe_vector<idx_t> &filter) {
#ifndef DEBUG
if (groups.size() <= 1) {
// this only has a point if we have multiple groups
return optional_idx();
}
#endif
auto &dict_state = state.dict_state;
auto &unique_values = dict_state.unique_values;
if (unique_values.ColumnCount() == 0) {
unique_values.InitializeEmpty(groups.GetTypes());
}
// slice the dictionary
unique_values.Reference(groups);
unique_values.SetCardinality(1);
unique_values.Flatten();

#ifdef DEBUG
D_ASSERT(groups.ColumnCount() + 1 == layout.ColumnCount());
for (idx_t i = 0; i < groups.ColumnCount(); i++) {
D_ASSERT(groups.GetTypes()[i] == layout.GetTypes()[i]);
auto &hashes = dict_state.hashes;
unique_values.Hash(hashes);

// add the single constant group to the hash table
auto &new_dictionary_pointers = dict_state.new_dictionary_pointers;
auto new_group_count = FindOrCreateGroups(unique_values, hashes, new_dictionary_pointers, state.new_groups);

auto &aggregates = layout.GetAggregates();
if (aggregates.empty()) {
// early-out - no aggregates to update
return new_group_count;
}
#endif

const auto new_group_count = FindOrCreateGroups(groups, group_hashes, state.addresses, state.new_groups);
VectorOperations::AddInPlace(state.addresses, NumericCast<int64_t>(layout.GetAggrOffset()), payload.size());
auto new_dict_addresses = FlatVector::GetData<uintptr_t>(new_dictionary_pointers);
auto result_addresses = FlatVector::GetData<uintptr_t>(state.addresses);
uintptr_t aggregate_address = new_dict_addresses[0] + layout.GetAggrOffset();
for (idx_t i = 0; i < payload.size(); i++) {
result_addresses[i] = aggregate_address;
}

// process the aggregates
// FIXME: we can use simple_update here if the aggregates support it
UpdateAggregates(payload, filter);

return new_group_count;
}

optional_idx GroupedAggregateHashTable::TryAddCompressedGroups(DataChunk &groups, DataChunk &payload,
const unsafe_vector<idx_t> &filter) {
// all groups must be compressed
if (groups.AllConstant()) {
return TryAddConstantGroups(groups, payload, filter);
}
if (groups.ColumnCount() == 1 && groups.data[0].GetVectorType() == VectorType::DICTIONARY_VECTOR) {
return TryAddDictionaryGroups(groups, payload, filter);
}
return optional_idx();
}

idx_t GroupedAggregateHashTable::AddChunk(DataChunk &groups, DataChunk &payload, const unsafe_vector<idx_t> &filter) {
// check if we can use an optimized path that utilizes compressed vectors
auto result = TryAddCompressedGroups(groups, payload, filter);
if (result.IsValid()) {
return result.GetIndex();
}
// otherwise append the raw values
Vector hashes(LogicalType::HASH);
groups.Hash(hashes);

return AddChunk(groups, hashes, payload, filter);
}

void GroupedAggregateHashTable::UpdateAggregates(DataChunk &payload, const unsafe_vector<idx_t> &filter) {
// Now every cell has an entry, update the aggregates
auto &aggregates = layout.GetAggregates();
idx_t filter_idx = 0;
@@ -287,6 +422,26 @@ idx_t GroupedAggregateHashTable::AddChunk(DataChunk &groups, Vector &group_hashe
}

Verify();
}

idx_t GroupedAggregateHashTable::AddChunk(DataChunk &groups, Vector &group_hashes, DataChunk &payload,
const unsafe_vector<idx_t> &filter) {
if (groups.size() == 0) {
return 0;
}

#ifdef DEBUG
D_ASSERT(groups.ColumnCount() + 1 == layout.ColumnCount());
for (idx_t i = 0; i < groups.ColumnCount(); i++) {
D_ASSERT(groups.GetTypes()[i] == layout.GetTypes()[i]);
}
#endif

const auto new_group_count = FindOrCreateGroups(groups, group_hashes, state.addresses, state.new_groups);
VectorOperations::AddInPlace(state.addresses, NumericCast<int64_t>(layout.GetAggrOffset()), payload.size());

UpdateAggregates(payload, filter);

return new_group_count;
}

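The heart of the new TryAddDictionaryGroups path is that a small, storage-provided dictionary lets the hash table be probed once per distinct dictionary entry rather than once per row; the resulting aggregate addresses are cached per dictionary index and reused for every row that points at the same entry. Below is a self-contained illustration of that de-duplication idea in plain C++ with standard containers; it is not DuckDB's implementation and all names in it are ours:

```cpp
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// Resolve one aggregate address per row while probing the group hash table only once
// per distinct dictionary code (mirroring the found_entry / dictionary_addresses cache).
std::vector<uintptr_t> ResolveRowAddresses(const std::vector<uint32_t> &dict_codes,
                                           const std::function<uintptr_t(uint32_t)> &find_or_create_group) {
	std::unordered_map<uint32_t, uintptr_t> cached; // dictionary code -> aggregate address
	std::vector<uintptr_t> row_addresses(dict_codes.size());
	for (size_t row = 0; row < dict_codes.size(); row++) {
		const uint32_t code = dict_codes[row];
		auto it = cached.find(code);
		if (it == cached.end()) {
			// first occurrence of this dictionary entry: a single hash-table probe
			it = cached.emplace(code, find_or_create_group(code)).first;
		}
		row_addresses[row] = it->second; // later rows reuse the cached address
	}
	return row_addresses;
}
```

In the patch itself the cache is a flat found_entry / dictionary_addresses pair sized to the dictionary, the fast path is only attempted when the dictionary holds fewer than DICTIONARY_THRESHOLD (2) times as many entries as the chunk has rows, and an all-constant group chunk takes the even simpler TryAddConstantGroups path.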