Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions libp2p/pubsub/gossipsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,9 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
raise NoPubsubAttached
if peer_id not in self.pubsub.peers:
continue
stream = self.pubsub.peers[peer_id]

# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
await self.pubsub.write_msg(stream, rpc_msg)
await self.send_rpc(to_peer=peer_id, rpc=rpc_msg, urgent=False)

for topic in pubsub_msg.topicIDs:
self.time_since_last_publish[topic] = int(time.time())
Expand Down Expand Up @@ -853,10 +852,9 @@ async def handle_iwant(
sender_peer_id,
)
return
peer_stream = self.pubsub.peers[sender_peer_id]

# 4) And write the packet to the stream
await self.pubsub.write_msg(peer_stream, packet)
await self.send_rpc(to_peer=sender_peer_id, rpc=packet, urgent=False)

async def handle_graft(
self, graft_msg: rpc_pb2.ControlGraft, sender_peer_id: ID
Expand Down Expand Up @@ -1004,13 +1002,39 @@ async def emit_control_message(

packet.control.CopyFrom(control_msg)

# Get stream for peer from pubsub
if to_peer not in self.pubsub.peers:
logger.debug(
"Fail to emit control message to %s: peer record not exist", to_peer
)
await self.send_rpc(to_peer, packet, False)

# Urgent will be true in case of IDONTWANT message
async def send_rpc(self, to_peer: ID, rpc: rpc_pb2.RPC, urgent: bool) -> None:
# TODO: Piggyback message retries

msg_bytes = rpc.SerializeToString()
msg_size = len(msg_bytes)
if self.pubsub:
max_message_size = self.pubsub.maxMessageSize
if msg_size < max_message_size:
await self.do_send_rpc(rpc, to_peer, urgent)
return
peer_stream = self.pubsub.peers[to_peer]
else:
if self.pubsub:
rpc_list = self.pubsub.split_rpc(pb_rpc=rpc, limit=max_message_size)
for rpc in rpc_list:
if rpc.ByteSize() > max_message_size:
self.drop_rpc(rpc)
continue
await self.do_send_rpc(rpc, to_peer, urgent)

async def do_send_rpc(self, rpc: rpc_pb2.RPC, to_peer: ID, urgent: bool) -> None:
if self.pubsub:
peer_queue = self.pubsub.peer_queue[to_peer]
try:
if urgent:
await peer_queue.urgent_push(rpc=rpc, block=False)
else:
await peer_queue.push(rpc=rpc, block=False)
except Exception as e:
logger.error(f"Failed to enqueue RPC to peer {to_peer}: {e}")
self.drop_rpc(rpc)

# Write rpc to stream
await self.pubsub.write_msg(peer_stream, packet)
def drop_rpc(self, rpc: rpc_pb2.RPC) -> None:
pass
Loading
Loading