From 280f9c44096b36875f35bfa97f645999a4294922 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Thu, 11 Apr 2019 17:16:09 +0800 Subject: [PATCH 1/4] [issue#3]Drived the configed omsDriverUrl into three configed variables.And adjust some codes for runtime. --- pom.xml | 2 +- run_worker.sh | 2 +- .../connect/runtime/ConnectController.java | 2 +- .../connect/runtime/config/ConnectConfig.java | 44 +++++++++++++++---- .../runtime/config/RuntimeConfigDefine.java | 18 ++++++-- .../runtime/connectorwrapper/Worker.java | 2 +- .../service/ConfigManagementServiceImpl.java | 2 +- .../runtime/service/RebalanceService.java | 4 +- .../resources/{connect.conf => runtime.conf} | 11 +++-- .../runtime/config/ConnectConfigTest.java | 9 +++- .../runtime/connectorwrapper/WorkerTest.java | 6 ++- .../connect/runtime/rest/RestHandlerTest.java | 6 +-- .../ClusterManagementServiceImplTest.java | 2 +- .../ConfigManagementServiceImplTest.java | 4 +- .../PositionManagementServiceImplTest.java | 2 +- .../runtime/utils/TransferUtilsTest.java | 6 +-- 16 files changed, 84 insertions(+), 38 deletions(-) rename src/main/resources/{connect.conf => runtime.conf} (73%) diff --git a/pom.xml b/pom.xml index d360696..14e7039 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,7 @@ src/main/resources *.xml - connect.conf + runtime.conf true diff --git a/run_worker.sh b/run_worker.sh index bea760a..466fc3c 100755 --- a/run_worker.sh +++ b/run_worker.sh @@ -1,5 +1,5 @@ #!/bin/bash export OMS_RMQ_DIRECT_NAME_SRV=true echo "run rumtime worker" -cd target/distribution/ && java -cp .:./conf/:./lib/* io.openmessaging.connect.runtime.ConnectStartup -c conf/connect.conf +cd target/distribution/ && java -cp .:./conf/:./lib/* io.openmessaging.connect.runtime.ConnectStartup -c conf/runtime.conf diff --git a/src/main/java/io/openmessaging/connect/runtime/ConnectController.java b/src/main/java/io/openmessaging/connect/runtime/ConnectController.java index 6c81b66..8bfc740 100644 --- a/src/main/java/io/openmessaging/connect/runtime/ConnectController.java +++ b/src/main/java/io/openmessaging/connect/runtime/ConnectController.java @@ -91,7 +91,7 @@ public ConnectController(ConnectConfig connectConfig) { this.connectConfig = connectConfig; this.messagingAccessWrapper = new MessagingAccessWrapper(); - MessagingAccessPoint messageAccessPoint = messagingAccessWrapper.getMessageAccessPoint(connectConfig.getOmsDriverUrl()); + MessagingAccessPoint messageAccessPoint = messagingAccessWrapper.getMessageAccessPoint(connectConfig.getRuntimeOmsDriverUrl()); this.clusterManagementService = new ClusterManagementServiceImpl(connectConfig, messageAccessPoint); this.configManagementService = new ConfigManagementServiceImpl(connectConfig, messageAccessPoint); this.positionManagementService = new PositionManagementServiceImpl(connectConfig, messageAccessPoint); diff --git a/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java b/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java index a758af4..15f26fb 100644 --- a/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java +++ b/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java @@ -34,11 +34,21 @@ public class ConnectConfig { */ private String storePathRootDir = System.getProperty("user.home") + File.separator + "connectorStore"; + /** + * OMS driver url for source task, which determine the specific source task to pull message data from where. + */ + private String sourceOmsDriverUrl = "oms:rocketmq://localhost:9876/default:default"; + /** * OMS driver url, which determine the specific MQ to send and consume message. * The MQ is used for internal management of the connect runtime. */ - private String omsDriverUrl = "oms:rocketmq://localhost:9876/default:default"; + private String runtimeOmsDriverUrl = "oms:rocketmq://localhost:9876/default:default"; + + /** + * OMS driver url for sink task, which determine the specific source task to send message data to where. + */ + private String sinkOmsDriverUrl = "oms:rocketmq://localhost:9876/default:default"; /** * Http port for REST API. @@ -55,14 +65,6 @@ public class ConnectConfig { */ private int configPersistInterval = 20 * 1000; - public String getOmsDriverUrl() { - return omsDriverUrl; - } - - public void setOmsDriverUrl(String omsDriverUrl) { - this.omsDriverUrl = omsDriverUrl; - } - public String getWorkerId() { return workerId; } @@ -102,4 +104,28 @@ public int getConfigPersistInterval() { public void setConfigPersistInterval(int configPersistInterval) { this.configPersistInterval = configPersistInterval; } + + public String getSourceOmsDriverUrl() { + return sourceOmsDriverUrl; + } + + public void setSourceOmsDriverUrl(String sourceOmsDriverUrl) { + this.sourceOmsDriverUrl = sourceOmsDriverUrl; + } + + public String getRuntimeOmsDriverUrl() { + return runtimeOmsDriverUrl; + } + + public void setRuntimeOmsDriverUrl(String runtimeOmsDriverUrl) { + this.runtimeOmsDriverUrl = runtimeOmsDriverUrl; + } + + public String getSinkOmsDriverUrl() { + return sinkOmsDriverUrl; + } + + public void setSinkOmsDriverUrl(String sinkOmsDriverUrl) { + this.sinkOmsDriverUrl = sinkOmsDriverUrl; + } } diff --git a/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java b/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java index a5cdfda..2079d1c 100644 --- a/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java +++ b/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java @@ -34,9 +34,19 @@ public class RuntimeConfigDefine { public static final String TASK_CLASS = "task-class"; /** - * OMS driver url for the connector. + * OMS driver url for the source connector */ - public static final String OMS_DRIVER_URL = "oms-driver-url"; + public static final String SOURCE_OMS_DRIVER_URL = "source-oms-driver-url"; + + /** + * OMS driver url for the sink connector + */ + public static final String SINK_OMS_DRIVER_URL = "sink-oms-driver-url"; + + /** + * OMS driver url for the runtime connector + */ + public static final String RUNTIME_OMS_DRIVER_URL = "runtime-oms-driver-url"; /** * Last updated time of the configuration. @@ -59,8 +69,10 @@ public class RuntimeConfigDefine { public static final Set REQUEST_CONFIG = new HashSet(){ { add(CONNECTOR_CLASS); - add(OMS_DRIVER_URL); add(SOURCE_RECORD_CONVERTER); + add(SOURCE_OMS_DRIVER_URL); + add(SINK_OMS_DRIVER_URL); + add(RUNTIME_OMS_DRIVER_URL); } }; diff --git a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java index 0f2d8f4..9f35b2c 100644 --- a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java +++ b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java @@ -209,7 +209,7 @@ public synchronized void startTasks(Map> taskConfi if(task instanceof SourceTask){ Producer producer = messagingAccessWrapper - .getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createProducer(); + .getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL)).createProducer(); producer.startup(); WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName, (SourceTask) task, keyValue, diff --git a/src/main/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImpl.java b/src/main/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImpl.java index a6daae6..f03864b 100644 --- a/src/main/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImpl.java +++ b/src/main/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImpl.java @@ -159,7 +159,7 @@ public String putConnectorConfig(String connectorName, ConnectKeyValue configs) newKeyValue.put(key, keyValue.getString(key)); } newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName()); - newKeyValue.put(RuntimeConfigDefine.OMS_DRIVER_URL, configs.getString(RuntimeConfigDefine.OMS_DRIVER_URL)); + newKeyValue.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, configs.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL)); newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESATMP, currentTimestamp); converterdConfigs.add(newKeyValue); } diff --git a/src/main/java/io/openmessaging/connect/runtime/service/RebalanceService.java b/src/main/java/io/openmessaging/connect/runtime/service/RebalanceService.java index 0ee8fbc..007b962 100644 --- a/src/main/java/io/openmessaging/connect/runtime/service/RebalanceService.java +++ b/src/main/java/io/openmessaging/connect/runtime/service/RebalanceService.java @@ -47,7 +47,7 @@ public RebalanceService(RebalanceImpl rebalanceImpl, ConfigManagementService con this.rebalanceImpl = rebalanceImpl; this.configManagementService = configManagementService; this.clusterManagementService = clusterManagementService; - this.configManagementService.registerListener(new ConnectorConnectorConfigChangeListenerImpl()); + this.configManagementService.registerListener(new ConnectorConfigChangeListenerImpl()); this.clusterManagementService.registerListener(new WorkerStatusListenerImpl()); } @@ -79,7 +79,7 @@ public void onWorkerChange() { } } - class ConnectorConnectorConfigChangeListenerImpl implements ConfigManagementService.ConnectorConfigUpdateListener { + class ConnectorConfigChangeListenerImpl implements ConfigManagementService.ConnectorConfigUpdateListener { /** * When config change. diff --git a/src/main/resources/connect.conf b/src/main/resources/runtime.conf similarity index 73% rename from src/main/resources/connect.conf rename to src/main/resources/runtime.conf index 868be08..67603c1 100644 --- a/src/main/resources/connect.conf +++ b/src/main/resources/runtime.conf @@ -16,11 +16,14 @@ ## Worker id, should be unique workerId=DEFAULT_WORKER_1 -## Choose a MQ to support runtime data synchronize -omsDriverUrl=oms:rocketmq://localhost:9876/default:default - ## Http prot for user to access REST API httpPort=8081 ## local file dir for config store -storePathRootDir=./storeRoot/ \ No newline at end of file +storePathRootDir=./storeRoot/ + +## From logical degree,there are threes MQ cluster +## the runtime server omsDriverUrl to support runtime data synchronize +sourceOmsDriverUrl=oms:rocketmq://localhost:9876/default:default +runtimeOmsDriverUrl=oms:rocketmq://localhost:9876/default:default +sinkOmsDriverUrl=oms:rocketmq://localhost:9876/default:default diff --git a/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java b/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java index 6bcead1..68e1b0e 100644 --- a/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java @@ -27,10 +27,15 @@ public class ConnectConfigTest { public void testConnectConfigAttribute() { ConnectConfig connectConfig = new ConnectConfig(); connectConfig.setHttpPort(8081); - connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); connectConfig.setWorkerId("DEFAULT_WORKER_1"); + connectConfig.setSourceOmsDriverUrl("oms:rocketmq://localhost:9876/default1:default1"); + connectConfig.setSinkOmsDriverUrl("oms:rocketmq://localhost:9876/default2:default2"); + connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default3:default3"); assertThat(connectConfig.getHttpPort()).isEqualTo(8081); - assertThat(connectConfig.getOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default:default"); assertThat(connectConfig.getWorkerId()).isEqualTo("DEFAULT_WORKER_1"); + assertThat(connectConfig.getSourceOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default1:default1"); + assertThat(connectConfig.getSinkOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default2:default2"); + assertThat(connectConfig.getRuntimeOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default3:default3"); + } } \ No newline at end of file diff --git a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java index 2e15fac..9d78039 100644 --- a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java @@ -65,7 +65,9 @@ public class WorkerTest { public void init() { connectConfig = new ConnectConfig(); connectConfig.setHttpPort(8081); - connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); + connectConfig.setSinkOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); + connectConfig.setSourceOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); + connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); connectConfig.setWorkerId("DEFAULT_WORKER_1"); connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore"); messagingAccessWrapper = new MessagingAccessWrapper(); @@ -139,7 +141,7 @@ public void testStartTasks() { connectKeyValue.getProperties().put("key2", "TEST-CONN-" + i + "2"); connectKeyValue.getProperties().put(RuntimeConfigDefine.TASK_CLASS, TestSourceTask.class.getName()); connectKeyValue.getProperties().put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, TestConverter.class.getName()); - connectKeyValue.getProperties().put(RuntimeConfigDefine.OMS_DRIVER_URL, this.connectConfig.getOmsDriverUrl()); + connectKeyValue.getProperties().put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, this.connectConfig.getRuntimeOmsDriverUrl()); connectKeyValues.add(connectKeyValue); taskConfigs.put("TEST-CONN-" + i, connectKeyValues); } diff --git a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java index 3b9e6b1..8ec0c8c 100644 --- a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java @@ -59,8 +59,6 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.when; -//import org.apache.rocketmq.mysql.MysqlConstants; - @RunWith(MockitoJUnitRunner.class) public class RestHandlerTest { @@ -139,12 +137,12 @@ public void init() throws Exception { String connectName = "testConnector"; ConnectKeyValue connectKeyValue = new ConnectKeyValue(); connectKeyValue.put(RuntimeConfigDefine.CONNECTOR_CLASS, "io.openmessaging.connect.runtime.service.TestConnector"); - connectKeyValue.put(RuntimeConfigDefine.OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default"); + connectKeyValue.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default"); connectKeyValue.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, "source-record-converter"); ConnectKeyValue connectKeyValue1 = new ConnectKeyValue(); connectKeyValue1.put(RuntimeConfigDefine.CONNECTOR_CLASS, "io.openmessaging.connect.runtime.service.TestConnector"); - connectKeyValue1.put(RuntimeConfigDefine.OMS_DRIVER_URL, "oms:kafka://localhost:1234/default:default"); + connectKeyValue1.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, "oms:kafka://localhost:1234/default:default"); connectKeyValue1.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, "source-record-converter1"); List connectKeyValues = new ArrayList(8) { diff --git a/src/test/java/io/openmessaging/connect/runtime/service/ClusterManagementServiceImplTest.java b/src/test/java/io/openmessaging/connect/runtime/service/ClusterManagementServiceImplTest.java index e0bf050..d9f7b11 100644 --- a/src/test/java/io/openmessaging/connect/runtime/service/ClusterManagementServiceImplTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/service/ClusterManagementServiceImplTest.java @@ -67,7 +67,7 @@ public class ClusterManagementServiceImplTest { public void init() { connectConfig = new ConnectConfig(); connectConfig.setHttpPort(8081); - connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); + connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore"); connectConfig.setWorkerId("testWorkerId"); doReturn(producer).when(messagingAccessPoint).createProducer(); diff --git a/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java b/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java index 6c035b3..82e6117 100644 --- a/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java @@ -88,14 +88,14 @@ public class ConfigManagementServiceImplTest { public void init() throws Exception { connectConfig = new ConnectConfig(); connectConfig.setHttpPort(8081); - connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); + connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore"); connectConfig.setWorkerId("testWorkerId"); connectorName = "testConnectorName"; connectKeyValue = new ConnectKeyValue(); connectKeyValue.put(RuntimeConfigDefine.CONNECTOR_CLASS, "io.openmessaging.connect.runtime.service.TestConnector"); - connectKeyValue.put(RuntimeConfigDefine.OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default"); + connectKeyValue.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default"); connectKeyValue.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, "source-record-converter"); diff --git a/src/test/java/io/openmessaging/connect/runtime/service/PositionManagementServiceImplTest.java b/src/test/java/io/openmessaging/connect/runtime/service/PositionManagementServiceImplTest.java index eb6b737..b437d76 100644 --- a/src/test/java/io/openmessaging/connect/runtime/service/PositionManagementServiceImplTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/service/PositionManagementServiceImplTest.java @@ -84,7 +84,7 @@ public class PositionManagementServiceImplTest { public void init() throws Exception { connectConfig = new ConnectConfig(); connectConfig.setHttpPort(8081); - connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); + connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore"); connectConfig.setWorkerId("testWorkerId"); doReturn(producer).when(messagingAccessPoint).createProducer(); diff --git a/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java b/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java index 1839a4e..e360947 100644 --- a/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java @@ -70,7 +70,7 @@ public void testToJsonStringToConnAndTaskConfigs() { String connectName = "testConnector"; ConnectKeyValue connectKeyValue = new ConnectKeyValue(); connectKeyValue.put(RuntimeConfigDefine.CONNECTOR_CLASS, "io.openmessaging.connect.runtime.service.TestConnector"); - connectKeyValue.put(RuntimeConfigDefine.OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default"); + connectKeyValue.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default"); connectKeyValue.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, "source-record-converter"); List connectKeyValues = new ArrayList(8) { { @@ -108,7 +108,7 @@ public void testToJsonStringToConnAndTaskConfigs() { assertNotNull(connectKeyValue1); assertEquals("io.openmessaging.connect.runtime.service.TestConnector", connectKeyValue1.getString(RuntimeConfigDefine.CONNECTOR_CLASS)); - assertEquals("oms:rocketmq://localhost:9876/default:default", connectKeyValue1.getString(RuntimeConfigDefine.OMS_DRIVER_URL)); + assertEquals("oms:rocketmq://localhost:9876/default:default", connectKeyValue1.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL)); assertEquals("source-record-converter", connectKeyValue1.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER)); Map> taskConfigs1 = connAndTaskConfigs1.getTaskConfigs(); @@ -119,7 +119,7 @@ public void testToJsonStringToConnAndTaskConfigs() { ConnectKeyValue connectKeyValue2 = connectKeyValues1.get(0); assertNotNull(connectKeyValue2); assertEquals("io.openmessaging.connect.runtime.service.TestConnector", connectKeyValue2.getString(RuntimeConfigDefine.CONNECTOR_CLASS)); - assertEquals("oms:rocketmq://localhost:9876/default:default", connectKeyValue2.getString(RuntimeConfigDefine.OMS_DRIVER_URL)); + assertEquals("oms:rocketmq://localhost:9876/default:default", connectKeyValue2.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL)); assertEquals("source-record-converter", connectKeyValue2.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER)); } From 537b9a8bd5e7c3f3c6809bb449ce140f038d497d Mon Sep 17 00:00:00 2001 From: huzongtang Date: Fri, 12 Apr 2019 15:12:21 +0800 Subject: [PATCH 2/4] [issue#3]Rename some class file for runtime. --- run_worker.sh | 2 +- ...Controller.java => RuntimeController.java} | 8 ++--- ...onnectStartup.java => RuntimeStartup.java} | 8 ++--- .../converter/ConnAndTaskConfigConverter.java | 10 +++---- .../connect/runtime/rest/RestHandler.java | 30 +++++++++---------- .../{TransferUtils.java => TransferUtil.java} | 2 +- .../connect/runtime/rest/RestHandlerTest.java | 14 ++++----- ...erUtilsTest.java => TransferUtilTest.java} | 18 +++++------ 8 files changed, 46 insertions(+), 46 deletions(-) rename src/main/java/io/openmessaging/connect/runtime/{ConnectController.java => RuntimeController.java} (96%) rename src/main/java/io/openmessaging/connect/runtime/{ConnectStartup.java => RuntimeStartup.java} (95%) rename src/main/java/io/openmessaging/connect/runtime/utils/{TransferUtils.java => TransferUtil.java} (99%) rename src/test/java/io/openmessaging/connect/runtime/utils/{TransferUtilsTest.java => TransferUtilTest.java} (88%) diff --git a/run_worker.sh b/run_worker.sh index 466fc3c..e977640 100755 --- a/run_worker.sh +++ b/run_worker.sh @@ -1,5 +1,5 @@ #!/bin/bash export OMS_RMQ_DIRECT_NAME_SRV=true echo "run rumtime worker" -cd target/distribution/ && java -cp .:./conf/:./lib/* io.openmessaging.connect.runtime.ConnectStartup -c conf/runtime.conf +cd target/distribution/ && java -cp .:./conf/:./lib/* io.openmessaging.connect.runtime.RuntimeStartup -c conf/runtime.conf diff --git a/src/main/java/io/openmessaging/connect/runtime/ConnectController.java b/src/main/java/io/openmessaging/connect/runtime/RuntimeController.java similarity index 96% rename from src/main/java/io/openmessaging/connect/runtime/ConnectController.java rename to src/main/java/io/openmessaging/connect/runtime/RuntimeController.java index 8bfc740..104070b 100644 --- a/src/main/java/io/openmessaging/connect/runtime/ConnectController.java +++ b/src/main/java/io/openmessaging/connect/runtime/RuntimeController.java @@ -33,7 +33,7 @@ /** * Connect controller to access and control all resource in runtime. */ -public class ConnectController { +public class RuntimeController { private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME); @@ -87,7 +87,7 @@ public class ConnectController { */ private ScheduledExecutorService scheduledExecutorService; - public ConnectController(ConnectConfig connectConfig) { + public RuntimeController(ConnectConfig connectConfig) { this.connectConfig = connectConfig; this.messagingAccessWrapper = new MessagingAccessWrapper(); @@ -118,7 +118,7 @@ public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { - ConnectController.this.configManagementService.persist(); + RuntimeController.this.configManagementService.persist(); } catch (Exception e) { log.error("schedule persist config error.", e); } @@ -128,7 +128,7 @@ public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { - ConnectController.this.positionManagementService.persist(); + RuntimeController.this.positionManagementService.persist(); } catch (Exception e) { log.error("schedule persist position error.", e); } diff --git a/src/main/java/io/openmessaging/connect/runtime/ConnectStartup.java b/src/main/java/io/openmessaging/connect/runtime/RuntimeStartup.java similarity index 95% rename from src/main/java/io/openmessaging/connect/runtime/ConnectStartup.java rename to src/main/java/io/openmessaging/connect/runtime/RuntimeStartup.java index a1390f9..7b2bbd4 100644 --- a/src/main/java/io/openmessaging/connect/runtime/ConnectStartup.java +++ b/src/main/java/io/openmessaging/connect/runtime/RuntimeStartup.java @@ -36,7 +36,7 @@ /** * Startup class of the runtime worker. */ -public class ConnectStartup { +public class RuntimeStartup { private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME); @@ -51,7 +51,7 @@ public static void main(String[] args) { start(createConnectController(args)); } - private static void start(ConnectController controller) { + private static void start(RuntimeController controller) { try { controller.start(); @@ -70,7 +70,7 @@ private static void start(ConnectController controller) { * @param args * @return */ - private static ConnectController createConnectController(String[] args) { + private static RuntimeController createConnectController(String[] args) { try { @@ -99,7 +99,7 @@ private static ConnectController createConnectController(String[] args) { } // Create controller and initialize. - ConnectController controller = new ConnectController(connectConfig); + RuntimeController controller = new RuntimeController(connectConfig); controller.initialize(); // Invoked when shutdown. diff --git a/src/main/java/io/openmessaging/connect/runtime/converter/ConnAndTaskConfigConverter.java b/src/main/java/io/openmessaging/connect/runtime/converter/ConnAndTaskConfigConverter.java index ff5c187..c2376fc 100644 --- a/src/main/java/io/openmessaging/connect/runtime/converter/ConnAndTaskConfigConverter.java +++ b/src/main/java/io/openmessaging/connect/runtime/converter/ConnAndTaskConfigConverter.java @@ -19,7 +19,7 @@ import io.openmessaging.connect.runtime.common.ConnAndTaskConfigs; import io.openmessaging.connect.runtime.common.LoggerName; -import io.openmessaging.connect.runtime.utils.TransferUtils; +import io.openmessaging.connect.runtime.utils.TransferUtil; import io.openmessaging.connector.api.data.Converter; import java.io.UnsupportedEncodingException; import java.util.HashMap; @@ -41,12 +41,12 @@ public byte[] objectToByte(ConnAndTaskConfigs object) { Map connectorMap = new HashMap<>(); Map taskMap = new HashMap<>(); for(String key : configs.getConnectorConfigs().keySet()){ - connectorMap.put(key, TransferUtils.keyValueToString(configs.getConnectorConfigs().get(key))); + connectorMap.put(key, TransferUtil.keyValueToString(configs.getConnectorConfigs().get(key))); } for(String key : configs.getTaskConfigs().keySet()){ - taskMap.put(key, TransferUtils.keyValueListToString(configs.getTaskConfigs().get(key))); + taskMap.put(key, TransferUtil.keyValueListToString(configs.getTaskConfigs().get(key))); } - return TransferUtils.toJsonString(connectorMap, taskMap).getBytes("UTF-8"); + return TransferUtil.toJsonString(connectorMap, taskMap).getBytes("UTF-8"); } catch (Exception e) { log.error("ConnAndTaskConfigConverter#objectToByte failed", e); } @@ -58,7 +58,7 @@ public ConnAndTaskConfigs byteToObject(byte[] bytes) { try { String jsonString = new String(bytes, "UTF-8"); - ConnAndTaskConfigs configs = TransferUtils.toConnAndTaskConfigs(jsonString); + ConnAndTaskConfigs configs = TransferUtil.toConnAndTaskConfigs(jsonString); return configs; } catch (UnsupportedEncodingException e) { log.error("ConnAndTaskConfigConverter#byteToObject failed", e); diff --git a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java index 199d454..d98d7ec 100644 --- a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java +++ b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java @@ -20,7 +20,7 @@ import com.alibaba.fastjson.JSON; import io.javalin.Context; import io.javalin.Javalin; -import io.openmessaging.connect.runtime.ConnectController; +import io.openmessaging.connect.runtime.RuntimeController; import io.openmessaging.connect.runtime.common.ConnectKeyValue; import io.openmessaging.connect.runtime.common.LoggerName; import io.openmessaging.connect.runtime.connectorwrapper.WorkerConnector; @@ -38,11 +38,11 @@ public class RestHandler { private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME); - private final ConnectController connectController; + private final RuntimeController runtimeController; - public RestHandler(ConnectController connectController){ - this.connectController = connectController; - Javalin app = Javalin.start(connectController.getConnectConfig().getHttpPort()); + public RestHandler(RuntimeController runtimeController){ + this.runtimeController = runtimeController; + Javalin app = Javalin.start(runtimeController.getConnectConfig().getHttpPort()); app.get("/connectors/:connectorName", this::handleCreateConnector); app.get("/connectors/:connectorName/config", this::handleQueryConnectorConfig); app.get("/connectors/:connectorName/status", this::handleQueryConnectorStatus); @@ -54,8 +54,8 @@ public RestHandler(ConnectController connectController){ private void getAllocatedInfo(Context context){ - Set workerConnectors = connectController.getWorker().getWorkingConnectors(); - Set workerSourceTasks = connectController.getWorker().getWorkingTasks(); + Set workerConnectors = runtimeController.getWorker().getWorkingConnectors(); + Set workerSourceTasks = runtimeController.getWorker().getWorkingTasks(); StringBuilder sb = new StringBuilder(); sb.append("working connectors:\n"); for(WorkerConnector workerConnector : workerConnectors){ @@ -70,13 +70,13 @@ private void getAllocatedInfo(Context context){ private void getConfigInfo(Context context) { - Map connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs(); - Map> taskConfigs = connectController.getConfigManagementService().getTaskConfigs(); + Map connectorConfigs = runtimeController.getConfigManagementService().getConnectorConfigs(); + Map> taskConfigs = runtimeController.getConfigManagementService().getTaskConfigs(); context.result("ConnectorConfigs:"+JSON.toJSONString(connectorConfigs)+"\nTaskConfigs:"+JSON.toJSONString(taskConfigs)); } private void getClusterInfo(Context context) { - context.result(JSON.toJSONString(connectController.getClusterManagementService().getAllAliveWorkers())); + context.result(JSON.toJSONString(runtimeController.getClusterManagementService().getAllAliveWorkers())); } private void handleCreateConnector(Context context) { @@ -89,7 +89,7 @@ private void handleCreateConnector(Context context) { } try { - String result = connectController.getConfigManagementService().putConnectorConfig(connectorName, configs); + String result = runtimeController.getConfigManagementService().putConnectorConfig(connectorName, configs); if(result != null && result.length() > 0){ context.result(result); }else{ @@ -104,8 +104,8 @@ private void handleQueryConnectorConfig(Context context){ String connectorName = context.param("connectorName"); - Map connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs(); - Map> taskConfigs = connectController.getConfigManagementService().getTaskConfigs(); + Map connectorConfigs = runtimeController.getConfigManagementService().getConnectorConfigs(); + Map> taskConfigs = runtimeController.getConfigManagementService().getTaskConfigs(); StringBuilder sb = new StringBuilder(); sb.append("ConnectorConfigs:") .append(JSON.toJSONString(connectorConfigs.get(connectorName))) @@ -118,7 +118,7 @@ private void handleQueryConnectorConfig(Context context){ private void handleQueryConnectorStatus(Context context){ String connectorName = context.param("connectorName"); - Map connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs(); + Map connectorConfigs = runtimeController.getConfigManagementService().getConnectorConfigs(); if(connectorConfigs.containsKey(connectorName)){ context.result("running"); @@ -131,7 +131,7 @@ private void handleStopConnector(Context context){ String connectorName = context.param("connectorName"); try { - connectController.getConfigManagementService().removeConnectorConfig(connectorName); + runtimeController.getConfigManagementService().removeConnectorConfig(connectorName); context.result("success"); } catch (Exception e) { context.result("failed"); diff --git a/src/main/java/io/openmessaging/connect/runtime/utils/TransferUtils.java b/src/main/java/io/openmessaging/connect/runtime/utils/TransferUtil.java similarity index 99% rename from src/main/java/io/openmessaging/connect/runtime/utils/TransferUtils.java rename to src/main/java/io/openmessaging/connect/runtime/utils/TransferUtil.java index 4b159e2..6914f7d 100644 --- a/src/main/java/io/openmessaging/connect/runtime/utils/TransferUtils.java +++ b/src/main/java/io/openmessaging/connect/runtime/utils/TransferUtil.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; -public class TransferUtils { +public class TransferUtil { public static String keyValueToString(ConnectKeyValue keyValue) { diff --git a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java index 8ec0c8c..4470a6a 100644 --- a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java @@ -19,7 +19,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import io.openmessaging.connect.runtime.ConnectController; +import io.openmessaging.connect.runtime.RuntimeController; import io.openmessaging.connect.runtime.common.ConnectKeyValue; import io.openmessaging.connect.runtime.config.ConnectConfig; import io.openmessaging.connect.runtime.config.RuntimeConfigDefine; @@ -63,7 +63,7 @@ public class RestHandlerTest { @Mock - private ConnectController connectController; + private RuntimeController runtimeController; @Mock private ConfigManagementService configManagementService; @@ -129,9 +129,9 @@ public class RestHandlerTest { @Before public void init() throws Exception { - when(connectController.getConnectConfig()).thenReturn(connectConfig); + when(runtimeController.getConnectConfig()).thenReturn(connectConfig); when(connectConfig.getHttpPort()).thenReturn(8081); - when(connectController.getConfigManagementService()).thenReturn(configManagementService); + when(runtimeController.getConfigManagementService()).thenReturn(configManagementService); when(configManagementService.putConnectorConfig(anyString(), any(ConnectKeyValue.class))).thenReturn(""); String connectName = "testConnector"; @@ -170,7 +170,7 @@ public void init() throws Exception { } }; - when(connectController.getClusterManagementService()).thenReturn(clusterManagementService); + when(runtimeController.getClusterManagementService()).thenReturn(clusterManagementService); when(clusterManagementService.getAllAliveWorkers()).thenReturn(aliveWorker); sourcePartition = "127.0.0.13306".getBytes("UTF-8"); @@ -200,11 +200,11 @@ public void init() throws Exception { add(workerSourceTask2); } }; - when(connectController.getWorker()).thenReturn(worker); + when(runtimeController.getWorker()).thenReturn(worker); when(worker.getWorkingConnectors()).thenReturn(workerConnectors); when(worker.getWorkingTasks()).thenReturn(workerSourceTasks); - restHandler = new RestHandler(connectController); + restHandler = new RestHandler(runtimeController); httpClient = HttpClientBuilder.create().build(); } diff --git a/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java b/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilTest.java similarity index 88% rename from src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java rename to src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilTest.java index e360947..804eb91 100644 --- a/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilTest.java @@ -26,7 +26,7 @@ import static org.junit.Assert.*; -public class TransferUtilsTest { +public class TransferUtilTest { @Test public void testKeyValue2StringKeyValue() { @@ -35,8 +35,8 @@ public void testKeyValue2StringKeyValue() { connectKeyValue.put("key2", 2L); connectKeyValue.put("key3", 3.0); connectKeyValue.put("key4", "4"); - String s = TransferUtils.keyValueToString(connectKeyValue); - ConnectKeyValue connectKeyValue1 = TransferUtils.stringToKeyValue(s); + String s = TransferUtil.keyValueToString(connectKeyValue); + ConnectKeyValue connectKeyValue1 = TransferUtil.stringToKeyValue(s); assertEquals(1, connectKeyValue1.getInt("key1")); assertEquals(2L, connectKeyValue1.getLong("key2")); assertTrue(Objects.equals(3.0, connectKeyValue1.getDouble("key3"))); @@ -55,8 +55,8 @@ public void testKeyValueList2String2KeyValueList() { add(connectKeyValue); } }; - String s = TransferUtils.keyValueListToString(connectKeyValues); - List connectKeyValues1 = TransferUtils.stringToKeyValueList(s); + String s = TransferUtil.keyValueListToString(connectKeyValues); + List connectKeyValues1 = TransferUtil.stringToKeyValueList(s); assertNotNull(connectKeyValues1); ConnectKeyValue connectKeyValue1 = connectKeyValues1.get(0); assertEquals(1, connectKeyValue1.getInt("key1")); @@ -93,13 +93,13 @@ public void testToJsonStringToConnAndTaskConfigs() { Map connectorMap = new HashMap<>(); Map taskMap = new HashMap<>(); for (String key : connAndTaskConfigs.getConnectorConfigs().keySet()) { - connectorMap.put(key, TransferUtils.keyValueToString(connAndTaskConfigs.getConnectorConfigs().get(key))); + connectorMap.put(key, TransferUtil.keyValueToString(connAndTaskConfigs.getConnectorConfigs().get(key))); } for (String key : connAndTaskConfigs.getTaskConfigs().keySet()) { - taskMap.put(key, TransferUtils.keyValueListToString(connAndTaskConfigs.getTaskConfigs().get(key))); + taskMap.put(key, TransferUtil.keyValueListToString(connAndTaskConfigs.getTaskConfigs().get(key))); } - String s = TransferUtils.toJsonString(connectorMap, taskMap); - ConnAndTaskConfigs connAndTaskConfigs1 = TransferUtils.toConnAndTaskConfigs(s); + String s = TransferUtil.toJsonString(connectorMap, taskMap); + ConnAndTaskConfigs connAndTaskConfigs1 = TransferUtil.toConnAndTaskConfigs(s); Map connectorConfigs1 = connAndTaskConfigs1.getConnectorConfigs(); assertNotNull(connAndTaskConfigs1); From bf5f7330192c4cae6e4193b11c5a49317de50f18 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Sat, 13 Apr 2019 10:38:54 +0800 Subject: [PATCH 3/4] [issue#3]return sink and source omsDriverUrl for runtime. --- .../connect/runtime/config/ConnectConfig.java | 26 ------------------- .../runtime/config/RuntimeConfigDefine.java | 14 +--------- src/main/resources/runtime.conf | 5 +--- .../runtime/config/ConnectConfigTest.java | 4 --- .../runtime/connectorwrapper/WorkerTest.java | 2 -- 5 files changed, 2 insertions(+), 49 deletions(-) diff --git a/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java b/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java index 15f26fb..296f03a 100644 --- a/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java +++ b/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java @@ -34,22 +34,12 @@ public class ConnectConfig { */ private String storePathRootDir = System.getProperty("user.home") + File.separator + "connectorStore"; - /** - * OMS driver url for source task, which determine the specific source task to pull message data from where. - */ - private String sourceOmsDriverUrl = "oms:rocketmq://localhost:9876/default:default"; - /** * OMS driver url, which determine the specific MQ to send and consume message. * The MQ is used for internal management of the connect runtime. */ private String runtimeOmsDriverUrl = "oms:rocketmq://localhost:9876/default:default"; - /** - * OMS driver url for sink task, which determine the specific source task to send message data to where. - */ - private String sinkOmsDriverUrl = "oms:rocketmq://localhost:9876/default:default"; - /** * Http port for REST API. */ @@ -105,14 +95,6 @@ public void setConfigPersistInterval(int configPersistInterval) { this.configPersistInterval = configPersistInterval; } - public String getSourceOmsDriverUrl() { - return sourceOmsDriverUrl; - } - - public void setSourceOmsDriverUrl(String sourceOmsDriverUrl) { - this.sourceOmsDriverUrl = sourceOmsDriverUrl; - } - public String getRuntimeOmsDriverUrl() { return runtimeOmsDriverUrl; } @@ -120,12 +102,4 @@ public String getRuntimeOmsDriverUrl() { public void setRuntimeOmsDriverUrl(String runtimeOmsDriverUrl) { this.runtimeOmsDriverUrl = runtimeOmsDriverUrl; } - - public String getSinkOmsDriverUrl() { - return sinkOmsDriverUrl; - } - - public void setSinkOmsDriverUrl(String sinkOmsDriverUrl) { - this.sinkOmsDriverUrl = sinkOmsDriverUrl; - } } diff --git a/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java b/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java index 2079d1c..077e4e6 100644 --- a/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java +++ b/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java @@ -32,17 +32,7 @@ public class RuntimeConfigDefine { public static final String CONNECTOR_CLASS = "connector-class"; public static final String TASK_CLASS = "task-class"; - - /** - * OMS driver url for the source connector - */ - public static final String SOURCE_OMS_DRIVER_URL = "source-oms-driver-url"; - - /** - * OMS driver url for the sink connector - */ - public static final String SINK_OMS_DRIVER_URL = "sink-oms-driver-url"; - + /** * OMS driver url for the runtime connector */ @@ -70,8 +60,6 @@ public class RuntimeConfigDefine { { add(CONNECTOR_CLASS); add(SOURCE_RECORD_CONVERTER); - add(SOURCE_OMS_DRIVER_URL); - add(SINK_OMS_DRIVER_URL); add(RUNTIME_OMS_DRIVER_URL); } }; diff --git a/src/main/resources/runtime.conf b/src/main/resources/runtime.conf index 67603c1..8fda7aa 100644 --- a/src/main/resources/runtime.conf +++ b/src/main/resources/runtime.conf @@ -22,8 +22,5 @@ httpPort=8081 ## local file dir for config store storePathRootDir=./storeRoot/ -## From logical degree,there are threes MQ cluster -## the runtime server omsDriverUrl to support runtime data synchronize -sourceOmsDriverUrl=oms:rocketmq://localhost:9876/default:default +## Choose a MQ to support runtime data synchronize runtimeOmsDriverUrl=oms:rocketmq://localhost:9876/default:default -sinkOmsDriverUrl=oms:rocketmq://localhost:9876/default:default diff --git a/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java b/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java index 68e1b0e..eb5c6a0 100644 --- a/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java @@ -28,13 +28,9 @@ public void testConnectConfigAttribute() { ConnectConfig connectConfig = new ConnectConfig(); connectConfig.setHttpPort(8081); connectConfig.setWorkerId("DEFAULT_WORKER_1"); - connectConfig.setSourceOmsDriverUrl("oms:rocketmq://localhost:9876/default1:default1"); - connectConfig.setSinkOmsDriverUrl("oms:rocketmq://localhost:9876/default2:default2"); connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default3:default3"); assertThat(connectConfig.getHttpPort()).isEqualTo(8081); assertThat(connectConfig.getWorkerId()).isEqualTo("DEFAULT_WORKER_1"); - assertThat(connectConfig.getSourceOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default1:default1"); - assertThat(connectConfig.getSinkOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default2:default2"); assertThat(connectConfig.getRuntimeOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default3:default3"); } diff --git a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java index 9d78039..ecc47b5 100644 --- a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java @@ -65,8 +65,6 @@ public class WorkerTest { public void init() { connectConfig = new ConnectConfig(); connectConfig.setHttpPort(8081); - connectConfig.setSinkOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); - connectConfig.setSourceOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default"); connectConfig.setWorkerId("DEFAULT_WORKER_1"); connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore"); From d3755db379ebc501bde9a776ca5ef30b15686b04 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Sat, 13 Apr 2019 16:10:10 +0800 Subject: [PATCH 4/4] fix post and delete restful api in the RestHandler Class. --- .../connect/runtime/rest/RestHandler.java | 6 +- .../connect/runtime/rest/RestHandlerTest.java | 66 ++++++++++--------- 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java index d98d7ec..5c9fd17 100644 --- a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java +++ b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java @@ -43,10 +43,10 @@ public class RestHandler { public RestHandler(RuntimeController runtimeController){ this.runtimeController = runtimeController; Javalin app = Javalin.start(runtimeController.getConnectConfig().getHttpPort()); - app.get("/connectors/:connectorName", this::handleCreateConnector); + app.post("/connectors/:connectorName", this::handleCreateConnector); app.get("/connectors/:connectorName/config", this::handleQueryConnectorConfig); app.get("/connectors/:connectorName/status", this::handleQueryConnectorStatus); - app.get("/connectors/:connectorName/stop", this::handleStopConnector); + app.delete("/connectors/:connectorName/stop", this::handleStopConnector); app.get("/getClusterInfo", this::getClusterInfo); app.get("/getConfigInfo", this::getConfigInfo); app.get("/getAllocatedInfo", this::getAllocatedInfo); @@ -96,6 +96,7 @@ private void handleCreateConnector(Context context) { context.result("success"); } } catch (Exception e) { + log.error("oms connect runtime create the connector exception, ", e); context.result("failed"); } } @@ -134,6 +135,7 @@ private void handleStopConnector(Context context){ runtimeController.getConfigManagementService().removeConnectorConfig(connectorName); context.result("success"); } catch (Exception e) { + log.error("oms connect runtime stop the connector exception, ", e); context.result("failed"); } } diff --git a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java index 4470a6a..61c6d4b 100644 --- a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java @@ -44,7 +44,9 @@ import java.util.Set; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; @@ -211,40 +213,40 @@ public void init() throws Exception { @Test public void testRESTful() throws Exception { - URIBuilder uriBuilder = new URIBuilder(String.format(CREATE_CONNECTOR_URL, "testConnectorName")); - uriBuilder.setParameter("config", "{\"connector-class\": \"org.apache.rocketmq.mysql.connector.MysqlConnector\",\"mysqlAddr\": \"112.74.179.68\",\"mysqlPort\": \"3306\",\"mysqlUsername\": \"canal\",\"mysqlPassword\": \"canal\",\"source-record-converter\":\"io.openmessaging.connect.runtime.converter.JsonConverter\",\"oms-driver-url\":\"oms:rocketmq://localhost:9876/default:default\"}"); - URI uri = uriBuilder.build(); - HttpGet httpGet = new HttpGet(uri); - HttpResponse httpResponse = httpClient.execute(httpGet); - assertEquals(200, httpResponse.getStatusLine().getStatusCode()); - assertEquals("success", EntityUtils.toString(httpResponse.getEntity(), "UTF-8")); - - URIBuilder uriBuilder1 = new URIBuilder(String.format(STOP_CONNECTOR_URL, "testConnectorName")); - URI uri1 = uriBuilder1.build(); - HttpGet httpGet1 = new HttpGet(uri1); - HttpResponse httpResponse1 = httpClient.execute(httpGet1); - assertEquals(200, httpResponse1.getStatusLine().getStatusCode()); - assertEquals("success", EntityUtils.toString(httpResponse1.getEntity(), "UTF-8")); - - URIBuilder uriBuilder2 = new URIBuilder(GET_CLUSTER_INFO_URL); - URI uri2 = uriBuilder2.build(); - HttpGet httpGet2 = new HttpGet(uri2); - HttpResponse httpResponse2 = httpClient.execute(httpGet2); - assertEquals(200, httpResponse2.getStatusLine().getStatusCode()); - assertEquals(JSON.toJSONString(aliveWorker), EntityUtils.toString(httpResponse2.getEntity(), "UTF-8")); - - URIBuilder uriBuilder3 = new URIBuilder(GET_CONFIG_INFO_URL); - URI uri3 = uriBuilder3.build(); - HttpGet httpGet3 = new HttpGet(uri3); - HttpResponse httpResponse3 = httpClient.execute(httpGet3); - assertEquals(200, httpResponse3.getStatusLine().getStatusCode()); + URIBuilder uriCreateBuilder = new URIBuilder(String.format(CREATE_CONNECTOR_URL, "testConnectorName")); + uriCreateBuilder.setParameter("config", "{\"connector-class\": \"org.apache.rocketmq.mysql.connector.MysqlConnector\",\"mysqlAddr\": \"112.74.179.68\",\"mysqlPort\": \"3306\",\"mysqlUsername\": \"canal\",\"mysqlPassword\": \"canal\",\"source-record-converter\":\"io.openmessaging.connect.runtime.converter.JsonConverter\",\"oms-driver-url\":\"oms:rocketmq://localhost:9876/default:default\"}"); + URI uriPost = uriCreateBuilder.build(); + HttpPost httpPost = new HttpPost(uriPost); + HttpResponse httpPostResponse = httpClient.execute(httpPost); + assertEquals(200, httpPostResponse.getStatusLine().getStatusCode()); + assertEquals("success", EntityUtils.toString(httpPostResponse.getEntity(), "UTF-8")); + + URIBuilder uriDeleteBuilder = new URIBuilder(String.format(STOP_CONNECTOR_URL, "testConnectorName")); + URI uriDelete = uriDeleteBuilder.build(); + HttpDelete httpDelete = new HttpDelete(uriDelete); + HttpResponse httpDeleteResponse = httpClient.execute(httpDelete); + assertEquals(200, httpDeleteResponse.getStatusLine().getStatusCode()); + assertEquals("success", EntityUtils.toString(httpDeleteResponse.getEntity(), "UTF-8")); + + URIBuilder uriClusterInfoBuilder = new URIBuilder(GET_CLUSTER_INFO_URL); + URI uriClusterInfo = uriClusterInfoBuilder.build(); + HttpGet httpClusterInfo = new HttpGet(uriClusterInfo); + HttpResponse httpClusterInfoResponse = httpClient.execute(httpClusterInfo); + assertEquals(200, httpClusterInfoResponse.getStatusLine().getStatusCode()); + assertEquals(JSON.toJSONString(aliveWorker), EntityUtils.toString(httpClusterInfoResponse.getEntity(), "UTF-8")); + + URIBuilder uriConfigInfoBuilder = new URIBuilder(GET_CONFIG_INFO_URL); + URI uriConfigInfo = uriConfigInfoBuilder.build(); + HttpGet httpConfigInfo = new HttpGet(uriConfigInfo); + HttpResponse httpConfigInfoResponse = httpClient.execute(httpConfigInfo); + assertEquals(200, httpConfigInfoResponse.getStatusLine().getStatusCode()); String expectedResultConfig = "ConnectorConfigs:" + JSON.toJSONString(connectorConfigs) + "\nTaskConfigs:" + JSON.toJSONString(taskConfigs); - assertEquals(expectedResultConfig, EntityUtils.toString(httpResponse3.getEntity(), "UTF-8")); + assertEquals(expectedResultConfig, EntityUtils.toString(httpConfigInfoResponse.getEntity(), "UTF-8")); - URIBuilder uriBuilder4 = new URIBuilder(GET_ALLOCATED_INFO_URL); - URI uri4 = uriBuilder4.build(); - HttpGet httpGet4 = new HttpGet(uri4); - HttpResponse httpResponse4 = httpClient.execute(httpGet4); + URIBuilder uriAllocatedInfoBuilder = new URIBuilder(GET_ALLOCATED_INFO_URL); + URI uriAllocatedInfo = uriAllocatedInfoBuilder.build(); + HttpGet httpAllocatedInfo = new HttpGet(uriAllocatedInfo); + HttpResponse httpResponse4 = httpClient.execute(httpAllocatedInfo); assertEquals(200, httpResponse4.getStatusLine().getStatusCode()); StringBuilder sb = new StringBuilder(); sb.append("working connectors:\n");