Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions examples/floodsub/basic_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
Basic FloodSub Example

This is a simple example that demonstrates FloodSub publishing and subscribing
without relying on test utilities. It shows the core functionality.

Run this example with:
python examples/floodsub/basic_example.py
"""

import asyncio
import logging
import sys

import trio

from libp2p import new_host
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.pubsub import Pubsub
from libp2p.tools.async_service import background_trio_service
from libp2p.tools.constants import FLOODSUB_PROTOCOL_ID

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("floodsub_basic")


async def main() -> None:
"""Main function demonstrating basic FloodSub functionality."""
logger.info("Starting basic FloodSub example...")

# Create two hosts
key_pair1 = create_new_key_pair()
key_pair2 = create_new_key_pair()

host1 = new_host(
key_pair=key_pair1,
listen_addrs=["/ip4/127.0.0.1/tcp/0"],
)

host2 = new_host(
key_pair=key_pair2,
listen_addrs=["/ip4/127.0.0.1/tcp/0"],
)

# Create FloodSub routers
floodsub1 = FloodSub(protocols=[FLOODSUB_PROTOCOL_ID])
floodsub2 = FloodSub(protocols=[FLOODSUB_PROTOCOL_ID])

# Create Pubsub instances
pubsub1 = Pubsub(
host=host1,
router=floodsub1,
strict_signing=False, # Disable for simplicity
)

pubsub2 = Pubsub(
host=host2,
router=floodsub2,
strict_signing=False, # Disable for simplicity
)

# Start both pubsub services
async with background_trio_service(pubsub1):
async with background_trio_service(pubsub2):
await pubsub1.wait_until_ready()
await pubsub2.wait_until_ready()

logger.info(f"Host 1 ID: {host1.get_id()}")
logger.info(f"Host 2 ID: {host2.get_id()}")

# Start listening on both hosts
logger.info("Starting hosts...")
await host1.get_network().listen()
await host2.get_network().listen()
await trio.sleep(0.5) # Wait for hosts to start listening

# Connect the hosts
logger.info("Connecting hosts...")
await host1.connect(host2.get_id(), host2.get_addrs())
await trio.sleep(1) # Wait for connection

# Subscribe to topic on host2
topic = "test-topic"
logger.info(f"Subscribing to topic: {topic}")
subscription = await pubsub2.subscribe(topic)
await trio.sleep(0.5) # Wait for subscription to propagate

# Publish messages from host1
messages = [
"Hello from FloodSub!",
"This is message number 2",
"FloodSub is working great!"
]

for i, message in enumerate(messages):
logger.info(f"Publishing message {i+1}: {message}")
await pubsub1.publish(topic, message.encode())
await trio.sleep(0.5)

# Receive messages on host2
logger.info("Receiving messages...")
for i in range(len(messages)):
message = await subscription.get()
logger.info(f"Received message {i+1}: {message.data.decode()}")
logger.info(f" From peer: {message.from_id.hex()}")
logger.info(f" Topics: {message.topicIDs}")

logger.info("Basic FloodSub example completed successfully!")


if __name__ == "__main__":
try:
trio.run(main)
except KeyboardInterrupt:
logger.info("Example interrupted by user")
sys.exit(0)
except Exception as e:
logger.error(f"Example failed: {e}")
sys.exit(1)
76 changes: 74 additions & 2 deletions libp2p/kad_dht/kad_dht.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,20 @@ async def handle_stream(self, stream: INetStream) -> None:
closest_peers = self.routing_table.find_local_closest_peers(
target_key, 20
)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constants should be placed at top.

if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for FIND_NODE in KadDHT, using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as response
from .utils import sort_peer_ids_by_distance
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import statements should be placed at top.

fallback_peers = sort_peer_ids_by_distance(target_key, connected_peers)[:20]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for FIND_NODE in KadDHT", len(closest_peers))

logger.debug(f"Found {len(closest_peers)} peers close to target")

# Consume the source signed_peer_record if sent
Expand Down Expand Up @@ -459,6 +473,20 @@ async def handle_stream(self, stream: INetStream) -> None:
closest_peers = self.routing_table.find_local_closest_peers(
key, 20
)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for provider response, using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as response
from .utils import sort_peer_ids_by_distance
fallback_peers = sort_peer_ids_by_distance(key, connected_peers)[:20]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for provider response", len(closest_peers))

logger.debug(
f"No providers found, including {len(closest_peers)}"
"closest peers"
Expand Down Expand Up @@ -550,6 +578,20 @@ async def handle_stream(self, stream: INetStream) -> None:
closest_peers = self.routing_table.find_local_closest_peers(
key, 20
)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for GET_VALUE response, using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as response
from .utils import sort_peer_ids_by_distance
fallback_peers = sort_peer_ids_by_distance(key, connected_peers)[:20]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for GET_VALUE response", len(closest_peers))

logger.debug(
"No value found,"
f"including {len(closest_peers)} closest peers"
Expand Down Expand Up @@ -677,9 +719,24 @@ async def put_value(self, key: bytes, value: bytes) -> None:
)

# 2. Get closest peers, excluding self
routing_table_peers = self.routing_table.find_local_closest_peers(key)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(routing_table_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for put_value, using connected peers as fallback",
len(routing_table_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as fallback
from .utils import sort_peer_ids_by_distance
fallback_peers = sort_peer_ids_by_distance(key, connected_peers)
routing_table_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for put_value", len(routing_table_peers))

closest_peers = [
peer
for peer in self.routing_table.find_local_closest_peers(key)
for peer in routing_table_peers
if peer != self.local_peer_id
]
logger.debug(f"Found {len(closest_peers)} peers to store value at")
Expand Down Expand Up @@ -722,9 +779,24 @@ async def get_value(self, key: bytes) -> bytes | None:
return value

# 2. Get closest peers, excluding self
routing_table_peers = self.routing_table.find_local_closest_peers(key)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(routing_table_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for get_value, using connected peers as fallback",
len(routing_table_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as fallback
from .utils import sort_peer_ids_by_distance
fallback_peers = sort_peer_ids_by_distance(key, connected_peers)
routing_table_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for get_value", len(routing_table_peers))

closest_peers = [
peer
for peer in self.routing_table.find_local_closest_peers(key)
for peer in routing_table_peers
if peer != self.local_peer_id
]
logger.debug(f"Searching {len(closest_peers)} peers for value")
Expand Down
25 changes: 25 additions & 0 deletions libp2p/kad_dht/peer_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ async def find_closest_peers_network(
# Start with closest peers from our routing table
closest_peers = self.routing_table.find_local_closest_peers(target_key, count)
logger.debug("Local closest peers: %d found", len(closest_peers))

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d), using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as initial query targets
fallback_peers = sort_peer_ids_by_distance(target_key, connected_peers)[:count]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback", len(closest_peers))

queried_peers: set[ID] = set()
rounds = 0

Expand Down Expand Up @@ -387,6 +400,18 @@ async def _handle_kad_stream(self, stream: INetStream) -> None:
closest_peers = self.routing_table.find_local_closest_peers(
target_key, 20
)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for FIND_NODE response, using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as response
fallback_peers = sort_peer_ids_by_distance(target_key, connected_peers)[:20]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for FIND_NODE response", len(closest_peers))

# Create protobuf response
response = Message()
Expand Down
65 changes: 47 additions & 18 deletions libp2p/pubsub/gossipsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class GossipSub(IPubsubRouter, Service):
prune_back_off: int
unsubscribe_back_off: int

flood_publish: bool

def __init__(
self,
protocols: Sequence[TProtocol],
Expand All @@ -116,6 +118,7 @@ def __init__(
px_peers_count: int = 16,
prune_back_off: int = 60,
unsubscribe_back_off: int = 10,
flood_publish: bool = False,
) -> None:
self.protocols = list(protocols)
self.pubsub = None
Expand Down Expand Up @@ -156,6 +159,8 @@ def __init__(
self.prune_back_off = prune_back_off
self.unsubscribe_back_off = unsubscribe_back_off

self.flood_publish = flood_publish

async def run(self) -> None:
self.manager.run_daemon_task(self.heartbeat)
if len(self.direct_peers) > 0:
Expand Down Expand Up @@ -300,6 +305,11 @@ def _get_peers_to_send(
if topic not in self.pubsub.peer_topics:
continue

if self.flood_publish and msg_forwarder == self.pubsub.my_id:
for peer in self.pubsub.peer_topics[topic]:
# TODO: add score threshold check when peer scoring is implemented
# if direct peer then skip score check
send_to.add(peer)
# direct peers
_direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
send_to.update(_direct_peers)
Expand All @@ -318,25 +328,44 @@ def _get_peers_to_send(
if topic in self.mesh:
gossipsub_peers = self.mesh[topic]
else:
# When we publish to a topic that we have not subscribe to, we randomly
# pick `self.degree` number of peers who have subscribed to the topic
# and add them as our `fanout` peers.
topic_in_fanout: bool = topic in self.fanout
fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
fanout_size = len(fanout_peers)
if not topic_in_fanout or (
topic_in_fanout and fanout_size < self.degree
):
if topic in self.pubsub.peer_topics:
# Combine fanout peers with selected peers
fanout_peers.update(
self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - fanout_size, fanout_peers
# direct peers
direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
send_to.update(direct_peers)

# floodsub peers
floodsub_peers: set[ID] = {
peer_id
for peer_id in self.pubsub.peer_topics[topic]
if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
}
send_to.update(floodsub_peers)

# gossipsub peers
gossipsub_peers: set[ID] = set()
if topic in self.mesh:
gossipsub_peers = self.mesh[topic]
else:
# When we publish to a topic that we have not subscribe to, we
# randomly pick `self.degree` number of peers who have subscribed
# to the topic and add them as our `fanout` peers.
topic_in_fanout: bool = topic in self.fanout
fanout_peers: set[ID] = (
self.fanout[topic] if topic_in_fanout else set()
)
fanout_size = len(fanout_peers)
if not topic_in_fanout or (
topic_in_fanout and fanout_size < self.degree
):
if topic in self.pubsub.peer_topics:
# Combine fanout peers with selected peers
fanout_peers.update(
self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - fanout_size, fanout_peers
)
)
)
self.fanout[topic] = fanout_peers
gossipsub_peers = fanout_peers
send_to.update(gossipsub_peers)
self.fanout[topic] = fanout_peers
gossipsub_peers = fanout_peers
send_to.update(gossipsub_peers)
# Excludes `msg_forwarder` and `origin`
yield from send_to.difference([msg_forwarder, origin])

Expand Down
1 change: 1 addition & 0 deletions libp2p/tools/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class GossipsubParams(NamedTuple):
px_peers_count: int = 16
prune_back_off: int = 60
unsubscribe_back_off: int = 10
flood_publish: bool = False


GOSSIPSUB_PARAMS = GossipsubParams()
1 change: 1 addition & 0 deletions newsfragments/713.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added flood publishing.
Loading