125 changes: 106 additions & 19 deletions cpp/include/cudf/detail/utilities/vector_factories.hpp
@@ -267,16 +267,19 @@ rmm::device_uvector<typename Container::value_type> make_device_uvector(
* @note This function does not synchronize `stream` after the copy.
*
* @tparam T The type of the data to copy
* @param source_data The device data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
std::vector<T> make_std_vector_async(device_span<T const> v, rmm::cuda_stream_view stream)
std::vector<T> make_std_vector_async(device_span<T const> source_data, rmm::cuda_stream_view stream)
{
std::vector<T> result(v.size());
CUDF_CUDA_TRY(cudaMemcpyAsync(
result.data(), v.data(), v.size() * sizeof(T), cudaMemcpyDefault, stream.value()));
std::vector<T> result(source_data.size());
CUDF_CUDA_TRY(cudaMemcpyAsync(result.data(),
source_data.data(),
source_data.size() * sizeof(T),
cudaMemcpyDefault,
stream.value()));
return result;
}

@@ -307,14 +310,14 @@ std::vector<typename Container::value_type> make_std_vector_async(Container cons
* @note This function does a synchronize on `stream` after the copy.
*
* @tparam T The type of the data to copy
* @param source_data The device data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
std::vector<T> make_std_vector(device_span<T const> v, rmm::cuda_stream_view stream)
std::vector<T> make_std_vector(device_span<T const> source_data, rmm::cuda_stream_view stream)
{
auto result = make_std_vector_async(v, stream);
auto result = make_std_vector_async(source_data, stream);
stream.synchronize();
return result;
}
@@ -374,22 +377,23 @@ host_vector<T> make_empty_host_vector(size_t capacity, rmm::cuda_stream_view str
}

/**
* @brief Asynchronously construct a `thrust::host_vector` containing a copy of data from a
* @brief Asynchronously construct a `cudf::detail::host_vector` containing a copy of data from a
* `device_span`
*
* @note This function does not synchronize `stream` after the copy. The returned vector may be
* using a pinned memory resource.
*
* @tparam T The type of the data to copy
* @param source_data The device data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
host_vector<T> make_host_vector_async(device_span<T const> v, rmm::cuda_stream_view stream)
host_vector<T> make_host_vector_async(device_span<T const> source_data,
rmm::cuda_stream_view stream)
{
auto result = make_host_vector<T>(v.size(), stream);
cuda_memcpy_async<T>(result, v, stream);
auto result = make_host_vector<T>(source_data.size(), stream);
cuda_memcpy_async<T>(result, source_data, stream);
return result;
}

@@ -415,28 +419,28 @@ host_vector<typename Container::value_type> make_host_vector_async(Container con
}

/**
* @brief Synchronously construct a `thrust::host_vector` containing a copy of data from a
* @brief Synchronously construct a `cudf::detail::host_vector` containing a copy of data from a
* `device_span`
*
* @note This function does a synchronize on `stream` after the copy. The returned vector may be
* using a pinned memory resource.
*
* @tparam T The type of the data to copy
* @param source_data The device data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
host_vector<T> make_host_vector(device_span<T const> v, rmm::cuda_stream_view stream)
host_vector<T> make_host_vector(device_span<T const> source_data, rmm::cuda_stream_view stream)
{
auto result = make_host_vector_async(v, stream);
auto result = make_host_vector_async(source_data, stream);
stream.synchronize();
return result;
}

/**
* @brief Synchronously construct a `thrust::host_vector` containing a copy of data from a device
* container
* @brief Synchronously construct a `cudf::detail::host_vector` containing a copy of data from a
* device container
*
* @note This function synchronizes `stream` after the copy.
*
@@ -488,6 +492,89 @@ host_vector<T> make_pinned_vector(size_t size, rmm::cuda_stream_view stream)
return result;
}

/**
* @brief Asynchronously construct a pinned `cudf::detail::host_vector` containing a copy of data
* from a `device_span`
*
* @note This function does not synchronize `stream` after the copy. The returned vector uses
* a pinned memory resource.
*
* @tparam T The type of the data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
host_vector<T> make_pinned_vector_async(device_span<T const> source_data,
rmm::cuda_stream_view stream)
{
auto result = make_pinned_vector_async<T>(source_data.size(), stream);
cuda_memcpy_async<T>(result, source_data, stream);
return result;
}

/**
* @brief Asynchronously construct a pinned `cudf::detail::host_vector` containing a copy of data
* from a device container
*
* @note This function does not synchronize `stream` after the copy. The returned vector uses
* a pinned memory resource.
*
* @tparam Container The type of the container to copy from
* @tparam T The type of the data to copy
* @param c The input device container from which to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename Container>
host_vector<typename Container::value_type> make_pinned_vector_async(Container const& c,
rmm::cuda_stream_view stream)
requires(std::is_convertible_v<Container, device_span<typename Container::value_type const>>)
{
return make_pinned_vector_async(device_span<typename Container::value_type const>{c}, stream);
}

/**
* @brief Synchronously construct a pinned `cudf::detail::host_vector` containing a copy of data
* from a `device_span`
*
* @note This function does a synchronize on `stream` after the copy. The returned vector uses
* a pinned memory resource.
*
* @tparam T The type of the data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
host_vector<T> make_pinned_vector(device_span<T const> source_data, rmm::cuda_stream_view stream)
{
auto result = make_pinned_vector_async(source_data, stream);
stream.synchronize();
return result;
}

/**
* @brief Synchronously construct a pinned `cudf::detail::host_vector` containing a copy of data
* from a device container
*
* @note This function synchronizes `stream` after the copy. The returned vector uses
* a pinned memory resource.
*
* @tparam Container The type of the container to copy from
* @tparam T The type of the data to copy
* @param c The input device container from which to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename Container>
host_vector<typename Container::value_type> make_pinned_vector(Container const& c,
rmm::cuda_stream_view stream)
requires(std::is_convertible_v<Container, device_span<typename Container::value_type const>>)
{
return make_pinned_vector(device_span<typename Container::value_type const>{c}, stream);
}
} // namespace detail

} // namespace CUDF_EXPORT cudf
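
For reference, a usage sketch of the new pinned factory overloads added in this file (illustrative only, not part of the diff; it assumes an `rmm::device_uvector` that converts to `device_span`, as with the existing `make_host_vector` overloads):

```cpp
#include <cudf/detail/utilities/vector_factories.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

// Copy device data into pinned host vectors using the new overloads.
void copy_to_pinned_host(rmm::device_uvector<int> const& d_data, rmm::cuda_stream_view stream)
{
  // Async variant: does not synchronize, so the contents must not be read
  // until the caller has synchronized `stream`.
  auto h_async = cudf::detail::make_pinned_vector_async(d_data, stream);

  // Sync variant: synchronizes `stream`, so the data is ready on return.
  auto h_sync = cudf::detail::make_pinned_vector(d_data, stream);

  stream.synchronize();  // now h_async is also safe to read
}
```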
26 changes: 16 additions & 10 deletions cpp/include/cudf/reduction/detail/reduction.cuh
This links back to the draft PR for reference; it covers the full change in reduction.cuh:

https://github.com/rapidsai/cudf/pull/18968/files#diff-d99740825ef0d2e73c3e8392d06ca11b229400d864913b4221f3f3626ad95f85

@@ -21,6 +21,7 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/device_scalar.hpp>
#include <cudf/detail/utilities/cast_functor.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/type_dispatcher.hpp>

@@ -65,8 +66,10 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
{
auto const binary_op = cudf::detail::cast_functor<OutputType>(op.get_binary_op());
auto const initial_value = init.value_or(op.template get_identity<OutputType>());
auto pinned_initial = cudf::detail::make_pinned_vector<OutputType>(1, stream);
pinned_initial[0] = initial_value;
using ScalarType = cudf::scalar_type_t<OutputType>;
auto result = std::make_unique<ScalarType>(initial_value, true, stream, mr);
auto result = std::make_unique<ScalarType>(pinned_initial[0], true, stream, mr);

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
@@ -77,7 +80,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
result->data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

@@ -88,7 +91,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
result->data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());
return result;
}
@@ -123,7 +126,9 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
{
auto const binary_op = cudf::detail::cast_functor<OutputType>(op.get_binary_op());
auto const initial_value = init.value_or(op.template get_identity<OutputType>());
auto dev_result = cudf::detail::device_scalar<OutputType>{initial_value, stream};
auto pinned_initial = cudf::detail::make_pinned_vector_async<OutputType>(1, stream);
pinned_initial[0] = initial_value;
auto dev_result = cudf::detail::device_scalar<OutputType>{pinned_initial[0], stream};

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
@@ -134,7 +139,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
dev_result.data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

@@ -145,7 +150,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
dev_result.data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());

return std::make_unique<cudf::string_scalar>(dev_result, true, stream, mr);
@@ -185,8 +190,9 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
{
auto const binary_op = cudf::detail::cast_functor<IntermediateType>(op.get_binary_op());
auto const initial_value = op.template get_identity<IntermediateType>();

cudf::detail::device_scalar<IntermediateType> intermediate_result{initial_value, stream};
auto pinned_initial = cudf::detail::make_pinned_vector_async<IntermediateType>(1, stream);
pinned_initial[0] = initial_value;
cudf::detail::device_scalar<IntermediateType> intermediate_result{pinned_initial[0], stream};

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
@@ -197,7 +203,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
intermediate_result.data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

@@ -208,7 +214,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
intermediate_result.data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());

// compute the result value from intermediate value in device
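
The hunks above all apply the same pattern: the reduction's initial value is first written into a pinned host allocation and then read from `pinned_initial[0]` wherever it is consumed, so that any host-to-device copy of that value (for example when constructing the device scalar) is sourced from pinned rather than pageable memory. A minimal, cudf-free sketch of that staging idea (plain CUDA; all names illustrative):

```cpp
#include <cuda_runtime.h>

// Stage a host value in pinned memory before an async H2D copy.
// Copies sourced from pinned memory let cudaMemcpyAsync proceed without an
// internal pageable-memory staging step.
void stage_and_copy(int initial_value, cudaStream_t stream)
{
  int* h_pinned = nullptr;
  int* d_value  = nullptr;
  cudaHostAlloc(&h_pinned, sizeof(int), cudaHostAllocDefault);  // pinned host buffer
  cudaMalloc(&d_value, sizeof(int));

  *h_pinned = initial_value;  // stage the value on the host first
  cudaMemcpyAsync(d_value, h_pinned, sizeof(int), cudaMemcpyHostToDevice, stream);

  // ... enqueue work on `stream` that consumes *d_value ...

  cudaStreamSynchronize(stream);  // h_pinned must outlive the copy
  cudaFree(d_value);
  cudaFreeHost(h_pinned);
}
```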
12 changes: 6 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
@JigaoLuo JigaoLuo Oct 3, 2025

Change the type of offsets and buff_addrs to cudf::detail::host_vector for the call to the write_final_offsets function. This is the only place where that function is called.

So there is no need to change the write_final_offsets function in cpp/src/io/parquet/page_data.cu.
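
The signature of write_final_offsets is not shown in this diff; the sketch below assumes a host_span-based interface (hypothetical) and that `cudf::detail::host_vector` binds to `cudf::host_span` the same way `std::vector` does, in which case only the call site needs to change:

```cpp
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>

// Hypothetical host_span-based signature (the real declaration lives in page_data.cu).
void write_final_offsets(cudf::host_span<cudf::size_type const> offsets,
                         cudf::host_span<cudf::size_type* const> buff_addrs,
                         rmm::cuda_stream_view stream);

void call_site_sketch(rmm::cuda_stream_view stream)
{
  auto final_offsets = cudf::detail::make_host_vector<cudf::size_type>(0, stream);
  auto out_buffers   = cudf::detail::make_host_vector<cudf::size_type*>(0, stream);
  // ... grow both vectors with push_back, as in the diff below ...
  write_final_offsets(final_offsets, out_buffers, stream);  // host_vector binds to host_span
}
```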

@@ -418,8 +418,8 @@ void reader_impl::decode_page_data(read_mode mode,
// that it is difficult/impossible for a given page to know that it is writing the very
// last value that should then be followed by a terminator (because rows can span
// page boundaries).
std::vector<size_type*> out_buffers;
std::vector<size_type> final_offsets;
auto out_buffers = cudf::detail::make_host_vector<size_type*>(0, _stream);
auto final_offsets = cudf::detail::make_host_vector<size_type>(0, _stream);
Comment on lines +421 to +422
Note: cudf::detail::host_vector should work just like thrust::host_vector when using the cudf/RMM host memory allocator.

The reason I raise this is that most existing uses of host_vector in cudf treat it as a fixed-size array. In contrast, this particular case starts out zero-sized and relies on dynamic resizing.
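
A short sketch of that growth pattern, assuming (as noted above) that cudf::detail::host_vector behaves like a thrust::host_vector with cudf's host allocator; the up-front reserve() matters because each capacity growth reallocates through that allocator, and pinned host allocations are comparatively expensive:

```cpp
#include <thrust/host_vector.h>

#include <cstddef>

// Illustrative only: a host vector that starts empty and grows dynamically.
thrust::host_vector<int> build(std::size_t n)
{
  thrust::host_vector<int> values;  // zero-sized, like make_host_vector<T>(0, stream)
  values.reserve(n);                // one allocation up front
  for (std::size_t i = 0; i < n; ++i) {
    values.push_back(static_cast<int>(i));  // no reallocation while size <= n
  }
  return values;
}
```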

out_buffers.reserve(_input_columns.size());
final_offsets.reserve(_input_columns.size());
for (size_t idx = 0; idx < _input_columns.size(); idx++) {
@@ -437,14 +437,14 @@

// the final offset for a list at level N is the size of it's child
size_type const offset = child.type.id() == type_id::LIST ? child.size - 1 : child.size;
out_buffers.emplace_back(static_cast<size_type*>(out_buf.data()) + (out_buf.size - 1));
final_offsets.emplace_back(offset);
out_buffers.push_back(static_cast<size_type*>(out_buf.data()) + (out_buf.size - 1));
final_offsets.push_back(offset);
out_buf.user_data |= PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED;
} else if (out_buf.type.id() == type_id::STRING) {
// only if it is not a large strings column
if (std::cmp_less_equal(col_string_sizes[idx], strings::detail::get_offset64_threshold())) {
out_buffers.emplace_back(static_cast<size_type*>(out_buf.data()) + out_buf.size);
final_offsets.emplace_back(static_cast<size_type>(col_string_sizes[idx]));
out_buffers.push_back(static_cast<size_type*>(out_buf.data()) + out_buf.size);
final_offsets.push_back(static_cast<size_type>(col_string_sizes[idx]));
}
// Nested large strings column
else if (input_col.nesting_depth() > 0) {