Skip to content

Commit 3d005cf

Browse files
[FLINK-38291][REST] Reduce thread lock overhead for REST handlers
1 parent 831f3ab commit 3d005cf

File tree

11 files changed

+114
-94
lines changed

11 files changed

+114
-94
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.runtime.rest.handler.RestHandlerException;
3030
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
3131
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
32+
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
3233
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
3334
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
3435
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
@@ -85,7 +86,8 @@ public JobDetailsHandler(
8586
protected JobDetailsInfo handleRequest(
8687
HandlerRequest<EmptyRequestBody> request, AccessExecutionGraph executionGraph)
8788
throws RestHandlerException {
88-
return createJobDetailsInfo(executionGraph, metricFetcher);
89+
metricFetcher.update();
90+
return createJobDetailsInfo(executionGraph, metricFetcher.getMetricStore().getJobs());
8991
}
9092

9193
@Override
@@ -100,7 +102,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
100102
}
101103

102104
private static JobDetailsInfo createJobDetailsInfo(
103-
AccessExecutionGraph executionGraph, @Nullable MetricFetcher metricFetcher) {
105+
AccessExecutionGraph executionGraph, @Nullable MetricStore.MetricStoreJobs jobs) {
104106
final long now = System.currentTimeMillis();
105107
final long startTime = executionGraph.getStatusTimestamp(JobStatus.INITIALIZING);
106108
final long endTime =
@@ -124,10 +126,7 @@ private static JobDetailsInfo createJobDetailsInfo(
124126
executionGraph.getVerticesTopologically()) {
125127
final JobDetailsInfo.JobVertexDetailsInfo vertexDetailsInfo =
126128
createJobVertexDetailsInfo(
127-
accessExecutionJobVertex,
128-
now,
129-
executionGraph.getJobID(),
130-
metricFetcher);
129+
accessExecutionJobVertex, now, executionGraph.getJobID(), jobs);
131130

132131
jobVertexInfos.add(vertexDetailsInfo);
133132
jobVerticesPerState[vertexDetailsInfo.getExecutionState().ordinal()]++;
@@ -166,7 +165,7 @@ private static JobDetailsInfo createJobDetailsInfo(
166165
}
167166

168167
private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
169-
AccessExecutionJobVertex ejv, long now, JobID jobId, MetricFetcher metricFetcher) {
168+
AccessExecutionJobVertex ejv, long now, JobID jobId, MetricStore.MetricStoreJobs jobs) {
170169
int[] tasksPerState = new int[ExecutionState.values().length];
171170
long startTime = Long.MAX_VALUE;
172171
long endTime = 0;
@@ -217,7 +216,7 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
217216
// rather than the aggregation of all attempts.
218217
counts.addIOMetrics(
219218
vertex.getCurrentExecutionAttempt(),
220-
metricFetcher,
219+
jobs,
221220
jobId.toString(),
222221
ejv.getJobVertexId().toString());
223222
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.runtime.rest.handler.HandlerRequest;
2929
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
3030
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
31+
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
3132
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
3233
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
3334
import org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo;
@@ -89,7 +90,9 @@ protected JobVertexDetailsInfo handleRequest(
8990
throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
9091
}
9192

92-
return createJobVertexDetailsInfo(jobVertex, jobID, metricFetcher);
93+
metricFetcher.update();
94+
return createJobVertexDetailsInfo(
95+
jobVertex, jobID, metricFetcher.getMetricStore().getJobs());
9396
}
9497

9598
@Override
@@ -114,7 +117,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
114117
private static JobVertexDetailsInfo createJobVertexDetailsInfo(
115118
AccessExecutionJobVertex jobVertex,
116119
JobID jobID,
117-
@Nullable MetricFetcher metricFetcher) {
120+
@Nullable MetricStore.MetricStoreJobs jobs) {
118121
List<SubtaskExecutionAttemptDetailsInfo> subtasks = new ArrayList<>();
119122
final long now = System.currentTimeMillis();
120123
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
@@ -130,14 +133,14 @@ private static JobVertexDetailsInfo createJobVertexDetailsInfo(
130133
if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
131134
otherConcurrentAttempts.add(
132135
SubtaskExecutionAttemptDetailsInfo.create(
133-
attempt, metricFetcher, jobID, jobVertexID, null));
136+
attempt, jobs, jobID, jobVertexID, null));
134137
}
135138
}
136139
}
137140

138141
subtasks.add(
139142
SubtaskExecutionAttemptDetailsInfo.create(
140-
execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts));
143+
execution, jobs, jobID, jobVertexID, otherConcurrentAttempts));
141144
}
142145

143146
return new JobVertexDetailsInfo(

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.runtime.rest.handler.RestHandlerException;
3232
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
3333
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
34+
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
3435
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
3536
import org.apache.flink.runtime.rest.messages.AggregatedTaskDetailsInfo;
3637
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -105,7 +106,9 @@ protected JobVertexTaskManagersInfo handleRequest(
105106
throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
106107
}
107108

108-
return createJobVertexTaskManagersInfo(jobVertex, jobID, metricFetcher);
109+
metricFetcher.update();
110+
return createJobVertexTaskManagersInfo(
111+
jobVertex, jobID, metricFetcher.getMetricStore().getJobs());
109112
}
110113

111114
@Override
@@ -130,7 +133,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
130133
private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
131134
AccessExecutionJobVertex jobVertex,
132135
JobID jobID,
133-
@Nullable MetricFetcher metricFetcher) {
136+
@Nullable MetricStore.MetricStoreJobs jobs) {
134137
// Build a map that groups task executions by TaskManager
135138
Map<TaskManagerLocation, List<AccessExecution>> taskManagerExecutions = new HashMap<>();
136139
Set<AccessExecution> representativeExecutions = new HashSet<>();
@@ -200,16 +203,10 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
200203
endTime = Math.max(endTime, execution.getStateTimestamp(state));
201204

202205
counts.addIOMetrics(
203-
execution,
204-
metricFetcher,
205-
jobID.toString(),
206-
jobVertex.getJobVertexId().toString());
206+
execution, jobs, jobID.toString(), jobVertex.getJobVertexId().toString());
207207
MutableIOMetrics current = new MutableIOMetrics();
208208
current.addIOMetrics(
209-
execution,
210-
metricFetcher,
211-
jobID.toString(),
212-
jobVertex.getJobVertexId().toString());
209+
execution, jobs, jobID.toString(), jobVertex.getJobVertexId().toString());
213210
ioMetricsInfos.add(
214211
new IOMetricsInfo(
215212
current.getNumBytesIn(),

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.runtime.rest.handler.RestHandlerException;
2727
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
2828
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
29+
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
2930
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
3031
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
3132
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
@@ -87,18 +88,20 @@ protected SubtaskExecutionAttemptDetailsInfo handleRequest(
8788
final Collection<AccessExecution> attempts = executionVertex.getCurrentExecutions();
8889
List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
8990

91+
metricFetcher.update();
92+
MetricStore.MetricStoreJobs jobs = metricFetcher.getMetricStore().getJobs();
9093
if (attempts.size() > 1) {
9194
otherConcurrentAttempts = new ArrayList<>();
9295
for (AccessExecution attempt : attempts) {
9396
if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
9497
otherConcurrentAttempts.add(
9598
SubtaskExecutionAttemptDetailsInfo.create(
96-
attempt, metricFetcher, jobID, jobVertexID, null));
99+
attempt, jobs, jobID, jobVertexID, null));
97100
}
98101
}
99102
}
100103

101104
return SubtaskExecutionAttemptDetailsInfo.create(
102-
execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts);
105+
execution, jobs, jobID, jobVertexID, otherConcurrentAttempts);
103106
}
104107
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,9 @@ protected SubtaskExecutionAttemptDetailsInfo handleRequest(
101101
final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
102102
final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
103103

104+
metricFetcher.update();
104105
return SubtaskExecutionAttemptDetailsInfo.create(
105-
execution, metricFetcher, jobID, jobVertexID, null);
106+
execution, metricFetcher.getMetricStore().getJobs(), jobID, jobVertexID, null);
106107
}
107108

108109
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ Collection<? extends MetricStore.ComponentMetricStore> getStores(
7676
JobID jobID = request.getPathParameter(JobIDPathParameter.class);
7777
JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
7878

79+
MetricStore.MetricStoreJobs jobs = store.getJobs();
7980
Collection<String> subtaskRanges =
8081
request.getQueryParameter(SubtasksFilterQueryParameter.class);
8182
if (subtaskRanges.isEmpty()) {
@@ -91,7 +92,7 @@ Collection<? extends MetricStore.ComponentMetricStore> getStores(
9192
Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
9293
for (int subtask : subtasks) {
9394
MetricStore.ComponentMetricStore subtaskMetricStore =
94-
store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask);
95+
jobs.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask);
9596
if (subtaskMetricStore != null) {
9697
subtaskStores.add(subtaskMetricStore);
9798
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
/**
4141
* Handler that returns subtask metrics.
4242
*
43-
* @see MetricStore#getSubtaskMetricStore(String, String, int)
43+
* @see MetricStore.MetricStoreJobs#getSubtaskMetricStore (String, String, int)
4444
*/
4545
public class SubtaskMetricsHandler extends AbstractMetricsHandler<SubtaskMetricsMessageParameters> {
4646

@@ -67,7 +67,8 @@ protected MetricStore.ComponentMetricStore getComponentMetricStore(
6767
final JobVertexID vertexId = request.getPathParameter(JobVertexIdPathParameter.class);
6868
final int subtaskIndex = request.getPathParameter(SubtaskIndexPathParameter.class);
6969

70-
return metricStore.getSubtaskMetricStore(
71-
jobId.toString(), vertexId.toString(), subtaskIndex);
70+
return metricStore
71+
.getJobs()
72+
.getSubtaskMetricStore(jobId.toString(), vertexId.toString(), subtaskIndex);
7273
}
7374
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -239,47 +239,8 @@ public synchronized TaskMetricStore getTaskMetricStore(String jobID, String task
239239
return TaskMetricStore.unmodifiable(job.getTaskMetricStore(taskID));
240240
}
241241

242-
/**
243-
* Returns the {@link ComponentMetricStore} for the given job/task ID and subtask index.
244-
*
245-
* @param jobID job ID
246-
* @param taskID task ID
247-
* @param subtaskIndex subtask index
248-
* @return SubtaskMetricStore for the given IDs and index, or null if no store for the given
249-
* arguments exists
250-
*/
251-
public synchronized ComponentMetricStore getSubtaskMetricStore(
252-
String jobID, String taskID, int subtaskIndex) {
253-
JobMetricStore job = jobID == null ? null : jobs.get(jobID);
254-
if (job == null) {
255-
return null;
256-
}
257-
TaskMetricStore task = job.getTaskMetricStore(taskID);
258-
if (task == null) {
259-
return null;
260-
}
261-
return SubtaskMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
262-
}
263-
264-
public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
265-
String jobID, String taskID, int subtaskIndex, int attemptNumber) {
266-
JobMetricStore job = jobID == null ? null : jobs.get(jobID);
267-
if (job == null) {
268-
return null;
269-
}
270-
TaskMetricStore task = job.getTaskMetricStore(taskID);
271-
if (task == null) {
272-
return null;
273-
}
274-
SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex);
275-
if (subtask == null) {
276-
return null;
277-
}
278-
return ComponentMetricStore.unmodifiable(subtask.getAttemptsMetricStore(attemptNumber));
279-
}
280-
281-
public synchronized Map<String, JobMetricStore> getJobs() {
282-
return unmodifiableMap(jobs);
242+
public synchronized MetricStoreJobs getJobs() {
243+
return new MetricStoreJobs(unmodifiableMap(jobs));
283244
}
284245

285246
public synchronized Map<String, TaskManagerMetricStore> getTaskManagers() {
@@ -689,4 +650,57 @@ private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
689650
unmodifiableMap(source.metrics), unmodifiableMap(source.attempts));
690651
}
691652
}
653+
654+
/** Sub-structure containing a snapshot of all jobs in the metric store. */
655+
@ThreadSafe
656+
public static class MetricStoreJobs {
657+
private final Map<String, JobMetricStore> jobs;
658+
659+
private MetricStoreJobs(Map<String, JobMetricStore> jobs) {
660+
this.jobs = checkNotNull(jobs);
661+
}
662+
663+
/**
664+
* Returns the {@link ComponentMetricStore} for the given job/task ID and subtask index.
665+
*
666+
* @param jobID job ID
667+
* @param taskID task ID
668+
* @param subtaskIndex subtask index
669+
* @return SubtaskMetricStore for the given IDs and index, or null if no store for the given
670+
* arguments exists
671+
*/
672+
public ComponentMetricStore getSubtaskMetricStore(
673+
String jobID, String taskID, int subtaskIndex) {
674+
JobMetricStore job = jobID == null ? null : jobs.get(jobID);
675+
if (job == null) {
676+
return null;
677+
}
678+
TaskMetricStore task = job.getTaskMetricStore(taskID);
679+
if (task == null) {
680+
return null;
681+
}
682+
return SubtaskMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex));
683+
}
684+
685+
public ComponentMetricStore getSubtaskAttemptMetricStore(
686+
String jobID, String taskID, int subtaskIndex, int attemptNumber) {
687+
JobMetricStore job = jobID == null ? null : jobs.get(jobID);
688+
if (job == null) {
689+
return null;
690+
}
691+
TaskMetricStore task = job.getTaskMetricStore(taskID);
692+
if (task == null) {
693+
return null;
694+
}
695+
SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex);
696+
if (subtask == null) {
697+
return null;
698+
}
699+
return ComponentMetricStore.unmodifiable(subtask.getAttemptsMetricStore(attemptNumber));
700+
}
701+
702+
public Collection<? extends MetricStore.ComponentMetricStore> values() {
703+
return jobs.values();
704+
}
705+
}
692706
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,15 @@ public boolean isNumRecordsOutComplete() {
7272
* {@link MetricFetcher} is used to retrieve the required metrics.
7373
*
7474
* @param attempt Attempt whose IO metrics should be added
75-
* @param fetcher MetricFetcher to retrieve metrics for running jobs
75+
* @param jobs metrics for running jobs
7676
* @param jobID JobID to which the attempt belongs
7777
* @param taskID TaskID to which the attempt belongs
7878
*/
7979
public void addIOMetrics(
80-
AccessExecution attempt, @Nullable MetricFetcher fetcher, String jobID, String taskID) {
80+
AccessExecution attempt,
81+
@Nullable MetricStore.MetricStoreJobs jobs,
82+
String jobID,
83+
String taskID) {
8184
if (attempt.getState().isTerminal()) {
8285
IOMetrics ioMetrics = attempt.getIOMetrics();
8386
if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in
@@ -95,15 +98,13 @@ public void addIOMetrics(
9598
}
9699
}
97100
} else { // execAttempt is still running, use MetricQueryService instead
98-
if (fetcher != null) {
99-
fetcher.update();
101+
if (jobs != null) {
100102
MetricStore.ComponentMetricStore metrics =
101-
fetcher.getMetricStore()
102-
.getSubtaskAttemptMetricStore(
103-
jobID,
104-
taskID,
105-
attempt.getParallelSubtaskIndex(),
106-
attempt.getAttemptNumber());
103+
jobs.getSubtaskAttemptMetricStore(
104+
jobID,
105+
taskID,
106+
attempt.getParallelSubtaskIndex(),
107+
attempt.getAttemptNumber());
107108
if (metrics != null) {
108109
/**
109110
* We want to keep track of missing metrics to be able to make a difference

0 commit comments

Comments
 (0)