diff --git a/src/app/firedancer-dev/commands/repair.c b/src/app/firedancer-dev/commands/repair.c index b6fa38d361d..6a5210b4402 100644 --- a/src/app/firedancer-dev/commands/repair.c +++ b/src/app/firedancer-dev/commands/repair.c @@ -61,6 +61,7 @@ repair_topo( config_t * config ) { ulong net_tile_cnt = config->layout.net_tile_count; ulong shred_tile_cnt = config->layout.shred_tile_count; ulong quic_tile_cnt = config->layout.quic_tile_count; + ulong sign_tile_cnt = config->firedancer.layout.sign_tile_count; fd_topo_t * topo = { fd_topob_new( &config->topo, config->name ) }; topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size ); @@ -132,12 +133,17 @@ repair_topo( config_t * config ) { /**/ fd_topob_link( topo, "gossip_net", "net_gossip", config->net.ingress_buffer_size, FD_NET_MTU, 1UL ); /**/ fd_topob_link( topo, "repair_net", "net_repair", config->net.ingress_buffer_size, FD_NET_MTU, 1UL ); - /**/ fd_topob_link( topo, "repair_sign", "repair_sign", 128UL, 2048UL, 1UL ); - FOR(shred_tile_cnt) fd_topob_link( topo, "shred_repair", "shred_repair", pending_fec_shreds_depth, FD_SHRED_REPAIR_MTU, 2UL ); + + FOR(shred_tile_cnt) fd_topob_link( topo, "shred_repair", "shred_repair", pending_fec_shreds_depth, FD_SHRED_REPAIR_MTU, 2UL /* at most 2 msgs per after_frag */ ); FOR(shred_tile_cnt) fd_topob_link( topo, "repair_shred", "shred_repair", pending_fec_shreds_depth, sizeof(fd_ed25519_sig_t), 1UL ); - /**/ fd_topob_link( topo, "sign_repair", "sign_repair", 128UL, 64UL, 1UL ); - /**/ fd_topob_link( topo, "repair_repla", "repair_repla", 65536UL, sizeof(fd_reasm_fec_t), 1UL ); + + /**/ fd_topob_link( topo, "ping_sign", "repair_sign", 128UL, 2048UL, 1UL ); + /**/ fd_topob_link( topo, "sign_ping", "sign_repair", 128UL, sizeof(fd_ed25519_sig_t), 1UL ); + FOR(sign_tile_cnt-1) fd_topob_link( topo, "repair_sign", "repair_sign", 128UL, 2048UL, 1UL ); + FOR(sign_tile_cnt-1) fd_topob_link( topo, "sign_repair", "sign_repair", 1024UL, sizeof(fd_ed25519_sig_t), 
1UL ); + + /**/ fd_topob_link( topo, "repair_repla", "repair_repla", 65536UL, sizeof(fd_reasm_fec_t), 1UL ); /**/ fd_topob_link( topo, "poh_shred", "poh_shred", 16384UL, USHORT_MAX, 1UL ); /**/ fd_topob_link( topo, "send_txns", "send_txns", 128UL, FD_TXN_MTU, 1UL ); @@ -181,7 +187,7 @@ repair_topo( config_t * config ) { /* topo, tile_name, tile_wksp, metrics_wksp, cpu_idx, is_agave, uses_keyswitch */ FOR(shred_tile_cnt) fd_topob_tile( topo, "shred", "shred", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); - /**/ fd_topob_tile( topo, "sign", "sign", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); + FOR(sign_tile_cnt) fd_topob_tile( topo, "sign", "sign", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); /**/ fd_topob_tile( topo, "metric", "metric", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); /**/ fd_topob_tile( topo, "gossip", "gossip", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); fd_topo_tile_t * repair_tile = fd_topob_tile( topo, "repair", "repair", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); @@ -303,12 +309,17 @@ repair_topo( config_t * config ) { fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "snap_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); FOR(shred_tile_cnt) fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "shred_repair", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); - /**/ fd_topob_tile_in( topo, "sign", 0UL, "metric_in", "repair_sign", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); - /**/ fd_topob_tile_out( topo, "repair", 0UL, "repair_sign", 0UL ); - /**/ fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "sign_repair", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED ); + /**/ fd_topob_tile_in( topo, "sign", 0UL, "metric_in", "ping_sign", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + /**/ fd_topob_tile_out( topo, "repair", 0UL, "ping_sign", 0UL ); /**/ fd_topob_tile_out( topo, "repair", 0UL, "repair_repla", 0UL ); FOR(shred_tile_cnt) fd_topob_tile_out( topo, "repair", 0UL, "repair_shred", i ); - /**/ fd_topob_tile_out( 
topo, "sign", 0UL, "sign_repair", 0UL ); + /**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_ping", 0UL ); + + FOR(sign_tile_cnt-1) fd_topob_tile_out( topo, "repair", 0UL, "repair_sign", i ); + FOR(sign_tile_cnt-1) fd_topob_tile_in ( topo, "sign", i+1, "metric_in", "repair_sign", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(sign_tile_cnt-1) fd_topob_tile_out( topo, "sign", i+1, "sign_repair", i ); + FOR(sign_tile_cnt-1) fd_topob_tile_in ( topo, "repair", 0UL, "metric_in", "sign_repair", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); + /**/ fd_topob_tile_in ( topo, "repair", 0UL, "metric_in", "sign_ping", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED ); if( 1 ) { fd_topob_wksp( topo, "scap" ); diff --git a/src/app/firedancer/config/default.toml b/src/app/firedancer/config/default.toml index ed8f9391c63..6a2fe85e18e 100644 --- a/src/app/firedancer/config/default.toml +++ b/src/app/firedancer/config/default.toml @@ -691,6 +691,13 @@ user = "" # very high TPS rates because the cluster size will be very small. shred_tile_count = 1 + # How many sign tiles to run. Should be set >= 2. This is + # configurable and horizontally scales repair request signing. + # One tile is reserved for synchronous signing across all tiles. + # The remaining tiles distribute the workload of signing repair + # requests. + sign_tile_count = 2 + # All memory that will be used in Firedancer is pre-allocated in two # kinds of pages: huge and gigantic. Huge pages are 2 MiB and gigantic # pages are 1 GiB. 
This is done to prevent TLB misses which can have a diff --git a/src/app/firedancer/topology.c b/src/app/firedancer/topology.c index ffddd7ac1b8..b3a65b2d735 100644 --- a/src/app/firedancer/topology.c +++ b/src/app/firedancer/topology.c @@ -202,6 +202,7 @@ fd_topo_initialize( config_t * config ) { ulong exec_tile_cnt = config->firedancer.layout.exec_tile_count; ulong writer_tile_cnt = config->firedancer.layout.writer_tile_count; ulong resolv_tile_cnt = config->layout.resolv_tile_count; + ulong sign_tile_cnt = config->firedancer.layout.sign_tile_count; int enable_rpc = ( config->rpc.port != 0 ); @@ -249,7 +250,6 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "root_out" ); fd_topob_wksp( topo, "repair_sign" ); - fd_topob_wksp( topo, "sign_repair" ); fd_topob_wksp( topo, "repair_repla" ); fd_topob_wksp( topo, "replay_poh" ); @@ -312,10 +312,10 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_link( topo, "stake_out", "stake_out", 128UL, FD_STAKE_OUT_MTU, 1UL ); FOR(shred_tile_cnt) fd_topob_link( topo, "shred_sign", "shred_sign", 128UL, 32UL, 1UL ); - FOR(shred_tile_cnt) fd_topob_link( topo, "sign_shred", "sign_shred", 128UL, 64UL, 1UL ); + FOR(shred_tile_cnt) fd_topob_link( topo, "sign_shred", "sign_shred", 128UL, sizeof(fd_ed25519_sig_t), 1UL ); /**/ fd_topob_link( topo, "gossip_sign", "gossip_sign", 128UL, 2048UL, 1UL ); - /**/ fd_topob_link( topo, "sign_gossip", "sign_gossip", 128UL, 64UL, 1UL ); + /**/ fd_topob_link( topo, "sign_gossip", "sign_gossip", 128UL, sizeof(fd_ed25519_sig_t), 1UL ); // /**/ fd_topob_link( topo, "dedup_resolv", "dedup_resolv", 65536UL, FD_TPU_PARSED_MTU, 1UL ); FOR(resolv_tile_cnt) fd_topob_link( topo, "resolv_pack", "resolv_pack", 65536UL, FD_TPU_RESOLVED_MTU, 1UL ); @@ -350,20 +350,25 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_link( topo, "send_net", "net_send", config->net.ingress_buffer_size, FD_NET_MTU, 2UL ); /**/ fd_topob_link( topo, "repair_net", "net_repair", 
config->net.ingress_buffer_size, FD_NET_MTU, 1UL ); - /**/ fd_topob_link( topo, "repair_sign", "repair_sign", 128UL, 2048UL, 1UL ); + /**/ fd_topob_link( topo, "ping_sign", "repair_sign", 128UL, 2048UL, 1UL ); FOR(shred_tile_cnt) fd_topob_link( topo, "shred_repair", "shred_repair", pending_fec_shreds_depth, FD_SHRED_REPAIR_MTU, 2UL /* at most 2 msgs per after_frag */ ); - FOR(shred_tile_cnt) fd_topob_link( topo, "repair_shred", "shred_repair", pending_fec_shreds_depth, sizeof(fd_ed25519_sig_t), 1UL ); - /**/ fd_topob_link( topo, "sign_repair", "sign_repair", 128UL, 64UL, 1UL ); + + FOR(shred_tile_cnt) fd_topob_link( topo, "repair_shred", "shred_repair", pending_fec_shreds_depth, sizeof(fd_ed25519_sig_t), 1UL ); + + /**/ fd_topob_link( topo, "sign_ping", "repair_sign", 128UL, sizeof(fd_ed25519_sig_t), 1UL ); + FOR(sign_tile_cnt-1) fd_topob_link( topo, "repair_sign", "repair_sign", 128UL, 2048UL, 1UL ); + FOR(sign_tile_cnt-1) fd_topob_link( topo, "sign_repair", "repair_sign", 1024UL, sizeof(fd_ed25519_sig_t), 1UL ); + /**/ fd_topob_link( topo, "repair_repla", "repair_repla", 65536UL, sizeof(fd_reasm_fec_t), 1UL ); - /**/ fd_topob_link( topo, "poh_shred", "poh_shred", 16384UL, USHORT_MAX, 1UL ); - /**/ fd_topob_link( topo, "poh_pack", "replay_poh", 128UL, sizeof(fd_became_leader_t) , 1UL ); - FOR(bank_tile_cnt) fd_topob_link( topo, "replay_poh", "replay_poh", 128UL, (4096UL*sizeof(fd_txn_p_t))+sizeof(fd_microblock_trailer_t), 1UL ); + FOR(bank_tile_cnt) fd_topob_link( topo, "replay_poh", "replay_poh", 128UL, (4096UL*sizeof(fd_txn_p_t))+sizeof(fd_microblock_trailer_t), 1UL ); + /**/ fd_topob_link( topo, "poh_shred", "poh_shred", 16384UL, USHORT_MAX, 1UL ); + /**/ fd_topob_link( topo, "poh_pack", "replay_poh", 128UL, sizeof(fd_became_leader_t) , 1UL ); /**/ fd_topob_link( topo, "tower_send", "tower_send", 65536UL, sizeof(fd_txn_p_t), 1UL ); /**/ fd_topob_link( topo, "send_txns", "send_txns", 128UL, FD_TPU_RAW_MTU, 1UL ); /**/ fd_topob_link( topo, "send_sign", 
"send_sign", 128UL, FD_TXN_MTU, 1UL ); - /**/ fd_topob_link( topo, "sign_send", "sign_send", 128UL, 64UL, 1UL ); + /**/ fd_topob_link( topo, "sign_send", "sign_send", 128UL, sizeof(fd_ed25519_sig_t), 1UL ); FD_TEST( sizeof(fd_snapshot_manifest_t)<=(5UL*(1UL<<30UL)) ); /**/ fd_topob_link( topo, "snap_zstd", "snap_zstd", 8192UL, 16384UL, 1UL ); @@ -418,7 +423,7 @@ fd_topo_initialize( config_t * config ) { /**/ fd_topob_tile( topo, "dedup", "dedup", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); FOR(resolv_tile_cnt) fd_topob_tile( topo, "resolv", "resolv", "metric_in", tile_to_cpu[ topo->tile_cnt ], 1, 0 ); FOR(shred_tile_cnt) fd_topob_tile( topo, "shred", "shred", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); - /**/ fd_topob_tile( topo, "sign", "sign", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); + FOR(sign_tile_cnt) fd_topob_tile( topo, "sign", "sign", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); /**/ fd_topob_tile( topo, "metric", "metric", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); fd_topo_tile_t * pack_tile = fd_topob_tile( topo, "pack", "pack", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); /**/ fd_topob_tile( topo, "poh", "poh", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); @@ -621,19 +626,19 @@ fd_topo_initialize( config_t * config ) { FOR(resolv_tile_cnt) fd_topob_tile_out( topo, "resolv", i, "resolv_pack", i ); /**/ fd_topob_tile_in( topo, "pack", 0UL, "metric_in", "resolv_pack", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); - /**/ fd_topos_tile_in_net( topo, "metric_in", "gossip_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ - /**/ fd_topos_tile_in_net( topo, "metric_in", "repair_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ - /**/ fd_topos_tile_in_net( topo, "metric_in", "send_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); + /**/ 
fd_topos_tile_in_net ( topo, "metric_in", "gossip_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ + /**/ fd_topos_tile_in_net ( topo, "metric_in", "repair_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */ + /**/ fd_topos_tile_in_net ( topo, "metric_in", "send_net", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); FOR(shred_tile_cnt) for( ulong j=0UL; jtiles.archiver.enabled ) { diff --git a/src/app/shared/fd_config.c b/src/app/shared/fd_config.c index 9fda99c6868..788ad255223 100644 --- a/src/app/shared/fd_config.c +++ b/src/app/shared/fd_config.c @@ -448,7 +448,10 @@ fd_config_fill( fd_config_t * config, static void fd_config_validatef( fd_configf_t const * config ) { - (void)config; + CFG_HAS_NON_ZERO( layout.sign_tile_count ); + if( FD_UNLIKELY( config->layout.sign_tile_count < 2 ) ) { + FD_LOG_ERR(( "layout.sign_tile_count must be >= 2" )); + } } static void diff --git a/src/app/shared/fd_config.h b/src/app/shared/fd_config.h index 604318a860c..120b2a542fe 100644 --- a/src/app/shared/fd_config.h +++ b/src/app/shared/fd_config.h @@ -112,6 +112,7 @@ struct fd_configf { struct { uint exec_tile_count; /* TODO: redundant ish with bank tile cnt */ uint writer_tile_count; + uint sign_tile_count; } layout; struct { diff --git a/src/app/shared/fd_config_parse.c b/src/app/shared/fd_config_parse.c index b01a763210e..073a8811a7a 100644 --- a/src/app/shared/fd_config_parse.c +++ b/src/app/shared/fd_config_parse.c @@ -78,6 +78,7 @@ fd_config_extract_podf( uchar * pod, fd_configf_t * config ) { CFG_POP ( uint, layout.exec_tile_count ); CFG_POP ( uint, layout.writer_tile_count ); + CFG_POP ( uint, layout.sign_tile_count ); CFG_POP ( ulong, blockstore.shred_max ); CFG_POP ( ulong, blockstore.block_max ); diff --git a/src/disco/sign/fd_sign_tile.c b/src/disco/sign/fd_sign_tile.c index 20e12602af7..a7a3b9394b3 100644 --- 
a/src/disco/sign/fd_sign_tile.c +++ b/src/disco/sign/fd_sign_tile.c @@ -9,6 +9,8 @@ #include "../../ballet/base58/fd_base58.h" #include "../metrics/fd_metrics.h" +#include "../../util/hist/fd_histf.h" + #include #include @@ -160,7 +162,9 @@ after_frag_sensitive( void * _ctx, fd_sign_ctx_t * ctx = (fd_sign_ctx_t *)_ctx; - int sign_type = (int)(uint)sig; + /* The upper 32 bits contain the repair tile nonce to identify the + request, while the lower 32 bits specify the sign_type. */ + int sign_type = (int)(uint)(sig); FD_TEST( in_idxsign_duration, (ulong)sign_duration ); - fd_stem_publish( stem, in_idx, 0UL, ctx->out[ in_idx ].out_chunk, 64UL, 0UL, tsorig, 0UL ); + fd_stem_publish( stem, in_idx, sig, ctx->out[ in_idx ].out_chunk, 64UL, 0UL, tsorig, 0UL ); ctx->out[ in_idx ].out_chunk = fd_dcache_compact_next( ctx->out[ in_idx ].out_chunk, 64UL, ctx->out[ in_idx ].out_chunk0, ctx->out[ in_idx ].out_wmark ); } @@ -302,9 +306,14 @@ unprivileged_init_sensitive( fd_topo_t * topo, FD_TEST( !strcmp( out_link->name, "sign_gossip" ) ); FD_TEST( in_link->mtu==2048UL ); FD_TEST( out_link->mtu==64UL ); - } else if ( !strcmp( in_link->name, "repair_sign")) { + } else if ( !strcmp( in_link->name, "repair_sign" ) + || !strcmp( in_link->name, "ping_sign" ) ) { ctx->in[ i ].role = FD_KEYGUARD_ROLE_REPAIR; - FD_TEST( !strcmp( out_link->name, "sign_repair" ) ); + if( !strcmp( in_link->name, "ping_sign" ) ) { + FD_TEST( !strcmp( out_link->name, "sign_ping" ) ); + } else { + FD_TEST( !strcmp( out_link->name, "sign_repair" ) ); + } FD_TEST( in_link->mtu==2048UL ); FD_TEST( out_link->mtu==64UL ); } else if ( !strcmp(in_link->name, "send_sign" ) ) { diff --git a/src/discof/repair/fd_repair_tile.c b/src/discof/repair/fd_repair_tile.c index 206cb252aab..435fa4cbbc2 100644 --- a/src/discof/repair/fd_repair_tile.c +++ b/src/discof/repair/fd_repair_tile.c @@ -1,4 +1,48 @@ -/* Repair tile runs the repair protocol for a Firedancer node. 
*/ +/* REQUEST HANDLING ARCHITECTURE + ========================================= + + The repair tile implements two distinct request handling patterns + based on the nature of the operation and its latency requirements: + + 1. SYNCHRONOUS REQUEST HANDLING + ----------------------------------------- + Used for lightweight protocol messages that require immediate + signing and response. These operations use the keyguard client for + direct signing, which requires blocking. + + Message types handled synchronously: + - PINGs & PONGs: Handle peer connectivity and liveness with simple + round-trip messages. + + - PEER WARM UPs: On receiving peer information in + handle_new_cluster_contact_info, we prepay the RTT cost by sending + a placeholder Repair request immediately. + + 2. ASYNCHRONOUS REQUEST HANDLING + -------------------------------- + Used for all other repair requests. These requests are sent to a + sign tile, and the repair tile continues handling other operations + without blocking. Once the sign tile has signed the request, the + repair tile completes the matching request from its pending sign + request map and sends the signed request to the peer. + + Message types handled asynchronously: + - WINDOW_INDEX (exact shred): Requests for a specific shred at a + known slot and index. Used when the repair tile knows exactly + which shred is missing from a FEC set. + + - HIGHEST_WINDOW_INDEX: Requests for the highest shred in a slot. + Used to determine the end boundary of a slot when the exact count + is unknown. + + - ORPHAN: Requests for the highest shred in the parent slot of an + orphaned slot. Used to establish the chain of slot ancestry when a + slot's parent is missing. + + Async requests can be distributed across multiple sign tiles in + round-robin order, skipping any sign tile that currently has no + credits available. This provides load balancing and prevents any + single sign tile from becoming a bottleneck. 
*/ + #define _GNU_SOURCE #include "../../disco/topo/fd_topo.h" @@ -15,6 +59,7 @@ #include "../../discof/restore/utils/fd_ssmsg.h" #include "../../util/pod/fd_pod_format.h" #include "../../util/net/fd_net_headers.h" +#include "../../tango/fd_tango_base.h" #include "../forest/fd_forest.h" #include "../reasm/fd_reasm.h" @@ -48,11 +93,14 @@ typedef union { } fd_repair_in_ctx_t; struct fd_repair_out_ctx { - ulong idx; - fd_wksp_t * mem; - ulong chunk0; - ulong wmark; - ulong chunk; + ulong idx; + fd_wksp_t * mem; + ulong chunk0; + ulong wmark; + ulong chunk; + ulong in_idx; /* Index of the incoming link */ + ulong credits; /* Available credits for this sign tile */ + ulong max_credits; /* Maximum credits (depth) */ }; typedef struct fd_repair_out_ctx fd_repair_out_ctx_t; @@ -110,6 +158,8 @@ struct fd_repair_tile_ctx { fd_wksp_t * wksp; + fd_stem_context_t * stem; + uchar in_kind[ MAX_IN_LINKS ]; fd_repair_in_ctx_t in_links[ MAX_IN_LINKS ]; @@ -141,6 +191,27 @@ struct fd_repair_tile_ctx { uint shred_tile_cnt; fd_repair_out_ctx_t shred_out_ctx[ MAX_SHRED_TILE_CNT ]; + /* ping_sign link (to sign tile 0) - used for keyguard client */ + ulong ping_sign_in_idx; + + ulong ping_sign_out_idx; + fd_wksp_t * ping_sign_out_mem; + ulong ping_sign_out_chunk0; + ulong ping_sign_out_wmark; + ulong ping_sign_out_chunk; + + /* repair_sign links (to sign tiles 1+) - for round-robin distribution */ + ulong repair_sign_cnt; + fd_repair_out_ctx_t repair_sign_out_ctx[ MAX_SHRED_TILE_CNT ]; + ulong sign_repair_in_cnt; + ulong sign_repair_in_idx[ MAX_SHRED_TILE_CNT ]; + ulong sign_repair_in_depth[ MAX_SHRED_TILE_CNT ]; + + ulong round_robin_idx; + + /* Request sequence tracking for async signing */ + ulong request_seq; + ushort net_id; /* Includes Ethernet, IP, UDP headers */ uchar buffer[ MAX_BUFFER_SIZE ]; @@ -150,35 +221,45 @@ struct fd_repair_tile_ctx { fd_keyguard_client_t keyguard_client[1]; ulong manifest_slot; + /* Pending sign requests */ + fd_repair_pending_sign_req_t * 
pending_sign_req_pool; + fd_repair_pending_sign_req_map_t * pending_sign_req_map; }; typedef struct fd_repair_tile_ctx fd_repair_tile_ctx_t; + FD_FN_CONST static inline ulong scratch_align( void ) { return 128UL; } + FD_FN_PURE static inline ulong loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) { return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ; } + FD_FN_PURE static inline ulong scratch_footprint( fd_topo_tile_t const * tile ) { ulong l = FD_LAYOUT_INIT; l = FD_LAYOUT_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) ); - l = FD_LAYOUT_APPEND( l, fd_repair_align(), fd_repair_footprint() ); - l = FD_LAYOUT_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) ); - l = FD_LAYOUT_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) ); + l = FD_LAYOUT_APPEND( l, fd_repair_align(), fd_repair_footprint() ); + l = FD_LAYOUT_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) ); + l = FD_LAYOUT_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) ); l = FD_LAYOUT_APPEND( l, fd_sreasm_align(), fd_sreasm_footprint( 20 ) ); - // l = FD_LAYOUT_APPEND( l, fd_fec_repair_align(), fd_fec_repair_footprint( ( 1<<20 ), tile->repair.shred_tile_cnt ) ); - l = FD_LAYOUT_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) ); // TODO: fix this - l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) ); - l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) ); + l = FD_LAYOUT_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) ); +//l = FD_LAYOUT_APPEND( l, fd_fec_repair_align(), fd_fec_repair_footprint( ( 1<<20 ), tile->repair.shred_tile_cnt ) ); + l = FD_LAYOUT_APPEND( l, fd_repair_pending_sign_req_pool_align(), fd_repair_pending_sign_req_pool_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); + l = FD_LAYOUT_APPEND( l, fd_repair_pending_sign_req_map_align(), fd_repair_pending_sign_req_map_footprint( 
FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); + l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) ); + l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) ); return FD_LAYOUT_FINI( l, scratch_align() ); } + +/* Wrapper for keyguard client sign */ static void repair_signer( void * signer_ctx, uchar signature[ static 64 ], @@ -189,6 +270,26 @@ repair_signer( void * signer_ctx, fd_keyguard_client_sign( ctx->keyguard_client, signature, buffer, len, sign_type ); } +/* Wrapper for publishing to the sign tile*/ +static void +repair_signer_async( void * signer_ctx, + ulong nonce, + uchar const * buffer, + ulong len, + int sign_type, + fd_repair_out_ctx_t * sign_out) { + fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *) signer_ctx; + + uchar * dst = fd_chunk_to_laddr( sign_out->mem, sign_out->chunk ); + fd_memcpy( dst, buffer, len ); + + ulong sig = ((ulong)nonce << 32) | (ulong)(uint)sign_type; + fd_stem_publish( ctx->stem, sign_out->idx, sig, sign_out->chunk, len, 0UL, 0UL, 0UL ); + sign_out->chunk = fd_dcache_compact_next( sign_out->chunk, len, sign_out->chunk0, sign_out->wmark ); + + ctx->request_seq = fd_seq_inc( ctx->request_seq, 1UL ); +} + static void send_packet( fd_repair_tile_ctx_t * ctx, fd_stem_context_t * stem, @@ -225,36 +326,8 @@ send_packet( fd_repair_tile_ctx_t * ctx, ctx->net_out_chunk = fd_dcache_compact_next( chunk, packet_sz, ctx->net_out_chunk0, ctx->net_out_wmark ); } -static inline void -handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx, - uchar const * buf, - ulong buf_sz ) { - fd_shred_dest_wire_t const * in_dests = (fd_shred_dest_wire_t const *)fd_type_pun_const( buf ); - - ulong dest_cnt = buf_sz; - if( FD_UNLIKELY( dest_cnt >= MAX_REPAIR_PEERS ) ) { - FD_LOG_WARNING(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_REPAIR_PEERS )); - return; - } - - /* Stop adding peers after we reach the peer max, 
but we may want to - consider an eviction policy. */ - for( ulong i=0UL; irepair->peer_cnt >= FD_ACTIVE_KEY_MAX ) ) break;// FIXME: aiming to move all peer tracking out of lib into tile, leaving like this for now - fd_repair_peer_addr_t repair_peer = { - .addr = in_dests[i].ip4_addr, - .port = fd_ushort_bswap( in_dests[i].udp_port ), - }; - int dup = fd_repair_add_active_peer( ctx->repair, &repair_peer, in_dests[i].pubkey ); - if( !dup ) { - ulong hash_src = 0xfffffUL & fd_ulong_hash( (ulong)in_dests[i].ip4_addr | ((ulong)repair_peer.port<<32) ); - FD_LOG_INFO(( "Added repair peer: pubkey %s hash_src %lu", FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), hash_src )); - } - } -} - ulong -fd_repair_handle_ping( fd_repair_tile_ctx_t * repair_tile_ctx, +fd_repair_handle_ping( fd_repair_tile_ctx_t * repair_tile_ctx, fd_repair_t * glob, fd_gossip_ping_t const * ping, fd_gossip_peer_addr_t const * peer_addr FD_PARAM_UNUSED, @@ -314,7 +387,7 @@ fd_repair_recv_clnt_packet( fd_repair_tile_ctx_t * repair_tile_ctx, switch( gmsg->discriminant ) { case fd_repair_response_enum_ping: { - uchar buf[1024]; + uchar buf[FD_REPAIR_MAX_SIGN_BUF_SIZE]; ulong buflen = fd_repair_handle_ping( repair_tile_ctx, glob, &gmsg->inner.ping, src_addr, dst_ip4_addr, buf, sizeof(buf) ); ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() ); send_packet( repair_tile_ctx, stem, 1, src_addr->addr, src_addr->port, dst_ip4_addr, buf, buflen, tsorig ); @@ -328,14 +401,29 @@ fd_repair_recv_clnt_packet( fd_repair_tile_ctx_t * repair_tile_ctx, return 0; } +/* Signs and prepares a repair protocol message for sending, either + synchronously or asynchronously. This is responsible for encoding a + repair protocol message, signing and preparing it for transmission. + + In synchronous mode (is_async == 0), the message is signed + immediately using the keyguard client, and the signature is inserted + into the message buffer before returning. 
+ + In asynchronous mode (is_async != 0), the message is sent to the sign + tile for signing, and the function returns after queuing the request. + The actual sending will be completed once the signature is available. + */ static ulong fd_repair_sign_and_send( fd_repair_tile_ctx_t * repair_tile_ctx, fd_repair_protocol_t * protocol, fd_gossip_peer_addr_t * addr FD_PARAM_UNUSED, uchar * buf, - ulong buflen ) { + ulong buflen, + int is_async, + ulong nonce, + fd_repair_out_ctx_t * sign_out) { - FD_TEST( buflen >= 1024UL ); + FD_TEST( buflen >= FD_REPAIR_MAX_SIGN_BUF_SIZE ); fd_bincode_encode_ctx_t ctx = { .data = buf, .dataend = buf + buflen }; if( FD_UNLIKELY( fd_repair_protocol_encode( protocol, &ctx ) != FD_BINCODE_SUCCESS ) ) { FD_LOG_CRIT(( "Failed to encode repair message (type %#x)", protocol->discriminant )); @@ -364,18 +452,41 @@ fd_repair_sign_and_send( fd_repair_tile_ctx_t * repair_tile_ctx, ^ ^ buf buf+4 */ - fd_signature_t sig; - repair_signer( repair_tile_ctx, sig.uc, buf, buflen, FD_KEYGUARD_SIGN_TYPE_ED25519 ); - - /* Reintroduce the signature */ + /* If async, we send the signing request to the sign tile */ + if( FD_LIKELY( is_async ) ) { + repair_signer_async( repair_tile_ctx, nonce, buf, buflen, FD_KEYGUARD_SIGN_TYPE_ED25519, sign_out); + return buflen + 64UL; + /* If sync, we sign using keyguard */ + } else { + fd_signature_t sig; + repair_signer( repair_tile_ctx, sig.uc, buf, buflen, FD_KEYGUARD_SIGN_TYPE_ED25519 ); - buf -= 64UL; - buflen += 64UL; - fd_memcpy( buf + 4U, &sig, 64U ); + /* Reintroduce the signature */ + buf -= 64UL; + buflen += 64UL; + fd_memcpy( buf + 4U, &sig, 64U ); - return buflen; + return buflen; + } } +/* Returns a sign_out context that has available credits. + If no sign_out context has available credits, returns NULL. 
*/ +static fd_repair_out_ctx_t * +sign_avail_credits( fd_repair_tile_ctx_t * ctx ) { + fd_repair_out_ctx_t * sign_out = NULL; + + for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) { + fd_repair_out_ctx_t * candidate = &ctx->repair_sign_out_ctx[ ctx->round_robin_idx ]; + ctx->round_robin_idx = (ctx->round_robin_idx + 1) % ctx->repair_sign_cnt; + if( candidate->credits > 0 ) { + sign_out = candidate; + break; + } + } + + return sign_out; +} static void fd_repair_send_request( fd_repair_tile_ctx_t * repair_tile_ctx, @@ -386,25 +497,21 @@ fd_repair_send_request( fd_repair_tile_ctx_t * repair_tile_ctx, uint shred_index, fd_pubkey_t const * recipient, long now ) { - - /* Send requests starting where we left off last time. i.e. if n < current_nonce, seek forward */ - /* Track statistics */ fd_repair_protocol_t protocol; - fd_repair_construct_request_protocol( glob, &protocol, type, slot, shred_index, recipient, glob->next_nonce, now ); - glob->next_nonce++; + fd_repair_construct_request_protocol( glob, &protocol, type, slot, shred_index, recipient, 0, now ); fd_active_elem_t * active = fd_active_table_query( glob->actives, recipient, NULL ); active->avg_reqs++; glob->metrics.send_pkt_cnt++; - uchar buf[1024]; - ulong buflen = fd_repair_sign_and_send( repair_tile_ctx, &protocol, &active->addr, buf, sizeof(buf) ); + uchar buf[FD_REPAIR_MAX_SIGN_BUF_SIZE]; + ulong buflen = fd_repair_sign_and_send( repair_tile_ctx, &protocol, &active->addr, buf, sizeof(buf), 0, 1, NULL ); ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() ); uint src_ip4_addr = 0U; /* unknown */ send_packet( repair_tile_ctx, stem, 1, active->addr.addr, active->addr.port, src_ip4_addr, buf, buflen, tsorig ); } -static void +static void FD_FN_UNUSED fd_repair_send_requests( fd_repair_tile_ctx_t * ctx, fd_stem_context_t * stem, enum fd_needed_elem_type type, @@ -420,6 +527,90 @@ fd_repair_send_requests( fd_repair_tile_ctx_t * ctx, } } +/* Sends a request asynchronously. 
If successful, adds it to the + pending_sign_req_map and publishes to the sign tile. If not, the + request is skipped for now and will be retried later by the forest + iterator. */ +static void +fd_repair_send_request_async( fd_repair_tile_ctx_t * ctx, + fd_stem_context_t * stem FD_PARAM_UNUSED, + fd_repair_t * glob, + fd_repair_out_ctx_t * sign_out, + enum fd_needed_elem_type type, + ulong slot, + uint shred_index, + fd_pubkey_t const * recipient, + long now ){ + fd_active_elem_t * peer = fd_active_table_query(glob->actives, recipient, NULL); + if (!peer) FD_LOG_ERR(( "No active peer found for recipient %s", FD_BASE58_ENC_32_ALLOCA(recipient) )); + + /* Acquire and add a pending request from the pool */ + fd_repair_protocol_t protocol; + fd_repair_pending_sign_req_t * pending = fd_repair_insert_pending_request( glob, &protocol, peer->addr.addr, peer->addr.port, type, slot, shred_index, now, recipient ); + if( FD_UNLIKELY( !pending ) ) { + FD_LOG_WARNING(( "No free pending sign reqs" )); + return; + } + + /* Sign and prepare the message directly into the pending buffer */ + pending->buflen = fd_repair_sign_and_send( ctx, &protocol, &peer->addr, pending->buf, sizeof(pending->buf), 1, pending->nonce, sign_out ); + + sign_out->credits--; +} + +static void +fd_repair_send_requests_async( fd_repair_tile_ctx_t * ctx, + fd_stem_context_t * stem, + fd_repair_out_ctx_t * sign_out, + enum fd_needed_elem_type type, + ulong slot, + uint shred_index, + long now ){ + fd_repair_t * glob = ctx->repair; + + for( uint i=0; ipeers[ glob->peer_idx++ ].key; + fd_repair_send_request_async( ctx, stem, glob, sign_out, type, slot, shred_index, id, now ); + if( FD_UNLIKELY( glob->peer_idx >= glob->peer_cnt ) ) glob->peer_idx = 0; + } +} + +static inline void +handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx, + uchar const * buf, + ulong buf_sz ) { + fd_shred_dest_wire_t const * in_dests = (fd_shred_dest_wire_t const *)fd_type_pun_const( buf ); + + ulong dest_cnt = buf_sz; + if( 
FD_UNLIKELY( dest_cnt >= MAX_REPAIR_PEERS ) ) { + FD_LOG_WARNING(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_REPAIR_PEERS )); + return; + } + + /* Stop adding peers after we reach the peer max, but we may want to + consider an eviction policy. */ + for( ulong i=0UL; irepair->peer_cnt >= FD_ACTIVE_KEY_MAX ) ) break;// FIXME: aiming to move all peer tracking out of lib into tile, leaving like this for now + fd_repair_peer_addr_t repair_peer = { + .addr = in_dests[i].ip4_addr, + .port = fd_ushort_bswap( in_dests[i].udp_port ), + }; + int dup = fd_repair_add_active_peer( ctx->repair, &repair_peer, in_dests[i].pubkey ); + if( !dup ) { + /* The repair process uses a Ping-Pong protocol that incurs one + round-trip time (RTT) for the initial repair request. To optimize + this, we proactively send a placeholder Repair request as soon as we + receive a peer's contact information for the first time, effectively + prepaying the RTT cost. */ + if( FD_LIKELY( ctx->repair_sign_cnt > 0 ) ) { + fd_repair_send_request(ctx, ctx->stem, ctx->repair, fd_needed_window_index, 0, 0, in_dests[i].pubkey, fd_log_wallclock()); + } + ulong hash_src = 0xfffffUL & fd_ulong_hash( (ulong)in_dests[i].ip4_addr | ((ulong)repair_peer.port<<32) ); + FD_LOG_INFO(( "Added repair peer: pubkey %s hash_src %lu", FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), hash_src )); + } + } +} + static inline int before_frag( fd_repair_tile_ctx_t * ctx, ulong in_idx, @@ -483,6 +674,9 @@ during_frag( fd_repair_tile_ctx_t * ctx, if( FD_UNLIKELY( ctx->in_kind[in_idx]!=IN_KIND_SNAP || fd_ssmsg_sig_message( sig )!=FD_SSMSG_DONE ) ) ctx->snap_out_chunk = chunk; return; + } else if ( FD_UNLIKELY( in_kind==IN_KIND_SIGN ) ) { + dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk ); + dcache_entry_sz = sz; } else { FD_LOG_ERR(( "Frag from unknown link (kind=%u in_idx=%lu)", in_kind, in_idx )); } @@ -513,6 +707,103 @@ after_frag_snap( fd_repair_tile_ctx_t * ctx, // else 
fd_reasm_publish( ctx->reasm, &ctx->root_block_id ); } +static ulong FD_FN_UNUSED +fd_repair_send_ping( fd_repair_tile_ctx_t * repair_tile_ctx, + fd_repair_t * glob, + fd_pinged_elem_t * val, + uchar * buf, + ulong buflen ) { + fd_repair_response_t gmsg; + fd_repair_response_new_disc( &gmsg, fd_repair_response_enum_ping ); + fd_gossip_ping_t * ping = &gmsg.inner.ping; + ping->from = *glob->public_key; + + uchar pre_image[FD_PING_PRE_IMAGE_SZ]; + memcpy( pre_image, "SOLANA_PING_PONG", 16UL ); + memcpy( pre_image+16UL, val->token.uc, 32UL ); + + fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &ping->token ); + + repair_signer( repair_tile_ctx, ping->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 ); + + fd_bincode_encode_ctx_t ctx; + FD_TEST( buflen >= FD_REPAIR_MAX_SIGN_BUF_SIZE ); + ctx.data = buf; + ctx.dataend = buf + buflen; + FD_TEST(0 == fd_repair_response_encode(&gmsg, &ctx)); + return (ulong)((uchar*)ctx.data - buf); +} + +static void FD_FN_UNUSED +fd_repair_recv_pong(fd_repair_t * glob, fd_gossip_ping_t const * pong, fd_gossip_peer_addr_t const * from) { + fd_pinged_elem_t * val = fd_pinged_table_query(glob->pinged, from, NULL); + if( val == NULL || !fd_pubkey_eq( &val->id, &pong->from ) ) + return; + + /* Verify response hash token */ + uchar pre_image[FD_PING_PRE_IMAGE_SZ]; + memcpy( pre_image, "SOLANA_PING_PONG", 16UL ); + memcpy( pre_image+16UL, val->token.uc, 32UL ); + + fd_hash_t pre_image_hash; + fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, pre_image_hash.uc ); + + fd_sha256_t sha[1]; + fd_sha256_init( sha ); + fd_sha256_append( sha, "SOLANA_PING_PONG", 16UL ); + fd_sha256_append( sha, pre_image_hash.uc, 32UL ); + fd_hash_t golden; + fd_sha256_fini( sha, golden.uc ); + + fd_sha512_t sha2[1]; + if( fd_ed25519_verify( /* msg */ golden.uc, + /* sz */ 32U, + /* sig */ pong->signature.uc, + /* public_key */ pong->from.uc, + sha2 )) { + FD_LOG_WARNING(("Failed sig verify for pong")); + return; + } + + val->good = 1; 
+} + +static void +fd_repair_handle_sign_response( fd_repair_tile_ctx_t * ctx, + ulong in_idx, + ulong sig, + fd_stem_context_t * stem ) { + /* Nonce was packed into sig, so we need to unpack it */ + ulong response_nonce = sig >> 32; + /* Look up the pending request by nonce. Since the repair_sign links are + reliable, the incoming sign_repair fragments represent a complete + set of the previously sent outgoing messages. However, with + multiple sign tiles, the responses may not arrive in order. */ + + /* Find which sign tile sent this response and increment its credits */ + for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) { + if( ctx->repair_sign_out_ctx[i].in_idx == in_idx ) { + if( ctx->repair_sign_out_ctx[i].credits < ctx->repair_sign_out_ctx[i].max_credits ) { + ctx->repair_sign_out_ctx[i].credits++; + } + break; + } + } + + fd_repair_pending_sign_req_t * pending = fd_repair_query_pending_request( ctx->repair, response_nonce ); + if( FD_LIKELY( pending ) ) { + fd_memcpy( pending->buf + pending->sig_offset, ctx->buffer, 64UL ); + ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() ); + uint src_ip4_addr = 0U; + send_packet( ctx, stem, 1, pending->dst_ip_addr, pending->dst_port, src_ip4_addr, pending->buf, pending->buflen, tsorig ); + + fd_repair_remove_pending_request( ctx->repair, response_nonce ); + return; + } else { + FD_LOG_ERR(( "No pending request found for nonce %lu", response_nonce )); + } +} + static void after_frag( fd_repair_tile_ctx_t * ctx, ulong in_idx, @@ -522,9 +813,10 @@ after_frag( fd_repair_tile_ctx_t * ctx, ulong tsorig FD_PARAM_UNUSED, ulong tspub FD_PARAM_UNUSED, fd_stem_context_t * stem ) { - if( FD_UNLIKELY( ctx->skip_frag ) ) return; + ctx->stem = stem; + uint in_kind = ctx->in_kind[ in_idx ]; if( FD_UNLIKELY( in_kind==IN_KIND_CONTACT ) ) { handle_new_cluster_contact_info( ctx, ctx->buffer, sz ); @@ -538,6 +830,11 @@ after_frag( fd_repair_tile_ctx_t * ctx, return; } + if( FD_UNLIKELY( in_kind==IN_KIND_SIGN ) ) { + 
fd_repair_handle_sign_response( ctx, in_idx, sig, stem ); + return; + } + if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) { fd_shred_t * shred = (fd_shred_t *)fd_type_pun( ctx->buffer ); if( FD_UNLIKELY( shred->slot <= fd_forest_root_slot( ctx->forest ) ) ) { @@ -732,16 +1029,25 @@ after_credit( fd_repair_tile_ctx_t * ctx, fd_forest_ele_t * pool = fd_forest_pool( forest ); fd_forest_orphaned_t * orphaned = fd_forest_orphaned( forest ); - // Always request orphans + /* Verify that there is at least one sign tile with available credits. + If not, we can't send any requests and leave early. */ + fd_repair_out_ctx_t * sign_out = sign_avail_credits( ctx ); + if( FD_UNLIKELY( !sign_out ) ) { + // FD_LOG_NOTICE(( "No sign tiles have available credits" )); + return; + } + /* Always request orphans first */ int total_req = 0; for( fd_forest_orphaned_iter_t iter = fd_forest_orphaned_iter_init( orphaned, pool ); !fd_forest_orphaned_iter_done( iter, orphaned, pool ); iter = fd_forest_orphaned_iter_next( iter, orphaned, pool ) ) { fd_forest_ele_t * orphan = fd_forest_orphaned_iter_ele( iter, orphaned, pool ); if( fd_repair_need_orphan( ctx->repair, orphan->slot ) ) { - fd_repair_send_requests( ctx, stem, fd_needed_orphan, orphan->slot, UINT_MAX, now ); + fd_repair_send_requests_async( ctx, stem, sign_out, fd_needed_orphan, orphan->slot, UINT_MAX, now); total_req += FD_REPAIR_NUM_NEEDED_PEERS; + fd_repair_continue( ctx->repair ); + return; } } @@ -756,7 +1062,7 @@ after_credit( fd_repair_tile_ctx_t * ctx, head of frontier, because we could end up traversing down a very long tree if we are far behind. 
*/ - if( FD_UNLIKELY( now - ctx->tsreset > (long)40e6 ) ) { + if( FD_UNLIKELY( now - ctx->tsreset > (long)100e6 ) ) { // reset iterator to the beginning of the forest frontier ctx->repair_iter = fd_forest_iter_init( ctx->forest ); ctx->tsreset = now; @@ -775,10 +1081,10 @@ after_credit( fd_repair_tile_ctx_t * ctx, ele = fd_forest_pool_ele_const( pool, ctx->repair_iter.ele_idx ); // Request first, advance iterator second. if( ctx->repair_iter.shred_idx == UINT_MAX && fd_repair_need_highest_window_index( ctx->repair, ele->slot, 0 ) ){ - fd_repair_send_requests( ctx, stem, fd_needed_highest_window_index, ele->slot, 0, now ); + fd_repair_send_requests_async( ctx, stem, sign_out, fd_needed_highest_window_index, ele->slot, 0, now ); total_req += FD_REPAIR_NUM_NEEDED_PEERS; } else if( fd_repair_need_window_index( ctx->repair, ele->slot, ctx->repair_iter.shred_idx ) ) { - fd_repair_send_requests( ctx, stem, fd_needed_window_index, ele->slot, ctx->repair_iter.shred_idx, now ); + fd_repair_send_requests_async( ctx, stem, sign_out, fd_needed_window_index, ele->slot, ctx->repair_iter.shred_idx, now ); total_req += FD_REPAIR_NUM_NEEDED_PEERS; } @@ -838,7 +1144,7 @@ unprivileged_init( fd_topo_t * topo, if( FD_UNLIKELY( tile->in_cnt > MAX_IN_LINKS ) ) FD_LOG_ERR(( "repair tile has too many input links" )); - uint sign_link_in_idx = UINT_MAX; + ctx->sign_repair_in_cnt = 0; for( uint in_idx=0U; in_idx<(tile->in_cnt); in_idx++ ) { fd_topo_link_t * link = &topo->links[ tile->in_link_id[ in_idx ] ]; if( 0==strcmp( link->name, "net_repair" ) ) { @@ -851,14 +1157,22 @@ unprivileged_init( fd_topo_t * topo, ctx->in_kind[ in_idx ] = IN_KIND_ROOT; } else if( 0==strcmp( link->name, "shred_repair" ) ) { ctx->in_kind[ in_idx ] = IN_KIND_SHRED; - } else if( 0==strcmp( link->name, "sign_repair" ) ) { + } else if( 0==strcmp( link->name, "sign_repair" ) || 0==strcmp( link->name, "sign_ping" ) ) { ctx->in_kind[ in_idx ] = IN_KIND_SIGN; - sign_link_in_idx = in_idx; + if( 0==strcmp( link->name, 
"sign_ping" ) ) { + ctx->in_kind[ in_idx ] = IN_KIND_SIGN; + ctx->ping_sign_in_idx = in_idx; + } if( 0==strcmp( link->name, "sign_repair" ) ) { + ctx->in_kind[ in_idx ] = IN_KIND_SIGN; + ctx->sign_repair_in_idx[ ctx->sign_repair_in_cnt ] = in_idx; + ctx->sign_repair_in_depth[ ctx->sign_repair_in_cnt ] = link->depth; + ctx->sign_repair_in_cnt++; + } } else if( 0==strcmp( link->name, "snap_out" ) ) { ctx->in_kind[ in_idx ] = IN_KIND_SNAP; } else if( 0==strcmp( link->name, "stake_out" ) ) { ctx->in_kind[ in_idx ] = IN_KIND_STAKE; - } else { + }else { FD_LOG_ERR(( "repair tile has unexpected input link %s", link->name )); } @@ -869,13 +1183,18 @@ unprivileged_init( fd_topo_t * topo, ctx->in_links[ in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ in_idx ].mem, link->dcache ); ctx->in_links[ in_idx ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ in_idx ].mem, link->dcache, link->mtu ); ctx->in_links[ in_idx ].mtu = link->mtu; + FD_TEST( fd_dcache_compact_is_safe( ctx->in_links[in_idx].mem, link->dcache, link->mtu, link->depth ) ); } - if( FD_UNLIKELY( sign_link_in_idx==UINT_MAX ) ) FD_LOG_ERR(( "Missing sign_repair link" )); - uint net_link_out_idx = UINT_MAX; - uint sign_link_out_idx = UINT_MAX; - uint shred_tile_idx = 0; + uint net_link_out_idx = UINT_MAX; + ctx->ping_sign_out_idx = UINT_MAX; + ctx->repair_sign_cnt = 0; + ctx->request_seq = 0UL; + uint shred_tile_idx = 0; + uint sign_repair_match_cnt = 0; + ctx->round_robin_idx = 0UL; + for( uint out_idx=0U; out_idx<(tile->out_cnt); out_idx++ ) { fd_topo_link_t * link = &topo->links[ tile->out_link_id[ out_idx ] ]; @@ -883,15 +1202,11 @@ unprivileged_init( fd_topo_t * topo, if( net_link_out_idx!=UINT_MAX ) continue; /* only use first net link */ net_link_out_idx = out_idx; + ctx->net_out_idx = out_idx; ctx->net_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp; ctx->net_out_chunk0 = fd_dcache_compact_chunk0( ctx->net_out_mem, link->dcache ); ctx->net_out_wmark = 
fd_dcache_compact_wmark( ctx->net_out_mem, link->dcache, link->mtu ); ctx->net_out_chunk = ctx->net_out_chunk0; - - } else if( 0==strcmp( link->name, "repair_sign" ) ) { - - sign_link_out_idx = out_idx; - } else if( 0==strcmp( link->name, "repair_repla" ) ) { ctx->replay_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp; @@ -917,27 +1232,53 @@ unprivileged_init( fd_topo_t * topo, ctx->shredcap_out_wmark = fd_dcache_compact_wmark( ctx->shredcap_out_mem, link->dcache, link->mtu ); ctx->shredcap_out_chunk = ctx->shredcap_out_chunk0; + } else if( 0==strcmp( link->name, "ping_sign" ) ) { + ctx->ping_sign_out_idx = out_idx; + ctx->ping_sign_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp; + ctx->ping_sign_out_chunk0 = fd_dcache_compact_chunk0( ctx->ping_sign_out_mem, link->dcache ); + ctx->ping_sign_out_wmark = fd_dcache_compact_wmark( ctx->ping_sign_out_mem, link->dcache, link->mtu ); + ctx->ping_sign_out_chunk = ctx->ping_sign_out_chunk0; + + } else if( 0==strcmp( link->name, "repair_sign" ) ) { + fd_repair_out_ctx_t * repair_sign_out = &ctx->repair_sign_out_ctx[ ctx->repair_sign_cnt++ ]; + repair_sign_out->idx = out_idx; + repair_sign_out->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp; + repair_sign_out->chunk0 = fd_dcache_compact_chunk0( repair_sign_out->mem, link->dcache ); + repair_sign_out->wmark = fd_dcache_compact_wmark( repair_sign_out->mem, link->dcache, link->mtu ); + repair_sign_out->chunk = repair_sign_out->chunk0; + repair_sign_out->in_idx = ctx->sign_repair_in_idx[ sign_repair_match_cnt ]; + repair_sign_out->max_credits = ctx->sign_repair_in_depth[ sign_repair_match_cnt ]; + repair_sign_out->credits = ctx->sign_repair_in_depth[ sign_repair_match_cnt ]; + sign_repair_match_cnt++; + } else { FD_LOG_ERR(( "repair tile has unexpected output link %s", link->name )); } } - if( FD_UNLIKELY( sign_link_out_idx==UINT_MAX ) ) FD_LOG_ERR(( "Missing repair_sign link" )); + if( 
FD_UNLIKELY( ctx->ping_sign_out_idx==UINT_MAX ) ) FD_LOG_ERR(( "Missing ping_sign link for keyguard client" )); if( FD_UNLIKELY( net_link_out_idx ==UINT_MAX ) ) FD_LOG_ERR(( "Missing repair_net link" )); + if( FD_UNLIKELY( ctx->repair_sign_cnt != ctx->sign_repair_in_cnt ) ) { + FD_LOG_ERR(( "Mismatch between repair_sign output links (%lu) and sign_repair input links (%lu)", + ctx->repair_sign_cnt, ctx->sign_repair_in_cnt )); + } + ctx->shred_tile_cnt = shred_tile_idx; FD_TEST( ctx->shred_tile_cnt == fd_topo_tile_name_cnt( topo, "shred" ) ); /* Scratch mem setup */ - ctx->repair = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() ); - ctx->forest = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) ); - ctx->fec_sigs = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) ); - ctx->sreasm = FD_SCRATCH_ALLOC_APPEND( l, fd_sreasm_align(), fd_sreasm_footprint( 20 ) ); + ctx->repair = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() ); + ctx->forest = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) ); + ctx->fec_sigs = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) ); + ctx->sreasm = FD_SCRATCH_ALLOC_APPEND( l, fd_sreasm_align(), fd_sreasm_footprint( 20 ) ); + ctx->reasm = FD_SCRATCH_ALLOC_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) ); + ctx->pending_sign_req_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_pending_sign_req_pool_align(), fd_repair_pending_sign_req_pool_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); + ctx->pending_sign_req_map = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_pending_sign_req_map_align(), fd_repair_pending_sign_req_map_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); // ctx->fec_repair = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_repair_align(), fd_fec_repair_footprint( ( 1<<20 ), tile->repair.shred_tile_cnt ) ); /* Look at fec_repair.h for an explanation of this fec_max. 
*/ - ctx->reasm = FD_SCRATCH_ALLOC_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) ); - + ctx->repair->next_nonce = 1; ctx->store = NULL; ulong store_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "store" ); if( FD_LIKELY( store_obj_id!=ULONG_MAX ) ) { /* firedancer-only */ @@ -964,9 +1305,8 @@ unprivileged_init( fd_topo_t * topo, fd_ip4_udp_hdr_init( ctx->intake_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_intake_listen_port ); fd_ip4_udp_hdr_init( ctx->serve_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_serve_listen_port ); - /* Keyguard setup */ - fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_link_in_idx ] ]; - fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ sign_link_out_idx ] ]; + fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ ctx->ping_sign_in_idx ] ]; + fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ ctx->ping_sign_out_idx ] ]; if( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client, sign_out->mcache, sign_out->dcache, @@ -980,12 +1320,20 @@ unprivileged_init( fd_topo_t * topo, /* Repair set up */ - ctx->repair = fd_repair_join( fd_repair_new( ctx->repair, ctx->repair_seed ) ); - ctx->forest = fd_forest_join( fd_forest_new( ctx->forest, tile->repair.slot_max, ctx->repair_seed ) ); + ctx->repair = fd_repair_join ( fd_repair_new ( ctx->repair, ctx->repair_seed ) ); + ctx->forest = fd_forest_join ( fd_forest_new ( ctx->forest, tile->repair.slot_max, ctx->repair_seed ) ); // ctx->fec_repair = fd_fec_repair_join( fd_fec_repair_new( ctx->fec_repair, ( tile->repair.max_pending_shred_sets + 2 ), tile->repair.shred_tile_cnt, 0 ) ); - ctx->fec_sigs = fd_fec_sig_join( fd_fec_sig_new( ctx->fec_sigs, 20 ) ); + ctx->fec_sigs = fd_fec_sig_join ( fd_fec_sig_new ( ctx->fec_sigs, 20 ) ); ctx->sreasm = fd_sreasm_join( fd_sreasm_new( ctx->sreasm, 20 ) ); ctx->reasm = fd_reasm_join( fd_reasm_new( ctx->reasm, 1 << 20, 0 ) ); + ctx->pending_sign_req_pool = 
fd_repair_pending_sign_req_pool_join ( fd_repair_pending_sign_req_pool_new ( ctx->pending_sign_req_pool, FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); + ctx->pending_sign_req_map = fd_repair_pending_sign_req_map_join ( fd_repair_pending_sign_req_map_new ( ctx->pending_sign_req_map, FD_REPAIR_PENDING_SIGN_REQ_MAX, ctx->repair_seed ) ); + + ctx->repair->next_nonce = 1; + + if( FD_UNLIKELY( !ctx->pending_sign_req_pool || !ctx->pending_sign_req_map ) ) { + FD_LOG_ERR(( "Failed to join pending_sign_req_pool or pending_sign_req_map" )); + } ctx->repair_iter = fd_forest_iter_init( ctx->forest ); FD_TEST( fd_forest_iter_done( ctx->repair_iter, ctx->forest ) ); diff --git a/src/flamenco/repair/Local.mk b/src/flamenco/repair/Local.mk index 2d60eda6853..2edd5b07b8d 100644 --- a/src/flamenco/repair/Local.mk +++ b/src/flamenco/repair/Local.mk @@ -3,5 +3,6 @@ $(call add-hdrs,fd_repair.h) $(call add-objs,fd_repair,fd_flamenco) ifdef FD_HAS_HOSTED #$(call make-bin,fd_repair_tool,fd_repair_tool,fd_flamenco fd_ballet fd_util) +$(call make-unit-test,test_repair,test_repair,fd_flamenco fd_ballet fd_util) endif endif diff --git a/src/flamenco/repair/fd_repair.c b/src/flamenco/repair/fd_repair.c index 9c6e02b6232..3feacbfb392 100644 --- a/src/flamenco/repair/fd_repair.c +++ b/src/flamenco/repair/fd_repair.c @@ -40,6 +40,12 @@ fd_repair_new ( void * shmem, ulong seed ) { glob->peer_idx = 0; glob->actives_random_seed = 0; + /* Initialize pending sign request pool and map */ + shm = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_pending_sign_req_pool_align(), fd_repair_pending_sign_req_pool_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); + glob->pending_sign_req_pool = fd_repair_pending_sign_req_pool_join( fd_repair_pending_sign_req_pool_new( shm, FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); + shm = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_pending_sign_req_map_align(), fd_repair_pending_sign_req_map_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); + glob->pending_sign_req_map = fd_repair_pending_sign_req_map_join( 
fd_repair_pending_sign_req_map_new( shm, FD_REPAIR_PENDING_SIGN_REQ_MAX, seed ) ); + ulong scratch_top = FD_SCRATCH_ALLOC_FINI(l, 1UL); if ( scratch_top > (ulong)shmem + fd_repair_footprint() ) { FD_LOG_ERR(("Enough space not allocated for repair")); @@ -60,6 +66,8 @@ fd_repair_delete ( void * shmap ) { fd_active_table_delete( fd_active_table_leave( glob->actives ) ); fd_inflight_table_delete( fd_inflight_table_leave( glob->dupdetect ) ); fd_pinged_table_delete( fd_pinged_table_leave( glob->pinged ) ); + fd_repair_pending_sign_req_pool_delete( fd_repair_pending_sign_req_pool_leave( glob->pending_sign_req_pool ) ); + fd_repair_pending_sign_req_map_delete( fd_repair_pending_sign_req_map_leave( glob->pending_sign_req_map ) ); return glob; } @@ -366,7 +374,7 @@ fd_repair_create_inflight_request( fd_repair_t * glob, int type, ulong slot, uin dupelem->last_send_time = 0L; } - if( FD_LIKELY( dupelem->last_send_time+(long)40e6 < now ) ) { /* 40ms */ + if( FD_LIKELY( dupelem->last_send_time+(long)80e6 < now ) ) { /* 80ms */ dupelem->last_send_time = now; dupelem->req_cnt = FD_REPAIR_NUM_NEEDED_PEERS; return 1; @@ -525,3 +533,84 @@ fd_repair_metrics_t * fd_repair_get_metrics( fd_repair_t * repair ) { return &repair->metrics; } + +/* Pending Sign Request API + + These functions manage the pool and map of pending sign requests in + the repair module. Each request is identified by a unique nonce, + allowing for nonce to be used as a key in the map. + + fd_repair_pending_sign_req_t * fd_repair_insert_pending_request(...); + Acquires a pending sign request from the pool, keys it by the next + nonce, inserts it into the map and constructs the request protocol + message. Returns pointer or NULL if pool is full. + + The insert fills in the nonce, signature offset, destination + address/port and recipient. The caller is responsible for filling + in buf and buflen. The nonce counter only advances on a successful + insert.
+ + fd_repair_pending_sign_req_t * fd_repair_query_pending_request(...); + Finds a pending sign request by nonce. Returns pointer or NULL. + + int fd_repair_remove_pending_request(...); + Removes a pending sign request by nonce. Returns 0 on success, -1 + if not found. + + All functions assume the repair context is valid and not used concurrently. +*/ + +fd_repair_pending_sign_req_t * +fd_repair_insert_pending_request( fd_repair_t * repair, + fd_repair_protocol_t * protocol, + uint dst_ip_addr, + ushort dst_port, + enum fd_needed_elem_type type, + ulong slot, + uint shred_index, + long now, + fd_pubkey_t const * recipient ) { + /* Check if there is any space for a new pending sign request */ + if( FD_UNLIKELY( fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == 0 ) ) { + return NULL; + } + + fd_repair_pending_sign_req_t * pending = fd_repair_pending_sign_req_pool_ele_acquire( repair->pending_sign_req_pool ); + if (FD_UNLIKELY( !pending ) ) { + return NULL; + } + + pending->nonce = repair->next_nonce; + + fd_repair_pending_sign_req_map_ele_insert( repair->pending_sign_req_map, pending, repair->pending_sign_req_pool ); + + fd_repair_construct_request_protocol( repair, protocol, type, slot, shred_index, recipient, repair->next_nonce, now ); + + pending->sig_offset = 4; + pending->dst_ip_addr = dst_ip_addr; + pending->dst_port = dst_port; + pending->recipient = *recipient; + + repair->metrics.send_pkt_cnt++; + repair->next_nonce++; + return pending; +} + +fd_repair_pending_sign_req_t * +fd_repair_query_pending_request( fd_repair_t * repair, + ulong nonce ) { + return fd_repair_pending_sign_req_map_ele_query( repair->pending_sign_req_map, &nonce, NULL, repair->pending_sign_req_pool ); +} + +int +fd_repair_remove_pending_request( fd_repair_t * repair, + ulong nonce ) { + fd_repair_pending_sign_req_t * pending = fd_repair_pending_sign_req_map_ele_query( repair->pending_sign_req_map, &nonce, NULL, repair->pending_sign_req_pool ); + if( FD_UNLIKELY(
!pending ) ) { + return -1; + } + + fd_repair_pending_sign_req_map_ele_remove( repair->pending_sign_req_map, &nonce, NULL, repair->pending_sign_req_pool ); + fd_repair_pending_sign_req_pool_ele_release( repair->pending_sign_req_pool, pending ); + return 0; +} diff --git a/src/flamenco/repair/fd_repair.h b/src/flamenco/repair/fd_repair.h index 6df5cb5b3b4..d6673435068 100644 --- a/src/flamenco/repair/fd_repair.h +++ b/src/flamenco/repair/fd_repair.h @@ -33,7 +33,11 @@ /* Sha256 pre-image size for pings */ #define FD_PING_PRE_IMAGE_SZ (48UL) /* Number of peers to send requests to. */ -#define FD_REPAIR_NUM_NEEDED_PEERS (2) +#define FD_REPAIR_NUM_NEEDED_PEERS (1) +/* Max number of pending sign requests */ +#define FD_REPAIR_PENDING_SIGN_REQ_MAX (1<<10) +/* Maximum size for sign buffer, typically <= 160 bytes (e.g., pings, repairs) */ +#define FD_REPAIR_MAX_SIGN_BUF_SIZE (256UL) typedef fd_gossip_peer_addr_t fd_repair_peer_addr_t; @@ -165,6 +169,28 @@ typedef struct fd_pinged_elem fd_pinged_elem_t; #define MAP_T fd_pinged_elem_t #include "../../util/tmpl/fd_map_giant.c" +/* Pending sign request structure for async request handling */ +struct fd_repair_pending_sign_req { + ulong nonce; /* map key, unique nonce */ + ulong next; /* used internally by fd_map_chain */ + uchar buf[FD_REPAIR_MAX_SIGN_BUF_SIZE]; + ulong buflen; + ulong sig_offset; + uint dst_ip_addr; + ushort dst_port; + fd_pubkey_t recipient; +}; +typedef struct fd_repair_pending_sign_req fd_repair_pending_sign_req_t; + +#define POOL_NAME fd_repair_pending_sign_req_pool +#define POOL_T fd_repair_pending_sign_req_t +#include "../../util/tmpl/fd_pool.c" + +#define MAP_NAME fd_repair_pending_sign_req_map +#define MAP_KEY nonce +#define MAP_ELE_T fd_repair_pending_sign_req_t +#include "../../util/tmpl/fd_map_chain.c" + struct fd_peer { fd_pubkey_t key; fd_ip4_port_t ip4; @@ -238,6 +264,9 @@ struct fd_repair { fd_vote_stake_weight_t * stake_weights_temp; /* Path to the file where we write the cache of known good 
repair peers, to make cold booting faster */ int good_peer_cache_file_fd; + /* Pending sign requests for async operations */ + fd_repair_pending_sign_req_t * pending_sign_req_pool; + fd_repair_pending_sign_req_map_t * pending_sign_req_map; /* Metrics */ fd_repair_metrics_t metrics; }; @@ -249,13 +278,16 @@ fd_repair_align ( void ) { return 128UL; } FD_FN_CONST static inline ulong fd_repair_footprint( void ) { ulong l = FD_LAYOUT_INIT; - l = FD_LAYOUT_APPEND( l, alignof(fd_repair_t), sizeof(fd_repair_t) ); - l = FD_LAYOUT_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) ); - l = FD_LAYOUT_APPEND( l, fd_inflight_table_align(), fd_inflight_table_footprint(FD_NEEDED_KEY_MAX) ); - l = FD_LAYOUT_APPEND( l, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX) ); + l = FD_LAYOUT_APPEND( l, alignof(fd_repair_t), sizeof(fd_repair_t) ); + l = FD_LAYOUT_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) ); + l = FD_LAYOUT_APPEND( l, fd_inflight_table_align(), fd_inflight_table_footprint(FD_NEEDED_KEY_MAX) ); + l = FD_LAYOUT_APPEND( l, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX) ); /* regular and temp stake weights */ - l = FD_LAYOUT_APPEND( l, alignof(fd_vote_stake_weight_t), FD_STAKE_WEIGHTS_MAX * sizeof(fd_vote_stake_weight_t) ); - l = FD_LAYOUT_APPEND( l, alignof(fd_vote_stake_weight_t), FD_STAKE_WEIGHTS_MAX * sizeof(fd_vote_stake_weight_t) ); + l = FD_LAYOUT_APPEND( l, alignof(fd_vote_stake_weight_t), FD_STAKE_WEIGHTS_MAX * sizeof(fd_vote_stake_weight_t) ); + l = FD_LAYOUT_APPEND( l, alignof(fd_vote_stake_weight_t), FD_STAKE_WEIGHTS_MAX * sizeof(fd_vote_stake_weight_t) ); + /* pending sign request structures */ + l = FD_LAYOUT_APPEND( l, fd_repair_pending_sign_req_pool_align(), fd_repair_pending_sign_req_pool_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); + l = FD_LAYOUT_APPEND( l, fd_repair_pending_sign_req_map_align(), fd_repair_pending_sign_req_map_footprint( 
FD_REPAIR_PENDING_SIGN_REQ_MAX ) ); return FD_LAYOUT_FINI(l, fd_repair_align() ); } @@ -330,5 +362,24 @@ void fd_repair_set_stake_weights_fini( fd_repair_t * repair ); fd_repair_metrics_t * fd_repair_get_metrics( fd_repair_t * repair ); +/* Pending sign request operations */ +fd_repair_pending_sign_req_t * +fd_repair_insert_pending_request( fd_repair_t * repair, + fd_repair_protocol_t * protocol, + uint dst_ip_addr, + ushort dst_port, + enum fd_needed_elem_type type, + ulong slot, + uint shred_index, + long now, + fd_pubkey_t const * recipient ); + +fd_repair_pending_sign_req_t * +fd_repair_query_pending_request( fd_repair_t * repair, + ulong nonce ); + +int +fd_repair_remove_pending_request( fd_repair_t * repair, + ulong nonce ); #endif /* HEADER_fd_src_flamenco_repair_fd_repair_h */ diff --git a/src/flamenco/repair/test_repair.c b/src/flamenco/repair/test_repair.c new file mode 100644 index 00000000000..bcf12cf3eca --- /dev/null +++ b/src/flamenco/repair/test_repair.c @@ -0,0 +1,412 @@ +#include "../fd_flamenco_base.h" +#include "fd_repair.h" +#include "../../util/fd_util.h" +#include + +/* init repair test */ +static fd_repair_t * +test_repair_setup( void ) { + ulong footprint = fd_repair_footprint(); + void * shmem = aligned_alloc( fd_repair_align(), footprint ); + FD_TEST( shmem ); + fd_repair_t * repair = fd_repair_join( fd_repair_new( shmem, 14919811UL ) ); + FD_TEST( repair ); + return repair; +} + +/* cleanup repair test*/ +static void +test_repair_cleanup( fd_repair_t * repair ) { + void * shmem = fd_repair_leave( repair ); + fd_repair_delete( shmem ); + free( shmem ); +} + +/* create a test buffer */ +static void +create_test_buffer( uchar * buf, ulong buflen ) { + for( ulong i = 0; i < buflen; i++ ) { + buf[i] = (uchar)(i & 0xFF); + } +} + +/* test helper to create a seed generated recipient */ +static void +create_test_recipient( fd_pubkey_t * recipient, ulong seed ) { +/* The range of values written to recipient->uc[i] is [0, 255] for + each i in [0, 
31], as each byte is set to (seed + i) & 0xFF. */ + for( uint i = 0; i < 32; i++ ) { + recipient->uc[i] = (uchar)((seed + i) & 0xFF); + } +} + +/* test helper to add a pending request (similar logic to fd_repair_send_request_async) */ +static int +test_add_pending_request( fd_repair_t * repair, + ulong nonce, + uchar const * buf, + ulong buflen, + ulong sig_offset, + uint dst_ip_addr, + ushort dst_port, + fd_pubkey_t const * recipient ) { + /* Check if there is any space for a new pending sign request */ + if( FD_UNLIKELY( fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == 0 ) ) { + return -1; + } + + if( buflen > FD_REPAIR_MAX_SIGN_BUF_SIZE ) { + return -1; + } + + fd_repair_pending_sign_req_t * pending = fd_repair_pending_sign_req_pool_ele_acquire( repair->pending_sign_req_pool ); + if( !pending ) { + return -1; + } + + pending->nonce = nonce; + pending->buflen = buflen; + pending->sig_offset = sig_offset; + pending->dst_ip_addr = dst_ip_addr; + pending->dst_port = dst_port; + pending->recipient = *recipient; + + fd_memcpy( pending->buf, buf, buflen ); + + /* Add to map */ + fd_repair_pending_sign_req_map_ele_insert( repair->pending_sign_req_map, pending, repair->pending_sign_req_pool ); + + return 0; +} + +/* basic pending sign request operations (add, find, remove) */ +static void +test_pending_sign_requests_basic( void ) { + FD_LOG_NOTICE(( "Testing basic pending sign request operations" )); + + fd_repair_t * repair = test_repair_setup(); + + FD_TEST( fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == FD_REPAIR_PENDING_SIGN_REQ_MAX ); + FD_TEST( fd_repair_query_pending_request( repair, 14919811UL ) == NULL ); + + uchar test_buf[128]; + create_test_buffer( test_buf, sizeof(test_buf) ); + fd_pubkey_t recipient; + create_test_recipient( &recipient, 0x1234 ); + + ulong nonce = 14919811UL; + uint dst_ip = 14919811UL; + ushort dst_port = 8080UL; + ulong sig_offset = 4; + + int result = test_add_pending_request( repair, 
nonce, test_buf, sizeof(test_buf), + sig_offset, dst_ip, dst_port, &recipient ); + FD_TEST( result == 0 ); + FD_TEST( fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == FD_REPAIR_PENDING_SIGN_REQ_MAX - 1 ); + + fd_repair_pending_sign_req_t * pending = fd_repair_query_pending_request( repair, nonce ); + FD_TEST( pending != NULL ); + FD_TEST( pending->nonce == nonce ); + FD_TEST( pending->buflen == sizeof(test_buf) ); + FD_TEST( pending->sig_offset == sig_offset ); + FD_TEST( pending->dst_ip_addr == dst_ip ); + FD_TEST( pending->dst_port == dst_port ); + FD_TEST( memcmp( &pending->recipient, &recipient, sizeof(fd_pubkey_t) ) == 0 ); + FD_TEST( memcmp( pending->buf, test_buf, sizeof(test_buf) ) == 0 ); + + result = fd_repair_remove_pending_request( repair, nonce ); + FD_TEST( result == 0 ); + FD_TEST( fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == FD_REPAIR_PENDING_SIGN_REQ_MAX ); + FD_TEST( fd_repair_query_pending_request( repair, nonce ) == NULL ); + + result = fd_repair_remove_pending_request( repair, nonce ); + FD_TEST( result == -1 ); + + test_repair_cleanup( repair ); + FD_LOG_NOTICE(( "Basic pending sign request tests PASS" )); +} + +static void +test_pending_sign_requests_multiple( void ) { /* adds 10 requests with distinct nonces, recipients and ports, checks each one can be queried, then removes them in reverse order while checking the rest stay present */ + FD_LOG_NOTICE(( "Testing multiple pending sign requests" )); + + fd_repair_t * repair = test_repair_setup(); + + const ulong num_requests = 10; + ulong nonces[num_requests]; + fd_pubkey_t recipients[num_requests]; /* NOTE(review): num_requests is a const-qualified object, not an integer constant expression, so in C both arrays are variable-length arrays */ + + for( ulong i = 0; i < num_requests; i++ ) { + nonces[i] = 1000 + i; + create_test_recipient( &recipients[i], i ); + + uchar test_buf[64]; + create_test_buffer( test_buf, sizeof(test_buf) ); + + int result = test_add_pending_request( repair, nonces[i], test_buf, sizeof(test_buf), + 4, 0x7F000001, (ushort)(8080 + i), &recipients[i] ); + FD_TEST( result == 0 ); + } + + FD_TEST( fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == FD_REPAIR_PENDING_SIGN_REQ_MAX - num_requests ); + + for( ulong i
= 0; i < num_requests; i++ ) { + fd_repair_pending_sign_req_t * pending = fd_repair_query_pending_request( repair, nonces[i] ); + FD_TEST( pending != NULL ); + FD_TEST( pending->nonce == nonces[i] ); + FD_TEST( pending->dst_port == (ushort)(8080 + i) ); + FD_TEST( memcmp( &pending->recipient, &recipients[i], sizeof(fd_pubkey_t) ) == 0 ); + } + + for( ulong i = num_requests; i > 0; i-- ) { /* remove last-added first; i is one past the index being removed */ + int result = fd_repair_remove_pending_request( repair, nonces[i-1] ); + FD_TEST( result == 0 ); + + FD_TEST( fd_repair_query_pending_request( repair, nonces[i-1] ) == NULL ); + + for( ulong j = i - 1; j > 0; j-- ) { /* every lower-indexed request must still be present */ + FD_TEST( fd_repair_query_pending_request( repair, nonces[j-1] ) != NULL ); + } + } + + FD_TEST( fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == FD_REPAIR_PENDING_SIGN_REQ_MAX ); + + test_repair_cleanup( repair ); + FD_LOG_NOTICE(( "Multiple pending sign request tests PASS" )); +} + +/* out-of-order pending sign request operations: add requests in unsorted nonce order, look each one up, then remove them in a different order, checking after every removal that the not-yet-removed nonces are still present */ +static void +test_pending_sign_requests_out_of_order( void ) { + FD_LOG_NOTICE(( "Testing out-of-order pending sign request operations" )); + + fd_repair_t * repair = test_repair_setup(); + + ulong nonces[] = { 100, 50, 200, 25, 150, 75, 300 }; + const ulong num_requests = sizeof(nonces) / sizeof(nonces[0]); + + for( ulong i = 0; i < num_requests; i++ ) { + fd_pubkey_t recipient; + create_test_recipient( &recipient, nonces[i] ); + + uchar test_buf[32]; + create_test_buffer( test_buf, sizeof(test_buf) ); + + int result = test_add_pending_request( repair, nonces[i], test_buf, sizeof(test_buf), + 4, 0x7F000001, 8080, &recipient ); + FD_TEST( result == 0 ); + } + + ulong find_order[] = { 100, 50, 200, 25, 150, 75, 300 }; /* NOTE(review): same sequence as nonces[], so the lookups below happen in insertion order */ + const ulong num_finds = sizeof(find_order) / sizeof(find_order[0]); + for( ulong i = 0; i < num_finds; i++ ) { + fd_repair_pending_sign_req_t * pending = fd_repair_query_pending_request( repair, find_order[i] ); + FD_TEST( pending != NULL ); + FD_TEST(
pending->nonce == find_order[i] ); + } + + ulong removal_order[] = { 200, 25, 300, 50, 75, 100, 150 }; + for( ulong i = 0; i < num_requests; i++ ) { + int result = fd_repair_remove_pending_request( repair, removal_order[i] ); + FD_TEST( result == 0 ); + + FD_TEST( fd_repair_query_pending_request( repair, removal_order[i] ) == NULL ); + + for( ulong j = i + 1; j < num_requests; j++ ) { /* entries later in removal_order must still be present */ + FD_TEST( fd_repair_query_pending_request( repair, removal_order[j] ) != NULL ); + } + } + + FD_TEST( fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == FD_REPAIR_PENDING_SIGN_REQ_MAX ); + + test_repair_cleanup( repair ); + FD_LOG_NOTICE(( "Out-of-order pending sign request tests PASS" )); +} + +/* edge cases and error conditions: zero-length buffer, maximum-size buffer, oversized buffer (expected to be rejected with -1) and a repeated nonce */ +static void +test_pending_sign_requests_edge_cases( void ) { + FD_LOG_NOTICE(( "Testing edge cases and error conditions" )); + + fd_repair_t * repair = test_repair_setup(); + + fd_pubkey_t recipient; + create_test_recipient( &recipient, 0x5678 ); + + /* zero-length buffer: buflen 0 and sig_offset 0 are passed; the add is expected to succeed */ + uchar test_buf[1]; + create_test_buffer( test_buf, sizeof(test_buf) ); + + int result = test_add_pending_request( repair, 1, test_buf, 0, + 0, 14919811UL, 8080, &recipient ); + FD_TEST( result == 0 ); + + fd_repair_pending_sign_req_t * pending = fd_repair_query_pending_request( repair, 1 ); + FD_TEST( pending != NULL ); + FD_TEST( pending->buflen == 0 ); + fd_repair_remove_pending_request( repair, 1 ); + + /* maximum buffer size: exactly FD_REPAIR_MAX_SIGN_BUF_SIZE bytes is expected to be accepted */ + uchar max_buf[FD_REPAIR_MAX_SIGN_BUF_SIZE]; + create_test_buffer( max_buf, sizeof(max_buf) ); + + result = test_add_pending_request( repair, 2, max_buf, sizeof(max_buf), + 4, 14919811UL, 8080, &recipient ); + FD_TEST( result == 0 ); + + pending = fd_repair_query_pending_request( repair, 2 ); + FD_TEST( pending != NULL ); + FD_TEST( pending->buflen == FD_REPAIR_MAX_SIGN_BUF_SIZE ); + fd_repair_remove_pending_request( repair, 2 ); + + /* oversized buffer: one byte over the maximum, contents left uninitialized; the add is expected to return -1 */ + uchar oversized_buf[FD_REPAIR_MAX_SIGN_BUF_SIZE + 1]; + result =
test_add_pending_request( repair, 3, oversized_buf, sizeof(oversized_buf), + 4, 14919811UL, 8080, &recipient ); + FD_TEST( result == -1 ); + + /* repeat nonce: after adding nonce 100 through the helper, a second element with nonce 100 is acquired from the pool and inserted into the map directly. NOTE(review): both entries carry the same dst_ip_addr (16120512UL), so the check below cannot tell which entry the query returns, and only one remove follows -- confirm that "overwrite" is really what is exercised here */ + result = test_add_pending_request( repair, 100, test_buf, sizeof(test_buf), + 4, 16120512UL, 8080, &recipient ); + FD_TEST( result == 0 ); + + fd_repair_pending_sign_req_t * pending2 = fd_repair_pending_sign_req_pool_ele_acquire( repair->pending_sign_req_pool ); + pending2->nonce = 100; + pending2->dst_ip_addr = 16120512UL; + + fd_repair_pending_sign_req_map_ele_insert( repair->pending_sign_req_map, pending2, repair->pending_sign_req_pool ); + + FD_TEST( fd_repair_query_pending_request( repair, 100 )->dst_ip_addr == 16120512UL ); + + fd_repair_remove_pending_request( repair, 100 ); + + test_repair_cleanup( repair ); + FD_LOG_NOTICE(( "Edge case tests PASS" )); +} + +/* pool exhaustion: fill the pool with FD_REPAIR_PENDING_SIGN_REQ_MAX requests, check that one more add fails, then free 10 slots and check that adds succeed again */ +static void +test_pending_sign_requests_pool_exhaustion( void ) { + FD_LOG_NOTICE(( "Testing pool exhaustion" )); + + fd_repair_t * repair = test_repair_setup(); + + fd_pubkey_t recipient; + create_test_recipient( &recipient, 0x9ABC ); + uchar test_buf[32]; + create_test_buffer( test_buf, sizeof(test_buf) ); + + /* reach pool limit */ + for( ulong i = 0; i < FD_REPAIR_PENDING_SIGN_REQ_MAX; i++ ) { + int result = test_add_pending_request( repair, i, test_buf, sizeof(test_buf), + 4, 0x7F000001, 8080, &recipient ); + FD_TEST( result == 0 ); + } + + FD_TEST( fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == 0 ); + + /* try to add one more; this should fail */ + int result = test_add_pending_request( repair, FD_REPAIR_PENDING_SIGN_REQ_MAX, + test_buf, sizeof(test_buf), + 4, 0x7F000001, 8080, &recipient ); + FD_TEST( result == -1 ); + + /* remove the first 10 requests, then re-add 5 with fresh nonces; the adds should succeed again */ + for( ulong i = 0; i < 10; i++ ) { + result = fd_repair_remove_pending_request( repair, i ); + FD_TEST( result == 0 ); + } + + FD_TEST(
fd_repair_pending_sign_req_pool_free( repair->pending_sign_req_pool ) == 10 ); + + for( ulong i = 0; i < 5; i++ ) { + result = test_add_pending_request( repair, FD_REPAIR_PENDING_SIGN_REQ_MAX + i, + test_buf, sizeof(test_buf), + 4, 0x7F000001, 8080, &recipient ); + FD_TEST( result == 0 ); + } + + test_repair_cleanup( repair ); + FD_LOG_NOTICE(( "Pool exhaustion tests PASS" )); +} + +/* multiple data test: store four requests that differ in fill pattern, buffer length, ip, port and sig_offset, then check that every stored field, every buffer byte and the recipient read back unchanged */ +static void +test_pending_sign_requests_multiple_data( void ) { + FD_LOG_NOTICE(( "Testing multiple data" )); + + fd_repair_t * repair = test_repair_setup(); + + struct { + ulong nonce; + uchar pattern; + ulong buflen; + uint ip; + ushort port; + ulong sig_offset; + } test_cases[] = { + { 1, 0x00, 16, 0x7F000001, 8001, 0 }, + { 2, 0xFF, 32, 0xC0A80101, 8002, 4 }, + { 3, 0xAA, 64, 0x08080808, 8003, 8 }, + { 4, 0x55, 128, 0x01010101, 8004, 16 } + }; + + const ulong num_cases = sizeof(test_cases) / sizeof(test_cases[0]); + + for( ulong i = 0; i < num_cases; i++ ) { + uchar test_buf[128]; /* sized for the largest buflen in test_cases (128) */ + memset( test_buf, test_cases[i].pattern, test_cases[i].buflen ); + + fd_pubkey_t recipient; + create_test_recipient( &recipient, test_cases[i].nonce ); + + int result = test_add_pending_request( repair, test_cases[i].nonce, + test_buf, test_cases[i].buflen, + test_cases[i].sig_offset, + test_cases[i].ip, + test_cases[i].port, + &recipient ); + FD_TEST( result == 0 ); + } + + for( ulong i = 0; i < num_cases; i++ ) { + fd_repair_pending_sign_req_t * pending = fd_repair_query_pending_request( repair, test_cases[i].nonce ); + FD_TEST( pending != NULL ); + + FD_TEST( pending->nonce == test_cases[i].nonce ); + FD_TEST( pending->buflen == test_cases[i].buflen ); + FD_TEST( pending->sig_offset == test_cases[i].sig_offset ); + FD_TEST( pending->dst_ip_addr == test_cases[i].ip ); + FD_TEST( pending->dst_port == test_cases[i].port ); + + for( ulong j = 0; j < test_cases[i].buflen; j++ ) { + FD_TEST( pending->buf[j] == test_cases[i].pattern ); + } + + fd_pubkey_t expected_recipient; +
create_test_recipient( &expected_recipient, test_cases[i].nonce ); + FD_TEST( memcmp( &pending->recipient, &expected_recipient, sizeof(fd_pubkey_t) ) == 0 ); + } + + test_repair_cleanup( repair ); + FD_LOG_NOTICE(( "Multiple data tests PASS" )); +} + +int +main( int argc, + char ** argv ) { /* boots, runs every pending sign request test in sequence, then halts and returns 0 */ + fd_boot( &argc, &argv ); + + FD_LOG_NOTICE(( "Testing fd_repair pending sign request functions" )); + test_pending_sign_requests_basic(); + test_pending_sign_requests_multiple(); + test_pending_sign_requests_out_of_order(); + test_pending_sign_requests_edge_cases(); + test_pending_sign_requests_pool_exhaustion(); + test_pending_sign_requests_multiple_data(); + + FD_LOG_NOTICE(( "All pending sign request tests PASS" )); + + fd_halt(); + return 0; +}