From f18822118cfd506b420871ca9c702c5caad846d4 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 6 May 2025 20:29:14 -0700 Subject: [PATCH 1/8] Add cancelled func to RPC to check if cancelled --- comm.go | 8 +++++ gossipsub.go | 74 +++++++++++++++++++++++++++++++----------- gossipsub_spam_test.go | 4 +-- pubsub.go | 3 +- 4 files changed, 67 insertions(+), 22 deletions(-) diff --git a/comm.go b/comm.go index d38cce08..2081a828 100644 --- a/comm.go +++ b/comm.go @@ -169,6 +169,11 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou return err } + if rpc.cancelled != nil && rpc.cancelled() { + // This rpc has been cancelled, so we don't need to send it + return nil + } + _, err = s.Write(buf) return err } @@ -229,5 +234,8 @@ func copyRPC(rpc *RPC) *RPC { res.Control = new(pb.ControlMessage) *res.Control = *rpc.Control } + if rpc.cancelled != nil { + res.cancelled = rpc.cancelled + } return res } diff --git a/gossipsub.go b/gossipsub.go index ecd4edaa..b0c1e9e5 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -7,6 +7,7 @@ import ( "io" "math/rand" "sort" + "sync" "time" pb "github.com/libp2p/go-libp2p-pubsub/pb" @@ -265,7 +266,7 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter { backoff: make(map[string]map[peer.ID]time.Time), peerhave: make(map[peer.ID]int), peerdontwant: make(map[peer.ID]int), - unwanted: make(map[peer.ID]map[checksum]int), + unwanted: newUnwantedState(), iasked: make(map[peer.ID]int), outbound: make(map[peer.ID]bool), connect: make(chan connectInfo, params.MaxPendingConnections), @@ -471,7 +472,7 @@ type GossipSubRouter struct { control map[peer.ID]*pb.ControlMessage // pending control messages peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat peerdontwant map[peer.ID]int // number of IDONTWANTs received from peer in the last heartbeat - unwanted map[peer.ID]map[checksum]int // TTL of the message ids peers don't want + unwanted *unwantedState // TTL of the message ids peers don't want iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat outbound map[peer.ID]bool // connection direction cache, marks peers with outbound connections backoff map[string]map[peer.ID]time.Time // prune backoff @@ -522,6 +523,48 @@ type GossipSubRouter struct { heartbeatTicks uint64 } +type unwantedState struct { + sync.RWMutex + m map[peer.ID]map[checksum]int // TTL of the message ids peers don't want +} + +func newUnwantedState() *unwantedState { + return &unwantedState{ + m: make(map[peer.ID]map[checksum]int), + } +} + +func (u *unwantedState) add(peer peer.ID, id checksum, ttl int) { + u.Lock() + defer u.Unlock() + if u.m[peer] == nil { + u.m[peer] = make(map[checksum]int) + } + u.m[peer][id] = ttl +} + +func (u *unwantedState) gc() { + u.Lock() + defer u.Unlock() + + // decrement TTLs of all the IDONTWANTs and delete it from the cache when it reaches zero + for _, mids := range u.m { + for mid := range mids { + mids[mid]-- + if mids[mid] == 0 { + delete(mids, mid) + } + } + } +} + +func (u *unwantedState) has(peer peer.ID, id checksum) bool { + u.RLock() + defer u.RUnlock() + _, ok := u.m[peer][id] + return ok +} + type connectInfo struct { p peer.ID spr *record.Envelope @@ -844,7 +887,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb. 
for _, iwant := range ctl.GetIwant() { for _, mid := range iwant.GetMessageIDs() { // Check if that peer has sent IDONTWANT before, if so don't send them the message - if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok { + if gs.unwanted.has(p, computeChecksum(mid)) { continue } @@ -1013,10 +1056,6 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { } func (gs *GossipSubRouter) handleIDontWant(p peer.ID, ctl *pb.ControlMessage) { - if gs.unwanted[p] == nil { - gs.unwanted[p] = make(map[checksum]int) - } - // IDONTWANT flood protection if gs.peerdontwant[p] >= gs.params.MaxIDontWantMessages { log.Debugf("IDONWANT: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerdontwant[p]) @@ -1036,7 +1075,7 @@ mainIDWLoop: } totalUnwantedIds++ - gs.unwanted[p][computeChecksum(mid)] = gs.params.IDontWantMessageTTL + gs.unwanted.add(p, computeChecksum(mid), gs.params.IDontWantMessageTTL) } } } @@ -1204,7 +1243,7 @@ func (gs *GossipSubRouter) Publish(msg *Message) { for p := range gmap { // Check if it has already received an IDONTWANT for the message. // If so, don't send it to the peer - if _, ok := gs.unwanted[p][csum]; ok { + if gs.unwanted.has(p, csum) { continue } tosend[p] = struct{}{} @@ -1217,6 +1256,11 @@ func (gs *GossipSubRouter) Publish(msg *Message) { continue } + csum := computeChecksum(gs.p.idGen.ID(msg)) + cancelled := func() bool { + return gs.unwanted.has(pid, csum) + } + out.cancelled = cancelled gs.sendRPC(pid, out, false) } } @@ -1355,6 +1399,7 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, urgent bool) { gs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), gs.p.maxMessageSize, rpc.Size()-gs.p.maxMessageSize)) continue } + rpc.cancelled = out.cancelled gs.doSendRPC(rpc, p, q, urgent) } } @@ -1824,16 +1869,7 @@ func (gs *GossipSubRouter) clearIDontWantCounters() { // throw away the old map and make a new one gs.peerdontwant = make(map[peer.ID]int) } - - // decrement TTLs of all the IDONTWANTs and delete it from the cache when it reaches zero - for _, mids := range gs.unwanted { - for mid := range mids { - mids[mid]-- - if mids[mid] == 0 { - delete(mids, mid) - } - } - } + gs.unwanted.gc() } func (gs *GossipSubRouter) applyIwantPenalties() { diff --git a/gossipsub_spam_test.go b/gossipsub_spam_test.go index 9f6f0f94..11bafd48 100644 --- a/gossipsub_spam_test.go +++ b/gossipsub_spam_test.go @@ -929,12 +929,12 @@ func TestGossipsubHandleIDontwantSpam(t *testing.T) { t.Errorf("Wanted message count of %d but received %d", 1, grt.peerdontwant[rPid]) } mid := fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength-1) - if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; !ok { + if !grt.unwanted.has(rPid, computeChecksum(mid)) { t.Errorf("Desired message id was not stored in the unwanted map: %s", mid) } mid = fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength) - if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; ok { + if grt.unwanted.has(rPid, computeChecksum(mid)) { t.Errorf("Unwanted message id was stored in the unwanted map: %s", mid) } } diff --git a/pubsub.go b/pubsub.go index 5c27c3e9..3b3a66e7 100644 --- a/pubsub.go +++ b/pubsub.go @@ -245,7 +245,8 @@ type RPC struct { pb.RPC // unexported on purpose, not sending this over the wire - from peer.ID + from peer.ID + cancelled func() bool } type Option func(*PubSub) error From 01238031ef7dd20289732e3475b56c7e92b45866 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 7 May 2025 
13:35:20 -0700 Subject: [PATCH 2/8] wip refactor splitting rpcs --- gossipsub_test.go | 37 ++++++++++ pubsub.go | 185 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 219 insertions(+), 3 deletions(-) diff --git a/gossipsub_test.go b/gossipsub_test.go index abb347fd..ca61c846 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -2348,6 +2348,15 @@ func validRPCSizes(slice []*RPC, limit int) bool { return true } +func validRPCToPublishSizes(slice []RPCToPublish, limit int) bool { + for _, rpc := range slice { + if rpc.toPB().Size() > limit { + return false + } + } + return true +} + func TestFragmentRPCFunction(t *testing.T) { fragmentRPC := func(rpc *RPC, limit int) ([]*RPC, error) { rpcs := appendOrMergeRPC(nil, limit, *rpc) @@ -2556,6 +2565,34 @@ func FuzzAppendOrMergeRPC(f *testing.F) { }) } +func FuzzRPCToPublish(f *testing.F) { + minMaxMsgSize := 100 + maxMaxMsgSize := 2048 + f.Fuzz(func(t *testing.T, data []byte) { + maxSize := int(generateU16(&data)) % maxMaxMsgSize + if maxSize < minMaxMsgSize { + maxSize = minMaxMsgSize + } + rpc := generateRPC(data, maxSize) + publishMsgs := make([]*Message, len(rpc.Publish)) + for i, msg := range rpc.Publish { + publishMsgs[i] = &Message{ + Message: msg, + } + } + rpcToPublish := &RPCToPublish{ + publish: publishMsgs, + subs: rpc.Subscriptions, + control: rpc.Control, + } + rpcs := rpcToPublish.split(maxSize) + + if !validRPCToPublishSizes(rpcs, maxSize) { + t.Fatalf("invalid RPC size") + } + }) +} + func TestGossipsubManagesAnAddressBook(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pubsub.go b/pubsub.go index 3b3a66e7..c27fb88c 100644 --- a/pubsub.go +++ b/pubsub.go @@ -5,7 +5,10 @@ import ( "encoding/binary" "errors" "fmt" + "iter" + math_bits "math/bits" "math/rand" + "slices" "sync" "sync/atomic" "time" @@ -243,10 +246,186 @@ func (m *Message) GetFrom() peer.ID { type RPC struct { pb.RPC - // unexported on purpose, not sending this over the wire - from peer.ID - cancelled func() bool + from peer.ID +} + +type RPCToPublish struct { + publish []*Message + subs []*pb.RPC_SubOpts + control *pb.ControlMessage + + from peer.ID + unwanted func(msg *Message) bool +} + +// split splits the RPC into multiple RPCs if the total size exceeds maxRPCSize. +// It may still return a single RPC that is larger than maxRPCSize in case it +// can't split the RPC up further. Caller should take care of handling oversized +// RPCs appropriately. +func (r *RPCToPublish) split(maxRPCSize int) []RPCToPublish { + // Fast path: if the RPC is smaller than maxRPCSize, return it as is. + if r.Size() <= maxRPCSize { + return []RPCToPublish{*r} + + } + + out := make([]RPCToPublish, 0, 1) + currentRPC := RPCToPublish{} + + // Split control messages. This is trickier than other fields because we are + // splitting one level deeper. + var ctrlSize int + for incrementalSize, mergeFn := range r.rpcControlComponents() { + nextSize := ctrlSize + incrementalSize + nextLenPrefixSize := sovRpc(uint64(nextSize)) + // +1 for the protobuf field number + if nextSize+nextLenPrefixSize+1 >= maxRPCSize && ctrlSize > 0 { + out = append(out, currentRPC) + currentRPC = RPCToPublish{} + ctrlSize = 0 + } + ctrlSize += incrementalSize + if currentRPC.control == nil { + currentRPC.control = &pb.ControlMessage{} + } + mergeFn(currentRPC.control) + } + + var currentSize int + if ctrlSize > 0 { + currentSize = ctrlSize + sovRpc(uint64(ctrlSize)) + 1 + } + // Split subscriptions. 
+ for _, rpc := range r.subs { + subSize := rpc.Size() + // +1 for the protobuf field number + incrementalSize := subSize + 1 + sovRpc(uint64(subSize)) + if currentSize+incrementalSize >= maxRPCSize && currentSize > 0 { + out = append(out, currentRPC) + currentRPC = RPCToPublish{} + currentSize = 0 + } + currentSize += incrementalSize + currentRPC.subs = append(currentRPC.subs, rpc) + } + + for _, msg := range r.publish { + msgSize := msg.Size() + // +1 for the protobuf field number + incrementalSize := msgSize + 1 + sovRpc(uint64(msgSize)) + if currentSize+incrementalSize >= maxRPCSize && currentSize > 0 { + out = append(out, currentRPC) + currentRPC = RPCToPublish{} + currentSize = 0 + } + currentSize += incrementalSize + currentRPC.publish = append(currentRPC.publish, msg) + } + + if currentSize > 0 { + out = append(out, currentRPC) + } + + // Set common fields for all RPCs + for i := range out { + out[i].from = r.from + out[i].unwanted = r.unwanted + } + return out +} + +func sovRpc(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} + +// rpcControlComponents returns an iterator over the control messages of the RPC. +// The first item in the pair is the incremental size of adding this component to the control message. +// The second item is a function that merges the component into an existing ControlMessage pb. +func (r *RPCToPublish) rpcControlComponents() iter.Seq2[int, func(*pb.ControlMessage)] { + return func(yield func(int, func(*pb.ControlMessage)) bool) { + if r.control == nil { + return + } + for _, idontwant := range r.control.Idontwant { + s := idontwant.Size() + incrementalSize := s + 1 + sovRpc(uint64(s)) + if !yield(incrementalSize, func(a *pb.ControlMessage) { + a.Idontwant = append(a.Idontwant, idontwant) + }) { + return + } + } + for _, graft := range r.control.Graft { + s := graft.Size() + incrementalSize := s + 1 + sovRpc(uint64(s)) + if !yield(incrementalSize, func(a *pb.ControlMessage) { + a.Graft = append(a.Graft, graft) + }) { + return + } + } + for _, prune := range r.control.Prune { + s := prune.Size() + incrementalSize := s + 1 + sovRpc(uint64(s)) + if !yield(incrementalSize, func(a *pb.ControlMessage) { + a.Prune = append(a.Prune, prune) + }) { + return + } + } + for _, iwant := range r.control.Iwant { + s := iwant.Size() + incrementalSize := s + 1 + sovRpc(uint64(s)) + if !yield(incrementalSize, func(a *pb.ControlMessage) { + a.Iwant = append(a.Iwant, iwant) + }) { + return + } + } + for _, ihave := range r.control.Ihave { + s := ihave.Size() + incrementalSize := s + 1 + sovRpc(uint64(s)) + if !yield(incrementalSize, func(a *pb.ControlMessage) { + a.Ihave = append(a.Ihave, ihave) + }) { + return + } + } + } +} + +// Size returns the size of the pb encoded RPC in bytes. 
+func (r *RPCToPublish) Size() int { + pbRPC := pb.RPC{ + Subscriptions: r.subs, + Control: r.control, + } + size := pbRPC.Size() + if len(r.publish) > 0 { + for _, msg := range r.publish { + mSize := msg.Size() + // +1 for the protobuf field number + size += mSize + 1 + sovRpc(uint64(mSize)) + } + } + return size +} + +func (r *RPCToPublish) deleteUnwanted() { + r.publish = slices.DeleteFunc(r.publish, r.unwanted) +} + +func (r *RPCToPublish) toPB() *pb.RPC { + rpc := &pb.RPC{ + Subscriptions: r.subs, + Publish: make([]*pb.Message, len(r.publish)), + Control: r.control, + } + for i, msg := range r.publish { + rpc.Publish[i] = msg.Message + } + return rpc } type Option func(*PubSub) error From 14139ba1c08aa3aabc52e29f3d1658d406c52bd2 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 8 May 2025 09:29:34 -0700 Subject: [PATCH 3/8] Add split method to RPC type --- comm.go | 10 +--- gossipsub.go | 10 +--- gossipsub_test.go | 21 ++----- pubsub.go | 149 +++++++++++++++++++++++++++------------------- 4 files changed, 97 insertions(+), 93 deletions(-) diff --git a/comm.go b/comm.go index 2081a828..52095aa9 100644 --- a/comm.go +++ b/comm.go @@ -158,6 +158,7 @@ func (p *PubSub) handlePeerDead(s network.Stream) { func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing *rpcQueue) { writeRpc := func(rpc *RPC) error { + rpc = rpc.filterUnwanted(s.Conn().RemotePeer()) size := uint64(rpc.Size()) buf := pool.Get(varint.UvarintSize(size) + int(size)) @@ -169,11 +170,6 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou return err } - if rpc.cancelled != nil && rpc.cancelled() { - // This rpc has been cancelled, so we don't need to send it - return nil - } - _, err = s.Write(buf) return err } @@ -227,6 +223,7 @@ func rpcWithControl(msgs []*pb.Message, } } +// copyRPC shallow copies a RPC message. func copyRPC(rpc *RPC) *RPC { res := new(RPC) *res = *rpc @@ -234,8 +231,5 @@ func copyRPC(rpc *RPC) *RPC { res.Control = new(pb.ControlMessage) *res.Control = *rpc.Control } - if rpc.cancelled != nil { - res.cancelled = rpc.cancelled - } return res } diff --git a/gossipsub.go b/gossipsub.go index b0c1e9e5..09113661 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1256,11 +1256,6 @@ func (gs *GossipSubRouter) Publish(msg *Message) { continue } - csum := computeChecksum(gs.p.idGen.ID(msg)) - cancelled := func() bool { - return gs.unwanted.has(pid, csum) - } - out.cancelled = cancelled gs.sendRPC(pid, out, false) } } @@ -1392,15 +1387,14 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, urgent bool) { } // Potentially split the RPC into multiple RPCs that are below the max message size - outRPCs := appendOrMergeRPC(nil, gs.p.maxMessageSize, *out) + outRPCs := out.split(gs.p.maxMessageSize) for _, rpc := range outRPCs { if rpc.Size() > gs.p.maxMessageSize { // This should only happen if a single message/control is above the maxMessageSize. gs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. 
(Over by %d bytes)", rpc.Size(), gs.p.maxMessageSize, rpc.Size()-gs.p.maxMessageSize)) continue } - rpc.cancelled = out.cancelled - gs.doSendRPC(rpc, p, q, urgent) + gs.doSendRPC(&rpc, p, q, urgent) } } diff --git a/gossipsub_test.go b/gossipsub_test.go index ca61c846..5913b7d9 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -2348,9 +2348,9 @@ func validRPCSizes(slice []*RPC, limit int) bool { return true } -func validRPCToPublishSizes(slice []RPCToPublish, limit int) bool { +func validRPCSizesStructSlice(slice []RPC, limit int) bool { for _, rpc := range slice { - if rpc.toPB().Size() > limit { + if rpc.Size() > limit { return false } } @@ -2565,7 +2565,7 @@ func FuzzAppendOrMergeRPC(f *testing.F) { }) } -func FuzzRPCToPublish(f *testing.F) { +func FuzzRPCSplit(f *testing.F) { minMaxMsgSize := 100 maxMaxMsgSize := 2048 f.Fuzz(func(t *testing.T, data []byte) { @@ -2574,20 +2574,9 @@ func FuzzRPCToPublish(f *testing.F) { maxSize = minMaxMsgSize } rpc := generateRPC(data, maxSize) - publishMsgs := make([]*Message, len(rpc.Publish)) - for i, msg := range rpc.Publish { - publishMsgs[i] = &Message{ - Message: msg, - } - } - rpcToPublish := &RPCToPublish{ - publish: publishMsgs, - subs: rpc.Subscriptions, - control: rpc.Control, - } - rpcs := rpcToPublish.split(maxSize) + rpcs := rpc.split(maxSize) - if !validRPCToPublishSizes(rpcs, maxSize) { + if !validRPCSizesStructSlice(rpcs, maxSize) { t.Fatalf("invalid RPC size") } }) diff --git a/pubsub.go b/pubsub.go index c27fb88c..e547e083 100644 --- a/pubsub.go +++ b/pubsub.go @@ -247,31 +247,75 @@ func (m *Message) GetFrom() peer.ID { type RPC struct { pb.RPC // unexported on purpose, not sending this over the wire - from peer.ID -} - -type RPCToPublish struct { - publish []*Message - subs []*pb.RPC_SubOpts - control *pb.ControlMessage - - from peer.ID - unwanted func(msg *Message) bool + from peer.ID + messageChecksums []checksum + unwanted *unwantedState +} + +func (r *RPC) filterUnwanted(to peer.ID) *RPC { + // First check if there are any unwanted messages. + // If all messages are wanted, we can return the original RPC. + anyUnwanted := false + for i := range r.Publish { + csum := r.messageChecksums[i] + if r.unwanted.has(to, csum) { + anyUnwanted = true + break + } + } + if !anyUnwanted { + return r + } + + // There are some unwanted messages, so we need to filter them out. + // We need to copy the RPC as other senders could be using the same RPC. + filtered := copyRPC(r) + filtered.Publish = slices.Clone(r.Publish) + // The filtered RPC will not need these checksums since it will not be + // filitered again. If we did need them, we'd have to close the slice and + // synchronize the deletes below + filtered.messageChecksums = nil + + // Remove unwanted messages from the publish list. + i := slices.IndexFunc(r.messageChecksums, func(csum checksum) bool { + return r.unwanted.has(to, csum) + }) + if i == -1 { + return r + } + for j := i + 1; j < len(filtered.Publish); j++ { + if !r.unwanted.has(to, r.messageChecksums[j]) { + filtered.Publish[i] = filtered.Publish[j] + i++ + } + } + clear(filtered.Publish[i:]) + filtered.Publish = filtered.Publish[:i] + return filtered } // split splits the RPC into multiple RPCs if the total size exceeds maxRPCSize. // It may still return a single RPC that is larger than maxRPCSize in case it // can't split the RPC up further. Caller should take care of handling oversized // RPCs appropriately. 
-func (r *RPCToPublish) split(maxRPCSize int) []RPCToPublish { +// +// A note for maintainers: +// The details of this are tied to Protobuf encoding. It is recommended to +// familiarize yourself with the following resource: https://protobuf.dev/programming-guides/encoding/ +// +// Also note that the +1 byte for the protobuf field number + wire type assumes the field +// number is <= 15. If the field number is larger, it will use more than one byte. The formula is: +// byteLengthOfVarint(varintEncode(fieldNumber << 3 | wireType)) or sovRpc(fieldNumber << 3 | wireType) +// +// Make sure to run the Fuzz test after any changes. It's very good at detecting issues. +func (r *RPC) split(maxRPCSize int) []RPC { // Fast path: if the RPC is smaller than maxRPCSize, return it as is. if r.Size() <= maxRPCSize { - return []RPCToPublish{*r} - + return []RPC{*r} } - out := make([]RPCToPublish, 0, 1) - currentRPC := RPCToPublish{} + out := make([]RPC, 0, 1) + currentRPC := RPC{} // Split control messages. This is trickier than other fields because we are // splitting one level deeper. @@ -279,17 +323,17 @@ func (r *RPCToPublish) split(maxRPCSize int) []RPCToPublish { for incrementalSize, mergeFn := range r.rpcControlComponents() { nextSize := ctrlSize + incrementalSize nextLenPrefixSize := sovRpc(uint64(nextSize)) - // +1 for the protobuf field number + // +1 for the protobuf field number + wire type if nextSize+nextLenPrefixSize+1 >= maxRPCSize && ctrlSize > 0 { out = append(out, currentRPC) - currentRPC = RPCToPublish{} + currentRPC = RPC{} ctrlSize = 0 } ctrlSize += incrementalSize - if currentRPC.control == nil { - currentRPC.control = &pb.ControlMessage{} + if currentRPC.Control == nil { + currentRPC.Control = &pb.ControlMessage{} } - mergeFn(currentRPC.control) + mergeFn(currentRPC.Control) } var currentSize int @@ -297,30 +341,30 @@ func (r *RPCToPublish) split(maxRPCSize int) []RPCToPublish { currentSize = ctrlSize + sovRpc(uint64(ctrlSize)) + 1 } // Split subscriptions. - for _, rpc := range r.subs { + for _, rpc := range r.Subscriptions { subSize := rpc.Size() - // +1 for the protobuf field number + // +1 for the protobuf field number + wire type incrementalSize := subSize + 1 + sovRpc(uint64(subSize)) if currentSize+incrementalSize >= maxRPCSize && currentSize > 0 { out = append(out, currentRPC) - currentRPC = RPCToPublish{} + currentRPC = RPC{} currentSize = 0 } currentSize += incrementalSize - currentRPC.subs = append(currentRPC.subs, rpc) + currentRPC.Subscriptions = append(currentRPC.Subscriptions, rpc) } - for _, msg := range r.publish { + for _, msg := range r.Publish { msgSize := msg.Size() - // +1 for the protobuf field number + // +1 for the protobuf field number + wire type incrementalSize := msgSize + 1 + sovRpc(uint64(msgSize)) if currentSize+incrementalSize >= maxRPCSize && currentSize > 0 { out = append(out, currentRPC) - currentRPC = RPCToPublish{} + currentRPC = RPC{} currentSize = 0 } currentSize += incrementalSize - currentRPC.publish = append(currentRPC.publish, msg) + currentRPC.Publish = append(currentRPC.Publish, msg) } if currentSize > 0 { @@ -330,7 +374,6 @@ func (r *RPCToPublish) split(maxRPCSize int) []RPCToPublish { // Set common fields for all RPCs for i := range out { out[i].from = r.from - out[i].unwanted = r.unwanted } return out } @@ -342,12 +385,14 @@ func sovRpc(x uint64) (n int) { // rpcControlComponents returns an iterator over the control messages of the RPC. 
// The first item in the pair is the incremental size of adding this component to the control message. // The second item is a function that merges the component into an existing ControlMessage pb. -func (r *RPCToPublish) rpcControlComponents() iter.Seq2[int, func(*pb.ControlMessage)] { +// +// Read the comment in RPC.split() before modifying this function. +func (r *RPC) rpcControlComponents() iter.Seq2[int, func(*pb.ControlMessage)] { return func(yield func(int, func(*pb.ControlMessage)) bool) { - if r.control == nil { + if r.Control == nil { return } - for _, idontwant := range r.control.Idontwant { + for _, idontwant := range r.Control.Idontwant { s := idontwant.Size() incrementalSize := s + 1 + sovRpc(uint64(s)) if !yield(incrementalSize, func(a *pb.ControlMessage) { @@ -356,7 +401,7 @@ func (r *RPCToPublish) rpcControlComponents() iter.Seq2[int, func(*pb.ControlMes return } } - for _, graft := range r.control.Graft { + for _, graft := range r.Control.Graft { s := graft.Size() incrementalSize := s + 1 + sovRpc(uint64(s)) if !yield(incrementalSize, func(a *pb.ControlMessage) { @@ -365,7 +410,7 @@ func (r *RPCToPublish) rpcControlComponents() iter.Seq2[int, func(*pb.ControlMes return } } - for _, prune := range r.control.Prune { + for _, prune := range r.Control.Prune { s := prune.Size() incrementalSize := s + 1 + sovRpc(uint64(s)) if !yield(incrementalSize, func(a *pb.ControlMessage) { @@ -374,7 +419,7 @@ func (r *RPCToPublish) rpcControlComponents() iter.Seq2[int, func(*pb.ControlMes return } } - for _, iwant := range r.control.Iwant { + for _, iwant := range r.Control.Iwant { s := iwant.Size() incrementalSize := s + 1 + sovRpc(uint64(s)) if !yield(incrementalSize, func(a *pb.ControlMessage) { @@ -383,7 +428,7 @@ func (r *RPCToPublish) rpcControlComponents() iter.Seq2[int, func(*pb.ControlMes return } } - for _, ihave := range r.control.Ihave { + for _, ihave := range r.Control.Ihave { s := ihave.Size() incrementalSize := s + 1 + sovRpc(uint64(s)) if !yield(incrementalSize, func(a *pb.ControlMessage) { @@ -396,38 +441,20 @@ func (r *RPCToPublish) rpcControlComponents() iter.Seq2[int, func(*pb.ControlMes } // Size returns the size of the pb encoded RPC in bytes. -func (r *RPCToPublish) Size() int { +func (r *RPC) Size() int { pbRPC := pb.RPC{ - Subscriptions: r.subs, - Control: r.control, + Subscriptions: r.Subscriptions, + Control: r.Control, } size := pbRPC.Size() - if len(r.publish) > 0 { - for _, msg := range r.publish { - mSize := msg.Size() - // +1 for the protobuf field number - size += mSize + 1 + sovRpc(uint64(mSize)) - } + for _, msg := range r.Publish { + mSize := msg.Size() + // +1 for the protobuf field number + wire type + size += mSize + 1 + sovRpc(uint64(mSize)) + 1 } return size } -func (r *RPCToPublish) deleteUnwanted() { - r.publish = slices.DeleteFunc(r.publish, r.unwanted) -} - -func (r *RPCToPublish) toPB() *pb.RPC { - rpc := &pb.RPC{ - Subscriptions: r.subs, - Publish: make([]*pb.Message, len(r.publish)), - Control: r.control, - } - for i, msg := range r.publish { - rpc.Publish[i] = msg.Message - } - return rpc -} - type Option func(*PubSub) error // NewPubSub returns a new PubSub management object. 
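Not part of the patch series: a minimal standalone sketch of the size accounting that RPC.split() relies on. It assumes, as the maintainer note above states, that every field involved has a protobuf field number <= 15, so a length-delimited field with payload size s costs s plus one tag byte plus a varint length prefix. sovRpc below is copied verbatim from the patch; incrementalSize is a hypothetical helper added only for illustration.

package main

import (
	"fmt"
	math_bits "math/bits"
)

// sovRpc is the varint-length helper from the patch: the number of bytes
// needed to encode x as a protobuf uvarint.
func sovRpc(x uint64) int {
	return (math_bits.Len64(x|1) + 6) / 7
}

// incrementalSize is the per-item cost used throughout split():
// payload + 1-byte tag (field number <= 15) + varint length prefix.
func incrementalSize(payload int) int {
	return payload + 1 + sovRpc(uint64(payload))
}

func main() {
	for _, s := range []int{0, 1, 127, 128, 300, 16384} {
		fmt.Printf("payload %5d bytes -> %5d bytes on the wire\n", s, incrementalSize(s))
	}
}

The greedy loop in split() sums these incremental sizes and starts a new RPC whenever the running total would cross maxRPCSize, which is why a single oversized message or control entry can still yield an RPC above the limit that the caller has to drop.
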
From eceaa0dba67856bafcc00b68e8f15cdef7755704 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 8 May 2025 10:03:36 -0700 Subject: [PATCH 4/8] Handle case where we don't have messageChecksums and can't filter --- pubsub.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pubsub.go b/pubsub.go index e547e083..eed67a9a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -253,6 +253,9 @@ type RPC struct { } func (r *RPC) filterUnwanted(to peer.ID) *RPC { + if len(r.Publish) != len(r.messageChecksums) { + return r + } // First check if there are any unwanted messages. // If all messages are wanted, we can return the original RPC. anyUnwanted := false From f10d31fbe260c9fa4958e82369a769deb1ccb409 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 8 May 2025 10:03:59 -0700 Subject: [PATCH 5/8] Add Benchmark. Results show split is twice as fast --- gossipsub.go | 14 +++++++------- gossipsub_test.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 09113661..6b524c10 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1453,7 +1453,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { // old behavior. In the future let's not merge messages. Since, // it may increase message latency. for _, msg := range elem.GetPublish() { - if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit { + if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.RPC.Size() > limit { lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1] lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from} lastRPC.Publish = append(lastRPC.Publish, msg) @@ -1463,7 +1463,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { // Merge/Append Subscriptions for _, sub := range elem.GetSubscriptions() { - if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit { + if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.RPC.Size() > limit { lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1] lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from} lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub) @@ -1475,7 +1475,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { if ctl := elem.GetControl(); ctl != nil { if lastRPC.Control == nil { lastRPC.Control = &pb.ControlMessage{} - if lastRPC.Size() > limit { + if lastRPC.RPC.Size() > limit { lastRPC.Control = nil lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} out = append(out, lastRPC) @@ -1483,7 +1483,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { } for _, graft := range ctl.GetGraft() { - if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit { + if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.RPC.Size() > limit { lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1] lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft) @@ -1492,7 +1492,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { } for _, prune := range ctl.GetPrune() { - if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit { + if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.RPC.Size() > limit { lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1] lastRPC = &RPC{RPC: pb.RPC{Control: 
&pb.ControlMessage{}}, from: elem.from} lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune) @@ -1506,7 +1506,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { // For IWANTs we don't need more than a single one, // since there are no topic IDs here. newIWant := &pb.ControlIWant{} - if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit { + if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.RPC.Size() > limit { lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1] lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ Iwant: []*pb.ControlIWant{newIWant}, @@ -1515,7 +1515,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { } } for _, msgID := range iwant.GetMessageIDs() { - if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit { + if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.RPC.Size() > limit { lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1] lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}}, diff --git a/gossipsub_test.go b/gossipsub_test.go index 5913b7d9..da5cd80e 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -8,6 +8,7 @@ import ( "fmt" "io" mrand "math/rand" + mrand2 "math/rand/v2" "sort" "sync" "sync/atomic" @@ -2582,6 +2583,42 @@ func FuzzRPCSplit(f *testing.F) { }) } +func genNRpcs(tb testing.TB, n int, maxSize int) []*RPC { + r := mrand2.NewChaCha8([32]byte{}) + rpcs := make([]*RPC, n) + for i := range rpcs { + var data [64]byte + _, err := r.Read(data[:]) + if err != nil { + tb.Fatal(err) + } + rpcs[i] = generateRPC(data[:], maxSize) + } + return rpcs +} + +func BenchmarkSplitRPC(b *testing.B) { + maxSize := 2048 + rpcs := genNRpcs(b, 100, maxSize) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rpc := rpcs[i%len(rpcs)] + rpc.split(maxSize) + } +} + +func BenchmarkAppendOrMergeRPC(b *testing.B) { + maxSize := 2048 + rpcs := genNRpcs(b, 100, maxSize) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rpc := rpcs[i%len(rpcs)] + appendOrMergeRPC(nil, maxSize, *rpc) + } +} + func TestGossipsubManagesAnAddressBook(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 827cc32687e0cc96790cf0f6204f16d98e4a3819 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 8 May 2025 10:12:27 -0700 Subject: [PATCH 6/8] Set the unwanted and message checksum fields in the RPC on gossipsub publish --- comm.go | 7 +++++++ gossipsub.go | 6 +++--- pubsub.go | 2 ++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/comm.go b/comm.go index 52095aa9..88be758c 100644 --- a/comm.go +++ b/comm.go @@ -199,6 +199,13 @@ func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC { } } +func rpcWithMessagesAndChecksums(msgs []*pb.Message, checksums []checksum) *RPC { + return &RPC{ + RPC: pb.RPC{Publish: msgs}, + messageChecksums: checksums, + } +} + func rpcWithMessages(msgs ...*pb.Message) *RPC { return &RPC{RPC: pb.RPC{Publish: msgs}} } diff --git a/gossipsub.go b/gossipsub.go index 6b524c10..ee289b97 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1195,6 +1195,7 @@ func (gs *GossipSubRouter) Publish(msg *Message) { if !ok { return } + messageChecksum := computeChecksum(gs.p.idGen.ID(msg)) if gs.floodPublish && from == gs.p.host.ID() { for p := range tmap { @@ -1239,18 
+1240,17 @@ func (gs *GossipSubRouter) Publish(msg *Message) { gs.lastpub[topic] = time.Now().UnixNano() } - csum := computeChecksum(gs.p.idGen.ID(msg)) for p := range gmap { // Check if it has already received an IDONTWANT for the message. // If so, don't send it to the peer - if gs.unwanted.has(p, csum) { + if gs.unwanted.has(p, messageChecksum) { continue } tosend[p] = struct{}{} } } - out := rpcWithMessages(msg.Message) + out := rpcWithMessagesAndChecksums([]*pb.Message{msg.Message}, []checksum{messageChecksum}) for pid := range tosend { if pid == from || pid == peer.ID(msg.GetFrom()) { continue diff --git a/pubsub.go b/pubsub.go index eed67a9a..0962127c 100644 --- a/pubsub.go +++ b/pubsub.go @@ -377,6 +377,8 @@ func (r *RPC) split(maxRPCSize int) []RPC { // Set common fields for all RPCs for i := range out { out[i].from = r.from + out[i].unwanted = r.unwanted + out[i].messageChecksums = r.messageChecksums } return out } From 1f1aa3b9e364f4e624f883bfc431a6376fafc749 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 8 May 2025 14:51:06 -0700 Subject: [PATCH 7/8] Add test for filtering unwanted messages right before sending --- comm.go | 7 +- gossipsub.go | 2 +- gossipsub_test.go | 164 ++++++++++++++++++++++++++++++++++++++++++++++ pubsub.go | 19 +----- 4 files changed, 173 insertions(+), 19 deletions(-) diff --git a/comm.go b/comm.go index 88be758c..fd7cc8f6 100644 --- a/comm.go +++ b/comm.go @@ -160,6 +160,10 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou writeRpc := func(rpc *RPC) error { rpc = rpc.filterUnwanted(s.Conn().RemotePeer()) size := uint64(rpc.Size()) + if size == 0 { + // Nothing to do, the peer cancelled all our messages + return nil + } buf := pool.Get(varint.UvarintSize(size) + int(size)) defer pool.Put(buf) @@ -199,10 +203,11 @@ func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC { } } -func rpcWithMessagesAndChecksums(msgs []*pb.Message, checksums []checksum) *RPC { +func rpcWithMessagesAndChecksums(msgs []*pb.Message, checksums []checksum, unwanted *unwantedState) *RPC { return &RPC{ RPC: pb.RPC{Publish: msgs}, messageChecksums: checksums, + unwanted: unwanted, } } diff --git a/gossipsub.go b/gossipsub.go index ee289b97..f6394585 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1250,7 +1250,7 @@ func (gs *GossipSubRouter) Publish(msg *Message) { } } - out := rpcWithMessagesAndChecksums([]*pb.Message{msg.Message}, []checksum{messageChecksum}) + out := rpcWithMessagesAndChecksums([]*pb.Message{msg.Message}, []checksum{messageChecksum}, gs.unwanted) for pid := range tosend { if pid == from || pid == peer.ID(msg.GetFrom()) { continue diff --git a/gossipsub_test.go b/gossipsub_test.go index da5cd80e..0e19d32d 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -5,11 +5,13 @@ import ( "context" crand "crypto/rand" "encoding/base64" + "encoding/binary" "fmt" "io" mrand "math/rand" mrand2 "math/rand/v2" "sort" + "strconv" "sync" "sync/atomic" "testing" @@ -3469,3 +3471,165 @@ func BenchmarkAllocDoDropRPC(b *testing.B) { gs.doDropRPC(&RPC{}, "peerID", "reason") } } + +type blockableHost struct { + host.Host + streams []blockableStream +} + +func (bh *blockableHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error) { + s, err := bh.Host.NewStream(ctx, p, pids...) 
+ if err != nil { + return nil, err + } + + bh.streams = append(bh.streams, blockableStream{Stream: s}) + return &bh.streams[len(bh.streams)-1], nil +} + +func (bh *blockableHost) BlockAll() { + for i := range bh.streams { + bh.streams[i].Block() + } +} + +func (bh *blockableHost) UnblockAll() { + for i := range bh.streams { + bh.streams[i].Unblock() + } +} + +type blockableStream struct { + network.Stream + blocked sync.Mutex +} + +func (bs *blockableStream) Block() { + bs.blocked.Lock() +} + +func (bs *blockableStream) Unblock() { + bs.blocked.Unlock() +} + +func (bs *blockableStream) Write(p []byte) (int, error) { + bs.blocked.Lock() + defer bs.blocked.Unlock() + return bs.Stream.Write(p) +} + +func TestGossipsubIDONTWANTCancelsQueuedRPC(t *testing.T) { + msgCount := 3 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 2) + denseConnect(t, hosts) + + psubs := make([]*PubSub, 2) + + publisherHost := &blockableHost{Host: hosts[0]} + messageIDFn := func(msg *pb.Message) string { + return strconv.FormatUint(binary.BigEndian.Uint64(msg.Data), 10) + } + psubs[0] = getGossipsub(ctx, publisherHost, WithMessageIdFn(messageIDFn)) + msgIDsReceived := make(chan string, msgCount) + psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(messageIDFn), WithRawTracer(&mockRawTracer{ + onRecvRPC: func(rpc *RPC) { + if len(rpc.GetPublish()) > 0 { + for _, msg := range rpc.GetPublish() { + msgIDsReceived <- messageIDFn(msg) + } + } + }, + })) + + topicString := "foobar" + var topics []*Topic + for _, ps := range psubs { + topic, err := ps.Join(topicString) + if err != nil { + t.Fatal(err) + } + topics = append(topics, topic) + + _, err = ps.Subscribe(topicString, WithBufferSize(msgCount+1)) + if err != nil { + t.Fatal(err) + } + } + + time.Sleep(2 * time.Second) + + publisherHost.BlockAll() + // Have the publisher queue up a bunch of mesages. The actual sending will + // be blocked on the call to stream.Write + for i := range msgCount { + msg := make([]byte, GossipSubIDontWantMessageThreshold+1) + binary.BigEndian.AppendUint64(msg[:0], uint64(i)) + err := topics[0].Publish(ctx, msg) + if err != nil { + t.Fatal(err) + } + } + + // Now have the receiver cancel these messages by sending IDONTWANTs + idontwantIDs := make([]string, msgCount) + for i := range idontwantIDs { + idontwantIDs[i] = strconv.Itoa(i) + } + idontwantRPC := &RPC{ + RPC: pb.RPC{ + Control: &pb.ControlMessage{ + Idontwant: []*pb.ControlIDontWant{&pb.ControlIDontWant{ + MessageIDs: idontwantIDs, + }}, + }, + }, + } + q := psubs[1].peers[publisherHost.ID()] + psubs[1].rt.(*GossipSubRouter).doSendRPC(idontwantRPC, publisherHost.ID(), q, true) + + // Wait for the RPCs to send + time.Sleep(time.Second) + + // Unblock writes + publisherHost.UnblockAll() + + // Have the publisher send one more message. 
We expect this one to make it + msg := make([]byte, GossipSubIDontWantMessageThreshold+1) + binary.BigEndian.AppendUint64(msg[:0], uint64(msgCount+1)) + err := topics[0].Publish(ctx, msg) + if err != nil { + t.Fatal(err) + } + + // We should get the last message +outerExpectMsg: + for { + select { + case msgID := <-msgIDsReceived: + if msgID == "0" { + // one early message that got sent before we could cancel it + continue + } + if msgID != strconv.FormatUint(uint64(msgCount)+1, 10) { + t.Fatal("received unexpected message: ", msgID) + } + break outerExpectMsg + case <-time.After(5 * time.Second): + t.Fatal("Should have received the last message") + } + } + + // We should not get any more messages +outer: + for { + select { + case <-msgIDsReceived: + t.Fatal("Should not have received a publish as the node sent IDONTWANT") + case <-time.After(5 * time.Second): + break outer + } + } +} diff --git a/pubsub.go b/pubsub.go index 0962127c..76115f5b 100644 --- a/pubsub.go +++ b/pubsub.go @@ -253,7 +253,7 @@ type RPC struct { } func (r *RPC) filterUnwanted(to peer.ID) *RPC { - if len(r.Publish) != len(r.messageChecksums) { + if len(r.Publish) != len(r.messageChecksums) || r.unwanted == nil { return r } // First check if there are any unwanted messages. @@ -341,7 +341,7 @@ func (r *RPC) split(maxRPCSize int) []RPC { var currentSize int if ctrlSize > 0 { - currentSize = ctrlSize + sovRpc(uint64(ctrlSize)) + 1 + currentSize = ctrlSize + 1 + sovRpc(uint64(ctrlSize)) } // Split subscriptions. for _, rpc := range r.Subscriptions { @@ -445,21 +445,6 @@ func (r *RPC) rpcControlComponents() iter.Seq2[int, func(*pb.ControlMessage)] { } } -// Size returns the size of the pb encoded RPC in bytes. -func (r *RPC) Size() int { - pbRPC := pb.RPC{ - Subscriptions: r.Subscriptions, - Control: r.Control, - } - size := pbRPC.Size() - for _, msg := range r.Publish { - mSize := msg.Size() - // +1 for the protobuf field number + wire type - size += mSize + 1 + sovRpc(uint64(mSize)) + 1 - } - return size -} - type Option func(*PubSub) error // NewPubSub returns a new PubSub management object. From 6942037c6ccca3722cac7c04592d65df7e9f6d9d Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 8 May 2025 21:18:09 -0700 Subject: [PATCH 8/8] Call doSendRPC from eventloop in test --- gossipsub_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gossipsub_test.go b/gossipsub_test.go index 0e19d32d..ba0cc848 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -3588,7 +3588,11 @@ func TestGossipsubIDONTWANTCancelsQueuedRPC(t *testing.T) { }, } q := psubs[1].peers[publisherHost.ID()] - psubs[1].rt.(*GossipSubRouter).doSendRPC(idontwantRPC, publisherHost.ID(), q, true) + + // Call this via the eval func to run it in the event loop + psubs[1].eval <- func() { + psubs[1].rt.(*GossipSubRouter).doSendRPC(idontwantRPC, publisherHost.ID(), q, true) + } // Wait for the RPCs to send time.Sleep(time.Second)
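
Appended illustration, not part of any patch above: a usage sketch of the unwantedState introduced in PATCH 1/8. The TTL is counted in heartbeats, since gc() is driven by clearIDontWantCounters(); the snippet assumes it sits inside the go-libp2p-pubsub package, where unwantedState, computeChecksum, and peer.ID are available.

package pubsub

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/peer"
)

func exampleUnwantedTTL() {
	u := newUnwantedState()
	p := peer.ID("peer-1")
	c := computeChecksum("message-id-1")

	u.add(p, c, 2)           // IDONTWANT received; remember it for 2 heartbeats
	fmt.Println(u.has(p, c)) // true: Publish and handleIWant skip this peer

	u.gc()                   // heartbeat 1: TTL 2 -> 1
	fmt.Println(u.has(p, c)) // still true

	u.gc()                   // heartbeat 2: TTL 1 -> 0, entry dropped
	fmt.Println(u.has(p, c)) // false: the message may be sent to this peer again
}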
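
Also appended, not from the patches: a generic sketch of the clone-then-compact idiom used by RPC.filterUnwanted() in PATCH 3/8 and 7/8. A queued RPC can be shared by several sender goroutines, so the Publish slice is cloned before unwanted entries are compacted away in place; filterOut below is a hypothetical helper written only to show the pattern.

package main

import (
	"fmt"
	"slices"
)

// filterOut returns in unchanged when nothing matches drop; otherwise it clones
// the slice and compacts the kept elements to the front, mirroring filterUnwanted.
func filterOut[T any](in []T, drop func(T) bool) []T {
	i := slices.IndexFunc(in, drop)
	if i == -1 {
		return in // fast path: nothing unwanted, reuse the shared slice
	}
	out := slices.Clone(in)
	for j := i + 1; j < len(out); j++ {
		if !drop(out[j]) {
			out[i] = out[j]
			i++
		}
	}
	clear(out[i:]) // zero the tail so dropped elements can be collected
	return out[:i]
}

func main() {
	msgs := []string{"m0", "m1", "m2", "m3"}
	unwanted := map[string]bool{"m1": true, "m3": true}
	fmt.Println(filterOut(msgs, func(id string) bool { return unwanted[id] })) // [m0 m2]
}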