Skip to content

Commit 373e1ea

Browse files
committed
[FLINK-38229][runtime-web] Enhanced Job History Retention Policies for HistoryServer
1 parent 863b051 commit 373e1ea

File tree

7 files changed

+339
-22
lines changed

7 files changed

+339
-22
lines changed

docs/layouts/shortcodes/generated/history_server_configuration.html

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@
3030
<td><h5>historyserver.archive.retained-jobs</h5></td>
3131
<td style="word-wrap: break-word;">-1</td>
3232
<td>Integer</td>
33-
<td>The maximum number of jobs to retain in each archive directory defined by `historyserver.archive.fs.dir`. If set to `-1`(default), there is no limit to the number of archives. If set to `0` or less than `-1` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. </td>
33+
<td>The maximum number of jobs to retain in each archive directory defined by `historyserver.archive.fs.dir`. When this option is enabled together with the `historyserver.archive.retained-ttl` option, the job archive will be removed if its TTL has expired or the retained job count has been reached. If set to `-1`(default), there is no limit to the number of archives. If set to `0` or less than `-1` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. Note, when there are multiple history server instances, please enable the configuration option like following: <ul><li>Enable this feature in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, </li><li>Or you can keep the value of this configuration consistent across them. </li></ul></td>
34+
</tr>
35+
<tr>
36+
<td><h5>historyserver.archive.retained-ttl</h5></td>
37+
<td style="word-wrap: break-word;">(none)</td>
38+
<td>Duration</td>
39+
<td>The time-to-live duration to retain the jobs archived in each archive directory defined by `historyserver.archive.fs.dir`. When this option is enabled together with the `historyserver.archive.retained-jobs` option, the job archive will be removed if its TTL has expired or the retained job count has been reached. Note, when there are multiple history server instances, please enable the configuration option like following: <ul><li>Enable this feature in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, </li><li>Or you can keep the value of this configuration consistent across them. </li></ul></td>
3440
</tr>
3541
<tr>
3642
<td><h5>historyserver.log.jobmanager.url-pattern</h5></td>

flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import static org.apache.flink.configuration.ConfigOptions.key;
2727
import static org.apache.flink.configuration.description.TextElement.code;
28+
import static org.apache.flink.configuration.description.TextElement.text;
2829

2930
/** The set of configuration options relating to the HistoryServer. */
3031
@PublicEvolving
@@ -126,8 +127,21 @@ public class HistoryServerOptions {
126127
"Enable HTTPs access to the HistoryServer web frontend. This is applicable only when the"
127128
+ " global SSL flag security.ssl.enabled is set to true.");
128129

130+
private static final String HISTORY_SERVER_RETAINED_JOBS_KEY =
131+
"historyserver.archive.retained-jobs";
132+
private static final String HISTORY_SERVER_RETAINED_TTL_KEY =
133+
"historyserver.archive.retained-ttl";
134+
private static final String DESC_FORMAT =
135+
"When this option is enabled together with the `%s` option, the job archive will be removed if its TTL has expired or the retained job count has been reached. ";
136+
private static final String NOTE_MESSAGE =
137+
"Note, when there are multiple history server instances, please enable the configuration option like following: ";
138+
private static final String CONFIGURE_SINGLE_INSTANCE =
139+
"Enable this feature in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files, ";
140+
private static final String CONFIGURE_CONSISTENT =
141+
"Or you can keep the value of this configuration consistent across them. ";
142+
129143
public static final ConfigOption<Integer> HISTORY_SERVER_RETAINED_JOBS =
130-
key("historyserver.archive.retained-jobs")
144+
key(HISTORY_SERVER_RETAINED_JOBS_KEY)
131145
.intType()
132146
.defaultValue(-1)
133147
.withDescription(
@@ -136,11 +150,37 @@ public class HistoryServerOptions {
136150
String.format(
137151
"The maximum number of jobs to retain in each archive directory defined by `%s`. ",
138152
HISTORY_SERVER_ARCHIVE_DIRS.key()))
153+
.text(
154+
String.format(
155+
DESC_FORMAT, HISTORY_SERVER_RETAINED_TTL_KEY))
139156
.text(
140157
"If set to `-1`(default), there is no limit to the number of archives. ")
141158
.text(
142159
"If set to `0` or less than `-1` HistoryServer will throw an %s. ",
143160
code("IllegalConfigurationException"))
161+
.text(NOTE_MESSAGE)
162+
.list(
163+
text(CONFIGURE_SINGLE_INSTANCE),
164+
text(CONFIGURE_CONSISTENT))
165+
.build());
166+
167+
public static final ConfigOption<Duration> HISTORY_SERVER_RETAINED_TTL =
168+
key(HISTORY_SERVER_RETAINED_TTL_KEY)
169+
.durationType()
170+
.noDefaultValue()
171+
.withDescription(
172+
Description.builder()
173+
.text(
174+
String.format(
175+
"The time-to-live duration to retain the jobs archived in each archive directory defined by `%s`. ",
176+
HISTORY_SERVER_ARCHIVE_DIRS.key()))
177+
.text(
178+
String.format(
179+
DESC_FORMAT, HISTORY_SERVER_RETAINED_JOBS_KEY))
180+
.text(NOTE_MESSAGE)
181+
.list(
182+
text(CONFIGURE_SINGLE_INSTANCE),
183+
text(CONFIGURE_CONSISTENT))
144184
.build());
145185

146186
private HistoryServerOptions() {}

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.configuration.Configuration;
2323
import org.apache.flink.configuration.GlobalConfiguration;
2424
import org.apache.flink.configuration.HistoryServerOptions;
25-
import org.apache.flink.configuration.IllegalConfigurationException;
2625
import org.apache.flink.core.fs.FileSystem;
2726
import org.apache.flink.core.fs.Path;
2827
import org.apache.flink.core.plugin.PluginUtils;
@@ -38,6 +37,7 @@
3837
import org.apache.flink.runtime.security.SecurityUtils;
3938
import org.apache.flink.runtime.util.EnvironmentInformation;
4039
import org.apache.flink.runtime.util.Runnables;
40+
import org.apache.flink.runtime.webmonitor.history.retaining.CompositeJobRetainedStrategy;
4141
import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil;
4242
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
4343
import org.apache.flink.util.ExceptionUtils;
@@ -238,19 +238,13 @@ public HistoryServer(
238238

239239
refreshIntervalMillis =
240240
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL).toMillis();
241-
int maxHistorySize = config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS);
242-
if (maxHistorySize == 0 || maxHistorySize < -1) {
243-
throw new IllegalConfigurationException(
244-
"Cannot set %s to 0 or less than -1",
245-
HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key());
246-
}
247241
archiveFetcher =
248242
new HistoryServerArchiveFetcher(
249243
refreshDirs,
250244
webDir,
251245
jobArchiveEventListener,
252246
cleanupExpiredArchives,
253-
maxHistorySize);
247+
CompositeJobRetainedStrategy.createFrom(config));
254248

255249
this.shutdownHook =
256250
ShutdownHookUtil.addShutdownHook(

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
3030
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
3131
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
32+
import org.apache.flink.runtime.webmonitor.history.retaining.JobRetainedStrategy;
3233
import org.apache.flink.util.FileUtils;
3334
import org.apache.flink.util.jackson.JacksonMapperFactory;
3435

@@ -112,8 +113,7 @@ public ArchiveEventType getType() {
112113
private final List<HistoryServer.RefreshLocation> refreshDirs;
113114
private final Consumer<ArchiveEvent> jobArchiveEventListener;
114115
private final boolean processExpiredArchiveDeletion;
115-
private final boolean processBeyondLimitArchiveDeletion;
116-
private final int maxHistorySize;
116+
private final JobRetainedStrategy jobRetainedStrategy;
117117

118118
/** Cache of all available jobs identified by their id. */
119119
private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
@@ -127,13 +127,12 @@ public ArchiveEventType getType() {
127127
File webDir,
128128
Consumer<ArchiveEvent> jobArchiveEventListener,
129129
boolean cleanupExpiredArchives,
130-
int maxHistorySize)
130+
JobRetainedStrategy jobRetainedStrategy)
131131
throws IOException {
132132
this.refreshDirs = checkNotNull(refreshDirs);
133133
this.jobArchiveEventListener = jobArchiveEventListener;
134134
this.processExpiredArchiveDeletion = cleanupExpiredArchives;
135-
this.maxHistorySize = maxHistorySize;
136-
this.processBeyondLimitArchiveDeletion = this.maxHistorySize > 0;
135+
this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy);
137136
this.cachedArchivesPerRefreshDirectory = new HashMap<>();
138137
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
139138
cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>());
@@ -159,7 +158,7 @@ void fetchArchives() {
159158
Map<Path, Set<String>> jobsToRemove = new HashMap<>();
160159
cachedArchivesPerRefreshDirectory.forEach(
161160
(path, archives) -> jobsToRemove.put(path, new HashSet<>(archives)));
162-
Map<Path, Set<Path>> archivesBeyondSizeLimit = new HashMap<>();
161+
Map<Path, Set<Path>> archivesBeyondRetainedLimit = new HashMap<>();
163162
for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
164163
Path refreshDir = refreshLocation.getPath();
165164
LOG.debug("Checking archive directory {}.", refreshDir);
@@ -176,7 +175,7 @@ void fetchArchives() {
176175
continue;
177176
}
178177

179-
int historySize = 0;
178+
int fileOrderedIndexOnModifiedTime = 0;
180179
for (FileStatus jobArchive : jobArchives) {
181180
Path jobArchivePath = jobArchive.getPath();
182181
String jobID = jobArchivePath.getName();
@@ -186,9 +185,10 @@ void fetchArchives() {
186185

187186
jobsToRemove.get(refreshDir).remove(jobID);
188187

189-
historySize++;
190-
if (historySize > maxHistorySize && processBeyondLimitArchiveDeletion) {
191-
archivesBeyondSizeLimit
188+
fileOrderedIndexOnModifiedTime++;
189+
if (!jobRetainedStrategy.shouldRetain(
190+
jobArchive, fileOrderedIndexOnModifiedTime)) {
191+
archivesBeyondRetainedLimit
192192
.computeIfAbsent(refreshDir, ignored -> new HashSet<>())
193193
.add(jobArchivePath);
194194
continue;
@@ -220,8 +220,8 @@ void fetchArchives() {
220220
&& processExpiredArchiveDeletion) {
221221
events.addAll(cleanupExpiredJobs(jobsToRemove));
222222
}
223-
if (!archivesBeyondSizeLimit.isEmpty() && processBeyondLimitArchiveDeletion) {
224-
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit));
223+
if (!archivesBeyondRetainedLimit.isEmpty()) {
224+
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondRetainedLimit));
225225
}
226226
if (!events.isEmpty()) {
227227
updateJobOverview(webOverviewDir, webDir);
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.webmonitor.history.retaining;
20+
21+
import org.apache.flink.configuration.IllegalConfigurationException;
22+
import org.apache.flink.configuration.ReadableConfig;
23+
import org.apache.flink.core.fs.FileStatus;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.time.Duration;
28+
import java.time.Instant;
29+
import java.util.Arrays;
30+
import java.util.Collections;
31+
import java.util.List;
32+
import java.util.Optional;
33+
34+
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
35+
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
36+
37+
/** The retained strategy. */
38+
public class CompositeJobRetainedStrategy implements JobRetainedStrategy {
39+
40+
public static JobRetainedStrategy createFrom(ReadableConfig config) {
41+
int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS);
42+
if (maxHistorySizeByOldKey == 0 || maxHistorySizeByOldKey < -1) {
43+
throw new IllegalConfigurationException(
44+
"Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key());
45+
}
46+
Optional<Duration> retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL);
47+
return new CompositeJobRetainedStrategy(
48+
new QuantityJobRetainedStrategy(maxHistorySizeByOldKey),
49+
new TimeToLiveJobRetainedStrategy(retainedTtlOpt.orElse(null)));
50+
}
51+
52+
private final List<JobRetainedStrategy> strategies;
53+
54+
CompositeJobRetainedStrategy(JobRetainedStrategy... strategies) {
55+
this.strategies =
56+
strategies == null || strategies.length == 0
57+
? Collections.emptyList()
58+
: Arrays.asList(strategies);
59+
}
60+
61+
@Override
62+
public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
63+
if (strategies.isEmpty()) {
64+
return true;
65+
}
66+
return strategies.stream().allMatch(s -> s.shouldRetain(file, fileOrderedIndex));
67+
}
68+
}
69+
70+
/** The time to live based retained strategy. */
71+
class TimeToLiveJobRetainedStrategy implements JobRetainedStrategy {
72+
73+
@Nullable private final Duration ttlThreshold;
74+
75+
TimeToLiveJobRetainedStrategy(Duration ttlThreshold) {
76+
this.ttlThreshold = ttlThreshold;
77+
}
78+
79+
@Override
80+
public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
81+
if (ttlThreshold == null || ttlThreshold.toMillis() <= 0L) {
82+
return true;
83+
}
84+
return Instant.now().toEpochMilli() - file.getModificationTime() < ttlThreshold.toMillis();
85+
}
86+
}
87+
88+
/** The job quantity based retained strategy. */
89+
class QuantityJobRetainedStrategy implements JobRetainedStrategy {
90+
91+
private final int quantityThreshold;
92+
93+
QuantityJobRetainedStrategy(int quantityThreshold) {
94+
this.quantityThreshold = quantityThreshold;
95+
}
96+
97+
@Override
98+
public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
99+
if (quantityThreshold <= 0) {
100+
return true;
101+
}
102+
return fileOrderedIndex <= quantityThreshold;
103+
}
104+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.webmonitor.history.retaining;
20+
21+
import org.apache.flink.core.fs.FileStatus;
22+
23+
/** To define the strategy interface to judge whether the file should be retained. */
24+
public interface JobRetainedStrategy {
25+
26+
/**
27+
* Judge whether the file should be retained.
28+
*
29+
* @param file the target file to judge.
30+
* @param fileOrderedIndex the specified order index position of the target file,
31+
* @return The result that indicates whether the file should be retained.
32+
*/
33+
boolean shouldRetain(FileStatus file, int fileOrderedIndex);
34+
}

0 commit comments

Comments
 (0)