Skip to content

Commit 328ac21

Browse files
committed
Merge remote-tracking branch 'origin/main' into feat/0.7_rework
# Conflicts: # Cargo.lock # crates/pithos/Cargo.toml # crates/pithos/src/main.rs # crates/pithos_lib/Cargo.toml # crates/pithos_lib/src/lib.rs # crates/pithos_lib/src/transformers/mod.rs # crates/pithos_pyo3/Cargo.toml
2 parents c7b9729 + 6a26de2 commit 328ac21

File tree

1 file changed

+216
-0
lines changed

1 file changed

+216
-0
lines changed
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
use crate::helpers::notifications::{Message, Notifier};
2+
use crate::transformer::Transformer;
3+
use crate::transformer::TransformerType;
4+
use anyhow::Result;
5+
use anyhow::{anyhow, bail};
6+
use async_channel::{Receiver, Sender, TryRecvError};
7+
use bytes::BufMut;
8+
use bytes::Bytes;
9+
use bytes::BytesMut;
10+
use chacha20poly1305::{
11+
aead::{Aead, KeyInit, Payload},
12+
ChaCha20Poly1305,
13+
};
14+
use itertools::Itertools;
15+
use std::collections::HashMap;
16+
use std::sync::Arc;
17+
use tracing::debug;
18+
use tracing::error;
19+
20+
const ENCRYPTION_BLOCK_SIZE: usize = 65_536;
21+
const CIPHER_DIFF: usize = 28;
22+
const CIPHER_SEGMENT_SIZE: usize = ENCRYPTION_BLOCK_SIZE + CIPHER_DIFF;
23+
24+
pub struct ChaChaResilient {
25+
input_buffer: BytesMut,
26+
notifier: Option<Arc<Notifier>>,
27+
msg_receiver: Option<Receiver<Message>>,
28+
idx: Option<usize>,
29+
decryption_key: [u8; 32],
30+
chunk_lengths: Vec<u64>, // File data+meta keys
31+
skip_me: bool,
32+
}
33+
34+
impl ChaChaResilient {
35+
#[tracing::instrument(level = "trace")]
36+
#[allow(dead_code)]
37+
pub fn new_with_lengths(key: [u8; 32], lengths: Vec<u64>) -> Self {
38+
let mut residues = HashMap::new();
39+
40+
// Always start with the default CIPHER_SEGMENT_SIZE
41+
residues.insert(CIPHER_SEGMENT_SIZE as u64, u32::MAX);
42+
43+
for lengths in &lengths {
44+
let residue = *lengths % CIPHER_SEGMENT_SIZE as u64;
45+
if residue != 0 {
46+
let entry = residues.entry(*lengths).or_insert(1);
47+
*entry += 1;
48+
}
49+
}
50+
51+
// Sort residues by occurrences
52+
let in_order_residues: Vec<u64> = residues
53+
.into_iter()
54+
.sorted_by(|a, b| b.1.cmp(&a.1))
55+
.map(|(k, _)| k)
56+
.collect();
57+
58+
ChaChaResilient {
59+
input_buffer: BytesMut::with_capacity(5 * CIPHER_SEGMENT_SIZE),
60+
decryption_key: key,
61+
skip_me: false,
62+
notifier: None,
63+
msg_receiver: None,
64+
chunk_lengths: in_order_residues,
65+
idx: None,
66+
}
67+
}
68+
69+
#[tracing::instrument(level = "trace", skip(self))]
70+
fn process_messages(&mut self) -> Result<bool> {
71+
if let Some(rx) = &self.msg_receiver {
72+
loop {
73+
match rx.try_recv() {
74+
Ok(Message::Finished) => return Ok(true),
75+
Ok(Message::Skip) => {
76+
self.skip_me = true;
77+
}
78+
Ok(_) => {}
79+
Err(TryRecvError::Empty) => {
80+
break;
81+
}
82+
Err(TryRecvError::Closed) => {
83+
error!("Message receiver closed");
84+
return Err(anyhow!("Message receiver closed"));
85+
}
86+
}
87+
}
88+
}
89+
Ok(false)
90+
}
91+
}
92+
93+
#[async_trait::async_trait]
94+
impl Transformer for ChaChaResilient {
95+
#[tracing::instrument(level = "trace", skip(self))]
96+
async fn initialize(&mut self, idx: usize) -> (TransformerType, Sender<Message>) {
97+
self.idx = Some(idx);
98+
let (sx, rx) = async_channel::bounded(10);
99+
self.msg_receiver = Some(rx);
100+
(TransformerType::ChaCha20Decrypt, sx)
101+
}
102+
103+
#[tracing::instrument(level = "trace", skip(self, buf))]
104+
async fn process_bytes(&mut self, buf: &mut bytes::BytesMut) -> Result<()> {
105+
if self.skip_me {
106+
debug!("skipped");
107+
return Ok(());
108+
}
109+
110+
let Ok(finished) = self.process_messages() else {
111+
return Err(anyhow!("Error processing messages"));
112+
};
113+
114+
if !buf.is_empty() {
115+
self.input_buffer.put(buf.split());
116+
}
117+
let mut counter = 0;
118+
let mut current = CIPHER_SEGMENT_SIZE as u64;
119+
loop {
120+
if self.input_buffer.is_empty() {
121+
break;
122+
}
123+
let next_chunksize = if let Some(len) = self.chunk_lengths.get(counter) {
124+
len.clone()
125+
} else {
126+
if current < 15 + 1 {
127+
return Err(anyhow!("Unable to process any more chunks abort!"));
128+
}
129+
current -= 1;
130+
current + 1
131+
};
132+
133+
if self.input_buffer.len() < next_chunksize as usize {
134+
if finished {
135+
// If we are finished we must advance the chunklength calculator
136+
// until we are below the buffer size, since it will not grow anymore
137+
counter += 1;
138+
continue;
139+
} else {
140+
break;
141+
}
142+
}
143+
144+
match decrypt_chunk(
145+
&self.input_buffer.split_to(next_chunksize as usize),
146+
&self.decryption_key,
147+
) {
148+
Ok(bytes) => {
149+
buf.put(bytes);
150+
if self.input_buffer.len() > CIPHER_SEGMENT_SIZE {
151+
// Reset and continue
152+
counter = 0;
153+
current = CIPHER_SEGMENT_SIZE as u64;
154+
continue;
155+
} else {
156+
break;
157+
}
158+
}
159+
Err(e) => {
160+
counter += 1;
161+
error!(
162+
?e,
163+
"Error decrypting chunk tested with size {}", next_chunksize
164+
);
165+
continue;
166+
}
167+
};
168+
}
169+
170+
if finished && self.input_buffer.is_empty() {
171+
if let Some(notifier) = &self.notifier {
172+
notifier.send_next(
173+
self.idx.ok_or_else(|| anyhow!("Missing idx"))?,
174+
Message::Finished,
175+
)?;
176+
}
177+
}
178+
Ok(())
179+
}
180+
181+
#[tracing::instrument(level = "trace", skip(self, notifier))]
182+
#[inline]
183+
async fn set_notifier(&mut self, notifier: Arc<Notifier>) -> Result<()> {
184+
self.notifier = Some(notifier);
185+
Ok(())
186+
}
187+
}
188+
189+
#[tracing::instrument(level = "trace", skip(chunk, decryption_key))]
190+
#[inline]
191+
pub fn decrypt_chunk(chunk: &[u8], decryption_key: &[u8; 32]) -> Result<Bytes> {
192+
if chunk.len() < 15 {
193+
error!(len = chunk.len(), "Unexpected chunk size < 15");
194+
bail!("[CHACHA_DECRYPT] Unexpected chunk size < 15")
195+
}
196+
197+
let (nonce_slice, data) = chunk.split_at(12);
198+
199+
if nonce_slice.len() != 12 {
200+
error!(len = nonce_slice.len(), "Invalid nonce size");
201+
bail!("[CHACHA_DECRYPT] Invalid nonce")
202+
}
203+
204+
let payload = Payload {
205+
msg: data,
206+
aad: b"",
207+
};
208+
Ok(ChaCha20Poly1305::new_from_slice(decryption_key)
209+
.map_err(|e| {
210+
error!(?e, "Unable to initialize decryptor");
211+
anyhow::anyhow!("[CHACHA_DECRYPT] Unable to initialize decryptor")
212+
})?
213+
.decrypt(nonce_slice.into(), payload)
214+
.map_err(|_| anyhow::anyhow!("[CHACHA_DECRYPT] Unable to decrypt chunk"))?
215+
.into())
216+
}

0 commit comments

Comments
 (0)