Enable parallel writing across row groups when writing encrypted parquet #8162
bcab6f9 to b701695
@alamb @adamreeve I think this will make for a better multithreading API.
Looks good to me, thanks Rok. Just some minor comments.
Thanks for the review @adamreeve! I suppose it's time to ask @alamb to do a pass :)
LGTM
Thanks @rok and @adamreeve -- I went over the PR more carefully and it makes sense to me. The examples are quite cool.
Thanks again
I merged up from main and plan to merge this PR in once the tests pass.
Hi @rok and @adamreeve -- there appears to be a clippy error now (due to a new API that was added on main). Can you possibly take a look? Again, I apologize for the delay in review.
Thanks for the review @alamb!
@alamb I've pushed a couple of ignores just to confirm the codepath. Please don't merge yet.
@alamb the deprecation warning comes from
15ed2de to c4e38db
I don't have the mixed sync/async-write test quite right yet, but I wonder if we really need it?
I am getting ready to create the 56.2.0 release candidate. Shall we try and get this one in, or can it wait for 57.0.0 in October?
@alamb it'd be great to get this one in. Let me take a look if I can fix up the
I am not sure -- what I would love to see is an example like this: https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowColumnWriter.html#example-encoding-two-arrow-arrays-in-parallel
One that shows how to use whatever APIs we have to write data in parallel.
I am sorry I haven't found time to study this PR in detail. @adamreeve it sounds like you have some good ideas and have devoted time here. What would you recommend?
Ok, I went over this PR quite carefully this morning, and reviewed all the PRs / APIs and I think it makes sense. Thank you @rok and @adamreeve
Divergent Writing APIs
One thought I had (which we can do as a follow on PR) would be to unify the APIs for doing concurrent writing so they always use ArrowRowGroupWriterFactory, which I think would mean:
- Deprecate get_column_writers
- Make ArrowRowGroupWriterFactory::new public
- Update the examples to use ArrowRowGroupWriterFactory
If this sounds reasonable, I can file a ticket.
I will also see if I can debug the test failures
}

/// Create a new row group writer and return its column writers.
pub async fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
I double checked and this code is not yet released, so this is not a public API change. It was added in
It is very fresh indeed. I hope we're not spoiling some plan here. We did find the ArrowRowGroupWriterFactory route better so I think it should be ok.
}

/// Create a new row group writer and return its column writers.
#[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
Note to myself, these APIs were added in
- Support multi-threaded writing of Parquet files with modular encryption #8029
So they are relatively new
let mut writers = writer.get_column_writers().await.unwrap();
// Use low-level API to write an Arrow group
let arrow_writer = writer.sync_writer;
I think it would be good to use public APIs in the tests, rather than accessing the inner fields directly (which is not possible from other crates)
/// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
/// This can be useful to provide more control over how files are written.
pub fn into_serialized_writer(
I spent quite some time trying to figure out why this API is needed -- specifically "why do we need an ArrowWriter at all, why not use SerializedFileWriter and get_column_writers directly, as shown in this example?"
After study I concluded the reason we need to expose ArrowRowGroupWriterFactory is that ArrowRowGroupWriterFactory::create_column_writers also has the appropriate encryption properties.
It is unfortunate that we'll now have two different sets of APIs for creating column writers -- via get_column_writers AND ArrowRowGroupWriterFactory.
Agreed.
I filed a ticket with a suggestion for a unified API:
Ok(())
}

/// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
I spent quite a while trying to figure out why we can't just use get_column_writers as in the example, rather than having to use ArrowRowGroupWriterFactory, and I (finally) realized the reason is that the encryption configuration isn't passed to get_column_writers. ArrowRowGroupWriterFactory does have the encryption details and thus can make the correct ArrowColumnWriters.
I think it is somewhat of a strange API to create an ArrowWriter only to immediately destructure it into a SerializedWriter / the underlying writer. It is also unfortunate we now have two different APIs for writing row groups in parallel, depending on encryption.
I have an idea to make the APIs better as a follow on.
Agreed it's odd. The motivation was to introduce as few new pub items as possible. Would be very curious about alternative API shapes.
#8162 (review) is my suggestion. Basically, the TLDR is to make the ArrowRowGroupWriterFactory constructor public and deprecate get_column_writers.
I spent quite a while trying to figure out why we can't just use get_column_writers.
Sorry, I realise now that I didn't make this very clear in my original issue (#7359).
One other factor is that we couldn't just use the WriterProperties passed to get_column_writers to internally create a new FileEncryptor. When a FileEncryptor is created for the SerializedFileWriter, it generates random AAD (additional authentication data), and this AAD has to be the same for all encrypted modules in the file.
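To make the AAD constraint concrete, here is a minimal standard-library sketch. Every type in it (`FileEncryptor`, `ColumnWriter`, `RowGroupWriterFactory`) is a hypothetical stand-in rather than the parquet crate's real API, and the AAD layout is invented for illustration. It only shows the shape of the idea: one encryptor is created once, and a factory hands the same instance to every column writer, so all modules share one AAD prefix.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a file-level encryptor. The AAD prefix is
/// fixed once, when the encryptor is created.
struct FileEncryptor {
    aad_prefix: Vec<u8>,
}

impl FileEncryptor {
    /// A real writer would generate random bytes here; the caller supplies
    /// them in this sketch so that no RNG dependency is needed.
    fn new(aad_prefix: Vec<u8>) -> Self {
        Self { aad_prefix }
    }
}

/// Hypothetical column writer holding a handle to the shared encryptor.
struct ColumnWriter {
    row_group: usize,
    column: usize,
    encryptor: Arc<FileEncryptor>,
}

impl ColumnWriter {
    /// AAD for this module: the shared file prefix followed by the module's
    /// position (an invented layout, for illustration only).
    fn module_aad(&self) -> Vec<u8> {
        let mut aad = self.encryptor.aad_prefix.clone();
        aad.extend_from_slice(&(self.row_group as u16).to_le_bytes());
        aad.extend_from_slice(&(self.column as u16).to_le_bytes());
        aad
    }
}

/// Hypothetical factory: owns one encryptor and shares it with every writer.
struct RowGroupWriterFactory {
    encryptor: Arc<FileEncryptor>,
    num_columns: usize,
}

impl RowGroupWriterFactory {
    fn create_column_writers(&self, row_group: usize) -> Vec<ColumnWriter> {
        (0..self.num_columns)
            .map(|column| ColumnWriter {
                row_group,
                column,
                encryptor: Arc::clone(&self.encryptor),
            })
            .collect()
    }
}

fn main() {
    let factory = RowGroupWriterFactory {
        encryptor: Arc::new(FileEncryptor::new(vec![0xAB; 8])),
        num_columns: 2,
    };
    let rg0 = factory.create_column_writers(0);
    let rg1 = factory.create_column_writers(1);

    let a = rg0[0].module_aad();
    let b = rg1[1].module_aad();
    // Writers from different row groups share the same file-level prefix...
    assert_eq!(&a[..8], &b[..8]);
    // ...while each module still gets a distinct full AAD.
    assert_ne!(a, b);
}
```

If each call built a fresh encryptor from the writer properties instead, the prefixes would differ between row groups, which is the failure mode described above.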
I filed a ticket with a suggestion for a unified API:
}

/// Create column writers for a new row group.
pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
I see now that this is the key API -- create column writers with the relevant encryption properties, if relevant
What do you think about making create_row_group_writer pub, removing this function, and using the into_writers function added in issue #8260?
So I am not sure making ArrowRowGroupWriter public gets us much of anything, and it would not allow per-column parallel encoding.
One benefit of getting the column writers individually is that the columns can then be encoded in parallel. The ArrowRowGroupWriter can only write RowGroups in parallel.
I looked at ArrowRowGroupWriter a bit more, and the only substantial thing it does is call compute_leaves in a loop, which is already public.
arrow-rs/parquet/src/arrow/arrow_writer/mod.rs
Lines 831 to 835 in bac3690
struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}
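As a rough illustration of why individually owned column writers matter, the sketch below encodes each column of one row group on its own thread using only the standard library. The `ColumnWriter` type and `encode_row_group` function are hypothetical and much simpler than the parquet crate's ArrowColumnWriter; the point is only that writers handed out one per column can each be moved to a separate thread, which a single row-group-level writer does not allow.

```rust
use std::thread;

/// Hypothetical column writer: encodes one column's values into bytes.
struct ColumnWriter {
    buf: Vec<u8>,
}

impl ColumnWriter {
    fn write(&mut self, values: &[i32]) {
        for v in values {
            self.buf.extend_from_slice(&v.to_le_bytes());
        }
    }

    /// Finish the column and return its encoded chunk.
    fn close(self) -> Vec<u8> {
        self.buf
    }
}

/// Encode every column of one row group on its own thread and return the
/// encoded chunks in column order.
fn encode_row_group(columns: &[Vec<i32>]) -> Vec<Vec<u8>> {
    thread::scope(|s| {
        // Spawn all workers first so the columns are encoded concurrently.
        let handles: Vec<_> = columns
            .iter()
            .map(|col| {
                s.spawn(move || {
                    let mut writer = ColumnWriter { buf: Vec::new() };
                    writer.write(col);
                    writer.close()
                })
            })
            .collect();
        // Join in spawn order so the chunks stay in column order.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let row_group = vec![vec![1, 2, 3], vec![10, 20, 30]];
    let chunks = encode_row_group(&row_group);
    assert_eq!(chunks.len(), 2);
    // Three i32 values per column, four bytes each.
    assert_eq!(chunks[0].len(), 12);
}
```

Joining the handles in spawn order keeps the output in column order, which matters because the chunks of a row group have to be appended to the file in schema order.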
I removed the failing test (which was also added in #8262 which we removed in this PR too)
assert_eq!(to_write, read);
}

#[tokio::test]
I removed this test because it was added in [Parquet] Write row group with async writer #8262 along with AsyncArrowWriter::get_column_writers and AsyncArrowWriter::append_row_group. Since we removed those APIs in this PR, we should also remove the test.
Agreed.
Sorry for the late response @alamb and thanks for the added changes. I think this is good as is and can be included in the release.
This was my fault for not reviewing this PR before in more detail. I'll leave this open until tomorrow to give people a chance to respond, and if not I'll merge it in and make the RC.
I've read through the latest comments and it sounds like you have a good understanding of the motivation for this change now, thanks for taking a close look Andrew. I agree it would be good to consolidate on using If you'd rather not have the new
Yeah I agree -- one method is not a big deal if we are going to go with the ArrowRowGroupWriterFactory. I'll merge this PR and file some follow on tickets shortly.
I think this one is ready to go so I am going to merge it in
I filed a ticket to track simplifying / unifying the API here
Thank you @rok @lilianm and @adamreeve
# Which issue does this PR close?
- related to #8162
# Rationale for this change
- While reviewing #8162 I read a bunch more of the parquet code and I wanted to capture some of my understanding in comments.
# What changes are included in this PR?
Add more documentation to various parquet writing APIs
# Are these changes tested?
By CI
# Are there any user-facing changes?
Documentation only, no function changes
---------
Co-authored-by: Ed Seidl <[email protected]>
Rationale for this change
#8029 introduced pub ArrowWriter.get_column_writers and pub ArrowWriter.append_row_group to enable multi-threaded parquet encrypted writing. However, testing downstream showed the API is not feasible, see #8115.
What changes are included in this PR?
This introduces pub ArrowWriter.into_serialized_writer and deprecates pub ArrowWriter.get_column_writers and pub ArrowWriter.append_row_group. It also makes ArrowRowGroupWriterFactory public and adds a pub ArrowRowGroupWriterFactory.create_column_writers.
Are these changes tested?
This includes a DataFusion inspired test for concurrent writing across columns and row groups to make sure parallel writing is and remains possible with ArrowWriter's API. Further, we created a draft PR in DataFusion, apache/datafusion#16738, to test for multithreaded writing support.
Are there any user-facing changes?
See description of changes.