14 changes: 14 additions & 0 deletions comm.go
@@ -158,7 +158,12 @@ func (p *PubSub) handlePeerDead(s network.Stream) {

func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing *rpcQueue) {
writeRpc := func(rpc *RPC) error {
rpc = rpc.filterUnwanted(s.Conn().RemotePeer())
size := uint64(rpc.Size())
if size == 0 {
// Nothing to do, the peer cancelled all our messages
return nil
}

buf := pool.Get(varint.UvarintSize(size) + int(size))
defer pool.Put(buf)
@@ -198,6 +203,14 @@ func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC {
}
}

func rpcWithMessagesAndChecksums(msgs []*pb.Message, checksums []checksum, unwanted *unwantedState) *RPC {
return &RPC{
RPC: pb.RPC{Publish: msgs},
messageChecksums: checksums,
unwanted: unwanted,
}
}

func rpcWithMessages(msgs ...*pb.Message) *RPC {
return &RPC{RPC: pb.RPC{Publish: msgs}}
}
@@ -222,6 +235,7 @@ func rpcWithControl(msgs []*pb.Message,
}
}

// copyRPC makes a shallow copy of an RPC message.
func copyRPC(rpc *RPC) *RPC {
res := new(RPC)
*res = *rpc
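The writeRpc hunk above calls rpc.filterUnwanted, whose body is not part of this diff. A minimal sketch of what such a method could look like, assuming the new messageChecksums field runs parallel to Publish and that unwanted is the shared *unwantedState (the body below is an illustration, not the PR's code):

// filterUnwanted sketch: drop every publish message the remote peer has
// already sent an IDONTWANT for, assuming messageChecksums[i] belongs to
// Publish[i]. The real implementation lives outside this diff.
func (rpc *RPC) filterUnwanted(to peer.ID) *RPC {
	if rpc.unwanted == nil || len(rpc.messageChecksums) != len(rpc.Publish) {
		return rpc // nothing to filter against
	}
	kept := rpc.Publish[:0:0] // fresh backing array; don't mutate the shared RPC
	for i, msg := range rpc.Publish {
		if rpc.unwanted.has(to, rpc.messageChecksums[i]) {
			continue // peer asked not to receive this message
		}
		kept = append(kept, msg)
	}
	out := copyRPC(rpc)
	out.Publish = kept
	return out
}

This is also why the size == 0 early return exists: if the peer has cancelled every message in the RPC, there is nothing left worth writing to the stream.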
90 changes: 60 additions & 30 deletions gossipsub.go
@@ -7,6 +7,7 @@ import (
"io"
"math/rand"
"sort"
"sync"
"time"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
@@ -265,7 +266,7 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
backoff: make(map[string]map[peer.ID]time.Time),
peerhave: make(map[peer.ID]int),
peerdontwant: make(map[peer.ID]int),
unwanted: make(map[peer.ID]map[checksum]int),
unwanted: newUnwantedState(),
iasked: make(map[peer.ID]int),
outbound: make(map[peer.ID]bool),
connect: make(chan connectInfo, params.MaxPendingConnections),
@@ -471,7 +472,7 @@ type GossipSubRouter struct {
control map[peer.ID]*pb.ControlMessage // pending control messages
peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat
peerdontwant map[peer.ID]int // number of IDONTWANTs received from peer in the last heartbeat
unwanted map[peer.ID]map[checksum]int // TTL of the message ids peers don't want
unwanted *unwantedState // TTL of the message ids peers don't want
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
outbound map[peer.ID]bool // connection direction cache, marks peers with outbound connections
backoff map[string]map[peer.ID]time.Time // prune backoff
@@ -522,6 +523,48 @@ type GossipSubRouter struct {
heartbeatTicks uint64
}

type unwantedState struct {
sync.RWMutex
m map[peer.ID]map[checksum]int // TTL of the message ids peers don't want
}

func newUnwantedState() *unwantedState {
return &unwantedState{
m: make(map[peer.ID]map[checksum]int),
}
}

func (u *unwantedState) add(peer peer.ID, id checksum, ttl int) {
u.Lock()
defer u.Unlock()
if u.m[peer] == nil {
u.m[peer] = make(map[checksum]int)
}
u.m[peer][id] = ttl
}

func (u *unwantedState) gc() {
u.Lock()
defer u.Unlock()

// decrement the TTL of every tracked IDONTWANT and delete entries whose TTL reaches zero
for _, mids := range u.m {
for mid := range mids {
mids[mid]--
if mids[mid] == 0 {
delete(mids, mid)
}
}
}
}

func (u *unwantedState) has(peer peer.ID, id checksum) bool {
u.RLock()
defer u.RUnlock()
_, ok := u.m[peer][id]
return ok
}
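
add, has, and gc together form a small TTL cache that is safe to touch from both the router's event loop and the per-peer writer goroutines. A hypothetical walkthrough of the TTL semantics (the peer ID and message ID below are placeholders):

func exampleUnwantedTTL() {
	u := newUnwantedState()
	p := peer.ID("somePeer")
	cs := computeChecksum("some-message-id")

	u.add(p, cs, 3)  // remember the IDONTWANT for three heartbeats
	_ = u.has(p, cs) // true
	u.gc()           // heartbeat 1: TTL 3 -> 2
	u.gc()           // heartbeat 2: TTL 2 -> 1
	u.gc()           // heartbeat 3: TTL 1 -> 0, entry deleted
	_ = u.has(p, cs) // false
}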

type connectInfo struct {
p peer.ID
spr *record.Envelope
@@ -844,7 +887,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
for _, iwant := range ctl.GetIwant() {
for _, mid := range iwant.GetMessageIDs() {
// Check if that peer has sent an IDONTWANT for this message before; if so, don't send it to them
if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok {
if gs.unwanted.has(p, computeChecksum(mid)) {
continue
}

@@ -1013,10 +1056,6 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
}

func (gs *GossipSubRouter) handleIDontWant(p peer.ID, ctl *pb.ControlMessage) {
if gs.unwanted[p] == nil {
gs.unwanted[p] = make(map[checksum]int)
}

// IDONTWANT flood protection
if gs.peerdontwant[p] >= gs.params.MaxIDontWantMessages {
log.Debugf("IDONWANT: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerdontwant[p])
@@ -1036,7 +1075,7 @@ mainIDWLoop:
}

totalUnwantedIds++
gs.unwanted[p][computeChecksum(mid)] = gs.params.IDontWantMessageTTL
gs.unwanted.add(p, computeChecksum(mid), gs.params.IDontWantMessageTTL)
}
}
}
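
handleIDontWant stores a checksum of each advertised message ID rather than the ID itself, which keeps the per-entry cost fixed even when a peer advertises long IDs. The checksum type and computeChecksum are defined elsewhere in the package; a hypothetical stand-in, assuming a fixed-size hash of the ID (names and hash choice are illustrative only):

import "crypto/sha256"

type checksumSketch [32]byte // stand-in for the package's checksum type

func computeChecksumSketch(mid string) checksumSketch {
	// A fixed-size digest bounds the memory per cache entry no matter
	// how long the (possibly attacker-chosen) message ID is.
	return sha256.Sum256([]byte(mid))
}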
@@ -1156,6 +1195,7 @@ func (gs *GossipSubRouter) Publish(msg *Message) {
if !ok {
return
}
messageChecksum := computeChecksum(gs.p.idGen.ID(msg))

if gs.floodPublish && from == gs.p.host.ID() {
for p := range tmap {
@@ -1200,18 +1240,17 @@ func (gs *GossipSubRouter) Publish(msg *Message) {
gs.lastpub[topic] = time.Now().UnixNano()
}

csum := computeChecksum(gs.p.idGen.ID(msg))
for p := range gmap {
// Check if the peer has already sent an IDONTWANT for this message.
// If so, don't send it to them
if _, ok := gs.unwanted[p][csum]; ok {
if gs.unwanted.has(p, messageChecksum) {
continue
}
tosend[p] = struct{}{}
}
}

out := rpcWithMessages(msg.Message)
out := rpcWithMessagesAndChecksums([]*pb.Message{msg.Message}, []checksum{messageChecksum}, gs.unwanted)
for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
@@ -1348,14 +1387,14 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC, urgent bool) {
}

// Potentially split the RPC into multiple RPCs that are below the max message size
outRPCs := appendOrMergeRPC(nil, gs.p.maxMessageSize, *out)
outRPCs := out.split(gs.p.maxMessageSize)
for _, rpc := range outRPCs {
if rpc.Size() > gs.p.maxMessageSize {
// This should only happen if a single message/control is above the maxMessageSize.
gs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), gs.p.maxMessageSize, rpc.Size()-gs.p.maxMessageSize))
continue
}
gs.doSendRPC(rpc, p, q, urgent)
gs.doSendRPC(&rpc, p, q, urgent)
}
}
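
sendRPC now asks the RPC to split itself instead of calling appendOrMergeRPC directly, and doSendRPC receives addresses of values (&rpc) rather than the shared pointers appendOrMergeRPC returns. split's body is not part of this diff; a rough sketch of what it has to do, assuming each piece must keep messageChecksums aligned with Publish so writeRpc can still filter per peer (the body below is an assumption, not the PR's code):

func (rpc *RPC) split(limit int) []RPC {
	if rpc.RPC.Size() <= limit {
		return []RPC{*rpc} // already small enough
	}
	var out []RPC
	for i, msg := range rpc.Publish {
		piece := RPC{
			RPC:      pb.RPC{Publish: []*pb.Message{msg}},
			unwanted: rpc.unwanted,
			from:     rpc.from,
		}
		if i < len(rpc.messageChecksums) {
			piece.messageChecksums = rpc.messageChecksums[i : i+1]
		}
		out = append(out, piece)
	}
	// Subscriptions and control messages would need the same treatment;
	// omitted here for brevity.
	return out
}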

@@ -1414,7 +1453,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
// old behavior. In the future let's not merge messages, since
// doing so may increase message latency.
for _, msg := range elem.GetPublish() {
if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit {
if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.RPC.Size() > limit {
lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1]
lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
lastRPC.Publish = append(lastRPC.Publish, msg)
@@ -1424,7 +1463,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {

// Merge/Append Subscriptions
for _, sub := range elem.GetSubscriptions() {
if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit {
if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.RPC.Size() > limit {
lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1]
lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub)
@@ -1436,15 +1475,15 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
if ctl := elem.GetControl(); ctl != nil {
if lastRPC.Control == nil {
lastRPC.Control = &pb.ControlMessage{}
if lastRPC.Size() > limit {
if lastRPC.RPC.Size() > limit {
lastRPC.Control = nil
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
out = append(out, lastRPC)
}
}

for _, graft := range ctl.GetGraft() {
if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit {
if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.RPC.Size() > limit {
lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft)
@@ -1453,7 +1492,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
}

for _, prune := range ctl.GetPrune() {
if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit {
if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.RPC.Size() > limit {
lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune)
@@ -1467,7 +1506,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
// For IWANTs we don't need more than a single one,
// since there are no topic IDs here.
newIWant := &pb.ControlIWant{}
if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit {
if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.RPC.Size() > limit {
lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Iwant: []*pb.ControlIWant{newIWant},
@@ -1476,7 +1515,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
}
}
for _, msgID := range iwant.GetMessageIDs() {
if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit {
if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.RPC.Size() > limit {
lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1]
lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}},
@@ -1824,16 +1863,7 @@ func (gs *GossipSubRouter) clearIDontWantCounters() {
// throw away the old map and make a new one
gs.peerdontwant = make(map[peer.ID]int)
}

// decrement TTLs of all the IDONTWANTs and delete it from the cache when it reaches zero
for _, mids := range gs.unwanted {
for mid := range mids {
mids[mid]--
if mids[mid] == 0 {
delete(mids, mid)
}
}
}
gs.unwanted.gc()
}

func (gs *GossipSubRouter) applyIwantPenalties() {
4 changes: 2 additions & 2 deletions gossipsub_spam_test.go
@@ -929,12 +929,12 @@ func TestGossipsubHandleIDontwantSpam(t *testing.T) {
t.Errorf("Wanted message count of %d but received %d", 1, grt.peerdontwant[rPid])
}
mid := fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength-1)
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; !ok {
if !grt.unwanted.has(rPid, computeChecksum(mid)) {
t.Errorf("Desired message id was not stored in the unwanted map: %s", mid)
}

mid = fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength)
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; ok {
if grt.unwanted.has(rPid, computeChecksum(mid)) {
t.Errorf("Unwanted message id was stored in the unwanted map: %s", mid)
}
}
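
Because unwantedState is now shared between the router and the stream writers, its locking is worth exercising under the race detector. A sketch of a concurrency test one could add alongside this one (hypothetical, not part of this PR; assumes it lives in the same package):

func TestUnwantedStateConcurrency(t *testing.T) {
	u := newUnwantedState()
	p := peer.ID("test-peer")
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				cs := computeChecksum(fmt.Sprintf("mid-%d-%d", n, j))
				u.add(p, cs, 2)
				u.has(p, cs)
				u.gc() // run with -race to catch unsynchronized access
			}
		}(i)
	}
	wg.Wait()
}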