diff --git a/package.json b/package.json index 6e76f5624..cb36b75c9 100644 --- a/package.json +++ b/package.json @@ -119,7 +119,7 @@ "required": false } }, - "generator": ">=1.8.27 <2.0.0", + "generator": ">=1.8.27", "filters": [ "@asyncapi/generator-filters" ], diff --git a/partials/KafkaPublisher.java b/partials/KafkaPublisher.java index 77b008eb0..a014d1e12 100644 --- a/partials/KafkaPublisher.java +++ b/partials/KafkaPublisher.java @@ -24,8 +24,9 @@ public interface PublisherService { * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} * {{line | safe}}{% endfor %} */{% endif %} - public void {{channel.subscribe().id() | camelCase}}(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}); + public void {{methodName}}Async(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}); + public void {{methodName}}Sync(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}); {%- endif %} {%- endfor %} } -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/partials/KafkaPublisherImpl.java b/partials/KafkaPublisherImpl.java index fa5539d33..57595feb2 100644 --- a/partials/KafkaPublisherImpl.java +++ b/partials/KafkaPublisherImpl.java @@ -13,9 +13,14 @@ {%- endfor -%} {% endif -%} {% endfor %} + import javax.annotation.processing.Generated; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") @Service @@ -23,6 +28,7 @@ public class PublisherServiceImpl implements PublisherService { @Autowired private KafkaTemplate kafkaTemplate; + {% for channelName, channel in asyncapi.channels() %} {%- if channel.hasSubscribe() %} {%- set hasParameters = channel.hasParameters() %} @@ -32,19 +38,45 @@ public class PublisherServiceImpl implements PublisherService { {%- else %} {%- set varName = channel.subscribe().message().payload().uid() | camelCase %} {%- endif %} + {% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %} * {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %} * {{line | safe}}{% endfor %} */{% endif %} - public void {{methodName}}(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}) { + public void {{methodName}}Async(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}) { Message<{{varName | upperFirst}}> message = MessageBuilder.withPayload({{varName}}) - .setHeader(KafkaHeaders.TOPIC, get{{methodName | upperFirst-}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %})) + .setHeader(KafkaHeaders.TOPIC, get{{methodName | upperFirst}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %})) .setHeader(KafkaHeaders.{%- if params.springBoot2 %}MESSAGE_KEY{% else %}KEY{% endif -%}, key) .build(); - kafkaTemplate.send(message); + + CompletableFuture future = kafkaTemplate.send(message); + + future.whenComplete((result, ex) -> { + if (ex == null) { + System.out.println("Message successfully sent to topic: " + get{{methodName | upperFirst}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) + " with key: " + key); + } else { + System.err.println("Failed to send message to topic: " + get{{methodName | upperFirst}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) + " with key: " + key); + ex.printStackTrace(); + } + }); } - private String get{{methodName | upperFirst-}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) { + public void {{methodName}}Sync(Integer key, {{varName | upperFirst}} {{varName}}{% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}, {% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% endfor %}{% endif %}) { + Message<{{varName | upperFirst}}> message = MessageBuilder.withPayload({{varName}}) + .setHeader(KafkaHeaders.TOPIC, get{{methodName | upperFirst}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %})) + .setHeader(KafkaHeaders.{%- if params.springBoot2 %}MESSAGE_KEY{% else %}KEY{% endif -%}, key) + .build(); + + try { + kafkaTemplate.send(message).get(10, TimeUnit.SECONDS); + System.out.println("Message successfully sent synchronously to topic: " + get{{methodName | upperFirst}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) + " with key: " + key); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + System.err.println("Failed to send message synchronously to topic: " + get{{methodName | upperFirst}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) + " with key: " + key); + e.printStackTrace(); + } + } + + private String get{{methodName | upperFirst}}Topic({% if hasParameters %}{%for parameterName, parameter in channel.parameters() %}{% if parameter.schema().type() === 'object'%}{{parameterName | camelCase | upperFirst}}{% else %}{{parameter.schema().type() | toJavaType(false)}}{% endif %} {{parameterName | camelCase}}{% if not loop.last %}, {% endif %}{% endfor %}{% endif %}) { Map parameters = {% if hasParameters %}new HashMap<>(){% else %}null{% endif %}; {%- if hasParameters %} {%- for parameterName, parameter in channel.parameters() %} @@ -55,6 +87,7 @@ public class PublisherServiceImpl implements PublisherService { } {%- endif %} {%- endfor %} + private String replaceParameters(String topic, Map parameters) { if (parameters != null) { String compiledTopic = topic; @@ -66,4 +99,4 @@ private String replaceParameters(String topic, Map parameters) { return topic; } } -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/tests/__snapshots__/kafka.test.js.snap b/tests/__snapshots__/kafka.test.js.snap index f9bb69983..d6422e60c 100644 --- a/tests/__snapshots__/kafka.test.js.snap +++ b/tests/__snapshots__/kafka.test.js.snap @@ -110,7 +110,8 @@ public interface PublisherService { - public void updateLightMeasurement(Integer key, LightMeasuredPayload lightMeasuredPayload); + public void Async(Integer key, LightMeasuredPayload lightMeasuredPayload); + public void Sync(Integer key, LightMeasuredPayload lightMeasuredPayload); } " `; @@ -521,9 +522,14 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import com.asyncapi.model.LightMeasuredPayload; + import javax.annotation.processing.Generated; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @Service @@ -532,19 +538,47 @@ public class PublisherServiceImpl implements PublisherService { @Autowired private KafkaTemplate kafkaTemplate; + + - public void updateLightMeasurement(Integer key, LightMeasuredPayload lightMeasuredPayload) { + public void updateLightMeasurementAsync(Integer key, LightMeasuredPayload lightMeasuredPayload) { + Message message = MessageBuilder.withPayload(lightMeasuredPayload) + .setHeader(KafkaHeaders.TOPIC, getUpdateLightMeasurementTopic()) + .setHeader(KafkaHeaders.KEY, key) + .build(); + + CompletableFuture future = kafkaTemplate.send(message); + + future.whenComplete((result, ex) -> { + if (ex == null) { + System.out.println("Message successfully sent to topic: " + getUpdateLightMeasurementTopic() + " with key: " + key); + } else { + System.err.println("Failed to send message to topic: " + getUpdateLightMeasurementTopic() + " with key: " + key); + ex.printStackTrace(); + } + }); + } + + public void updateLightMeasurementSync(Integer key, LightMeasuredPayload lightMeasuredPayload) { Message message = MessageBuilder.withPayload(lightMeasuredPayload) .setHeader(KafkaHeaders.TOPIC, getUpdateLightMeasurementTopic()) .setHeader(KafkaHeaders.KEY, key) .build(); - kafkaTemplate.send(message); + + try { + kafkaTemplate.send(message).get(10, TimeUnit.SECONDS); + System.out.println("Message successfully sent synchronously to topic: " + getUpdateLightMeasurementTopic() + " with key: " + key); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + System.err.println("Failed to send message synchronously to topic: " + getUpdateLightMeasurementTopic() + " with key: " + key); + e.printStackTrace(); + } } private String getUpdateLightMeasurementTopic() { Map parameters = null; return replaceParameters("event.lighting.measured", parameters); } + private String replaceParameters(String topic, Map parameters) { if (parameters != null) { String compiledTopic = topic; @@ -725,3 +759,4 @@ public class MessageHandlerService { } " `; +