125 changes: 106 additions & 19 deletions cpp/include/cudf/detail/utilities/vector_factories.hpp
@@ -267,16 +267,19 @@ rmm::device_uvector<typename Container::value_type> make_device_uvector(
* @note This function does not synchronize `stream` after the copy.
*
* @tparam T The type of the data to copy
* @param source_data The device data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
std::vector<T> make_std_vector_async(device_span<T const> v, rmm::cuda_stream_view stream)
std::vector<T> make_std_vector_async(device_span<T const> source_data, rmm::cuda_stream_view stream)
{
std::vector<T> result(v.size());
CUDF_CUDA_TRY(cudaMemcpyAsync(
result.data(), v.data(), v.size() * sizeof(T), cudaMemcpyDefault, stream.value()));
std::vector<T> result(source_data.size());
CUDF_CUDA_TRY(cudaMemcpyAsync(result.data(),
source_data.data(),
source_data.size() * sizeof(T),
cudaMemcpyDefault,
stream.value()));
return result;
}

@@ -307,14 +310,14 @@ std::vector<typename Container::value_type> make_std_vector_async(Container cons
* @note This function does a synchronize on `stream` after the copy.
*
* @tparam T The type of the data to copy
* @param source_data The device data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
std::vector<T> make_std_vector(device_span<T const> v, rmm::cuda_stream_view stream)
std::vector<T> make_std_vector(device_span<T const> source_data, rmm::cuda_stream_view stream)
{
auto result = make_std_vector_async(v, stream);
auto result = make_std_vector_async(source_data, stream);
stream.synchronize();
return result;
}
@@ -374,22 +377,23 @@ host_vector<T> make_empty_host_vector(size_t capacity, rmm::cuda_stream_view str
}

/**
* @brief Asynchronously construct a `thrust::host_vector` containing a copy of data from a
* @brief Asynchronously construct a `cudf::detail::host_vector` containing a copy of data from a
* `device_span`
*
* @note This function does not synchronize `stream` after the copy. The returned vector may be
* using a pinned memory resource.
*
* @tparam T The type of the data to copy
* @param source_data The device data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
host_vector<T> make_host_vector_async(device_span<T const> v, rmm::cuda_stream_view stream)
host_vector<T> make_host_vector_async(device_span<T const> source_data,
rmm::cuda_stream_view stream)
{
auto result = make_host_vector<T>(v.size(), stream);
cuda_memcpy_async<T>(result, v, stream);
auto result = make_host_vector<T>(source_data.size(), stream);
cuda_memcpy_async<T>(result, source_data, stream);
return result;
}

@@ -415,28 +419,28 @@ host_vector<typename Container::value_type> make_host_vector_async(Container con
}

/**
* @brief Synchronously construct a `thrust::host_vector` containing a copy of data from a
* @brief Synchronously construct a `cudf::detail::host_vector` containing a copy of data from a
* `device_span`
*
* @note This function does a synchronize on `stream` after the copy. The returned vector may be
* using a pinned memory resource.
*
* @tparam T The type of the data to copy
* @param source_data The device data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
host_vector<T> make_host_vector(device_span<T const> v, rmm::cuda_stream_view stream)
host_vector<T> make_host_vector(device_span<T const> source_data, rmm::cuda_stream_view stream)
{
auto result = make_host_vector_async(v, stream);
auto result = make_host_vector_async(source_data, stream);
stream.synchronize();
return result;
}

/**
* @brief Synchronously construct a `thrust::host_vector` containing a copy of data from a device
* container
* @brief Synchronously construct a `cudf::detail::host_vector` containing a copy of data from a
* device container
*
* @note This function synchronizes `stream` after the copy.
*
@@ -488,6 +492,89 @@ host_vector<T> make_pinned_vector(size_t size, rmm::cuda_stream_view stream)
return result;
}

/**
* @brief Asynchronously construct a pinned `cudf::detail::host_vector` containing a copy of data
* from a `device_span`
*
* @note This function does not synchronize `stream` after the copy. The returned vector uses
* a pinned memory resource.
*
* @tparam T The type of the data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
host_vector<T> make_pinned_vector_async(device_span<T const> source_data,
rmm::cuda_stream_view stream)
{
auto result = make_pinned_vector_async<T>(source_data.size(), stream);
cuda_memcpy_async<T>(result, source_data, stream);
return result;
}

/**
*
* @brief Asynchronously construct a pinned `cudf::detail::host_vector` containing a copy of data
* from a device container
*
* @note This function does not synchronize `stream` after the copy. The returned vector uses
* a pinned memory resource.
*
* @tparam Container The type of the container to copy from
* @tparam T The type of the data to copy
* @param c The input device container from which to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename Container>
host_vector<typename Container::value_type> make_pinned_vector_async(Container const& c,
rmm::cuda_stream_view stream)
requires(std::is_convertible_v<Container, device_span<typename Container::value_type const>>)
{
return make_pinned_vector_async(device_span<typename Container::value_type const>{c}, stream);
}

/**
* @brief Synchronously construct a pinned `cudf::detail::host_vector` containing a copy of data
* from a `device_span`
*
* @note This function does a synchronize on `stream` after the copy. The returned vector uses
* a pinned memory resource.
*
* @tparam T The type of the data to copy
* @param source_data The device_span of data to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename T>
host_vector<T> make_pinned_vector(device_span<T const> source_data, rmm::cuda_stream_view stream)
{
auto result = make_pinned_vector_async(source_data, stream);
stream.synchronize();
return result;
}

/**
* @brief Synchronously construct a pinned `cudf::detail::host_vector` containing a copy of data
* from a device container
*
* @note This function synchronizes `stream` after the copy. The returned vector uses
* a pinned memory resource.
*
* @tparam Container The type of the container to copy from
* @tparam T The type of the data to copy
* @param c The input device container from which to copy
* @param stream The stream on which to perform the copy
* @return The data copied to the host
*/
template <typename Container>
host_vector<typename Container::value_type> make_pinned_vector(Container const& c,
rmm::cuda_stream_view stream)
requires(std::is_convertible_v<Container, device_span<typename Container::value_type const>>)
{
return make_pinned_vector(device_span<typename Container::value_type const>{c}, stream);
}
} // namespace detail

} // namespace CUDF_EXPORT cudf
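
For orientation, here is a minimal usage sketch of the pinned factory overloads added above. The function name copy_to_pinned_host, the int element type, and the surrounding setup are illustrative only, not part of the PR:

```cpp
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

// Copy a device buffer into pinned host memory with the new overloads.
// make_pinned_vector_async() returns before the copy is guaranteed to have
// finished, so the caller must synchronize the stream before touching the
// host data; make_pinned_vector() performs that synchronization internally.
cudf::detail::host_vector<int> copy_to_pinned_host(rmm::device_uvector<int> const& d_data,
                                                   rmm::cuda_stream_view stream)
{
  auto pinned = cudf::detail::make_pinned_vector_async(
    cudf::device_span<int const>{d_data.data(), d_data.size()}, stream);
  stream.synchronize();
  return pinned;
}
```

The synchronous make_pinned_vector overload in the diff wraps exactly this copy-then-synchronize sequence, so callers that need the data immediately can use it instead.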
26 changes: 16 additions & 10 deletions cpp/include/cudf/reduction/detail/reduction.cuh
Contributor Author:
This links back to the draft PR for reference; it covers the full change in reduction.cuh:

https://github.com/rapidsai/cudf/pull/18968/files#diff-d99740825ef0d2e73c3e8392d06ca11b229400d864913b4221f3f3626ad95f85

@@ -21,6 +21,7 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/device_scalar.hpp>
#include <cudf/detail/utilities/cast_functor.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/type_dispatcher.hpp>

@@ -65,8 +66,10 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
{
auto const binary_op = cudf::detail::cast_functor<OutputType>(op.get_binary_op());
auto const initial_value = init.value_or(op.template get_identity<OutputType>());
auto pinned_initial = cudf::detail::make_pinned_vector_async<OutputType>(1, stream);
pinned_initial[0] = initial_value;
using ScalarType = cudf::scalar_type_t<OutputType>;
auto result = std::make_unique<ScalarType>(initial_value, true, stream, mr);
auto result = std::make_unique<ScalarType>(pinned_initial[0], true, stream, mr);
Contributor Author:
As we discussed on Slack: assign initial_value to element zero of a pinned vector, effectively treating it like a pinned scalar.

Contributor:
I forgot most of the context here :(
Are we passing the value by reference here?

Contributor Author:
No, we are not passing by reference here.

Contributor Author:
Let me bring back some context from our Slack chat. The goal is for ScalarType and cub::DeviceReduce::Reduce to copy the initial_value from host-pinned memory.

Back around August 19th in Slack, we discussed:

  • placing the initial_value in a pinned host vector of size 1
  • and then assigning the value to the first element [0].
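
Pulling this hunk together with the CUB calls further down, the pattern under discussion condenses to the sketch below. It reuses the names from the enclosing reduce() shown in the diff (OutputType, d_in, num_items, binary_op, stream, mr), so it is a trimmed restatement of the change rather than standalone code:

```cpp
// Stage the initial value in pinned host memory (a one-element host_vector),
// then read it from there for both the result scalar and the CUB reduction.
auto pinned_initial = cudf::detail::make_pinned_vector_async<OutputType>(1, stream);
pinned_initial[0]   = initial_value;

using ScalarType = cudf::scalar_type_t<OutputType>;
auto result      = std::make_unique<ScalarType>(pinned_initial[0], true, stream, mr);

// CUB's two-phase pattern: the first call (with a null temp-storage pointer)
// only computes temp_storage_bytes, the second performs the reduction into
// result->data(). Both receive the init value from the pinned element.
rmm::device_buffer d_temp_storage;
size_t temp_storage_bytes = 0;
cub::DeviceReduce::Reduce(d_temp_storage.data(), temp_storage_bytes, d_in, result->data(),
                          num_items, binary_op, pinned_initial[0], stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};
cub::DeviceReduce::Reduce(d_temp_storage.data(), temp_storage_bytes, d_in, result->data(),
                          num_items, binary_op, pinned_initial[0], stream.value());
```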


// Allocate temporary storage
rmm::device_buffer d_temp_storage;
@@ -77,7 +80,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
result->data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

Expand All @@ -88,7 +91,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
result->data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());
return result;
}
@@ -123,7 +126,9 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
{
auto const binary_op = cudf::detail::cast_functor<OutputType>(op.get_binary_op());
auto const initial_value = init.value_or(op.template get_identity<OutputType>());
auto dev_result = cudf::detail::device_scalar<OutputType>{initial_value, stream};
auto pinned_initial = cudf::detail::make_pinned_vector_async<OutputType>(1, stream);
pinned_initial[0] = initial_value;
Contributor:
I don't think we need the pinned vector here, since cudf::detail::device_scalar will use the bounce buffer for the H2D copy anyway.

Contributor Author (@JigaoLuo, Oct 4, 2025):
That’s true—I’ll revert it. But I have one question: does cub::DeviceReduce::Reduce actually copy the initial_value from host memory?

To investigate this question, I ran experiments using both a pinned host vector and a regular one.

$ nsys profile ./REDUCTIONS_TEST 
$ nsys export --output report1.sqlite --type sqlite report1.nsys-rep
$ nsys analyze -r cuda_memcpy_async:rows=-1 report1.nsys-rep | wc -l 

I didn’t observe any difference in the pageable-copy count, which suggests that CUB avoids pageable memory internally.


  • What makes this confusing is that I recall doing a similar experiment a few months ago to pinpoint a pageable memory bottleneck. I’m fairly sure I found one and managed to eliminate it back then.
  • (I also tried reading through the CUB source code, but it gets pretty hard to follow after the dispatch logic and various specialization paths)
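
For anyone who wants to rerun this measurement outside of REDUCTIONS_TEST, a self-contained sketch along the following lines can be compiled with nvcc and profiled with the same nsys commands as above. The file name, sizes, and reduction operator are arbitrary and it is not code from the PR; since cub::DeviceReduce::Reduce takes its init argument by value, the expected outcome matches the observation here, namely no additional pageable copy in either variant:

```cpp
// pinned_init_repro.cu -- rerun of the experiment described in this thread:
// the same cub::DeviceReduce::Reduce call, with the init value read once from
// a plain stack variable and once from a pinned host allocation.
#include <cub/device/device_reduce.cuh>

#include <thrust/device_vector.h>
#include <thrust/functional.h>

#include <cstdio>

int main()
{
  int const num_items = 1 << 20;
  thrust::device_vector<int> d_in(num_items, 1);
  thrust::device_vector<int> d_out(1);

  int const* in_ptr = thrust::raw_pointer_cast(d_in.data());
  int* out_ptr      = thrust::raw_pointer_cast(d_out.data());

  auto run = [&](int init) {
    void* d_temp_storage      = nullptr;
    size_t temp_storage_bytes = 0;
    // First call sizes the temporary storage, second call runs the reduction.
    cub::DeviceReduce::Reduce(
      d_temp_storage, temp_storage_bytes, in_ptr, out_ptr, num_items, thrust::plus<int>{}, init);
    cudaMalloc(&d_temp_storage, temp_storage_bytes);
    cub::DeviceReduce::Reduce(
      d_temp_storage, temp_storage_bytes, in_ptr, out_ptr, num_items, thrust::plus<int>{}, init);
    cudaFree(d_temp_storage);
  };

  int pageable_init = 0;
  run(pageable_init);  // init value read from pageable (stack) memory

  int* pinned_init = nullptr;
  cudaMallocHost(reinterpret_cast<void**>(&pinned_init), sizeof(int));
  *pinned_init = 0;
  run(*pinned_init);   // init value read from pinned host memory
  cudaFreeHost(pinned_init);

  cudaDeviceSynchronize();
  std::printf("sum = %d\n", static_cast<int>(d_out[0]));
  return 0;
}
```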

auto dev_result = cudf::detail::device_scalar<OutputType>{pinned_initial[0], stream};

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
@@ -134,7 +139,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
dev_result.data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

Expand All @@ -145,7 +150,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
dev_result.data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());

return std::make_unique<cudf::string_scalar>(dev_result, true, stream, mr);
@@ -185,8 +190,9 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
{
auto const binary_op = cudf::detail::cast_functor<IntermediateType>(op.get_binary_op());
auto const initial_value = op.template get_identity<IntermediateType>();

cudf::detail::device_scalar<IntermediateType> intermediate_result{initial_value, stream};
auto pinned_initial = cudf::detail::make_pinned_vector_async<IntermediateType>(1, stream);
pinned_initial[0] = initial_value;
cudf::detail::device_scalar<IntermediateType> intermediate_result{pinned_initial[0], stream};

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
@@ -197,7 +203,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
intermediate_result.data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

@@ -208,7 +214,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
intermediate_result.data(),
num_items,
binary_op,
initial_value,
pinned_initial[0],
stream.value());

// compute the result value from intermediate value in device
12 changes: 6 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
Contributor Author (@JigaoLuo, Oct 3, 2025):
Change the type of offsets and buff_addrs to cudf::detail::host_vector for the call to the write_final_offsets function. This is the only place where that function is called.

Contributor Author:
So there is no need to change the write_final_offsets function in cpp/src/io/parquet/page_data.cu.
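
For context, the resulting call-site shape is sketched below. The write_final_offsets call and its argument order are assumed from this comment (offsets, then buff_addrs); the function's actual signature lives in cpp/src/io/parquet/page_data.cu and is not quoted here.

```cpp
// Inside reader_impl::decode_page_data(), trimmed to the shape of the change.
auto out_buffers   = cudf::detail::make_host_vector<size_type*>(_input_columns.size(), _stream);
auto final_offsets = cudf::detail::make_host_vector<size_type>(_input_columns.size(), _stream);

// ... fill out_buffers / final_offsets per input column, as in the diff below ...

// Both vectors now live in pinned host memory, so the helper can copy the
// terminating offsets to the device without going through pageable memory.
write_final_offsets(/*offsets=*/final_offsets, /*buff_addrs=*/out_buffers, _stream);
```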

@@ -467,8 +467,8 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_
// that it is difficult/impossible for a given page to know that it is writing the very
// last value that should then be followed by a terminator (because rows can span
// page boundaries).
std::vector<size_type*> out_buffers;
std::vector<size_type> final_offsets;
auto out_buffers = cudf::detail::make_host_vector<size_type*>(_input_columns.size(), _stream);
auto final_offsets = cudf::detail::make_host_vector<size_type>(_input_columns.size(), _stream);
out_buffers.reserve(_input_columns.size());
final_offsets.reserve(_input_columns.size());
for (size_t idx = 0; idx < _input_columns.size(); idx++) {
@@ -486,14 +486,14 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_

// the final offset for a list at level N is the size of it's child
size_type const offset = child.type.id() == type_id::LIST ? child.size - 1 : child.size;
out_buffers.emplace_back(static_cast<size_type*>(out_buf.data()) + (out_buf.size - 1));
final_offsets.emplace_back(offset);
out_buffers.push_back(static_cast<size_type*>(out_buf.data()) + (out_buf.size - 1));
final_offsets.push_back(offset);
out_buf.user_data |= PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED;
} else if (out_buf.type.id() == type_id::STRING) {
// only if it is not a large strings column
if (std::cmp_less_equal(col_string_sizes[idx], strings::detail::get_offset64_threshold())) {
out_buffers.emplace_back(static_cast<size_type*>(out_buf.data()) + out_buf.size);
final_offsets.emplace_back(static_cast<size_type>(col_string_sizes[idx]));
out_buffers.push_back(static_cast<size_type*>(out_buf.data()) + out_buf.size);
final_offsets.push_back(static_cast<size_type>(col_string_sizes[idx]));
}
// Nested large strings column
else if (input_col.nesting_depth() > 0) {