diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ac21ce21..4a0880b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,7 +3,7 @@ name: CI on: push: branches: - - main + - AMW-162 pull_request: schedule: - cron: '0 6 * * *' @@ -16,4 +16,4 @@ jobs: fqcn: 'middleware_automation/amq_streams' root_permission_varname: 'amq_streams_install_requires_become' molecule_tests: >- - [ "default" ] + [ "default", "amq_streams_kraft" ] diff --git a/molecule/amq_streams_kraft/converge.yml b/molecule/amq_streams_kraft/converge.yml new file mode 100644 index 00000000..0204488f --- /dev/null +++ b/molecule/amq_streams_kraft/converge.yml @@ -0,0 +1,78 @@ +--- +- name: "Automate AMQ Streams KRaft install" + hosts: all + vars: + # Topic Management + amq_streams_broker_topics: + - name: sampleTopic + partitions: 2 + replication_factor: 1 + - name: otherTopic + partitions: 4 + replication_factor: 1 + vars_files: + - vars.yml + roles: + - role: amq_streams_common + - role: amq_streams_kraft + tasks: + - name: "Ensure AMQ Streams Broker is running and available." + ansible.builtin.include_role: + name: amq_streams_broker + vars: + amq_streams_common_skip_download: true + + - name: "Ensure AMQ Streams Connect is running and available." + ansible.builtin.include_role: + name: amq_streams_connect + vars: + connectors: + - { name: "file", path: "connectors/file.yml" } + + - name: "Validate that KRaft deployment is functional." + ansible.builtin.include_role: + name: amq_streams_kraft + tasks_from: validate.yml + + - name: "Validate that Broker deployment is functional." + ansible.builtin.include_role: + name: amq_streams_broker + tasks_from: validate.yml + + - name: "Validate that Connect deployment is functional." + ansible.builtin.include_role: + name: amq_streams_connect + tasks_from: validate.yml + + post_tasks: + - name: "Ensures topics exist." + ansible.builtin.include_role: + name: amq_streams_broker + tasks_from: topic/create.yml + loop: "{{ amq_streams_broker_topics }}" + loop_control: + loop_var: topic + vars: + topic_name: "{{ topic.name }}" + topic_partitions: "{{ topic.partitions }}" + topic_replication_factor: "{{ topic.replication_factor }}" + + - name: "Describe created topics." + ansible.builtin.include_role: + name: amq_streams_broker + tasks_from: topic/describe.yml + loop: "{{ amq_streams_broker_topics }}" + loop_control: + loop_var: topic + vars: + topic_name: "{{ topic.name }}" + + - name: "Delete topics" + ansible.builtin.include_role: + name: amq_streams_broker + tasks_from: topic/delete.yml + loop: "{{ amq_streams_broker_topics }}" + loop_control: + loop_var: topic + vars: + topic_name: "{{ topic.name }}" diff --git a/molecule/amq_streams_kraft/molecule.yml b/molecule/amq_streams_kraft/molecule.yml new file mode 100644 index 00000000..3acb8655 --- /dev/null +++ b/molecule/amq_streams_kraft/molecule.yml @@ -0,0 +1,43 @@ +--- +driver: + name: docker +platforms: + - name: instance + image: registry.access.redhat.com/ubi9/ubi-init:latest + command: "/usr/sbin/init" + pre_build_image: true + privileged: true + groups: + - brokers +provisioner: + name: ansible + config_options: + defaults: + interpreter_python: auto_silent + ssh_connection: + pipelining: false + playbooks: + prepare: ../prepare.yml + converge: converge.yml + verify: verify.yml + inventory: + host_vars: + localhost: + ansible_python_interpreter: "{{ ansible_playbook_python }}" + env: + ANSIBLE_FORCE_COLOR: "true" +verifier: + name: ansible +scenario: + test_sequence: + - cleanup + - destroy + - syntax + - create + - prepare + - converge + - idempotence + - side_effect + - verify + - cleanup + - destroy diff --git a/molecule/amq_streams_kraft/roles b/molecule/amq_streams_kraft/roles new file mode 120000 index 00000000..b741aa3d --- /dev/null +++ b/molecule/amq_streams_kraft/roles @@ -0,0 +1 @@ +../../roles \ No newline at end of file diff --git a/molecule/amq_streams_kraft/vars.yml b/molecule/amq_streams_kraft/vars.yml new file mode 100644 index 00000000..9eded8bb --- /dev/null +++ b/molecule/amq_streams_kraft/vars.yml @@ -0,0 +1,21 @@ +--- +amq_streams_common_escalade_privilege_group_create: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_common_escalade_privilege_user_create: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_common_archive_extraction_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_common_dependencies_require_priv: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_zookeeper_data_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_zookeeper_restart_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_broker_tls_truststore_client_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_broker_config_files_requires_privilege_escalation: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_cruise_control_path_to_capacity_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_cruise_control_logfiles_requires_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_connect_source_file_require_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_kraft_priv_escalation: "{{ amq_streams_install_requires_become | default(true) }}" +amq_streams_common_product_version: 4.1.1 +# Run the Systemd Service as root +amq_streams_broker_user: root +amq_streams_broker_group: root + +# Run KRaft tasks as root +amq_streams_kraft_user: root +amq_streams_kraft_group: root diff --git a/molecule/amq_streams_kraft/verify.yml b/molecule/amq_streams_kraft/verify.yml new file mode 100644 index 00000000..1ee96fb6 --- /dev/null +++ b/molecule/amq_streams_kraft/verify.yml @@ -0,0 +1,13 @@ +--- +- name: Verify + hosts: all + tasks: + + - name: Populate service facts + ansible.builtin.service_facts: + + - name: Check broker service + assert: + that: + - ansible_facts.services["amq_streams_broker.service"]["state"] == "running" + - ansible_facts.services["amq_streams_broker.service"]["status"] == "enabled" diff --git a/roles/amq_streams_broker/templates/server.properties.j2 b/roles/amq_streams_broker/templates/server.properties.j2 index 0fd69b4c..a62da74e 100644 --- a/roles/amq_streams_broker/templates/server.properties.j2 +++ b/roles/amq_streams_broker/templates/server.properties.j2 @@ -20,10 +20,35 @@ # See kafka.server.KafkaConfig for additional details and defaults # +# ----------------------------------------------------------------------------- +# MODE DETERMINATION (Added for Kafka 4.0+ KRaft Support) +# ----------------------------------------------------------------------------- +{% set enable_kraft = amq_streams_enable_kraft | default(amq_streams_common_product_version is version('4.0.0', '>=')) | bool %} + ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. +{% if enable_kraft %} +# KRaft uses node.id instead of broker.id +node.id={{ amq_streams_kraft_node_id | default(1) }} +{% else %} broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory_group.index(inventory_hostname)) }} +{% endif %} + +############################# KRaft Settings (Kafka 4.0+) ############################# +{% if enable_kraft %} +# The roles of this process. broker, controller, or both. +process.roles={{ amq_streams_kraft_process_roles | default('broker,controller') }} + +# The connect string for the controller quorum +controller.quorum.voters={{ amq_streams_kraft_controller_quorum_voters }} + +# Listener name used for the controller +controller.listener.names={{ amq_streams_kraft_controller_listener_names | default('CONTROLLER') }} + +# Listener name used for inter-broker communication +inter.broker.listener.name={{ amq_streams_kraft_inter_broker_listener_name | default('PLAINTEXT') }} +{% endif %} ############################# Socket Server Settings ############################# @@ -33,6 +58,11 @@ broker.id={{ amq_streams_broker_broker_id | default(amq_streams_broker_inventory # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 +{% if enable_kraft %} +# KRaft Mode Listeners (Requires Broker + Controller ports) +listeners={{ amq_streams_kraft_listeners | join(",") }} +{% else %} +# Legacy ZK Mode Listeners {% if amq_streams_broker_listeners is defined %} listeners={{ amq_streams_broker_listeners | join(",") }} {% elif amq_streams_broker_listener_port is defined %} @@ -40,20 +70,31 @@ listeners=PLAINTEXT://:{{ amq_streams_broker_listener_port }} {% else %} #listeners=PLAINTEXT://:9092 {% endif %} +{% endif %} -{% if amq_streams_broker_inter_broker_listener is defined %} -# Name of listener used for communication between brokers +{% if amq_streams_broker_inter_broker_listener is defined and not enable_kraft %} +# Name of listener used for communication between brokers (Legacy ZK only) inter.broker.listener.name={{ amq_streams_broker_inter_broker_listener }} {% endif %} # Listener name, hostname and port the broker will advertise to clients. # If not set, it uses the value for "listeners". +{% if enable_kraft %} +# KRaft Mode Advertised Listeners (Broker port only) +advertised.listeners={{ amq_streams_kraft_advertised_listeners | join(",") }} +{% else %} +# Legacy ZK Mode Advertised Listeners {% if amq_streams_broker_advertised_listeners is defined %} advertised.listeners={{ amq_streams_broker_advertised_listeners | join(",") }} {% else %} #advertised.listeners=PLAINTEXT://your.host.name:9092 {% endif %} +{% endif %} +{% if enable_kraft %} +# KRaft Mode Security Map (Must include Controller) +listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL +{% else %} {% if amq_streams_broker_auth_enabled and amq_streams_broker_auth_listeners is defined %} # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details listener.security.protocol.map={{ amq_streams_broker_auth_listeners | join(",") }} @@ -68,6 +109,7 @@ sasl.mechanism.inter.broker.protocol={{ amq_streams_broker_inter_broker_auth_sas # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL {% endif %} +{% endif %} {% if amq_streams_broker_inter_broker_listener_auth is defined %} # Authentication mechanism for the inter-broker listener @@ -105,7 +147,11 @@ socket.request.max.bytes={{ amq_streams_broker_socket_request_max_bytes }} ############################# Log Basics ############################# # A comma separated list of directories under which to store log files +{% if enable_kraft %} +log.dirs={{ amq_streams_kraft_log_dirs }} +{% else %} log.dirs={{ amq_streams_broker_data_dir }} +{% endif %} # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across @@ -162,7 +208,7 @@ log.retention.hours={{ amq_streams_broker_log_retention_hours }} log.retention.check.interval.ms={{ amq_streams_broker_log_retention_check_interval_ms }} ############################# Zookeeper ############################# - +{% if not enable_kraft %} # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". @@ -173,6 +219,7 @@ zookeeper.connect={{ amq_streams_broker_zookeeper_host }}:{{ amq_streams_broker_ # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms={{ amq_streams_broker_zookeeper_connection_timeout_ms }} zookeeper.session.timeout.ms={{ amq_streams_broker_zookeeper_session_timeout_ms }} +{% endif %} ############################# Group Coordinator Settings ############################# diff --git a/roles/amq_streams_kraft/README.md b/roles/amq_streams_kraft/README.md new file mode 100644 index 00000000..0111aade --- /dev/null +++ b/roles/amq_streams_kraft/README.md @@ -0,0 +1,59 @@ +# AMQ Streams KRaft Role + +This role orchestrates the initialization and configuration of Apache Kafka in KRaft (Kafka Raft Metadata) mode, removing the traditional dependency on ZooKeeper. It handles the mandatory bootstrapping process by generating a unique Cluster UUID (if not provided) and executing kafka-storage.sh to format the storage directories with the necessary metadata. Additionally, it prepares the critical configuration parameters required for a ZooKeeper-less environment, ensuring the correct setup of process roles, controller quorums, and the strict separation of broker and controller listeners. + +## Requirements + +* **Role Dependencies**: `amq_streams_common` (must be run first to install the Kafka binaries). +* **Kafka Version**: Designed for Kafka 4.0.0+ or Kafka 3.x with KRaft enabled. + +## Role Variables + +| Variable | Default Value | Description | +| :--- | :--- | :--- | +| `amq_streams_install_dir` | `/opt` | The base directory where AMQ Streams/Kafka is installed. | +| `amq_streams_kafka_home` | `{{ amq_streams_install_dir }}/kafka_{{ amq_streams_common_version }}/` | The absolute path to the Kafka installation home directory. | +| `amq_streams_kraft_config_dir` | `{{ amq_streams_kafka_home }}/config` | The directory containing `server.properties` and other config files. | +| `amq_streams_kraft_data_dir` | `{{ amq_streams_kafka_home }}/data/kraft` | The directory where Kafka stores its KRaft metadata and log data. | +| `amq_streams_cluster_id` | `""` | The UUID of the Kafka cluster. If left empty, the role will auto-generate a random UUID during the first run. | +| `amq_streams_kraft_node_id` | `1` | The unique integer ID for this specific broker/controller node. **Must be unique per host.** | +| `amq_streams_kraft_process_roles` | `"broker,controller"` | Defines the role of this node. Options: `broker`, `controller`, or `broker,controller` (combined). | +| `amq_streams_kraft_controller_quorum_voters` | `1@{{ ansible_host }}:{{ amq_streams_kraft_controller_port }}` | The voter connection string in the format `nodeId@host:port`. Must list all controller nodes in the cluster. | +| `amq_streams_kraft_listener_port` | `9092` | The port used for standard Client (broker) traffic. | +| `amq_streams_kraft_controller_port` | `9093` | The port used for Controller-to-Controller Raft communication. | +| `amq_streams_kraft_listeners` | *(See defaults)* | A list of all listeners to bind. Must include **both** the Controller and Broker listeners (e.g., `PLAINTEXT://...` and `CONTROLLER://...`). | +| `amq_streams_kraft_advertised_listeners` | *(See defaults)* | A list of listeners advertised to clients. Must **ONLY** include Broker listeners (e.g., `PLAINTEXT://...`). | +| `amq_streams_kraft_controller_listener_names` | `"CONTROLLER"` | The listener name used by the controller quorum (must match an entry in `listeners`). | +| `amq_streams_kraft_inter_broker_listener_name` | `"PLAINTEXT"` | The listener name used for replication between brokers. | +| `amq_streams_kraft_log_dirs` | `{{ amq_streams_kraft_data_dir }}` | The comma-separated list of directories for log data. Usually matches the data dir. | +| `amq_streams_kraft_log_retention_hours` | `168` | The number of hours to keep log segments before deletion (Default: 7 days). | +| `amq_streams_kraft_priv_escalation` | `yes` | Controls whether tasks (like creating directories and formatting storage) run with elevated privileges (`become: true`). | + +## Example Playbook + +```yaml +--- +- hosts: kafka_brokers + vars: + amq_streams_common_version: "3.7.0" + amq_streams_enable_kraft: true + + roles: + # Install Java and Kafka Binaries + - role: amq_streams_common + + # Configure and Format Storage for KRaft + - role: amq_streams_kraft + when: amq_streams_enable_kraft | bool + + # Start the Kafka Service + - role: amq_streams_broker +``` + +## License + +Apache License v2.0 or later + +## Author Information + +* [Ranabir Chakraborty](https://github.com/RanabirChakraborty) diff --git a/roles/amq_streams_kraft/defaults/main.yml b/roles/amq_streams_kraft/defaults/main.yml new file mode 100644 index 00000000..2c3bec7b --- /dev/null +++ b/roles/amq_streams_kraft/defaults/main.yml @@ -0,0 +1,21 @@ +--- +amq_streams_install_dir: "/opt" +amq_streams_kafka_home: "{{ amq_streams_install_dir }}/kafka_{{ amq_streams_common_version }}/" +amq_streams_kraft_config_dir: "{{ amq_streams_kafka_home }}/config" +amq_streams_kraft_data_dir: "{{ amq_streams_kafka_home }}/data/kraft" +amq_streams_cluster_id: "" +amq_streams_kraft_node_id: 1 +amq_streams_kraft_listener_port: 9092 +amq_streams_kraft_controller_port: 9093 +amq_streams_kraft_controller_quorum_voters: "1@{{ ansible_host }}:{{ amq_streams_kraft_controller_port }}" +amq_streams_kraft_listeners: + - "PLAINTEXT://0.0.0.0:{{ amq_streams_kraft_listener_port }}" + - "CONTROLLER://0.0.0.0:{{ amq_streams_kraft_controller_port }}" +amq_streams_kraft_advertised_listeners: + - "PLAINTEXT://{{ ansible_host }}:{{ amq_streams_kraft_listener_port }}" +amq_streams_kraft_controller_listener_names: "CONTROLLER" +amq_streams_kraft_inter_broker_listener_name: "PLAINTEXT" +amq_streams_kraft_process_roles: "broker,controller" +amq_streams_kraft_log_dirs: "{{ amq_streams_kraft_data_dir }}" +amq_streams_kraft_log_retention_hours: 168 +amq_streams_kraft_priv_escalation: yes diff --git a/roles/amq_streams_kraft/handlers/main.yml b/roles/amq_streams_kraft/handlers/main.yml new file mode 100644 index 00000000..655d3e35 --- /dev/null +++ b/roles/amq_streams_kraft/handlers/main.yml @@ -0,0 +1,9 @@ +--- +- name: Reload systemd + ansible.builtin.systemd: + daemon_reload: true + +- name: Restart Kafka + ansible.builtin.systemd: + name: "{{ server_name | default('amq_streams_broker') }}" + state: restarted diff --git a/roles/amq_streams_kraft/tasks/configure.yml b/roles/amq_streams_kraft/tasks/configure.yml new file mode 100644 index 00000000..c3ed0063 --- /dev/null +++ b/roles/amq_streams_kraft/tasks/configure.yml @@ -0,0 +1,25 @@ +--- +- name: "Ensure user and group for KRaft exist." + ansible.builtin.include_role: + name: amq_streams_common + tasks_from: prereqs.yml + vars: + prereqs_user: "{{ amq_streams_kraft_user | default('kafka') }}" + prereqs_group: "{{ amq_streams_kraft_group | default('kafka') }}" + +- name: Create KRaft config directory + ansible.builtin.file: + path: "{{ amq_streams_kraft_config_dir }}" + state: directory + owner: "{{ amq_streams_kraft_user | default('kafka') }}" + group: "{{ amq_streams_kraft_group | default('kafka') }}" + mode: '0755' + +- name: Deploy server.properties + ansible.builtin.template: + src: server.properties.j2 + dest: "{{ amq_streams_kraft_config_dir }}/server.properties" + owner: "{{ amq_streams_kraft_user | default('kafka') }}" + group: "{{ amq_streams_kraft_group | default('kafka') }}" + mode: '0644' + notify: Restart Kafka diff --git a/roles/amq_streams_kraft/tasks/main.yml b/roles/amq_streams_kraft/tasks/main.yml new file mode 100644 index 00000000..babc909d --- /dev/null +++ b/roles/amq_streams_kraft/tasks/main.yml @@ -0,0 +1,6 @@ +--- +- name: Configure KRaft node + ansible.builtin.include_tasks: configure.yml + +- name: Format storage if needed + ansible.builtin.include_tasks: storage.yml diff --git a/roles/amq_streams_kraft/tasks/storage.yml b/roles/amq_streams_kraft/tasks/storage.yml new file mode 100644 index 00000000..f3db43e8 --- /dev/null +++ b/roles/amq_streams_kraft/tasks/storage.yml @@ -0,0 +1,40 @@ +--- +- name: Check if KRaft metadata log already formatted + ansible.builtin.stat: + path: "{{ amq_streams_kraft_data_dir }}/meta.properties" + register: meta_file + +- name: Generate cluster ID if not provided and not already formatted + ansible.builtin.command: "{{ amq_streams_kafka_home }}/bin/kafka-storage.sh random-uuid" + register: generated_cluster_id + when: + - amq_streams_cluster_id | length == 0 + - not meta_file.stat.exists + changed_when: false + +- name: Set cluster ID fact + ansible.builtin.set_fact: + amq_streams_cluster_id: "{{ generated_cluster_id.stdout }}" + when: + - amq_streams_cluster_id | length == 0 + - generated_cluster_id.skipped is not defined + - generated_cluster_id.stdout is defined + +- name: Format storage directory if not formatted + ansible.builtin.command: > + {{ amq_streams_kafka_home }}/bin/kafka-storage.sh format + --config {{ amq_streams_kraft_config_dir }}/server.properties + --cluster-id {{ amq_streams_cluster_id }} + become: "{{ amq_streams_kraft_priv_escalation }}" + when: not meta_file.stat.exists + +- name: Enforce ownership on Kafka data directory + ansible.builtin.file: + path: "{{ amq_streams_kraft_data_dir }}" + state: directory + owner: "{{ amq_streams_kraft_user | default('kafka') }}" + group: "{{ amq_streams_kraft_group | default('kafka') }}" + mode: '0750' + recurse: yes + become: "{{ amq_streams_kraft_priv_escalation }}" + when: not meta_file.stat.exists diff --git a/roles/amq_streams_kraft/tasks/validate.yml b/roles/amq_streams_kraft/tasks/validate.yml new file mode 100644 index 00000000..ef38f22d --- /dev/null +++ b/roles/amq_streams_kraft/tasks/validate.yml @@ -0,0 +1,10 @@ +--- +- name: Check kafka service is running + ansible.builtin.systemd: + name: "{{ server_name | default('amq_streams_broker') }}" + register: kafka_service + +- name: Fail if Kafka is not active + ansible.builtin.fail: + msg: "Kafka KRaft service ({{ server_name | default('amq_streams_broker') }}) is not running. State: {{ kafka_service.status.ActiveState }}" + when: kafka_service.status.ActiveState != "active" diff --git a/roles/amq_streams_kraft/templates/kafka.service.j2 b/roles/amq_streams_kraft/templates/kafka.service.j2 new file mode 100644 index 00000000..e3663ef1 --- /dev/null +++ b/roles/amq_streams_kraft/templates/kafka.service.j2 @@ -0,0 +1,15 @@ +[Unit] +Description=Kafka KRaft Node +After=network.target + +[Service] +Type=simple +User="{{ amq_streams_kraft_user | default('kafka') }}" + +ExecStart={{ amq_streams_install_dir }}/current/bin/kafka-server-start.sh \ + {{ amq_streams_kraft_config_dir }}/server.properties + +Restart=on-failure + +[Install] +WantedBy=multi-user.target diff --git a/roles/amq_streams_kraft/templates/server.properties.j2 b/roles/amq_streams_kraft/templates/server.properties.j2 new file mode 100644 index 00000000..099d5fe4 --- /dev/null +++ b/roles/amq_streams_kraft/templates/server.properties.j2 @@ -0,0 +1,29 @@ +# --- KRaft Process Roles --- +# Defines if this node is a Broker, a Controller, or both. +process.roles={{ amq_streams_kraft_process_roles }} +node.id={{ amq_streams_kraft_node_id }} + +# --- Controller Quorum --- +# The list of all voters (controllers) in the cluster. Format: id@host:port +controller.quorum.voters={{ amq_streams_kraft_controller_quorum_voters }} + +# MUST contain BOTH Broker (PLAINTEXT) and Controller ports. +# Example: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 +listeners={{ amq_streams_kraft_listeners | join(',') }} + +# MUST contain ONLY the Broker port. +# Example: PLAINTEXT://192.168.1.5:9092 +advertised.listeners={{ amq_streams_kraft_advertised_listeners | join(',') }} + +# Maps the listener names to security protocols. +listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT + +# Tells the Controller which listener name to use for Raft communication. +controller.listener.names={{ amq_streams_kraft_controller_listener_names }} + +# Tells the Broker which listener name to use for replication. +inter.broker.listener.name={{ amq_streams_kraft_inter_broker_listener_name }} + +# Logs +log.dirs={{ amq_streams_kraft_log_dirs }} +log.retention.hours={{ amq_streams_kraft_log_retention_hours }}