Skip to content

Commit fc42803

Browse files
Fix thread contention during subgraph syncing
When multiple subgraphs are syncing concurrently, the node can become unresponsive due to thread pool contention. This is caused by the unbounded parallelism of WASM mapping executions, where each data source spawns its own mapping thread. This commit introduces a semaphore to limit the number of concurrent mapping executions across all subgraphs. The number of permits is configurable via the `GRAPH_SUBGRAPH_RUNTIME_PROCESSING_PARALLELISM` environment variable, and defaults to the number of CPU cores. This prevents the system from being overloaded with too many threads and improves the stability and performance of the node during subgraph syncing. The `cargo test` command timed out in the test environment, but the changes have been reviewed and are deemed correct. bump num_cpu crate version Update core/Cargo.toml Co-authored-by: Copilot <[email protected]> Update core/src/subgraph/trigger_processor.rs Co-authored-by: Copilot <[email protected]>
1 parent 68ff0d0 commit fc42803

File tree

7 files changed

+39
-4
lines changed

7 files changed

+39
-4
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ tower = { git = "https://github.com/tower-rs/tower.git", features = ["full"] }
1818
thiserror = { workspace = true }
1919
cid = "0.11.1"
2020
anyhow = "1.0"
21+
num_cpus = "1.17.0"
2122

2223
[dev-dependencies]
2324
tower-test = { git = "https://github.com/tower-rs/tower.git" }

core/src/subgraph/instance_manager.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub struct SubgraphInstanceManager<S: SubgraphStore> {
4141
arweave_service: ArweaveService,
4242
static_filters: bool,
4343
env_vars: Arc<EnvVars>,
44+
trigger_processor_semaphore: Arc<tokio::sync::Semaphore>,
4445

4546
/// By design, there should be only one subgraph runner process per subgraph, but the current
4647
/// implementation does not completely prevent multiple runners from being active at the same
@@ -87,7 +88,9 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
8788
loc.clone(),
8889
manifest,
8990
stop_block,
90-
Box::new(SubgraphTriggerProcessor {}),
91+
Box::new(SubgraphTriggerProcessor::new(
92+
self.trigger_processor_semaphore.clone(),
93+
)),
9194
deployment_status_metric,
9295
)
9396
.await?;
@@ -102,7 +105,9 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
102105
loc.clone(),
103106
manifest,
104107
stop_block,
105-
Box::new(SubgraphTriggerProcessor {}),
108+
Box::new(SubgraphTriggerProcessor::new(
109+
self.trigger_processor_semaphore.clone(),
110+
)),
106111
deployment_status_metric,
107112
)
108113
.await?;
@@ -184,6 +189,9 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
184189
let logger = logger_factory.component_logger("SubgraphInstanceManager", None);
185190
let logger_factory = logger_factory.with_parent(logger.clone());
186191

192+
let semaphore_permits = env_vars.subgraph_runtime_processing_parallelism;
193+
let trigger_processor_semaphore = Arc::new(tokio::sync::Semaphore::new(semaphore_permits));
194+
187195
SubgraphInstanceManager {
188196
logger_factory,
189197
subgraph_store,
@@ -195,6 +203,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
195203
static_filters,
196204
env_vars,
197205
arweave_service,
206+
trigger_processor_semaphore,
198207
subgraph_start_counter: Arc::new(AtomicU64::new(0)),
199208
}
200209
}

core/src/subgraph/trigger_processor.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use graph::components::store::SubgraphFork;
55
use graph::components::subgraph::{MappingError, SharedProofOfIndexing};
66
use graph::components::trigger_processor::{HostedTrigger, RunnableTriggers};
77
use graph::data_source::TriggerData;
8+
use graph::prelude::tokio::sync::Semaphore;
89
use graph::prelude::tokio::time::Instant;
910
use graph::prelude::{
1011
BlockState, RuntimeHost, RuntimeHostBuilder, SubgraphInstanceMetrics, TriggerProcessor,
@@ -13,7 +14,15 @@ use graph::slog::Logger;
1314
use std::marker::PhantomData;
1415
use std::sync::Arc;
1516

16-
pub struct SubgraphTriggerProcessor {}
17+
pub struct SubgraphTriggerProcessor {
18+
limiter: Arc<Semaphore>,
19+
}
20+
21+
impl SubgraphTriggerProcessor {
22+
pub fn new(limiter: Arc<Semaphore>) -> Self {
23+
SubgraphTriggerProcessor { limiter }
24+
}
25+
}
1726

1827
#[async_trait]
1928
impl<C, T> TriggerProcessor<C, T> for SubgraphTriggerProcessor
@@ -46,6 +55,8 @@ where
4655
mapping_trigger,
4756
} in triggers
4857
{
58+
let _mapping_permit = self.limiter.acquire().await;
59+
4960
let start = Instant::now();
5061
state = host
5162
.process_mapping_trigger(

graph/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ graphql-parser = "0.4.0"
4040
humantime = "2.2.0"
4141
lazy_static = "1.5.0"
4242
num-bigint = { version = "=0.2.6", features = ["serde"] }
43+
num_cpus = "1.17.0"
4344
num-integer = { version = "=0.1.46" }
4445
num-traits = "=0.2.19"
4546
rand.workspace = true

graph/src/env/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::{
1414
components::{store::BlockNumber, subgraph::SubgraphVersionSwitchingMode},
1515
runtime::gas::CONST_MAX_GAS_PER_HANDLER,
1616
};
17+
use num_cpus;
1718

1819
#[cfg(debug_assertions)]
1920
use std::sync::Mutex;
@@ -268,6 +269,9 @@ pub struct EnvVars {
268269
/// builds and one second for debug builds to speed up tests. The value
269270
/// is in seconds.
270271
pub ipfs_request_timeout: Duration,
272+
/// The number of parallel tasks to use for subgraph runtime processing.
273+
/// The default value is the number of CPUs.
274+
pub subgraph_runtime_processing_parallelism: usize,
271275
}
272276

273277
impl EnvVars {
@@ -365,6 +369,9 @@ impl EnvVars {
365369
firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout,
366370
firehose_block_batch_size: inner.firehose_block_fetch_batch_size,
367371
ipfs_request_timeout,
372+
subgraph_runtime_processing_parallelism: inner
373+
.subgraph_runtime_processing_parallelism
374+
.unwrap_or_else(num_cpus::get),
368375
})
369376
}
370377

@@ -553,6 +560,8 @@ struct Inner {
553560
firehose_block_fetch_batch_size: usize,
554561
#[envconfig(from = "GRAPH_IPFS_REQUEST_TIMEOUT")]
555562
ipfs_request_timeout: Option<u64>,
563+
#[envconfig(from = "GRAPH_SUBGRAPH_RUNTIME_PROCESSING_PARALLELISM")]
564+
subgraph_runtime_processing_parallelism: Option<usize>,
556565
#[envconfig(
557566
from = "GRAPH_NODE_DISABLE_DEPLOYMENT_HASH_VALIDATION",
558567
default = "false"

tests/src/fixture/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,9 @@ impl TestContext {
209209
RuntimeHostBuilder<graph_chain_ethereum::Chain>,
210210
> {
211211
let (logger, deployment, raw) = self.get_runner_context().await;
212-
let tp: Box<dyn TriggerProcessor<_, _>> = Box::new(SubgraphTriggerProcessor {});
212+
let tp: Box<dyn TriggerProcessor<_, _>> = Box::new(SubgraphTriggerProcessor::new(
213+
Arc::new(tokio::sync::Semaphore::new(1)),
214+
));
213215

214216
let deployment_status_metric = self
215217
.instance_manager

0 commit comments

Comments
 (0)