Skip to content

Commit 852e64e

Browse files
committed
Merge remote-tracking branch 'public/no-miss-sync-pinned-factory' into no-miss-sync-pinned-factory
2 parents ac5a34e + 045e9aa commit 852e64e

File tree

9 files changed

+397
-22
lines changed

9 files changed

+397
-22
lines changed

cpp/benchmarks/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,11 @@ ConfigureNVBench(
322322
PARQUET_READER_NVBENCH io/parquet/parquet_reader_input.cpp io/parquet/parquet_reader_options.cpp
323323
)
324324

325+
# ##################################################################################################
326+
# * parquet reader filter benchmark
327+
# ----------------------------------------------------------------------
328+
ConfigureNVBench(PARQUET_READER_FILTER_NVBENCH io/parquet/parquet_reader_filter.cpp)
329+
325330
# ##################################################################################################
326331
# * parquet experimental reader benchmark
327332
# ----------------------------------------------------------------------
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
/*
2+
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include <benchmarks/common/generate_input.hpp>
18+
#include <benchmarks/fixture/benchmark_fixture.hpp>
19+
#include <benchmarks/io/cuio_common.hpp>
20+
#include <benchmarks/io/nvbench_helpers.hpp>
21+
22+
#include <cudf_test/column_wrapper.hpp>
23+
24+
#include <cudf/binaryop.hpp>
25+
#include <cudf/copying.hpp>
26+
#include <cudf/detail/sequence.hpp>
27+
#include <cudf/io/parquet.hpp>
28+
#include <cudf/strings/utilities.hpp>
29+
#include <cudf/utilities/default_stream.hpp>
30+
31+
#include <nvbench/nvbench.cuh>
32+
33+
template <typename DataType>
34+
struct filter_generator;
35+
36+
template <typename DataType>
37+
requires(std::is_same_v<DataType, int32_t> || std::is_same_v<DataType, int64_t> ||
38+
std::is_same_v<DataType, float> || std::is_same_v<DataType, double>)
39+
struct filter_generator<DataType> {
40+
decltype(auto) operator()(float selectivity,
41+
cudf::size_type num_rows,
42+
cudf::size_type row_width,
43+
bool nullable,
44+
cudf::size_type predicate_intensity)
45+
{
46+
auto min = static_cast<DataType>(0);
47+
auto max = static_cast<DataType>(num_rows);
48+
auto filter_min = min;
49+
auto filter_max = static_cast<DataType>(filter_min + (max - min) * selectivity);
50+
auto filter_step =
51+
static_cast<DataType>((filter_max - filter_min) / static_cast<DataType>(predicate_intensity));
52+
53+
std::vector<cudf::numeric_scalar<DataType>> filter_min_scalars;
54+
std::vector<cudf::numeric_scalar<DataType>> filter_max_scalars;
55+
56+
for (cudf::size_type i = 0; i < predicate_intensity; i++) {
57+
auto min = static_cast<DataType>(filter_min + i * filter_step);
58+
filter_min_scalars.push_back(cudf::numeric_scalar<DataType>(min));
59+
filter_max_scalars.push_back(cudf::numeric_scalar<DataType>(filter_max));
60+
}
61+
62+
cudf::ast::tree expr;
63+
auto& col_ref = expr.push(cudf::ast::column_reference(0));
64+
65+
for (cudf::size_type i = 0; i < predicate_intensity; i++) {
66+
auto* prev_cond = (i == 0) ? nullptr : &expr.back();
67+
auto& filter_min_lit = expr.push(cudf::ast::literal(filter_min_scalars[i]));
68+
auto& filter_max_lit = expr.push(cudf::ast::literal(filter_max_scalars[i]));
69+
auto& min_filter_cond = expr.push(
70+
cudf::ast::operation(cudf::ast::ast_operator::GREATER_EQUAL, col_ref, filter_min_lit));
71+
auto& max_filter_cond = expr.push(
72+
cudf::ast::operation(cudf::ast::ast_operator::LESS_EQUAL, col_ref, filter_max_lit));
73+
auto& cond = expr.push(cudf::ast::operation(
74+
cudf::ast::ast_operator::LOGICAL_AND, min_filter_cond, max_filter_cond));
75+
76+
if (i != 0) {
77+
expr.push(cudf::ast::operation(cudf::ast::ast_operator::LOGICAL_OR, *prev_cond, cond));
78+
}
79+
}
80+
81+
auto& cond = expr.back();
82+
83+
auto filter_column_it = cudf::detail::make_counting_transform_iterator(
84+
0, [](auto i) { return static_cast<DataType>(i); });
85+
auto filter_column = cudf::test::fixed_width_column_wrapper<DataType>(
86+
filter_column_it, filter_column_it + num_rows)
87+
.release();
88+
89+
auto [null_mask, null_count] = create_random_null_mask(num_rows, nullable ? 0.3 : 0);
90+
91+
filter_column->set_null_mask(null_mask, null_count);
92+
93+
return std::make_tuple(std::move(filter_column),
94+
[filter_min_scalars = std::move(filter_min_scalars),
95+
filter_max_scalars = std::move(filter_max_scalars),
96+
expr = std::move(expr),
97+
cond = std::ref(cond)]() mutable { return cond; });
98+
}
99+
};
100+
101+
template <typename DataType>
102+
requires(std::is_same_v<DataType, cudf::string_view>)
103+
struct filter_generator<DataType> {
104+
decltype(auto) operator()(float selectivity,
105+
cudf::size_type num_rows,
106+
cudf::size_type row_width,
107+
bool nullable,
108+
cudf::size_type predicate_intensity)
109+
{
110+
// these strings are unlikely to be generated by the random string generator.
111+
// to correctly use the selectivity metric, we'll replace rows of strings with these strings so
112+
// we can deterministically know the number of elements that will pass the filter.
113+
std::array<char const*, 16> matches{"NOT MATCHING",
114+
"MATCHLESS",
115+
"STILL NOT MATCHING",
116+
"WILL NOT MATCH",
117+
"NEVER MATCHING",
118+
"MATCHING IMPOSSIBLE 1",
119+
"MATCHING IMPOSSIBLE 2",
120+
"MATCHING IMPOSSIBLE 3",
121+
"MATCHING IMPOSSIBLE 4",
122+
"MATCHING IMPOSSIBLE 5",
123+
"MATCHING IMPOSSIBLE 6",
124+
"MATCHING IMPOSSIBLE 7",
125+
"MATCHING IMPOSSIBLE 8",
126+
"MATCHING IMPOSSIBLE 9",
127+
"MATCHING IMPOSSIBLE 10",
128+
"MATCHING IMPOSSIBLE 11"};
129+
130+
std::vector<char const*> selected_matches;
131+
for (cudf::size_type i = 0; i < predicate_intensity; i++) {
132+
selected_matches.emplace_back(matches[i % (sizeof(matches) / sizeof(matches[0]))]);
133+
}
134+
135+
std::vector<cudf::string_scalar> match_scalars;
136+
for (cudf::size_type i = 0; i < predicate_intensity; i++) {
137+
match_scalars.emplace_back(selected_matches[i]);
138+
}
139+
140+
auto match_it = cudf::detail::make_counting_transform_iterator(
141+
0, [&](auto i) { return matches[i % std::size(matches)]; });
142+
cudf::test::strings_column_wrapper match_col =
143+
cudf::test::strings_column_wrapper(match_it, match_it + num_rows);
144+
145+
auto num_selected = static_cast<cudf::size_type>(num_rows * selectivity);
146+
147+
auto indices = cudf::detail::sequence(num_rows,
148+
cudf::numeric_scalar<cudf::size_type>{0},
149+
cudf::get_default_stream(),
150+
cudf::get_current_device_resource_ref());
151+
152+
auto copy_mask = cudf::binary_operation(indices->view(),
153+
cudf::numeric_scalar<cudf::size_type>{num_selected},
154+
cudf::binary_operator::LESS,
155+
cudf::data_type{cudf::type_id::BOOL8});
156+
157+
auto profile = data_profile();
158+
profile.set_null_probability(nullable ? std::optional{0.3} : std::nullopt);
159+
profile.set_distribution_params(
160+
cudf::type_id::STRING, distribution_id::UNIFORM, row_width, row_width);
161+
auto column = create_random_column(cudf::type_to_id<DataType>(), row_count{num_rows}, profile);
162+
163+
auto result = cudf::copy_if_else(match_col, column->view(), copy_mask->view());
164+
165+
cudf::ast::tree expr;
166+
auto& col_ref = expr.push(cudf::ast::column_reference(0));
167+
168+
for (cudf::size_type i = 0; i < predicate_intensity; i++) {
169+
auto& match_lit = expr.push(cudf::ast::literal(match_scalars[i]));
170+
auto& cond =
171+
expr.push(cudf::ast::operation(cudf::ast::ast_operator::EQUAL, col_ref, match_lit));
172+
173+
if (i == 0) continue;
174+
175+
expr.push(cudf::ast::operation(cudf::ast::ast_operator::LOGICAL_OR, expr.back(), cond));
176+
}
177+
178+
auto& cond = expr.back();
179+
180+
return std::make_tuple(std::move(result),
181+
[match_scalars = std::move(match_scalars),
182+
expr = std::move(expr),
183+
cond = std::ref(cond)]() mutable { return cond; });
184+
}
185+
};
186+
187+
template <typename DataType>
188+
void BM_parquet_read_filter(nvbench::state& state)
189+
{
190+
auto const num_rows = static_cast<cudf::size_type>(state.get_int64("num_rows"));
191+
auto const row_width = static_cast<cudf::size_type>(state.get_int64_or_default("row_width", 1));
192+
auto const predicate_intensity =
193+
static_cast<cudf::size_type>(state.get_int64("predicate_intensity"));
194+
auto const selectivity = static_cast<float>(state.get_float64("selectivity"));
195+
auto const nullable = state.get_string("nullable") == "true";
196+
auto const use_jit = state.get_string("executor") == "jit";
197+
auto const num_input_cols = static_cast<cudf::size_type>(state.get_int64("num_cols"));
198+
199+
CUDF_EXPECTS(num_input_cols >= 1, "Invalid number of input columns");
200+
CUDF_EXPECTS(selectivity > 0.0F && selectivity <= 1.0F, "Invalid selectivity");
201+
CUDF_EXPECTS(predicate_intensity >= 0 && predicate_intensity <= 100'000,
202+
"Invalid number of predicates");
203+
204+
auto const num_copy_only_cols = num_input_cols - 1;
205+
206+
auto [filter_column, filter_func] =
207+
filter_generator<DataType>()(selectivity, num_rows, row_width, nullable, predicate_intensity);
208+
auto copy_only_dtypes = cycle_dtypes(
209+
{cudf::type_id::BOOL8, cudf::type_id::FLOAT32, cudf::type_id::FLOAT64, cudf::type_id::STRING},
210+
num_copy_only_cols);
211+
212+
std::vector<std::unique_ptr<cudf::column>> copy_only_cols;
213+
214+
for (cudf::size_type i = 0; i < num_copy_only_cols; i++) {
215+
auto profile = data_profile();
216+
profile.set_null_probability(std::nullopt);
217+
auto col = create_random_column(copy_only_dtypes[i], row_count{num_rows}, profile);
218+
auto null_mask = cudf::copy_bitmask(filter_column->view());
219+
col->set_null_mask(null_mask, filter_column->null_count());
220+
copy_only_cols.push_back(std::move(col));
221+
}
222+
223+
std::vector<cudf::column_view> table_columns;
224+
table_columns.push_back(filter_column->view());
225+
226+
for (const auto& col : copy_only_cols) {
227+
table_columns.push_back(col->view());
228+
}
229+
230+
auto table = cudf::table_view{table_columns};
231+
232+
std::vector<char> parquet_buffer;
233+
234+
cudf::io::table_input_metadata expected_metadata(table);
235+
for (auto i = 0; i < table.num_columns(); i++) {
236+
expected_metadata.column_metadata[i].set_name("col" + std::to_string(i));
237+
}
238+
239+
cudf::io::parquet_writer_options write_opts =
240+
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&parquet_buffer}, table)
241+
.metadata(expected_metadata)
242+
.compression(cudf::io::compression_type::AUTO)
243+
.dictionary_policy(cudf::io::dictionary_policy::ALWAYS)
244+
.stats_level(cudf::io::statistics_freq::STATISTICS_NONE);
245+
cudf::io::write_parquet(write_opts);
246+
247+
cudf::io::parquet_reader_options read_opts =
248+
cudf::io::parquet_reader_options::builder(
249+
cudf::io::source_info{cudf::host_span<char>{parquet_buffer.data(), parquet_buffer.size()}})
250+
.filter(filter_func())
251+
.use_jit_filter(use_jit);
252+
253+
// pre-warm JIT cache
254+
if (use_jit) { cudf::io::read_parquet(read_opts); }
255+
256+
if constexpr (std::is_same_v<DataType, cudf::string_view>) {
257+
state.add_global_memory_reads<char>(num_rows * static_cast<size_t>(row_width));
258+
state.add_global_memory_writes<char>(static_cast<size_t>(num_rows * selectivity) * row_width);
259+
} else {
260+
state.add_global_memory_reads<DataType>(num_rows);
261+
state.add_global_memory_writes<DataType>(static_cast<size_t>(num_rows * selectivity));
262+
}
263+
264+
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
265+
state.exec(
266+
nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
267+
try_drop_l3_cache();
268+
269+
timer.start();
270+
auto const result = cudf::io::read_parquet(read_opts);
271+
timer.stop();
272+
273+
CUDF_EXPECTS(result.tbl->num_columns() == num_input_cols, "Unexpected number of columns");
274+
});
275+
}
276+
277+
// Registers one benchmark entry point per element type.  Each instantiation
// sweeps predicate count, column count, row count, selectivity, nullability,
// and the filter executor ("jit" vs. "ast").
#define PARQUET_READER_FILTER_BENCHMARK_DEFINE(name, type) \
278+
void name(nvbench::state& state) { BM_parquet_read_filter<type>(state); } \
279+
\
280+
NVBENCH_BENCH(name) \
281+
.add_int64_axis("predicate_intensity", {4, 16}) \
282+
.add_int64_axis("num_cols", {32}) \
283+
.add_int64_axis("num_rows", {100'000, 1'000'000, 10'000'000}) \
284+
.add_float64_axis("selectivity", {0.5, 0.8}) \
285+
.add_string_axis("nullable", {"true"}) \
286+
.add_string_axis("executor", {"jit", "ast"})
287+
288+
PARQUET_READER_FILTER_BENCHMARK_DEFINE(parquet_read_filter_i32, int32_t);
289+
290+
PARQUET_READER_FILTER_BENCHMARK_DEFINE(parquet_read_filter_f32, float);
291+
292+
PARQUET_READER_FILTER_BENCHMARK_DEFINE(parquet_read_filter_i64, int64_t);
293+
294+
PARQUET_READER_FILTER_BENCHMARK_DEFINE(parquet_read_filter_f64, double);
295+
296+
// The string benchmark additionally sweeps the per-row string width.
PARQUET_READER_FILTER_BENCHMARK_DEFINE(parquet_read_filter_string, cudf::string_view)
297+
.add_int64_axis("row_width", {8, 256});

cpp/include/cudf/io/parquet.hpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ class parquet_reader_options {
106106
bool _allow_mismatched_pq_schemas = false;
107107
// Cast timestamp columns to a specific type
108108
data_type _timestamp_type{type_id::EMPTY};
109+
// Whether to use JIT compilation for filtering
110+
bool _use_jit_filter = false;
109111

110112
std::optional<std::vector<reader_column_schema>> _reader_column_schema;
111113

@@ -248,6 +250,13 @@ class parquet_reader_options {
248250
*/
249251
[[nodiscard]] data_type get_timestamp_type() const { return _timestamp_type; }
250252

253+
/**
254+
 * @brief Returns whether JIT compilation is used for the filter expression.
 *
 * Defaults to `false` (see `_use_jit_filter`).
255+
 *
256+
 * @return `true` if JIT compilation should be used for filtering
257+
 */
258+
[[nodiscard]] bool is_enabled_use_jit_filter() const { return _use_jit_filter; }
259+
251260
/**
252261
* @brief Sets the names of columns to be read from all input sources.
253262
*
@@ -576,6 +585,18 @@ class parquet_reader_options_builder {
576585
return *this;
577586
}
578587

588+
/**
589+
 * @brief Enable/disable JIT compilation of the reader's filter expression.
590+
 *
591+
 * @param use_jit_filter `true` to JIT-compile the filter expression, `false`
 *        to use the default (non-JIT) filter evaluation
592+
 * @return this for chaining
593+
 */
594+
parquet_reader_options_builder& use_jit_filter(bool use_jit_filter)
595+
{
596+
options._use_jit_filter = use_jit_filter;
597+
return *this;
598+
}
599+
579600
/**
580601
* @brief move parquet_reader_options member once it's built.
581602
*/

0 commit comments

Comments
 (0)