Skip to content

Commit 876ad73

Browse files
committed
Add publish Start message for snapshot parsing/publishing for synchronization with consumer modules
1 parent 48b2457 commit 876ad73

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

common/src/messages.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ pub enum CardanoMessage {
311311

312312
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
313313
pub enum SnapshotMessage {
314+
Startup(), // subscirbers should listen for incremental snapshot data
314315
Bootstrap(SnapshotStateMessage),
315316
DumpRequest(SnapshotDumpMessage),
316317
Dump(SnapshotStateMessage),

modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,16 @@ impl SnapshotHandler {
9595
})
9696
}
9797

98+
async fn publish_start(
99+
&self,
100+
) -> Result<()> {
101+
self.context
102+
.message_bus
103+
.publish(&self.snapshot_topic, Arc::new(Message::Snapshot(acropolis_common::messages::SnapshotMessage::Startup())))
104+
.await
105+
.map_err(|e| anyhow::anyhow!("Failed to publish completion: {}", e))
106+
}
107+
98108
async fn publish_completion(
99109
&self,
100110
block_info: BlockInfo,
@@ -151,6 +161,7 @@ impl DRepCallback for SnapshotHandler {
151161
info!("Received {} DReps", dreps.len());
152162
self.dreps.extend(dreps);
153163
// TODO: Publish DRep data.
164+
154165
Ok(())
155166
}
156167
}
@@ -252,14 +263,16 @@ impl SnapshotBootstrapper {
252263
let parser = StreamingSnapshotParser::new(file_path);
253264
let mut callbacks = SnapshotHandler::new(context.clone(), completion_topic.to_string());
254265

255-
info!("Starting snapshot parsing from: {}", file_path);
266+
info!("Starting snapshot parsing and publishing from: {}", file_path);
256267
let start = Instant::now();
257268

269+
callbacks.publish_start().await?;
270+
258271
// Parse the snapshot with our callback handler
259272
parser.parse(&mut callbacks)?;
260273

261274
let duration = start.elapsed();
262-
info!("✓ Parse completed successfully in {:.2?}", duration);
275+
info!("✓ Parse and publish completed successfully in {:.2?}", duration);
263276

264277
// Build the final state from accumulated data
265278
let block_info = callbacks.build_block_info()?;

0 commit comments

Comments
 (0)