Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Commit 66ccaf1

Browse files
author
gavingaozhangmin
committed
emit Pulsar source and sink configuration property values to logs #335
1 parent 70d7ab1 commit 66ccaf1

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040

4141
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
4242

43+
import com.fasterxml.jackson.databind.ObjectMapper;
44+
import com.fasterxml.jackson.databind.ObjectWriter;
4345
import lombok.extern.slf4j.Slf4j;
4446
import org.apache.pulsar.client.admin.PulsarAdmin;
4547
import org.apache.pulsar.client.api.MessageId;
@@ -271,6 +273,14 @@ public void open(Configuration parameters) throws Exception {
271273
topic2Producer = new HashMap<>();
272274
}
273275
//super.open(parameters);
276+
277+
try {
278+
ObjectMapper m = new ObjectMapper();
279+
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
280+
log.info("Pulsar sink config: {}", w.writeValueAsString(properties));
281+
} catch (IOException e) {
282+
log.error("Failed to dump sink config info", e);
283+
}
274284
}
275285

276286
protected void initializeSendCallback() {

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,16 @@
6262

6363
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
6464

65+
import com.fasterxml.jackson.databind.ObjectMapper;
66+
import com.fasterxml.jackson.databind.ObjectWriter;
6567
import lombok.extern.slf4j.Slf4j;
6668
import org.apache.pulsar.client.api.MessageId;
6769
import org.apache.pulsar.client.api.PulsarClientException;
6870
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
6971
import org.apache.pulsar.shade.com.google.common.collect.Maps;
7072
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
7173

74+
import java.io.IOException;
7275
import java.util.ArrayList;
7376
import java.util.HashMap;
7477
import java.util.Iterator;
@@ -496,6 +499,14 @@ public void open(Configuration parameters) throws Exception {
496499
taskIndex, ownedTopicStarts.size(), ownedTopicStarts);
497500
}
498501
}
502+
503+
try {
504+
ObjectMapper m = new ObjectMapper();
505+
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
506+
log.info("Pulsar source config: {}", w.writeValueAsString(properties));
507+
} catch (IOException e) {
508+
log.error("Failed to dump source config info", e);
509+
}
499510
}
500511

501512
protected String getSubscriptionName() {

0 commit comments

Comments
 (0)