diff --git a/miniupdate/config.py b/miniupdate/config.py index 79b28ba..a9fcf34 100644 --- a/miniupdate/config.py +++ b/miniupdate/config.py @@ -5,10 +5,11 @@ email credentials and inventory paths. """ -import toml +import os from pathlib import Path from typing import Dict, Any, Optional, List -import os + +import toml class Config: @@ -55,7 +56,7 @@ def _load_config(self) -> Dict[str, Any]: with open(self.config_path, "r", encoding="utf-8") as f: return toml.load(f) except Exception as e: - raise ValueError(f"Error parsing configuration file: {e}") + raise ValueError(f"Error parsing configuration file: {e}") from e @property def smtp_config(self) -> Dict[str, Any]: @@ -178,7 +179,8 @@ def create_example_config(path: str = "config.toml.example") -> None: "snapshot_name_prefix": "pre-update", "cleanup_snapshots": True, "snapshot_retention_days": 7, - "opt_out_hosts": [], # List of hosts to exclude from automatic updates (check-only mode) + # List of hosts to exclude from automatic updates (check-only mode) + "opt_out_hosts": [], }, } diff --git a/miniupdate/host_checker.py b/miniupdate/host_checker.py index ff782bb..2e585e0 100644 --- a/miniupdate/host_checker.py +++ b/miniupdate/host_checker.py @@ -4,12 +4,12 @@ Provides utilities to check if hosts are reachable via ping and SSH. """ -import subprocess import logging +import subprocess import time -from typing import Optional -from .ssh_manager import SSHManager + from .inventory import Host +from .ssh_manager import SSHManager logger = logging.getLogger(__name__) @@ -40,7 +40,9 @@ def ping_host(self, hostname: str, timeout: int = 5) -> bool: try: # Use ping command with timeout cmd = ["ping", "-c", "1", "-W", str(timeout), hostname] - result = subprocess.run(cmd, capture_output=True, timeout=timeout + 2) + result = subprocess.run( + cmd, capture_output=True, timeout=timeout + 2, check=False + ) return result.returncode == 0 except (subprocess.TimeoutExpired, subprocess.SubprocessError): return False @@ -65,7 +67,9 @@ def wait_for_host_availability( True if host becomes available, False if timeout """ logger.info( - f"Waiting for {host.name} to become available (timeout: {max_wait_time}s)" + "Waiting for %s to become available (timeout: %ss)", + host.name, + max_wait_time, ) start_time = time.time() @@ -76,34 +80,36 @@ def wait_for_host_availability( elapsed = int(time.time() - start_time) logger.debug( - f"Checking {host.name} availability - attempt {attempt} ({elapsed}s elapsed)" + "Checking %s availability - attempt %s (%ss elapsed)", + host.name, + attempt, + elapsed, ) # First check ping if not self.ping_host(host.hostname): - logger.debug(f"{host.name} not responding to ping") + logger.debug("%s not responding to ping", host.name) time.sleep(check_interval) continue - logger.debug(f"{host.name} responding to ping") + logger.debug("%s responding to ping", host.name) # If SSH check is requested, verify SSH connectivity if use_ssh: if self._check_ssh_connectivity(host): logger.info( - f"{host.name} is available (ping + SSH) after {elapsed}s" + "%s is available (ping + SSH) after %ss", host.name, elapsed ) return True - else: - logger.debug(f"{host.name} ping OK but SSH not ready") + logger.debug("%s ping OK but SSH not ready", host.name) else: - logger.info(f"{host.name} is available (ping only) after {elapsed}s") + logger.info("%s is available (ping only) after %ss", host.name, elapsed) return True time.sleep(check_interval) elapsed = int(time.time() - start_time) - logger.warning(f"{host.name} did not become available within {elapsed}s") + logger.warning("%s did not become available within %ss", host.name, elapsed) return False def _check_ssh_connectivity(self, host: Host) -> bool: @@ -125,7 +131,7 @@ def _check_ssh_connectivity(self, host: Host) -> bool: return exit_code == 0 return False except Exception as e: - logger.debug(f"SSH connectivity check failed for {host.name}: {e}") + logger.debug("SSH connectivity check failed for %s: %s", host.name, e) return False def reboot_host_via_ssh(self, host: Host, timeout: int = 30) -> bool: @@ -143,21 +149,21 @@ def reboot_host_via_ssh(self, host: Host, timeout: int = 30) -> bool: with SSHManager(self.ssh_config) as ssh_manager: connection = ssh_manager.connect_to_host(host, timeout=timeout) if not connection: - logger.error(f"Failed to connect to {host.name} for reboot") + logger.error("Failed to connect to %s for reboot", host.name) return False - logger.info(f"Sending reboot command to {host.name}") + logger.info("Sending reboot command to %s", host.name) # Send reboot command (don't wait for response as connection will drop) - exit_code, stdout, stderr = connection.execute_command( + _exit_code, _stdout, _stderr = connection.execute_command( "shutdown -r now || reboot", timeout=5, # Short timeout as system will reboot ) # Command may not return exit code due to immediate reboot - logger.info(f"Reboot command sent to {host.name}") + logger.info("Reboot command sent to %s", host.name) return True except Exception as e: - logger.error(f"Failed to reboot {host.name} via SSH: {e}") + logger.error("Failed to reboot %s via SSH: %s", host.name, e) return False diff --git a/miniupdate/inventory.py b/miniupdate/inventory.py index 2d7256e..d1e42d7 100644 --- a/miniupdate/inventory.py +++ b/miniupdate/inventory.py @@ -4,11 +4,11 @@ Supports parsing YAML and INI format Ansible inventory files. """ -import yaml +import logging from pathlib import Path from typing import List, Dict, Any, Optional -import configparser -import logging + +import yaml logger = logging.getLogger(__name__) @@ -47,18 +47,17 @@ def parse(self) -> List[Host]: """Parse inventory file and return list of hosts.""" if self.inventory_path.suffix.lower() in [".yml", ".yaml"]: return self._parse_yaml() - elif self.inventory_path.suffix.lower() in [ + if self.inventory_path.suffix.lower() in [ ".ini", ".cfg", "", ] or self.inventory_path.name in ["hosts", "inventory"]: return self._parse_ini() - else: - # Try YAML first, then INI - try: - return self._parse_yaml() - except Exception: - return self._parse_ini() + # Try YAML first, then INI + try: + return self._parse_yaml() + except Exception: + return self._parse_ini() def _parse_yaml(self) -> List[Host]: """Parse YAML format inventory.""" @@ -66,7 +65,7 @@ def _parse_yaml(self) -> List[Host]: with open(self.inventory_path, "r", encoding="utf-8") as f: inventory = yaml.safe_load(f) except Exception as e: - raise ValueError(f"Error parsing YAML inventory: {e}") + raise ValueError(f"Error parsing YAML inventory: {e}") from e hosts = [] @@ -80,7 +79,7 @@ def _parse_yaml(self) -> List[Host]: if "hosts" in all_section: hosts.extend(self._parse_yaml_hosts(all_section["hosts"])) if "children" in all_section: - for group_name, group_data in all_section["children"].items(): + for _group_name, group_data in all_section["children"].items(): if "hosts" in group_data: hosts.extend(self._parse_yaml_hosts(group_data["hosts"])) else: @@ -122,7 +121,7 @@ def _parse_ini(self) -> List[Host]: with open(self.inventory_path, "r", encoding="utf-8") as f: content = f.read() except Exception as e: - raise ValueError(f"Error reading INI inventory: {e}") + raise ValueError(f"Error reading INI inventory: {e}") from e # Split into lines and process lines = content.split("\n") diff --git a/miniupdate/os_detector.py b/miniupdate/os_detector.py index d1b5663..f5df2b4 100644 --- a/miniupdate/os_detector.py +++ b/miniupdate/os_detector.py @@ -4,9 +4,9 @@ Detects the operating system and distribution of remote hosts. """ -import re import logging -from typing import Optional, Dict, Any, Tuple +from typing import Optional, Dict, Tuple + from .ssh_manager import SSHConnection @@ -112,16 +112,16 @@ def detect_os(self) -> Optional[OSInfo]: architecture=architecture, ) - logger.info(f"Detected OS on {self.connection.host.name}: {os_info}") + logger.info("Detected OS on %s: %s", self.connection.host.name, os_info) return os_info except Exception as e: - logger.error(f"Failed to detect OS on {self.connection.host.name}: {e}") + logger.error("Failed to detect OS on %s: %s", self.connection.host.name, e) return None def _get_uname_info(self) -> Dict[str, str]: """Get uname information.""" - exit_code, stdout, stderr = self.connection.execute_command("uname -a") + exit_code, stdout, _stderr = self.connection.execute_command("uname -a") if exit_code != 0: return {} @@ -139,7 +139,7 @@ def _get_uname_info(self) -> Dict[str, str]: def _get_os_release_info(self) -> Dict[str, str]: """Get information from /etc/os-release.""" - exit_code, stdout, stderr = self.connection.execute_command( + exit_code, stdout, _stderr = self.connection.execute_command( "cat /etc/os-release 2>/dev/null || true" ) if exit_code != 0 or not stdout.strip(): @@ -157,7 +157,7 @@ def _get_os_release_info(self) -> Dict[str, str]: def _get_lsb_info(self) -> Dict[str, str]: """Get LSB information.""" - exit_code, stdout, stderr = self.connection.execute_command( + exit_code, stdout, _stderr = self.connection.execute_command( "lsb_release -a 2>/dev/null || true" ) if exit_code != 0 or not stdout.strip(): @@ -237,33 +237,33 @@ def _normalize_distribution_name(self, distribution: str) -> str: # Handle common variations if "red hat" in distribution or "redhat" in distribution: return "rhel" - elif "centos" in distribution: + if "centos" in distribution: return "centos" - elif "ubuntu" in distribution: + if "ubuntu" in distribution: return "ubuntu" - elif ( + if ( "linuxmint" in distribution or "linux mint" in distribution or distribution == "mint" ): return "linuxmint" - elif "debian" in distribution: + if "debian" in distribution: return "debian" - elif "fedora" in distribution: + if "fedora" in distribution: return "fedora" - elif "opensuse" in distribution or "suse" in distribution: + if "opensuse" in distribution or "suse" in distribution: return "opensuse" - elif "arch" in distribution: + if "arch" in distribution: return "arch" - elif "manjaro" in distribution: + if "manjaro" in distribution: return "manjaro" - elif "alpine" in distribution: + if "alpine" in distribution: return "alpine" - elif "freebsd" in distribution: + if "freebsd" in distribution: return "freebsd" - elif "openbsd" in distribution: + if "openbsd" in distribution: return "openbsd" - elif "darwin" in distribution or "macos" in distribution: + if "darwin" in distribution or "macos" in distribution: return "macos" return distribution @@ -278,7 +278,7 @@ def _detect_package_manager(self, distribution: str) -> str: return default_pm # Fallback: check for available package managers - for pm_name, commands in self.PACKAGE_MANAGERS.items(): + for pm_name, _commands in self.PACKAGE_MANAGERS.items(): if self._check_package_manager_exists(pm_name): return pm_name @@ -303,13 +303,12 @@ def _get_architecture(self, uname_info: Dict[str, str]) -> str: # Normalize common architectures if arch in ["x86_64", "amd64"]: return "x86_64" - elif arch in ["i386", "i686"]: + if arch in ["i386", "i686"]: return "i386" - elif arch.startswith("arm"): + if arch.startswith("arm"): return "arm" - elif arch.startswith("aarch64"): + if arch.startswith("aarch64"): return "arm64" - else: - return arch + return arch return "unknown" diff --git a/miniupdate/proxmox_client.py b/miniupdate/proxmox_client.py index 765aae6..528f062 100644 --- a/miniupdate/proxmox_client.py +++ b/miniupdate/proxmox_client.py @@ -4,11 +4,11 @@ Handles VM snapshots and management via Proxmox VE API. """ -import requests import logging import time from typing import Dict, Any, Optional, List -from urllib.parse import urljoin + +import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry @@ -18,8 +18,6 @@ class ProxmoxAPIError(Exception): """Exception for Proxmox API errors.""" - pass - class ProxmoxClient: """Proxmox VE API client for VM management.""" @@ -85,16 +83,17 @@ def authenticate(self) -> bool: self.session.headers.update({"CSRFPreventionToken": self.csrf_token}) self.session.cookies.set("PVEAuthCookie", self.ticket) - logger.info(f"Successfully authenticated to Proxmox at {self.endpoint}") - return True - else: - logger.error( - f"Authentication failed: {response.status_code} - {response.text}" + logger.info( + "Successfully authenticated to Proxmox at %s", self.endpoint ) - return False + return True + logger.error( + "Authentication failed: %s - %s", response.status_code, response.text + ) + return False except Exception as e: - logger.error(f"Authentication error: {e}") + logger.error("Authentication error: %s", e) return False def _api_request( @@ -139,7 +138,7 @@ def _api_request( return response.json() except requests.RequestException as e: - raise ProxmoxAPIError(f"Request failed: {e}") + raise ProxmoxAPIError(f"Request failed: {e}") from e def get_vm_status(self, node: str, vmid: int) -> Dict[str, Any]: """Get VM status.""" @@ -161,7 +160,8 @@ def create_snapshot( vmid: VM ID snapname: Snapshot name description: Snapshot description - include_ram: Whether to include RAM state in snapshot (default: False for faster, more reliable snapshots) + include_ram: Whether to include RAM state in snapshot + (default: False for faster, more reliable snapshots) """ path = f"/nodes/{node}/qemu/{vmid}/snapshot" data = { @@ -175,7 +175,11 @@ def create_snapshot( snapshot_type = "with RAM" if include_ram else "without RAM" logger.info( - f"Creating snapshot '{snapname}' ({snapshot_type}) for VM {vmid} on node {node}" + "Creating snapshot '%s' (%s) for VM %s on node %s", + snapname, + snapshot_type, + vmid, + node, ) return self._api_request("POST", path, data) @@ -183,7 +187,7 @@ def delete_snapshot(self, node: str, vmid: int, snapname: str) -> Dict[str, Any] """Delete VM snapshot.""" path = f"/nodes/{node}/qemu/{vmid}/snapshot/{snapname}" - logger.info(f"Deleting snapshot '{snapname}' for VM {vmid} on node {node}") + logger.info("Deleting snapshot '%s' for VM %s on node %s", snapname, vmid, node) return self._api_request("DELETE", path) def rollback_snapshot(self, node: str, vmid: int, snapname: str) -> Dict[str, Any]: @@ -191,7 +195,7 @@ def rollback_snapshot(self, node: str, vmid: int, snapname: str) -> Dict[str, An path = f"/nodes/{node}/qemu/{vmid}/snapshot/{snapname}/rollback" logger.warning( - f"Rolling back VM {vmid} on node {node} to snapshot '{snapname}'" + "Rolling back VM %s on node %s to snapshot '%s'", vmid, node, snapname ) return self._api_request("POST", path) @@ -215,20 +219,19 @@ def wait_for_task(self, node: str, upid: str, timeout: int = 300) -> bool: if status == "stopped": exitstatus = task_data.get("exitstatus") if exitstatus == "OK": - logger.info(f"Task {upid} completed successfully") + logger.info("Task %s completed successfully", upid) return True - else: - logger.error(f"Task {upid} failed with status: {exitstatus}") - return False + logger.error("Task %s failed with status: %s", upid, exitstatus) + return False # Task still running time.sleep(2) except Exception as e: - logger.warning(f"Error checking task status: {e}") + logger.warning("Error checking task status: %s", e) time.sleep(2) - logger.error(f"Task {upid} timed out after {timeout} seconds") + logger.error("Task %s timed out after %s seconds", upid, timeout) return False def start_vm(self, node: str, vmid: int, timeout: int = 30) -> bool: @@ -236,7 +239,7 @@ def start_vm(self, node: str, vmid: int, timeout: int = 30) -> bool: path = f"/nodes/{node}/qemu/{vmid}/status/start" try: - logger.info(f"Starting VM {vmid} on node {node}") + logger.info("Starting VM %s on node %s", vmid, node) response = self._api_request("POST", path) # If response contains UPID (task ID), wait for it to complete @@ -247,7 +250,7 @@ def start_vm(self, node: str, vmid: int, timeout: int = 30) -> bool: return True except Exception as e: - logger.error(f"Failed to start VM {vmid}: {e}") + logger.error("Failed to start VM %s: %s", vmid, e) return False def reboot_vm(self, node: str, vmid: int, timeout: int = 30) -> bool: @@ -255,7 +258,7 @@ def reboot_vm(self, node: str, vmid: int, timeout: int = 30) -> bool: path = f"/nodes/{node}/qemu/{vmid}/status/reboot" try: - logger.info(f"Rebooting VM {vmid} on node {node}") + logger.info("Rebooting VM %s on node %s", vmid, node) response = self._api_request("POST", path) # If response contains UPID (task ID), wait for it to complete @@ -266,5 +269,5 @@ def reboot_vm(self, node: str, vmid: int, timeout: int = 30) -> bool: return True except Exception as e: - logger.error(f"Failed to reboot VM {vmid}: {e}") + logger.error("Failed to reboot VM %s: %s", vmid, e) return False diff --git a/miniupdate/ssh_manager.py b/miniupdate/ssh_manager.py index d676279..0f3e8bb 100644 --- a/miniupdate/ssh_manager.py +++ b/miniupdate/ssh_manager.py @@ -4,12 +4,14 @@ Handles SSH connections to remote hosts and command execution. """ -import paramiko -import socket import logging -from typing import Optional, Tuple, Dict, Any -from pathlib import Path import os +import socket +from pathlib import Path +from typing import Optional, Tuple, Dict, Any + +import paramiko + from .inventory import Host @@ -76,11 +78,14 @@ def connect( connect_kwargs["password"] = password logger.debug( - f"Connecting to {self.host.hostname}:{self.host.port} as {final_username}" + "Connecting to %s:%s as %s", + self.host.hostname, + self.host.port, + final_username, ) self.client.connect(**connect_kwargs) self.connected = True - logger.info(f"Successfully connected to {self.host.name}") + logger.info("Successfully connected to %s", self.host.name) return True except ( @@ -89,7 +94,7 @@ def connect( socket.error, Exception, ) as e: - logger.error(f"Failed to connect to {self.host.name}: {e}") + logger.error("Failed to connect to %s: %s", self.host.name, e) self.connected = False return False @@ -108,18 +113,18 @@ def execute_command(self, command: str, timeout: int = 60) -> Tuple[int, str, st raise RuntimeError(f"Not connected to {self.host.name}") try: - logger.debug(f"Executing command on {self.host.name}: {command}") - stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout) + logger.debug("Executing command on %s: %s", self.host.name, command) + _stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout) exit_code = stdout.channel.recv_exit_status() stdout_data = stdout.read().decode("utf-8", errors="replace") stderr_data = stderr.read().decode("utf-8", errors="replace") - logger.debug(f"Command finished with exit code {exit_code}") + logger.debug("Command finished with exit code %s", exit_code) return exit_code, stdout_data, stderr_data except Exception as e: - logger.error(f"Error executing command on {self.host.name}: {e}") + logger.error("Error executing command on %s: %s", self.host.name, e) return -1, "", str(e) def disconnect(self): @@ -127,9 +132,9 @@ def disconnect(self): if self.client: try: self.client.close() - logger.debug(f"Disconnected from {self.host.name}") + logger.debug("Disconnected from %s", self.host.name) except Exception as e: - logger.warning(f"Error disconnecting from {self.host.name}: {e}") + logger.warning("Error disconnecting from %s: %s", self.host.name, e) finally: self.client = None self.connected = False @@ -187,7 +192,7 @@ def connect_to_hosts(self, hosts: list, **kwargs) -> Dict[str, SSHConnection]: if connection: successful_connections[host.name] = connection - logger.info(f"Connected to {len(successful_connections)}/{len(hosts)} hosts") + logger.info("Connected to %s/%s hosts", len(successful_connections), len(hosts)) return successful_connections def execute_on_host( @@ -228,7 +233,7 @@ def execute_on_all_hosts( try: results[host_name] = connection.execute_command(command, **kwargs) except Exception as e: - logger.error(f"Error executing command on {host_name}: {e}") + logger.error("Error executing command on %s: %s", host_name, e) results[host_name] = (-1, "", str(e)) return results diff --git a/miniupdate/vm_mapping.py b/miniupdate/vm_mapping.py index 4a16001..a0abc8a 100644 --- a/miniupdate/vm_mapping.py +++ b/miniupdate/vm_mapping.py @@ -4,11 +4,13 @@ Maps Ansible inventory hosts to Proxmox VM IDs and nodes. """ -import toml import logging +import os +import sys from pathlib import Path from typing import Dict, Any, Optional, NamedTuple -import os + +import toml logger = logging.getLogger(__name__) @@ -59,8 +61,8 @@ def _load_mappings(self) -> Dict[str, VMMapping]: if not self.mapping_path.exists(): logger.warning( - f"VM mapping file not found at {self.mapping_path}. " - f"VM operations will be disabled." + "VM mapping file not found at %s. VM operations will be disabled.", + self.mapping_path, ) return mappings @@ -71,7 +73,7 @@ def _load_mappings(self) -> Dict[str, VMMapping]: vms = config.get("vms", {}) for host_name, vm_info in vms.items(): if not isinstance(vm_info, dict): - logger.warning(f"Invalid VM mapping for {host_name}: {vm_info}") + logger.warning("Invalid VM mapping for %s: %s", host_name, vm_info) continue node = vm_info.get("node") @@ -80,15 +82,17 @@ def _load_mappings(self) -> Dict[str, VMMapping]: if not node or not vmid: logger.warning( - f"Incomplete VM mapping for {host_name}: " - f"missing node ({node}) or vmid ({vmid})" + "Incomplete VM mapping for %s: missing node (%s) or vmid (%s)", + host_name, + node, + vmid, ) continue try: vmid = int(vmid) except ValueError: - logger.warning(f"Invalid vmid for {host_name}: {vmid}") + logger.warning("Invalid vmid for %s: %s", host_name, vmid) continue # Validate max_snapshots if provided @@ -97,12 +101,12 @@ def _load_mappings(self) -> Dict[str, VMMapping]: max_snapshots = int(max_snapshots) if max_snapshots < 0: logger.warning( - f"Invalid max_snapshots for {host_name}: must be >= 0" + "Invalid max_snapshots for %s: must be >= 0", host_name ) max_snapshots = None except ValueError: logger.warning( - f"Invalid max_snapshots for {host_name}: {max_snapshots}" + "Invalid max_snapshots for %s: %s", host_name, max_snapshots ) max_snapshots = None @@ -113,7 +117,7 @@ def _load_mappings(self) -> Dict[str, VMMapping]: max_snapshots=max_snapshots, ) - logger.info(f"Loaded VM mappings for {len(mappings)} hosts") + logger.info("Loaded VM mappings for %s hosts", len(mappings)) # logger.info(f"All loaded mappings: \n{str(mappings)}") # input("Press enter") @@ -121,9 +125,8 @@ def _load_mappings(self) -> Dict[str, VMMapping]: return mappings except Exception as e: - logger.error(f"Failed to load VM mappings from {self.mapping_path}: {e}") - exit(1) - return mappings + logger.error("Failed to load VM mappings from %s: %s", self.mapping_path, e) + sys.exit(1) def get_vm_info(self, host_name: str) -> Optional[VMMapping]: """Get VM mapping for a host.""" @@ -158,7 +161,8 @@ def create_example_vm_mapping(path: str = "vm_mapping.toml.example") -> None: f.write("# VM Mapping Configuration for miniupdate\n") f.write("# Maps Ansible inventory host names to Proxmox VM IDs and nodes\n") f.write( - "# Optional: Set max_snapshots per VM to limit snapshot count for capacity-limited storage\n\n" + "# Optional: Set max_snapshots per VM to limit snapshot count " + "for capacity-limited storage\n\n" ) toml.dump(example_config, f) diff --git a/setup.py b/setup.py index a09e0e2..9c1dcd5 100644 --- a/setup.py +++ b/setup.py @@ -1,8 +1,9 @@ """Setup script for miniupdate.""" -from setuptools import setup, find_packages from pathlib import Path +from setuptools import setup, find_packages + # Read README readme_path = Path(__file__).parent / "README.md" if readme_path.exists():