Skip to content

Commit

Permalink
Some more PR cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ypatia committed Dec 19, 2024
1 parent 0a97612 commit 446700a
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 23 deletions.
11 changes: 6 additions & 5 deletions test/src/unit-sparse-unordered-with-dups-reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1065,11 +1065,12 @@ TEST_CASE_METHOD(
if (one_frag) {
CHECK(1 == loop_num->second);
}

// FIXME: This check has become unpredictable, see why the loop number is
// not consistent } else {
// CHECK(20 == loop_num->second);
// }
/**
* FIXME: The loop_num appears to be different on different
* architectures/build modes. SC-61065 to investigate why. } else { CHECK(20
* == loop_num->second);
* }
*/

// Try to read multiple frags without partial tile offset reading. Should
// fail
Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/query/readers/filtered_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ class FilteredData {
auto timer_se = stats_->start_timer("read");
return resources_.vfs().read(uri, offset, data, size, false);
});
// This should be changes once we use taskgraphs for modeling the data flow
// This should be changed once we use taskgraphs for modeling the data flow
block.set_io_task(task);
}

Expand Down
6 changes: 6 additions & 0 deletions tiledb/sm/query/readers/reader_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,9 @@ Status ReaderBase::unfilter_tiles(
return Status::Ok();
}

// The current threadpool design does not allow for unfiltering to
// happen in chunks using a parallel for within this async task as the
// wait_all in the end of the parallel for can deadlock.
for (uint64_t range_thread_idx = 0;
range_thread_idx < num_range_threads;
range_thread_idx++) {
Expand Down Expand Up @@ -998,6 +1001,9 @@ Status ReaderBase::unfilter_tiles(
continue;
}

// Unfiltering tasks have been launched, set the tasks to wait for in the
// corresponding tiles. When those tasks(futures) will be ready the tile
// processing that depends on the unfiltered tile will get unblocked.
auto tile_tuple = result_tile->tile_tuple(name);
tile_tuple->fixed_tile().set_unfilter_data_compute_task(task);

Expand Down
25 changes: 14 additions & 11 deletions tiledb/sm/query/readers/result_tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,20 @@ class ResultTile {
}

~TileData() {
// TODO: destructor should not throw, catch any exceptions
if (fixed_filtered_data_task_.valid()) {
auto st = fixed_filtered_data_task_.wait();
}

if (var_filtered_data_task_.valid()) {
auto st = var_filtered_data_task_.wait();
}

if (validity_filtered_data_task_.valid()) {
auto st = validity_filtered_data_task_.wait();
try {
if (fixed_filtered_data_task_.valid()) {
auto st = fixed_filtered_data_task_.wait();
}

if (var_filtered_data_task_.valid()) {
auto st = var_filtered_data_task_.wait();
}

if (validity_filtered_data_task_.valid()) {
auto st = validity_filtered_data_task_.wait();
}
} catch (...) {
return;
}
}

Expand Down
1 change: 0 additions & 1 deletion tiledb/sm/tile/tile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

#include "tiledb/sm/tile/tile.h"

#include <utility>
#include "tiledb/common/exception/exception.h"
#include "tiledb/common/heap_memory.h"
#include "tiledb/common/memory_tracker.h"
Expand Down
15 changes: 10 additions & 5 deletions tiledb/sm/tile/tile.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ class TileBase {
return static_cast<T*>(data());
}

/** Converts the data pointer to a specific type with no check on compute
/**
* Converts the data pointer to a specific type with no check on compute
* task. This is used for getting thte data from inside the compute thread
* itself for unfiltering. */
* itself for unfiltering.
*/
template <class T>
inline T* data_as_unsafe() const {
return static_cast<T*>(data_unsafe());
Expand All @@ -134,8 +136,10 @@ class TileBase {
return data_.get();
}

/** Returns the internal buffer. This is used for getting thte data from
* inside the compute thread itself for unfiltering. */
/**
* Returns the internal buffer. This is used for getting thte data from
* inside the compute thread itself for unfiltering.
*/
inline void* data_unsafe() const {
return data_.get();
}
Expand Down Expand Up @@ -198,7 +202,8 @@ class TileBase {
/** The tile data type. */
Datatype type_;

/** Whether to block waiting for io data to be ready before accessing data()
/**
* Whether to block waiting for io data to be ready before accessing data()
*/
const bool skip_waiting_on_io_task_;

Expand Down

0 comments on commit 446700a

Please sign in to comment.