Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
Expand Down Expand Up @@ -85,7 +86,8 @@ public JobDetailsHandler(
protected JobDetailsInfo handleRequest(
HandlerRequest<EmptyRequestBody> request, AccessExecutionGraph executionGraph)
throws RestHandlerException {
return createJobDetailsInfo(executionGraph, metricFetcher);
metricFetcher.update();
return createJobDetailsInfo(executionGraph, metricFetcher.getMetricStore().getJobs());
}

@Override
Expand All @@ -100,7 +102,8 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
}

private static JobDetailsInfo createJobDetailsInfo(
AccessExecutionGraph executionGraph, @Nullable MetricFetcher metricFetcher) {
AccessExecutionGraph executionGraph,
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics) {
final long now = System.currentTimeMillis();
final long startTime = executionGraph.getStatusTimestamp(JobStatus.INITIALIZING);
final long endTime =
Expand All @@ -124,10 +127,7 @@ private static JobDetailsInfo createJobDetailsInfo(
executionGraph.getVerticesTopologically()) {
final JobDetailsInfo.JobVertexDetailsInfo vertexDetailsInfo =
createJobVertexDetailsInfo(
accessExecutionJobVertex,
now,
executionGraph.getJobID(),
metricFetcher);
accessExecutionJobVertex, now, executionGraph.getJobID(), jobMetrics);

jobVertexInfos.add(vertexDetailsInfo);
jobVerticesPerState[vertexDetailsInfo.getExecutionState().ordinal()]++;
Expand Down Expand Up @@ -166,7 +166,10 @@ private static JobDetailsInfo createJobDetailsInfo(
}

private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
AccessExecutionJobVertex ejv, long now, JobID jobId, MetricFetcher metricFetcher) {
AccessExecutionJobVertex ejv,
long now,
JobID jobId,
MetricStore.JobMetricStoreSnapshot jobMetrics) {
int[] tasksPerState = new int[ExecutionState.values().length];
long startTime = Long.MAX_VALUE;
long endTime = 0;
Expand Down Expand Up @@ -217,7 +220,7 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
// rather than the aggregation of all attempts.
counts.addIOMetrics(
vertex.getCurrentExecutionAttempt(),
metricFetcher,
jobMetrics,
jobId.toString(),
ejv.getJobVertexId().toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo;
Expand Down Expand Up @@ -89,7 +90,9 @@ protected JobVertexDetailsInfo handleRequest(
throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
}

return createJobVertexDetailsInfo(jobVertex, jobID, metricFetcher);
metricFetcher.update();
return createJobVertexDetailsInfo(
jobVertex, jobID, metricFetcher.getMetricStore().getJobs());
}

@Override
Expand All @@ -114,7 +117,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
private static JobVertexDetailsInfo createJobVertexDetailsInfo(
AccessExecutionJobVertex jobVertex,
JobID jobID,
@Nullable MetricFetcher metricFetcher) {
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics) {
List<SubtaskExecutionAttemptDetailsInfo> subtasks = new ArrayList<>();
final long now = System.currentTimeMillis();
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
Expand All @@ -130,14 +133,14 @@ private static JobVertexDetailsInfo createJobVertexDetailsInfo(
if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
otherConcurrentAttempts.add(
SubtaskExecutionAttemptDetailsInfo.create(
attempt, metricFetcher, jobID, jobVertexID, null));
attempt, jobMetrics, jobID, jobVertexID, null));
}
}
}

subtasks.add(
SubtaskExecutionAttemptDetailsInfo.create(
execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts));
execution, jobMetrics, jobID, jobVertexID, otherConcurrentAttempts));
}

return new JobVertexDetailsInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.AggregatedTaskDetailsInfo;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
Expand Down Expand Up @@ -105,7 +106,9 @@ protected JobVertexTaskManagersInfo handleRequest(
throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
}

return createJobVertexTaskManagersInfo(jobVertex, jobID, metricFetcher);
metricFetcher.update();
return createJobVertexTaskManagersInfo(
jobVertex, jobID, metricFetcher.getMetricStore().getJobs());
}

@Override
Expand All @@ -130,7 +133,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(
AccessExecutionJobVertex jobVertex,
JobID jobID,
@Nullable MetricFetcher metricFetcher) {
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics) {
// Build a map that groups task executions by TaskManager
Map<TaskManagerLocation, List<AccessExecution>> taskManagerExecutions = new HashMap<>();
Set<AccessExecution> representativeExecutions = new HashSet<>();
Expand Down Expand Up @@ -201,13 +204,13 @@ private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(

counts.addIOMetrics(
execution,
metricFetcher,
jobMetrics,
jobID.toString(),
jobVertex.getJobVertexId().toString());
MutableIOMetrics current = new MutableIOMetrics();
current.addIOMetrics(
execution,
metricFetcher,
jobMetrics,
jobID.toString(),
jobVertex.getJobVertexId().toString());
ioMetricsInfos.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
Expand Down Expand Up @@ -87,18 +88,20 @@ protected SubtaskExecutionAttemptDetailsInfo handleRequest(
final Collection<AccessExecution> attempts = executionVertex.getCurrentExecutions();
List<SubtaskExecutionAttemptDetailsInfo> otherConcurrentAttempts = null;

metricFetcher.update();
MetricStore.JobMetricStoreSnapshot jobMetrics = metricFetcher.getMetricStore().getJobs();
if (attempts.size() > 1) {
otherConcurrentAttempts = new ArrayList<>();
for (AccessExecution attempt : attempts) {
if (attempt.getAttemptNumber() != execution.getAttemptNumber()) {
otherConcurrentAttempts.add(
SubtaskExecutionAttemptDetailsInfo.create(
attempt, metricFetcher, jobID, jobVertexID, null));
attempt, jobMetrics, jobID, jobVertexID, null));
}
}
}

return SubtaskExecutionAttemptDetailsInfo.create(
execution, metricFetcher, jobID, jobVertexID, otherConcurrentAttempts);
execution, jobMetrics, jobID, jobVertexID, otherConcurrentAttempts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ protected SubtaskExecutionAttemptDetailsInfo handleRequest(
final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);

metricFetcher.update();
return SubtaskExecutionAttemptDetailsInfo.create(
execution, metricFetcher, jobID, jobVertexID, null);
execution, metricFetcher.getMetricStore().getJobs(), jobID, jobVertexID, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Collection<? extends MetricStore.ComponentMetricStore> getStores(
JobID jobID = request.getPathParameter(JobIDPathParameter.class);
JobVertexID taskID = request.getPathParameter(JobVertexIdPathParameter.class);

MetricStore.JobMetricStoreSnapshot jobMetrics = store.getJobs();
Collection<String> subtaskRanges =
request.getQueryParameter(SubtasksFilterQueryParameter.class);
if (subtaskRanges.isEmpty()) {
Expand All @@ -91,7 +92,8 @@ Collection<? extends MetricStore.ComponentMetricStore> getStores(
Collection<MetricStore.ComponentMetricStore> subtaskStores = new ArrayList<>(8);
for (int subtask : subtasks) {
MetricStore.ComponentMetricStore subtaskMetricStore =
store.getSubtaskMetricStore(jobID.toString(), taskID.toString(), subtask);
jobMetrics.getSubtaskMetricStore(
jobID.toString(), taskID.toString(), subtask);
if (subtaskMetricStore != null) {
subtaskStores.add(subtaskMetricStore);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
/**
* Handler that returns subtask metrics.
*
* @see MetricStore#getSubtaskMetricStore(String, String, int)
* @see MetricStore.JobMetricStoreSnapshot#getSubtaskMetricStore(String, String, int)
*/
public class SubtaskMetricsHandler extends AbstractMetricsHandler<SubtaskMetricsMessageParameters> {

Expand All @@ -67,7 +67,8 @@ protected MetricStore.ComponentMetricStore getComponentMetricStore(
final JobVertexID vertexId = request.getPathParameter(JobVertexIdPathParameter.class);
final int subtaskIndex = request.getPathParameter(SubtaskIndexPathParameter.class);

return metricStore.getSubtaskMetricStore(
jobId.toString(), vertexId.toString(), subtaskIndex);
return metricStore
.getJobs()
.getSubtaskMetricStore(jobId.toString(), vertexId.toString(), subtaskIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,47 +239,8 @@ public synchronized TaskMetricStore getTaskMetricStore(String jobID, String task
return TaskMetricStore.unmodifiable(job.getTaskMetricStore(taskID));
}

/**
 * Looks up the metric store of one subtask, addressed by job ID, task ID and subtask index.
 *
 * <p>The lookup walks job -> task -> subtask and stops with {@code null} as soon as the job ID
 * is {@code null} or the job or task level has no entry.
 *
 * @param jobID job ID
 * @param taskID task ID
 * @param subtaskIndex subtask index
 * @return unmodifiable view of the subtask's metric store, or null if no store for the given
 *     arguments exists
 */
public synchronized ComponentMetricStore getSubtaskMetricStore(
        String jobID, String taskID, int subtaskIndex) {
    // Each step propagates null from the previous level instead of returning early.
    final JobMetricStore jobStore = jobID == null ? null : jobs.get(jobID);
    final TaskMetricStore taskStore =
            jobStore == null ? null : jobStore.getTaskMetricStore(taskID);
    return taskStore == null
            ? null
            : SubtaskMetricStore.unmodifiable(taskStore.getSubtaskMetricStore(subtaskIndex));
}

/**
 * Looks up the metric store of one execution attempt of a subtask.
 *
 * <p>The lookup walks job -> task -> subtask -> attempt and yields {@code null} as soon as the
 * job ID is {@code null} or the job, task or subtask level has no entry.
 *
 * @param jobID job ID
 * @param taskID task ID
 * @param subtaskIndex subtask index
 * @param attemptNumber attempt number within the subtask
 * @return unmodifiable view of the attempt's metric store, or null if the job, task or subtask
 *     cannot be found
 */
public synchronized ComponentMetricStore getSubtaskAttemptMetricStore(
        String jobID, String taskID, int subtaskIndex, int attemptNumber) {
    // Each step propagates null from the previous level instead of returning early.
    final JobMetricStore jobStore = jobID == null ? null : jobs.get(jobID);
    final TaskMetricStore taskStore =
            jobStore == null ? null : jobStore.getTaskMetricStore(taskID);
    final SubtaskMetricStore subtaskStore =
            taskStore == null ? null : taskStore.getSubtaskMetricStore(subtaskIndex);
    return subtaskStore == null
            ? null
            : ComponentMetricStore.unmodifiable(
                    subtaskStore.getAttemptsMetricStore(attemptNumber));
}

public synchronized Map<String, JobMetricStore> getJobs() {
return unmodifiableMap(jobs);
public synchronized JobMetricStoreSnapshot getJobs() {
return new JobMetricStoreSnapshot(unmodifiableMap(jobs));
}

public synchronized Map<String, TaskManagerMetricStore> getTaskManagers() {
Expand Down Expand Up @@ -689,4 +650,57 @@ private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) {
unmodifiableMap(source.metrics), unmodifiableMap(source.attempts));
}
}

/** Sub-structure containing a snapshot of all jobs in the metric store. */
@ThreadSafe
public static class JobMetricStoreSnapshot {
    /** Per-job metric stores, keyed by job ID. */
    private final Map<String, JobMetricStore> jobs;

    private JobMetricStoreSnapshot(Map<String, JobMetricStore> jobs) {
        this.jobs = checkNotNull(jobs);
    }

    /**
     * Looks up the metric store of one subtask, addressed by job ID, task ID and subtask index.
     *
     * @param jobID job ID
     * @param taskID task ID
     * @param subtaskIndex subtask index
     * @return unmodifiable view of the subtask's metric store, or null if no store for the
     *     given arguments exists
     */
    public ComponentMetricStore getSubtaskMetricStore(
            String jobID, String taskID, int subtaskIndex) {
        final TaskMetricStore taskStore = lookupTask(jobID, taskID);
        if (taskStore == null) {
            return null;
        }
        return SubtaskMetricStore.unmodifiable(taskStore.getSubtaskMetricStore(subtaskIndex));
    }

    /**
     * Looks up the metric store of one execution attempt of a subtask.
     *
     * @param jobID job ID
     * @param taskID task ID
     * @param subtaskIndex subtask index
     * @param attemptNumber attempt number within the subtask
     * @return unmodifiable view of the attempt's metric store, or null if the job, task or
     *     subtask cannot be found
     */
    public ComponentMetricStore getSubtaskAttemptMetricStore(
            String jobID, String taskID, int subtaskIndex, int attemptNumber) {
        final TaskMetricStore taskStore = lookupTask(jobID, taskID);
        if (taskStore == null) {
            return null;
        }
        final SubtaskMetricStore subtaskStore = taskStore.getSubtaskMetricStore(subtaskIndex);
        if (subtaskStore == null) {
            return null;
        }
        return ComponentMetricStore.unmodifiable(
                subtaskStore.getAttemptsMetricStore(attemptNumber));
    }

    /** Returns the per-job metric stores held by this snapshot. */
    public Collection<? extends MetricStore.ComponentMetricStore> values() {
        return jobs.values();
    }

    /**
     * Shared job -> task lookup used by both getters.
     *
     * @return the task's metric store as returned by the job store, or null if the job ID is
     *     null or no job is registered under it
     */
    private TaskMetricStore lookupTask(String jobID, String taskID) {
        if (jobID == null) {
            return null;
        }
        final JobMetricStore jobStore = jobs.get(jobID);
        if (jobStore == null) {
            return null;
        }
        return jobStore.getTaskMetricStore(taskID);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,15 @@ public boolean isNumRecordsOutComplete() {
* {@link MetricFetcher} is used to retrieve the required metrics.
*
* @param attempt Attempt whose IO metrics should be added
* @param fetcher MetricFetcher to retrieve metrics for running jobs
* @param jobMetrics metrics for running jobs
* @param jobID JobID to which the attempt belongs
* @param taskID TaskID to which the attempt belongs
*/
public void addIOMetrics(
AccessExecution attempt, @Nullable MetricFetcher fetcher, String jobID, String taskID) {
AccessExecution attempt,
@Nullable MetricStore.JobMetricStoreSnapshot jobMetrics,
String jobID,
String taskID) {
if (attempt.getState().isTerminal()) {
IOMetrics ioMetrics = attempt.getIOMetrics();
if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in
Expand All @@ -95,15 +98,13 @@ public void addIOMetrics(
}
}
} else { // execAttempt is still running, use MetricQueryService instead
if (fetcher != null) {
fetcher.update();
if (jobMetrics != null) {
MetricStore.ComponentMetricStore metrics =
fetcher.getMetricStore()
.getSubtaskAttemptMetricStore(
jobID,
taskID,
attempt.getParallelSubtaskIndex(),
attempt.getAttemptNumber());
jobMetrics.getSubtaskAttemptMetricStore(
jobID,
taskID,
attempt.getParallelSubtaskIndex(),
attempt.getAttemptNumber());
if (metrics != null) {
/**
* We want to keep track of missing metrics to be able to make a difference
Expand Down
Loading