Compute and use the initial string offset when building nested large string cols with chunked parquet reader #17702

Open: mhaseeb123 wants to merge 35 commits into branch-25.02 from fix/str_offset-nested-large-str-cols.

Changes shown are from 22 of the 35 commits.

Commits:
1ce570c  Compute and use str_offset for large nested string cols. (mhaseeb123, Jan 9, 2025)
e73432d  Clean up, add docstrings (mhaseeb123, Jan 10, 2025)
a146cd6  Fix copyright year (mhaseeb123, Jan 10, 2025)
9f4ede3  Fix comment (mhaseeb123, Jan 10, 2025)
edaff09  Revert comment (mhaseeb123, Jan 10, 2025)
2a279de  Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols (mhaseeb123, Jan 10, 2025)
0d2317a  Minor optimization. Sync stream only for `str_offsets` vector (mhaseeb123, Jan 10, 2025)
9622620  Remove leftover cout (mhaseeb123, Jan 14, 2025)
38652c0  Remove code duplication with utility function (mhaseeb123, Jan 14, 2025)
a57ccab  fix copyright header (mhaseeb123, Jan 14, 2025)
3cffe1c  Remove explicit inline and simplify branch (mhaseeb123, Jan 14, 2025)
29c1754  Refactor offset computing to avoid ambiguous use of util function. (mhaseeb123, Jan 14, 2025)
28835c3  Change initial_offset type to int64 and subtract from last_elem (mhaseeb123, Jan 15, 2025)
46ba4ab  Reuse code with a util function (mhaseeb123, Jan 15, 2025)
ebea0cd  Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols (mhaseeb123, Jan 15, 2025)
73ced83  Minor optimization. Make const ptr to const page_state (mhaseeb123, Jan 15, 2025)
b0acb4c  Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols (mhaseeb123, Jan 15, 2025)
52cd41a  Revert subtraction of initial offset (mhaseeb123, Jan 15, 2025)
dde3285  Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols (mhaseeb123, Jan 15, 2025)
694ec53  Merge conflicts (mhaseeb123, Jan 15, 2025)
5012bc7  Revert changes (mhaseeb123, Jan 15, 2025)
094a02b  Merge conflicts with branch25.02 (mhaseeb123, Jan 15, 2025)
fbe52d6  Trivial refactoring. Separate offset functions for small and large st… (mhaseeb123, Jan 16, 2025)
399d8b0  Revert `unsetenv` in merge_tests (mhaseeb123, Jan 16, 2025)
b7564d8  Revert copyrights year (mhaseeb123, Jan 16, 2025)
c9ed1fc  Fix copyrights year (mhaseeb123, Jan 16, 2025)
b5ff95f  Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols (mhaseeb123, Jan 17, 2025)
16b9843  Address review comments (mhaseeb123, Jan 22, 2025)
e838e72  Merge branch 'fix/str_offset-nested-large-str-cols' of https://github… (mhaseeb123, Jan 22, 2025)
1a34807  Remove unnecessary stream sync for `h_initial_str_offsets` (mhaseeb123, Jan 22, 2025)
7990f7c  Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols (mhaseeb123, Jan 22, 2025)
d8231ca  Address review comments (mhaseeb123, Jan 24, 2025)
c668fc6  Merge branch 'branch-25.02' into fix/str_offset-nested-large-str-cols (mhaseeb123, Jan 24, 2025)
218782d  Fill host vector with std instead of thrust (mhaseeb123, Jan 24, 2025)
3cfbe24  Fix the sanity check for offsets (mhaseeb123, Jan 24, 2025)
10 changes: 7 additions & 3 deletions cpp/include/cudf/detail/sizes_to_offsets_iterator.cuh
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -255,12 +255,14 @@ static sizes_to_offsets_iterator<ScanIterator, LastType> make_sizes_to_offsets_i
* @param begin Input iterator for scan
* @param end End of the input iterator
* @param result Output iterator for scan result
+ * @param initial_offset Initial offset to add to scan
* @return The last element of the scan
*/
template <typename SizesIterator, typename OffsetsIterator>
auto sizes_to_offsets(SizesIterator begin,
SizesIterator end,
OffsetsIterator result,
+                      int64_t initial_offset,
rmm::cuda_stream_view stream)
{
using SizeType = typename thrust::iterator_traits<SizesIterator>::value_type;
@@ -273,7 +275,8 @@ auto sizes_to_offsets(SizesIterator begin,
make_sizes_to_offsets_iterator(result, result + std::distance(begin, end), last_element.data());
// This function uses the type of the initialization parameter as the accumulator type
// when computing the individual scan output elements.
- thrust::exclusive_scan(rmm::exec_policy(stream), begin, end, output_itr, LastType{0});
+ thrust::exclusive_scan(
+   rmm::exec_policy(stream), begin, end, output_itr, static_cast<LastType>(initial_offset));
return last_element.value(stream);
}

@@ -319,7 +322,8 @@ std::pair<std::unique_ptr<column>, size_type> make_offsets_child_column(
});
auto input_itr = cudf::detail::make_counting_transform_iterator(0, map_fn);
// Use the sizes-to-offsets iterator to compute the total number of elements
- auto const total_elements = sizes_to_offsets(input_itr, input_itr + count + 1, d_offsets, stream);
+ auto const total_elements =
+   sizes_to_offsets(input_itr, input_itr + count + 1, d_offsets, 0, stream);
CUDF_EXPECTS(
total_elements <= static_cast<decltype(total_elements)>(std::numeric_limits<size_type>::max()),
"Size of output exceeds the column size limit",
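Note: to make the new `initial_offset` parameter concrete, here is a minimal standalone sketch (illustrative only, not part of this PR). Seeding the exclusive scan with a non-zero value yields offsets that continue from bytes already written by an earlier decode pass, which is what the chunked reader needs when it resumes a large string column mid-column.

```cpp
// Standalone illustration of an exclusive scan seeded with an initial offset,
// mirroring what sizes_to_offsets now does. All names are local to this sketch.
#include <thrust/device_vector.h>
#include <thrust/scan.h>

#include <cstdint>
#include <cstdio>
#include <vector>

int main()
{
  std::vector<int64_t> const h_sizes{3, 5, 2, 4};  // per-row string lengths
  thrust::device_vector<int64_t> sizes(h_sizes.begin(), h_sizes.end());
  thrust::device_vector<int64_t> offsets(sizes.size());

  // Bytes already emitted for this column by an earlier pass.
  int64_t const initial_offset = 100;

  // Exclusive scan seeded with initial_offset: offsets become {100, 103, 108, 110}.
  // With the previously hard-coded seed of 0 they would be {0, 3, 8, 10}.
  thrust::exclusive_scan(sizes.begin(), sizes.end(), offsets.begin(), initial_offset);

  for (int64_t const off : offsets) { std::printf("%lld\n", static_cast<long long>(off)); }
  return 0;
}
```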
7 changes: 4 additions & 3 deletions cpp/include/cudf/strings/detail/strings_children.cuh
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -140,7 +140,7 @@ std::pair<std::unique_ptr<column>, int64_t> make_offsets_child_column(
auto input_itr = cudf::detail::make_counting_transform_iterator(0, map_fn);
// Use the sizes-to-offsets iterator to compute the total number of elements
auto const total_bytes =
-   cudf::detail::sizes_to_offsets(input_itr, input_itr + strings_count + 1, d_offsets, stream);
+   cudf::detail::sizes_to_offsets(input_itr, input_itr + strings_count + 1, d_offsets, 0, stream);

auto const threshold = cudf::strings::get_offset64_threshold();
CUDF_EXPECTS(cudf::strings::is_large_strings_enabled() || (total_bytes < threshold),
@@ -151,7 +151,8 @@ std::pair<std::unique_ptr<column>, int64_t> make_offsets_child_column(
offsets_column = make_numeric_column(
data_type{type_id::INT64}, strings_count + 1, mask_state::UNALLOCATED, stream, mr);
auto d_offsets64 = offsets_column->mutable_view().template data<int64_t>();
-   cudf::detail::sizes_to_offsets(input_itr, input_itr + strings_count + 1, d_offsets64, stream);
+   cudf::detail::sizes_to_offsets(
+     input_itr, input_itr + strings_count + 1, d_offsets64, 0, stream);
}

return std::pair(std::move(offsets_column), total_bytes);
16 changes: 9 additions & 7 deletions cpp/src/io/parquet/decode_fixed.cu
@@ -942,6 +942,7 @@ constexpr bool is_split_decode()
* @param chunks List of column chunks
* @param min_row Row index to start reading at
* @param num_rows Maximum number of rows to read
+ * @param initial_str_offsets A vector to store the initial str_offset for large strings
* @param error_code Error code to set if an error is encountered
*/
template <typename level_t, int decode_block_size_t, decode_kernel_mask kernel_mask_t>
@@ -950,6 +951,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
+                        size_t* initial_str_offsets,
kernel_error::pointer error_code)
{
constexpr bool has_dict_t = has_dict<kernel_mask_t>();
@@ -1161,12 +1163,11 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8)
valid_count = next_valid_count;
}

- // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
- // latter case, offsets will be computed during string column creation.
+ // Convert string sizes to offsets if this is not a large string column. Otherwise, update the
+ // initial string buffer offset to be used during large string column construction
  if constexpr (has_strings_t) {
-   if (!s->col.is_large_string_col) {
-     convert_small_string_lengths_to_offsets<decode_block_size_t, has_lists_t>(s);
-   }
+   convert_string_lengths_to_offsets<decode_block_size_t>(
+     s, initial_str_offsets, pages[page_idx].chunk_idx, has_lists_t);
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}
@@ -1185,6 +1186,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_span<PageInfo> pages,
size_t min_row,
int level_type_size,
decode_kernel_mask kernel_mask,
+                             size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
@@ -1199,11 +1201,11 @@ void __host__ DecodePageData(cudf::detail::hostdevice_span<PageInfo> pages,
if (level_type_size == 1) {
gpuDecodePageDataGeneric<uint8_t, decode_block_size, mask>
<<<dim_grid, dim_block, 0, stream.value()>>>(
-         pages.device_ptr(), chunks, min_row, num_rows, error_code);
+         pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
} else {
gpuDecodePageDataGeneric<uint16_t, decode_block_size, mask>
<<<dim_grid, dim_block, 0, stream.value()>>>(
-         pages.device_ptr(), chunks, min_row, num_rows, error_code);
+         pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
}
};

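Note: the decode kernels above only receive a raw device pointer with one slot per column chunk. What follows is a hedged sketch of the host-side setup this implies; `make_initial_str_offsets` is an assumed helper name for illustration, not code from this PR. Each slot is pre-filled with the maximum sentinel so that the device-side `fetch_min` (see page_string_utils.cuh below) can record the smallest page `str_offset` observed for each chunk.

```cpp
// Hypothetical host-side allocation of the per-chunk initial string offsets.
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/fill.h>

#include <cuda/std/limits>

#include <cstddef>

rmm::device_uvector<std::size_t> make_initial_str_offsets(std::size_t num_chunks,
                                                          rmm::cuda_stream_view stream)
{
  rmm::device_uvector<std::size_t> offsets(num_chunks, stream);
  // Sentinel: any real str_offset is smaller, so the first fetch_min always wins.
  thrust::fill(rmm::exec_policy(stream),
               offsets.begin(),
               offsets.end(),
               cuda::std::numeric_limits<std::size_t>::max());
  return offsets;
}
```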
46 changes: 17 additions & 29 deletions cpp/src/io/parquet/page_delta_decode.cu
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -435,6 +435,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
+                        size_t* initial_str_offsets,
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
@@ -579,18 +580,10 @@
__syncthreads();
}

- // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
- // latter case, offsets will be computed during string column creation.
- if (not s->col.is_large_string_col) {
-   int value_count = nesting_info_base[leaf_level_index].value_count;
-
-   // if no repetition we haven't calculated start/end bounds and instead just skipped
-   // values until we reach first_row. account for that here.
-   if (!has_repetition) { value_count -= s->first_row; }
-
-   auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-   block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
- }
+ // Convert string sizes to offsets if this is not a large string column. Otherwise, update the
+ // initial string buffer offset to be used during large string column construction
+ convert_string_lengths_to_offsets<decode_block_size>(
+   s, initial_str_offsets, pages[page_idx].chunk_idx, has_repetition);

if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}
@@ -603,6 +596,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
+                        size_t* initial_str_offsets,
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
@@ -741,18 +735,10 @@
__syncthreads();
}

- // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
- // latter case, offsets will be computed during string column creation.
- if (not s->col.is_large_string_col) {
-   int value_count = nesting_info_base[leaf_level_index].value_count;
-
-   // if no repetition we haven't calculated start/end bounds and instead just skipped
-   // values until we reach first_row. account for that here.
-   if (!has_repetition) { value_count -= s->first_row; }
-
-   auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-   block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
- }
+ // Convert string sizes to offsets if this is not a large string column. Otherwise, update the
+ // initial string buffer offset to be used during large string column construction
+ convert_string_lengths_to_offsets<decode_block_size>(
+   s, initial_str_offsets, pages[page_idx].chunk_idx, has_repetition);

// finally, copy the string data into place
auto const dst = nesting_info_base[leaf_level_index].string_out;
@@ -797,6 +783,7 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
+                          size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
@@ -807,10 +794,10 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_span<PageInfo> pages,

if (level_type_size == 1) {
gpuDecodeDeltaByteArray<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-     pages.device_ptr(), chunks, min_row, num_rows, error_code);
+     pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
} else {
gpuDecodeDeltaByteArray<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-     pages.device_ptr(), chunks, min_row, num_rows, error_code);
+     pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
}
}

@@ -822,6 +809,7 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
+                                size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
@@ -832,10 +820,10 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span<PageInfo> pages,

if (level_type_size == 1) {
gpuDecodeDeltaLengthByteArray<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-     pages.device_ptr(), chunks, min_row, num_rows, error_code);
+     pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
} else {
gpuDecodeDeltaLengthByteArray<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-     pages.device_ptr(), chunks, min_row, num_rows, error_code);
+     pages.device_ptr(), chunks, min_row, num_rows, initial_str_offsets, error_code);
}
}

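Note: the removed `block_excl_sum` calls above (now folded into `convert_string_lengths_to_offsets`) perform an in-place, block-wide exclusive scan over the array of string lengths, seeded with the page's starting byte offset. Below is a minimal sketch of that technique using CUB; it illustrates the idiom and is not cudf's actual `block_excl_sum` implementation.

```cuda
// Sketch: convert per-row string lengths to offsets in place with a block-wide
// exclusive scan seeded by an initial byte offset. Illustrative only.
#include <cub/block/block_scan.cuh>

template <int block_size>
__device__ void lengths_to_offsets(int* arr, int length, int initial_value)
{
  using BlockScan = cub::BlockScan<int, block_size>;
  __shared__ typename BlockScan::TempStorage temp;

  int running_total = initial_value;
  for (int start = 0; start < length; start += block_size) {
    int const idx = start + static_cast<int>(threadIdx.x);
    int const len = (idx < length) ? arr[idx] : 0;
    int offset, block_total;
    BlockScan(temp).ExclusiveSum(len, offset, block_total);
    if (idx < length) { arr[idx] = running_total + offset; }
    running_total += block_total;
    __syncthreads();  // temp storage is reused on the next iteration
  }
}
```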
37 changes: 29 additions & 8 deletions cpp/src/io/parquet/page_string_utils.cuh
@@ -20,6 +20,8 @@

#include <cudf/strings/detail/gather.cuh>

+#include <cuda/atomic>

namespace cudf::io::parquet::detail {

// stole this from cudf/strings/detail/gather.cuh. modified to run on a single string on one warp.
@@ -98,21 +100,40 @@ __device__ inline void block_excl_sum(size_type* arr, size_type length, size_typ
}
}

Review comment from mhaseeb123 (Member, Author): has_lists can't be a tparam anymore as it is not known at compile time when called from page_delta_decode.cu. Also, we are only using it minimally at L120

- template <int block_size, bool has_lists>
- __device__ inline void convert_small_string_lengths_to_offsets(page_state_s* s)
+ /**
+  * @brief Converts string sizes to offsets if this is not a large string column. Otherwise,
+  * atomically update the initial string offset to be used during large string column construction
+  */
+ template <int block_size>
+ __device__ void convert_string_lengths_to_offsets(page_state_s const* const state,
+                                                   size_t* initial_str_offsets,
+                                                   int32_t chunk_idx,
+                                                   bool has_lists)
{
- // If this is a large string column. In the
- // latter case, offsets will be computed during string column creation.
- auto& ni = s->nesting_info[s->col.max_nesting_depth - 1];
+ auto& ni = state->nesting_info[state->col.max_nesting_depth - 1];
int value_count = ni.value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
- if constexpr (!has_lists) { value_count -= s->first_row; }
+ if (not has_lists) { value_count -= state->first_row; }

- auto const offptr = reinterpret_cast<size_type*>(ni.data_out);
- auto const initial_value = s->page.str_offset;
- block_excl_sum<block_size>(offptr, value_count, initial_value);
+ auto const initial_value = state->page.str_offset;

+ if (value_count > 0) {
+   // Convert the array of lengths into offsets if this not a large string column.
+   if (not state->col.is_large_string_col) {
+     auto const offptr = reinterpret_cast<size_type*>(ni.data_out);
+     block_excl_sum<block_size>(offptr, value_count, initial_value);
+   }  // Atomically update the initial string offset if this is a large string column. This initial
+      // offset will be used to compute (64-bit) offsets during large string column construction.
+   else if (!threadIdx.x) {
+     cuda::atomic_ref<size_t, cuda::std::thread_scope_device> initial_str_offsets_ref{
+       initial_str_offsets[chunk_idx]};
+     initial_str_offsets_ref.fetch_min(initial_value, cuda::std::memory_order_relaxed);
+   }
+ }
}

template <int block_size>
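Note: `fetch_min` is used above because the pages of a chunk are decoded by independent thread blocks in no particular order, and the chunk's initial offset must be the minimum `str_offset` over all of its decoded pages. A small standalone program demonstrating that the result is order-independent (illustrative names, not cudf code):

```cuda
// Each block stands in for one decoded page of a single chunk; the slot ends
// up holding the smallest page offset no matter which block runs first.
#include <cuda/atomic>

#include <cstdio>

__global__ void record_page_offsets(size_t* chunk_slot, size_t const* page_offsets)
{
  if (threadIdx.x == 0) {  // one representative thread per page/block
    cuda::atomic_ref<size_t, cuda::std::thread_scope_device> slot{*chunk_slot};
    slot.fetch_min(page_offsets[blockIdx.x], cuda::std::memory_order_relaxed);
  }
}

int main()
{
  size_t h_pages[] = {4096, 0, 1024};  // str_offset of each page in the chunk
  size_t h_slot    = ~size_t{0};       // sentinel: numeric max
  size_t *d_pages, *d_slot;
  cudaMalloc(&d_pages, sizeof(h_pages));
  cudaMalloc(&d_slot, sizeof(h_slot));
  cudaMemcpy(d_pages, h_pages, sizeof(h_pages), cudaMemcpyHostToDevice);
  cudaMemcpy(d_slot, &h_slot, sizeof(h_slot), cudaMemcpyHostToDevice);

  record_page_offsets<<<3, 32>>>(d_slot, d_pages);

  cudaMemcpy(&h_slot, d_slot, sizeof(h_slot), cudaMemcpyDeviceToHost);
  std::printf("initial str offset for chunk = %zu\n", h_slot);  // prints 0
  cudaFree(d_pages);
  cudaFree(d_slot);
  return 0;
}
```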
6 changes: 6 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -870,6 +870,7 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
+                          size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

@@ -884,6 +885,8 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
+ * @param[out] initial_str_offsets A vector to store the initial str_offset for large nested
+ * string cols
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
@@ -892,6 +895,7 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
size_t num_rows,
size_t min_row,
int level_type_size,
+                                size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

@@ -907,6 +911,7 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[in] kernel_mask Mask indicating the type of decoding kernel to launch.
+ * @param[out] initial_str_offsets A vector to store the initial str_offset for large strings
* @param[out] error_code Error code for kernel failures
* @param[in] stream CUDA stream to use
*/
@@ -916,6 +921,7 @@ void DecodePageData(cudf::detail::hostdevice_span<PageInfo> pages,
size_t min_row,
int level_type_size,
decode_kernel_mask kernel_mask,
+                     size_t* initial_str_offsets,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream);

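Note: per the commit log ("Minor optimization. Sync stream only for `str_offsets` vector", "Remove unnecessary stream sync for `h_initial_str_offsets`"), the caller copies the per-chunk minima back to the host before seeding the INT64 offsets scan. A hedged sketch of that retrieval using only plain CUDA runtime calls follows; the actual reader_impl wiring is not part of this diff, so the function name and signature here are assumptions.

```cpp
// Assumed caller-side retrieval of the per-chunk initial string offsets.
#include <cuda_runtime.h>

#include <cstddef>
#include <vector>

std::vector<std::size_t> fetch_initial_str_offsets(std::size_t const* d_initial_str_offsets,
                                                   std::size_t num_chunks,
                                                   cudaStream_t stream)
{
  std::vector<std::size_t> host(num_chunks);
  cudaMemcpyAsync(host.data(),
                  d_initial_str_offsets,
                  num_chunks * sizeof(std::size_t),
                  cudaMemcpyDeviceToHost,
                  stream);
  // Synchronize only for this vector, as the commit notes describe.
  cudaStreamSynchronize(stream);
  return host;
}
```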