Skip to content

Commit 83423a0

Browse files
committed
drafted the skeleton of sort node
1 parent a667539 commit 83423a0

File tree

4 files changed

+432
-0
lines changed

4 files changed

+432
-0
lines changed

crates/iceberg/src/arrow/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ mod reader;
3232
pub mod record_batch_projector;
3333
pub(crate) mod record_batch_transformer;
3434
mod value;
35+
mod partition_value_visitor;
3536

3637
pub use reader::*;
3738
pub use value::*;
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Schema visitor for partition value extraction
19+
20+
use std::collections::HashMap;
21+
use std::sync::Arc;
22+
23+
use arrow_array::{ArrayRef, RecordBatch, StructArray};
24+
use crate::{Error, ErrorKind, Result};
25+
use crate::arrow::{ArrowArrayAccessor, FieldMatchMode};
26+
use crate::spec::{ListType, MapType, NestedFieldRef, PartitionField, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, visit_struct_with_partner};
27+
use crate::transform::{create_transform_function, BoxedTransformFunction};
28+
29+
/// Visitor which extracts partition values from a record batch based on partition fields
30+
pub struct PartitionValueVisitor {
31+
/// Map from source ids to their fields
32+
source_id_to_field: HashMap<i32, PartitionField>,
33+
/// Match mode for finding columns in Arrow struct
34+
match_mode: FieldMatchMode,
35+
/// Store the partition values temporarily during computation
36+
partition_values: Vec<ArrayRef>,
37+
/// Current field ID being processed
38+
current_transform_fn: Option<BoxedTransformFunction>,
39+
}
40+
41+
impl PartitionValueVisitor {
42+
/// Creates new instance of PartitionValueVisitor
43+
#[allow(dead_code)]
44+
pub fn new(partition_fields: Vec<PartitionField>) -> Self {
45+
Self::new_with_match_mode(partition_fields, FieldMatchMode::Name)
46+
}
47+
48+
/// Creates new instance of PartitionValueVisitor with explicit match mode
49+
#[allow(dead_code)]
50+
pub fn new_with_match_mode(
51+
partition_fields: Vec<PartitionField>,
52+
match_mode: FieldMatchMode,
53+
) -> Self {
54+
Self {
55+
source_id_to_field: partition_fields
56+
.into_iter()
57+
.map(|field| (field.source_id, field))
58+
.collect(),
59+
match_mode,
60+
partition_values: vec![],
61+
current_transform_fn: None,
62+
}
63+
}
64+
65+
/// Compute partition values in given schema and record batch
66+
#[allow(dead_code)]
67+
pub fn compute(
68+
&mut self,
69+
schema: SchemaRef,
70+
batch: RecordBatch,
71+
) -> Result<Vec<ArrayRef>> {
72+
self.partition_values = vec![];
73+
74+
let arrow_accessor = ArrowArrayAccessor::new_with_match_mode(self.match_mode);
75+
76+
let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
77+
visit_struct_with_partner(
78+
schema.as_struct(),
79+
&struct_arr,
80+
self,
81+
&arrow_accessor,
82+
)?;
83+
84+
Ok(std::mem::take(&mut self.partition_values))
85+
}
86+
87+
/// Check if the current field is a source field, if so, create a transform function for it
88+
fn check_and_create_transform_fn(&mut self, field: &NestedFieldRef) -> Result<()> {
89+
self.current_transform_fn = match self.source_id_to_field.get(&field.id) {
90+
Some(partition_field) => {
91+
if field.field_type.is_primitive() {
92+
Some(create_transform_function(&partition_field.transform)?)
93+
} else {
94+
return Err(Error::new(
95+
ErrorKind::DataInvalid,
96+
format!(
97+
"Cannot partition by non-primitive source field: '{field}'.",
98+
),
99+
));
100+
}
101+
102+
},
103+
None => None,
104+
};
105+
106+
Ok(())
107+
}
108+
}
109+
110+
impl SchemaWithPartnerVisitor<ArrayRef> for PartitionValueVisitor {
111+
type T = ();
112+
113+
fn before_struct_field(&mut self, field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
114+
self.check_and_create_transform_fn(field)?;
115+
Ok(())
116+
}
117+
118+
fn after_struct_field(&mut self, _field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
119+
self.current_transform_fn = None;
120+
Ok(())
121+
}
122+
123+
fn before_list_element(&mut self, field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
124+
self.check_and_create_transform_fn(field)?;
125+
Ok(())
126+
}
127+
128+
fn after_list_element(&mut self, _field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
129+
self.current_transform_fn = None;
130+
Ok(())
131+
}
132+
133+
fn before_map_key(&mut self, field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
134+
self.check_and_create_transform_fn(field)?;
135+
Ok(())
136+
}
137+
138+
fn after_map_key(&mut self, _field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
139+
self.current_transform_fn = None;
140+
Ok(())
141+
}
142+
143+
fn before_map_value(&mut self, field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
144+
self.check_and_create_transform_fn(field)?;
145+
Ok(())
146+
}
147+
148+
fn after_map_value(&mut self, _field: &NestedFieldRef, _partner: &ArrayRef) -> Result<()> {
149+
self.current_transform_fn = None;
150+
Ok(())
151+
}
152+
153+
fn schema(
154+
&mut self,
155+
_schema: &Schema,
156+
_partner: &ArrayRef,
157+
_value: Self::T,
158+
) -> Result<Self::T> {
159+
Ok(())
160+
}
161+
162+
fn field(
163+
&mut self,
164+
_field: &NestedFieldRef,
165+
_partner: &ArrayRef,
166+
_value: Self::T,
167+
) -> Result<Self::T> {
168+
Ok(())
169+
}
170+
171+
fn r#struct(
172+
&mut self,
173+
_struct: &StructType,
174+
_partner: &ArrayRef,
175+
_results: Vec<Self::T>,
176+
) -> Result<Self::T> {
177+
Ok(())
178+
}
179+
180+
fn list(&mut self, _list: &ListType, _list_arr: &ArrayRef, _value: Self::T) -> Result<Self::T> {
181+
Ok(())
182+
}
183+
184+
fn map(
185+
&mut self,
186+
_map: &MapType,
187+
_partner: &ArrayRef,
188+
_key_value: Self::T,
189+
_value: Self::T,
190+
) -> Result<Self::T> {
191+
Ok(())
192+
}
193+
194+
fn primitive(&mut self, _p: &PrimitiveType, col: &ArrayRef) -> Result<Self::T> {
195+
// If the transform fn is some, then it means we are visiting a source field,
196+
// and we should apply the current transform fn
197+
if let Some(transform_fn) = &self.current_transform_fn {
198+
self.partition_values.push(transform_fn.transform(col.clone())?);
199+
}
200+
201+
Ok(())
202+
}
203+
}

crates/integrations/datafusion/src/physical_plan/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub(crate) mod metadata_scan;
2121
pub(crate) mod project;
2222
pub(crate) mod repartition;
2323
pub(crate) mod scan;
24+
pub(crate) mod sort;
2425
pub(crate) mod write;
2526

2627
pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";

0 commit comments

Comments
 (0)