Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
5cb8dbe
WIP query code
m09526 Aug 11, 2025
76ec854
Query building
m09526 Aug 12, 2025
572284c
WIP query output
m09526 Aug 12, 2025
c7669b2
Fixed
m09526 Aug 13, 2025
bb0a2af
OutputOptions compiles
m09526 Aug 13, 2025
80f66ee
CLippy
m09526 Aug 13, 2025
9410a86
Remove unneccessary lifetime
m09526 Aug 13, 2025
68dc9ca
More common code
m09526 Aug 13, 2025
2fccf4b
Clippy warnings
m09526 Aug 13, 2025
3dcd321
Query sketch generation
m09526 Aug 13, 2025
6ef8df1
Fix
m09526 Aug 13, 2025
bf63e7b
tests passing
m09526 Aug 13, 2025
a63ef5e
Doc tests pass
m09526 Aug 13, 2025
e6a2660
Clippy warnings
m09526 Aug 13, 2025
9835477
Merge branch 'develop' into 5408-implement-rust-leaf-partition-query-…
m09526 Aug 13, 2025
e09cd53
CLI for query
m09526 Aug 13, 2025
40bad8f
Merge branch 'develop' into 5408-implement-rust-leaf-partition-query-…
m09526 Aug 18, 2025
420cec7
Merge branch 'develop' into 5408-implement-rust-leaf-partition-query-…
m09526 Aug 21, 2025
000a958
Merge branch 'develop' into 5408-implement-rust-leaf-partition-query-…
rtjd6554 Aug 26, 2025
ef4d610
Merge branch 'develop' into 5408-implement-rust-leaf-partition-query-…
patchwork01 Aug 28, 2025
5abeff9
Merge branch 'develop' into 5408-implement-rust-leaf-partition-query-…
m09526 Sep 1, 2025
53eced8
Compile issue fixed
m09526 Sep 2, 2025
edde88f
SOme unit tests
m09526 Sep 2, 2025
1f4544a
Split util tests out
m09526 Sep 2, 2025
4c5897e
Unit tests written
m09526 Sep 2, 2025
4faae2f
Clippy warnings
m09526 Sep 2, 2025
512d01d
Merge branch 'develop' into 5408-implement-rust-leaf-partition-query-…
m09526 Sep 2, 2025
108cd70
Merge remote-tracking branch 'origin/develop' into 5408-implement-rus…
patchwork01 Sep 3, 2025
1004b87
5408 Reformat compaction_test.rs
patchwork01 Sep 3, 2025
1bdcbbb
5408 Remove unused dependency declaration
patchwork01 Sep 3, 2025
d339273
Merge remote-tracking branch 'origin/develop' into 5408-implement-rus…
patchwork01 Sep 3, 2025
868bccf
Remove unneeded macro
m09526 Sep 3, 2025
d20d041
Better error handling
m09526 Sep 3, 2025
3f3b161
Merge branch 'develop' into 5408-implement-rust-leaf-partition-query-…
m09526 Sep 4, 2025
30f0f2c
Update to builder pattern
m09526 Sep 4, 2025
cc9d3b6
Some query tests updated
m09526 Sep 4, 2025
9f29754
Fixes for PR
m09526 Sep 4, 2025
8795cbb
Merge branch 'develop' into 5408-implement-rust-leaf-partition-query-…
m09526 Sep 4, 2025
4b18549
5408 Revert merge problem
patchwork01 Sep 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
838 changes: 397 additions & 441 deletions rust/Cargo.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
[workspace]
members = [ "aggregator_udfs", "filter_udfs", "compactor", "objectstore_ext", "rust_sketch", "sleeper_core", "sleeper_df" ]
members = [ "aggregator_udfs", "filter_udfs", "apps", "objectstore_ext", "rust_sketch", "sleeper_core", "sleeper_df" ]
resolver = "2"

[workspace.package]
Expand All @@ -32,13 +32,12 @@ aws-config = { version = "1.8.6" } # Credential loading
aws-credential-types = { version = "1.2.6" } # Credential provider types
aws-types = { version = "1.3.8" } # for Region
bytes = { version = "1.10.1" } # Byte buffer for S3 uploading
cargo_metadata = { version = "0.21.0" } # Access cargo metadata programmtically
cargo_metadata = { version = "0.22.0" } # Access cargo metadata programmtically
chrono = { version = "0.4.41" } # Log helper
clap = { version = "4.5.47" } # Cmd line args processing
tracing-subscriber = { version = "0.3.20" }
color-eyre = { version = "0.6.5" } # Colourised version of `anyhow`
cxx = { version = "1.0.170" } # Exception handling for Rust
cxx-build = { version = "1.0.174" }
cxx = { version = "1.0.175" } # Exception handling for Rust
cxx-build = { version = "1.0.175" }
datafusion = { version = "48.0.1" }
env_logger = { version = "0.11.8" } # Standard logging to stderr
futures = { version = "0.3.31" } # Async processing
Expand Down
10 changes: 4 additions & 6 deletions rust/aggregator_udfs/src/nonnull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ use datafusion::{
sum::sum_udaf,
},
logical_expr::{
Accumulator, AggregateUDF, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator,
ReversedUDAF, SetMonotonicity, Signature, StatisticsArgs,
Accumulator, AggregateUDF, AggregateUDFImpl, Documentation, EmitTo, Expr,
GroupsAccumulator, ReversedUDAF, SetMonotonicity, Signature, StatisticsArgs,
expr::{AggregateFunction, AggregateFunctionParams},
function::{AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs},
simplify::SimplifyInfo,
utils::AggregateOrderSensitivity,
},
prelude::Expr,
scalar::ScalarValue,
};
use std::{
Expand Down Expand Up @@ -455,16 +454,15 @@ mod tests {
sum::{sum, sum_udaf},
},
logical_expr::{
Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator, ReversedUDAF,
SetMonotonicity, Signature, StatisticsArgs, Volatility,
Accumulator, AggregateUDFImpl, Documentation, EmitTo, Expr, GroupsAccumulator,
ReversedUDAF, SetMonotonicity, Signature, StatisticsArgs, Volatility,
expr::{AggregateFunction, AggregateFunctionParams, WindowFunctionParams},
function::{AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs},
lit,
simplify::SimplifyContext,
utils::AggregateOrderSensitivity,
},
physical_expr::LexOrdering,
prelude::Expr,
scalar::ScalarValue,
};
use mockall::predicate::*;
Expand Down
8 changes: 5 additions & 3 deletions rust/compactor/Cargo.toml → rust/apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "compactor"
description = "Binary crate for running the data compactor from the command line."
name = "apps"
description = "Binary crate for running the Sleeper apps from the command line."
default-run = "compact"
keywords = ["sleeper", "compaction"]
categories = ["science"]
version = { workspace = true }
Expand All @@ -25,12 +26,13 @@ rust-version = { workspace = true }
publish = { workspace = true }

[dependencies]
arrow = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive"] }
tracing-subscriber = { workspace = true }
color-eyre = { workspace = true }
sleeper_core = { path = "../sleeper_core" }
env_logger = { workspace = true }
futures = { workspace = true }
human-panic = { workspace = true }
log = { workspace = true, features = ["release_max_level_debug"] }
num-format = { workspace = true }
Expand Down
134 changes: 36 additions & 98 deletions rust/compactor/src/bin/main.rs → rust/apps/src/bin/compact.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,33 @@
/*
* Copyright 2022-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Copyright 2022-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use apps::path_absolute;
use chrono::Local;
use clap::Parser;
use color_eyre::eyre::bail;
use human_panic::setup_panic;
use log::info;
use num_format::{Locale, ToFormattedString};
use sleeper_core::{
ColRange, CommonConfig, OperationOutput, PartitionBound, SleeperParquetOptions,
ColRange, CommonConfigBuilder, OutputType, PartitionBound, SleeperParquetOptions,
SleeperPartitionRegion, run_compaction,
};
use std::{collections::HashMap, io::Write, path::Path};
use std::{collections::HashMap, io::Write};
use url::Url;

/// Implements a Sleeper compaction algorithm in Rust.
/// Runs a Sleeper compaction algorithm.
///
/// A sequence of Parquet files is read and compacted into a single output Parquet file. The input
/// files must be individually sorted according to the row key columns and then the sort columns. A sketches file containing
Expand Down Expand Up @@ -63,16 +64,6 @@ struct CmdLineArgs {
iterator_config: Option<String>,
}

/// Converts a [`Path`] reference to an absolute path (if not already absolute)
/// and returns it as a String.
///
/// # Panics
/// If the path can't be made absolute due to not being able to get the current
/// directory or the path is not valid.
fn path_absolute<T: ?Sized + AsRef<Path>>(path: &T) -> String {
std::path::absolute(path).unwrap().to_str().unwrap().into()
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> color_eyre::Result<()> {
// Install coloured errors
Expand Down Expand Up @@ -149,79 +140,26 @@ async fn main() -> color_eyre::Result<()> {
dict_enc_values: true,
};

let details = CommonConfig {
aws_config: None,
input_files: input_urls,
input_files_sorted: true,
row_key_cols: args.row_keys,
sort_key_cols: args.sort_keys,
region: SleeperPartitionRegion::new(map),
output: OperationOutput::File {
let details = CommonConfigBuilder::new()
.aws_config(None)
.input_files(input_urls)
.input_files_sorted(true)
.row_key_cols(args.row_keys)
.sort_key_cols(args.sort_keys)
.region(SleeperPartitionRegion::new(map))
.output(OutputType::File {
output_file,
opts: parquet_options,
},
iterator_config: args.iterator_config,
};

let result = run_compaction(&details).await;
match result {
Ok(r) => {
info!(
"Compaction read {} rows and wrote {} rows",
r.rows_read.to_formatted_string(&Locale::en),
r.rows_written.to_formatted_string(&Locale::en)
);
}
Err(e) => {
bail!(e);
}
}
Ok(())
}

#[cfg(test)]
mod path_test {
use crate::path_absolute;

#[cfg(not(any(target_os = "windows", target_os = "macos")))]
fn cd_to_tmp() {
std::env::set_current_dir("/tmp").unwrap();
}

#[test]
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
fn relative_path_converts() {
cd_to_tmp();
assert_eq!("/tmp/foo/bar/baz.txt", path_absolute("foo/bar/baz.txt"));
}

#[test]
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
fn relative_path_converts_with_one_dot() {
cd_to_tmp();
assert_eq!("/tmp/foo/bar/baz.txt", path_absolute("./foo/bar/baz.txt"));
}

#[test]
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
fn relative_path_converts_with_double_dot() {
cd_to_tmp();
assert_eq!(
"/tmp/../foo/bar/baz.txt",
path_absolute("../foo/bar/baz.txt")
);
}
})
.iterator_config(args.iterator_config)
.build()?;

#[test]
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
fn absolute_path_unchanged() {
cd_to_tmp();
assert_eq!("/tmp/foo/bar", path_absolute("/tmp/foo/bar"));
}
let result = run_compaction(&details).await?;
info!(
"Compaction read {} rows and wrote {} rows",
result.rows_read.to_formatted_string(&Locale::en),
result.rows_written.to_formatted_string(&Locale::en)
);

#[test]
#[should_panic(expected = "cannot make an empty path absolute")]
fn empty_path_panic() {
let _ = path_absolute("");
}
Ok(())
}
Loading
Loading