@@ -28,13 +28,16 @@ use parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection,
     RowSelector,
 };
-use parquet::arrow::ArrowWriter;
+use parquet::arrow::arrow_writer::{
+    compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory,
+};
+use parquet::arrow::{ArrowSchemaConverter, ArrowWriter};
 use parquet::data_type::{ByteArray, ByteArrayType};
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::ParquetError;
 use parquet::file::metadata::ParquetMetaData;
-use parquet::file::properties::WriterProperties;
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
 use parquet::file::writer::SerializedFileWriter;
 use parquet::schema::parser::parse_message_type;
 use std::fs::File;
@@ -1062,14 +1065,10 @@ fn test_decrypt_page_index(
     Ok(())
 }

-fn read_and_roundtrip_to_encrypted_file(
+fn read_encrypted_file(
     path: &str,
     decryption_properties: FileDecryptionProperties,
-    encryption_properties: FileEncryptionProperties,
-) {
-    let temp_file = tempfile::tempfile().unwrap();
-
-    // read example data
+) -> Result<(Vec<RecordBatch>, ArrowReaderMetadata), ParquetError> {
     let file = File::open(path).unwrap();
     let options = ArrowReaderOptions::default()
         .with_file_decryption_properties(decryption_properties.clone());
@@ -1080,7 +1079,18 @@ fn read_and_roundtrip_to_encrypted_file(
     let batches = batch_reader
         .collect::<parquet::errors::Result<Vec<RecordBatch>, _>>()
         .unwrap();
+    Ok((batches, metadata))
+}
+
+fn read_and_roundtrip_to_encrypted_file(
+    path: &str,
+    decryption_properties: FileDecryptionProperties,
+    encryption_properties: FileEncryptionProperties,
+) {
+    // read example data
+    let (batches, metadata) = read_encrypted_file(path, decryption_properties.clone()).unwrap();

+    let temp_file = tempfile::tempfile().unwrap();
     // write example data
     let props = WriterProperties::builder()
         .with_file_encryption_properties(encryption_properties)
@@ -1101,3 +1111,116 @@ fn read_and_roundtrip_to_encrypted_file(
     // check re-written example data
     verify_encryption_test_file_read(temp_file, decryption_properties);
 }
+
+#[tokio::test]
+async fn test_multi_threaded_encrypted_writing() {
+    // Read example data and set up encryption/decryption properties
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+
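+    // Read and decrypt the example file, keeping both the data and its Arrow schema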
+    let (record_batches, metadata) =
+        read_encrypted_file(&path, decryption_properties.clone()).unwrap();
+    let to_write: Vec<_> = record_batches
+        .iter()
+        .flat_map(|rb| rb.columns().to_vec())
+        .collect();
+    let schema = metadata.schema().clone();
+
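+    // Encryption is configured through the writer properties; the Arrow schema
+    // is converted to a Parquet schema so per-column writers can be created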
+    let props = Arc::new(
+        WriterPropertiesBuilder::with_defaults()
+            .with_file_encryption_properties(file_encryption_properties)
+            .build(),
+    );
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(&schema)
+        .unwrap();
+    let root_schema = parquet_schema.root_schema_ptr();
+
+    // Create a temporary file to write the encrypted data
+    let temp_file = tempfile::NamedTempFile::new().unwrap();
+    let mut file_writer =
+        SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap();
+
+    let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
+    let arrow_row_group_writer = arrow_row_group_writer_factory
+        .create_row_group_writer(&parquet_schema, &props, &schema, 0)
+        .unwrap();
+
+    // Get the column writers (each with its own encryptor) from the ArrowRowGroupWriter
+    let col_writers = arrow_row_group_writer.writers;
+
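+    // Spawn one worker thread per leaf column; each worker owns its encrypting
+    // column writer and receives the arrays to encode over an mpsc channel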
+    let mut workers: Vec<_> = col_writers
+        .into_iter()
+        .map(|mut col_writer| {
+            let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
+            let handle = std::thread::spawn(move || {
+                // receive Arrays to encode via the channel
+                for col in recv {
+                    col_writer.write(&col)?;
+                }
+                // once the input is complete, close the writer
+                // to return the newly created ArrowColumnChunk
+                col_writer.close()
+            });
+            (handle, send)
+        })
+        .collect();
+
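+    // Fan the arrays out to the workers: compute_leaves flattens each
+    // top-level Arrow column into the leaf columns the Parquet writers expect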
+    let mut worker_iter = workers.iter_mut();
+    for (arr, field) in to_write.iter().zip(&schema.fields) {
+        for leaves in compute_leaves(field, arr).unwrap() {
+            worker_iter.next().unwrap().1.send(leaves).unwrap();
+        }
+    }
+
+    // Wait for the workers to complete encoding, and append
+    // the resulting column chunks to the row group (and the file)
+    let mut row_group_writer = file_writer.next_row_group().unwrap();
+
+    for (handle, send) in workers {
+        drop(send); // Drop the send side to signal the worker that input is complete
+        // wait for the worker to return the completed chunk
+        let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
+        chunk.append_to_row_group(&mut row_group_writer).unwrap();
+    }
+    // Close the row group, which writes it to the underlying file
+    row_group_writer.close().unwrap();
+
+    // Close the file writer, which writes the footer
+    let metadata = file_writer.close().unwrap();
+    assert_eq!(metadata.num_rows, 50);
+
+    // Check that the file was written correctly
+    let (read_record_batches, read_metadata) = read_encrypted_file(
+        temp_file.path().to_str().unwrap(),
+        decryption_properties.clone(),
+    )
+    .unwrap();
+    verify_encryption_test_data(read_record_batches, read_metadata.metadata());
+
+    // Check that the file was encrypted: reading without decryption properties must fail
+    let result = ArrowReaderMetadata::load(&temp_file.into_file(), ArrowReaderOptions::default());
+    assert_eq!(
+        result.unwrap_err().to_string(),
+        "Parquet error: Parquet file has an encrypted footer but decryption properties were not provided"
+    );
+}