diff --git a/agent/agent-sink/src/main/java/com/didichuxing/datachannel/agent/sink/kafkaSink/KafkaSink.java b/agent/agent-sink/src/main/java/com/didichuxing/datachannel/agent/sink/kafkaSink/KafkaSink.java index ced50468..570f2c26 100644 --- a/agent/agent-sink/src/main/java/com/didichuxing/datachannel/agent/sink/kafkaSink/KafkaSink.java +++ b/agent/agent-sink/src/main/java/com/didichuxing/datachannel/agent/sink/kafkaSink/KafkaSink.java @@ -370,14 +370,14 @@ private boolean sendReally(List kafkaEvents, byte[] content, boolean Long start = TimeUtils.getNanoTime(); String key = EventUtils.getPartitionKey(this, kafkaEvents.size() > 0 ? kafkaEvents.get(0) : new KafkaEvent(), topicPartitionKeyType); - String sourceItemKey = null; + String sourceKey = null; long rate = -1L; int bytes = 0; for (KafkaEvent kafkaEvent : kafkaEvents) { bytes += kafkaEvent.length(); - if (StringUtils.isBlank(sourceItemKey)) { - sourceItemKey = kafkaEvent.getSourceItemKey(); + if (StringUtils.isBlank(sourceKey)) { + sourceKey = kafkaEvent.getSourceKey(); } if (rate == -1 || (rate != -1 && rate > kafkaEvent.getPreRate())) { @@ -386,7 +386,7 @@ private boolean sendReally(List kafkaEvents, byte[] content, boolean } } - if (sourceItemKey == null) { + if (sourceKey == null) { return true; } @@ -421,7 +421,7 @@ private boolean sendReally(List kafkaEvents, byte[] content, boolean return producer.send(getTargetTopic(kafkaTargetConfig.getTopic()), key, content, new KafkaCallBack(this, kafkaEvents.size(), bytes, start, getTargetTopic(kafkaTargetConfig.getTopic()), - modelConfig.getCommonConfig().getModelId(), sourceItemKey, rate)); + modelConfig.getCommonConfig().getModelId(), sourceKey, rate)); } return true; } catch (Exception e) { @@ -447,7 +447,7 @@ private boolean sendByKey(KafkaEvent mqEvent, boolean isAsync) { } String key = EventUtils.getPartitionKey(this, mqEvent, topicPartitionKeyType); int bytes = mqEvent.length(); - String sourceItemKey = mqEvent.getSourceItemKey(); + String sourceKey = mqEvent.getSourceKey(); long rate = mqEvent.getPreRate(); try { if (!isAsync) { @@ -481,7 +481,7 @@ private boolean sendByKey(KafkaEvent mqEvent, boolean isAsync) { if (modelConfig.getTag().equals(Tags.TASK_LOG2KAFKA)) { return producer.send(kafkaTargetConfig.getTopic(), key, contentToSend, new KafkaCallBack(this, 1, bytes, start, kafkaTargetConfig.getTopic(), - modelConfig.getCommonConfig().getModelId(), sourceItemKey, + modelConfig.getCommonConfig().getModelId(), sourceKey, rate)); } else { //TODO:不支持其他处理方式