@@ -658,7 +658,13 @@ where
658658
659659 let topic_hash = raw_message. topic . clone ( ) ;
660660
661- let recipient_peers = self . get_publish_peers ( & topic_hash, true ) ;
661+ #[ cfg( feature = "partial_messages" ) ]
662+ let recipient_peers = self . get_publish_peers ( & topic_hash, |_, peer| {
663+ !peer. partial_only_topics . contains ( & topic_hash)
664+ } ) ;
665+
666+ #[ cfg( not( feature = "partial_messages" ) ) ]
667+ let recipient_peers = self . get_publish_peers ( & topic_hash, |_, _| true ) ;
662668
663669 // If the message isn't a duplicate and we have sent it to some peers add it to the
664670 // duplicate cache and memcache.
@@ -715,129 +721,99 @@ where
715721 Ok ( msg_id)
716722 }
717723
718- // Get Peers from the mesh or fanout to publish a message to.
719- // If `exclude_partial_only` set, filter out peers who only want partial messages for the topic .
724+ // Get Peers from the mesh or fanout to publish a message to
725+ // filtered out further by the provided `f` callback .
720726 fn get_publish_peers (
721727 & mut self ,
722728 topic_hash : & TopicHash ,
723- exclude_partial_only : bool ,
729+ f : impl Fn ( & PeerId , & PeerDetails ) -> bool ,
724730 ) -> HashSet < PeerId > {
725- let mesh_n = self . config . mesh_n_for_topic ( topic_hash) ;
726-
727731 let peers_on_topic = self
728732 . connected_peers
729733 . iter ( )
730- . filter ( |( _, peer) | {
731- #[ cfg( feature = "partial_messages" ) ]
732- {
733- if exclude_partial_only && peer. partial_only_topics . contains ( topic_hash) {
734- return false ;
735- }
736- }
737- let _ = peer;
738- true
739- } )
740- . map ( |( peer_id, _) | peer_id)
741- . peekable ( ) ;
742-
743- let mut recipient_peers = HashSet :: new ( ) ;
744- if self . config . flood_publish ( ) {
745- // Forward to all peers above score and all explicit peers
746- recipient_peers. extend ( peers_on_topic. filter ( |p| {
747- self . explicit_peers . contains ( * p)
734+ . filter ( |( _, peer) | peer. topics . contains ( topic_hash) )
735+ . filter ( |( peer_id, _) | {
736+ self . explicit_peers . contains ( * peer_id)
748737 || !self
749738 . peer_score
750- . below_threshold ( p , |ts| ts. publish_threshold )
739+ . below_threshold ( peer_id , |ts| ts. publish_threshold )
751740 . 0
752- } ) ) ;
753- } else {
754- match self . mesh . get ( topic_hash) {
755- // Mesh peers
756- Some ( mesh_peers) => {
757- // We have a mesh set. We want to make sure to publish to at least `mesh_n`
758- // peers (if possible).
759- let needed_extra_peers = mesh_n. saturating_sub ( mesh_peers. len ( ) ) ;
760-
761- if needed_extra_peers > 0 {
762- // We don't have `mesh_n` peers in our mesh, we will randomly select extras
763- // and publish to them.
764-
765- // Get a random set of peers that are appropriate to send messages too.
766- let peer_list = get_random_peers (
767- & self . connected_peers ,
768- topic_hash,
769- needed_extra_peers,
770- exclude_partial_only,
771- |peer| {
772- !mesh_peers. contains ( peer)
773- && !self . explicit_peers . contains ( peer)
774- && !self
775- . peer_score
776- . below_threshold ( peer, |ts| ts. publish_threshold )
777- . 0
778- } ,
779- ) ;
780- recipient_peers. extend ( peer_list) ;
781- }
741+ } )
742+ . filter ( |( peer_id, peer_details) | f ( peer_id, peer_details) ) ;
782743
783- recipient_peers. extend ( mesh_peers) ;
784- }
785- // Gossipsub peers
786- None => {
787- tracing:: debug!( topic=%topic_hash, "Topic not in the mesh" ) ;
788- // `fanout_peers` is always non-empty if it's `Some`.
789- let fanout_peers = self
790- . fanout
791- . get ( topic_hash)
792- . filter ( |peers| !peers. is_empty ( ) ) ;
793- // If we have fanout peers add them to the map.
794- if let Some ( peers) = fanout_peers {
795- for peer in peers {
796- recipient_peers. insert ( * peer) ;
797- }
798- } else {
799- // We have no fanout peers, select mesh_n of them and add them to the fanout
800- let new_peers = get_random_peers (
801- & self . connected_peers ,
802- topic_hash,
803- mesh_n,
804- exclude_partial_only,
805- |p| {
806- !self . explicit_peers . contains ( p)
807- && !self
808- . peer_score
809- . below_threshold ( p, |ts| ts. publish_threshold )
810- . 0
811- } ,
812- ) ;
813- // Add the new peers to the fanout and recipient peers
814- self . fanout . insert ( topic_hash. clone ( ) , new_peers. clone ( ) ) ;
815- for peer in new_peers {
816- tracing:: debug!( %peer, "Peer added to fanout" ) ;
817- recipient_peers. insert ( peer) ;
818- }
819- }
820- // We are publishing to fanout peers - update the time we published
821- self . fanout_last_pub
822- . insert ( topic_hash. clone ( ) , Instant :: now ( ) ) ;
744+ // Forward to all peers above score and all explicit peers
745+ if self . config . flood_publish ( ) {
746+ return peers_on_topic. map ( |( peer_id, _) | * peer_id) . collect ( ) ;
747+ }
748+
749+ let mesh_n = self . config . mesh_n_for_topic ( topic_hash) ;
750+ let mut recipient_peers = HashSet :: new ( ) ;
751+ // Explicit peers that are part of the topic and Floodsub peers.
752+ recipient_peers. extend (
753+ peers_on_topic
754+ . clone ( )
755+ . filter ( |( peer_id, peer) | {
756+ self . explicit_peers . contains ( peer_id) || peer. kind == PeerKind :: Floodsub
757+ } )
758+ . map ( |( peer_id, _) | * peer_id) ,
759+ ) ;
760+
761+ match self . mesh . get ( topic_hash) {
762+ // Mesh peers
763+ Some ( mesh_peers) => {
764+ // We have a mesh set. We want to make sure to publish to at least `mesh_n`
765+ // peers (if possible).
766+ let mesh_peers = peers_on_topic
767+ . clone ( )
768+ . filter_map ( |( peer_id, _) | mesh_peers. get ( peer_id) )
769+ . copied ( )
770+ . collect :: < Vec < PeerId > > ( ) ;
771+
772+ let needed_extra_peers = mesh_n. saturating_sub ( mesh_peers. len ( ) ) ;
773+ if needed_extra_peers > 0 {
774+ // We don't have `mesh_n` peers in our mesh, we will randomly select extras
775+ // and publish to them.
776+
777+ // Get a random set of peers that are appropriate to send messages too.
778+ let peer_list =
779+ get_random_peers ( peers_on_topic, topic_hash, needed_extra_peers, |_, _| {
780+ true
781+ } ) ;
782+ recipient_peers. extend ( peer_list) ;
823783 }
824- }
825784
826- // Explicit peers that are part of the topic
827- recipient_peers
828- . extend ( peers_on_topic. filter ( |peer_id| self . explicit_peers . contains ( peer_id) ) ) ;
785+ recipient_peers. extend ( mesh_peers) ;
786+ }
787+ // Gossipsub peers
788+ None => {
789+ tracing:: debug!( topic=%topic_hash, "Topic not in the mesh" ) ;
790+ let fanout_peers = peers_on_topic
791+ . clone ( )
792+ . filter_map ( |( peer_id, _) | {
793+ self . fanout
794+ . get ( topic_hash)
795+ . and_then ( |fanout| fanout. get ( peer_id) )
796+ } )
797+ . copied ( )
798+ . collect :: < Vec < PeerId > > ( ) ;
829799
830- // Floodsub peers
831- for ( peer, connections) in & self . connected_peers {
832- if connections. kind == PeerKind :: Floodsub
833- && connections. topics . contains ( topic_hash)
834- && !self
835- . peer_score
836- . below_threshold ( peer, |ts| ts. publish_threshold )
837- . 0
838- {
839- recipient_peers. insert ( * peer) ;
800+ // If we have fanout peers add them to the map.
801+ if !fanout_peers. is_empty ( ) {
802+ recipient_peers. extend ( fanout_peers) ;
803+ } else {
804+ // We have no fanout peers, select mesh_n of them and add them to the fanout
805+ let new_peers =
806+ get_random_peers ( peers_on_topic, topic_hash, mesh_n, |_, _| true ) ;
807+ // Add the new peers to the fanout and recipient peers
808+ self . fanout . insert ( topic_hash. clone ( ) , new_peers. clone ( ) ) ;
809+ for peer in new_peers {
810+ tracing:: debug!( %peer, "Peer added to fanout" ) ;
811+ recipient_peers. insert ( peer) ;
812+ }
840813 }
814+ // We are publishing to fanout peers - update the time we published
815+ self . fanout_last_pub
816+ . insert ( topic_hash. clone ( ) , Instant :: now ( ) ) ;
841817 }
842818 }
843819
@@ -854,7 +830,9 @@ where
854830
855831 let group_id = partial_message. group_id ( ) . as_ref ( ) . to_vec ( ) ;
856832
857- let recipient_peers = self . get_publish_peers ( & topic_hash, false ) ;
833+ let recipient_peers = self . get_publish_peers ( & topic_hash, |_, peer| {
834+ peer. partial_only_topics . contains ( & topic_hash)
835+ } ) ;
858836 let metadata = partial_message. parts_metadata ( ) . as_ref ( ) . to_vec ( ) ;
859837 for peer_id in recipient_peers. iter ( ) {
860838 // TODO: this can be optimized, we are going to get the peer again on `send_message`
@@ -1151,12 +1129,11 @@ where
11511129 & self . connected_peers ,
11521130 topic_hash,
11531131 mesh_n - added_peers. len ( ) ,
1154- true ,
1155- |peer| {
1156- !added_peers. contains ( peer)
1157- && !self . explicit_peers . contains ( peer)
1158- && !self . peer_score . below_threshold ( peer, |_| 0.0 ) . 0
1159- && !self . backoffs . is_backoff_with_slack ( topic_hash, peer)
1132+ |peer_id, _| {
1133+ !added_peers. contains ( peer_id)
1134+ && !self . explicit_peers . contains ( peer_id)
1135+ && !self . peer_score . below_threshold ( peer_id, |_| 0.0 ) . 0
1136+ && !self . backoffs . is_backoff_with_slack ( topic_hash, peer_id)
11601137 } ,
11611138 ) ;
11621139
@@ -1246,8 +1223,9 @@ where
12461223 & self . connected_peers ,
12471224 topic_hash,
12481225 self . config . prune_peers ( ) ,
1249- true ,
1250- |p| p != peer && !self . peer_score . below_threshold ( p, |_| 0.0 ) . 0 ,
1226+ |peer_id, _| {
1227+ peer_id != peer && !self . peer_score . below_threshold ( peer_id, |_| 0.0 ) . 0
1228+ } ,
12511229 )
12521230 . into_iter ( )
12531231 . map ( |p| PeerInfo { peer_id : Some ( p) } )
@@ -2419,12 +2397,11 @@ where
24192397 & self . connected_peers ,
24202398 topic_hash,
24212399 desired_peers,
2422- true ,
2423- |peer| {
2424- !peers. contains ( peer)
2425- && !explicit_peers. contains ( peer)
2426- && !backoffs. is_backoff_with_slack ( topic_hash, peer)
2427- && scores. get ( peer) . map ( |r| r. score ) . unwrap_or_default ( ) >= 0.0
2400+ |peer_id, _| {
2401+ !peers. contains ( peer_id)
2402+ && !explicit_peers. contains ( peer_id)
2403+ && !backoffs. is_backoff_with_slack ( topic_hash, peer_id)
2404+ && scores. get ( peer_id) . map ( |r| r. score ) . unwrap_or_default ( ) >= 0.0
24282405 } ,
24292406 ) ;
24302407 for peer in & peer_list {
@@ -2527,8 +2504,7 @@ where
25272504 & self . connected_peers ,
25282505 topic_hash,
25292506 needed,
2530- false ,
2531- |peer_id| {
2507+ |peer_id, _| {
25322508 !peers. contains ( peer_id)
25332509 && !explicit_peers. contains ( peer_id)
25342510 && !backoffs. is_backoff_with_slack ( topic_hash, peer_id)
@@ -2604,8 +2580,7 @@ where
26042580 & self . connected_peers ,
26052581 topic_hash,
26062582 self . config . opportunistic_graft_peers ( ) ,
2607- false ,
2608- |peer_id| {
2583+ |peer_id, _| {
26092584 !peers. contains ( peer_id)
26102585 && !explicit_peers. contains ( peer_id)
26112586 && !backoffs. is_backoff_with_slack ( topic_hash, peer_id)
@@ -2701,8 +2676,7 @@ where
27012676 & self . connected_peers ,
27022677 topic_hash,
27032678 needed_peers,
2704- false ,
2705- |peer_id| {
2679+ |peer_id, _| {
27062680 !peers. contains ( peer_id)
27072681 && !explicit_peers. contains ( peer_id)
27082682 && !self
@@ -2816,15 +2790,19 @@ where
28162790 )
28172791 } ;
28182792 // get gossip_lazy random peers
2819- let to_msg_peers =
2820- get_random_peers_dynamic ( & self . connected_peers , topic_hash, false , n_map, |peer| {
2821- !peers. contains ( peer)
2822- && !self . explicit_peers . contains ( peer)
2793+ let to_msg_peers = get_random_peers_dynamic (
2794+ self . connected_peers . iter ( ) ,
2795+ topic_hash,
2796+ n_map,
2797+ |peer_id, _| {
2798+ !peers. contains ( peer_id)
2799+ && !self . explicit_peers . contains ( peer_id)
28232800 && !self
28242801 . peer_score
2825- . below_threshold ( peer , |ts| ts. gossip_threshold )
2802+ . below_threshold ( peer_id , |ts| ts. gossip_threshold )
28262803 . 0
2827- } ) ;
2804+ } ,
2805+ ) ;
28282806
28292807 tracing:: debug!( "Gossiping IHAVE to {} peers" , to_msg_peers. len( ) ) ;
28302808
@@ -3787,28 +3765,17 @@ fn peer_removed_from_mesh(
37873765/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
37883766/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
37893767/// that gets as input the number of filtered peers.
3790- #[ allow( unused, reason = "partial is used with partial_messages feature" ) ]
3791- fn get_random_peers_dynamic (
3792- connected_peers : & HashMap < PeerId , PeerDetails > ,
3768+ fn get_random_peers_dynamic < ' a > (
3769+ peers : impl IntoIterator < Item = ( & ' a PeerId , & ' a PeerDetails ) > ,
37933770 topic_hash : & TopicHash ,
3794- // If we want to exclude partial only peers.
3795- exclude_partial : bool ,
37963771 // maps the number of total peers to the number of selected peers
37973772 n_map : impl Fn ( usize ) -> usize ,
3798- mut f : impl FnMut ( & PeerId ) -> bool ,
3773+ f : impl Fn ( & PeerId , & PeerDetails ) -> bool ,
37993774) -> BTreeSet < PeerId > {
3800- let mut gossip_peers = connected_peers
3801- . iter ( )
3802- . filter_map ( |( peer_id, peer) | {
3803- #[ cfg( feature = "partial_messages" ) ]
3804- {
3805- if exclude_partial && peer. partial_only_topics . contains ( topic_hash) {
3806- return None ;
3807- }
3808- }
3809- Some ( ( peer_id, peer) )
3810- } )
3811- . filter ( |( peer_id, _) | f ( peer_id) )
3775+ let mut gossip_peers = peers
3776+ . into_iter ( )
3777+ . filter ( |( _, p) | p. topics . contains ( topic_hash) )
3778+ . filter ( |( peer_id, peer_details) | f ( peer_id, peer_details) )
38123779 . filter ( |( _, p) | p. kind . is_gossipsub ( ) )
38133780 . map ( |( peer_id, _) | * peer_id)
38143781 . collect :: < Vec < PeerId > > ( ) ;
@@ -3831,15 +3798,13 @@ fn get_random_peers_dynamic(
38313798
38323799/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
38333800/// filtered by the function `f`.
3834- #[ allow( unused, reason = "partial is used with partial_messages feature" ) ]
3835- fn get_random_peers (
3836- connected_peers : & HashMap < PeerId , PeerDetails > ,
3801+ fn get_random_peers < ' a > (
3802+ peers : impl IntoIterator < Item = ( & ' a PeerId , & ' a PeerDetails ) > ,
38373803 topic_hash : & TopicHash ,
38383804 n : usize ,
3839- exclude_partial : bool ,
3840- f : impl FnMut ( & PeerId ) -> bool ,
3805+ f : impl Fn ( & PeerId , & PeerDetails ) -> bool ,
38413806) -> BTreeSet < PeerId > {
3842- get_random_peers_dynamic ( connected_peers , topic_hash, exclude_partial , |_| n, f)
3807+ get_random_peers_dynamic ( peers , topic_hash, |_| n, f)
38433808}
38443809
38453810/// Validates the combination of signing, privacy and message validation to ensure the
0 commit comments