diff --git a/Cargo.lock b/Cargo.lock
index b23f9de3..4303a9a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2249,6 +2249,7 @@ dependencies = [
  "anyhow",
  "base64",
  "bon",
+ "chrono",
  "clap",
  "dirs",
  "dstack-kms-rpc",
@@ -2272,6 +2273,7 @@ dependencies = [
  "rocket-vsock-listener",
  "safe-write",
  "serde",
+ "serde-duration",
  "serde-human-bytes",
  "serde_json",
  "sha2",
diff --git a/certbot/cli/src/main.rs b/certbot/cli/src/main.rs
index de44ef0c..11d7a017 100644
--- a/certbot/cli/src/main.rs
+++ b/certbot/cli/src/main.rs
@@ -121,7 +121,7 @@ fn load_config(config: &PathBuf) -> Result<CertBotConfig> {
     let renew_timeout = Duration::from_secs(config.renew_timeout);
     let bot_config = CertBotConfig::builder()
         .acme_url(config.acme_url)
-        .cert_dir(workdir.backup_dir())
+        .cert_dir(workdir.cert_backup_dir())
         .cert_file(workdir.cert_path())
         .key_file(workdir.key_path())
         .auto_create_account(true)
diff --git a/certbot/src/workdir.rs b/certbot/src/workdir.rs
index 95dff248..4ca4cd5d 100644
--- a/certbot/src/workdir.rs
+++ b/certbot/src/workdir.rs
@@ -27,24 +27,24 @@ impl WorkDir {
         self.workdir.join("credentials.json")
     }

-    pub fn backup_dir(&self) -> PathBuf {
+    pub fn cert_backup_dir(&self) -> PathBuf {
         self.workdir.join("backup")
     }

-    pub fn live_dir(&self) -> PathBuf {
+    pub fn cert_live_dir(&self) -> PathBuf {
         self.workdir.join("live")
     }

     pub fn cert_path(&self) -> PathBuf {
-        self.live_dir().join("cert.pem")
+        self.cert_live_dir().join("cert.pem")
     }

     pub fn key_path(&self) -> PathBuf {
-        self.live_dir().join("key.pem")
+        self.cert_live_dir().join("key.pem")
     }

     pub fn list_certs(&self) -> Result<Vec<PathBuf>> {
-        crate::bot::list_certs(self.backup_dir())
+        crate::bot::list_certs(self.cert_backup_dir())
     }

     pub fn acme_account_uri(&self) -> Result<String> {
@@ -58,6 +58,6 @@
     }

     pub fn list_cert_public_keys(&self) -> Result<Vec<Vec<u8>>> {
-        crate::bot::list_cert_public_keys(self.backup_dir())
+        crate::bot::list_cert_public_keys(self.cert_backup_dir())
     }
 }
diff --git a/gateway/src/config.rs b/gateway/src/config.rs
index 07f1c432..acfee181 100644
--- a/gateway/src/config.rs
+++ b/gateway/src/config.rs
@@ -210,7 +210,7 @@ impl CertbotConfig {
         let workdir = certbot::WorkDir::new(&self.workdir);
         certbot::CertBotConfig::builder()
             .auto_create_account(true)
-            .cert_dir(workdir.backup_dir())
+            .cert_dir(workdir.cert_backup_dir())
             .cert_file(workdir.cert_path())
             .key_file(workdir.key_path())
             .credentials_file(workdir.account_credentials_path())
diff --git a/guest-agent/Cargo.toml b/guest-agent/Cargo.toml
index b987bce4..4b93dbb3 100644
--- a/guest-agent/Cargo.toml
+++ b/guest-agent/Cargo.toml
@@ -15,7 +15,7 @@ fs-err.workspace = true
 rcgen.workspace = true
 sha2.workspace = true
 clap.workspace = true
-tokio.workspace = true
+tokio = { workspace = true, features = ["full"] }
 hex.workspace = true
 serde_json.workspace = true
 bollard.workspace = true
diff --git a/guest-agent/src/guest_api_service.rs b/guest-agent/src/guest_api_service.rs
index 1cbfeca3..83a08c59 100644
--- a/guest-agent/src/guest_api_service.rs
+++ b/guest-agent/src/guest_api_service.rs
@@ -1,6 +1,6 @@
 use std::fmt::Debug;

-use anyhow::{Context, Result};
+use anyhow::{bail, Context, Result};
 use bollard::{container::ListContainersOptions, Docker};
 use cmd_lib::{run_cmd as cmd, run_fun};
 use dstack_guest_agent_rpc::worker_server::WorkerRpc as _;
@@ -18,6 +18,8 @@ use tracing::error;

 use crate::{rpc_service::ExternalRpcHandler, AppState};

+const BACKUP_LOCK_FILE: &str = "/run/dstack-backup.lock";
+
 pub struct GuestApiHandler {
     state: AppState,
 }
@@ -43,6 +45,7 @@ impl GuestApiRpc for GuestApiHandler {
             device_id: info.device_id,
             app_cert: info.app_cert,
             tcb_info: info.tcb_info,
+            backup_in_progress: fs::metadata(BACKUP_LOCK_FILE).is_ok(),
         })
     }
@@ -112,6 +115,53 @@ impl GuestApiRpc for GuestApiHandler {
     async fn list_containers(self) -> Result<ListContainersResponse> {
         list_containers().await
     }
+
+    async fn pre_backup(self) -> Result<()> {
+        fs::OpenOptions::new()
+            .create_new(true)
+            .write(true)
+            .open(BACKUP_LOCK_FILE)
+            .context("Failed to create backup lock file, there is another backup in progress")?;
+        // Run /dstack/hooks/pre-backup if it exists
+        let pre_backup_hook = "/dstack/hooks/pre-backup";
+        if is_exe(pre_backup_hook) {
+            let status = tokio::process::Command::new(pre_backup_hook)
+                .spawn()
+                .context("Failed to run pre-backup hook")?
+                .wait()
+                .await
+                .context("Failed to run pre-backup hook")?;
+            if !status.success() {
+                bail!("Failed to run pre-backup hook");
+            }
+        }
+        Ok(())
+    }
+
+    async fn post_backup(self) -> Result<()> {
+        fs::remove_file(BACKUP_LOCK_FILE).context("Failed to remove backup lock file")?;
+        let post_backup_hook = "/dstack/hooks/post-backup";
+        if is_exe(post_backup_hook) {
+            let status = tokio::process::Command::new(post_backup_hook)
+                .spawn()
+                .context("Failed to run post-backup hook")?
+                .wait()
+                .await
+                .context("Failed to run post-backup hook")?;
+            if !status.success() {
+                bail!("Failed to run post-backup hook");
+            }
+        }
+        Ok(())
+    }
+}
+
+fn is_exe(path: &str) -> bool {
+    use std::os::unix::fs::PermissionsExt;
+    let Ok(metadata) = fs::metadata(path) else {
+        return false;
+    };
+    metadata.is_file() && metadata.permissions().mode() & 0o111 != 0
 }

 pub(crate) async fn list_containers() -> Result<ListContainersResponse> {
diff --git a/guest-api/proto/guest_api.proto b/guest-api/proto/guest_api.proto
index 0e1d7898..f7f6b5ab 100644
--- a/guest-api/proto/guest_api.proto
+++ b/guest-api/proto/guest_api.proto
@@ -22,6 +22,8 @@ message GuestInfo {
   string tcb_info = 5;
   // Device ID
   bytes device_id = 6;
+  // true if backup is in progress
+  bool backup_in_progress = 7;
 }

 message IpAddress {
@@ -123,6 +125,8 @@ service GuestApi {
   rpc NetworkInfo(google.protobuf.Empty) returns (NetworkInformation);
   rpc ListContainers(google.protobuf.Empty) returns (ListContainersResponse);
   rpc Shutdown(google.protobuf.Empty) returns (google.protobuf.Empty);
+  rpc PreBackup(google.protobuf.Empty) returns (google.protobuf.Empty);
+  rpc PostBackup(google.protobuf.Empty) returns (google.protobuf.Empty);
 }

 service ProxiedGuestApi {
@@ -131,4 +135,6 @@
   rpc NetworkInfo(Id) returns (NetworkInformation);
   rpc ListContainers(Id) returns (ListContainersResponse);
   rpc Shutdown(Id) returns (google.protobuf.Empty);
+  rpc PreBackup(Id) returns (google.protobuf.Empty);
+  rpc PostBackup(Id) returns (google.protobuf.Empty);
 }
diff --git a/scripts/dstack-backup.py b/scripts/dstack-backup.py
new file mode 100755
index 00000000..fb45e050
--- /dev/null
+++ b/scripts/dstack-backup.py
@@ -0,0 +1,595 @@
+#!/usr/bin/env python3
+"""
+dstack-backup.py - Periodic backup script for dstack VMM
+
+This script ensures each running VM gets:
+- Full backup once a week
+- Incremental backup once a day
+
+Prerequisites:
+- Install https://github.com/kvinwang/qmpbackup
+- Enable qmp socket in dstack-vmm config
+
+Usage:
+    ./dstack-backup.py [options]
+
+Arguments:
+    --vmm-work-dir DIR      dstack-vmm work directory [default: .]
+    --vms-dir DIR           Directory containing VM run data [default: <vmm-work-dir>/run/vm]
+    --backup-dir DIR        Directory for storing backups [default: <vmm-work-dir>/run/backup]
+    --log-file FILE         File for storing logs [default: <vmm-work-dir>/logs/backup.log]
+    --state-file FILE       File for storing backup state [default: <vmm-work-dir>/state/backup_state.json]
+    --full-interval PERIOD  Interval for full backups (e.g., 7d for 7 days) [default: 7d]
+    --inc-interval PERIOD   Interval for incremental backups (e.g., 1d for 1 day) [default: 1d]
+    --max-backups N         Maximum number of full backups to keep per VM [default: 4]
+    --vm-filter STR         Only process VMs whose ID contains STR
+    --log-level LEVEL       Set the logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) [default: INFO]
+"""
+
+import sys
+import os
+import time
+import json
+import logging
+import argparse
+import subprocess
+import shutil
+import tarfile
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+from logging.handlers import RotatingFileHandler
+
+# Set up logger
+logger = logging.getLogger(__name__)
+
+
+def parse_version(image_name: str) -> Tuple[int, ...]:
+    """Parse the version from the image name"""
+    version = image_name.split("-")[-1]
+    return tuple(map(int, version.split(".")))
+
+
+class BackupScheduler:
+    """Scheduler for VM backups"""
+
+    def __init__(self, vms_dir, backup_dir, state_file, full_interval_seconds,
+                 inc_interval_seconds, max_backups=4, vm_filter=None):
+        self.vms_dir = vms_dir
+        self.backup_dir = backup_dir
+        self.state_file = state_file
+        self.full_interval_seconds = full_interval_seconds
+        self.inc_interval_seconds = inc_interval_seconds
+        self.max_backups = max_backups
+        self.vm_filter = vm_filter
+        self.state = self._load_state()
+
+    def _load_state(self) -> Dict:
+        """Load backup state from JSON file"""
+        if not self.state_file.exists():
+            return {}
+
+        try:
+            with open(self.state_file, 'r') as f:
+                return json.load(f)
+        except (json.JSONDecodeError, IOError):
+            logger.warning(
+                "Failed to load state file, starting with empty state")
+            return {}
+
+    def _save_state(self):
+        """Save backup state to JSON file"""
+        with open(self.state_file, 'w') as f:
+            json.dump(self.state, f, indent=2)
+
+    def get_running_vms(self, vm_filter: Optional[str] = None) -> List[Dict[str, str]]:
+        """Returns a list of running VMs with their IDs and names"""
+        logger.info("Getting list of running VMs...")
+        vms = []
+
+        if not self.vms_dir.exists():
+            logger.warning(f"VMs directory {self.vms_dir} does not exist")
+            return vms
+
+        # Iterate through VM directories
+        for vm_dir in self.vms_dir.iterdir():
+            if not vm_dir.is_dir():
+                logger.debug(f"Skipping non-directory {vm_dir}")
+                continue
+
+            vm_id = vm_dir.name
+            if vm_filter and vm_filter.strip() not in vm_id:
+                continue
+            pid_file = vm_dir / "qemu.pid"
+
+            if not pid_file.exists():
+                logger.debug(f"No PID file found for VM {vm_id}")
+                continue
+
+            try:
+                pid = int(pid_file.read_text().strip())
+                os.kill(pid, 0)
+            except (ValueError, ProcessLookupError):
+                logger.debug(f"No running process found for VM {vm_id}")
+                continue
+            except OSError as e:
+                logger.debug(f"Failed to check process for VM {vm_id}: {e}")
+                # EPERM (errno 1) means the process exists but belongs to
+                # another user; anything else means we cannot confirm it
+                if e.errno != 1:
+                    continue
+
+            manifest_file = vm_dir / "vm-manifest.json"
+            if not manifest_file.exists():
+                logger.debug(f"No manifest file found for VM {vm_id}")
+                continue
+
+            qmp_socket = vm_dir / "qmp.sock"
+            if not qmp_socket.exists():
+                logger.debug(f"No QMP socket found for VM {vm_id}")
+                continue
+
+            try:
+                with open(manifest_file, 'r') as f:
+                    manifest = json.load(f)
+                    image = manifest.get('image') or ""
+                    if not image.startswith("dstack-"):
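+                        # Only dstack images are backed up: the data-disk id
+                        # (hd0 vs hd1) can only be derived from a dstack image
+                        # version, as done below.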
+                        logger.debug(
+                            f"Image {image} is not a dstack image, skipping")
+                        continue
+                    version_tuple = parse_version(image)
+                    # dstack images before 0.5.0 expose the data disk as hd0,
+                    # newer ones as hd1
+                    if version_tuple < (0, 5, 0):
+                        hd = "hd0"
+                    else:
+                        hd = "hd1"
+                    vm_name = manifest.get(
+                        'name') or manifest.get('id') or vm_id
+
+                    vms.append({
+                        'id': vm_id,
+                        'name': vm_name,
+                        'hd': hd
+                    })
+                    logger.debug(f"Found running VM: {vm_name} ({vm_id})")
+            except (json.JSONDecodeError, IOError) as e:
+                logger.error(f"Failed to read manifest for VM {vm_id}: {e}")
+
+        logger.info(f"Found {len(vms)} running VMs")
+
+        return vms
+
+    def get_last_backup_time(self, vm_id: str, backup_type: str) -> Optional[int]:
+        """Get the timestamp of the last backup of specified type for a VM"""
+        logger.debug(
+            f"Looking for {backup_type} backup timestamp for VM {vm_id}")
+
+        if vm_id not in self.state:
+            logger.debug(f"VM {vm_id} not found in state file")
+            return None
+
+        timestamp = self.state[vm_id].get(backup_type)
+        if timestamp:
+            logger.debug(
+                f"Retrieved {backup_type} backup timestamp for VM {vm_id}: {timestamp}")
+
+        return timestamp
+
+    def update_backup_time(self, vm_id: str, backup_type: str):
+        """Update the timestamp for a backup type"""
+        current_time = int(time.time())
+
+        if vm_id not in self.state:
+            self.state[vm_id] = {}
+
+        self.state[vm_id][backup_type] = current_time
+        self._save_state()
+
+        logger.debug(
+            f"Updated {backup_type} backup timestamp for VM {vm_id} to {datetime.fromtimestamp(current_time)}")
+
+    def _backup_vm_configs(self, vm_dir: Path, backup_dir: Path) -> bool:
+        """Backup VM configuration files as tar.gz"""
+        vm_configs = ["vm-manifest.json", "shared/"]
+        config_archive = backup_dir / "vm-configs.tar.gz"
+
+        logger.info(f"Creating VM config backup: {config_archive}")
+
+        try:
+            with tarfile.open(config_archive, 'w:gz') as tar:
+                for config_item in vm_configs:
+                    config_path = vm_dir / config_item
+                    if config_path.exists():
+                        # Add to archive with relative path
+                        tar.add(config_path, arcname=config_item)
+                        logger.debug(f"Added {config_item} to config archive")
+                    else:
+                        logger.warning(
+                            f"Config item {config_item} not found, skipping")
+
+            logger.info(f"VM config backup completed: {config_archive}")
+            return True
+        except Exception as e:
+            logger.error(f"Failed to create VM config backup: {e}")
+            return False
+
+    def perform_backup(self, vm_id: str, vm_name: str, backup_type: str, hd: str) -> bool:
+        """Perform a backup for the specified VM"""
+        logger.info(f"Performing {backup_type} backup...")
+
+        # Convert to absolute paths
+        vm_dir = self.vms_dir.resolve() / vm_id
+        backup_dir = self.backup_dir.resolve() / vm_id / "backups"
+        qmp_socket = vm_dir / "qmp.sock"
+        backup_lock = vm_dir / "backup.lock"
+
+        # Create backup directory if it doesn't exist
+        backup_dir.mkdir(parents=True, exist_ok=True)
+
+        # Set backup level based on type
+        backup_level = "full" if backup_type == "full" else "inc"
+
+        # Create or update latest symlink
+        latest_dir = backup_dir / "latest"
+
+        def do_backup():
+            # For full backups, clear bitmaps first
+            if backup_level == "full":
+                logger.info(
+                    f"Clearing bitmaps for full backup of VM {vm_name}")
+                if qmp_socket.exists():
+                    try:
+                        # Use absolute path for qmp_socket
+                        abs_qmp_socket = qmp_socket.resolve()
+                        result = subprocess.Popen(
+                            ["qmpbackup", "--debug", "--socket",
+                             str(abs_qmp_socket), "cleanup", "--remove-bitmap"],
+                            stdout=sys.stdout,
+                            stderr=sys.stderr
+                        )
+                        returncode = result.wait()
+                        if returncode != 0:
+                            logger.warning(
+                                f"Failed to clear bitmaps for VM {vm_name} ({vm_id})")
+                            # Continue anyway as this might be the first backup
+                    except Exception as e:
+                        logger.error(f"Error clearing bitmaps: {e}")
+                        return False
+                else:
+                    logger.error(f"QMP socket not found at {qmp_socket}")
+                    return False
+
+            # Perform the backup
+            logger.info("Running qmpbackup")
+
+            # Convert to absolute paths for qmpbackup
+            abs_qmp_socket = qmp_socket.resolve()
+            abs_latest_dir = latest_dir.resolve()
+
+            logger.debug(
+                f"Running: qmpbackup --socket {abs_qmp_socket} backup -i {hd} --no-subdir -t {abs_latest_dir} -l {backup_level}")
+            if qmp_socket.exists():
+                try:
+                    # Use Popen for real-time output
+                    process = subprocess.Popen(
+                        [
+                            "qmpbackup",
+                            "--debug",
+                            "--socket", str(abs_qmp_socket),
+                            "backup",
+                            "-i", hd,
+                            "--no-subdir",
+                            "-t", str(abs_latest_dir),
+                            "-l", backup_level
+                        ],
+                        stdout=sys.stdout,
+                        stderr=sys.stderr,
+                        text=True,
+                        bufsize=1  # Line buffered
+                    )
+
+                    # Get return code
+                    returncode = process.wait()
+                    if returncode == 0:
+                        logger.info("Disk backup successful")
+
+                        # Backup VM configuration files
+                        if not self._backup_vm_configs(vm_dir, abs_latest_dir):
+                            logger.error("VM config backup failed")
+                            return False
+
+                        self.update_backup_time(vm_id, backup_type)
+
+                        # Rotate backups if needed
+                        if backup_type == "full":
+                            self._rotate_backups(vm_id)
+                        return True
+                    else:
+                        logger.error("Backup failed")
+                        return False
+                except Exception as e:
+                    logger.error(f"Error performing backup: {e}")
+                    return False
+            else:
+                logger.error(f"QMP socket not found at {qmp_socket}")
+                return False
+
+        if backup_level == "full":
+            # Create timestamped directory for this backup
+            timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
+            backup_timestamp_dir = backup_dir / timestamp
+            logger.info(f"Creating backup directory: {backup_timestamp_dir}")
+            backup_timestamp_dir.mkdir(parents=True, exist_ok=True)
+            try:
+                latest_dir.unlink()
+            except FileNotFoundError:
+                pass
+            latest_dir.symlink_to(timestamp)
+
+        try:
+            backup_lock.touch(exist_ok=False)
+            locked = True
+        except Exception as e:
+            logger.error(f"Error creating backup lock: {e}")
+            locked = False
+        if locked:
+            try:
+                success = do_backup()
+            except Exception as e:
+                logger.error(f"Error performing backup: {e}")
+                success = False
+            finally:
+                backup_lock.unlink()
+        else:
+            success = False
+        if backup_type == "full" and not success:
+            logger.info(
+                f"Removing {os.path.basename(backup_timestamp_dir)}")
+            try:
+                shutil.rmtree(backup_timestamp_dir)
+            except Exception as e:
+                logger.error(f"Error removing old backup: {e}")
+
+        return success
+
+    def needs_backup(self, vm_id: str) -> Optional[str]:
+        """Determine if a VM needs a backup and what type"""
+        current_time = int(time.time())
+        last_full = self.get_last_backup_time(vm_id, "full")
+        last_full_ts = datetime.fromtimestamp(last_full) if last_full else None
+        last_incremental = self.get_last_backup_time(vm_id, "incremental")
+        last_incremental_ts = datetime.fromtimestamp(
+            last_incremental) if last_incremental else None
+
+        logger.debug(f"Last full backup: {last_full_ts}")
+        logger.debug(f"Last incremental backup: {last_incremental_ts}")
+
+        # Determine if we need a full backup based on configured interval
+        if not last_full or (current_time - last_full) > self.full_interval_seconds:
+            return "full"
+        # Determine if we need an incremental backup based on configured interval
+        elif not last_incremental or (current_time - last_incremental) > self.inc_interval_seconds:
+            return "incremental"
+        else:
+            return None
+
+    def _rotate_backups(self, vm_id):
+        """Remove old backups to keep only max_backups"""
+        backup_dir = self.backup_dir.resolve() / vm_id / "backups"
+        if not backup_dir.exists():
+            return
+
+        # Get all backup directories (excluding 'latest' symlink)
+        backup_dirs = [d for d in backup_dir.iterdir()
+                       if d.is_dir() and d.name != "latest"]
+
+        # Sort by name (which is timestamp format)
+        backup_dirs.sort()
+
+        # If we have more backups than max_backups, remove the oldest ones
+        if len(backup_dirs) > self.max_backups:
+            logger.info(
+                f"Rotating backups for VM {vm_id}, keeping {self.max_backups} most recent")
+            for old_dir in backup_dirs[:-self.max_backups]:
+                logger.info(
+                    f"Removing old backup: {os.path.basename(old_dir)}")
+                try:
+                    shutil.rmtree(old_dir)
+                except Exception as e:
+                    logger.error(f"Failed to remove old backup {old_dir}: {e}")
+
+    def run(self):
+        """Main entry point for the backup scheduler"""
+        logger.info("=" * 80)
+        logger.info("Starting backup scheduler")
+        logger.info(f"Using VMs directory: {self.vms_dir}")
+        logger.info(f"Using backup directory: {self.backup_dir}")
+
+        # Get list of running VMs
+        vms = self.get_running_vms(self.vm_filter)
+
+        if not vms:
+            logger.info("No running VMs found")
+            return
+
+        total_vms = len(vms)
+        # Process each VM
+        for i, vm in enumerate(vms):
+            vm_id = vm['id']
+            vm_name = vm['name']
+            hd = vm['hd']
+
+            logger.info("-" * 50)
+            logger.info(
+                f"[{i+1}/{total_vms}] Processing VM: {vm_name} ({vm_id})")
+
+            # Check if backup is needed
+            backup_type = self.needs_backup(vm_id)
+
+            if not backup_type:
+                logger.info("No backup needed")
+                continue
+
+            # Perform backup
+            start_time = time.time()
+            if self.perform_backup(vm_id, vm_name, backup_type, hd):
+                elapsed_time = time.time() - start_time
+                logger.info(
+                    f"{backup_type} backup completed successfully (total time: {elapsed_time:.2f}s)")
+            else:
+                elapsed_time = time.time() - start_time
+                logger.error(
+                    f"{backup_type} backup failed (time elapsed: {elapsed_time:.2f}s)")
+
+        logger.info("-" * 50)
+        logger.info("Backup scheduler run completed")
+
+
+def parse_interval(interval_str):
+    """Parse interval string like '7d' or '12h' into seconds"""
+    if not interval_str:
+        raise ValueError("Interval cannot be empty")
+
+    # Get the unit (last character) and value (everything else)
+    unit = interval_str[-1].lower()
+    try:
+        value = int(interval_str[:-1])
+    except ValueError:
+        raise ValueError(
+            f"Invalid interval format: {interval_str}. Expected format like '7d' or '12h'")
+
+    # Convert to seconds based on unit
+    if unit == 'd':
+        return value * 24 * 60 * 60  # days to seconds
+    elif unit == 'h':
+        return value * 60 * 60  # hours to seconds
+    elif unit == 'm':
+        return value * 60  # minutes to seconds
+    elif unit == 's':
+        return value  # already in seconds
+    else:
+        raise ValueError(
+            f"Unknown time unit: {unit}. Use d (days), h (hours), m (minutes), or s (seconds)")
+
+
+def parse_args():
+    """Parse command line arguments"""
+    # First parse just the vmm-work-dir to use it for defaults
+    temp_parser = argparse.ArgumentParser(add_help=False)
+    temp_parser.add_argument("--vmm-work-dir", type=Path, default=".")
+    temp_args, _ = temp_parser.parse_known_args()
+    vmm_work_dir = temp_args.vmm_work_dir
+
+    # Now create the real parser with all arguments
+    parser = argparse.ArgumentParser(
+        description="Periodic backup script for dstack VMM")
+
+    # Add all arguments with proper defaults
+    parser.add_argument("--vmm-work-dir", type=Path,
+                        default=".",
+                        help="dstack-vmm work directory")
+    parser.add_argument("--vms-dir", type=Path,
+                        default=vmm_work_dir / "run" / "vm",
+                        help="Directory containing VM run data")
+    parser.add_argument("--backup-dir", type=Path,
+                        default=vmm_work_dir / "run" / "backup",
+                        help="Directory for storing backups")
+    parser.add_argument("--log-file", type=Path,
+                        default=vmm_work_dir / "logs" / "backup.log",
+                        help="Log file path (with rotation enabled)")
+    parser.add_argument("--state-file", type=Path,
+                        default=vmm_work_dir / "state" / "backup_state.json",
+                        help="File for storing backup state")
+    parser.add_argument("--full-interval", type=str,
+                        default="7d",
+                        help="Interval for full backups (e.g., 7d for 7 days)")
+    parser.add_argument("--inc-interval", type=str,
+                        default="1d",
+                        help="Interval for incremental backups (e.g., 1d for 1 day)")
+    parser.add_argument("--max-backups", type=int, default=4,
+                        help="Maximum number of full backups to keep per VM")
+    parser.add_argument("--log-level", type=str, default="INFO",
+                        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+                        help="Set the logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)")
+    parser.add_argument("--vm-filter", type=str, help="Filter VMs by ID")
+
+    # Parse all arguments
+    args = parser.parse_args()
+
+    # Parse interval strings into seconds
+    args.full_interval_seconds = parse_interval(args.full_interval)
+    args.inc_interval_seconds = parse_interval(args.inc_interval)
+
+    return args
+
+
+def setup_logging(log_file: Path, log_level: str = "INFO"):
+    """Set up logging configuration"""
+    # Create logs directory if it doesn't exist
+    log_file.parent.mkdir(parents=True, exist_ok=True)
+
+    # Map string log level to logging constants
+    log_level_map = {
+        "DEBUG": logging.DEBUG,
+        "INFO": logging.INFO,
+        "WARNING": logging.WARNING,
+        "ERROR": logging.ERROR,
+        "CRITICAL": logging.CRITICAL
+    }
+
+    # Get the numeric log level (default to INFO if invalid)
+    numeric_level = log_level_map.get(log_level.upper(), logging.INFO)
+
+    # Create a rotating file handler (10MB size limit, 3 backup files)
+    file_handler = RotatingFileHandler(
+        log_file, maxBytes=10*1024*1024, backupCount=3)
+    file_handler.setLevel(numeric_level)
+    file_handler.setFormatter(logging.Formatter(
+        "[%(asctime)s] %(levelname)s: %(message)s", "%Y-%m-%d %H:%M:%S"))
+
+    # Create console handler
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(numeric_level)
+    console_handler.setFormatter(logging.Formatter(
+        "[%(asctime)s] %(levelname)s: %(message)s", "%Y-%m-%d %H:%M:%S"))
+
+    # Configure root logger
+    root_logger = logging.getLogger()
+    root_logger.setLevel(numeric_level)
+
+    # Remove any existing handlers
+    for handler in root_logger.handlers[:]:
+        root_logger.removeHandler(handler)
+
+    # Add our handlers
+    root_logger.addHandler(file_handler)
+    root_logger.addHandler(console_handler)
+
+
+def main():
+    """Main entry point"""
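+    # Typical invocation, e.g. from a daily cron job (paths are illustrative):
+    #   ./dstack-backup.py --vmm-work-dir /opt/dstack --full-interval 7d --inc-interval 1d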
+    try:
+        # Parse command line arguments
+        args = parse_args()
+
+        # Set up logging with specified log file and log level
+        setup_logging(args.log_file, args.log_level)
+
+        # Create directories if they don't exist
+        args.backup_dir.mkdir(parents=True, exist_ok=True)
+        args.state_file.parent.mkdir(parents=True, exist_ok=True)
+
+        # Initialize and run scheduler
+        scheduler = BackupScheduler(
+            args.vms_dir, args.backup_dir, args.state_file,
+            args.full_interval_seconds, args.inc_interval_seconds,
+            args.max_backups, args.vm_filter)
+        scheduler.run()
+    except Exception as e:
+        logger.error(f"Error: {e}")
+        return 1
+
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/vmm/Cargo.toml b/vmm/Cargo.toml
index 6283f494..a085bd98 100644
--- a/vmm/Cargo.toml
+++ b/vmm/Cargo.toml
@@ -44,6 +44,8 @@ hex_fmt.workspace = true
 lspci.workspace = true
 base64.workspace = true
 serde-human-bytes.workspace = true
+serde-duration.workspace = true
+chrono.workspace = true

 [dev-dependencies]
 insta.workspace = true
diff --git a/vmm/rpc/proto/vmm_rpc.proto b/vmm/rpc/proto/vmm_rpc.proto
index ab51519e..c2a70bf1 100644
--- a/vmm/rpc/proto/vmm_rpc.proto
+++ b/vmm/rpc/proto/vmm_rpc.proto
@@ -31,6 +31,8 @@ message VmInfo {
   string shutdown_progress = 12;
   // Image version
   string image_version = 13;
+  // Backup in progress
+  bool backup_in_progress = 14;
 }

 message Id {
@@ -223,6 +225,41 @@ message GpuInfo {
   bool is_free = 4;
 }

+message BackupDiskRequest {
+  // vm id
+  string vm_id = 1;
+  // full or incremental
+  string level = 2;
+}
+
+message BackupInfo {
+  // Backup group id (one full backup plus its incrementals)
+  string backup_id = 1;
+  // id of the snapshot
+  string snapshot_id = 2;
+  // timestamp
+  string timestamp = 3;
+  // level: full or incremental
+  string level = 4;
+  // size of the backup in bytes
+  uint64 size = 5;
+}
+
+message ListBackupsResponse {
+  repeated BackupInfo backups = 1;
+}
+
+message DeleteBackupRequest {
+  string vm_id = 1;
+  string backup_id = 2;
+}
+
+message RestoreBackupRequest {
+  string vm_id = 1;
+  string backup_id = 2;
+  string snapshot_id = 3;
+}
+
 // Service definition for dstack-vmm
 service Vmm {
   // RPC to create a VM
@@ -261,4 +298,16 @@ service Vmm {

   // List GPUs
   rpc ListGpus(google.protobuf.Empty) returns (ListGpusResponse);
+
+  // Backup a VM data disk
+  rpc BackupDisk(BackupDiskRequest) returns (google.protobuf.Empty);
+
+  // List backups for a VM
+  rpc ListBackups(BackupDiskRequest) returns (ListBackupsResponse);
+
+  // Delete a backup
+  rpc DeleteBackup(DeleteBackupRequest) returns (google.protobuf.Empty);
+
+  // Restore a backup
+  rpc RestoreBackup(RestoreBackupRequest) returns (google.protobuf.Empty);
 }
diff --git a/vmm/src/app.rs b/vmm/src/app.rs
index be464fca..bb32f71e 100644
--- a/vmm/src/app.rs
+++ b/vmm/src/app.rs
@@ -6,7 +6,9 @@ use dstack_kms_rpc::kms_client::KmsClient;
 use dstack_types::shared_filenames::{
     compat_v3, APP_COMPOSE, ENCRYPTED_ENV, INSTANCE_INFO, SYS_CONFIG, USER_CONFIG,
 };
-use dstack_vmm_rpc::{self as pb, GpuInfo, StatusRequest, StatusResponse, VmConfiguration};
+use dstack_vmm_rpc::{
+    self as pb, BackupInfo, GpuInfo, StatusRequest, StatusResponse, VmConfiguration,
+};
 use fs_err as fs;
 use guest_api::client::DefaultClient as GuestClient;
 use id_pool::IdPool;
@@ -16,9 +18,10 @@ use serde_json::json;
 use std::collections::{BTreeSet, HashMap};
 use std::net::IpAddr;
 use std::path::{Path, PathBuf};
+use std::process::Command;
 use std::sync::{Arc, Mutex, MutexGuard};
 use supervisor_client::SupervisorClient;
-use tracing::{error, info};
+use tracing::{error, info, warn};
 pub use image::{Image, ImageInfo};
 pub use qemu::{VmConfig, VmWorkDir};

@@ -125,6 +128,18 @@ impl App {
         VmWorkDir::new(self.config.run_path.join(id))
     }

+    fn backups_dir(&self, id: &str) -> PathBuf {
+        self.config.cvm.backup.path.join(id).join("backups")
+    }
+
+    fn backup_dir(&self, id: &str, backup_id: &str) -> PathBuf {
+        self.backups_dir(id).join(backup_id)
+    }
+
+    fn backup_file(&self, id: &str, backup_id: &str, snapshot_id: &str) -> PathBuf {
+        self.backup_dir(id, backup_id).join(snapshot_id)
+    }
+
     pub fn new(config: Config, supervisor: SupervisorClient) -> Self {
         let cid_start = config.cvm.cid_start;
         let cid_end = cid_start.saturating_add(config.cvm.cid_pool_size);
@@ -647,6 +662,258 @@ impl App {
         }
         Ok(())
     }
+
+    pub(crate) async fn backup_disk(&self, id: &str, level: &str) -> Result<()> {
+        if !self.config.cvm.backup.enabled {
+            bail!("Backup is not enabled");
+        }
+        let work_dir = self.work_dir(id);
+        let backup_dir = self.backups_dir(id);
+
+        // Map the requested level onto qmpbackup's level names
+        let backup_level = match level {
+            "full" => "full",
+            "incremental" => "inc",
+            _ => bail!("Invalid backup level: {level}"),
+        };
+
+        let qmp_socket = work_dir.qmp_socket();
+        let _lock = BackupLock::try_lock(work_dir.backup_lock_file())
+            .context("Failed to lock for backup")?;
+
+        let id = id.to_string();
+        tokio::task::spawn_blocking(move || {
+            let latest_dir = backup_dir.join("latest");
+            if backup_level == "full" {
+                // Clear the bitmaps; this may fail on the very first backup,
+                // so only warn on failure
+                let output = Command::new("qmpbackup")
+                    .arg("--socket")
+                    .arg(&qmp_socket)
+                    .arg("cleanup")
+                    .arg("--remove-bitmap")
+                    .output()
+                    .context("Failed to clear bitmaps")?;
+                if !output.status.success() {
+                    let stderr = String::from_utf8_lossy(&output.stderr);
+                    warn!("Failed to clear bitmaps for {id}: {stderr}");
+                }
+                // Switch to a new timestamped dir and symlink "latest" to it
+                let timestamp = chrono::Utc::now().format("%Y%m%dZ%H%M%S").to_string();
+                let new_dir = backup_dir.join(&timestamp);
+                fs::create_dir_all(&new_dir).context("Failed to create backup directory")?;
+                if fs::symlink_metadata(&latest_dir).is_ok() {
+                    fs::remove_file(&latest_dir)
+                        .context("Failed to remove latest directory link")?;
+                }
+                fs::os::unix::fs::symlink(&timestamp, &latest_dir)
+                    .context("Failed to create latest directory link")?;
+            }
+            let output = Command::new("qmpbackup")
+                .arg("--socket")
+                .arg(&qmp_socket)
+                .arg("backup")
+                .arg("-i")
+                // dstack images >= 0.5.0 expose the data disk as hd1
+                .arg("hd1")
+                .arg("--no-subdir")
+                .arg("-t")
+                .arg(&latest_dir)
+                .arg("-l")
+                .arg(backup_level)
+                .output()
+                .context("Failed to execute qmpbackup command")?;

+            if !output.status.success() {
+                let stderr = String::from_utf8_lossy(&output.stderr);
+                bail!("Failed to backup disk for {id}: {stderr}");
+            }
+            Ok(())
+        })
+        .await
+        .context("Failed to execute backup task")?
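+        // qmpbackup drops snapshot files named like FULL-<timestamp>-<disk>.img
+        // or INC-<timestamp>-<disk>.img into the target directory; list_backups
+        // below parses levels and timestamps back out of that naming scheme.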
+    }
+
+    pub(crate) async fn list_backups(&self, id: &str) -> Result<Vec<BackupInfo>> {
+        let backup_dir = self.backups_dir(id);
+
+        // No backups yet if the directory doesn't exist
+        if !backup_dir.exists() {
+            return Ok(Vec::new());
+        }
+
+        // List backup groups in the directory
+        let mut backups = Vec::new();
+
+        let backup_entries =
+            std::fs::read_dir(&backup_dir).context("Failed to read backup directory")?;
+
+        fn filename(path: &Path) -> Option<String> {
+            path.file_name()
+                .and_then(|n| n.to_str().map(|s| s.to_string()))
+        }
+
+        // Process each entry
+        for backup_entry in backup_entries {
+            let backup_path = match backup_entry {
+                Ok(entry) => entry.path(),
+                Err(e) => {
+                    warn!("Failed to read directory entry: {e:?}");
+                    continue;
+                }
+            };
+            if !backup_path.is_dir() {
+                continue;
+            }
+            if backup_path.ends_with("latest") {
+                continue;
+            }
+            let backup_id = filename(&backup_path).context("Failed to get group name")?;
+            let snaps = match std::fs::read_dir(backup_path) {
+                Ok(entries) => entries,
+                Err(e) => {
+                    warn!("Failed to read directory entry: {e:?}");
+                    continue;
+                }
+            };
+            for snap in snaps {
+                let snap_path = match snap {
+                    Ok(entry) => entry.path(),
+                    Err(e) => {
+                        warn!("Failed to read directory entry: {e:?}");
+                        continue;
+                    }
+                };
+                if !snap_path.is_file() {
+                    continue;
+                }
+                // Get file name
+                let snap_filename = filename(&snap_path).context("Failed to get file name")?;

+                if !snap_filename.ends_with(".img") {
+                    continue;
+                }
+                let parts = snap_filename
+                    .split('.')
+                    .next()
+                    .context("Failed to split filename")?
+                    .split('-')
+                    .collect::<Vec<&str>>();
+                let [level, timestamp, _] = parts[..] else {
+                    warn!("Invalid backup filename: {snap_filename}");
+                    continue;
+                };
+                let size = snap_path
+                    .metadata()
+                    .context("Failed to get file metadata")?
+                    .len();
+                backups.push(BackupInfo {
+                    backup_id: backup_id.clone(),
+                    snapshot_id: snap_filename.clone(),
+                    timestamp: timestamp.to_string(),
+                    level: level.to_string(),
+                    size,
+                });
+            }
+        }
+        Ok(backups)
+    }
+
+    pub(crate) async fn delete_backup(&self, vm_id: &str, backup_id: &str) -> Result<()> {
+        if !self.config.cvm.backup.enabled {
+            bail!("Backup is not enabled");
+        }
+        let backup_dir = self.backup_dir(vm_id, backup_id);
+        if !backup_dir.exists() {
+            bail!("Backup does not exist");
+        }
+        if !backup_dir.is_dir() {
+            bail!("Backup is not a directory");
+        }
+        fs::remove_dir_all(&backup_dir).context("Failed to remove backup directory")?;
+        Ok(())
+    }
+
+    pub(crate) async fn restore_backup(
+        &self,
+        vm_id: &str,
+        backup_id: &str,
+        snapshot_id: &str,
+    ) -> Result<()> {
+        if !self.config.cvm.backup.enabled {
+            bail!("Backup is not enabled");
+        }
+        // First, ensure the vm is stopped
+        let info = self.vm_info(vm_id).await?.context("VM not found")?;
+        if info.status != "stopped" {
+            bail!("VM is not stopped: status={}", info.status);
+        }

+        let backup_file = self.backup_file(vm_id, backup_id, snapshot_id);
+        if !backup_file.exists() {
+            bail!("Backup file not found");
+        }
+        let vm_work_dir = self.work_dir(vm_id);
+        let hda_img = vm_work_dir.hda_path();
+        if snapshot_id.starts_with("FULL") {
+            // A full snapshot is a complete disk image: just copy the file
+            tokio::fs::copy(&backup_file, &hda_img).await?;
+        } else {
+            let backup_dir = self.backup_dir(vm_id, backup_id);
+            let snapshot_id = snapshot_id.to_string();
+            // Rename the current hda file to *.bak so it can be recovered manually
+            let bak_file = hda_img.display().to_string() + ".bak";
+            fs::rename(&hda_img, &bak_file).context("Failed to rename hda file")?;

+            tokio::task::spawn_blocking(move || {
+                // qmprestore merge --dir <backup_dir> --until <snapshot_id> --targetfile <hda_img>
+                let mut command = Command::new("qmprestore");
+                command.arg("merge");
+                command.arg("--dir").arg(&backup_dir);
+                command.arg("--until").arg(snapshot_id);
+                command.arg("--targetfile").arg(&hda_img);
+                let output = command
+                    .output()
+                    .context("Failed to execute qmprestore command")?;
+                if !output.status.success() {
+                    let stderr = String::from_utf8_lossy(&output.stderr);
+                    let stdout = String::from_utf8_lossy(&output.stdout);
+                    bail!("Failed to restore backup: {stderr}:{stdout}");
+                }
+                Ok(())
+            })
+            .await
+            .context("Failed to spawn restore command")?
+ .context("Failed to restore backup")?; + } + Ok(()) + } +} + +struct BackupLock { + path: PathBuf, +} + +impl BackupLock { + fn try_lock(path: impl AsRef) -> Result { + let path = path.as_ref(); + let _file = fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(path) + .context("Failed to create backup lock file")?; + Ok(BackupLock { + path: path.to_path_buf(), + }) + } +} + +impl Drop for BackupLock { + fn drop(&mut self) { + fs::remove_file(&self.path).ok(); + } } fn paginate(items: Vec, page: u32, page_size: u32) -> impl Iterator { diff --git a/vmm/src/app/qemu.rs b/vmm/src/app/qemu.rs index e4efeff5..586e0e4e 100644 --- a/vmm/src/app/qemu.rs +++ b/vmm/src/app/qemu.rs @@ -168,6 +168,7 @@ impl VmInfo { app_id: self.manifest.app_id.clone(), instance_id: self.instance_id.as_deref().map(Into::into), exited_at: self.exited_at.clone(), + backup_in_progress: workdir.backup_lock_file().exists(), } } } @@ -731,6 +732,10 @@ impl VmWorkDir { pub fn path(&self) -> &Path { &self.workdir } + + pub fn backup_lock_file(&self) -> PathBuf { + self.workdir.join("backup.lock") + } } impl VmWorkDir { diff --git a/vmm/src/config.rs b/vmm/src/config.rs index 545c97f9..0dd11ef2 100644 --- a/vmm/src/config.rs +++ b/vmm/src/config.rs @@ -130,6 +130,8 @@ pub struct CvmConfig { pub qemu_pci_hole64_size: u64, /// QEMU hotplug_off pub qemu_hotplug_off: bool, + /// Backup configuration + pub backup: BackupConfig, } #[derive(Debug, Clone, Deserialize)] @@ -196,11 +198,15 @@ pub struct GatewayConfig { pub agent_port: u16, } +#[derive(Debug, Clone, Deserialize)] +pub struct BackupConfig { + pub enabled: bool, + pub path: PathBuf, +} + #[derive(Debug, Clone, Deserialize)] pub struct Config { - #[serde(default)] pub image_path: PathBuf, - #[serde(default)] pub run_path: PathBuf, /// The URL of the KMS server pub kms_url: String, @@ -227,12 +233,15 @@ pub struct Config { } impl Config { - pub fn abs_path(self) -> Result { - Ok(Self { - image_path: self.image_path.absolutize()?.to_path_buf(), - run_path: self.run_path.absolutize()?.to_path_buf(), - ..self - }) + pub fn abs_path(mut self) -> Result { + fn absolutize(path: &mut PathBuf) -> Result<()> { + *path = path.absolutize()?.to_path_buf(); + Ok(()) + } + absolutize(&mut self.image_path)?; + absolutize(&mut self.run_path)?; + absolutize(&mut self.cvm.backup.path)?; + Ok(self) } } diff --git a/vmm/src/console.html b/vmm/src/console.html index 52f95f00..e77c18a9 100644 --- a/vmm/src/console.html +++ b/vmm/src/console.html @@ -251,6 +251,124 @@ background: #FF9800; color: white; } + + /* Backup dialog styles */ + .tabs { + display: flex; + border-bottom: 1px solid #ddd; + margin-bottom: 20px; + } + + .tab-btn { + padding: 10px 20px; + background: none; + border: none; + cursor: pointer; + font-size: 14px; + font-weight: 500; + color: #666; + } + + .tab-btn.active { + color: #1976D2; + border-bottom: 2px solid #1976D2; + } + + .tab-content { + padding: 10px 0; + } + + .backup-table { + width: 100%; + border-collapse: collapse; + margin-bottom: 16px; + } + + .backup-table th, + .backup-table td { + padding: 8px; + text-align: left; + border-bottom: 1px solid #ddd; + } + + .backup-list { + max-height: 300px; + overflow-y: auto; + margin-bottom: 16px; + } + + .backup-container { + display: flex; + flex-direction: column; + gap: 20px; + } + + .backup-section { + background-color: #f9f9f9; + padding: 16px; + border-radius: 8px; + border: 1px solid #eee; + } + + .backup-section h4 { + margin-top: 0; + margin-bottom: 16px; + color: #333; + } + + .backup-group { + 
+        margin-bottom: 24px;
+        border: 1px solid #ddd;
+        border-radius: 6px;
+        overflow: hidden;
+    }
+
+    .backup-group-header {
+        background-color: #f0f0f0;
+        padding: 10px 16px;
+        border-bottom: 1px solid #ddd;
+    }
+
+    .backup-group-header h5 {
+        margin: 0;
+        font-size: 14px;
+        color: #444;
+    }
+
+    .backup-group-actions {
+        padding: 10px;
+        background-color: #f5f5f5;
+        text-align: right;
+    }
+
+    .action-buttons {
+        display: flex;
+        gap: 8px;
+    }
+
+    .no-backups {
+        padding: 20px;
+        text-align: center;
+        color: #666;
+        background-color: #f9f9f9;
+        border-radius: 6px;
+    }
+
+    .loading-spinner {
+        display: inline-block;
+        width: 16px;
+        height: 16px;
+        border: 2px solid rgba(255, 255, 255, 0.3);
+        border-radius: 50%;
+        border-top-color: white;
+        animation: spin 1s ease-in-out infinite;
+    }
+
+    @keyframes spin {
+        to {
+            transform: rotate(360deg);
+        }
+    }
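Note: the backup RPCs above are gated on cvm.backup.enabled, so dstack-vmm needs the new backup table in its configuration file (and, per the prerequisites in scripts/dstack-backup.py, the QMP socket must also be enabled). A minimal sketch of the config addition, assuming the usual TOML vmm config and an illustrative backup path:

[cvm.backup]
enabled = true
path = "/var/dstack/backups"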