Skip to content

Commit a63d488

Browse files
AMW-162 Add support for KRaft
1 parent c8862f7 commit a63d488

File tree

17 files changed

+478
-21
lines changed

17 files changed

+478
-21
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: CI
33
on:
44
push:
55
branches:
6-
- main
6+
- AMW-162
77
pull_request:
88
schedule:
99
- cron: '0 6 * * *'
@@ -16,4 +16,4 @@ jobs:
1616
fqcn: 'middleware_automation/amq_streams'
1717
root_permission_varname: 'amq_streams_install_requires_become'
1818
molecule_tests: >-
19-
[ "default" ]
19+
[ "default", "amq_streams_kraft" ]
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
---
2+
- name: "Automate AMQ Streams KRaft install"
3+
hosts: all
4+
vars:
5+
# Topic Management
6+
amq_streams_broker_topics:
7+
- name: sampleTopic
8+
partitions: 2
9+
replication_factor: 1
10+
- name: otherTopic
11+
partitions: 4
12+
replication_factor: 1
13+
vars_files:
14+
- vars.yml
15+
roles:
16+
- role: amq_streams_common
17+
- role: amq_streams_kraft
18+
tasks:
19+
- name: "Ensure AMQ Streams Broker is running and available."
20+
ansible.builtin.include_role:
21+
name: amq_streams_broker
22+
vars:
23+
amq_streams_common_skip_download: true
24+
25+
- name: "Ensure AMQ Streams Connect is running and available."
26+
ansible.builtin.include_role:
27+
name: amq_streams_connect
28+
vars:
29+
connectors:
30+
- { name: "file", path: "connectors/file.yml" }
31+
32+
- name: "Validate that KRaft deployment is functional."
33+
ansible.builtin.include_role:
34+
name: amq_streams_kraft
35+
tasks_from: validate.yml
36+
37+
- name: "Validate that Broker deployment is functional."
38+
ansible.builtin.include_role:
39+
name: amq_streams_broker
40+
tasks_from: validate.yml
41+
42+
- name: "Validate that Connect deployment is functional."
43+
ansible.builtin.include_role:
44+
name: amq_streams_connect
45+
tasks_from: validate.yml
46+
47+
post_tasks:
48+
- name: "Ensures topics exist."
49+
ansible.builtin.include_role:
50+
name: amq_streams_broker
51+
tasks_from: topic/create.yml
52+
loop: "{{ amq_streams_broker_topics }}"
53+
loop_control:
54+
loop_var: topic
55+
vars:
56+
topic_name: "{{ topic.name }}"
57+
topic_partitions: "{{ topic.partitions }}"
58+
topic_replication_factor: "{{ topic.replication_factor }}"
59+
60+
- name: "Describe created topics."
61+
ansible.builtin.include_role:
62+
name: amq_streams_broker
63+
tasks_from: topic/describe.yml
64+
loop: "{{ amq_streams_broker_topics }}"
65+
loop_control:
66+
loop_var: topic
67+
vars:
68+
topic_name: "{{ topic.name }}"
69+
70+
- name: "Delete topics"
71+
ansible.builtin.include_role:
72+
name: amq_streams_broker
73+
tasks_from: topic/delete.yml
74+
loop: "{{ amq_streams_broker_topics }}"
75+
loop_control:
76+
loop_var: topic
77+
vars:
78+
topic_name: "{{ topic.name }}"
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
---
2+
driver:
3+
name: docker
4+
platforms:
5+
- name: instance
6+
image: registry.access.redhat.com/ubi9/ubi-init:latest
7+
command: "/usr/sbin/init"
8+
pre_build_image: true
9+
privileged: true
10+
groups:
11+
- brokers
12+
provisioner:
13+
name: ansible
14+
config_options:
15+
defaults:
16+
interpreter_python: auto_silent
17+
ssh_connection:
18+
pipelining: false
19+
playbooks:
20+
prepare: ../prepare.yml
21+
converge: converge.yml
22+
verify: verify.yml
23+
inventory:
24+
host_vars:
25+
localhost:
26+
ansible_python_interpreter: "{{ ansible_playbook_python }}"
27+
env:
28+
ANSIBLE_FORCE_COLOR: "true"
29+
verifier:
30+
name: ansible
31+
scenario:
32+
test_sequence:
33+
- cleanup
34+
- destroy
35+
- syntax
36+
- create
37+
- prepare
38+
- converge
39+
- idempotence
40+
- side_effect
41+
- verify
42+
- cleanup
43+
- destroy

molecule/amq_streams_kraft/roles

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../roles
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
---
2+
amq_streams_common_escalade_privilege_group_create: "{{ amq_streams_install_requires_become | default(true) }}"
3+
amq_streams_common_escalade_privilege_user_create: "{{ amq_streams_install_requires_become | default(true) }}"
4+
amq_streams_common_archive_extraction_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
5+
amq_streams_common_dependencies_require_priv: "{{ amq_streams_install_requires_become | default(true) }}"
6+
amq_streams_zookeeper_data_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
7+
amq_streams_zookeeper_restart_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
8+
amq_streams_broker_tls_truststore_client_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
9+
amq_streams_broker_config_files_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
10+
amq_streams_cruise_control_path_to_capacity_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
11+
amq_streams_cruise_control_logfiles_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
12+
amq_streams_connect_source_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
13+
amq_streams_kraft_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}"
14+
amq_streams_common_product_version: 4.1.1
15+
# Run the Systemd Service as root
16+
amq_streams_broker_user: root
17+
amq_streams_broker_group: root
18+
19+
# Run KRaft tasks as root
20+
amq_streams_kraft_user: root
21+
amq_streams_kraft_group: root
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
- name: Verify
3+
hosts: all
4+
tasks:
5+
6+
- name: Populate service facts
7+
ansible.builtin.service_facts:
8+
9+
- name: Check broker service
10+
assert:
11+
that:
12+
- ansible_facts.services["amq_streams_broker.service"]["state"] == "running"
13+
- ansible_facts.services["amq_streams_broker.service"]["status"] == "enabled"
14+
15+
- name: Check controller service
16+
assert:
17+
that:
18+
- ansible_facts.services["amq_streams_controller.service"]["state"] == "running"
19+
- ansible_facts.services["amq_streams_controller.service"]["status"] == "enabled"

roles/amq_streams_broker/templates/server.properties.j2

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,35 @@
2020
# See kafka.server.KafkaConfig for additional details and defaults
2121
#
2222

23+
# -----------------------------------------------------------------------------
24+
# MODE DETERMINATION (Added for Kafka 4.0+ KRaft Support)
25+
# -----------------------------------------------------------------------------
26+
{% set enable_kraft = amq_streams_enable_kraft | default(amq_streams_common_product_version is version('4.0.0', '>=')) | bool %}
27+
2328
############################# Server Basics #############################
2429

2530
# The id of the broker. This must be set to a unique integer for each broker.
31+
{% if enable_kraft %}
32+
# KRaft uses node.id instead of broker.id
33+
node.id={{ amq_streams_kraft_node_id | default(1) }}
34+
{% else %}
2635
broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory_group.index(inventory_hostname)) }}
36+
{% endif %}
37+
38+
############################# KRaft Settings (Kafka 4.0+) #############################
39+
{% if enable_kraft %}
40+
# The roles of this process. broker, controller, or both.
41+
process.roles={{ amq_streams_kraft_process_roles | default('broker,controller') }}
42+
43+
# The connect string for the controller quorum
44+
controller.quorum.voters={{ amq_streams_kraft_controller_quorum_voters }}
45+
46+
# Listener name used for the controller
47+
controller.listener.names={{ amq_streams_kraft_controller_listener_names | default('CONTROLLER') }}
48+
49+
# Listener name used for inter-broker communication
50+
inter.broker.listener.name={{ amq_streams_kraft_inter_broker_listener_name | default('PLAINTEXT') }}
51+
{% endif %}
2752

2853
############################# Socket Server Settings #############################
2954

@@ -33,40 +58,57 @@ broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory
3358
# listeners = listener_name://host_name:port
3459
# EXAMPLE:
3560
# listeners = PLAINTEXT://your.host.name:9092
36-
{% if amq_streams_broker_listeners is defined %}
61+
{% if enable_kraft %}
62+
# KRaft Mode Listeners (Requires Broker + Controller ports)
63+
listeners={{ amq_streams_kraft_listeners | join(",") }}
64+
{% else %}
65+
# Legacy ZK Mode Listeners
66+
{% if amq_streams_broker_listeners is defined %}
3767
listeners={{ amq_streams_broker_listeners | join(",") }}
38-
{% elif amq_streams_broker_listener_port is defined %}
68+
{% elif amq_streams_broker_listener_port is defined %}
3969
listeners=PLAINTEXT://:{{ amq_streams_broker_listener_port }}
40-
{% else %}
70+
{% else %}
4171
#listeners=PLAINTEXT://:9092
72+
{% endif %}
4273
{% endif %}
4374

44-
{% if amq_streams_broker_inter_broker_listener is defined %}
45-
# Name of listener used for communication between brokers
75+
{% if amq_streams_broker_inter_broker_listener is defined and not enable_kraft %}
76+
# Name of listener used for communication between brokers (Legacy ZK only)
4677
inter.broker.listener.name={{ amq_streams_broker_inter_broker_listener }}
4778
{% endif %}
4879

4980
# Listener name, hostname and port the broker will advertise to clients.
5081
# If not set, it uses the value for "listeners".
51-
{% if amq_streams_broker_advertised_listeners is defined %}
52-
advertised.listeners={{ amq_streams_broker_advertised_listeners | join(",") }}
82+
{% if enable_kraft %}
83+
# KRaft Mode Advertised Listeners (Broker port only)
84+
advertised.listeners={{ amq_streams_kraft_advertised_listeners | join(",") }}
5385
{% else %}
86+
# Legacy ZK Mode Advertised Listeners
87+
{% if amq_streams_broker_advertised_listeners is defined %}
88+
advertised.listeners={{ amq_streams_broker_advertised_listeners | join(",") }}
89+
{% else %}
5490
#advertised.listeners=PLAINTEXT://your.host.name:9092
91+
{% endif %}
5592
{% endif %}
5693

57-
{% if amq_streams_broker_auth_enabled and amq_streams_broker_auth_listeners is defined %}
94+
{% if enable_kraft %}
95+
# KRaft Mode Security Map (Must include Controller)
96+
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
97+
{% else %}
98+
{% if amq_streams_broker_auth_enabled and amq_streams_broker_auth_listeners is defined %}
5899
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
59100
listener.security.protocol.map={{ amq_streams_broker_auth_listeners | join(",") }}
60101

61102
# The list of SASL mechanisms enabled in the Kafka server
62103
sasl.enabled.mechanisms={{ amq_streams_broker_auth_sasl_mechanisms | join(",") }}
63-
{% if amq_streams_broker_inter_broker_auth_sasl_mechanisms is defined %}
104+
{% if amq_streams_broker_inter_broker_auth_sasl_mechanisms is defined %}
64105
# SASL mechanism used for inter-broker communication
65106
sasl.mechanism.inter.broker.protocol={{ amq_streams_broker_inter_broker_auth_sasl_mechanisms }}
66-
{% endif %}
67-
{% else %}
107+
{% endif %}
108+
{% else %}
68109
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
69110
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
111+
{% endif %}
70112
{% endif %}
71113

72114
{% if amq_streams_broker_inter_broker_listener_auth is defined %}
@@ -105,7 +147,11 @@ socket.request.max.bytes={{ amq_streams_broker_socket_request_max_bytes }}
105147
############################# Log Basics #############################
106148

107149
# A comma separated list of directories under which to store log files
150+
{% if enable_kraft %}
151+
log.dirs={{ amq_streams_kraft_log_dirs }}
152+
{% else %}
108153
log.dirs={{ amq_streams_broker_data_dir }}
154+
{% endif %}
109155

110156
# The default number of log partitions per topic. More partitions allow greater
111157
# parallelism for consumption, but this will also result in more files across
@@ -162,7 +208,7 @@ log.retention.hours={{ amq_streams_broker_log_retention_hours }}
162208
log.retention.check.interval.ms={{ amq_streams_broker_log_retention_check_interval_ms }}
163209

164210
############################# Zookeeper #############################
165-
211+
{% if not enable_kraft %}
166212
# Zookeeper connection string (see zookeeper docs for details).
167213
# This is a comma separated host:port pairs, each corresponding to a zk
168214
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
@@ -173,6 +219,7 @@ zookeeper.connect={{ amq_streams_broker_zookeeper_host }}:{{ amq_streams_broker_
173219
# Timeout in ms for connecting to zookeeper
174220
zookeeper.connection.timeout.ms={{ amq_streams_broker_zookeeper_connection_timeout_ms }}
175221
zookeeper.session.timeout.ms={{ amq_streams_broker_zookeeper_session_timeout_ms }}
222+
{% endif %}
176223

177224
############################# Group Coordinator Settings #############################
178225

roles/amq_streams_common/tasks/systemd.yml

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,44 @@
6969
enabled: yes
7070
state: started
7171

72-
- name: "Wait for service port {{ server_port }} to be available - (if provided)"
73-
ansible.builtin.wait_for:
74-
port: "{{ server_port }}"
75-
delay: "{{ delay_before_server_port_check | default(omit) }}"
76-
when:
77-
- skip_wait_for_server_port is defined and not skip_wait_for_server_port
78-
- server_port is defined and server_port != ''
72+
# --- CHANGED SECTION START ---
73+
# Replaced simple wait_for with block/rescue to capture crash logs
74+
- name: "Wait for service port {{ server_port }} (with crash log capture)"
75+
block:
76+
- name: "Wait for service port {{ server_port }} to be available"
77+
ansible.builtin.wait_for:
78+
port: "{{ server_port }}"
79+
# If listeners bind to 0.0.0.0, check localhost. Otherwise check specific IP.
80+
host: "127.0.0.1"
81+
delay: "{{ delay_before_server_port_check | default(5) }}"
82+
timeout: 60 # Fail fast (1 min) to see logs quicker
83+
when:
84+
- skip_wait_for_server_port is defined and not skip_wait_for_server_port
85+
- server_port is defined and server_port != ''
86+
87+
rescue:
88+
- name: "FATAL: Service failed to start. Fetching systemd logs..."
89+
ansible.builtin.shell: "journalctl -u {{ server_name }} -xe --no-pager | tail -n 50"
90+
register: systemd_logs
91+
changed_when: false
92+
ignore_errors: true
93+
94+
- name: "FATAL: Fetching Kafka server logs..."
95+
# CHANGE: Use 'amq_streams_common_home' instead of 'amq_streams_kafka_home'
96+
ansible.builtin.shell: "cat {{ amq_streams_common_home }}/logs/server.log | tail -n 50"
97+
register: kafka_logs
98+
changed_when: false
99+
ignore_errors: true
100+
101+
- name: "PRINT SYSTEMD CRASH LOGS"
102+
ansible.builtin.debug:
103+
msg: "{{ systemd_logs.stdout_lines }}"
104+
105+
- name: "PRINT KAFKA SERVER LOGS"
106+
ansible.builtin.debug:
107+
msg: "{{ kafka_logs.stdout_lines }}"
108+
109+
- name: "Fail the playbook explicitly"
110+
ansible.builtin.fail:
111+
msg: "The Kafka service crashed immediately. Review the Java exceptions printed above."
112+
# --- CHANGED SECTION END ---

0 commit comments

Comments
 (0)