Skip to content

Commit 598fcc2

Browse files
authored
Merge pull request #9 from DTStack/v1.4.1
V1.4.1
2 parents 6d64c24 + 63d6b21 commit 598fcc2

File tree

6 files changed

+187
-89
lines changed

6 files changed

+187
-89
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterMode.java renamed to core/src/main/java/com/dtstack/flink/sql/ClusterMode.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,22 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.dtstack.flink.sql.launcher;
19+
package com.dtstack.flink.sql;
2020

2121
/**
2222
* This class defines three running mode of FlinkX
2323
*
2424
* Company: www.dtstack.com
2525
2626
*/
27-
public class ClusterMode {
27+
public enum ClusterMode {
2828

29-
public static final String MODE_LOCAL = "local";
29+
local(0),standalone(1),yarn(2),yarnPer(3);
3030

31-
public static final String MODE_STANDALONE = "standalone";
31+
private int type;
3232

33-
public static final String MODE_YARN = "yarn";
33+
ClusterMode(int type){
34+
this.type = type;
35+
}
3436

3537
}

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,6 @@ public class Main {
8888

8989
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
9090

91-
private static final String LOCAL_MODE = "local";
92-
9391
private static final int failureRate = 3;
9492

9593
private static final int failureInterval = 6; //min
@@ -135,7 +133,7 @@ public static void main(String[] args) throws Exception {
135133
Thread.currentThread().setContextClassLoader(dtClassLoader);
136134

137135
URLClassLoader parentClassloader;
138-
if(!LOCAL_MODE.equals(deployMode)){
136+
if(!ClusterMode.local.name().equals(deployMode)){
139137
parentClassloader = (URLClassLoader) threadClassLoader.getParent();
140138
}else{
141139
parentClassloader = dtClassLoader;
@@ -286,7 +284,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
286284
}
287285

288286
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
289-
StreamExecutionEnvironment env = !LOCAL_MODE.equals(deployMode) ?
287+
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
290288
StreamExecutionEnvironment.getExecutionEnvironment() :
291289
new MyLocalStreamEnvironment();
292290

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
3434
import org.apache.hadoop.yarn.client.api.YarnClient;
3535
import org.apache.hadoop.yarn.conf.YarnConfiguration;
36-
36+
import com.dtstack.flink.sql.ClusterMode;
3737
import java.io.File;
3838
import java.io.FilenameFilter;
3939
import java.lang.reflect.Field;
@@ -42,10 +42,8 @@
4242
import java.util.Iterator;
4343
import java.util.List;
4444
import java.util.Map;
45-
import java.util.Properties;
4645
import java.util.Set;
4746

48-
import static com.dtstack.flink.sql.launcher.LauncherOptions.*;
4947

5048
/**
5149
* The Factory of ClusterClient
@@ -55,29 +53,29 @@
5553
*/
5654
public class ClusterClientFactory {
5755

58-
public static ClusterClient createClusterClient(Properties props) {
59-
String clientType = props.getProperty(OPTION_MODE);
60-
if(clientType.equals(ClusterMode.MODE_STANDALONE)) {
61-
return createStandaloneClient(props);
62-
} else if(clientType.equals(ClusterMode.MODE_YARN)) {
63-
return createYarnClient(props);
56+
public static ClusterClient createClusterClient(LauncherOptions launcherOptions) {
57+
String mode = launcherOptions.getMode();
58+
if(mode.equals(ClusterMode.standalone.name())) {
59+
return createStandaloneClient(launcherOptions);
60+
} else if(mode.equals(ClusterMode.yarn.name())) {
61+
return createYarnClient(launcherOptions);
6462
}
6563
throw new IllegalArgumentException("Unsupported cluster client type: ");
6664
}
6765

68-
public static StandaloneClusterClient createStandaloneClient(Properties props) {
69-
String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR);
66+
public static StandaloneClusterClient createStandaloneClient(LauncherOptions launcherOptions) {
67+
String flinkConfDir = launcherOptions.getFlinkconf();
7068
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
7169
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
7270
StandaloneClusterClient clusterClient = descriptor.retrieve(null);
7371
clusterClient.setDetached(true);
7472
return clusterClient;
7573
}
7674

77-
public static YarnClusterClient createYarnClient(Properties props) {
78-
String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR);
75+
public static YarnClusterClient createYarnClient(LauncherOptions launcherOptions) {
76+
String flinkConfDir = launcherOptions.getFlinkconf();
7977
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
80-
String yarnConfDir = props.getProperty(LauncherOptions.OPTION_YARN_CONF_DIR);
78+
String yarnConfDir =launcherOptions.getYarnconf();
8179
org.apache.hadoop.conf.Configuration yarnConf = new YarnConfiguration();
8280
if(StringUtils.isNotBlank(yarnConfDir)) {
8381
try {

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@
2222

2323
import avro.shaded.com.google.common.collect.Lists;
2424
import com.dtstack.flink.sql.Main;
25+
import org.apache.commons.lang.BooleanUtils;
26+
import org.apache.commons.lang3.StringUtils;
2527
import org.apache.flink.client.program.ClusterClient;
2628
import org.apache.flink.client.program.PackagedProgram;
27-
2829
import java.io.File;
2930
import java.util.List;
30-
31-
import static com.dtstack.flink.sql.launcher.ClusterMode.MODE_LOCAL;
32-
import static com.dtstack.flink.sql.launcher.LauncherOptions.OPTION_LOCAL_SQL_PLUGIN_PATH;
33-
import static com.dtstack.flink.sql.launcher.LauncherOptions.OPTION_MODE;
31+
import com.dtstack.flink.sql.ClusterMode;
32+
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
3433

3534
/**
3635
* Date: 2017/2/20
@@ -51,18 +50,21 @@ private static String getLocalCoreJarPath(String localSqlRootJar){
5150

5251
public static void main(String[] args) throws Exception {
5352
LauncherOptionParser optionParser = new LauncherOptionParser(args);
54-
String mode = (String) optionParser.getVal(OPTION_MODE);
53+
LauncherOptions launcherOptions = optionParser.getLauncherOptions();
54+
String mode = launcherOptions.getMode();
5555
List<String> argList = optionParser.getProgramExeArgList();
56-
57-
if(mode.equals(MODE_LOCAL)) {
56+
if(mode.equals(ClusterMode.local.name())) {
5857
String[] localArgs = argList.toArray(new String[argList.size()]);
5958
Main.main(localArgs);
6059
} else {
61-
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(optionParser.getProperties());
62-
String pluginRoot = (String) optionParser.getVal(OPTION_LOCAL_SQL_PLUGIN_PATH);
60+
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
61+
String pluginRoot = launcherOptions.getLocalSqlPluginPath();
6362
File jarFile = new File(getLocalCoreJarPath(pluginRoot));
6463
String[] remoteArgs = argList.toArray(new String[argList.size()]);
6564
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);
65+
if(StringUtils.isNotBlank(launcherOptions.getSavePointPath())){
66+
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(launcherOptions.getSavePointPath(), BooleanUtils.toBoolean(launcherOptions.getAllowNonRestoredState())));
67+
}
6668
clusterClient.run(program, 1);
6769
clusterClient.shutdown();
6870
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,19 @@
1919
package com.dtstack.flink.sql.launcher;
2020

2121
import avro.shaded.com.google.common.collect.Lists;
22+
import com.dtstack.flink.sql.util.PluginUtil;
2223
import org.apache.commons.cli.BasicParser;
2324
import org.apache.commons.cli.CommandLine;
2425
import org.apache.commons.cli.Options;
2526
import org.apache.commons.lang.StringUtils;
2627
import org.apache.flink.hadoop.shaded.com.google.common.base.Charsets;
2728
import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions;
28-
2929
import java.io.File;
3030
import java.io.FileInputStream;
3131
import java.net.URLEncoder;
3232
import java.util.List;
3333
import java.util.Map;
34-
import java.util.Properties;
35-
36-
import static com.dtstack.flink.sql.launcher.LauncherOptions.*;
37-
import static com.dtstack.flink.sql.launcher.ClusterMode.*;
38-
34+
import com.dtstack.flink.sql.ClusterMode;
3935

4036
/**
4137
* The Parser of Launcher commandline options
@@ -45,14 +41,36 @@
4541
*/
4642
public class LauncherOptionParser {
4743

44+
public static final String OPTION_MODE = "mode";
45+
46+
public static final String OPTION_NAME = "name";
47+
48+
public static final String OPTION_SQL = "sql";
49+
50+
public static final String OPTION_FLINK_CONF_DIR = "flinkconf";
51+
52+
public static final String OPTION_YARN_CONF_DIR = "yarnconf";
53+
54+
public static final String OPTION_LOCAL_SQL_PLUGIN_PATH = "localSqlPluginPath";
55+
56+
public static final String OPTION_REMOTE_SQL_PLUGIN_PATH = "remoteSqlPluginPath";
57+
58+
public static final String OPTION_ADDJAR = "addjar";
59+
60+
public static final String OPTION_CONF_PROP = "confProp";
61+
62+
public static final String OPTION_SAVE_POINT_PATH = "savePointPath";
63+
64+
public static final String OPTION_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
65+
4866
private Options options = new Options();
4967

5068
private BasicParser parser = new BasicParser();
5169

52-
private Properties properties = new Properties();
70+
private LauncherOptions properties = new LauncherOptions();
5371

5472
public LauncherOptionParser(String[] args) {
55-
options.addOption(LauncherOptions.OPTION_MODE, true, "Running mode");
73+
options.addOption(OPTION_MODE, true, "Running mode");
5674
options.addOption(OPTION_SQL, true, "Job sql file");
5775
options.addOption(OPTION_NAME, true, "Job name");
5876
options.addOption(OPTION_FLINK_CONF_DIR, true, "Flink configuration directory");
@@ -62,11 +80,14 @@ public LauncherOptionParser(String[] args) {
6280
options.addOption(OPTION_CONF_PROP, true, "sql ref prop,eg specify event time");
6381
options.addOption(OPTION_YARN_CONF_DIR, true, "Yarn and hadoop configuration directory");
6482

83+
options.addOption(OPTION_SAVE_POINT_PATH, true, "Savepoint restore path");
84+
options.addOption(OPTION_ALLOW_NON_RESTORED_STATE, true, "Flag indicating whether non restored state is allowed if the savepoint");
85+
6586
try {
6687
CommandLine cl = parser.parse(options, args);
67-
String mode = cl.getOptionValue(OPTION_MODE, MODE_LOCAL);
88+
String mode = cl.getOptionValue(OPTION_MODE, ClusterMode.local.name());
6889
//check mode
69-
properties.put(OPTION_MODE, mode);
90+
properties.setMode(mode);
7091

7192
String job = Preconditions.checkNotNull(cl.getOptionValue(OPTION_SQL),
7293
"Must specify job file using option '" + OPTION_SQL + "'");
@@ -76,78 +97,65 @@ public LauncherOptionParser(String[] args) {
7697
in.read(filecontent);
7798
String content = new String(filecontent, "UTF-8");
7899
String sql = URLEncoder.encode(content, Charsets.UTF_8.name());
79-
properties.put(OPTION_SQL, sql);
80-
100+
properties.setSql(sql);
81101
String localPlugin = Preconditions.checkNotNull(cl.getOptionValue(OPTION_LOCAL_SQL_PLUGIN_PATH));
82-
properties.put(OPTION_LOCAL_SQL_PLUGIN_PATH, localPlugin);
83-
102+
properties.setLocalSqlPluginPath(localPlugin);
84103
String remotePlugin = cl.getOptionValue(OPTION_REMOTE_SQL_PLUGIN_PATH);
85-
if(!mode.equalsIgnoreCase(ClusterMode.MODE_LOCAL)){
104+
if(!ClusterMode.local.name().equals(mode)){
86105
Preconditions.checkNotNull(remotePlugin);
87-
properties.put(OPTION_REMOTE_SQL_PLUGIN_PATH, remotePlugin);
106+
properties.setRemoteSqlPluginPath(remotePlugin);
88107
}
89-
90108
String name = Preconditions.checkNotNull(cl.getOptionValue(OPTION_NAME));
91-
properties.put(OPTION_NAME, name);
92-
109+
properties.setName(name);
93110
String addJar = cl.getOptionValue(OPTION_ADDJAR);
94111
if(StringUtils.isNotBlank(addJar)){
95-
properties.put(OPTION_ADDJAR, addJar);
112+
properties.setAddjar(addJar);
96113
}
97-
98114
String confProp = cl.getOptionValue(OPTION_CONF_PROP);
99115
if(StringUtils.isNotBlank(confProp)){
100-
properties.put(OPTION_CONF_PROP, confProp);
116+
properties.setConfProp(confProp);
101117
}
102-
103118
String flinkConfDir = cl.getOptionValue(OPTION_FLINK_CONF_DIR);
104119
if(StringUtils.isNotBlank(flinkConfDir)) {
105-
properties.put(OPTION_FLINK_CONF_DIR, flinkConfDir);
120+
properties.setFlinkconf(flinkConfDir);
106121
}
107122

108123
String yarnConfDir = cl.getOptionValue(OPTION_YARN_CONF_DIR);
109124
if(StringUtils.isNotBlank(yarnConfDir)) {
110-
properties.put(OPTION_YARN_CONF_DIR, yarnConfDir);
125+
properties.setYarnconf(yarnConfDir);
126+
}
127+
128+
String savePointPath = cl.getOptionValue(OPTION_SAVE_POINT_PATH);
129+
if(StringUtils.isNotBlank(savePointPath)) {
130+
properties.setSavePointPath(savePointPath);
131+
}
132+
133+
String allow_non = cl.getOptionValue(OPTION_ALLOW_NON_RESTORED_STATE);
134+
if(StringUtils.isNotBlank(allow_non)) {
135+
properties.setAllowNonRestoredState(allow_non);
111136
}
112137

113138
} catch (Exception e) {
114139
throw new RuntimeException(e);
115140
}
116-
117141
}
118142

119-
public Properties getProperties(){
143+
public LauncherOptions getLauncherOptions(){
120144
return properties;
121145
}
122146

123-
public Object getVal(String key){
124-
return properties.get(key);
125-
}
126-
127-
public List<String> getAllArgList(){
147+
public List<String> getProgramExeArgList() throws Exception {
148+
Map<String,Object> mapConf = PluginUtil.ObjectToMap(properties);
128149
List<String> args = Lists.newArrayList();
129-
for(Map.Entry<Object, Object> one : properties.entrySet()){
130-
args.add("-" + one.getKey().toString());
131-
args.add(one.getValue().toString());
132-
}
133-
134-
return args;
135-
}
136-
137-
public List<String> getProgramExeArgList(){
138-
List<String> args = Lists.newArrayList();
139-
for(Map.Entry<Object, Object> one : properties.entrySet()){
140-
String key = one.getKey().toString();
150+
for(Map.Entry<String, Object> one : mapConf.entrySet()){
151+
String key = one.getKey();
141152
if(OPTION_FLINK_CONF_DIR.equalsIgnoreCase(key)
142153
|| OPTION_YARN_CONF_DIR.equalsIgnoreCase(key)){
143154
continue;
144155
}
145-
146156
args.add("-" + key);
147157
args.add(one.getValue().toString());
148158
}
149-
150159
return args;
151160
}
152-
153161
}

0 commit comments

Comments
 (0)