Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Changelog
:class:`~cryptography.hazmat.primitives.ciphers.aead.ChaCha20Poly1305` to
allow encrypting directly into a pre-allocated buffer.
* Added ``decrypt_into`` methods to
:class:`~cryptography.hazmat.primitives.ciphers.aead.AESCCM`,
:class:`~cryptography.hazmat.primitives.ciphers.aead.AESGCM`,
:class:`~cryptography.hazmat.primitives.ciphers.aead.AESSIV`, and
:class:`~cryptography.hazmat.primitives.ciphers.aead.ChaCha20Poly1305` to
Expand Down
29 changes: 29 additions & 0 deletions docs/hazmat/primitives/aead.rst
Original file line number Diff line number Diff line change
Expand Up @@ -767,4 +767,33 @@ also support providing integrity for associated data which is not encrypted.
when the ciphertext has been changed, but will also occur when the
key, nonce, or associated data are wrong.

.. method:: decrypt_into(nonce, data, associated_data, buf)

.. versionadded:: 47.0.0

Decrypts the ``data`` and authenticates the ``associated_data``. If you
called encrypt with ``associated_data`` you must pass the same
``associated_data`` in decrypt or the integrity check will fail. The
output is written into the ``buf`` parameter.

:param nonce: A value of between 7 and 13 bytes. This
is the same value used when you originally called encrypt.
**NEVER REUSE A NONCE** with a key.
:type nonce: :term:`bytes-like`
:param data: The data to decrypt (with tag appended).
:type data: :term:`bytes-like`
:param associated_data: Additional data to authenticate. Can be
``None`` if none was passed during encryption.
:type associated_data: :term:`bytes-like`
:param buf: A writable :term:`bytes-like` object that must be exactly
``len(data) - tag_length`` bytes. The plaintext will be written to
this buffer.
:returns int: The number of bytes written to the buffer (always
``len(data) - tag_length``).
:raises ValueError: If the buffer is not the correct size.
:raises cryptography.exceptions.InvalidTag: If the authentication tag
doesn't validate this exception will be raised. This will occur
when the ciphertext has been changed, but will also occur when the
key, nonce, or associated data are wrong.

.. _`recommends a 96-bit IV length`: https://csrc.nist.gov/pubs/sp/800/38/d/final
7 changes: 7 additions & 0 deletions src/cryptography/hazmat/bindings/_rust/openssl/aead.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ class AESCCM:
data: Buffer,
associated_data: Buffer | None,
) -> bytes: ...
def decrypt_into(
self,
nonce: Buffer,
data: Buffer,
associated_data: Buffer | None,
buf: Buffer,
) -> int: ...

class AESSIV:
def __init__(self, key: Buffer) -> None: ...
Expand Down
75 changes: 52 additions & 23 deletions src/rust/src/backend/aead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,26 +351,6 @@ impl LazyEvpCipherAead {
)
}

fn decrypt<'p>(
&self,
py: pyo3::Python<'p>,
ciphertext: &[u8],
aad: Option<Aad<'_>>,
nonce: Option<&[u8]>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
if ciphertext.len() < self.tag_len {
return Err(CryptographyError::from(exceptions::InvalidTag::new_err(())));
}
Ok(pyo3::types::PyBytes::new_with(
py,
ciphertext.len() - self.tag_len,
|b| {
self.decrypt_into(py, ciphertext, aad, nonce, b)?;
Ok(())
},
)?)
}

fn decrypt_into(
&self,
py: pyo3::Python<'_>,
Expand Down Expand Up @@ -1064,27 +1044,76 @@ impl AesCcm {
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
let nonce_bytes = nonce.as_bytes();
let data_bytes = data.as_bytes();

if nonce_bytes.len() < 7 || nonce_bytes.len() > 13 {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err("Nonce must be between 7 and 13 bytes"),
));
}
if data_bytes.len() < self.tag_length {
return Err(CryptographyError::from(exceptions::InvalidTag::new_err(())));
}

Ok(pyo3::types::PyBytes::new_with(
py,
data_bytes.len() - self.tag_length,
|b| {
let buf = CffiMutBuf::from_bytes(py, b);
self.decrypt_into(py, nonce, data, associated_data, buf)?;
Ok(())
},
)?)
}

#[pyo3(signature = (nonce, data, associated_data, buf))]
fn decrypt_into(
&self,
py: pyo3::Python<'_>,
nonce: CffiBuf<'_>,
data: CffiBuf<'_>,
associated_data: Option<CffiBuf<'_>>,
mut buf: CffiMutBuf<'_>,
) -> CryptographyResult<usize> {
let nonce_bytes = nonce.as_bytes();
let data_bytes = data.as_bytes();
let aad = associated_data.map(Aad::Single);

if nonce_bytes.len() < 7 || nonce_bytes.len() > 13 {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err("Nonce must be between 7 and 13 bytes"),
));
}

if data_bytes.len() < self.tag_length {
return Err(CryptographyError::from(exceptions::InvalidTag::new_err(())));
}

// For information about computing this, see
// https://tools.ietf.org/html/rfc3610#section-2.1
let l_val = 15 - nonce_bytes.len();
let max_length = 1usize.checked_shl(8 * l_val as u32);
// If `max_length` overflowed, then it's not possible for data to be
// longer than it.
let pt_length = data_bytes.len().saturating_sub(self.tag_length);
if max_length.map(|v| v < pt_length).unwrap_or(false) {
let expected_len = data_bytes.len() - self.tag_length;
if max_length.map(|v| v < expected_len).unwrap_or(false) {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err("Data too long for nonce"),
));
}

self.ctx.decrypt(py, data_bytes, aad, Some(nonce_bytes))
if buf.as_mut_bytes().len() != expected_len {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(format!(
"buffer must be {} bytes",
expected_len
)),
));
}

self.ctx
.decrypt_into(py, data_bytes, aad, Some(nonce_bytes), buf.as_mut_bytes())?;

Ok(expected_len)
}
}

Expand Down
59 changes: 59 additions & 0 deletions tests/hazmat/primitives/test_aead.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,14 @@ def test_invalid_nonce_length(self, backend):
with pytest.raises(ValueError):
aesccm.encrypt(nonce[:6], pt, None)

with pytest.raises(ValueError):
buf = bytearray(16)
aesccm.decrypt_into(nonce, b"x" * 20, None, buf)

with pytest.raises(ValueError):
buf = bytearray(16)
aesccm.decrypt_into(nonce[:6], b"x" * 20, None, buf)

def test_vectors(self, subtests, backend):
vectors = _load_all_params(
os.path.join("ciphers", "AES", "CCM"),
Expand Down Expand Up @@ -423,6 +431,10 @@ def test_decrypt_data_too_short(self, backend):
with pytest.raises(InvalidTag):
aesccm.decrypt(b"0" * 12, b"0", None)

with pytest.raises(InvalidTag):
buf = bytearray(16)
aesccm.decrypt_into(b"0" * 12, b"0", None, buf)

def test_buffer_protocol(self, backend):
key = AESCCM.generate_key(128)
aesccm = AESCCM(key)
Expand Down Expand Up @@ -472,6 +484,53 @@ def test_encrypt_into_buffer_incorrect_size(self, ptlen, buflen, backend):
with pytest.raises(ValueError, match="buffer must be"):
aesccm.encrypt_into(nonce, pt, None, buf)

def test_decrypt_into(self, backend):
key = AESCCM.generate_key(128)
aesccm = AESCCM(key)
nonce = os.urandom(12)
pt = b"decrypt me"
ad = b"additional"
ct = aesccm.encrypt(nonce, pt, ad)
buf = bytearray(len(pt))
n = aesccm.decrypt_into(nonce, ct, ad, buf)
assert n == len(pt)
assert buf == pt

@pytest.mark.parametrize(
("ctlen", "buflen"), [(26, 9), (26, 11), (31, 14), (36, 21)]
)
def test_decrypt_into_buffer_incorrect_size(self, ctlen, buflen, backend):
key = AESCCM.generate_key(128)
aesccm = AESCCM(key)
nonce = os.urandom(12)
ct = b"x" * ctlen
buf = bytearray(buflen)
with pytest.raises(ValueError, match="buffer must be"):
aesccm.decrypt_into(nonce, ct, None, buf)

def test_decrypt_into_invalid_tag(self, backend):
key = AESCCM.generate_key(128)
aesccm = AESCCM(key)
nonce = os.urandom(12)
pt = b"some data"
ad = b"additional"
ct = aesccm.encrypt(nonce, pt, ad)
# Corrupt the ciphertext
corrupted_ct = bytearray(ct)
corrupted_ct[0] ^= 1
buf = bytearray(len(pt))
with pytest.raises(InvalidTag):
aesccm.decrypt_into(nonce, bytes(corrupted_ct), ad, buf)

def test_decrypt_into_nonce_too_long(self, backend):
key = AESCCM.generate_key(128)
aesccm = AESCCM(key)
pt = b"encrypt me" * 6600
nonce = os.urandom(13)
buf = bytearray(len(pt))
with pytest.raises(ValueError, match="Data too long for nonce"):
aesccm.decrypt_into(nonce, pt, None, buf)


def _load_gcm_vectors():
vectors = _load_all_params(
Expand Down
Loading