Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ keywords = ["arrow", "query", "sql"]
include = ["benches/*.rs", "src/**/*.rs", "Cargo.toml", "LICENSE.txt", "NOTICE.txt"]
readme = "../../README.md"
version = { workspace = true }
edition = { workspace = true }
edition = "2024"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/data_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
//! This module provides the in-memory table for more realistic benchmarking.

use arrow::array::{
builder::{Int64Builder, StringBuilder},
ArrayRef, Float32Array, Float64Array, RecordBatch, StringArray, StringViewBuilder,
UInt64Array,
builder::{Int64Builder, StringBuilder},
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::MemTable;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/benches/distinct_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ mod data_utils;
use crate::criterion::Criterion;
use data_utils::{create_table_provider, make_data};
use datafusion::execution::context::SessionContext;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::physical_plan::{ExecutionPlan, collect};
use datafusion::{datasource::MemTable, error::Result};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;
use datafusion_execution::config::SessionConfig;

use parking_lot::Mutex;
use std::hint::black_box;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/filter_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use arrow::{
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion::prelude::SessionContext;
use datafusion::{datasource::MemTable, error::Result};
use futures::executor::block_on;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/benches/map_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use std::hint::black_box;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use parking_lot::Mutex;
use rand::prelude::ThreadRng;
use rand::Rng;
use rand::prelude::ThreadRng;
use tokio::runtime::Runtime;

use datafusion::prelude::SessionContext;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/benches/parquet_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ use arrow::datatypes::{
SchemaRef,
};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::instant::Instant;
use futures::stream::StreamExt;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::distr::uniform::SampleUniform;
use rand::distr::Alphanumeric;
use rand::distr::uniform::SampleUniform;
use rand::prelude::*;
use rand::rng;
use std::fs::File;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::runtime::Runtime;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::{
collect,
expressions::{col, PhysicalSortExpr},
expressions::{PhysicalSortExpr, col},
};
use datafusion::prelude::SessionContext;
use datafusion_datasource::memory::MemorySourceConfig;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/benches/preserve_file_partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use arrow::array::{ArrayRef, Float64Array, StringArray, TimestampMillisecondArra
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::{col, ParquetReadOptions, SessionConfig, SessionContext};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext, col};
use datafusion_expr::SortExpr;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/benches/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, BytesMut};
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion::config::ConfigOptions;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_plan::ExecutionPlan;
use object_store::ObjectStore;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion::scalar::ScalarValue;

fn criterion_benchmark(c: &mut Criterion) {
Expand Down
66 changes: 33 additions & 33 deletions datafusion/core/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::{
execution::context::TaskContext,
physical_plan::{
ExecutionPlan, ExecutionPlanProperties,
coalesce_partitions::CoalescePartitionsExec,
sorts::sort_preserving_merge::SortPreservingMergeExec, ExecutionPlan,
ExecutionPlanProperties,
sorts::sort_preserving_merge::SortPreservingMergeExec,
},
prelude::SessionContext,
};
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
use datafusion_physical_expr::{PhysicalSortExpr, expressions::col};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// Benchmarks for SortPreservingMerge stream
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use futures::StreamExt;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -355,14 +355,14 @@ fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_low, utf8_low, utf8_high)
fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut data_gen = DataGenerator::new();

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = gen
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_high_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_high_cardinality_values())
.collect();

if sorted {
Expand All @@ -388,14 +388,14 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high)
fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut data_gen = DataGenerator::new();

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = gen
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_high_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_high_cardinality_values())
.collect();

if sorted {
Expand All @@ -421,15 +421,15 @@ fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (f64, utf8_low, utf8_low, i64)
fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut data_gen = DataGenerator::new();

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = gen
let mut tuples: Vec<_> = data_gen
.i64_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.i64_values())
.collect();

if sorted {
Expand Down Expand Up @@ -459,15 +459,15 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (f64, utf8_view_low, utf8_view_low, i64)
fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut data_gen = DataGenerator::new();

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = gen
let mut tuples: Vec<_> = data_gen
.i64_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.i64_values())
.collect();

if sorted {
Expand Down Expand Up @@ -497,8 +497,8 @@ fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_dict)
fn dictionary_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut values = gen.utf8_low_cardinality_values();
let mut data_gen = DataGenerator::new();
let mut values = data_gen.utf8_low_cardinality_values();
if sorted {
values.sort_unstable();
}
Expand All @@ -512,12 +512,12 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_dict, utf8_dict, utf8_dict)
fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
let mut data_gen = DataGenerator::new();
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.collect();

if sorted {
Expand All @@ -543,13 +543,13 @@ fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64)
fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
let mut data_gen = DataGenerator::new();
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.i64_values())
.collect();

if sorted {
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
ctx_holder.lock().push(Arc::new(Mutex::new(ctx)))
});

let ctx = ctx_holder.lock().first().unwrap().clone();
ctx
ctx_holder.lock().first().unwrap().clone()
}

fn criterion_benchmark(c: &mut Criterion) {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/benches/spm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{collect, ExecutionPlan};
use datafusion_physical_plan::{ExecutionPlan, collect};

use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_datasource::memory::MemorySourceConfig;

fn generate_spm_for_round_robin_tie_breaker(
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ use arrow::datatypes::{DataType, Field, Fields, Schema};
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion_common::{config::Dialect, ScalarValue};
use datafusion_common::{ScalarValue, config::Dialect};
use datafusion_expr::col;
use rand_distr::num_traits::NumCast;
use std::hint::black_box;
use std::path::PathBuf;
use std::sync::Arc;
use test_utils::TableDef;
use test_utils::tpcds::tpcds_schemas;
use test_utils::tpch::tpch_schemas;
use test_utils::TableDef;
use tokio::runtime::Runtime;

const BENCHMARKS_PATH_1: &str = "../../benchmarks/";
Expand Down Expand Up @@ -242,8 +242,10 @@ fn criterion_benchmark(c: &mut Criterion) {
if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
&& !PathBuf::from(format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}")).exists()
{
panic!("benchmarks/data/hits_partitioned/ could not be loaded. Please run \
'benchmarks/bench.sh data clickbench_partitioned' prior to running this benchmark")
panic!(
"benchmarks/data/hits_partitioned/ could not be loaded. Please run \
'benchmarks/bench.sh data clickbench_partitioned' prior to running this benchmark"
)
}

let ctx = create_context();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/sql_planner_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use arrow::array::{ArrayRef, RecordBatch};
use arrow_schema::DataType;
use arrow_schema::TimeUnit::Nanosecond;
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_catalog::MemTable;
use datafusion_common::ScalarValue;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/benches/sql_query_with_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{fmt::Write, sync::Arc, time::Duration};
use arrow::array::{Int64Builder, RecordBatch, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion, SamplingMode};
use criterion::{Criterion, SamplingMode, criterion_group, criterion_main};
use datafusion::{
datasource::{
file_format::parquet::ParquetFormat,
Expand All @@ -31,13 +31,13 @@ use datafusion::{
use datafusion_execution::runtime_env::RuntimeEnv;
use itertools::Itertools;
use object_store::{
ObjectStore,
memory::InMemory,
path::Path,
throttle::{ThrottleConfig, ThrottledStore},
ObjectStore,
};
use parquet::arrow::ArrowWriter;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand::{Rng, SeedableRng, rngs::StdRng};
use tokio::runtime::Runtime;
use url::Url;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/struct_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use arrow::{
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion::prelude::SessionContext;
use datafusion::{datasource::MemTable, error::Result};
use futures::executor::block_on;
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/benches/topk_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
mod data_utils;

use arrow::util::pretty::pretty_format_batches;
use criterion::{criterion_group, criterion_main, Criterion};
use criterion::{Criterion, criterion_group, criterion_main};
use data_utils::make_data;
use datafusion::physical_plan::{collect, displayable, ExecutionPlan};
use datafusion::physical_plan::{ExecutionPlan, collect, displayable};
use datafusion::prelude::SessionContext;
use datafusion::{datasource::MemTable, error::Result};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::TaskContext;
use datafusion_execution::config::SessionConfig;
use std::hint::black_box;
use std::sync::Arc;
use tokio::runtime::Runtime;
Expand All @@ -46,7 +46,9 @@ async fn create_context(
opts.optimizer.enable_topk_aggregation = use_topk;
let ctx = SessionContext::new_with_config(cfg);
let _ = ctx.register_table("traces", mem_table)?;
let sql = format!("select max(timestamp_ms) from traces group by trace_id order by max(timestamp_ms) desc limit {limit};");
let sql = format!(
"select max(timestamp_ms) from traces group by trace_id order by max(timestamp_ms) desc limit {limit};"
);
let df = ctx.sql(sql.as_str()).await?;
let physical_plan = df.create_physical_plan().await?;
let actual_phys_plan = displayable(physical_plan.as_ref()).indent(true).to_string();
Expand Down
Loading