Skip to content

Commit 3f7f6eb

Browse files
modev2301pront
andauthored
feat(codecs): add varint length delimited framing for protobuf (#23352)
* feat(codecs): add varint length delimited framing for protobuf This commit adds support for varint length-delimited framing for protobuf sources and sinks in Vector. This addresses the use case where tools like ClickHouse expect protobuf messages with varint length prefixes instead of the standard 32-bit length prefixes. - Add VarintLengthDelimitedEncoder for encoding varint length prefixes - Add VarintLengthDelimited option to FramingConfig enums - Update default protobuf framing to use varint instead of 32-bit length - Add comprehensive tests for varint framing (7 tests, all passing) - Update validation resources to handle new framing option - Better compatibility with tools like ClickHouse - Eliminates risk of protobuf messages being cut or skipped - Properly handles zero-length messages - Backward compatible with existing configurations ```yaml sources: protobuf_source: type: socket decoding: codec: protobuf protobuf: desc_file: "path/to/protobuf.desc" message_type: "package.MessageType" framing: method: varint_length_delimited sinks: protobuf_sink: type: socket encoding: codec: protobuf protobuf: desc_file: "path/to/protobuf.desc" message_type: "package.MessageType" framing: method: varint_length_delimited ``` - All varint framing tests pass (7/7) - Vector compiles successfully - Configuration validation works - Default behavior updated correctly Closes: [Issue number] * Delete COMMIT_MESSAGE.md * Delete PULL_REQUEST_SUMMARY.md * Address Vector team feedback for varint framing - Change usize to u64 for varint values and u32 to u8 for shift - Fix test assertions to use Bytes::from() instead of string literals - Move documentation example to config/examples/varint_framing_protobuf.yaml - Add custom error types with Snafu for better error handling - Implement proper FramingError trait for custom error types - Fix validation resources to handle varint length delimited framing - All tests passing (9 varint tests + 7 integration tests) * Add TrailingData error variant and improve can_continue logic * docs: add changelog fragment for protobuf varint framing default * Address Vector team feedback: revert protobuf default to length_delimited framing - Revert default from VarintLengthDelimitedEncoder to LengthDelimitedEncoder - Keep varint framing as opt-in feature to avoid breaking changes - Update documentation to reflect opt-in approach - Fix formatting issues (trailing spaces, newlines) - Rename changelog file to reflect feature addition rather than default change * Fix formatting and clippy issues for protobuf varint framing * fix example * generate components docs * fix example config --------- Co-authored-by: Pavlos Rontidis <[email protected]>
1 parent bf948f7 commit 3f7f6eb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+942
-5
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Added support for varint length delimited framing for protobuf, which is compatible with standard protobuf streaming implementations and tools like ClickHouse.
2+
3+
Users can now opt-in to varint framing by explicitly specifying `framing.method: varint_length_delimited` in their configuration. The default remains length-delimited framing for backward compatibility.
4+
5+
authors: modev2301
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Example configuration demonstrating varint framing for protobuf
2+
# This is compatible with tools like ClickHouse that use protobuf with varint length prefixes
3+
4+
sources:
5+
protobuf_source:
6+
type: socket
7+
mode: tcp
8+
address: "0.0.0.0:8080"
9+
decoding:
10+
codec: protobuf
11+
protobuf:
12+
desc_file: "path/to/your/protobuf.desc"
13+
message_type: "your.package.MessageType"
14+
framing:
15+
method: varint_length_delimited
16+
17+
socket_source:
18+
type: socket
19+
mode: tcp
20+
address: "0.0.0.0:8081"
21+
decoding:
22+
codec: protobuf
23+
protobuf:
24+
desc_file: "input.desc"
25+
message_type: "input.Message"
26+
framing:
27+
method: varint_length_delimited
28+
29+
sinks:
30+
protobuf_sink:
31+
inputs:
32+
- protobuf_source
33+
type: socket
34+
mode: tcp
35+
address: "localhost:9090"
36+
encoding:
37+
codec: protobuf
38+
protobuf:
39+
desc_file: "path/to/your/protobuf.desc"
40+
message_type: "your.package.MessageType"
41+
framing:
42+
method: varint_length_delimited
43+
44+
socket_output:
45+
inputs:
46+
- socket_source
47+
type: socket
48+
mode: tcp
49+
address: "localhost:9090"
50+
encoding:
51+
codec: protobuf
52+
protobuf:
53+
desc_file: "output.desc"
54+
message_type: "output.Message"
55+
framing:
56+
method: varint_length_delimited

lib/codecs/src/decoding/framing/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod chunked_gelf;
99
mod length_delimited;
1010
mod newline_delimited;
1111
mod octet_counting;
12+
mod varint_length_delimited;
1213

1314
use std::{any::Any, fmt::Debug};
1415

@@ -26,6 +27,9 @@ pub use octet_counting::{
2627
OctetCountingDecoder, OctetCountingDecoderConfig, OctetCountingDecoderOptions,
2728
};
2829
use tokio_util::codec::LinesCodecError;
30+
pub use varint_length_delimited::{
31+
VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
32+
};
2933

3034
pub use self::bytes::{BytesDecoder, BytesDecoderConfig};
3135
use super::StreamDecodingError;
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
use bytes::{Buf, Bytes, BytesMut};
2+
use derivative::Derivative;
3+
use snafu::Snafu;
4+
use tokio_util::codec::Decoder;
5+
use vector_config::configurable_component;
6+
7+
use super::{BoxedFramingError, FramingError, StreamDecodingError};
8+
9+
/// Errors that can occur during varint length delimited framing.
10+
#[derive(Debug, Snafu)]
11+
pub enum VarintFramingError {
12+
#[snafu(display("Varint too large"))]
13+
VarintOverflow,
14+
15+
#[snafu(display("Frame too large: {length} bytes (max: {max})"))]
16+
FrameTooLarge { length: usize, max: usize },
17+
18+
#[snafu(display("Trailing data at EOF"))]
19+
TrailingData,
20+
}
21+
22+
impl StreamDecodingError for VarintFramingError {
23+
fn can_continue(&self) -> bool {
24+
match self {
25+
// Varint overflow and frame too large are not recoverable
26+
Self::VarintOverflow | Self::FrameTooLarge { .. } => false,
27+
// Trailing data at EOF is not recoverable
28+
Self::TrailingData => false,
29+
}
30+
}
31+
}
32+
33+
impl FramingError for VarintFramingError {
34+
fn as_any(&self) -> &dyn std::any::Any {
35+
self as &dyn std::any::Any
36+
}
37+
}
38+
39+
/// Config used to build a `VarintLengthDelimitedDecoder`.
40+
#[configurable_component]
41+
#[derive(Debug, Clone, Derivative)]
42+
#[derivative(Default)]
43+
pub struct VarintLengthDelimitedDecoderConfig {
44+
/// Maximum frame length
45+
#[serde(default = "default_max_frame_length")]
46+
pub max_frame_length: usize,
47+
}
48+
49+
const fn default_max_frame_length() -> usize {
50+
8 * 1_024 * 1_024
51+
}
52+
53+
impl VarintLengthDelimitedDecoderConfig {
54+
/// Build the `VarintLengthDelimitedDecoder` from this configuration.
55+
pub fn build(&self) -> VarintLengthDelimitedDecoder {
56+
VarintLengthDelimitedDecoder::new(self.max_frame_length)
57+
}
58+
}
59+
60+
/// A codec for handling bytes sequences whose length is encoded as a varint prefix.
61+
/// This is compatible with protobuf's length-delimited encoding.
62+
#[derive(Debug, Clone)]
63+
pub struct VarintLengthDelimitedDecoder {
64+
max_frame_length: usize,
65+
}
66+
67+
impl VarintLengthDelimitedDecoder {
68+
/// Creates a new `VarintLengthDelimitedDecoder`.
69+
pub fn new(max_frame_length: usize) -> Self {
70+
Self { max_frame_length }
71+
}
72+
73+
/// Decode a varint from the buffer
74+
fn decode_varint(&self, buf: &mut BytesMut) -> Result<Option<u64>, BoxedFramingError> {
75+
if buf.is_empty() {
76+
return Ok(None);
77+
}
78+
79+
let mut value: u64 = 0;
80+
let mut shift: u8 = 0;
81+
let mut bytes_read = 0;
82+
83+
for byte in buf.iter() {
84+
bytes_read += 1;
85+
let byte_value = (*byte & 0x7F) as u64;
86+
value |= byte_value << shift;
87+
88+
if *byte & 0x80 == 0 {
89+
// Last byte of varint
90+
buf.advance(bytes_read);
91+
return Ok(Some(value));
92+
}
93+
94+
shift += 7;
95+
if shift >= 64 {
96+
return Err(VarintFramingError::VarintOverflow.into());
97+
}
98+
}
99+
100+
// Incomplete varint
101+
Ok(None)
102+
}
103+
}
104+
105+
impl Default for VarintLengthDelimitedDecoder {
106+
fn default() -> Self {
107+
Self::new(default_max_frame_length())
108+
}
109+
}
110+
111+
impl Decoder for VarintLengthDelimitedDecoder {
112+
type Item = Bytes;
113+
type Error = BoxedFramingError;
114+
115+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
116+
// First, try to decode the varint length
117+
let length = match self.decode_varint(src)? {
118+
Some(len) => len as usize,
119+
None => return Ok(None), // Incomplete varint
120+
};
121+
122+
// Check if the length is reasonable
123+
if length > self.max_frame_length {
124+
return Err(VarintFramingError::FrameTooLarge {
125+
length,
126+
max: self.max_frame_length,
127+
}
128+
.into());
129+
}
130+
131+
// Check if we have enough data for the complete frame
132+
if src.len() < length {
133+
return Ok(None); // Incomplete frame
134+
}
135+
136+
// Extract the frame
137+
let frame = src.split_to(length).freeze();
138+
Ok(Some(frame))
139+
}
140+
141+
fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
142+
if src.is_empty() {
143+
Ok(None)
144+
} else {
145+
// Try to decode what we have, even if incomplete
146+
match self.decode(src)? {
147+
Some(frame) => Ok(Some(frame)),
148+
None => {
149+
// If we have data but couldn't decode it, it's trailing data
150+
if !src.is_empty() {
151+
Err(VarintFramingError::TrailingData.into())
152+
} else {
153+
Ok(None)
154+
}
155+
}
156+
}
157+
}
158+
}
159+
}
160+
161+
#[cfg(test)]
162+
mod tests {
163+
use super::*;
164+
165+
#[test]
166+
fn decode_single_byte_varint() {
167+
let mut input = BytesMut::from(&[0x03, b'f', b'o', b'o'][..]);
168+
let mut decoder = VarintLengthDelimitedDecoder::default();
169+
170+
assert_eq!(
171+
decoder.decode(&mut input).unwrap().unwrap(),
172+
Bytes::from("foo")
173+
);
174+
assert_eq!(decoder.decode(&mut input).unwrap(), None);
175+
}
176+
177+
#[test]
178+
fn decode_multi_byte_varint() {
179+
// 300 in varint encoding: 0xAC 0x02
180+
let mut input = BytesMut::from(&[0xAC, 0x02][..]);
181+
// Add 300 bytes of data
182+
input.extend_from_slice(&vec![b'x'; 300]);
183+
let mut decoder = VarintLengthDelimitedDecoder::default();
184+
185+
let result = decoder.decode(&mut input).unwrap().unwrap();
186+
assert_eq!(result.len(), 300);
187+
assert_eq!(decoder.decode(&mut input).unwrap(), None);
188+
}
189+
190+
#[test]
191+
fn decode_incomplete_varint() {
192+
let mut input = BytesMut::from(&[0x80][..]); // Incomplete varint
193+
let mut decoder = VarintLengthDelimitedDecoder::default();
194+
195+
assert_eq!(decoder.decode(&mut input).unwrap(), None);
196+
}
197+
198+
#[test]
199+
fn decode_incomplete_frame() {
200+
let mut input = BytesMut::from(&[0x05, b'f', b'o'][..]); // Length 5, but only 2 bytes
201+
let mut decoder = VarintLengthDelimitedDecoder::default();
202+
203+
assert_eq!(decoder.decode(&mut input).unwrap(), None);
204+
}
205+
206+
#[test]
207+
fn decode_frame_too_large() {
208+
let mut input =
209+
BytesMut::from(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01][..]);
210+
let mut decoder = VarintLengthDelimitedDecoder::new(1000);
211+
212+
assert!(decoder.decode(&mut input).is_err());
213+
}
214+
215+
#[test]
216+
fn decode_trailing_data_at_eof() {
217+
let mut input = BytesMut::from(&[0x03, b'f', b'o', b'o', b'e', b'x', b't', b'r', b'a'][..]);
218+
let mut decoder = VarintLengthDelimitedDecoder::default();
219+
220+
// First decode should succeed
221+
assert_eq!(
222+
decoder.decode(&mut input).unwrap().unwrap(),
223+
Bytes::from("foo")
224+
);
225+
226+
// Second decode should fail with trailing data
227+
assert!(decoder.decode_eof(&mut input).is_err());
228+
}
229+
}

lib/codecs/src/decoding/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub use framing::{
2424
ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
2525
LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
2626
NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
27-
OctetCountingDecoderOptions,
27+
OctetCountingDecoderOptions, VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
2828
};
2929
use smallvec::SmallVec;
3030
use std::fmt::Debug;
@@ -105,6 +105,10 @@ pub enum FramingConfig {
105105
///
106106
/// [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html
107107
ChunkedGelf(ChunkedGelfDecoderConfig),
108+
109+
/// Byte frames which are prefixed by a varint indicating the length.
110+
/// This is compatible with protobuf's length-delimited encoding.
111+
VarintLengthDelimited(VarintLengthDelimitedDecoderConfig),
108112
}
109113

110114
impl From<BytesDecoderConfig> for FramingConfig {
@@ -143,6 +147,12 @@ impl From<ChunkedGelfDecoderConfig> for FramingConfig {
143147
}
144148
}
145149

150+
impl From<VarintLengthDelimitedDecoderConfig> for FramingConfig {
151+
fn from(config: VarintLengthDelimitedDecoderConfig) -> Self {
152+
Self::VarintLengthDelimited(config)
153+
}
154+
}
155+
146156
impl FramingConfig {
147157
/// Build the `Framer` from this configuration.
148158
pub fn build(&self) -> Framer {
@@ -153,6 +163,9 @@ impl FramingConfig {
153163
FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
154164
FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
155165
FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
166+
FramingConfig::VarintLengthDelimited(config) => {
167+
Framer::VarintLengthDelimited(config.build())
168+
}
156169
}
157170
}
158171
}
@@ -174,6 +187,8 @@ pub enum Framer {
174187
Boxed(BoxedFramer),
175188
/// Uses a `ChunkedGelfDecoder` for framing.
176189
ChunkedGelf(ChunkedGelfDecoder),
190+
/// Uses a `VarintLengthDelimitedDecoder` for framing.
191+
VarintLengthDelimited(VarintLengthDelimitedDecoder),
177192
}
178193

179194
impl tokio_util::codec::Decoder for Framer {
@@ -189,6 +204,7 @@ impl tokio_util::codec::Decoder for Framer {
189204
Framer::OctetCounting(framer) => framer.decode(src),
190205
Framer::Boxed(framer) => framer.decode(src),
191206
Framer::ChunkedGelf(framer) => framer.decode(src),
207+
Framer::VarintLengthDelimited(framer) => framer.decode(src),
192208
}
193209
}
194210

@@ -201,6 +217,7 @@ impl tokio_util::codec::Decoder for Framer {
201217
Framer::OctetCounting(framer) => framer.decode_eof(src),
202218
Framer::Boxed(framer) => framer.decode_eof(src),
203219
Framer::ChunkedGelf(framer) => framer.decode_eof(src),
220+
Framer::VarintLengthDelimited(framer) => framer.decode_eof(src),
204221
}
205222
}
206223
}

0 commit comments

Comments
 (0)