diff --git a/README.md b/README.md
index e3c02cd..18d8697 100644
--- a/README.md
+++ b/README.md
@@ -120,6 +120,18 @@ A number between 1 and 100. If set, this is the percentage of connectors that mu
 
 By default, **any** failures will cause the healthcheck to fail.
 
+#### Failure Less Tasks
+If `True`, the healthcheck will fail when a connector is running fewer tasks than its configured `tasks.max`.
+
+| Usage                 | Value                                       |
+|-----------------------|---------------------------------------------|
+| Environment Variable  | `HEALTHCHECK_FAILURE_LESS_TASKS`            |
+| Command-Line Argument | `--failure-less-tasks`                      |
+| Default Value         | `True`                                      |
+| Valid Values          | `True` or `False`                           |
+
+By default, a connector running fewer tasks than its configured `tasks.max` will cause the healthcheck to fail.
+
 #### Log Level
 
 The level of logs to be shown by the application.
diff --git a/kafka_connect_healthcheck/health.py b/kafka_connect_healthcheck/health.py
index 57e12f6..637f576 100644
--- a/kafka_connect_healthcheck/health.py
+++ b/kafka_connect_healthcheck/health.py
@@ -22,12 +22,13 @@
 
 class Health:
 
-    def __init__(self, connect_url, worker_id, unhealthy_states, auth, failure_threshold_percentage, considered_containers):
+    def __init__(self, connect_url, worker_id, unhealthy_states, auth, failure_threshold_percentage, considered_containers, failure_less_tasks):
         self.connect_url = connect_url
         self.worker_id = worker_id
         self.unhealthy_states = [x.upper().strip() for x in unhealthy_states]
         self.failure_threshold = failure_threshold_percentage * .01
         self.considered_containers = [x.lower().strip() for x in considered_containers]
+        self.failure_less_tasks = failure_less_tasks
         self.kwargs = {}
         if auth and ":" in auth:
             self.kwargs["auth"] = tuple(auth.split(":"))
@@ -108,6 +109,21 @@ def handle_broker_healthcheck(self, health_result, connector_name):
 
     def handle_task_healthcheck(self, connector, health_result):
         if "task" in self.considered_containers:
+            if self.failure_less_tasks:
+                logging.debug("Connector '{}' state: {} tasks running: '{}' desired: '{}'".format(
+                    connector["name"], connector["state"], len(connector["tasks"]), connector["tasks_max"]
+                ))
+                if self.has_less_tasks(len(connector["tasks"]), connector["tasks_max"]):
+                    logging.warning("Connector '{}' is healthy in state: {} but is running fewer tasks ({}) than configured ({}). Assuming unhealthy.".format(
+                        connector["name"], connector["state"], len(connector["tasks"]), connector["tasks_max"]
+                    ))
+                    health_result["failures"].append({
+                        "type": "task",
+                        "connector": connector["name"],
+                        "state": connector["state"],
+                        "tasks_count": len(connector["tasks"]),
+                        "tasks_desired": connector["tasks_max"]
+                    })
             for task in connector["tasks"]:
                 if self.is_on_this_worker(task["worker_id"]):
                     if self.is_in_unhealthy_state(task["state"]):
@@ -135,12 +151,15 @@ def get_connectors_health(self, connector_names):
 
     def get_connector_health(self, connector_name):
         connector_status = self.get_connector_status(connector_name)
+        connector_details = self.get_connector_details(connector_name)
+        connector_tasks_max = connector_details["config"]["tasks.max"]
         connector_state = connector_status["connector"]["state"].upper()
         connector_worker = connector_status["connector"]["worker_id"]
         return {
             "name": connector_name,
             "state": connector_state,
             "worker_id": connector_worker,
+            "tasks_max": connector_tasks_max,
             "tasks": connector_status["tasks"]
         }
 
@@ -166,6 +185,9 @@ def is_in_unhealthy_state(self, state):
     def is_on_this_worker(self, response_worker_id):
         return response_worker_id.lower() == self.worker_id.lower() if self.worker_id is not None else True
 
+    def has_less_tasks(self, tasks_count, tasks_max):
+        return int(tasks_count) < int(tasks_max)
+
     def log_initialization_values(self):
         logging.info("Server will report unhealthy for states: '{}'".format(", ".join(self.unhealthy_states)))
         logging.info("Server will healthcheck against Kafka Connect at: {}".format(self.connect_url))
diff --git a/kafka_connect_healthcheck/main.py b/kafka_connect_healthcheck/main.py
index a69280a..8d4fb4d 100644
--- a/kafka_connect_healthcheck/main.py
+++ b/kafka_connect_healthcheck/main.py
@@ -37,7 +37,7 @@ def main():
 
     server_class = HTTPServer
     health_object = health.Health(args.connect_url, args.connect_worker_id, args.unhealthy_states.split(","),
-                                  args.basic_auth, args.failure_threshold_percentage, args.considered_containers.split(","))
+                                  args.basic_auth, args.failure_threshold_percentage, args.considered_containers.split(","), args.failure_less_tasks)
     handler = partial(RequestHandler, health_object)
     httpd = server_class(("0.0.0.0", args.healthcheck_port), handler)
     logging.info("Healthcheck server started at: http://localhost:{}".format(args.healthcheck_port))
diff --git a/kafka_connect_healthcheck/parser.py b/kafka_connect_healthcheck/parser.py
index 4e28b9c..9cf3478 100644
--- a/kafka_connect_healthcheck/parser.py
+++ b/kafka_connect_healthcheck/parser.py
@@ -67,6 +67,13 @@ def get_parser():
                         help="A number between 1 and 100. If set, this is the percentage of connectors that must fail for the healthcheck to fail."
                         )
 
+    parser.add_argument("--failure-less-tasks",
+                        default=os.environ.get("HEALTHCHECK_FAILURE_LESS_TASKS", "True").lower() == "true",
+                        dest="failure_less_tasks",
+                        action="store_true",
+                        help="If true, the healthcheck will fail when a connector is running fewer tasks than its configured tasks.max. Default: True"
+                        )
+
     parser.add_argument("--basic-auth",
                         default=os.environ.get("HEALTHCHECK_BASIC_AUTH", ""),
                         dest="basic_auth",
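
Note that `get_connector_health` now calls `self.get_connector_details(connector_name)`, which this diff does not define. If that helper is not already present in `health.py`, a minimal sketch is shown below, modeled on the existing `get_connector_status` call pattern and Kafka Connect's `GET /connectors/{name}` endpoint (which returns the connector `config`, including `tasks.max`); it assumes `requests` is already imported in the module and that `self.kwargs` carries the optional basic-auth tuple, as in the constructor above.

```python
    def get_connector_details(self, connector_name):
        # Sketch of the helper assumed by get_connector_health: fetch the connector's
        # configuration (including "tasks.max") from the Kafka Connect REST API,
        # reusing the same request kwargs (e.g. basic auth) as the other calls.
        response = requests.get("{}/connectors/{}".format(self.connect_url, connector_name), **self.kwargs)
        response.raise_for_status()
        return response.json()
```

Kafka Connect returns `tasks.max` as a string in the connector config, which is why `has_less_tasks` casts both arguments with `int()` before comparing.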