Skip to content

Commit 7ac48f0

Browse files
authored
ref(attachments): Prepare Kafka Messages for stored Attachments (#5083)
This is the first step towards storing attachments directly from Relay into objectstore. No such storing is happening yet, but the kafka message types are prepared to allow for transmitting the `stored_id` in the future. --- Ref FS-102
1 parent 5a50f74 commit 7ac48f0

File tree

3 files changed

+92
-75
lines changed

3 files changed

+92
-75
lines changed

relay-server/src/services/store.rs

Lines changed: 65 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -686,17 +686,19 @@ impl StoreService {
686686
) -> Result<Option<ChunkedAttachment>, StoreError> {
687687
let id = Uuid::new_v4().to_string();
688688

689-
let mut chunk_index = 0;
690689
let payload = item.payload();
691690
let size = item.len();
692691
let max_chunk_size = self.config.attachment_chunk_size();
693692

694693
// When sending individual attachments, and we have a single chunk, we want to send the
695694
// `data` inline in the `attachment` message.
696695
// This avoids a needless roundtrip through the attachments cache on the Sentry side.
697-
let data = if send_individual_attachments && size < max_chunk_size {
698-
(size > 0).then_some(payload)
696+
let payload = if size == 0 {
697+
AttachmentPayload::Chunked(0)
698+
} else if send_individual_attachments && size < max_chunk_size {
699+
AttachmentPayload::Inline(payload)
699700
} else {
701+
let mut chunk_index = 0;
700702
let mut offset = 0;
701703
// This skips chunks for empty attachments. The consumer does not require chunks for
702704
// empty attachments. `chunks` will be `0` in this case.
@@ -717,26 +719,25 @@ impl StoreService {
717719
offset += chunk_size;
718720
chunk_index += 1;
719721
}
720-
None
721-
};
722722

723-
// The chunk_index is incremented after every loop iteration. After we exit the loop, it
724-
// is one larger than the last chunk, so it is equal to the number of chunks.
723+
// The chunk_index is incremented after every loop iteration. After we exit the loop, it
724+
// is one larger than the last chunk, so it is equal to the number of chunks.
725+
AttachmentPayload::Chunked(chunk_index)
726+
};
725727

726728
let attachment = ChunkedAttachment {
727729
id,
728730
name: match item.filename() {
729731
Some(name) => name.to_owned(),
730732
None => UNNAMED_ATTACHMENT.to_owned(),
731733
},
734+
rate_limited: item.rate_limited(),
732735
content_type: item
733736
.content_type()
734737
.map(|content_type| content_type.as_str().to_owned()),
735738
attachment_type: item.attachment_type().cloned().unwrap_or_default(),
736-
chunks: chunk_index,
737-
data,
738-
size: Some(size),
739-
rate_limited: Some(item.rate_limited()),
739+
size,
740+
payload,
740741
};
741742

742743
if send_individual_attachments {
@@ -1341,6 +1342,26 @@ impl Service for StoreService {
13411342
}
13421343
}
13431344

1345+
/// This signifies how the attachment payload is being transferred.
1346+
#[derive(Debug, Serialize)]
1347+
enum AttachmentPayload {
1348+
/// The payload has been split into multiple chunks.
1349+
///
1350+
/// The individual chunks are being sent as separate [`AttachmentChunkKafkaMessage`] messages.
1351+
/// If the payload `size == 0`, the number of chunks will also be `0`.
1352+
#[serde(rename = "chunks")]
1353+
Chunked(usize),
1354+
1355+
/// The payload is inlined here directly, and thus into the [`ChunkedAttachment`].
1356+
#[serde(rename = "data")]
1357+
Inline(Bytes),
1358+
1359+
/// The attachment has already been stored into the objectstore, with the given Id.
1360+
#[serde(rename = "stored_id")]
1361+
#[allow(unused)] // TODO: actually storing it in objectstore first is still WIP
1362+
Stored(String),
1363+
}
1364+
13441365
/// Common attributes for both standalone attachments and processing-relevant attachments.
13451366
#[derive(Debug, Serialize)]
13461367
struct ChunkedAttachment {
@@ -1352,6 +1373,14 @@ struct ChunkedAttachment {
13521373
/// File name of the attachment file.
13531374
name: String,
13541375

1376+
/// Whether this attachment was rate limited and should be removed after processing.
1377+
///
1378+
/// By default, rate limited attachments are immediately removed from Envelopes. For processing,
1379+
/// native crash reports still need to be retained. These attachments are marked with the
1380+
/// `rate_limited` header, which signals to the processing pipeline that the attachment should
1381+
/// not be persisted after processing.
1382+
rate_limited: bool,
1383+
13551384
/// Content type of the attachment payload.
13561385
#[serde(skip_serializing_if = "Option::is_none")]
13571386
content_type: Option<String>,
@@ -1360,27 +1389,12 @@ struct ChunkedAttachment {
13601389
#[serde(serialize_with = "serialize_attachment_type")]
13611390
attachment_type: AttachmentType,
13621391

1363-
/// Number of outlined chunks.
1364-
/// Zero if the attachment has `size: 0`, or there was only a single chunk which has been inlined into `data`.
1365-
chunks: usize,
1366-
1367-
/// The content of the attachment,
1368-
/// if they are smaller than the configured `attachment_chunk_size`.
1369-
#[serde(skip_serializing_if = "Option::is_none")]
1370-
data: Option<Bytes>,
1371-
13721392
/// The size of the attachment in bytes.
1373-
#[serde(skip_serializing_if = "Option::is_none")]
1374-
size: Option<usize>,
1393+
size: usize,
13751394

1376-
/// Whether this attachment was rate limited and should be removed after processing.
1377-
///
1378-
/// By default, rate limited attachments are immediately removed from Envelopes. For processing,
1379-
/// native crash reports still need to be retained. These attachments are marked with the
1380-
/// `rate_limited` header, which signals to the processing pipeline that the attachment should
1381-
/// not be persisted after processing.
1382-
#[serde(skip_serializing_if = "Option::is_none")]
1383-
rate_limited: Option<bool>,
1395+
/// The attachment payload, chunked, inlined, or already stored.
1396+
#[serde(flatten)]
1397+
payload: AttachmentPayload,
13841398
}
13851399

13861400
/// A hack to make rmp-serde behave more like serde-json when serializing enums.
@@ -1823,18 +1837,13 @@ struct ProfileChunkKafkaMessage {
18231837
#[allow(clippy::large_enum_variant)]
18241838
enum KafkaMessage<'a> {
18251839
Event(EventKafkaMessage),
1826-
Attachment(AttachmentKafkaMessage),
1827-
AttachmentChunk(AttachmentChunkKafkaMessage),
18281840
UserReport(UserReportKafkaMessage),
18291841
Metric {
18301842
#[serde(skip)]
18311843
headers: BTreeMap<String, String>,
18321844
#[serde(flatten)]
18331845
message: MetricKafkaMessage<'a>,
18341846
},
1835-
Profile(ProfileKafkaMessage),
1836-
ReplayEvent(ReplayEventKafkaMessage<'a>),
1837-
ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
18381847
CheckIn(CheckInKafkaMessage),
18391848
Item {
18401849
#[serde(skip)]
@@ -1850,15 +1859,21 @@ enum KafkaMessage<'a> {
18501859
#[serde(flatten)]
18511860
message: SpanKafkaMessage<'a>,
18521861
},
1862+
1863+
Attachment(AttachmentKafkaMessage),
1864+
AttachmentChunk(AttachmentChunkKafkaMessage),
1865+
1866+
Profile(ProfileKafkaMessage),
18531867
ProfileChunk(ProfileChunkKafkaMessage),
1868+
1869+
ReplayEvent(ReplayEventKafkaMessage<'a>),
1870+
ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
18541871
}
18551872

18561873
impl Message for KafkaMessage<'_> {
18571874
fn variant(&self) -> &'static str {
18581875
match self {
18591876
KafkaMessage::Event(_) => "event",
1860-
KafkaMessage::Attachment(_) => "attachment",
1861-
KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
18621877
KafkaMessage::UserReport(_) => "user_report",
18631878
KafkaMessage::Metric { message, .. } => match message.name.namespace() {
18641879
MetricNamespace::Sessions => "metric_sessions",
@@ -1868,24 +1883,26 @@ impl Message for KafkaMessage<'_> {
18681883
MetricNamespace::Stats => "metric_metric_stats",
18691884
MetricNamespace::Unsupported => "metric_unsupported",
18701885
},
1871-
KafkaMessage::Profile(_) => "profile",
1872-
KafkaMessage::ReplayEvent(_) => "replay_event",
1873-
KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
18741886
KafkaMessage::CheckIn(_) => "check_in",
18751887
KafkaMessage::Span { .. } => "span",
1876-
KafkaMessage::ProfileChunk(_) => "profile_chunk",
18771888
KafkaMessage::Item { item_type, .. } => item_type.as_str_name(),
1889+
1890+
KafkaMessage::Attachment(_) => "attachment",
1891+
KafkaMessage::AttachmentChunk(_) => "attachment_chunk",
1892+
1893+
KafkaMessage::Profile(_) => "profile",
1894+
KafkaMessage::ProfileChunk(_) => "profile_chunk",
1895+
1896+
KafkaMessage::ReplayEvent(_) => "replay_event",
1897+
KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
18781898
}
18791899
}
18801900

18811901
/// Returns the key that determines the partitioning of this Kafka message.
18821902
fn key(&self) -> Option<relay_kafka::Key> {
18831903
match self {
18841904
Self::Event(message) => Some(message.event_id.0),
1885-
Self::Attachment(message) => Some(message.event_id.0),
1886-
Self::AttachmentChunk(message) => Some(message.event_id.0),
18871905
Self::UserReport(message) => Some(message.event_id.0),
1888-
Self::ReplayEvent(message) => Some(message.replay_id.0),
18891906
Self::Span { message, .. } => Some(message.trace_id.0),
18901907

18911908
// Monitor check-ins use the hinted UUID passed through from the Envelope.
@@ -1894,6 +1911,10 @@ impl Message for KafkaMessage<'_> {
18941911
// receive the routing_key_hint from their envelopes.
18951912
Self::CheckIn(message) => message.routing_key_hint,
18961913

1914+
Self::Attachment(message) => Some(message.event_id.0),
1915+
Self::AttachmentChunk(message) => Some(message.event_id.0),
1916+
Self::ReplayEvent(message) => Some(message.replay_id.0),
1917+
18971918
// Random partitioning
18981919
_ => None,
18991920
}

tests/integration/test_attachments.py

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ def test_mixed_attachments_with_processing(
6464
assert attachment == {
6565
"type": "attachment",
6666
"attachment": {
67-
"attachment_type": "event.attachment",
68-
"chunks": attachment_num_chunks[id1],
6967
"id": id1,
7068
"name": "foo.txt",
71-
"size": len(chunked_contents),
7269
"rate_limited": False,
70+
"attachment_type": "event.attachment",
71+
"size": len(chunked_contents),
72+
"chunks": attachment_num_chunks[id1],
7373
},
7474
"event_id": event_id,
7575
"project_id": project_id,
@@ -84,12 +84,11 @@ def test_mixed_attachments_with_processing(
8484
assert attachment == {
8585
"type": "attachment",
8686
"attachment": {
87-
"attachment_type": "event.attachment",
88-
"chunks": 0,
89-
"data": b"hell yeah",
9087
"name": "bar.txt",
91-
"size": len(b"hell yeah"),
9288
"rate_limited": False,
89+
"attachment_type": "event.attachment",
90+
"size": len(b"hell yeah"),
91+
"data": b"hell yeah",
9392
},
9493
"event_id": event_id,
9594
"project_id": project_id,
@@ -106,11 +105,11 @@ def test_mixed_attachments_with_processing(
106105
assert attachment == {
107106
"type": "attachment",
108107
"attachment": {
109-
"attachment_type": "event.attachment",
110-
"chunks": 0,
111108
"name": "foobar.txt",
112-
"size": 0,
113109
"rate_limited": False,
110+
"attachment_type": "event.attachment",
111+
"size": 0,
112+
"chunks": 0,
114113
},
115114
"event_id": event_id,
116115
"project_id": project_id,
@@ -473,13 +472,12 @@ def test_view_hierarchy_processing(
473472
assert attachment == {
474473
"type": "attachment",
475474
"attachment": {
476-
"attachment_type": "event.view_hierarchy",
477-
"chunks": 0,
478-
"data": expected_payload,
479-
"content_type": "application/json",
480475
"name": "Unnamed Attachment",
481-
"size": len(expected_payload),
482476
"rate_limited": False,
477+
"content_type": "application/json",
478+
"attachment_type": "event.view_hierarchy",
479+
"size": len(expected_payload),
480+
"data": expected_payload,
483481
},
484482
"event_id": event_id,
485483
"project_id": project_id,
@@ -524,12 +522,12 @@ def test_event_with_attachment(
524522
assert event_message["attachments"][0].pop("id")
525523
assert list(event_message["attachments"]) == [
526524
{
527-
"attachment_type": "event.attachment",
528-
"chunks": 1,
529-
"content_type": "application/octet-stream",
530525
"name": "Unnamed Attachment",
531-
"size": len(b"event attachment"),
532526
"rate_limited": False,
527+
"content_type": "application/octet-stream",
528+
"attachment_type": "event.attachment",
529+
"size": len(b"event attachment"),
530+
"chunks": 1,
533531
}
534532
]
535533

@@ -547,13 +545,12 @@ def test_event_with_attachment(
547545
relay.send_envelope(project_id, envelope)
548546

549547
expected_attachment = {
550-
"attachment_type": "event.attachment",
551-
"chunks": 0,
552-
"content_type": "application/octet-stream",
553548
"name": "Unnamed Attachment",
549+
"rate_limited": False,
550+
"content_type": "application/octet-stream",
551+
"attachment_type": "event.attachment",
554552
"size": len(b"transaction attachment"),
555553
"data": b"transaction attachment",
556-
"rate_limited": False,
557554
}
558555

559556
attachment = attachments_consumer.get_individual_attachment()
@@ -596,12 +593,11 @@ def test_form_data_is_rejected(
596593
assert attachment == {
597594
"type": "attachment",
598595
"attachment": {
599-
"attachment_type": "event.attachment",
600-
"chunks": 0,
601-
"data": b"file content",
602596
"name": "foo.txt",
603-
"size": len(b"file content"),
604597
"rate_limited": False,
598+
"attachment_type": "event.attachment",
599+
"size": len(b"file content"),
600+
"data": b"file content",
605601
},
606602
"event_id": event_id,
607603
"project_id": project_id,

tests/integration/test_minidump.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -455,11 +455,11 @@ def test_minidump_with_processing(
455455
{
456456
"id": attachment_id,
457457
"name": "minidump.dmp",
458+
"rate_limited": rate_limit == "attachment",
458459
"attachment_type": "event.minidump",
459460
"content_type": "application/x-dmp",
460-
"chunks": num_chunks,
461461
"size": len(content),
462-
"rate_limited": rate_limit == "attachment",
462+
"chunks": num_chunks,
463463
}
464464
]
465465

@@ -501,11 +501,11 @@ def test_minidump_with_processing_invalid(
501501
{
502502
"id": attachment_id,
503503
"name": "minidump.dmp",
504+
"rate_limited": False,
504505
"content_type": "application/x-dmp",
505506
"attachment_type": "event.minidump",
506-
"chunks": num_chunks,
507507
"size": len(content),
508-
"rate_limited": False,
508+
"chunks": num_chunks,
509509
}
510510
]
511511

0 commit comments

Comments
 (0)