Skip to content

Commit e0ac859

Browse files
committed
Repair & Sign: async repair requests
1 parent 91cd09c commit e0ac859

File tree

7 files changed

+52
-72
lines changed

7 files changed

+52
-72
lines changed

src/app/firedancer/config/default.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -689,10 +689,10 @@ user = ""
689689
shred_tile_count = 1
690690

691691
# How many sign tiles to run. Should be set to 2. This is
692-
# configurable and designed to scale for the repair tile.
692+
# configurable and designed to scale for the repair tile.
693693
# The 0th tile gets used by other tiles for signing messages, while
694694
# the remaining tiles distribute the workload of signing repair
695-
# requests.
695+
# requests.
696696
sign_tile_count = 2
697697

698698
# All memory that will be used in Firedancer is pre-allocated in two

src/app/firedancer/topology.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -348,15 +348,15 @@ fd_topo_initialize( config_t * config ) {
348348
/**/ fd_topob_link( topo, "send_net", "net_send", config->net.ingress_buffer_size, FD_NET_MTU, 2UL );
349349
350350
/**/ fd_topob_link( topo, "repair_net", "net_repair", config->net.ingress_buffer_size, FD_NET_MTU, 1UL );
351-
/**/ fd_topob_link( topo, "ping_sign", "repair_sign", 128UL, 2048UL, 1UL );
351+
/**/ fd_topob_link( topo, "ping_sign", "repair_sign", 1024UL, 2048UL, 1UL );
352352
FOR(shred_tile_cnt) fd_topob_link( topo, "shred_repair", "shred_repair", pending_fec_shreds_depth, FD_SHRED_REPAIR_MTU, 2UL /* at most 2 msgs per after_frag */ );
353353

354354

355355
FOR(shred_tile_cnt) fd_topob_link( topo, "repair_shred", "shred_repair", pending_fec_shreds_depth, sizeof(fd_ed25519_sig_t), 1UL );
356356
357-
/**/ fd_topob_link( topo, "sign_ping", "repair_sign", 128UL, 64UL, 1UL );
358-
FOR(sign_tile_cnt-1) fd_topob_link( topo, "repair_sign", "repair_sign", 128UL, 2048UL, 1UL );
359-
FOR(sign_tile_cnt-1) fd_topob_link( topo, "sign_repair", "repair_sign", 128UL, 64UL, 1UL );
357+
/**/ fd_topob_link( topo, "sign_ping", "repair_sign", 1024UL, 64UL, 1UL );
358+
FOR(sign_tile_cnt-1) fd_topob_link( topo, "repair_sign", "repair_sign", 1024UL, 2048UL, 1UL );
359+
FOR(sign_tile_cnt-1) fd_topob_link( topo, "sign_repair", "repair_sign", 1024UL, 64UL, 1UL );
360360

361361
/**/ fd_topob_link( topo, "repair_repla", "repair_repla", 65536UL, sizeof(fd_fec_out_t), 1UL );
362362
FOR(bank_tile_cnt) fd_topob_link( topo, "replay_poh", "replay_poh", 128UL, (4096UL*sizeof(fd_txn_p_t))+sizeof(fd_microblock_trailer_t), 1UL );

src/disco/shred/fd_shred_tile.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -898,7 +898,6 @@ after_frag( fd_shred_ctx_t * ctx,
898898
ulong sz = fd_shred_header_sz( shred->variant );
899899
fd_memcpy( fd_chunk_to_laddr( ctx->repair_out_mem, ctx->repair_out_chunk ), shred, sz );
900900
ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
901-
// FD_LOG_INFO(("Published shred %lu %u %u, time: %ld", shred->slot, shred->idx, shred->fec_set_idx, fd_log_wallclock()));
902901
fd_stem_publish( stem, ctx->repair_out_idx, sig, ctx->repair_out_chunk, sz, 0UL, ctx->tsorig, tspub );
903902
ctx->repair_out_chunk = fd_dcache_compact_next( ctx->repair_out_chunk, sz, ctx->repair_out_chunk0, ctx->repair_out_wmark );
904903
}

src/disco/sign/fd_sign_tile.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ during_frag_sensitive( void * _ctx,
129129
ulong mtu = ctx->in[ in_idx ].mtu;
130130

131131
if( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>mtu ) {
132-
FD_LOG_EMERG(( "oversz or out of bounds signing request (role=%d chunk=%lu sz=%lu mtu=%lu, chunk0=%lu, wmark=%lu)", role, chunk, sz, mtu, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
132+
FD_LOG_EMERG(( "oversz or out of bounds signing request (role=%d chunk=%lu sz=%lu mtu=%lu, chunk0=%lu, wmark=%lu)",
133+
role, chunk, sz, mtu, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
133134
}
134135

135136
void * src = fd_chunk_to_laddr( ctx->in[ in_idx ].mem, chunk );

src/discof/repair/fd_repair_tile.c

Lines changed: 43 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ struct fd_repair_tile_ctx {
130130

131131
fd_wksp_t * wksp;
132132

133+
fd_stem_context_t * stem;
134+
133135
uchar in_kind[ MAX_IN_LINKS ];
134136
fd_repair_in_ctx_t in_links[ MAX_IN_LINKS ];
135137

@@ -235,19 +237,19 @@ repair_signer_async( void * signer_ctx,
235237
ulong len,
236238
int sign_type ) {
237239
fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *) signer_ctx;
238-
240+
239241
if( FD_UNLIKELY( ctx->repair_sign_cnt == 0 ) ) {
240242
FD_LOG_ERR(( "No repair_sign links configured for async signing" ));
241243
}
242-
244+
243245
uint round_robin_idx = (uint)(nonce % ctx->repair_sign_cnt);
244246
fd_repair_out_ctx_t * sign_out = &ctx->repair_sign_out_ctx[ round_robin_idx ];
245-
247+
246248
uchar * dst = fd_chunk_to_laddr( sign_out->mem, sign_out->chunk );
247249
fd_memcpy( dst, buffer, len );
248-
250+
249251
ulong sig = ((ulong)nonce << 32) | (ulong)(uint)sign_type;
250-
fd_stem_publish( ctx->stem, sign_out->idx, sig, sign_out->chunk, len, 0UL, 0UL, 0UL );
252+
fd_stem_publish( ctx->stem, sign_out->idx, sig, sign_out->chunk, len, 0UL, 0UL, 0UL );
251253
sign_out->chunk = fd_dcache_compact_next( sign_out->chunk, len, sign_out->chunk0, sign_out->wmark );
252254

253255
ctx->request_seq = fd_seq_inc( ctx->request_seq, 1UL );
@@ -409,7 +411,7 @@ fd_repair_sign_and_send( fd_repair_tile_ctx_t * repair_tile_ctx,
409411
if( is_async ) {
410412
repair_signer_async( repair_tile_ctx, nonce, buf, buflen, FD_KEYGUARD_SIGN_TYPE_ED25519 );
411413
return 0UL;
412-
414+
413415
/* If sync, we sign using keyguard */
414416
} else {
415417
fd_signature_t sig;
@@ -424,7 +426,7 @@ fd_repair_sign_and_send( fd_repair_tile_ctx_t * repair_tile_ctx,
424426
}
425427
}
426428

427-
/* REPAIR TILE REQUEST HANDLING ARCHITECTURE
429+
/* REQUEST HANDLING ARCHITECTURE
428430
=========================================
429431
430432
The repair tile implements two distinct request handling patterns
@@ -440,11 +442,7 @@ fd_repair_sign_and_send( fd_repair_tile_ctx_t * repair_tile_ctx,
440442
- PINGs & PONGs: Handles peer connectivity and liveness with simple
441443
round-trip messages.
442444
443-
- PEER WARM UPs: On receiving peer information in
444-
handle_new_cluster_contact_info, we prepay the RTT cost by sending
445-
a placeholder Repair request immediately.
446-
447-
2. ASYNCHRONOUS REQUEST HANDLING
445+
2. ASYNCHRONOUS REQUEST HANDLING
448446
--------------------------------
449447
Used strictly for repair requests. These requests are sent to the
450448
sign tile, and the repair tile continues handling other operations
@@ -456,15 +454,19 @@ fd_repair_sign_and_send( fd_repair_tile_ctx_t * repair_tile_ctx,
456454
- WINDOW_INDEX (exact shred): Requests for a specific shred at a
457455
known slot and index. Used when the repair tile knows exactly
458456
which shred is missing from a FEC set.
459-
457+
460458
- HIGHEST_WINDOW_INDEX: Requests for the highest shred in a slot.
461459
Used to determine the end boundary of a slot when the exact count
462460
is unknown.
463-
461+
464462
- ORPHAN: Requests for the highest shred in the parent slot of an
465463
orphaned slot. Used to establish the chain of slot ancestry when a
466464
slot's parent is missing.
467465
466+
- PEER WARM UPs: On receiving peer information in
467+
handle_new_cluster_contact_info, we prepay the RTT cost by sending
468+
a placeholder Repair request immediately.
469+
468470
Async requests can be distributed across multiple sign tiles using
469471
round-robin based on the request nonce. This provides load balancing
470472
and prevents any single sign tile from becoming a bottleneck. */
@@ -519,32 +521,29 @@ fd_repair_send_request_async( fd_repair_tile_ctx_t * ctx,
519521
long now ){
520522
fd_active_elem_t * peer = fd_active_table_query(glob->actives, recipient, NULL);
521523
if (!peer) return;
522-
524+
523525
uint nonce = (uint)glob->next_nonce;
524526

525527
fd_repair_protocol_t protocol;
526528
fd_repair_construct_request_protocol( glob, &protocol, type, slot, shred_index, recipient, glob->next_nonce, now );
527529
glob->next_nonce++;
528530
glob->metrics.send_pkt_cnt++;
529-
531+
530532
if( FD_UNLIKELY( fd_repair_pending_sign_req_deque_full( ctx->pending_sign_req_deque ) ) ) {
531533
return;
532534
}
533-
535+
534536
fd_repair_pending_sign_req_t * pending = fd_repair_pending_sign_req_deque_push_tail_nocopy( ctx->pending_sign_req_deque );
535-
536-
/* Use the unified sign_and_send function in async mode */
537+
537538
fd_repair_sign_and_send( ctx, &protocol, &peer->addr, pending->buf, sizeof(pending->buf), 1, nonce );
538-
539-
/* Since async requests don't complete the buffer with signature,
540-
we need to track the encoded length */
539+
541540
fd_bincode_encode_ctx_t encode_ctx = { .data = pending->buf, .dataend = pending->buf + sizeof(pending->buf) };
542541
if( FD_UNLIKELY( fd_repair_protocol_encode( &protocol, &encode_ctx ) != FD_BINCODE_SUCCESS ) ) {
543542
FD_LOG_CRIT(( "Failed to encode repair message (type %#x)", protocol.discriminant ));
544543
}
545-
544+
546545
ulong buflen = (ulong)encode_ctx.data - (ulong)pending->buf;
547-
546+
548547
pending->buflen = buflen;
549548
pending->sig_offset = 4;
550549
pending->dst_ip_addr = peer->addr.addr;
@@ -593,12 +592,12 @@ handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx,
593592
};
594593
int dup = fd_repair_add_active_peer( ctx->repair, &repair_peer, in_dests[i].pubkey );
595594
if( !dup ) {
596-
/* The repair process uses a Ping-Pong protocol that incurs one
597-
round-trip time (RTT) for the initial repair request. To optimize
598-
this, we proactively send a placeholder Repair request as soon as we
599-
receive a peer's contact information for the first time, effectively
600-
prepaying the RTT cost. */
601-
fd_repair_send_request(ctx, ctx->stem, ctx->repair, 0, 0, 0, in_dests[i].pubkey, fd_log_wallclock());
595+
/* The repair process uses a Ping-Pong protocol that incurs one
596+
round-trip time (RTT) for the initial repair request. To optimize
597+
this, we proactively send a placeholder Repair request as soon as we
598+
receive a peer's contact information for the first time, effectively
599+
prepaying the RTT cost. */
600+
fd_repair_send_request_async(ctx, ctx->stem, ctx->repair, 0, 0, 0, in_dests[i].pubkey, fd_log_wallclock());
602601
ulong hash_src = 0xfffffUL & fd_ulong_hash( (ulong)in_dests[i].ip4_addr | ((ulong)repair_peer.port<<32) );
603602
FD_LOG_INFO(( "Added repair peer: pubkey %s hash_src %lu", FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), hash_src ));
604603
}
@@ -611,7 +610,6 @@ before_frag( fd_repair_tile_ctx_t * ctx,
611610
ulong in_idx,
612611
ulong seq FD_PARAM_UNUSED,
613612
ulong sig ) {
614-
// FD_LOG_NOTICE(( "repair: before_frag %lu", in_idx ));
615613
uint in_kind = ctx->in_kind[ in_idx ];
616614
if( FD_LIKELY( in_kind==IN_KIND_NET ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR;
617615
return 0;
@@ -670,7 +668,7 @@ during_frag( fd_repair_tile_ctx_t * ctx,
670668
fd_memcpy( ctx->buffer, dcache_entry, dcache_entry_sz );
671669
}
672670

673-
static ulong
671+
static ulong FD_FN_UNUSED
674672
fd_repair_send_ping( fd_repair_tile_ctx_t * repair_tile_ctx,
675673
fd_repair_t * glob,
676674
fd_pinged_elem_t * val,
@@ -697,7 +695,7 @@ fd_repair_send_ping( fd_repair_tile_ctx_t * repair_tile_ctx,
697695
return (ulong)((uchar*)ctx.data - buf);
698696
}
699697

700-
static void
698+
static void FD_FN_UNUSED
701699
fd_repair_recv_pong(fd_repair_t * glob, fd_gossip_ping_t const * pong, fd_gossip_peer_addr_t const * from) {
702700
fd_pinged_elem_t * val = fd_pinged_table_query(glob->pinged, from, NULL);
703701
if( val == NULL || !fd_pubkey_eq( &val->id, &pong->from ) )
@@ -731,10 +729,6 @@ fd_repair_recv_pong(fd_repair_t * glob, fd_gossip_ping_t const * pong, fd_gossip
731729
val->good = 1;
732730
}
733731

734-
/* Pass a raw service request packet into the protocol.
735-
src_addr is the address of the sender
736-
dst_ip4_addr is the dst IPv4 address of the incoming packet (i.e. our IP) */
737-
738732
static void
739733
after_frag( fd_repair_tile_ctx_t * ctx,
740734
ulong in_idx,
@@ -747,8 +741,9 @@ after_frag( fd_repair_tile_ctx_t * ctx,
747741

748742
if( FD_UNLIKELY( ctx->skip_frag ) ) return;
749743

744+
ctx->stem = stem;
745+
750746
uint in_kind = ctx->in_kind[ in_idx ];
751-
// FD_LOG_INFO(( "in_idx: %lu, in_kind: %u", in_idx, in_kind ));
752747
if( FD_UNLIKELY( in_kind==IN_KIND_CONTACT ) ) {
753748
handle_new_cluster_contact_info( ctx, ctx->buffer, sz );
754749
return;
@@ -763,37 +758,37 @@ after_frag( fd_repair_tile_ctx_t * ctx,
763758
/* Nonce was packed into sig, so we need to unpack it */
764759
ulong response_nonce = sig >> 32;
765760
fd_repair_pending_sign_req_t pending;
766-
761+
767762
/* Iterate over all pending requests, as every request sent to the
768763
sign tile will be returned. Since the repair_sign links are
769764
reliable, the incoming sign_repair fragments represent a complete
770765
set of the previously sent outgoing messages. However, with
771-
multiple sign tiles, the responses may not arrive in order. But,
766+
multiple sign tiles, the responses may not arrive in order. However,
772767
we can safely process them sequentially as we encounter them in
773768
the deque. */
774769
while( !fd_repair_pending_sign_req_deque_empty( ctx->pending_sign_req_deque ) ) {
775770
fd_repair_pending_sign_req_t * head_req = fd_repair_pending_sign_req_deque_peek_head( ctx->pending_sign_req_deque );
776-
771+
777772
if( head_req->nonce > response_nonce ) {
778773
break;
779774
}
780-
775+
781776
pending = fd_repair_pending_sign_req_deque_pop_head( ctx->pending_sign_req_deque );
782-
777+
783778
if( pending.nonce == response_nonce ) {
784779
fd_memcpy( pending.buf + pending.sig_offset, ctx->buffer, 64UL );
785780
ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
786-
781+
787782
uint src_ip4_addr = 0U;
788783
send_packet( ctx, stem, 1, pending.dst_ip_addr, pending.dst_port, src_ip4_addr, pending.buf, pending.buflen, tsorig );
789784
return;
790785
}
791786
}
792-
787+
793788
FD_LOG_WARNING(( "No matching request found for nonce %lu", response_nonce ));
794789
return;
795790
}
796-
791+
797792
if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) {
798793

799794
/* Initialize the forest, which requires the root to be ready. This
@@ -941,10 +936,6 @@ after_credit( fd_repair_tile_ctx_t * ctx,
941936
int * opt_poll_in,
942937
int * charge_busy ) {
943938

944-
/* TODO: Don't charge the tile as busy if after_credit isn't actually
945-
doing any work. */
946-
*charge_busy = 1;
947-
948939
if( FD_LIKELY( !fd_fec_out_empty( ctx->fec_chainer->out ) && ctx->store ) ) {
949940

950941
fd_fec_out_t out = fd_fec_out_pop_head( ctx->fec_chainer->out );
@@ -1036,7 +1027,6 @@ after_credit( fd_repair_tile_ctx_t * ctx,
10361027
fd_stem_publish( ctx->stem, REPLAY_OUT_IDX, sig, 0, 0, 0, 0, tspub );
10371028
if( FD_UNLIKELY( out.slot_complete ) ) {
10381029
fd_reasm_remove( ctx->reasm, reasm );
1039-
// FD_LOG_INFO(( "SLOT COMPLETE: %lu, time: %ld", out.slot, fd_log_wallclock() ));
10401030
}
10411031
}
10421032
*opt_poll_in = 1;
@@ -1192,17 +1182,7 @@ unprivileged_init( fd_topo_t * topo,
11921182
ctx->in_links[ in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ in_idx ].mem, link->dcache );
11931183
ctx->in_links[ in_idx ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ in_idx ].mem, link->dcache, link->mtu );
11941184
ctx->in_links[ in_idx ].mtu = link->mtu;
1195-
1196-
if( ctx->in_kind[ in_idx ] == IN_KIND_SIGN ) {
1197-
// fd_wksp_t * wksp = fd_wksp_containing( link->dcache );
1198-
// FD_LOG_NOTICE(( "repair tile: link %s[%lu] mem=%p, dcache=%p, wksp=%p, wksp_name=%s, in_idx=%u",
1199-
// link->name, link->kind_id,
1200-
// (void*)ctx->in_links[ in_idx ].mem,
1201-
// link->dcache,
1202-
// (void*)wksp,
1203-
// wksp ? fd_wksp_name( wksp ) : "NULL",
1204-
// in_idx ));
1205-
}
1185+
12061186
FD_TEST( fd_dcache_compact_is_safe( ctx->in_links[in_idx].mem, link->dcache, link->mtu, link->depth ) );
12071187
}
12081188

@@ -1247,7 +1227,7 @@ unprivileged_init( fd_topo_t * topo,
12471227
ctx->shredcap_out_chunk0 = fd_dcache_compact_chunk0( ctx->shredcap_out_mem, link->dcache );
12481228
ctx->shredcap_out_wmark = fd_dcache_compact_wmark( ctx->shredcap_out_mem, link->dcache, link->mtu );
12491229
ctx->shredcap_out_chunk = ctx->shredcap_out_chunk0;
1250-
1230+
12511231
} else if( 0==strcmp( link->name, "ping_sign" ) ) {
12521232
ctx->ping_sign_out_idx = out_idx;
12531233
ctx->ping_sign_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;

src/discof/send/fd_send_tile.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "../../disco/topo/fd_topo.h"
33
#include "../../disco/keyguard/fd_keyload.h"
44
#include "../../disco/keyguard/fd_keyguard.h"
5+
#include "../../disco/fd_txn_m_t.h"
56
#include "generated/fd_send_tile_seccomp.h"
67

78

src/util/hist/fd_histf.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
#include <math.h> /* FIXME: HMMM */
99
#include "../log/fd_log.h"
10-
1110
#if FD_HAS_AVX
1211
#include "../simd/fd_avx.h"
1312
#endif

0 commit comments

Comments
 (0)