Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource manager template arguments to class Consolidator and derivatives. #4649

Closed
wants to merge 14 commits into from
4 changes: 3 additions & 1 deletion tiledb/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
*/
#include "common-std.h"

namespace tiledb::common {}
namespace tiledb::common {
using context_bypass_RM = void;
}
namespace tdb = tiledb::common;

/*
Expand Down
47 changes: 31 additions & 16 deletions tiledb/sm/consolidator/array_meta_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ namespace tiledb::sm {
/* CONSTRUCTOR */
/* ****************************** */

ArrayMetaConsolidator::ArrayMetaConsolidator(
template <class RM>
ArrayMetaConsolidator<RM>::ArrayMetaConsolidator(
const Config& config, StorageManager* storage_manager)
: Consolidator(storage_manager) {
: Consolidator<RM>(storage_manager) {
auto st = set_config(config);
if (!st.ok()) {
throw std::logic_error(st.message());
Expand All @@ -59,28 +60,28 @@ ArrayMetaConsolidator::ArrayMetaConsolidator(
/* API */
/* ****************************** */

Status ArrayMetaConsolidator::consolidate(
template <class RM>
Status ArrayMetaConsolidator<RM>::consolidate(
const char* array_name,
EncryptionType encryption_type,
const void* encryption_key,
uint32_t key_length) {
auto timer_se = stats_->start_timer("consolidate_array_meta");
auto timer_se = this->stats_->start_timer("consolidate_array_meta");

check_array_uri(array_name);
this->check_array_uri(array_name);

// Open array for reading
auto array_uri = URI(array_name);
Array array_for_reads(array_uri, storage_manager_);
Array array_for_reads(array_uri, this->storage_manager_);
RETURN_NOT_OK(array_for_reads.open(
QueryType::READ,
config_.timestamp_start_,
config_.timestamp_end_,
encryption_type,
encryption_key,
key_length));

// Open array for writing
Array array_for_writes(array_uri, storage_manager_);
Array array_for_writes(array_uri, this->storage_manager_);
RETURN_NOT_OK_ELSE(
array_for_writes.open(
QueryType::WRITE, encryption_type, encryption_key, key_length),
Expand Down Expand Up @@ -134,24 +135,25 @@ Status ArrayMetaConsolidator::consolidate(

auto data = ss.str();
RETURN_NOT_OK(
storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size()));
RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri));
this->storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size()));
RETURN_NOT_OK(this->storage_manager_->vfs()->close_file(vac_uri));

return Status::Ok();
}

void ArrayMetaConsolidator::vacuum(const char* array_name) {
template <class RM>
void ArrayMetaConsolidator<RM>::vacuum(const char* array_name) {
if (array_name == nullptr) {
throw Status_StorageManagerError(
"Cannot vacuum array metadata; Array name cannot be null");
}

// Get the array metadata URIs and vacuum file URIs to be vacuum
auto vfs = storage_manager_->vfs();
auto compute_tp = storage_manager_->compute_tp();
auto vfs = this->storage_manager_->vfs();
auto compute_tp = this->storage_manager_->compute_tp();

auto array_dir = ArrayDirectory(
storage_manager_->resources(),
this->storage_manager_->resources(),
URI(array_name),
0,
std::numeric_limits<uint64_t>::max());
Expand All @@ -165,9 +167,10 @@ void ArrayMetaConsolidator::vacuum(const char* array_name) {
/* PRIVATE METHODS */
/* ****************************** */

Status ArrayMetaConsolidator::set_config(const Config& config) {
template <class RM>
Status ArrayMetaConsolidator<RM>::set_config(const Config& config) {
// Set the consolidation config for ease of use
Config merged_config = storage_manager_->config();
Config merged_config = this->storage_manager_->config();
merged_config.inherit(config);
bool found = false;
RETURN_NOT_OK(merged_config.get<uint64_t>(
Expand All @@ -180,4 +183,16 @@ Status ArrayMetaConsolidator::set_config(const Config& config) {
return Status::Ok();
}

/**
* Explicit template instantiation.
*
* @section Maturity
*
* This is a temporary explicit instantiation to avoid linking errors while
* setting up resource management template arguments. While this work is being
* done, this is the only type that will be used to instantiate `Consolidator`
* and it's deriving classes.
*/
template class ArrayMetaConsolidator<context_bypass_RM>;

} // namespace tiledb::sm
5 changes: 3 additions & 2 deletions tiledb/sm/consolidator/array_meta_consolidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ using namespace tiledb::common;
namespace tiledb::sm {

/** Handles array metadata consolidation. */
class ArrayMetaConsolidator : public Consolidator {
template <class RM>
class ArrayMetaConsolidator : public Consolidator<RM> {
public:
/* ********************************* */
/* CONSTRUCTORS & DESTRUCTORS */
Expand Down Expand Up @@ -107,7 +108,7 @@ class ArrayMetaConsolidator : public Consolidator {
/* ********************************* */

/** Consolidation configuration parameters. */
Consolidator::ConsolidationConfigBase config_;
ConsolidationConfigBase config_;
};

} // namespace tiledb::sm
Expand Down
41 changes: 28 additions & 13 deletions tiledb/sm/consolidator/commits_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,40 +49,42 @@ namespace tiledb::sm {
/* CONSTRUCTOR */
/* ****************************** */

CommitsConsolidator::CommitsConsolidator(StorageManager* storage_manager)
: Consolidator(storage_manager) {
template <class RM>
CommitsConsolidator<RM>::CommitsConsolidator(StorageManager* storage_manager)
: Consolidator<RM>(storage_manager) {
}

/* ****************************** */
/* API */
/* ****************************** */

Status CommitsConsolidator::consolidate(
template <class RM>
Status CommitsConsolidator<RM>::consolidate(
const char* array_name,
EncryptionType encryption_type,
const void* encryption_key,
uint32_t key_length) {
auto timer_se = stats_->start_timer("consolidate_commits");
auto timer_se = this->stats_->start_timer("consolidate_commits");

check_array_uri(array_name);
this->check_array_uri(array_name);

// Open array for writing
auto array_uri = URI(array_name);
Array array_for_writes(array_uri, storage_manager_);
Array array_for_writes(array_uri, this->storage_manager_);
RETURN_NOT_OK(array_for_writes.open(
QueryType::WRITE, encryption_type, encryption_key, key_length));

// Ensure write version is at least 12.
auto write_version = array_for_writes.array_schema_latest().write_version();
RETURN_NOT_OK(array_for_writes.close());
if (write_version < 12) {
return logger_->status(Status_ConsolidatorError(
return this->logger_->status(Status_ConsolidatorError(
"Array version should be at least 12 to consolidate commits."));
}

// Get the array uri to consolidate from the array directory.
auto array_dir = ArrayDirectory(
storage_manager_->resources(),
this->storage_manager_->resources(),
URI(array_name),
0,
utils::time::timestamp_now_ms(),
Expand All @@ -95,32 +97,45 @@ Status CommitsConsolidator::consolidate(

// Get the file name.
auto& to_consolidate = array_dir.commit_uris_to_consolidate();
storage_manager_->write_consolidated_commits_file(
this->storage_manager_->write_consolidated_commits_file(
write_version, array_dir, to_consolidate);

return Status::Ok();
}

void CommitsConsolidator::vacuum(const char* array_name) {
template <class RM>
void CommitsConsolidator<RM>::vacuum(const char* array_name) {
if (array_name == nullptr) {
throw Status_StorageManagerError(
"Cannot vacuum array metadata; Array name cannot be null");
}

// Get the array metadata URIs and vacuum file URIs to be vacuum
ArrayDirectory array_dir(
storage_manager_->resources(),
this->storage_manager_->resources(),
URI(array_name),
0,
utils::time::timestamp_now_ms(),
ArrayDirectoryMode::COMMITS);

// Delete the commits and vacuum files
auto vfs = storage_manager_->vfs();
auto compute_tp = storage_manager_->compute_tp();
auto vfs = this->storage_manager_->vfs();
auto compute_tp = this->storage_manager_->compute_tp();
vfs->remove_files(compute_tp, array_dir.commit_uris_to_vacuum());
vfs->remove_files(
compute_tp, array_dir.consolidated_commits_uris_to_vacuum());
}

/**
* Explicit template instantiation.
*
* @section Maturity
*
* This is a temporary explicit instantiation to avoid linking errors while
* setting up resource management template arguments. While this work is being
* done, this is the only type that will be used to instantiate `Consolidator`
* and it's deriving classes.
*/
template class CommitsConsolidator<context_bypass_RM>;

} // namespace tiledb::sm
3 changes: 2 additions & 1 deletion tiledb/sm/consolidator/commits_consolidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ using namespace tiledb::common;
namespace tiledb::sm {

/** Handles commits consolidation. */
class CommitsConsolidator : public Consolidator {
template <class RM>
class CommitsConsolidator : public Consolidator<RM> {
public:
/* ********************************* */
/* CONSTRUCTORS & DESTRUCTORS */
Expand Down
34 changes: 22 additions & 12 deletions tiledb/sm/consolidator/consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,32 @@ namespace tiledb::sm {
/* ********************************* */

/** Factory function to create the consolidator depending on mode. */
shared_ptr<Consolidator> Consolidator::create(
template <class RM>
shared_ptr<Consolidator<RM>> Consolidator<RM>::create(
const ConsolidationMode mode,
const Config& config,
StorageManager* storage_manager) {
switch (mode) {
case ConsolidationMode::FRAGMENT_META:
return make_shared<FragmentMetaConsolidator>(HERE(), storage_manager);
return make_shared<FragmentMetaConsolidator<RM>>(HERE(), storage_manager);
case ConsolidationMode::FRAGMENT:
return make_shared<FragmentConsolidator>(HERE(), config, storage_manager);
return make_shared<FragmentConsolidator<RM>>(
HERE(), config, storage_manager);
case ConsolidationMode::ARRAY_META:
return make_shared<ArrayMetaConsolidator>(
return make_shared<ArrayMetaConsolidator<RM>>(
HERE(), config, storage_manager);
case ConsolidationMode::COMMITS:
return make_shared<CommitsConsolidator>(HERE(), storage_manager);
return make_shared<CommitsConsolidator<RM>>(HERE(), storage_manager);
case ConsolidationMode::GROUP_META:
return make_shared<GroupMetaConsolidator>(
return make_shared<GroupMetaConsolidator<RM>>(
HERE(), config, storage_manager);
default:
return nullptr;
}
}

ConsolidationMode Consolidator::mode_from_config(
template <class RM>
ConsolidationMode Consolidator<RM>::mode_from_config(
const Config& config, const bool vacuum_mode) {
bool found = false;
const std::string mode = vacuum_mode ?
Expand Down Expand Up @@ -99,19 +102,22 @@ ConsolidationMode Consolidator::mode_from_config(
/* CONSTRUCTORS & DESTRUCTORS */
/* ****************************** */

Consolidator::Consolidator(StorageManager* storage_manager)
template <class RM>
Consolidator<RM>::Consolidator(StorageManager* storage_manager)
: storage_manager_(storage_manager)
, stats_(storage_manager_->stats()->create_child("Consolidator"))
, logger_(storage_manager_->logger()->clone("Consolidator", ++logger_id_)) {
}

Consolidator::~Consolidator() = default;
template <class RM>
Consolidator<RM>::~Consolidator() = default;

/* ****************************** */
/* API */
/* ****************************** */

Status Consolidator::consolidate(
template <class RM>
Status Consolidator<RM>::consolidate(
[[maybe_unused]] const char* array_name,
[[maybe_unused]] EncryptionType encryption_type,
[[maybe_unused]] const void* encryption_key,
Expand All @@ -120,14 +126,18 @@ Status Consolidator::consolidate(
Status_ConsolidatorError("Cannot consolidate; Invalid object"));
}

void Consolidator::vacuum([[maybe_unused]] const char* array_name) {
template <class RM>
void Consolidator<RM>::vacuum([[maybe_unused]] const char* array_name) {
throw Status_ConsolidatorError("Cannot vacuum; Invalid object");
}

void Consolidator::check_array_uri(const char* array_name) {
template <class RM>
void Consolidator<RM>::check_array_uri(const char* array_name) {
if (URI(array_name).is_tiledb()) {
throw std::logic_error("Consolidation is not supported for remote arrays.");
}
}

template class Consolidator<context_bypass_RM>;

} // namespace tiledb::sm
22 changes: 13 additions & 9 deletions tiledb/sm/consolidator/consolidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,21 @@ enum class ConsolidationMode {
GROUP_META // Group metadata mode.
};

/** Consolidation configuration parameters. */
struct ConsolidationConfigBase {
/** Start time for consolidation. */
uint64_t timestamp_start_;
/** End time for consolidation. */
uint64_t timestamp_end_;
};

/** Handles array consolidation. */
template <class RM>
class Consolidator {
public:
/** The type of the resource manager used to construct this Consolidator. */
using resource_manager_type = RM;

/* ********************************* */
/* FACTORY METHODS */
/* ********************************* */
Expand Down Expand Up @@ -129,14 +141,6 @@ class Consolidator {
/* TYPE DEFINITIONS */
/* ********************************* */

/** Consolidation configuration parameters. */
struct ConsolidationConfigBase {
/** Start time for consolidation. */
uint64_t timestamp_start_;
/** End time for consolidation. */
uint64_t timestamp_end_;
};

protected:
/* ********************************* */
/* PROTECTED METHODS */
Expand Down Expand Up @@ -175,4 +179,4 @@ class Consolidator {

} // namespace tiledb::sm

#endif // TILEDB_FRAGMENT_H
#endif // TILEDB_CONSOLIDATOR_H
Loading
Loading