Skip to content

Commit 5b16a53

Browse files
authored
Merge pull request #5479 from gchq/5408-implement-rust-leaf-partition-query-code
Issue 5408 implement rust leaf partition query code
2 parents 116a61e + 4b18549 commit 5b16a53

File tree

28 files changed

+2131
-758
lines changed

28 files changed

+2131
-758
lines changed

rust/Cargo.lock

Lines changed: 385 additions & 429 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
[workspace]
15-
members = [ "aggregator_udfs", "filter_udfs", "compactor", "objectstore_ext", "rust_sketch", "sleeper_core", "sleeper_df" ]
15+
members = [ "aggregator_udfs", "filter_udfs", "apps", "objectstore_ext", "rust_sketch", "sleeper_core", "sleeper_df" ]
1616
resolver = "2"
1717

1818
[workspace.package]
@@ -32,10 +32,9 @@ aws-config = { version = "1.8.6" } # Credential loading
3232
aws-credential-types = { version = "1.2.6" } # Credential provider types
3333
aws-types = { version = "1.3.8" } # for Region
3434
bytes = { version = "1.10.1" } # Byte buffer for S3 uploading
35-
cargo_metadata = { version = "0.21.0" } # Access cargo metadata programmtically
35+
cargo_metadata = { version = "0.22.0" } # Access cargo metadata programmtically
3636
chrono = { version = "0.4.41" } # Log helper
3737
clap = { version = "4.5.47" } # Cmd line args processing
38-
tracing-subscriber = { version = "0.3.20" }
3938
color-eyre = { version = "0.6.5" } # Colourised version of `anyhow`
4039
cxx = { version = "1.0.175" } # Exception handling for Rust
4140
cxx-build = { version = "1.0.175" }

rust/aggregator_udfs/src/nonnull.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,13 @@ use datafusion::{
2828
sum::sum_udaf,
2929
},
3030
logical_expr::{
31-
Accumulator, AggregateUDF, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator,
32-
ReversedUDAF, SetMonotonicity, Signature, StatisticsArgs,
31+
Accumulator, AggregateUDF, AggregateUDFImpl, Documentation, EmitTo, Expr,
32+
GroupsAccumulator, ReversedUDAF, SetMonotonicity, Signature, StatisticsArgs,
3333
expr::{AggregateFunction, AggregateFunctionParams},
3434
function::{AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs},
3535
simplify::SimplifyInfo,
3636
utils::AggregateOrderSensitivity,
3737
},
38-
prelude::Expr,
3938
scalar::ScalarValue,
4039
};
4140
use std::{
@@ -455,16 +454,15 @@ mod tests {
455454
sum::{sum, sum_udaf},
456455
},
457456
logical_expr::{
458-
Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator, ReversedUDAF,
459-
SetMonotonicity, Signature, StatisticsArgs, Volatility,
457+
Accumulator, AggregateUDFImpl, Documentation, EmitTo, Expr, GroupsAccumulator,
458+
ReversedUDAF, SetMonotonicity, Signature, StatisticsArgs, Volatility,
460459
expr::{AggregateFunction, AggregateFunctionParams, WindowFunctionParams},
461460
function::{AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs},
462461
lit,
463462
simplify::SimplifyContext,
464463
utils::AggregateOrderSensitivity,
465464
},
466465
physical_expr::LexOrdering,
467-
prelude::Expr,
468466
scalar::ScalarValue,
469467
};
470468
use mockall::predicate::*;

rust/compactor/Cargo.toml renamed to rust/apps/Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
[package]
15-
name = "compactor"
16-
description = "Binary crate for running the data compactor from the command line."
15+
name = "apps"
16+
description = "Binary crate for running the Sleeper apps from the command line."
17+
default-run = "compact"
1718
keywords = ["sleeper", "compaction"]
1819
categories = ["science"]
1920
version = { workspace = true }
@@ -25,12 +26,13 @@ rust-version = { workspace = true }
2526
publish = { workspace = true }
2627

2728
[dependencies]
29+
arrow = { workspace = true }
2830
chrono = { workspace = true }
2931
clap = { workspace = true, features = ["derive"] }
30-
tracing-subscriber = { workspace = true }
3132
color-eyre = { workspace = true }
3233
sleeper_core = { path = "../sleeper_core" }
3334
env_logger = { workspace = true }
35+
futures = { workspace = true }
3436
human-panic = { workspace = true }
3537
log = { workspace = true, features = ["release_max_level_debug"] }
3638
num-format = { workspace = true }
Lines changed: 36 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,33 @@
11
/*
2-
* Copyright 2022-2025 Crown Copyright
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
2+
* Copyright 2022-2025 Crown Copyright
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
use apps::path_absolute;
1617
use chrono::Local;
1718
use clap::Parser;
1819
use color_eyre::eyre::bail;
1920
use human_panic::setup_panic;
2021
use log::info;
2122
use num_format::{Locale, ToFormattedString};
2223
use sleeper_core::{
23-
ColRange, CommonConfig, OperationOutput, PartitionBound, SleeperParquetOptions,
24+
ColRange, CommonConfigBuilder, OutputType, PartitionBound, SleeperParquetOptions,
2425
SleeperPartitionRegion, run_compaction,
2526
};
26-
use std::{collections::HashMap, io::Write, path::Path};
27+
use std::{collections::HashMap, io::Write};
2728
use url::Url;
2829

29-
/// Implements a Sleeper compaction algorithm in Rust.
30+
/// Runs a Sleeper compaction algorithm.
3031
///
3132
/// A sequence of Parquet files is read and compacted into a single output Parquet file. The input
3233
/// files must be individually sorted according to the row key columns and then the sort columns. A sketches file containing
@@ -63,16 +64,6 @@ struct CmdLineArgs {
6364
iterator_config: Option<String>,
6465
}
6566

66-
/// Converts a [`Path`] reference to an absolute path (if not already absolute)
67-
/// and returns it as a String.
68-
///
69-
/// # Panics
70-
/// If the path can't be made absolute due to not being able to get the current
71-
/// directory or the path is not valid.
72-
fn path_absolute<T: ?Sized + AsRef<Path>>(path: &T) -> String {
73-
std::path::absolute(path).unwrap().to_str().unwrap().into()
74-
}
75-
7667
#[tokio::main(flavor = "multi_thread")]
7768
async fn main() -> color_eyre::Result<()> {
7869
// Install coloured errors
@@ -149,79 +140,26 @@ async fn main() -> color_eyre::Result<()> {
149140
dict_enc_values: true,
150141
};
151142

152-
let details = CommonConfig {
153-
aws_config: None,
154-
input_files: input_urls,
155-
input_files_sorted: true,
156-
row_key_cols: args.row_keys,
157-
sort_key_cols: args.sort_keys,
158-
region: SleeperPartitionRegion::new(map),
159-
output: OperationOutput::File {
143+
let details = CommonConfigBuilder::new()
144+
.aws_config(None)
145+
.input_files(input_urls)
146+
.input_files_sorted(true)
147+
.row_key_cols(args.row_keys)
148+
.sort_key_cols(args.sort_keys)
149+
.region(SleeperPartitionRegion::new(map))
150+
.output(OutputType::File {
160151
output_file,
161152
opts: parquet_options,
162-
},
163-
iterator_config: args.iterator_config,
164-
};
165-
166-
let result = run_compaction(&details).await;
167-
match result {
168-
Ok(r) => {
169-
info!(
170-
"Compaction read {} rows and wrote {} rows",
171-
r.rows_read.to_formatted_string(&Locale::en),
172-
r.rows_written.to_formatted_string(&Locale::en)
173-
);
174-
}
175-
Err(e) => {
176-
bail!(e);
177-
}
178-
}
179-
Ok(())
180-
}
181-
182-
#[cfg(test)]
183-
mod path_test {
184-
use crate::path_absolute;
185-
186-
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
187-
fn cd_to_tmp() {
188-
std::env::set_current_dir("/tmp").unwrap();
189-
}
190-
191-
#[test]
192-
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
193-
fn relative_path_converts() {
194-
cd_to_tmp();
195-
assert_eq!("/tmp/foo/bar/baz.txt", path_absolute("foo/bar/baz.txt"));
196-
}
197-
198-
#[test]
199-
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
200-
fn relative_path_converts_with_one_dot() {
201-
cd_to_tmp();
202-
assert_eq!("/tmp/foo/bar/baz.txt", path_absolute("./foo/bar/baz.txt"));
203-
}
204-
205-
#[test]
206-
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
207-
fn relative_path_converts_with_double_dot() {
208-
cd_to_tmp();
209-
assert_eq!(
210-
"/tmp/../foo/bar/baz.txt",
211-
path_absolute("../foo/bar/baz.txt")
212-
);
213-
}
153+
})
154+
.iterator_config(args.iterator_config)
155+
.build()?;
214156

215-
#[test]
216-
#[cfg(not(any(target_os = "windows", target_os = "macos")))]
217-
fn absolute_path_unchanged() {
218-
cd_to_tmp();
219-
assert_eq!("/tmp/foo/bar", path_absolute("/tmp/foo/bar"));
220-
}
157+
let result = run_compaction(&details).await?;
158+
info!(
159+
"Compaction read {} rows and wrote {} rows",
160+
result.rows_read.to_formatted_string(&Locale::en),
161+
result.rows_written.to_formatted_string(&Locale::en)
162+
);
221163

222-
#[test]
223-
#[should_panic(expected = "cannot make an empty path absolute")]
224-
fn empty_path_panic() {
225-
let _ = path_absolute("");
226-
}
164+
Ok(())
227165
}

0 commit comments

Comments
 (0)