This project illustrates how you can build Apache Kafka Streams applications using Quarkus.
It is based on the Quarkus Kafka Streams quickstart, with some refactorings applied in this version.
This quickstart is made up of the following parts:
- Apache Kafka and ZooKeeper
- producer, a Quarkus application that publishes some test data on two Kafka topics: `weather-stations` and `temperature-values` (sketched below)
- aggregator, a Quarkus application processing the two topics, using the Kafka Streams API
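The actual producer code lives in the producer application; as a rough illustration, a minimal Quarkus producer using MicroProfile Reactive Messaging might look like the following sketch. The class name, emission interval and payload format are assumptions; the channel name maps to the Kafka topic via application.properties:

```java
import java.util.Random;
import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;

@ApplicationScoped
public class ValuesGenerator {

    private final Random random = new Random();

    // Emit a random temperature measurement every 500 ms; the record key is the
    // station id, so downstream joins and aggregations are co-partitioned by station.
    @Outgoing("temperature-values")
    public Flowable<KafkaRecord<Integer, String>> generateTemperatures() {
        return Flowable.interval(500, TimeUnit.MILLISECONDS)
                .map(tick -> {
                    int stationId = random.nextInt(10) + 1;
                    double temperature = -10 + random.nextDouble() * 45;
                    return KafkaRecord.of(stationId, String.valueOf(temperature));
                });
    }
}
```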
The aggregator application is the interesting piece; it
- runs a KStreams pipeline that joins the two topics (on the weather station id), groups the values by weather station and emits the minimum/maximum temperature value per station to the `temperatures-aggregated` topic
- exposes an HTTP endpoint for getting the current minimum/maximum values for a given station, using Kafka Streams interactive queries
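In broad strokes, the pipeline just described could be sketched like this. The store name `weather-stations-store`, the serdes and the string-based "min;max" accumulator are simplifying assumptions of this sketch; the real aggregator uses a dedicated aggregation type:

```java
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

@ApplicationScoped
public class TopologyProducer {

    // Quarkus picks up a CDI-produced Topology and manages the KafkaStreams lifecycle.
    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Station master data (id -> name), used as the lookup side of the join.
        GlobalKTable<Integer, String> stations = builder.globalTable(
                "weather-stations", Consumed.with(Serdes.Integer(), Serdes.String()));

        builder.stream("temperature-values", Consumed.with(Serdes.Integer(), Serdes.Double()))
                // Join each measurement with its station name, on the station id.
                .join(stations,
                        (stationId, temperature) -> stationId,
                        (temperature, stationName) -> stationName + ";" + temperature)
                .groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
                // Keep a running "min;max" per station, materialized as a queryable store.
                .aggregate(
                        () -> "",
                        (stationId, nameAndTemp, agg) -> {
                            double temp = Double.parseDouble(nameAndTemp.split(";")[1]);
                            if (agg.isEmpty()) {
                                return temp + ";" + temp;
                            }
                            String[] parts = agg.split(";");
                            double min = Math.min(Double.parseDouble(parts[0]), temp);
                            double max = Math.max(Double.parseDouble(parts[1]), temp);
                            return min + ";" + max;
                        },
                        Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as("weather-stations-store")
                                .withKeySerde(Serdes.Integer())
                                .withValueSerde(Serdes.String()))
                .toStream()
                .to("temperatures-aggregated", Produced.with(Serdes.Integer(), Serdes.String()));

        return builder.build();
    }
}
```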
To build the producer and aggregator applications, run:

```bash
mvn clean install
```

A Docker Compose file is provided for running all the components. Start all containers by running:
```bash
docker-compose up -d --build
```

Now run an instance of the debezium/tooling image, which comes with several useful tools such as kafkacat and httpie:
```bash
docker run --tty --rm -i --network ks debezium/tooling:1.1
```

In the tooling container, run kafkacat to examine the results of the streaming pipeline:
```bash
kafkacat -b kafka:9092 -C -o beginning -q -t temperatures-aggregated
```

You can also obtain the current aggregated state for a given weather station using httpie, which will invoke a Kafka Streams interactive query for that value:
```bash
http aggregator:8080/weather-stations/data/1
```

Note: this request currently doesn't work and returns:

```
HTTP/1.1 404 Not Found
content-length: 0
```
Kafka Streams pipelines can be scaled out, i.e. the load can be distributed amongst multiple application instances running the same pipeline. To try this out, scale the aggregator service to three nodes:
```bash
docker-compose up -d --scale aggregator=3
```

This will spin up two more instances of this service. The state store that materializes the current state of the streaming pipeline (which we queried before using the interactive query) is now distributed amongst the three nodes. That is, when invoking the REST API on any of the three instances, the aggregation for the requested weather station id might be stored locally on the node receiving the query, or it could be stored on one of the other two nodes.
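Under the hood, each instance can find out which node owns the state for a given key via the Kafka Streams metadata API. A minimal sketch, assuming the store name from the pipeline sketch above and a `KafkaStreams` instance provided by the Quarkus extension:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

public class KeyLocator {

    private final KafkaStreams streams;

    public KeyLocator(KafkaStreams streams) {
        this.streams = streams;
    }

    // Returns "host:port" of the instance that holds the state for the given station id.
    public String ownerOf(int stationId) {
        StreamsMetadata metadata = streams.metadataForKey(
                "weather-stations-store", stationId, Serdes.Integer().serializer());
        return metadata.host() + ":" + metadata.port();
    }
}
```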
As the load balancer of Docker Compose will distribute requests to the aggregator service in a round-robin fashion, we'll invoke the actual nodes directly. The application exposes information about all the host names via REST:
```bash
http aggregator:8080/weather-stations/meta-data
```

Retrieve the data from one of the three hosts shown in the response (your actual host names will differ):
```bash
http cf143d359acc:8080/weather-stations/data/1
```

If that node holds the data for key "1", you'll get a response like this:
```
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 74
Content-Type: application/json
Date: Tue, 11 Jun 2019 19:16:31 GMT

{
    "avg": 15.7,
    "count": 11,
    "max": 31.0,
    "min": 3.3,
    "stationId": 1,
    "stationName": "Hamburg"
}
```
Otherwise, the service will send a redirect:
```
HTTP/1.1 303 See Other
Connection: keep-alive
Content-Length: 0
Date: Tue, 11 Jun 2019 19:17:51 GMT
Location: http://72064bb97be9:8080/weather-stations/data/2
```
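The redirect logic can be sketched roughly like this, building on the key lookup shown earlier. The endpoint paths match those used above; the `hostname` config property and the store name are assumptions of this sketch:

```java
import java.net.URI;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.eclipse.microprofile.config.inject.ConfigProperty;

@Path("/weather-stations")
public class WeatherStationEndpoint {

    @Inject
    KafkaStreams streams;

    // The host name this instance advertises; assumed to come from configuration.
    @ConfigProperty(name = "hostname")
    String host;

    @GET
    @Path("/data/{id}")
    public Response getWeatherStationData(@PathParam("id") int id) {
        StreamsMetadata metadata = streams.metadataForKey(
                "weather-stations-store", id, Serdes.Integer().serializer());

        if (!host.equals(metadata.host())) {
            // Another instance owns this key: redirect the client there (303 See Other).
            URI location = URI.create(String.format(
                    "http://%s:%d/weather-stations/data/%d", metadata.host(), metadata.port(), id));
            return Response.seeOther(location).build();
        }

        // The key is stored locally: answer from the local state store.
        ReadOnlyKeyValueStore<Integer, String> store = streams.store(
                "weather-stations-store", QueryableStoreTypes.keyValueStore());
        String result = store.get(id);

        return result != null
                ? Response.ok(result).build()
                : Response.status(Response.Status.NOT_FOUND).build();
    }
}
```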
You can have httpie automatically follow the redirect by passing the --follow option:
```bash
http --follow aggregator:8080/weather-stations/data/2
```

In case plain HTTP is disabled via:

```properties
quarkus.http.insecure-requests=disabled
```

the endpoint URL becomes:

```bash
curl -L --insecure https://aggregator:8443/weather-stations/data/2
```

To run the producer and aggregator applications as native binaries via GraalVM,
first run the Maven builds using the native profile:
```bash
mvn clean install -Pnative -Dnative-image.container-runtime=docker
```

Then create an environment variable named QUARKUS_MODE with its value set to "native":

```bash
export QUARKUS_MODE=native
```

Now start Docker Compose as described above.
For development purposes it can be handy to run the producer and aggregator applications
directly on your local machine instead of via Docker.
For that purpose, a separate Docker Compose file, docker-compose-local.yaml, is provided which just starts Apache Kafka and ZooKeeper, configured so they are accessible from your host system.
Open this file in an editor and change the value of the KAFKA_ADVERTISED_LISTENERS variable so that it contains your host machine's name or IP address.
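For example, assuming your machine is reachable as 192.168.1.10, the relevant entry in docker-compose-local.yaml would look roughly like this (the exact service and listener names depend on the file's existing configuration):

```yaml
services:
  kafka:
    environment:
      # Replace the address with your host machine's name or IP address.
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.10:9092
```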
Then run:
```bash
docker-compose -f docker-compose-local.yaml up

mvn quarkus:dev -f producer/pom.xml

mvn quarkus:dev -Dquarkus.http.port=8081 -f aggregator/pom.xml
```

Any changes to the aggregator application will be picked up instantly, and a reload of the stream processing application will be triggered upon the next Kafka message to be processed.