diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java index a90ffa77..24a03ef3 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisTableSinkFactory.java @@ -59,6 +59,7 @@ public List supportedProperties() { properties.add(REDIS_CLUSTER_PASSWORD); properties.add(REDIS_SENTINEL); properties.add(REDIS_KEY_TTL); + properties.add(REDIS_KEY_ADDITIONAL); // schema properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE); properties.add(SCHEMA + ".#." + SCHEMA_NAME); diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java index e398f619..b79ef26b 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/hanlder/RedisMapperHandler.java @@ -24,6 +24,7 @@ import java.lang.reflect.Constructor; import java.util.Map; +import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_ADDITIONAL; import static org.apache.flink.streaming.connectors.redis.descriptor.RedisValidator.REDIS_KEY_TTL; /** @@ -40,14 +41,21 @@ public interface RedisMapperHandler extends RedisHandler { */ default RedisMapper createRedisMapper(Map properties) { String ttl = properties.get(REDIS_KEY_TTL); + String additionalKey = properties.get(REDIS_KEY_ADDITIONAL); try { Class redisMapper = Class.forName(this.getClass().getCanonicalName()); - if (ttl == null) { - return (RedisMapper) redisMapper.newInstance(); + if (ttl != null) { + Constructor c = redisMapper.getConstructor(Integer.class); + return (RedisMapper) c.newInstance(Integer.parseInt(ttl)); } - Constructor c = redisMapper.getConstructor(Integer.class); - return (RedisMapper) c.newInstance(Integer.parseInt(ttl)); + + if (additionalKey != null) { + Constructor c = redisMapper.getConstructor(String.class); + return (RedisMapper) c.newInstance(additionalKey); + } + + return (RedisMapper) redisMapper.newInstance(); } catch (Exception e) { LOGGER.error("create redis mapper failed", e); throw new RuntimeException(e); diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java index bdaf9be4..6418321d 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RowRedisMapper.java @@ -41,6 +41,8 @@ public abstract class RowRedisMapper implements RedisMapper private Integer ttl; + private String additionalKey; + private RedisCommand redisCommand; public Integer getTtl() { @@ -67,6 +69,11 @@ public RowRedisMapper(int ttl, RedisCommand redisCommand) { this.redisCommand = redisCommand; } + public RowRedisMapper(String additionalKey, RedisCommand redisCommand) { + this.additionalKey = additionalKey; + this.redisCommand = redisCommand; + } + public RowRedisMapper(RedisCommand redisCommand) { this.redisCommand = redisCommand; } @@ -76,6 +83,9 @@ public RedisCommandDescription getCommandDescription() { if (ttl != null) { return new RedisCommandDescription(redisCommand, ttl); } + if (additionalKey != null) { + return new RedisCommandDescription(redisCommand, additionalKey); + } return new RedisCommandDescription(redisCommand); } diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java index 3507b65f..f1f0c704 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZAddMapper.java @@ -28,4 +28,8 @@ public class ZAddMapper extends RowRedisMapper { public ZAddMapper() { super(RedisCommand.ZADD); } + + public ZAddMapper(String additionalKey) { + super(additionalKey, RedisCommand.ZADD); + } } diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZIncrByMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZIncrByMapper.java index a9ea21e6..73fc5d65 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZIncrByMapper.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/ZIncrByMapper.java @@ -28,4 +28,7 @@ public class ZIncrByMapper extends RowRedisMapper { public ZIncrByMapper() { super(RedisCommand.ZINCRBY); } + public ZIncrByMapper(String additionalKey) { + super(additionalKey, RedisCommand.ZINCRBY); + } } diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java index 2b4aae20..9db8a152 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/descriptor/RedisValidator.java @@ -32,5 +32,6 @@ public class RedisValidator { public static final String SENTINELS_PASSWORD = "sentinels.password"; public static final String REDIS_CLUSTER_PASSWORD = "cluster.password"; public static final String REDIS_KEY_TTL = "key.ttl"; + public static final String REDIS_KEY_ADDITIONAL = "key.additional"; } diff --git a/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler index 7964e865..fdfee7db 100644 --- a/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler +++ b/flink-connector-redis/src/main/resources/META-INF/services/org.apache.flink.streaming.connectors.redis.common.hanlder.RedisHandler @@ -25,5 +25,6 @@ org.apache.flink.streaming.connectors.redis.common.mapper.row.RPushMapper org.apache.flink.streaming.connectors.redis.common.mapper.row.SAddMapper org.apache.flink.streaming.connectors.redis.common.mapper.row.SetMapper org.apache.flink.streaming.connectors.redis.common.mapper.row.ZAddMapper +org.apache.flink.streaming.connectors.redis.common.mapper.row.ZIncrByMapper org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisClusterConfigHandler org.apache.flink.streaming.connectors.redis.common.config.handler.FlinkJedisSentinelConfigHandler \ No newline at end of file diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RedisTableSinkZIncrByTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RedisTableSinkZIncrByTest.java new file mode 100644 index 00000000..b722e05c --- /dev/null +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/row/RedisTableSinkZIncrByTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper.row; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.RedisITCaseBase; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.junit.Before; +import org.junit.Test; + +public class RedisTableSinkZIncrByTest extends RedisITCaseBase { + private static final String REDIS_KEY = "TEST_KEY"; + + StreamExecutionEnvironment env; + + @Before + public void setUp(){ + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @Test + public void testRedisDescriptor() throws Exception { + DataStreamSource source = (DataStreamSource) env.addSource(new RedisTableSinkZIncrByTest.TestSourceFunctionString()) + .returns(new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(Long.class))); + + EnvironmentSettings settings = EnvironmentSettings + .newInstance() + .inStreamingMode() + .build(); + StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings); + tableEnvironment.registerDataStream("t1", source, "k, v"); + + tableEnvironment.executeSql("create table redis " + + "(k string, " + + "v bigint) " + + "with (" + + "'connector.type'='redis'," + + "'redis-mode'='cluster'," + + "'cluster-nodes'='"+String.format("%s:%s",REDIS_HOST, REDIS_PORT)+"'," + + "'command'='ZINCRBY'," + + "'key.additional'='ADDITIONAL')"); + + tableEnvironment.executeSql("insert into redis select k, v from t1"); + } + + private static class TestSourceFunctionString implements SourceFunction { + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext ctx) throws Exception { + while (running) { + Row row = new Row(2); + row.setField(0, REDIS_KEY); + row.setField(1, 2L); + ctx.collect(row); + Thread.sleep(2000L); + } + } + + @Override + public void cancel() { + running = false; + } + } +}