Conversation

@raulk raulk commented Oct 25, 2025

Implements a new Announce() API that allows advertising messages via IHAVE without pushing them to the mesh. These messages are retained in the mcache until at least the specified deadline.

We send the IHAVE immediately to all connected topic subscribers, whether in mesh, in gossip, in a cached fanout, or none of these active pubsub states.

This enables pull-based message distribution, useful for scenarios outside of the app's critical path, such as backup availability. Subscribers can pull messages on-demand via IWANT requests (see the usage sketch after the details below).

Details

  • Add Topic.Announce() method that sends IHAVE gossip to topic subscribers with expiry-based message retention
  • Refactor MessageCache to support dual storage model:
    • Sliding window for regular published messages (current)
    • Time wheel for announced messages with TTL-based expiry
  • Add GossipSubAnnouncementMaxTTL parameter (default 60s) for sizing announcement storage
  • Rename MessageCache methods for clarity (Put→AppendWindow, Shift→ShiftWindow, GetGossipIDs→GossipForTopic). Add missing godocs.
  • Implement unified message storage with reference counting to handle messages in both window and announcement wheel
  • Add heartbeat cleanup for expired announcements via PruneAnns()
  • Add comprehensive test coverage for announcement functionality including storage, delivery, expiry, duplicates, and edge cases
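
For illustration, here is a minimal usage sketch. The exact Announce signature is an assumption based on the description above, not the final API.

```go
import (
	"context"
	"time"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// announceBackup is a sketch only: the Announce signature is assumed and may
// not match the final API.
func announceBackup(ctx context.Context, ps *pubsub.PubSub, payload []byte) error {
	t, err := ps.Join("my-topic")
	if err != nil {
		return err
	}

	// Retain the message for pull-based retrieval until at least this deadline.
	// Deadlines beyond GossipSubAnnouncementMaxTTL (default 60s) would be clamped.
	deadline := time.Now().Add(30 * time.Second)

	// Announce advertises the message via IHAVE to connected topic subscribers
	// without pushing it through the mesh; interested peers fetch it via IWANT.
	return t.Announce(ctx, payload, deadline)
}
```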

Open points

  • Figure out the threat model. Any scoring implications? Spamming possible?
  • Decide whether we need Announce handlers, so that the receiver can decide whether to react by pulling the message (e.g. in Ethereum, a node knows whether it has already received the correct column for a particular slot, and can ignore further announcements during that slot window)

Comment on lines +172 to +174
// AnnouncementMaxTTL is the maximum possible time-to-live for a message announced
// via Announce. This is used to size internal data structures. Deadlines passed to
// Announce exceeding this value will be clamped, and a warning will be logged.
Member Author

note: I haven't implemented the clamping yet, just realised.
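
For reference, the clamping could be as small as the sketch below (helper name and integration point are made up, not part of this PR):

```go
// clampAnnounceExpiry caps a requested expiry at AnnouncementMaxTTL from now
// and reports whether clamping occurred, so the caller can log the warning
// promised in the godoc. Sketch only.
func clampAnnounceExpiry(expiry time.Time, maxTTL time.Duration) (time.Time, bool) {
	maxExpiry := time.Now().Add(maxTTL)
	if expiry.After(maxExpiry) {
		return maxExpiry, true
	}
	return expiry, false
}
```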

Contributor

We could avoid another parameter here and adjust the wheel size dynamically from the largest TTL seen.
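
Something like the sketch below could work; the type and method names are invented for illustration, this is not the PR's implementation:

```go
// annWheel is a heartbeat-indexed time wheel of message IDs that grows to fit
// the largest TTL seen so far.
type annWheel struct {
	slots [][]string // message IDs expiring at each future heartbeat
	pos   int        // index of the current heartbeat's slot
}

// add schedules mid to survive at least ttlTicks further heartbeats before
// being returned by tick, growing the wheel if needed.
func (w *annWheel) add(mid string, ttlTicks int) {
	if ttlTicks >= len(w.slots) {
		grown := make([][]string, ttlTicks+1)
		// re-index the existing slots so offset 0 remains the current slot
		for i := range w.slots {
			grown[i] = w.slots[(w.pos+i)%len(w.slots)]
		}
		w.slots, w.pos = grown, 0
	}
	idx := (w.pos + ttlTicks) % len(w.slots)
	w.slots[idx] = append(w.slots[idx], mid)
}

// tick advances one heartbeat and returns the message IDs that just expired.
func (w *annWheel) tick() []string {
	if len(w.slots) == 0 {
		return nil
	}
	expired := w.slots[w.pos]
	w.slots[w.pos] = nil
	w.pos = (w.pos + 1) % len(w.slots)
	return expired
}
```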

Comment on lines +2082 to +2083
// Track announcement in message cache for IWANT retrieval
gs.mcache.TrackAnn(msg, expiry)
Member Author

I wonder if we can just remove the msgID from the MessageCache? We seem to be calculating the message ID over and over again, which adds unnecessary compute time. We could calculate it once and memoize.

More importantly, the msg ID function can be overridden per topic, so having a global msgID in the MessageCache is definitely a design flaw.
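
Something along these lines, perhaps (type and method names are hypothetical): memoize the ID next to the message and have callers pass in the topic's ID function.

```go
// cachedMsg is a hypothetical sketch: it memoizes the message ID so it is
// computed at most once, while still honouring per-topic ID functions.
type cachedMsg struct {
	msg  *pb.Message
	once sync.Once
	mid  string
}

// id computes the message ID with the topic's ID function on first use and
// returns the memoized value afterwards.
func (c *cachedMsg) id(idFn func(*pb.Message) string) string {
	c.once.Do(func() { c.mid = idFn(c.msg) })
	return c.mid
}
```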

Member Author

WDYT @sukunrt?

Member

I thought this was cached:
https://github.com/libp2p/go-libp2p-pubsub/blob/raulk/announce/midgen.go#L41

Is this not the function that's ultimately used?

I agree this code is unclear and we should rely on some conspicuously cached message id as opposed to calling this function again and again.


type messageRef struct {
	*Message
	refs int
Member Author

I didn't like this refcounting, but it seemed like unified message storage was best. We could choose to be stricter (e.g. not accept an announcement for a message that's already sitting in the sliding window, and vice versa), in which case we could drop the refcount, because a Message would only ever be tracked by one cache region at a time.

This has some problems though, namely dealing with a single message sent on multiple topics.
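
For context, the refcounted unified storage boils down to something like this sketch (method names are illustrative; upsertMessage/tryDropMessage in the PR play roughly these roles):

```go
// messageStore is an illustrative sketch: a message can be held by both the
// sliding window and the announcement wheel, and is only evicted once the last
// holder releases it.
type messageStore struct {
	msgs map[string]*messageRef // keyed by message ID
}

// retain adds a reference for mid, inserting the message on first use.
func (s *messageStore) retain(mid string, msg *Message) {
	if ref, ok := s.msgs[mid]; ok {
		ref.refs++
		return
	}
	s.msgs[mid] = &messageRef{Message: msg, refs: 1}
}

// release drops one reference and reports whether the message was evicted.
func (s *messageStore) release(mid string) bool {
	ref, ok := s.msgs[mid]
	if !ok {
		return false
	}
	ref.refs--
	if ref.refs <= 0 {
		delete(s.msgs, mid)
		return true
	}
	return false
}
```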

Contributor

It would be surprising to me if I did an Announce for 60s, and the message stopped being available after 3 heartbeats because the message was actually held in the sliding window cache already. Likewise if I Announce multiple times.

The refcount seems fine in order to avoid that behavior.

> This has some problems though, namely dealing with a single message sent on multiple topics.

Does anyone send the exact same message (same message id) on multiple topics? It might break how a peer responds to an IWANT (an IWANT only includes the message id, so what topic should the peer send back?).

@MarcoPolo MarcoPolo self-requested a review October 27, 2025 22:09
@MarcoPolo
Contributor

> We send the IHAVE immediately to all connected topic subscribers, whether in mesh, in gossip, in a cached fanout, or none of these active pubsub states.

As a callout, this would be the first time we send an IHAVE to our mesh peers. It might be okay (at least the Go implementation would behave fine), but we could consider putting this specific behavior behind an extension.


Skimmed the diff, seems reasonable. When you move it out of draft I'll do a deeper review.

Member

@sukunrt sukunrt left a comment

The structure seems fine to me. I'll review once again when it's ready for review.

Comment on lines +2061 to +2062
// Send IHAVE to all topic peers (excluding direct peers, applying score threshold)
// Match the filtering logic from emitGossip
Member

Why do we exclude directPeers here? Alternatively, how do they discover that we have this message?

Member

The code doesn't actually exclude them, so is this just a stale comment?

Comment on lines +140 to 141
mc.tryDropMessage(entry.mid)
delete(mc.peertx, entry.mid)
Member

How does the mc.peertx counter work? We should clear it when the message is actually dropped, no?

if !mc.tryDropMessage(entry.mid) {
    delete(mc.peertx, entry.mid)
}

Comment on lines +55 to +57
 if gossipLen > historyLen {
 	err := fmt.Errorf("invalid parameters for message cache; gossip slots (%d) cannot be larger than history slots (%d)",
-		gossip, history)
+		gossipLen, historyLen)
Member

Not introduced in this PR, so we can let this be.

Do we ever use entries from [gossipLen:historyLen]?

// The message will be retained for the duration of the window.
// If the message already exists in the cache, its reference count is incremented.
func (mc *MessageCache) AppendWindow(msg *Message) {
	mid := mc.upsertMessage(msg)
Member

I think we've subtly changed the definition of AppendWindow / Put

Previously a message appended multiple times would be removed from the cache at the expiry of the first put.

Now it'll be removed from the cache at the expiry of the last put.

Arguably, the previous definition is wrong. I haven't thought this through, but we should consider the impact this would have.
https://github.com/libp2p/go-libp2p-pubsub/blob/master/mcache.go#L97
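
To make the difference concrete, a sketch using the renamed methods (the refs values assume the refcounting described above):

```go
// appendTwice illustrates the new semantics: the same message appended in two
// successive windows stays retrievable until the second entry ages out of the
// history, whereas previously it was dropped when the first entry aged out.
func appendTwice(mc *MessageCache, msg *Message) {
	mc.AppendWindow(msg) // first put, refs = 1
	mc.ShiftWindow()     // the first entry is now one slot deep in the history
	mc.AppendWindow(msg) // same message ID appended again, refs = 2
	// Old behaviour: the message disappeared once the first entry left the
	// history. New behaviour: it remains until both entries have been shifted
	// out and the refcount reaches zero.
}
```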
