|
21 | 21 | import org.apache.flink.api.common.JobID; |
22 | 22 | import org.apache.flink.api.common.RuntimeExecutionMode; |
23 | 23 | import org.apache.flink.api.dag.Pipeline; |
24 | | -import org.apache.flink.client.FlinkPipelineTranslationUtil; |
25 | 24 | import org.apache.flink.client.cli.ClientOptions; |
26 | 25 | import org.apache.flink.client.cli.ExecutionConfigAccessor; |
27 | 26 | import org.apache.flink.configuration.Configuration; |
28 | 27 | import org.apache.flink.configuration.DeploymentOptions; |
29 | 28 | import org.apache.flink.configuration.ExecutionOptions; |
30 | 29 | import org.apache.flink.configuration.PipelineOptionsInternal; |
31 | 30 | import org.apache.flink.core.execution.JobStatusChangedListener; |
32 | | -import org.apache.flink.runtime.jobgraph.JobGraph; |
33 | 31 | import org.apache.flink.streaming.api.graph.ExecutionPlan; |
34 | 32 | import org.apache.flink.streaming.api.graph.StreamGraph; |
35 | 33 | import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent; |
|
39 | 37 |
|
40 | 38 | import javax.annotation.Nonnull; |
41 | 39 |
|
42 | | -import java.net.MalformedURLException; |
43 | 40 | import java.util.List; |
44 | 41 |
|
45 | 42 | import static org.apache.flink.util.Preconditions.checkNotNull; |
|
49 | 46 | public class PipelineExecutorUtils { |
50 | 47 | private static final Logger LOG = LoggerFactory.getLogger(PipelineExecutorUtils.class); |
51 | 48 |
|
52 | | - /** |
53 | | - * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}. |
54 | | - * |
55 | | - * @param pipeline the pipeline whose job graph we are computing. |
56 | | - * @param configuration the configuration with the necessary information such as jars and |
57 | | - * classpaths to be included, the parallelism of the job and potential savepoint settings |
58 | | - * used to bootstrap its state. |
59 | | - * @param userClassloader the classloader which can load user classes. |
60 | | - * @return the corresponding {@link JobGraph}. |
61 | | - */ |
62 | | - public static JobGraph getJobGraph( |
63 | | - @Nonnull final Pipeline pipeline, |
64 | | - @Nonnull final Configuration configuration, |
65 | | - @Nonnull ClassLoader userClassloader) |
66 | | - throws MalformedURLException { |
67 | | - checkNotNull(pipeline); |
68 | | - checkNotNull(configuration); |
69 | | - |
70 | | - final ExecutionConfigAccessor executionConfigAccessor = |
71 | | - ExecutionConfigAccessor.fromConfiguration(configuration); |
72 | | - final JobGraph jobGraph = |
73 | | - FlinkPipelineTranslationUtil.getJobGraph( |
74 | | - userClassloader, |
75 | | - pipeline, |
76 | | - configuration, |
77 | | - executionConfigAccessor.getParallelism()); |
78 | | - |
79 | | - configuration |
80 | | - .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) |
81 | | - .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID))); |
82 | | - |
83 | | - if (configuration.get(DeploymentOptions.ATTACHED) |
84 | | - && configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { |
85 | | - jobGraph.setInitialClientHeartbeatTimeout( |
86 | | - configuration.get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT).toMillis()); |
87 | | - } |
88 | | - |
89 | | - jobGraph.addJars(executionConfigAccessor.getJars()); |
90 | | - jobGraph.setClasspaths(executionConfigAccessor.getClasspaths()); |
91 | | - jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); |
92 | | - |
93 | | - return jobGraph; |
94 | | - } |
95 | | - |
96 | 49 | /** |
97 | 50 | * Notify the {@link DefaultJobCreatedEvent} to job status changed listeners. |
98 | 51 | * |
|
0 commit comments