Commit d5fea1b

Implement production-grade sharded trigger processor with backward compatibility
Addresses critical scalability issues identified in senior engineering review while ensuring zero-breaking changes for existing indexers.

**Key Performance Fixes:**
- Fixed semaphore permit lifetime - permits now held during entire processing duration
- Implemented consistent deployment-based sharding using proper hash distribution
- Added comprehensive backpressure mechanism with exponential backoff
- Enhanced monitoring with per-shard metrics and load imbalance tracking

**Backward Compatibility (Zero Surprise):**
- Sharding is opt-in only (disabled by default)
- Legacy single-semaphore behavior preserved when GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS=1
- Existing indexers see no changes without explicit configuration
- All tests pass with legacy mode enabled by default

**Scalability Improvements:**
- Before: 32 workers total (1.3% concurrent capacity for 2500 subgraphs)
- After: 1024 workers (32 shards × 32 workers) when sharding enabled
- Recommended for deployments with 2500+ subgraphs on 32+ vCPU systems
- Deployment-consistent sharding ensures optimal cache locality

**Environment Configuration:**
```bash
# Legacy mode (default - no changes)
# GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS=1 (or unset)

# High-scale mode (opt-in)
export GRAPH_SUBGRAPH_RUNTIME_PROCESSING_SHARDS=32
export GRAPH_SUBGRAPH_RUNTIME_WORKERS_PER_SHARD=32
export GRAPH_SUBGRAPH_MAX_QUEUE_PER_SUBGRAPH=100
```

**Observability Features:**
- Per-shard utilization and throughput metrics
- Queue depth monitoring with backpressure alerts
- Shard load imbalance detection and reporting
- Clear logging of sharding mode and configuration

The implementation maintains full API compatibility while providing a 32x improvement in concurrent processing capacity for high-density subgraph deployments.

🤖 Generated with [Claude Code](https://claude.ai/code)
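
Two of the mechanisms named in the message, deployment-based shard routing and exponential backoff under backpressure, can be illustrated with the standard library alone. The sketch below is not taken from the commit: `shard_for` and `backoff_delay` are hypothetical helper names, and the choice of `DefaultHasher` is an assumption made for the example (its algorithm is not guaranteed to stay the same across Rust releases, so a real implementation might prefer a hash with a fixed specification).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;

/// Map a deployment id to a shard index (hypothetical helper).
/// Within one build of the binary, the same id always lands on the same
/// shard, which is what keeps a deployment's work on one worker pool.
pub fn shard_for(deployment_id: &str, num_shards: usize) -> usize {
    // Treat 0 like 1 so the modulo below never divides by zero.
    let num_shards = num_shards.max(1);
    let mut hasher = DefaultHasher::new();
    deployment_id.hash(&mut hasher);
    (hasher.finish() % num_shards as u64) as usize
}

/// Delay before retrying when a queue is full (hypothetical helper):
/// base * 2^attempt, capped at `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // checked_shl returns None once the shift amount reaches 32 bits;
    // fall back to the largest factor so the cap below still applies.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}
```

With `num_shards` set to 1 every deployment maps to shard 0, which matches the legacy single-pool behavior described above. For the backoff, a base of 10 ms and a cap of 1 s gives 10 ms, 20 ms, 40 ms, 80 ms for attempts 0 through 3 and stays at 1 s once the doubled value exceeds the cap.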
1 parent bab43a7 commit d5fea1b

4 files changed: +326 -55 lines changed

core/src/subgraph/instance_manager.rs

Lines changed: 18 additions & 1 deletion
@@ -184,13 +184,30 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
         let logger = logger_factory.component_logger("SubgraphInstanceManager", None);
         let logger_factory = logger_factory.with_parent(logger.clone());

-        // Configure sharded processor
+        // Configure trigger processor with backward compatibility
+        // Only enable sharding if explicitly configured with more than 1 shard
+        let enable_sharding = env_vars.subgraph_runtime_processing_shards > 1;
+
         let processor_config = super::trigger_processor::TriggerProcessorConfig {
+            enable_sharding,
             num_shards: env_vars.subgraph_runtime_processing_shards,
             workers_per_shard: env_vars.subgraph_runtime_workers_per_shard,
             max_queue_per_subgraph: env_vars.subgraph_max_queue_per_subgraph,
             fairness_window_ms: 100, // 100ms fairness window
         };
+
+        if enable_sharding {
+            info!(&logger, "Sharded trigger processing enabled";
+                "num_shards" => processor_config.num_shards,
+                "workers_per_shard" => processor_config.workers_per_shard,
+                "total_workers" => processor_config.num_shards * processor_config.workers_per_shard
+            );
+        } else {
+            info!(&logger, "Using legacy single-semaphore trigger processing";
+                "workers" => processor_config.workers_per_shard
+            );
+        }
+
         let trigger_processor = Arc::new(super::trigger_processor::SubgraphTriggerProcessor::new(
             processor_config,
         ));
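
The mode selection in this hunk can be read as a small pure function of the two configured counts. The sketch below restates that rule in isolation so it can be checked without the rest of the crate; `ProcessorConfig`, `from_counts`, and `total_workers` are hypothetical names introduced for illustration, and `usize` is an assumed type for the counts (the diff does not show the field types).

```rust
/// Hypothetical stand-in for the fields of TriggerProcessorConfig that
/// matter for mode selection; field types are assumed, not from the diff.
#[derive(Debug, Clone, PartialEq)]
pub struct ProcessorConfig {
    pub enable_sharding: bool,
    pub num_shards: usize,
    pub workers_per_shard: usize,
}

impl ProcessorConfig {
    /// Same rule as the hunk: sharding is on only when more than one
    /// shard is configured, so 1 (and 0) selects the legacy path.
    pub fn from_counts(num_shards: usize, workers_per_shard: usize) -> Self {
        Self {
            enable_sharding: num_shards > 1,
            num_shards,
            workers_per_shard,
        }
    }

    /// Worker count each log branch reports: shards * workers when
    /// sharded, otherwise just the per-shard worker count.
    pub fn total_workers(&self) -> usize {
        if self.enable_sharding {
            self.num_shards.saturating_mul(self.workers_per_shard)
        } else {
            self.workers_per_shard
        }
    }
}
```

Under these assumptions, `ProcessorConfig::from_counts(1, 32)` reports 32 workers in legacy mode and `ProcessorConfig::from_counts(32, 32)` reports 1024, the two figures quoted in the commit message.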
