Skip to content

Commit 5f93c07

Browse files
improve class and variable naming
1 parent 3d005cf commit 5f93c07

File tree

10 files changed

+54
-35
lines changed

10 files changed

+54
-35
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
102102
}
103103

104104
private static JobDetailsInfo createJobDetailsInfo(
105-
AccessExecutionGraph executionGraph, @Nullable MetricStore.MetricStoreJobs jobs) {
105+
AccessExecutionGraph executionGraph,
106+
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics) {
106107
final long now = System.currentTimeMillis();
107108
final long startTime = executionGraph.getStatusTimestamp(JobStatus.INITIALIZING);
108109
final long endTime =
@@ -126,7 +127,7 @@ private static JobDetailsInfo createJobDetailsInfo(
126127
executionGraph.getVerticesTopologically()) {
127128
final JobDetailsInfo.JobVertexDetailsInfo vertexDetailsInfo =
128129
createJobVertexDetailsInfo(
129-
accessExecutionJobVertex, now, executionGraph.getJobID(), jobs);
130+
accessExecutionJobVertex, now, executionGraph.getJobID(), jobMetrics);
130131

131132
jobVertexInfos.add(vertexDetailsInfo);
132133
jobVerticesPerState[vertexDetailsInfo.getExecutionState().ordinal()]++;
@@ -165,7 +166,10 @@ private static JobDetailsInfo createJobDetailsInfo(
165166
}
166167

167168
private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
168-
AccessExecutionJobVertex ejv, long now, JobID jobId, MetricStore.MetricStoreJobs jobs) {
169+
AccessExecutionJobVertex ejv,
170+
long now,
171+
JobID jobId,
172+
MetricStore.JobMetricStoreSnapshot jobMetrics) {
169173
int[] tasksPerState = new int[ExecutionState.values().length];
170174
long startTime = Long.MAX_VALUE;
171175
long endTime = 0;
@@ -216,7 +220,7 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
216220
// rather than the aggregation of all attempts.
217221
counts.addIOMetrics(
218222
vertex.getCurrentExecutionAttempt(),
219-
jobs,
223+
jobMetrics,
220224
jobId.toString(),
221225
ejv.getJobVertexId().toString());
222226
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
117117
private static JobVertexDetailsInfo createJobVertexDetailsInfo(
118118
AccessExecutionJobVertex jobVertex,
119119
JobID jobID,
120-
@Nullable MetricStore.MetricStoreJobs jobs) {
120+
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics) {
121121
List<SubtaskExecutionAttemptDetailsInfo> subtasks = new ArrayList<>();
122122
final long now = System.currentTimeMillis();
123123
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
@@ -133,14 +133,14 @@ private static JobVertexDetailsInfo createJobVertexDetailsInfo(
133133
if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
134134
otherConcurrentAttempts.add(
135135
SubtaskExecutionAttemptDetailsInfo.create(
136-
attempt, jobs, jobID, jobVertexID, null));
136+
attempt, jobMetrics, jobID, jobVertexID, null));
137137
}
138138
}
139139
}
140140

141141
subtasks.add(
142142
SubtaskExecutionAttemptDetailsInfo.create(
143-
execution, jobs, jobID, jobVertexID, otherConcurrentAttempts));
143+
execution, jobMetrics, jobID, jobVertexID, otherConcurrentAttempts));
144144
}
145145

146146
return new JobVertexDetailsInfo(

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
133133
private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
134134
AccessExecutionJobVertex jobVertex,
135135
JobID jobID,
136-
@Nullable MetricStore.MetricStoreJobs jobs) {
136+
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics) {
137137
// Build a map that groups task executions by TaskManager
138138
Map<TaskManagerLocation, List<AccessExecution>> taskManagerExecutions = new HashMap<>();
139139
Set<AccessExecution> representativeExecutions = new HashSet<>();
@@ -203,10 +203,16 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
203203
endTime = Math.max(endTime, execution.getStateTimestamp(state));
204204

205205
counts.addIOMetrics(
206-
execution, jobs, jobID.toString(), jobVertex.getJobVertexId().toString());
206+
execution,
207+
jobMetrics,
208+
jobID.toString(),
209+
jobVertex.getJobVertexId().toString());
207210
MutableIOMetrics current = new MutableIOMetrics();
208211
current.addIOMetrics(
209-
execution, jobs, jobID.toString(), jobVertex.getJobVertexId().toString());
212+
execution,
213+
jobMetrics,
214+
jobID.toString(),
215+
jobVertex.getJobVertexId().toString());
210216
ioMetricsInfos.add(
211217
new IOMetricsInfo(
212218
current.getNumBytesIn(),

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,19 +89,19 @@ protected SubtaskExecutionAttemptDetailsInfo handleRequest(
8989
List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;
9090

9191
metricFetcher.update();
92-
MetricStore.MetricStoreJobs jobs = metricFetcher.getMetricStore().getJobs();
92+
MetricStore.JobMetricStoreSnapshot jobMetrics = metricFetcher.getMetricStore().getJobs();
9393
if (attempts.size() > 1) {
9494
otherConcurrentAttempts = new ArrayList<>();
9595
for (AccessExecution attempt : attempts) {
9696
if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
9797
otherConcurrentAttempts.add(
9898
SubtaskExecutionAttemptDetailsInfo.create(
99-
attempt, jobs, jobID, jobVertexID, null));
99+
attempt, jobMetrics, jobID, jobVertexID, null));
100100
}
101101
}
102102
}
103103

104104
return SubtaskExecutionAttemptDetailsInfo.create(
105-
execution, jobs, jobID, jobVertexID, otherConcurrentAttempts);
105+
execution, jobMetrics, jobID, jobVertexID, otherConcurrentAttempts);
106106
}
107107
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ Collection<? extends MetricStore.ComponentMetricStore> getStores(
7676
JobID jobID = request.getPathParameter(JobIDPathParameter.class);
7777
JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);
7878

79-
MetricStore.MetricStoreJobs jobs = store.getJobs();
79+
MetricStore.JobMetricStoreSnapshot jobMetrics = store.getJobs();
8080
Collection<String> subtaskRanges =
8181
request.getQueryParameter(SubtasksFilterQueryParameter.class);
8282
if (subtaskRanges.isEmpty()) {
@@ -92,7 +92,8 @@ Collection<? extends MetricStore.ComponentMetricStore> getStores(
9292
Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
9393
for (int subtask : subtasks) {
9494
MetricStore.ComponentMetricStore subtaskMetricStore =
95-
jobs.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask);
95+
jobMetrics.getSubtaskMetricStore(
96+
jobID.toString(), taskID.toString(), subtask);
9697
if (subtaskMetricStore != null) {
9798
subtaskStores.add(subtaskMetricStore);
9899
}

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
/**
4141
* Handler that returns subtask metrics.
4242
*
43-
* @see MetricStore.MetricStoreJobs#getSubtaskMetricStore (String, String, int)
43+
* @see MetricStore.JobMetricStoreSnapshot#getSubtaskMetricStore (String, String, int)
4444
*/
4545
public class SubtaskMetricsHandler extends AbstractMetricsHandler<SubtaskMetricsMessageParameters> {
4646

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,8 @@ public synchronized TaskMetricStore getTaskMetricStore(String jobID, String task
239239
return TaskMetricStore.unmodifiable(job.getTaskMetricStore(taskID));
240240
}
241241

242-
public synchronized MetricStoreJobs getJobs() {
243-
return new MetricStoreJobs(unmodifiableMap(jobs));
242+
public synchronized JobMetricStoreSnapshot getJobs() {
243+
return new JobMetricStoreSnapshot(unmodifiableMap(jobs));
244244
}
245245

246246
public synchronized Map<String, TaskManagerMetricStore> getTaskManagers() {
@@ -653,10 +653,10 @@ private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
653653

654654
/** Sub-structure containing a snapshot of all jobs in the metric store. */
655655
@ThreadSafe
656-
public static class MetricStoreJobs {
656+
public static class JobMetricStoreSnapshot {
657657
private final Map<String, JobMetricStore> jobs;
658658

659-
private MetricStoreJobs(Map<String, JobMetricStore> jobs) {
659+
private JobMetricStoreSnapshot(Map<String, JobMetricStore> jobs) {
660660
this.jobs = checkNotNull(jobs);
661661
}
662662

flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,13 @@ public boolean isNumRecordsOutComplete() {
7272
* {@link MetricFetcher} is used to retrieve the required metrics.
7373
*
7474
* @param attempt Attempt whose IO metrics should be added
75-
* @param jobs metrics for running jobs
75+
* @param jobMetrics metrics for running jobs
7676
* @param jobID JobID to which the attempt belongs
7777
* @param taskID TaskID to which the attempt belongs
7878
*/
7979
public void addIOMetrics(
8080
AccessExecution attempt,
81-
@Nullable MetricStore.MetricStoreJobs jobs,
81+
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics,
8282
String jobID,
8383
String taskID) {
8484
if (attempt.getState().isTerminal()) {
@@ -98,9 +98,9 @@ public void addIOMetrics(
9898
}
9999
}
100100
} else { // execAttempt is still running, use MetricQueryService instead
101-
if (jobs != null) {
101+
if (jobMetrics != null) {
102102
MetricStore.ComponentMetricStore metrics =
103-
jobs.getSubtaskAttemptMetricStore(
103+
jobMetrics.getSubtaskAttemptMetricStore(
104104
jobID,
105105
taskID,
106106
attempt.getParallelSubtaskIndex(),

flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public List<SubtaskExecutionAttemptDetailsInfo> getOtherConcurrentAttempts() {
195195

196196
public static SubtaskExecutionAttemptDetailsInfo create(
197197
AccessExecution execution,
198-
@Nullable MetricStore.MetricStoreJobs jobs,
198+
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics,
199199
JobID jobID,
200200
JobVertexID jobVertexID,
201201
@Nullable List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts) {
@@ -215,7 +215,7 @@ public static SubtaskExecutionAttemptDetailsInfo create(
215215
final long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
216216

217217
final MutableIOMetrics ioMetrics = new MutableIOMetrics();
218-
ioMetrics.addIOMetrics(execution, jobs, jobID.toString(), jobVertexID.toString());
218+
ioMetrics.addIOMetrics(execution, jobMetrics, jobID.toString(), jobVertexID.toString());
219219

220220
final IOMetricsInfo ioMetricsInfo =
221221
new IOMetricsInfo(

flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class MetricStoreTest {
5050
@Test
5151
void testAdd() {
5252
MetricStore store = setupStore(new MetricStore());
53-
MetricStore.MetricStoreJobs jobs = store.getJobs();
53+
MetricStore.JobMetricStoreSnapshot jobMetrics = store.getJobs();
5454
assertThat(store.getJobManagerMetricStore().getMetric("abc.metric1", "-1")).isEqualTo("0");
5555
assertThat(store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"))
5656
.isEqualTo("1");
@@ -64,15 +64,18 @@ void testAdd() {
6464
.getMetric("8.abc.metric5", "-1"))
6565
.isEqualTo("14");
6666
assertThat(
67-
jobs.getSubtaskMetricStore(JOB_ID.toString(), "taskid", 8)
67+
jobMetrics
68+
.getSubtaskMetricStore(JOB_ID.toString(), "taskid", 8)
6869
.getMetric("abc.metric5", "-1"))
6970
.isEqualTo("14");
7071
assertThat(
71-
jobs.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 1)
72+
jobMetrics
73+
.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 1)
7274
.getMetric("abc.metric5", "-1"))
7375
.isEqualTo("4");
7476
assertThat(
75-
jobs.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 2)
77+
jobMetrics
78+
.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 2)
7679
.getMetric("abc.metric5", "-1"))
7780
.isEqualTo("14");
7881

@@ -89,20 +92,25 @@ void testAdd() {
8992
.getMetric("1.opname.abc.metric7", "-1"))
9093
.isEqualTo("6");
9194
assertThat(
92-
jobs.getSubtaskMetricStore(JOB_ID.toString(), "taskid", 1)
95+
jobMetrics
96+
.getSubtaskMetricStore(JOB_ID.toString(), "taskid", 1)
9397
.getMetric("opname.abc.metric7", "-1"))
9498
.isEqualTo("6");
95-
assertThat(jobs.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 1, 2)).isNull();
99+
assertThat(jobMetrics.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 1, 2))
100+
.isNull();
96101
assertThat(
97-
jobs.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 1, 3)
102+
jobMetrics
103+
.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 1, 3)
98104
.getMetric("opname.abc.metric7", "-1"))
99105
.isEqualTo("6");
100106
assertThat(
101-
jobs.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 2)
107+
jobMetrics
108+
.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 2)
102109
.getMetric("opname.abc.metric7", "-1"))
103110
.isEqualTo("6");
104111
assertThat(
105-
jobs.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 4)
112+
jobMetrics
113+
.getSubtaskAttemptMetricStore(JOB_ID.toString(), "taskid", 8, 4)
106114
.getMetric("opname.abc.metric7", "-1"))
107115
.isEqualTo("16");
108116

0 commit comments

Comments
 (0)