Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: CI
on:
push:
branches:
- main
- AMW-162
pull_request:
schedule:
- cron: '0 6 * * *'
Expand All @@ -16,4 +16,4 @@ jobs:
fqcn: 'middleware_automation/amq_streams'
root_permission_varname: 'amq_streams_install_requires_become'
molecule_tests: >-
[ "default" ]
[ "default", "amq_streams_kraft" ]
78 changes: 78 additions & 0 deletions molecule/amq_streams_kraft/converge.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
---
- name: "Automate AMQ Streams KRaft install"
hosts: all
vars:
# Topic Management
amq_streams_broker_topics:
- name: sampleTopic
partitions: 2
replication_factor: 1
- name: otherTopic
partitions: 4
replication_factor: 1
vars_files:
- vars.yml
roles:
- role: amq_streams_common
- role: amq_streams_kraft
tasks:
- name: "Ensure AMQ Streams Broker is running and available."
ansible.builtin.include_role:
name: amq_streams_broker
vars:
amq_streams_common_skip_download: true

- name: "Ensure AMQ Streams Connect is running and available."
ansible.builtin.include_role:
name: amq_streams_connect
vars:
connectors:
- { name: "file", path: "connectors/file.yml" }

- name: "Validate that KRaft deployment is functional."
ansible.builtin.include_role:
name: amq_streams_kraft
tasks_from: validate.yml

- name: "Validate that Broker deployment is functional."
ansible.builtin.include_role:
name: amq_streams_broker
tasks_from: validate.yml

- name: "Validate that Connect deployment is functional."
ansible.builtin.include_role:
name: amq_streams_connect
tasks_from: validate.yml

post_tasks:
- name: "Ensures topics exist."
ansible.builtin.include_role:
name: amq_streams_broker
tasks_from: topic/create.yml
loop: "{{ amq_streams_broker_topics }}"
loop_control:
loop_var: topic
vars:
topic_name: "{{ topic.name }}"
topic_partitions: "{{ topic.partitions }}"
topic_replication_factor: "{{ topic.replication_factor }}"

- name: "Describe created topics."
ansible.builtin.include_role:
name: amq_streams_broker
tasks_from: topic/describe.yml
loop: "{{ amq_streams_broker_topics }}"
loop_control:
loop_var: topic
vars:
topic_name: "{{ topic.name }}"

- name: "Delete topics"
ansible.builtin.include_role:
name: amq_streams_broker
tasks_from: topic/delete.yml
loop: "{{ amq_streams_broker_topics }}"
loop_control:
loop_var: topic
vars:
topic_name: "{{ topic.name }}"
43 changes: 43 additions & 0 deletions molecule/amq_streams_kraft/molecule.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
driver:
name: docker
platforms:
- name: instance
image: registry.access.redhat.com/ubi9/ubi-init:latest
command: "/usr/sbin/init"
pre_build_image: true
privileged: true
groups:
- brokers
provisioner:
name: ansible
config_options:
defaults:
interpreter_python: auto_silent
ssh_connection:
pipelining: false
playbooks:
prepare: ../prepare.yml
converge: converge.yml
verify: verify.yml
inventory:
host_vars:
localhost:
ansible_python_interpreter: "{{ ansible_playbook_python }}"
env:
ANSIBLE_FORCE_COLOR: "true"
verifier:
name: ansible
scenario:
test_sequence:
- cleanup
- destroy
- syntax
- create
- prepare
- converge
- idempotence
- side_effect
- verify
- cleanup
- destroy
1 change: 1 addition & 0 deletions molecule/amq_streams_kraft/roles
21 changes: 21 additions & 0 deletions molecule/amq_streams_kraft/vars.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
amq_streams_common_escalade_privilege_group_create: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_common_escalade_privilege_user_create: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_common_archive_extraction_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_common_dependencies_require_priv: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_zookeeper_data_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_zookeeper_restart_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_broker_tls_truststore_client_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_broker_config_files_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_cruise_control_path_to_capacity_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_cruise_control_logfiles_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_connect_source_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_kraft_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
amq_streams_common_product_version: 4.1.1
# Run the Systemd Service as root
amq_streams_broker_user: root
amq_streams_broker_group: root

# Run KRaft tasks as root
amq_streams_kraft_user: root
amq_streams_kraft_group: root
13 changes: 13 additions & 0 deletions molecule/amq_streams_kraft/verify.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
- name: Verify
hosts: all
tasks:

- name: Populate service facts
ansible.builtin.service_facts:

- name: Check broker service
assert:
that:
- ansible_facts.services["amq_streams_broker.service"]["state"] == "running"
- ansible_facts.services["amq_streams_broker.service"]["status"] == "enabled"
53 changes: 50 additions & 3 deletions roles/amq_streams_broker/templates/server.properties.j2
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,35 @@
# See kafka.server.KafkaConfig for additional details and defaults
#

# -----------------------------------------------------------------------------
# MODE DETERMINATION (Added for Kafka 4.0+ KRaft Support)
# -----------------------------------------------------------------------------
{% set enable_kraft = amq_streams_enable_kraft | default(amq_streams_common_product_version is version('4.0.0', '>=')) | bool %}

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
{% if enable_kraft %}
# KRaft uses node.id instead of broker.id
node.id={{ amq_streams_kraft_node_id | default(1) }}
{% else %}
broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory_group.index(inventory_hostname)) }}
{% endif %}

############################# KRaft Settings (Kafka 4.0+) #############################
{% if enable_kraft %}
# The roles of this process. broker, controller, or both.
process.roles={{ amq_streams_kraft_process_roles | default('broker,controller') }}

# The connect string for the controller quorum
controller.quorum.voters={{ amq_streams_kraft_controller_quorum_voters }}

# Listener name used for the controller
controller.listener.names={{ amq_streams_kraft_controller_listener_names | default('CONTROLLER') }}

# Listener name used for inter-broker communication
inter.broker.listener.name={{ amq_streams_kraft_inter_broker_listener_name | default('PLAINTEXT') }}
{% endif %}

############################# Socket Server Settings #############################

Expand All @@ -33,27 +58,43 @@ broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
{% if enable_kraft %}
# KRaft Mode Listeners (Requires Broker + Controller ports)
listeners={{ amq_streams_kraft_listeners | join(",") }}
{% else %}
# Legacy ZK Mode Listeners
{% if amq_streams_broker_listeners is defined %}
listeners={{ amq_streams_broker_listeners | join(",") }}
{% elif amq_streams_broker_listener_port is defined %}
listeners=PLAINTEXT://:{{ amq_streams_broker_listener_port }}
{% else %}
#listeners=PLAINTEXT://:9092
{% endif %}
{% endif %}

{% if amq_streams_broker_inter_broker_listener is defined %}
# Name of listener used for communication between brokers
{% if amq_streams_broker_inter_broker_listener is defined and not enable_kraft %}
# Name of listener used for communication between brokers (Legacy ZK only)
inter.broker.listener.name={{ amq_streams_broker_inter_broker_listener }}
{% endif %}

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
{% if enable_kraft %}
# KRaft Mode Advertised Listeners (Broker port only)
advertised.listeners={{ amq_streams_kraft_advertised_listeners | join(",") }}
{% else %}
# Legacy ZK Mode Advertised Listeners
{% if amq_streams_broker_advertised_listeners is defined %}
advertised.listeners={{ amq_streams_broker_advertised_listeners | join(",") }}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong indentation

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected.

{% else %}
#advertised.listeners=PLAINTEXT://your.host.name:9092
{% endif %}
{% endif %}

{% if enable_kraft %}
# KRaft Mode Security Map (Must include Controller)
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
{% else %}
{% if amq_streams_broker_auth_enabled and amq_streams_broker_auth_listeners is defined %}
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map={{ amq_streams_broker_auth_listeners | join(",") }}
Expand All @@ -68,6 +109,7 @@ sasl.mechanism.inter.broker.protocol={{ amq_streams_broker_inter_broker_auth_sas
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
{% endif %}
{% endif %}

{% if amq_streams_broker_inter_broker_listener_auth is defined %}
# Authentication mechanism for the inter-broker listener
Expand Down Expand Up @@ -105,7 +147,11 @@ socket.request.max.bytes={{ amq_streams_broker_socket_request_max_bytes }}
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
{% if enable_kraft %}
log.dirs={{ amq_streams_kraft_log_dirs }}
{% else %}
log.dirs={{ amq_streams_broker_data_dir }}
{% endif %}

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
Expand Down Expand Up @@ -162,7 +208,7 @@ log.retention.hours={{ amq_streams_broker_log_retention_hours }}
log.retention.check.interval.ms={{ amq_streams_broker_log_retention_check_interval_ms }}

############################# Zookeeper #############################

{% if not enable_kraft %}
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
Expand All @@ -173,6 +219,7 @@ zookeeper.connect={{ amq_streams_broker_zookeeper_host }}:{{ amq_streams_broker_
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms={{ amq_streams_broker_zookeeper_connection_timeout_ms }}
zookeeper.session.timeout.ms={{ amq_streams_broker_zookeeper_session_timeout_ms }}
{% endif %}

############################# Group Coordinator Settings #############################

Expand Down
59 changes: 59 additions & 0 deletions roles/amq_streams_kraft/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# AMQ Streams KRaft Role

This role orchestrates the initialization and configuration of Apache Kafka in KRaft (Kafka Raft Metadata) mode, removing the traditional dependency on ZooKeeper. It handles the mandatory bootstrapping process by generating a unique Cluster UUID (if not provided) and executing kafka-storage.sh to format the storage directories with the necessary metadata. Additionally, it prepares the critical configuration parameters required for a ZooKeeper-less environment, ensuring the correct setup of process roles, controller quorums, and the strict separation of broker and controller listeners.

## Requirements

* **Role Dependencies**: `amq_streams_common` (must be run first to install the Kafka binaries).
* **Kafka Version**: Designed for Kafka 4.0.0+ or Kafka 3.x with KRaft enabled.

## Role Variables

| Variable | Default Value | Description |
| :--- | :--- | :--- |
| `amq_streams_install_dir` | `/opt` | The base directory where AMQ Streams/Kafka is installed. |
| `amq_streams_kafka_home` | `{{ amq_streams_install_dir }}/kafka_{{ amq_streams_common_version }}/` | The absolute path to the Kafka installation home directory. |
| `amq_streams_kraft_config_dir` | `{{ amq_streams_kafka_home }}/config` | The directory containing `server.properties` and other config files. |
| `amq_streams_kraft_data_dir` | `{{ amq_streams_kafka_home }}/data/kraft` | The directory where Kafka stores its KRaft metadata and log data. |
| `amq_streams_cluster_id` | `""` | The UUID of the Kafka cluster. If left empty, the role will auto-generate a random UUID during the first run. |
| `amq_streams_kraft_node_id` | `1` | The unique integer ID for this specific broker/controller node. **Must be unique per host.** |
| `amq_streams_kraft_process_roles` | `"broker,controller"` | Defines the role of this node. Options: `broker`, `controller`, or `broker,controller` (combined). |
| `amq_streams_kraft_controller_quorum_voters` | `1@{{ ansible_host }}:{{ amq_streams_kraft_controller_port }}` | The voter connection string in the format `nodeId@host:port`. Must list all controller nodes in the cluster. |
| `amq_streams_kraft_listener_port` | `9092` | The port used for standard Client (broker) traffic. |
| `amq_streams_kraft_controller_port` | `9093` | The port used for Controller-to-Controller Raft communication. |
| `amq_streams_kraft_listeners` | *(See defaults)* | A list of all listeners to bind. Must include **both** the Controller and Broker listeners (e.g., `PLAINTEXT://...` and `CONTROLLER://...`). |
| `amq_streams_kraft_advertised_listeners` | *(See defaults)* | A list of listeners advertised to clients. Must **ONLY** include Broker listeners (e.g., `PLAINTEXT://...`). |
| `amq_streams_kraft_controller_listener_names` | `"CONTROLLER"` | The listener name used by the controller quorum (must match an entry in `listeners`). |
| `amq_streams_kraft_inter_broker_listener_name` | `"PLAINTEXT"` | The listener name used for replication between brokers. |
| `amq_streams_kraft_log_dirs` | `{{ amq_streams_kraft_data_dir }}` | The comma-separated list of directories for log data. Usually matches the data dir. |
| `amq_streams_kraft_log_retention_hours` | `168` | The number of hours to keep log segments before deletion (Default: 7 days). |
| `amq_streams_kraft_priv_escalation` | `yes` | Controls whether tasks (like creating directories and formatting storage) run with elevated privileges (`become: true`). |

## Example Playbook

```yaml
---
- hosts: kafka_brokers
vars:
amq_streams_common_version: "3.7.0"
amq_streams_enable_kraft: true

roles:
# Install Java and Kafka Binaries
- role: amq_streams_common

# Configure and Format Storage for KRaft
- role: amq_streams_kraft
when: amq_streams_enable_kraft | bool

# Start the Kafka Service
- role: amq_streams_broker
```

## License

Apache License v2.0 or later

## Author Information

* [Ranabir Chakraborty](https://github.com/RanabirChakraborty)
21 changes: 21 additions & 0 deletions roles/amq_streams_kraft/defaults/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
amq_streams_install_dir: "/opt"
amq_streams_kafka_home: "{{ amq_streams_install_dir }}/kafka_{{ amq_streams_common_version }}/"
amq_streams_kraft_config_dir: "{{ amq_streams_kafka_home }}/config"
amq_streams_kraft_data_dir: "{{ amq_streams_kafka_home }}/data/kraft"
amq_streams_cluster_id: ""
amq_streams_kraft_node_id: 1
amq_streams_kraft_listener_port: 9092
amq_streams_kraft_controller_port: 9093
amq_streams_kraft_controller_quorum_voters: "1@{{ ansible_host }}:{{ amq_streams_kraft_controller_port }}"
amq_streams_kraft_listeners:
- "PLAINTEXT://0.0.0.0:{{ amq_streams_kraft_listener_port }}"
- "CONTROLLER://0.0.0.0:{{ amq_streams_kraft_controller_port }}"
amq_streams_kraft_advertised_listeners:
- "PLAINTEXT://{{ ansible_host }}:{{ amq_streams_kraft_listener_port }}"
amq_streams_kraft_controller_listener_names: "CONTROLLER"
amq_streams_kraft_inter_broker_listener_name: "PLAINTEXT"
amq_streams_kraft_process_roles: "broker,controller"
amq_streams_kraft_log_dirs: "{{ amq_streams_kraft_data_dir }}"
amq_streams_kraft_log_retention_hours: 168
amq_streams_kraft_priv_escalation: yes
9 changes: 9 additions & 0 deletions roles/amq_streams_kraft/handlers/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
- name: Reload systemd
ansible.builtin.systemd:
daemon_reload: true

- name: Restart Kafka
ansible.builtin.systemd:
name: "{{ server_name | default('amq_streams_broker') }}"
state: restarted
Loading
Loading