diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 0d24a7f0e272..5a2f9e973338 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -20,7 +20,7 @@ use std::time::SystemTime; use crate::fuzz_cases::join_fuzz::JoinTestType::{HjSmj, NljHj}; -use arrow::array::{ArrayRef, Int32Array}; +use arrow::array::{ArrayRef, BinaryArray, Int32Array}; use arrow::compute::SortOptions; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; @@ -92,8 +92,8 @@ fn col_lt_col_filter(schema1: Arc, schema2: Arc) -> JoinFilter { #[tokio::test] async fn test_inner_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::Inner, Some(Box::new(col_lt_col_filter)), ) @@ -104,8 +104,8 @@ async fn test_inner_join_1k_filtered() { #[tokio::test] async fn test_inner_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::Inner, None, ) @@ -116,8 +116,8 @@ async fn test_inner_join_1k() { #[tokio::test] async fn test_left_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::Left, None, ) @@ -128,8 +128,8 @@ async fn test_left_join_1k() { #[tokio::test] async fn test_left_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::Left, Some(Box::new(col_lt_col_filter)), ) @@ -140,8 +140,8 @@ async fn test_left_join_1k_filtered() { #[tokio::test] async fn test_right_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::Right, None, ) @@ -152,8 +152,8 @@ async fn test_right_join_1k() { #[tokio::test] async fn test_right_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::Right, Some(Box::new(col_lt_col_filter)), ) @@ -164,8 +164,8 @@ async fn test_right_join_1k_filtered() { #[tokio::test] async fn test_full_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::Full, None, ) @@ -176,8 +176,8 @@ async fn test_full_join_1k() { #[tokio::test] async fn test_full_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::Full, Some(Box::new(col_lt_col_filter)), ) @@ -188,8 +188,8 @@ async fn test_full_join_1k_filtered() { #[tokio::test] async fn test_left_semi_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::LeftSemi, None, ) @@ -200,8 +200,8 @@ async fn test_left_semi_join_1k() { #[tokio::test] async fn test_left_semi_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::LeftSemi, Some(Box::new(col_lt_col_filter)), ) @@ -212,8 +212,8 @@ async fn test_left_semi_join_1k_filtered() { #[tokio::test] async fn test_right_semi_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::RightSemi, None, ) @@ -224,8 +224,8 @@ async fn test_right_semi_join_1k() { #[tokio::test] async fn test_right_semi_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::RightSemi, Some(Box::new(col_lt_col_filter)), ) @@ -236,8 +236,8 @@ async fn test_right_semi_join_1k_filtered() { #[tokio::test] async fn test_left_anti_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::LeftAnti, None, ) @@ -248,8 +248,8 @@ async fn test_left_anti_join_1k() { #[tokio::test] async fn test_left_anti_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::LeftAnti, Some(Box::new(col_lt_col_filter)), ) @@ -260,8 +260,8 @@ async fn test_left_anti_join_1k_filtered() { #[tokio::test] async fn test_right_anti_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::RightAnti, None, ) @@ -272,8 +272,8 @@ async fn test_right_anti_join_1k() { #[tokio::test] async fn test_right_anti_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::RightAnti, Some(Box::new(col_lt_col_filter)), ) @@ -284,8 +284,8 @@ async fn test_right_anti_join_1k_filtered() { #[tokio::test] async fn test_left_mark_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::LeftMark, None, ) @@ -296,8 +296,8 @@ async fn test_left_mark_join_1k() { #[tokio::test] async fn test_left_mark_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::LeftMark, Some(Box::new(col_lt_col_filter)), ) @@ -309,8 +309,8 @@ async fn test_left_mark_join_1k_filtered() { #[tokio::test] async fn test_right_mark_join_1k() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), JoinType::RightMark, None, ) @@ -321,8 +321,249 @@ async fn test_right_mark_join_1k() { #[tokio::test] async fn test_right_mark_join_1k_filtered() { JoinFuzzTestCase::new( - make_staggered_batches(1000), - make_staggered_batches(1000), + make_staggered_batches_i32(1000), + make_staggered_batches_i32(1000), + JoinType::RightMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[NljHj], false) + .await +} + +#[tokio::test] +async fn test_inner_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::Inner, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_inner_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::Inner, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_left_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::Left, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_left_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::Left, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_right_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::Right, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_right_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::Right, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_full_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::Full, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_full_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::Full, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[NljHj, HjSmj], false) + .await +} + +#[tokio::test] +async fn test_left_semi_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::LeftSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_left_semi_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::LeftSemi, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_right_semi_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::RightSemi, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_right_semi_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::RightSemi, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_left_anti_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::LeftAnti, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_left_anti_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::LeftAnti, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_right_anti_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::RightAnti, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_right_anti_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::RightAnti, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_left_mark_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::LeftMark, + None, + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +#[tokio::test] +async fn test_left_mark_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::LeftMark, + Some(Box::new(col_lt_col_filter)), + ) + .run_test(&[HjSmj, NljHj], false) + .await +} + +// todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support +#[tokio::test] +async fn test_right_mark_join_1k_binary() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), + JoinType::RightMark, + None, + ) + .run_test(&[NljHj], false) + .await +} + +#[tokio::test] +async fn test_right_mark_join_1k_binary_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches_binary(1000), + make_staggered_batches_binary(1000), JoinType::RightMark, Some(Box::new(col_lt_col_filter)), ) @@ -619,7 +860,6 @@ impl JoinFuzzTestCase { hj_formatted_sorted.iter().for_each(|s| println!("{s}")); println!("=============== NestedLoopJoinExec =================="); nlj_formatted_sorted.iter().for_each(|s| println!("{s}")); - Self::save_partitioned_batches_as_parquet( &nlj_collected, out_dir_name, @@ -791,7 +1031,7 @@ impl JoinFuzzTestCase { /// Return randomly sized record batches with: /// two sorted int32 columns 'a', 'b' ranged from 0..99 as join columns /// two random int32 columns 'x', 'y' as other columns -fn make_staggered_batches(len: usize) -> Vec { +fn make_staggered_batches_i32(len: usize) -> Vec { let mut rng = rand::rng(); let mut input12: Vec<(i32, i32)> = vec![(0, 0); len]; let mut input3: Vec = vec![0; len]; @@ -819,3 +1059,43 @@ fn make_staggered_batches(len: usize) -> Vec { // use a random number generator to pick a random sized output stagger_batch_with_seed(batch, 42) } + +fn rand_bytes(rng: &mut R, min: usize, max: usize) -> Vec { + let n = rng.random_range(min..=max); + let mut v = vec![0u8; n]; + rng.fill(&mut v[..]); + v +} + +/// Return randomly sized record batches with: +/// two sorted binary columns 'a', 'b' (lexicographically) as join columns +/// two random binary columns 'x', 'y' as other columns +fn make_staggered_batches_binary(len: usize) -> Vec { + let mut rng = rand::rng(); + + // produce (a,b) pairs then sort lexicographically so SMJ has naturally sorted keys + let mut input12: Vec<(Vec, Vec)> = (0..len) + .map(|_| (rand_bytes(&mut rng, 4, 16), rand_bytes(&mut rng, 4, 16))) + .collect(); + input12.sort_unstable(); // lexicographic on Vec + + // payload cols (also binary so the existing x < x filter is well-typed) + let input3: Vec> = (0..len).map(|_| rand_bytes(&mut rng, 4, 24)).collect(); + let input4: Vec> = (0..len).map(|_| rand_bytes(&mut rng, 4, 24)).collect(); + + let a = BinaryArray::from_iter_values(input12.iter().map(|k| &k.0)); + let b = BinaryArray::from_iter_values(input12.iter().map(|k| &k.1)); + let x = BinaryArray::from_iter_values(input3.iter()); + let y = BinaryArray::from_iter_values(input4.iter()); + + let batch = RecordBatch::try_from_iter(vec![ + ("a", Arc::new(a) as ArrayRef), + ("b", Arc::new(b) as ArrayRef), + ("x", Arc::new(x) as ArrayRef), + ("y", Arc::new(y) as ArrayRef), + ]) + .unwrap(); + + // preserve your existing randomized partitioning + stagger_batch_with_seed(batch, 42) +}