From 982ee3207e6aaaa849cdcc18078377970e077583 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 24 Jul 2025 11:40:10 +0200 Subject: [PATCH 01/32] Clean up first example --- .gitignore | 1 + examples/startup/data_transfer_runner.py | 22 +++++++++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index a8f5df39..8adbe249 100644 --- a/.gitignore +++ b/.gitignore @@ -184,3 +184,4 @@ cython_debug/ build_info.* doc/source/api/_autosummary test_run/ +**/dt_download/worker/** diff --git a/examples/startup/data_transfer_runner.py b/examples/startup/data_transfer_runner.py index 89b6fe76..fac97bef 100644 --- a/examples/startup/data_transfer_runner.py +++ b/examples/startup/data_transfer_runner.py @@ -55,7 +55,7 @@ def main( auth_url = f"{url}/auth/realms/rep" log = logging.getLogger() - logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG if debug else logging.INFO) user_token = authenticate(username=username, password=password, verify=False, url=auth_url) user_token = user_token.get("access_token", None) @@ -63,26 +63,34 @@ def main( client = Client() client.binary_config.update( - verbosity=3, - debug=debug, insecure=True, token=user_token, ) + if debug: + client.binary_config.update(verbosity=3, debug=True) - client.binary_config.debug = True client.start() api = DataTransferApi(client) s = api.status(wait=True) - log.info("Status: %s" % s) + log.info("--- Worker info ---") + log.info(f"Ready: {s.ready}") + log.info(f"Build info:") + for k, v in s.build_info.__dict__.items(): + log.info(f" {k}: {v}") + log.info(f"Features:") + for k, v in s.features.__dict__.items(): + log.info(f" {k}: {v}") log.info("Available storage:") for d in api.storages(): - log.info(f"- {json.dumps(d, indent=4)}") + log.info(f" name={d['name']} type={d['type']} priority={d['priority']}") - for i in range(10): + log.info("--- Idling for 
a while ---") + for i in range(5): log.info("Idling for a while...") time.sleep(2) + log.info("--- Stopping ---") client.stop() From 6feb8c81c7d34107d59f045e6c7548621629933d Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 24 Jul 2025 11:50:51 +0200 Subject: [PATCH 02/32] Reduce spam --- src/ansys/hps/data_transfer/client/binary.py | 6 +++--- src/ansys/hps/data_transfer/client/client.py | 14 ++++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index b0329558..e541c2e4 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -263,7 +263,7 @@ def start(self): # Mark binary as executable if not os.access(bin_path, os.X_OK): - log.debug(f"Marking binary as executable: {bin_path}") + # log.debug(f"Marking binary as executable: {bin_path}") st = os.stat(bin_path) os.chmod(bin_path, st.st_mode | stat.S_IEXEC) @@ -363,9 +363,9 @@ def _monitor(self): env.update(self._config.env) env_str = ",".join([k for k in self._config.env.keys() if k != "PATH"]) - log.debug(f"Starting worker: {redacted}") + log.debug(f"Command: {redacted}") if self._config.debug: - log.debug(f"Worker environment: {env_str}") + log.debug(f"Environment: {env_str}") with PrepareSubprocess(): self._process = subprocess.Popen( diff --git a/src/ansys/hps/data_transfer/client/client.py b/src/ansys/hps/data_transfer/client/client.py index b96a8969..e6e0a578 100644 --- a/src/ansys/hps/data_transfer/client/client.py +++ b/src/ansys/hps/data_transfer/client/client.py @@ -47,7 +47,7 @@ urllib3.disable_warnings() -for n in ["httpx", "httpcore", "requests", "urllib3"]: +for n in ["httpx", "httpcore", "requests", "urllib3", "filelock"]: logger = logging.getLogger(n) logger.setLevel(logging.CRITICAL) @@ -355,7 +355,9 @@ def _platform(self): return f"{plat}-{arch}" def _prepare_bin_path(self, build_info): - log.debug(f"Server version: 
{build_info}") + log.debug(f"Server build info:") + for k, v in build_info.items(): + log.debug(f" {k}: {v}") version_hash = build_info["version_hash"] # Figure out binary download path @@ -453,16 +455,16 @@ def _prepare_platform_binary(self): os.remove(bin_path) st = os.stat(bin_path) - log.debug(f"Marking binary as executable: {bin_path}") + # log.debug(f"Marking binary as executable: {bin_path}") os.chmod(bin_path, st.st_mode | stat.S_IEXEC) - if self._bin_config.debug: - log.debug(f"Binary mode: {stat.filemode(os.stat(bin_path).st_mode)}") + # if self._bin_config.debug: + # log.debug(f"Binary mode: {stat.filemode(os.stat(bin_path).st_mode)}") except filelock.Timeout as ex: raise BinaryError(f"Failed to acquire lock for binary download: {lock_path}") from ex def _create_session(self, url: str, *, sync: bool = True): verify = not self._bin_config.insecure - log.debug("Creating session for %s with verify=%s", url, verify) + # log.debug("Creating session for %s with verify=%s", url, verify) args = { "timeout": httpx.Timeout(self._timeout), From 635a62577689f75064ac348b1d36c4393fea9264 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 24 Jul 2025 13:53:49 +0200 Subject: [PATCH 03/32] Tweaking the settings --- examples/startup/data_transfer_runner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/startup/data_transfer_runner.py b/examples/startup/data_transfer_runner.py index fac97bef..1f53bec1 100644 --- a/examples/startup/data_transfer_runner.py +++ b/examples/startup/data_transfer_runner.py @@ -46,6 +46,7 @@ def main( debug: Annotated[bool, typer.Option(help="Enable debug logging")] = False, + verbose: Annotated[bool, typer.Option(help="Increase verbosity")] = False, url: Annotated[str, typer.Option(help="HPS URL to connect to")] = "https://localhost:8443/hps", username: Annotated[str, typer.Option(help="Username to authenticate with")] = "repadmin", password: Annotated[ @@ -55,7 +56,7 @@ def main( auth_url = 
f"{url}/auth/realms/rep" log = logging.getLogger() - logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG if debug else logging.INFO) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG if debug or verbose else logging.INFO) user_token = authenticate(username=username, password=password, verify=False, url=auth_url) user_token = user_token.get("access_token", None) @@ -68,6 +69,8 @@ def main( ) if debug: client.binary_config.update(verbosity=3, debug=True) + if verbose: + client.binary_config.update(verbosity=3) client.start() api = DataTransferApi(client) From 838f65dc98dbbb1836a164426040c54328143e90 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 24 Jul 2025 13:53:57 +0200 Subject: [PATCH 04/32] Allow custom message logging --- src/ansys/hps/data_transfer/client/binary.py | 67 +++++++++++--------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index e541c2e4..659fcca1 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -51,7 +51,6 @@ "panic": logging.CRITICAL, } - class PrepareSubprocess: """Provides for letting the context manager disable ``vfork`` and ``posix_spawn`` in the subprocess.""" @@ -74,6 +73,37 @@ def __exit__(self, exc_type, exc_val, exc_tb): subprocess._USE_VFORK = self._orig_use_vfork subprocess._USE_POSIX_SPAWN = self._orig_use_pspawn +def default_log_message(debug : bool, data : dict[str, any]): + """Default log message handler. + + Parameters + ---------- + data : dict + Data to log. 
+ """ + # log.warning(f"Worker: {d}") + + level = data.pop("level", "info") + data.pop("time", None) + if not debug: + data.pop("caller", None) + data.pop("mode", None) + msg = data.pop("message", None) + + if msg is None: + return + + msg = msg.capitalize() + level_no = level_map.get(level, logging.INFO) + other = "" + for k, v in data.items(): + formatted_value = f'"{v}"' if isinstance(v, str) and " " in v else v + other += f"{k}={formatted_value} " + other = other.strip() + if other: + msg += f" {other}" + msg = msg.encode("ascii", errors="ignore").decode().strip() + log.log(level_no, f"{msg}") class BinaryConfig: """Provides for configuring the worker binary connection to the HPS data transfer client. @@ -109,6 +139,7 @@ def __init__( # Process related settings log: bool = True, log_to_file: bool = False, + log_message: callable = default_log_message, monitor_interval: float = 0.5, # TODO: Remove path? not used anywhere path=None, @@ -128,6 +159,7 @@ def __init__( # Process related settings self.log = log self.log_to_file = log_to_file + self._log_message = log_message self.monitor_interval = monitor_interval self.path = path @@ -302,6 +334,8 @@ def stop(self, wait=5.0): time.sleep(wait * 0.1) def _log_output(self): + log_message = self._config._log_message + while not self._stop.is_set(): if self._process is None or self._process.stdout is None: time.sleep(1) @@ -311,8 +345,9 @@ def _log_output(self): if not line: break line = line.decode(errors="strip").strip() - # log.info("Worker: %s" % line) - self._log_line(line) + if log_message is not None: + d = json.loads(line) + log_message(self._config.debug, d) except json.decoder.JSONDecodeError: pass except Exception as e: @@ -321,32 +356,6 @@ def _log_output(self): time.sleep(1) log.debug("Worker log output stopped") - def _log_line(self, line): - d = json.loads(line) - # log.warning(f"Worker: {d}") - - level = d.pop("level", "info") - d.pop("time", None) - if not self._config.debug: - d.pop("caller", None) - 
d.pop("mode", None) - msg = d.pop("message", None) - - if msg is None: - return - - msg = msg.capitalize() - level_no = level_map.get(level, logging.INFO) - other = "" - for k, v in d.items(): - formatted_value = f'"{v}"' if isinstance(v, str) and " " in v else v - other += f"{k}={formatted_value} " - other = other.strip() - if other: - msg += f" {other}" - msg = msg.encode("ascii", errors="ignore").decode().strip() - log.log(level_no, f"{msg}") - def _monitor(self): while not self._stop.is_set(): if self._process is None: From 6faeef504b9a5ffe72545bd643c273fd49f38332 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 24 Jul 2025 14:18:16 +0200 Subject: [PATCH 05/32] Clean up stop message --- src/ansys/hps/data_transfer/client/binary.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index 659fcca1..ca4d9bdb 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -319,7 +319,6 @@ def stop(self, wait=5.0): if self._process is None: return - log.debug("Stopping worker ...") self._stop.set() self._prepared.clear() From e1e9535fb975aa465ab1e08e9db27d8281511851 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 24 Jul 2025 14:27:43 +0200 Subject: [PATCH 06/32] Allow verbosity config in example --- examples/startup/data_transfer_runner.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/examples/startup/data_transfer_runner.py b/examples/startup/data_transfer_runner.py index 1f53bec1..b78c9f3a 100644 --- a/examples/startup/data_transfer_runner.py +++ b/examples/startup/data_transfer_runner.py @@ -46,7 +46,7 @@ def main( debug: Annotated[bool, typer.Option(help="Enable debug logging")] = False, - verbose: Annotated[bool, typer.Option(help="Increase verbosity")] = False, + verbosity: Annotated[int, typer.Option(help="Increase verbosity")] = 1, url: Annotated[str, typer.Option(help="HPS URL to 
connect to")] = "https://localhost:8443/hps", username: Annotated[str, typer.Option(help="Username to authenticate with")] = "repadmin", password: Annotated[ @@ -56,7 +56,7 @@ def main( auth_url = f"{url}/auth/realms/rep" log = logging.getLogger() - logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG if debug or verbose else logging.INFO) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO) user_token = authenticate(username=username, password=password, verify=False, url=auth_url) user_token = user_token.get("access_token", None) @@ -66,11 +66,10 @@ def main( client.binary_config.update( insecure=True, token=user_token, + verbosity=verbosity, ) if debug: client.binary_config.update(verbosity=3, debug=True) - if verbose: - client.binary_config.update(verbosity=3) client.start() api = DataTransferApi(client) From f971920041edf7453d6a1822565b4d39db335959 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 24 Jul 2025 14:31:40 +0200 Subject: [PATCH 07/32] Adjust example options --- examples/basic/00_file_operations_client.py | 7 ++++--- examples/basic/01_async_data_transfer_client.py | 11 +++++------ examples/permissions/set_permissions_example.py | 7 ++++--- examples/transfer_files/transfer_files.py | 7 ++++--- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/examples/basic/00_file_operations_client.py b/examples/basic/00_file_operations_client.py index 8896dceb..c9300f56 100644 --- a/examples/basic/00_file_operations_client.py +++ b/examples/basic/00_file_operations_client.py @@ -99,6 +99,7 @@ def main( local_path: Annotated[str, typer.Option(help="Path to the files or directory to transfer. 
Supports wildcards.")], remote_path: Annotated[str, typer.Option(help="Optional path to the remote directory to transfer files to.")] = None, debug: Annotated[bool, typer.Option(help="Enable debug logging.")] = False, + verbosity: Annotated[int, typer.Option(help="Increase verbosity")] = 1, url: Annotated[str, typer.Option(help="HPS URL to connect to.")] = "https://localhost:8443/hps", username: Annotated[str, typer.Option(help="Username to authenticate with.")] = "repadmin", password: Annotated[ @@ -107,7 +108,7 @@ def main( ): logging.basicConfig( - format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug else logging.INFO + format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO ) dt_url = f"{url}/dt/api/v1" @@ -121,8 +122,8 @@ def main( client = Client(clean=True) client.binary_config.update( - verbosity=3, - debug=False, + verbosity=verbosity, + debug=debug, insecure=True, token=token, data_transfer_url=dt_url, diff --git a/examples/basic/01_async_data_transfer_client.py b/examples/basic/01_async_data_transfer_client.py index c1728b7c..b1377305 100644 --- a/examples/basic/01_async_data_transfer_client.py +++ b/examples/basic/01_async_data_transfer_client.py @@ -48,13 +48,9 @@ from ansys.hps.data_transfer.client.authenticate import authenticate from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath -log = logging.getLogger(__name__) -logger = logging.getLogger() -logging.basicConfig(format="%(asctime)s %(levelname)8s > %(message)s", level=logging.DEBUG) - - async def main( debug: Annotated[bool, typer.Option(help="Enable debug logging")] = False, + verbosity: Annotated[int, typer.Option(help="Increase verbosity")] = 1, url: Annotated[str, typer.Option(help="HPS URL to connect to")] = "https://localhost:8443/hps", username: Annotated[str, typer.Option(help="Username to authenticate with")] = "repadmin", password: Annotated[ @@ -62,6 +58,9 @@ async def main( ] = 
"repadmin", ): + log = logging.getLogger() + logging.basicConfig(format="%(asctime)s %(levelname)8s > %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO) + dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" token = authenticate(username=username, password=password, verify=False, url=auth_url) @@ -75,7 +74,7 @@ async def main( client = AsyncClient(clean=True) client.binary_config.update( - verbosity=3, + verbosity=verbosity, debug=debug, insecure=True, token=token, diff --git a/examples/permissions/set_permissions_example.py b/examples/permissions/set_permissions_example.py index fdc096b9..7c85c6f1 100644 --- a/examples/permissions/set_permissions_example.py +++ b/examples/permissions/set_permissions_example.py @@ -235,6 +235,7 @@ def permissions(api: DataTransferApi, url: str): # ======================== def main( debug: Annotated[bool, typer.Option(help="Enable debug logging")] = False, + verbosity: Annotated[int, typer.Option(help="Increase verbosity")] = 1, url: Annotated[str, typer.Option(help="HPS URL to connect to")] = "https://localhost:8443/hps", username: Annotated[str, typer.Option(help="Username to authenticate with")] = "repadmin", password: Annotated[ @@ -242,7 +243,7 @@ def main( ] = "repadmin", ): logging.basicConfig( - format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug else logging.INFO + format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO ) dt_url = f"{url}/dt/api/v1" @@ -256,8 +257,8 @@ def main( client = Client(clean=True) client.binary_config.update( - verbosity=3, - debug=False, + verbosity=verbosity, + debug=debug, insecure=True, token=token, data_transfer_url=dt_url, diff --git a/examples/transfer_files/transfer_files.py b/examples/transfer_files/transfer_files.py index 481e13f4..5e351344 100644 --- a/examples/transfer_files/transfer_files.py +++ b/examples/transfer_files/transfer_files.py @@ -151,6 +151,7 @@ 
def main( local_path: Annotated[str, typer.Option(help="Path to the files or directory to transfer. Supports wildcards")], remote_path: Annotated[str, typer.Option(help="Optional path to the remote directory to transfer files to")] = None, debug: Annotated[bool, typer.Option(help="Enable debug logging")] = False, + verbosity: Annotated[int, typer.Option(help="Increase verbosity")] = 1, url: Annotated[str, typer.Option(help="HPS URL to connect to")] = "https://localhost:8443/hps", username: Annotated[str, typer.Option(help="Username to authenticate with")] = "repadmin", password: Annotated[ @@ -158,7 +159,7 @@ def main( ] = "repadmin", ): logging.basicConfig( - format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug else logging.INFO + format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO ) dt_url = f"{url}/dt/api/v1" @@ -174,8 +175,8 @@ def main( client = Client(clean=True) client.binary_config.update( - verbosity=3, - debug=False, + verbosity=verbosity, + debug=debug, insecure=True, token=token, data_transfer_url=dt_url, From dc3ad30537cd12647c082babdc207e0020921594 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 24 Jul 2025 14:36:47 +0200 Subject: [PATCH 08/32] Add shared function to figure out the log level --- examples/basic/00_file_operations_client.py | 6 ++---- examples/basic/01_async_data_transfer_client.py | 4 ++-- examples/permissions/set_permissions_example.py | 6 ++---- examples/startup/data_transfer_runner.py | 4 ++-- examples/transfer_files/transfer_files.py | 7 +++---- src/ansys/hps/data_transfer/client/__init__.py | 1 + src/ansys/hps/data_transfer/client/binary.py | 13 +++++++++++++ 7 files changed, 25 insertions(+), 16 deletions(-) diff --git a/examples/basic/00_file_operations_client.py b/examples/basic/00_file_operations_client.py index c9300f56..15f9aa72 100644 --- a/examples/basic/00_file_operations_client.py +++ 
b/examples/basic/00_file_operations_client.py @@ -43,7 +43,7 @@ import typer from typing_extensions import Annotated -from ansys.hps.data_transfer.client import Client, DataTransferApi +from ansys.hps.data_transfer.client import Client, DataTransferApi, get_log_level from ansys.hps.data_transfer.client.authenticate import authenticate from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath @@ -107,9 +107,7 @@ def main( ] = "repadmin", ): - logging.basicConfig( - format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO - ) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(debug, verbosity)) dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" diff --git a/examples/basic/01_async_data_transfer_client.py b/examples/basic/01_async_data_transfer_client.py index b1377305..d5dd5a05 100644 --- a/examples/basic/01_async_data_transfer_client.py +++ b/examples/basic/01_async_data_transfer_client.py @@ -44,7 +44,7 @@ import typer from typing_extensions import Annotated -from ansys.hps.data_transfer.client import AsyncClient, AsyncDataTransferApi +from ansys.hps.data_transfer.client import AsyncClient, AsyncDataTransferApi, get_log_level from ansys.hps.data_transfer.client.authenticate import authenticate from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath @@ -59,7 +59,7 @@ async def main( ): log = logging.getLogger() - logging.basicConfig(format="%(asctime)s %(levelname)8s > %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO) + logging.basicConfig(yyformat="%(levelname)8s > %(message)s", level=get_log_level(debug, verbosity)) dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" diff --git a/examples/permissions/set_permissions_example.py b/examples/permissions/set_permissions_example.py index 7c85c6f1..d94924c1 100644 --- a/examples/permissions/set_permissions_example.py +++ 
b/examples/permissions/set_permissions_example.py @@ -44,7 +44,7 @@ import typer from typing_extensions import Annotated -from ansys.hps.data_transfer.client import Client, DataTransferApi +from ansys.hps.data_transfer.client import Client, DataTransferApi, get_log_level from ansys.hps.data_transfer.client.authenticate import authenticate from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath from ansys.hps.data_transfer.client.models.permissions import ( @@ -242,9 +242,7 @@ def main( str, typer.Option(prompt=True, hide_input=True, help="Password to authenticate with") ] = "repadmin", ): - logging.basicConfig( - format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO - ) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(debug, verbosity)) dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" diff --git a/examples/startup/data_transfer_runner.py b/examples/startup/data_transfer_runner.py index b78c9f3a..33645f51 100644 --- a/examples/startup/data_transfer_runner.py +++ b/examples/startup/data_transfer_runner.py @@ -40,7 +40,7 @@ import typer from typing_extensions import Annotated -from ansys.hps.data_transfer.client import Client, DataTransferApi +from ansys.hps.data_transfer.client import Client, DataTransferApi, get_log_level from ansys.hps.data_transfer.client.authenticate import authenticate @@ -56,7 +56,7 @@ def main( auth_url = f"{url}/auth/realms/rep" log = logging.getLogger() - logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(debug, verbosity)) user_token = authenticate(username=username, password=password, verify=False, url=auth_url) user_token = user_token.get("access_token", None) diff --git a/examples/transfer_files/transfer_files.py b/examples/transfer_files/transfer_files.py index 
5e351344..b334008d 100644 --- a/examples/transfer_files/transfer_files.py +++ b/examples/transfer_files/transfer_files.py @@ -49,7 +49,7 @@ import typer from typing_extensions import Annotated -from ansys.hps.data_transfer.client import Client, DataTransferApi +from ansys.hps.data_transfer.client import Client, DataTransferApi, get_log_level from ansys.hps.data_transfer.client.authenticate import authenticate from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath @@ -158,9 +158,8 @@ def main( str, typer.Option(prompt=True, hide_input=True, help="Password to authenticate with") ] = "repadmin", ): - logging.basicConfig( - format="[%(asctime)s | %(levelname)s] %(message)s", level=logging.DEBUG if debug or verbosity > 1 else logging.INFO - ) + logging.basicConfig() + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(debug, verbosity)) dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" diff --git a/src/ansys/hps/data_transfer/client/__init__.py b/src/ansys/hps/data_transfer/client/__init__.py index 925fc2d7..9e1aac9f 100644 --- a/src/ansys/hps/data_transfer/client/__init__.py +++ b/src/ansys/hps/data_transfer/client/__init__.py @@ -25,3 +25,4 @@ from .api import AsyncDataTransferApi, DataTransferApi from .client import AsyncClient, Client from .exceptions import APIError, ClientError, HPSError +from .binary import get_log_level diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index ca4d9bdb..c3d2fbf3 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -51,6 +51,19 @@ "panic": logging.CRITICAL, } +verbosity_map = { + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + 3: logging.DEBUG, +} + +def get_log_level(verbosity: int, debug: bool = False) -> int: + """Get the log level based on verbosity and debug flag.""" + if debug: + return logging.DEBUG + return verbosity_map.get(verbosity, 
logging.INFO) + class PrepareSubprocess: """Provides for letting the context manager disable ``vfork`` and ``posix_spawn`` in the subprocess.""" From 79c80e5524d8982fdaf1326eaa8f3ac65d907254 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 24 Jul 2025 14:41:08 +0200 Subject: [PATCH 09/32] Adjust messaging --- src/ansys/hps/data_transfer/client/binary.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index c3d2fbf3..bf4fb6b2 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -325,7 +325,7 @@ def start(self): t.start() if not self._prepared.wait(timeout=5.0): - log.warning("Worker did not prepare in time.") + log.warning("Worker preparation is taking longer than expected ...") def stop(self, wait=5.0): """Stop the worker binary.""" @@ -338,6 +338,7 @@ def stop(self, wait=5.0): start = time.time() while True: if self._process.poll() is not None: + log.debug("Worker stopped.") break if time.time() - start > wait: log.warning("Worker did not stop in time, killing ...") @@ -366,7 +367,7 @@ def _log_output(self): if self._config.debug: log.debug(f"Error reading worker output: {e}") time.sleep(1) - log.debug("Worker log output stopped") + # log.debug("Worker log output stopped") def _monitor(self): while not self._stop.is_set(): @@ -406,7 +407,7 @@ def _monitor(self): # log.debug(f"Worker running ...") time.sleep(self._config.monitor_interval) - log.debug("Worker monitor stopped") + # log.debug("Worker monitor stopped") def _prepare(self): if self._config._selected_port is None: From 9907df936c884657af184bd932b8fc12b350af9c Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Fri, 25 Jul 2025 14:44:34 +0200 Subject: [PATCH 10/32] Fail model gen on frist error --- scripts/generate_models.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/generate_models.sh 
b/scripts/generate_models.sh index 060fcae8..e7d34491 100755 --- a/scripts/generate_models.sh +++ b/scripts/generate_models.sh @@ -1,5 +1,7 @@ #!/bin/bash +set -e + models_dir=src/ansys/hps/data_transfer/client/models curl --insecure -L https://localhost:8443/hps/dt/swagger/doc.json -o openapi2.json From d9c05ef3fffdbb9ad6a113207efacb1d5fbb958e Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Fri, 25 Jul 2025 15:34:59 +0200 Subject: [PATCH 11/32] Add operation handler --- .gitignore | 2 +- examples/basic/00_file_operations_client.py | 11 +- .../basic/01_async_data_transfer_client.py | 8 +- .../permissions/set_permissions_example.py | 2 +- examples/startup/data_transfer_runner.py | 2 +- .../_downloaded/transfer_files.py | 201 ++++++++++++++++++ examples/transfer_files/transfer_files.py | 12 +- .../hps/data_transfer/client/__init__.py | 2 +- src/ansys/hps/data_transfer/client/api/api.py | 67 ++++-- src/ansys/hps/data_transfer/client/binary.py | 6 +- src/ansys/hps/data_transfer/client/client.py | 4 +- .../data_transfer/client/models/metadata.py | 4 +- .../hps/data_transfer/client/models/ops.py | 1 + 13 files changed, 275 insertions(+), 47 deletions(-) create mode 100644 examples/transfer_files/_downloaded/transfer_files.py diff --git a/.gitignore b/.gitignore index 8adbe249..f13084ed 100644 --- a/.gitignore +++ b/.gitignore @@ -184,4 +184,4 @@ cython_debug/ build_info.* doc/source/api/_autosummary test_run/ -**/dt_download/worker/** +**/dt_download/worker/** diff --git a/examples/basic/00_file_operations_client.py b/examples/basic/00_file_operations_client.py index 15f9aa72..f88de62e 100644 --- a/examples/basic/00_file_operations_client.py +++ b/examples/basic/00_file_operations_client.py @@ -47,6 +47,7 @@ from ansys.hps.data_transfer.client.authenticate import authenticate from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath + log = logging.getLogger(__name__) #################################### @@ -56,11 +57,6 @@ def file_operations(api: 
DataTransferApi, local_path: str, remote_path: Optional if not remote_path: remote_path = Path(local_path).parent.name - log.info("Query storages ...") - storages = api.storages() - storage_names = [f"{storage['name']}({storage['type']})" for storage in storages] - log.info(f"Available storages: {storage_names}") - log.info("Creating a directory ...") base_dir = "basic-example" mkdir_op = api.mkdir([StoragePath(path=f"{base_dir}")]) @@ -73,12 +69,10 @@ def file_operations(api: DataTransferApi, local_path: str, remote_path: Optional op = api.copy([SrcDst(src=src, dst=dst) for src, dst in zip(srcs, dsts)]) op = api.wait_for([op.id]) - log.info(f"Operation {op[0].state}") log.info("Listing files ...") op = api.list([StoragePath(path=base_dir)]) op = api.wait_for([op.id]) - log.info(f"Operation {op[0].state}") log.info(f"Files in {base_dir}: {op[0].result}") log.info("Getting metadata ...") @@ -90,7 +84,6 @@ def file_operations(api: DataTransferApi, local_path: str, remote_path: Optional log.info("Removing files ...") op = api.rmdir([StoragePath(path=base_dir)]) op = api.wait_for([op.id]) - log.info(f"Operation {op[0].state}") #################################### # Define the main function @@ -106,8 +99,8 @@ def main( str, typer.Option(prompt=True, hide_input=True, help="Password to authenticate with.") ] = "repadmin", ): + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) - logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(debug, verbosity)) dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" diff --git a/examples/basic/01_async_data_transfer_client.py b/examples/basic/01_async_data_transfer_client.py index d5dd5a05..6df3af5f 100644 --- a/examples/basic/01_async_data_transfer_client.py +++ b/examples/basic/01_async_data_transfer_client.py @@ -59,8 +59,8 @@ async def main( ): log = logging.getLogger() - logging.basicConfig(yyformat="%(levelname)8s > %(message)s", 
level=get_log_level(debug, verbosity)) - + logging.basicConfig(yyformat="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) + dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" token = authenticate(username=username, password=password, verify=False, url=auth_url) @@ -112,7 +112,6 @@ async def main( op = await api.copy([SrcDst(src=src, dst=dst) for src, dst in zip(srcs, dsts)]) op = await api.wait_for([op.id]) - log.info(f"Operation {op[0].state}") files = glob.glob(os.path.join(os.path.dirname(__file__), "*.txt")) srcs = [StoragePath(path=file, remote="local") for file in files] @@ -120,12 +119,10 @@ async def main( op = await api.copy([SrcDst(src=src, dst=dst) for src, dst in zip(srcs, dsts)]) op = await api.wait_for([op.id]) - log.info(f"Operation {op[0].state}") log.info("Listing files ...") op = await api.list([StoragePath(path=base_dir)]) op = await api.wait_for([op.id]) - log.info(f"Operation {op[0].state}") log.info(f"Files in {base_dir}: {op[0].result}") log.info("Getting metadata ...") @@ -137,7 +134,6 @@ async def main( log.info("Removing files ...") op = await api.rmdir([StoragePath(path=base_dir)]) op = await api.wait_for([op.id]) - log.info(f"Operation {op[0].state}") await client.stop() diff --git a/examples/permissions/set_permissions_example.py b/examples/permissions/set_permissions_example.py index d94924c1..1eac9e76 100644 --- a/examples/permissions/set_permissions_example.py +++ b/examples/permissions/set_permissions_example.py @@ -242,7 +242,7 @@ def main( str, typer.Option(prompt=True, hide_input=True, help="Password to authenticate with") ] = "repadmin", ): - logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(debug, verbosity)) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" diff --git a/examples/startup/data_transfer_runner.py b/examples/startup/data_transfer_runner.py 
index 33645f51..2ea54e31 100644 --- a/examples/startup/data_transfer_runner.py +++ b/examples/startup/data_transfer_runner.py @@ -56,7 +56,7 @@ def main( auth_url = f"{url}/auth/realms/rep" log = logging.getLogger() - logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(debug, verbosity)) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) user_token = authenticate(username=username, password=password, verify=False, url=auth_url) user_token = user_token.get("access_token", None) diff --git a/examples/transfer_files/_downloaded/transfer_files.py b/examples/transfer_files/_downloaded/transfer_files.py new file mode 100644 index 00000000..bd19f202 --- /dev/null +++ b/examples/transfer_files/_downloaded/transfer_files.py @@ -0,0 +1,201 @@ +# Copyright (C) 2024 - 2025 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+ +""" +.. _ref_transfer: + +============== +Transfer files +============== + +This example script transfers files to remote backends and back using the data transfer service. +The ``local-path`` argument is the path to the files or directory to transfer. Wildcards are supported. + +Example usage: ``python examples/transfer_files.py --local_path=examples/basic/files/* --remote-path=hello --debug`` + +""" + +################################################### +# Perform necessary imports +# ========================= +import filecmp +import glob +import logging +import os +from pathlib import Path +from time import perf_counter +from typing import Optional + +from humanfriendly import format_size +import typer +from typing_extensions import Annotated + +from ansys.hps.data_transfer.client import Client, DataTransferApi, get_log_level +from ansys.hps.data_transfer.client.authenticate import authenticate +from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath + +log = logging.getLogger(__name__) + +######################################################################## +# Define a method to transfer files using the data transfer service +# ================================================================= +def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[str] = None): + """Transfer files to remote backends and back using new data transfer service.""" + if not remote_path: + remote_path = Path(local_path).parent.name + local_dir = os.path.dirname(local_path) + + log.info(10 * "=") + log.info(f"Transfer files: {local_path} to {remote_path}") + + log.info("== Removing remote directory if it exists ...") + op = api.rmdir([StoragePath(path=remote_path)]) + op = api.wait_for([op.id]) + + + log.info("== Creating remote directory ...") + mkdir_op = api.mkdir([StoragePath(path=remote_path)]) + api.wait_for([mkdir_op.id]) + log.info(f"Directory {remote_path} created") + + local_files = [f for f in glob.glob(local_path, 
recursive=True)] + log.info(f"== Uploading {len(local_files)} files ...") + log.debug(f"Local files: {local_files}") + + copy_args = [ + SrcDst( + src=StoragePath(path=f, remote="local"), + dst=StoragePath(path=f"{remote_path}/{os.path.basename(f)}", remote="any"), + ) + for f in local_files + ] + t0 = perf_counter() + op = api.copy(copy_args) + op = api.wait_for([op.id]) + t1 = perf_counter() + + + log.info(f"== Querying files and metadata in {remote_path} ...") + op = api.list([StoragePath(path=remote_path)]) + op = api.wait_for([op.id]) + + log.debug(f"Files in {remote_path}: {op[0].result}") + fnames = op[0].result[f"any:{remote_path}"] + + op = api.get_metadata([StoragePath(path=f"{remote_path}/{fname}") for fname in fnames]) + op = api.wait_for(op.id) + + log.debug(f"Metadata for {remote_path}: {op[0].result}") + + log.info("== List of uploaded files:") + total_size = 0 + for fname in fnames: + size = op[0].result[f"{remote_path}/{fname}"].get("size", 0) + total_size += size + checksum = op[0].result[f"{remote_path}/{fname}"].get("checksum") + log.info(f"- name={fname} size={format_size(size)} checksum={checksum if checksum else 'n/a'}") + + log.info("== Upload performance:") + log.info(f"- Total time: {t1-t0:.5f} s") + log.info(f"- Total size: {format_size(total_size)}") + log.info(f"- Throughput: {format_size(total_size / (t1 - t0) )}/s") + + log.info("== Downloading files again") + copy_args = [ + SrcDst( + src=StoragePath(path=f"{remote_path}/{fname}", remote="any"), + dst=StoragePath(path=f"{local_dir}_downloaded/{fname}", remote="local"), + ) + for fname in fnames + ] + t0 = perf_counter() + op = api.copy(copy_args) + op = api.wait_for([op.id]) + t1 = perf_counter() + + + log.info("== Download performance:") + log.info(f"- Total time: {t1-t0:.5f} s") + log.info(f"- Total size: {format_size(total_size)}") + log.info(f"- Throughput: {format_size(total_size / (t1 - t0) )}/s") + + log.info("== Comparing files ...") + for fname in fnames: + success = 
filecmp.cmp(f"{local_dir}/{fname}", f"{local_dir}_downloaded/{fname}", shallow=True) + log.info(f"- {fname}: {'Success' if success else 'Failed'}") + assert success, f"File {fname} comparison failed!" + +################################################### +# Define the main function +# ======================== +def main( + local_path: Annotated[str, typer.Option(help="Path to the files or directory to transfer. Supports wildcards")], + remote_path: Annotated[str, typer.Option(help="Optional path to the remote directory to transfer files to")] = None, + debug: Annotated[bool, typer.Option(help="Enable debug logging")] = False, + verbosity: Annotated[int, typer.Option(help="Increase verbosity")] = 1, + url: Annotated[str, typer.Option(help="HPS URL to connect to")] = "https://localhost:8443/hps", + username: Annotated[str, typer.Option(help="Username to authenticate with")] = "repadmin", + password: Annotated[ + str, typer.Option(prompt=True, hide_input=True, help="Password to authenticate with") + ] = "repadmin", +): + logging.basicConfig() + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) + + dt_url = f"{url}/dt/api/v1" + auth_url = f"{url}/auth/realms/rep" + + token = authenticate(username=username, password=password, verify=False, url=auth_url) + token = token.get("access_token", None) + assert token is not None + +################################################### +# Create a ``Client`` instance +# ============================ + client = Client(clean=True) + + client.binary_config.update( + verbosity=verbosity, + debug=debug, + insecure=True, + token=token, + data_transfer_url=dt_url, + ) + client.start() +################################################### +# Create a ``DataTransferApi`` instance +# ===================================== + api = DataTransferApi(client) + api.status(wait=True) +################################################### +# Get available storages +# ====================== + storage_names 
= [f"{s['name']}({s['type']})" for s in api.storages()] + log.info(f"Available storages: {storage_names}") + + transfer_files(api=api, local_path=local_path, remote_path=remote_path) + + client.stop() + + +if __name__ == "__main__": + typer.run(main) diff --git a/examples/transfer_files/transfer_files.py b/examples/transfer_files/transfer_files.py index b334008d..bd19f202 100644 --- a/examples/transfer_files/transfer_files.py +++ b/examples/transfer_files/transfer_files.py @@ -70,7 +70,7 @@ def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[ log.info("== Removing remote directory if it exists ...") op = api.rmdir([StoragePath(path=remote_path)]) op = api.wait_for([op.id]) - log.debug(f"Operation {op[0].state}") + log.info("== Creating remote directory ...") mkdir_op = api.mkdir([StoragePath(path=remote_path)]) @@ -92,18 +92,18 @@ def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[ op = api.copy(copy_args) op = api.wait_for([op.id]) t1 = perf_counter() - log.debug(f"Operation {op[0].state}") + log.info(f"== Querying files and metadata in {remote_path} ...") op = api.list([StoragePath(path=remote_path)]) op = api.wait_for([op.id]) - log.debug(f"Operation {op[0].state}") + log.debug(f"Files in {remote_path}: {op[0].result}") fnames = op[0].result[f"any:{remote_path}"] op = api.get_metadata([StoragePath(path=f"{remote_path}/{fname}") for fname in fnames]) op = api.wait_for(op.id) - log.debug(f"Operation {op[0].state}") + log.debug(f"Metadata for {remote_path}: {op[0].result}") log.info("== List of uploaded files:") @@ -131,7 +131,7 @@ def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[ op = api.copy(copy_args) op = api.wait_for([op.id]) t1 = perf_counter() - log.debug(f"Operation {op[0].state}") + log.info("== Download performance:") log.info(f"- Total time: {t1-t0:.5f} s") @@ -159,7 +159,7 @@ def main( ] = "repadmin", ): logging.basicConfig() - 
logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(debug, verbosity)) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" diff --git a/src/ansys/hps/data_transfer/client/__init__.py b/src/ansys/hps/data_transfer/client/__init__.py index 9e1aac9f..38482e7f 100644 --- a/src/ansys/hps/data_transfer/client/__init__.py +++ b/src/ansys/hps/data_transfer/client/__init__.py @@ -23,6 +23,6 @@ from .__version__ import __version__ from .api import AsyncDataTransferApi, DataTransferApi +from .binary import get_log_level from .client import AsyncClient, Client from .exceptions import APIError, ClientError, HPSError -from .binary import get_log_level diff --git a/src/ansys/hps/data_transfer/client/api/api.py b/src/ansys/hps/data_transfer/client/api/api.py index a466aa53..ab64404d 100644 --- a/src/ansys/hps/data_transfer/client/api/api.py +++ b/src/ansys/hps/data_transfer/client/api/api.py @@ -53,10 +53,52 @@ from ..models.permissions import RoleAssignment, RoleQuery from ..utils.jitter import get_expo_backoff from .retry import retry +from dateutil import parser log = logging.getLogger(__name__) +class DefaultOperationHandler: + """Allows additional handling of the operations.""" + + def __init__(self): + """Initializes the OperationHandler class object.""" + self.start = time.time() + self.report_threshold = 60.0 # seconds + + def __call__(self, ops: list[Operation]): + """Handle operations after they are fetched.""" + so_far = time.time() - self.start + done_ops = [op.state for op in ops if op.state in [OperationState.Succeeded, OperationState.Failed]] + for op in ops: + if op.state in [OperationState.Succeeded, OperationState.Failed]: + try: + start = parser.parse(op.started_at) + end = parser.parse(op.ended_at) + duration = hf.format_timespan(end - start) + except Exception: + duration = "unknown" + op_type = "operation" if 
len(op.children) == 0 else "operation group" + + extras = "" + if op.info: + for k, v in op.info.items(): + extras += f", {k} {v}" + log.info(f"{op_type.capitalize()} '{op.description}'({op.id}) has {self._formatState(op)}, took {duration}") + + num_running = len(ops) - len(done_ops) + if num_running > 0 and so_far > self.report_threshold: + log.info(f"Waiting for {num_running} operations to complete. {hf.format_timespan(so_far)} so far ...") + else: + duration = hf.format_timespan(time.time() - self.start) + log.debug(f"Completed {duration} operations after {duration}") + + def _formatState(self, op: Operation) -> str: + """Format the state of the operation.""" + s = f"{op.state.value}" + return s + + class DataTransferApi: """Provides the data transfer API. @@ -283,6 +325,7 @@ def wait_for( cap: float = 2.0, raise_on_error: bool = False, progress_handler: Callable[[str, float], None] = None, + operation_handler: Callable[[builtins.list[Operation]], None] = None, ): """Wait for operations to complete. @@ -301,6 +344,9 @@ def wait_for( progress_handler: Callable[[str, float], None] A function to handle progress updates. Default is None. 
""" + if operation_handler is None: + operation_handler = DefaultOperationHandler() + if not isinstance(operation_ids, list): operation_ids = [operation_ids] operation_ids = [op.id if isinstance(op, Operation | OpIdResponse) else op for op in operation_ids] @@ -312,22 +358,11 @@ def wait_for( attempt += 1 try: ops = self._operations(operation_ids) - so_far = hf.format_timespan(time.time() - start) - log.debug(f"Waiting for {len(operation_ids)} operations to complete, {so_far} so far") - if self.client.binary_config.debug: - for op in ops: - fields = [ - f"id={op.id}", - f"state={op.state}", - f"start={op.started_at}", - f"succeeded_on={op.succeeded_on}", - ] - if op.progress > 0: - fields.append(f"progress={op.progress:.3f}") - log.debug(f"- Operation '{op.description}' {' '.join(fields)}") if progress_handler is not None: for op in ops: progress_handler(op.id, op.progress) + if operation_handler is not None: + operation_handler(ops) if all(op.state in [OperationState.Succeeded, OperationState.Failed] for op in ops): break except Exception as e: @@ -340,10 +375,8 @@ def wait_for( # TODO: Adjust based on transfer speed and file size duration = get_expo_backoff(interval, attempts=attempt, cap=cap, jitter=True) - if self.client.binary_config.debug: - log.debug(f"Next check in {hf.format_timespan(duration)} ...") + # if self.client.binary_config.debug: + # log.debug(f"Next check in {hf.format_timespan(duration)} ...") time.sleep(duration) - duration = hf.format_timespan(time.time() - start) - log.debug(f"Operations completed after {duration}: {op_str}") return ops diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index bf4fb6b2..50f26546 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -58,12 +58,14 @@ 3: logging.DEBUG, } + def get_log_level(verbosity: int, debug: bool = False) -> int: """Get the log level based on verbosity and debug flag.""" if debug: 
return logging.DEBUG return verbosity_map.get(verbosity, logging.INFO) + class PrepareSubprocess: """Provides for letting the context manager disable ``vfork`` and ``posix_spawn`` in the subprocess.""" @@ -86,7 +88,8 @@ def __exit__(self, exc_type, exc_val, exc_tb): subprocess._USE_VFORK = self._orig_use_vfork subprocess._USE_POSIX_SPAWN = self._orig_use_pspawn -def default_log_message(debug : bool, data : dict[str, any]): + +def default_log_message(debug: bool, data: dict[str, any]): """Default log message handler. Parameters @@ -118,6 +121,7 @@ def default_log_message(debug : bool, data : dict[str, any]): msg = msg.encode("ascii", errors="ignore").decode().strip() log.log(level_no, f"{msg}") + class BinaryConfig: """Provides for configuring the worker binary connection to the HPS data transfer client. diff --git a/src/ansys/hps/data_transfer/client/client.py b/src/ansys/hps/data_transfer/client/client.py index e6e0a578..a25ba9e2 100644 --- a/src/ansys/hps/data_transfer/client/client.py +++ b/src/ansys/hps/data_transfer/client/client.py @@ -355,7 +355,7 @@ def _platform(self): return f"{plat}-{arch}" def _prepare_bin_path(self, build_info): - log.debug(f"Server build info:") + log.debug("Server build info:") for k, v in build_info.items(): log.debug(f" {k}: {v}") version_hash = build_info["version_hash"] @@ -458,7 +458,7 @@ def _prepare_platform_binary(self): # log.debug(f"Marking binary as executable: {bin_path}") os.chmod(bin_path, st.st_mode | stat.S_IEXEC) # if self._bin_config.debug: - # log.debug(f"Binary mode: {stat.filemode(os.stat(bin_path).st_mode)}") + # log.debug(f"Binary mode: {stat.filemode(os.stat(bin_path).st_mode)}") except filelock.Timeout as ex: raise BinaryError(f"Failed to acquire lock for binary download: {lock_path}") from ex diff --git a/src/ansys/hps/data_transfer/client/models/metadata.py b/src/ansys/hps/data_transfer/client/models/metadata.py index 54d3b40f..e97cdadb 100644 --- a/src/ansys/hps/data_transfer/client/models/metadata.py +++ 
b/src/ansys/hps/data_transfer/client/models/metadata.py @@ -26,12 +26,12 @@ from typing import Any, Optional -from pydantic import BaseModel, Field, RootModel +from pydantic import BaseModel, RootModel class DataAssignment(BaseModel): compressed_size: int | None = None - compression: str | None = Field(None, description="Make sure to add new fields to MergeBase method down below") + compression: str | None = None custom: dict[str, Any] | None = None uncompressed_size: int | None = None diff --git a/src/ansys/hps/data_transfer/client/models/ops.py b/src/ansys/hps/data_transfer/client/models/ops.py index 4f5b677e..73a82bd0 100644 --- a/src/ansys/hps/data_transfer/client/models/ops.py +++ b/src/ansys/hps/data_transfer/client/models/ops.py @@ -44,6 +44,7 @@ class Operation(BaseModel): ended_at: str | None = None error: str | None = None id: str | None = None + info: dict[str, Any] | None = Field(None, description="Additional info about the operation, used for logging") messages: list[str] | None = None progress: float | None = None progress_current: int | None = None From 584048857dc9b0701792af8d30eb8b8ed7923b93 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Tue, 29 Jul 2025 10:15:49 +0200 Subject: [PATCH 12/32] Adjust formatting Allow group expansion via Meta sub-class --- src/ansys/hps/data_transfer/client/api/api.py | 81 +++++++++++-------- .../hps/data_transfer/client/api/async_api.py | 4 +- 2 files changed, 51 insertions(+), 34 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/api/api.py b/src/ansys/hps/data_transfer/client/api/api.py index ab64404d..950e6fa5 100644 --- a/src/ansys/hps/data_transfer/client/api/api.py +++ b/src/ansys/hps/data_transfer/client/api/api.py @@ -30,6 +30,7 @@ from collections.abc import Callable import logging import textwrap +import traceback import time import backoff @@ -61,10 +62,14 @@ class DefaultOperationHandler: """Allows additional handling of the operations.""" + class Meta: + """Meta class for 
DefaultOperationHandler.""" + expand_groups = True + def __init__(self): """Initializes the OperationHandler class object.""" self.start = time.time() - self.report_threshold = 60.0 # seconds + self.report_threshold = 10.0 # seconds def __call__(self, ops: list[Operation]): """Handle operations after they are fetched.""" @@ -72,32 +77,41 @@ def __call__(self, ops: list[Operation]): done_ops = [op.state for op in ops if op.state in [OperationState.Succeeded, OperationState.Failed]] for op in ops: if op.state in [OperationState.Succeeded, OperationState.Failed]: - try: - start = parser.parse(op.started_at) - end = parser.parse(op.ended_at) - duration = hf.format_timespan(end - start) - except Exception: - duration = "unknown" - op_type = "operation" if len(op.children) == 0 else "operation group" - - extras = "" - if op.info: - for k, v in op.info.items(): - extras += f", {k} {v}" - log.info(f"{op_type.capitalize()} '{op.description}'({op.id}) has {self._formatState(op)}, took {duration}") + log.info(self._format_op(op)) + elif so_far > self.report_threshold: + self._format_op(op, progress=True) num_running = len(ops) - len(done_ops) + # num_completed = len(done_ops) if num_running > 0 and so_far > self.report_threshold: log.info(f"Waiting for {num_running} operations to complete. 
{hf.format_timespan(so_far)} so far ...") + # elif num_completed == len(ops): + # duration = hf.format_timespan(time.time() - self.start) + # log.debug(f"Completed {num_completed} operations after {duration}") + + def _format_op(self, op: Operation) -> str: + """Format the operation description.""" + op_type = "operation" if len(op.children) == 0 else "operation group" + + msg = f"{op_type.capitalize()} '{op.description}'({op.id})" + + try: + start = parser.parse(op.started_at) + end = parser.parse(op.ended_at) + duration = hf.format_timespan(end - start) + except Exception: + duration = "unknown" + + state = op.state.value + if op.state in [OperationState.Succeeded, OperationState.Failed]: + msg += f" has {state} after {duration}" else: - duration = hf.format_timespan(time.time() - self.start) - log.debug(f"Completed {duration} operations after {duration}") - - def _formatState(self, op: Operation) -> str: - """Format the state of the operation.""" - s = f"{op.state.value}" - return s + msg += f" is {state}, {duration} so far, progress {op.progress:.2f}%" + if op.info is not None: + info = ", ".join([f"{k}={v}" for k, v in op.info.items()]) + msg += ", " + info + return msg class DataTransferApi: """Provides the data transfer API. @@ -324,7 +338,6 @@ def wait_for( interval: float = 0.1, cap: float = 2.0, raise_on_error: bool = False, - progress_handler: Callable[[str, float], None] = None, operation_handler: Callable[[builtins.list[Operation]], None] = None, ): """Wait for operations to complete. @@ -341,8 +354,8 @@ def wait_for( The maximum backoff value used to calculate the next wait time. Default is 2.0. raise_on_error: bool Raise an exception if an error occurs. Default is False. - progress_handler: Callable[[str, float], None] - A function to handle progress updates. Default is None. + operation_handler: Callable[[builtins.list[Operation]], None] + A callable that will be called with the list of operations when they are fetched. 
""" if operation_handler is None: operation_handler = DefaultOperationHandler() @@ -352,17 +365,23 @@ def wait_for( operation_ids = [op.id if isinstance(op, Operation | OpIdResponse) else op for op in operation_ids] start = time.time() attempt = 0 - op_str = textwrap.wrap(", ".join(operation_ids), width=60, placeholder="...") - # log.debug(f"Waiting for operations to complete: {op_str}") while True: attempt += 1 try: ops = self._operations(operation_ids) - if progress_handler is not None: + + meta = getattr(operation_handler, "Meta", dict()) + expand_groups = getattr(meta, "expand_groups", False) + if expand_groups: + expanded = [] for op in ops: - progress_handler(op.id, op.progress) - if operation_handler is not None: - operation_handler(ops) + if not op.children: + continue + expanded.extend(self._operations(op.children)) + expanded.append(op) + if operation_handler is not None: + operation_handler(expanded) + if all(op.state in [OperationState.Succeeded, OperationState.Failed] for op in ops): break except Exception as e: @@ -375,8 +394,6 @@ def wait_for( # TODO: Adjust based on transfer speed and file size duration = get_expo_backoff(interval, attempts=attempt, cap=cap, jitter=True) - # if self.client.binary_config.debug: - # log.debug(f"Next check in {hf.format_timespan(duration)} ...") time.sleep(duration) return ops diff --git a/src/ansys/hps/data_transfer/client/api/async_api.py b/src/ansys/hps/data_transfer/client/api/async_api.py index 7fbe3e2a..e7c75f1b 100644 --- a/src/ansys/hps/data_transfer/client/api/async_api.py +++ b/src/ansys/hps/data_transfer/client/api/async_api.py @@ -264,8 +264,8 @@ async def wait_for( # TODO: Adjust based on transfer speed and file size duration = get_expo_backoff(interval, attempts=attempt, cap=cap, jitter=True) - if self.client.binary_config.debug: - log.debug(f"Next check in {hf.format_timespan(duration)} ...") + # if self.client.binary_config.debug: + # log.debug(f"Next check in {hf.format_timespan(duration)} ...") await 
asyncio.sleep(duration) duration = hf.format_timespan(time.time() - start) From 8b59a7115d4b56a82e2c992ce58639b4ff540b9d Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Tue, 29 Jul 2025 10:23:58 +0200 Subject: [PATCH 13/32] Add setter/getter --- src/ansys/hps/data_transfer/client/binary.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index 50f26546..bddbe470 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -248,6 +248,18 @@ def env(self, value): raise TypeError("Environment variables must be a dictionary.") self._env = value + @property + def log_message(self): + """Get the log message handler.""" + return self._log_message + + @log_message.setter + def log_message(self, value): + """Set the log message handler.""" + if not callable(value): + raise TypeError("Log message handler must be a callable.") + self._log_message = value + class Binary: """Provides for starting, stopping, and monitoring the worker binary. 
From fc065af5d12593946ac86da2680d12a8bec122f2 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Tue, 29 Jul 2025 11:06:31 +0200 Subject: [PATCH 14/32] Add async operation handler --- examples/basic/00_file_operations_client.py | 1 + .../basic/01_async_data_transfer_client.py | 2 +- .../hps/data_transfer/client/api/__init__.py | 1 + src/ansys/hps/data_transfer/client/api/api.py | 56 +---------- .../hps/data_transfer/client/api/async_api.py | 39 ++++---- .../hps/data_transfer/client/api/handler.py | 99 +++++++++++++++++++ 6 files changed, 123 insertions(+), 75 deletions(-) create mode 100644 src/ansys/hps/data_transfer/client/api/handler.py diff --git a/examples/basic/00_file_operations_client.py b/examples/basic/00_file_operations_client.py index f88de62e..3ecec973 100644 --- a/examples/basic/00_file_operations_client.py +++ b/examples/basic/00_file_operations_client.py @@ -115,6 +115,7 @@ def main( client.binary_config.update( verbosity=verbosity, debug=debug, + log=debug, insecure=True, token=token, data_transfer_url=dt_url, diff --git a/examples/basic/01_async_data_transfer_client.py b/examples/basic/01_async_data_transfer_client.py index 6df3af5f..655fc032 100644 --- a/examples/basic/01_async_data_transfer_client.py +++ b/examples/basic/01_async_data_transfer_client.py @@ -59,7 +59,7 @@ async def main( ): log = logging.getLogger() - logging.basicConfig(yyformat="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) + logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) dt_url = f"{url}/dt/api/v1" auth_url = f"{url}/auth/realms/rep" diff --git a/src/ansys/hps/data_transfer/client/api/__init__.py b/src/ansys/hps/data_transfer/client/api/__init__.py index b3b29492..8eee6ef3 100644 --- a/src/ansys/hps/data_transfer/client/api/__init__.py +++ b/src/ansys/hps/data_transfer/client/api/__init__.py @@ -24,3 +24,4 @@ from .api import DataTransferApi from .async_api import AsyncDataTransferApi +from .handler 
import DefaultOperationHandler, AsyncOperationHandler diff --git a/src/ansys/hps/data_transfer/client/api/api.py b/src/ansys/hps/data_transfer/client/api/api.py index 950e6fa5..db4da75a 100644 --- a/src/ansys/hps/data_transfer/client/api/api.py +++ b/src/ansys/hps/data_transfer/client/api/api.py @@ -55,64 +55,10 @@ from ..utils.jitter import get_expo_backoff from .retry import retry from dateutil import parser +from .handler import DefaultOperationHandler log = logging.getLogger(__name__) - -class DefaultOperationHandler: - """Allows additional handling of the operations.""" - - class Meta: - """Meta class for DefaultOperationHandler.""" - expand_groups = True - - def __init__(self): - """Initializes the OperationHandler class object.""" - self.start = time.time() - self.report_threshold = 10.0 # seconds - - def __call__(self, ops: list[Operation]): - """Handle operations after they are fetched.""" - so_far = time.time() - self.start - done_ops = [op.state for op in ops if op.state in [OperationState.Succeeded, OperationState.Failed]] - for op in ops: - if op.state in [OperationState.Succeeded, OperationState.Failed]: - log.info(self._format_op(op)) - elif so_far > self.report_threshold: - self._format_op(op, progress=True) - - num_running = len(ops) - len(done_ops) - # num_completed = len(done_ops) - if num_running > 0 and so_far > self.report_threshold: - log.info(f"Waiting for {num_running} operations to complete. 
{hf.format_timespan(so_far)} so far ...") - # elif num_completed == len(ops): - # duration = hf.format_timespan(time.time() - self.start) - # log.debug(f"Completed {num_completed} operations after {duration}") - - def _format_op(self, op: Operation) -> str: - """Format the operation description.""" - op_type = "operation" if len(op.children) == 0 else "operation group" - - msg = f"{op_type.capitalize()} '{op.description}'({op.id})" - - try: - start = parser.parse(op.started_at) - end = parser.parse(op.ended_at) - duration = hf.format_timespan(end - start) - except Exception: - duration = "unknown" - - state = op.state.value - if op.state in [OperationState.Succeeded, OperationState.Failed]: - msg += f" has {state} after {duration}" - else: - msg += f" is {state}, {duration} so far, progress {op.progress:.2f}%" - - if op.info is not None: - info = ", ".join([f"{k}={v}" for k, v in op.info.items()]) - msg += ", " + info - return msg - class DataTransferApi: """Provides the data transfer API. diff --git a/src/ansys/hps/data_transfer/client/api/async_api.py b/src/ansys/hps/data_transfer/client/api/async_api.py index e7c75f1b..39f0e09c 100644 --- a/src/ansys/hps/data_transfer/client/api/async_api.py +++ b/src/ansys/hps/data_transfer/client/api/async_api.py @@ -54,6 +54,7 @@ from ..models.permissions import RoleAssignment, RoleQuery from ..utils.jitter import get_expo_backoff from .retry import retry +from .handler import AsyncOperationHandler log = logging.getLogger(__name__) @@ -205,7 +206,8 @@ async def wait_for( interval: float = 0.1, cap: float = 2.0, raise_on_error: bool = False, - progress_handler: Callable[[str, float], Awaitable[None]] = None, + # progress_handler: Callable[[str, float], Awaitable[None]] = None, + operation_handler: Callable[[builtins.list[Operation]], Awaitable[None]] = None, ): """Provides an async interface to wait for a list of operations to complete. 
@@ -221,10 +223,12 @@ async def wait_for( The maximum backoff value used to calculate the next wait time. Default is 2.0. raise_on_error: bool Raise an exception if an error occurs. Default is False. - progress_handler: Callable[[str, float], None] - A async function to handle progress updates. Default is None. - + operation_handler: Callable[[builtins.list[Operation]], None] + A callable that will be called with the list of operations when they are fetched. """ + if operation_handler is None: + operation_handler = AsyncOperationHandler() + if not isinstance(operation_ids, list): operation_ids = [operation_ids] operation_ids = [op.id if isinstance(op, Operation | OpIdResponse) else op for op in operation_ids] @@ -236,22 +240,19 @@ async def wait_for( attempt += 1 try: ops = await self._operations(operation_ids) - so_far = hf.format_timespan(time.time() - start) - log.debug(f"Waiting for {len(operation_ids)} operations to complete, {so_far} so far") - if self.client.binary_config.debug: - for op in ops: - fields = [ - f"id={op.id}", - f"state={op.state}", - f"start={op.started_at}", - f"succeeded_on={op.succeeded_on}", - ] - if op.progress > 0: - fields.append(f"progress={op.progress:.3f}") - log.debug(f"- Operation '{op.description}' {' '.join(fields)}") - if progress_handler is not None: + + meta = getattr(operation_handler, "Meta", dict()) + expand_groups = getattr(meta, "expand_groups", False) + if expand_groups: + expanded = [] for op in ops: - await progress_handler(op.id, op.progress) + if not op.children: + continue + expanded.extend(await self._operations(op.children)) + expanded.append(op) + if operation_handler is not None: + await operation_handler(expanded) + if all(op.state in [OperationState.Succeeded, OperationState.Failed] for op in ops): break except Exception as e: diff --git a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py new file mode 100644 index 00000000..bf2ae3ef --- /dev/null +++ 
b/src/ansys/hps/data_transfer/client/api/handler.py @@ -0,0 +1,99 @@ +import builtins +from collections.abc import Callable +import logging +import textwrap +import traceback +import time + +import backoff +import humanfriendly as hf + +from ..client import Client +from ..exceptions import TimeoutError +from ..models.metadata import DataAssignment +from ..models.msg import ( + CheckPermissionsResponse, + GetPermissionsResponse, + OpIdResponse, + OpsResponse, + SetMetadataRequest, + SrcDst, + Status, + StorageConfigResponse, + StoragePath, +) +from ..models.ops import Operation, OperationState +from ..models.permissions import RoleAssignment, RoleQuery +from ..utils.jitter import get_expo_backoff +from .retry import retry +from dateutil import parser + +log = logging.getLogger(__name__) + + +class DefaultOperationHandler: + """Allows additional handling of the operations.""" + + class Meta: + """Meta class for DefaultOperationHandler.""" + expand_groups = True + + def __init__(self): + """Initializes the OperationHandler class object.""" + self.start = time.time() + self.report_threshold = 10.0 # seconds + + def __call__(self, ops: list[Operation]): + """Handle operations after they are fetched.""" + self._log_ops(ops) + + def _log_ops(self, ops: list[Operation]) -> str: + so_far = time.time() - self.start + done_ops = [op.state for op in ops if op.state in [OperationState.Succeeded, OperationState.Failed]] + for op in ops: + if op.state in [OperationState.Succeeded, OperationState.Failed]: + log.info(self._format_op(op)) + elif so_far > self.report_threshold: + self._format_op(op, progress=True) + + num_running = len(ops) - len(done_ops) + # num_completed = len(done_ops) + if num_running > 0 and so_far > self.report_threshold: + log.info(f"Waiting for {num_running} operations to complete. 
{hf.format_timespan(so_far)} so far ...") + # elif num_completed == len(ops): + # duration = hf.format_timespan(time.time() - self.start) + # log.debug(f"Completed {num_completed} operations after {duration}") + + def _format_op(self, op: Operation) -> str: + """Format the operation description.""" + op_type = "operation" if len(op.children) == 0 else "operation group" + + msg = f"{op_type.capitalize()} '{op.description}'({op.id})" + + try: + start = parser.parse(op.started_at) + end = parser.parse(op.ended_at) + duration = hf.format_timespan(end - start) + except Exception: + duration = "unknown" + + state = op.state.value + if op.state in [OperationState.Succeeded, OperationState.Failed]: + msg += f" has {state} after {duration}" + else: + msg += f" is {state}, {duration} so far, progress {op.progress:.2f}%" + + if op.info is not None: + info = ", ".join([f"{k}={v}" for k, v in op.info.items()]) + msg += ", " + info + return msg + +class AsyncOperationHandler(DefaultOperationHandler): + """Asynchronous operation handler for operations.""" + + def __init__(self): + """Initializes the AsyncOperationHandler class object.""" + super().__init__() + + async def __call__(self, ops: list[Operation]): + self._log_ops(ops) \ No newline at end of file From 983fbb97adca1111048f400e782d482cfc5fdff9 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Tue, 29 Jul 2025 15:52:28 +0200 Subject: [PATCH 15/32] Rework the handling using expand argument --- .../_downloaded/transfer_files.py | 10 +- examples/transfer_files/transfer_files.py | 10 +- .../hps/data_transfer/client/api/__init__.py | 2 +- src/ansys/hps/data_transfer/client/api/api.py | 35 ++---- .../hps/data_transfer/client/api/async_api.py | 30 ++--- .../hps/data_transfer/client/api/handler.py | 114 ++++++++++-------- src/ansys/hps/data_transfer/client/binary.py | 20 +-- .../hps/data_transfer/client/models/ops.py | 6 +- 8 files changed, 111 insertions(+), 116 deletions(-) diff --git 
a/examples/transfer_files/_downloaded/transfer_files.py b/examples/transfer_files/_downloaded/transfer_files.py index bd19f202..f9b8f5b3 100644 --- a/examples/transfer_files/_downloaded/transfer_files.py +++ b/examples/transfer_files/_downloaded/transfer_files.py @@ -70,7 +70,7 @@ def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[ log.info("== Removing remote directory if it exists ...") op = api.rmdir([StoragePath(path=remote_path)]) op = api.wait_for([op.id]) - + log.info("== Creating remote directory ...") mkdir_op = api.mkdir([StoragePath(path=remote_path)]) @@ -92,18 +92,18 @@ def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[ op = api.copy(copy_args) op = api.wait_for([op.id]) t1 = perf_counter() - + log.info(f"== Querying files and metadata in {remote_path} ...") op = api.list([StoragePath(path=remote_path)]) op = api.wait_for([op.id]) - + log.debug(f"Files in {remote_path}: {op[0].result}") fnames = op[0].result[f"any:{remote_path}"] op = api.get_metadata([StoragePath(path=f"{remote_path}/{fname}") for fname in fnames]) op = api.wait_for(op.id) - + log.debug(f"Metadata for {remote_path}: {op[0].result}") log.info("== List of uploaded files:") @@ -131,7 +131,7 @@ def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[ op = api.copy(copy_args) op = api.wait_for([op.id]) t1 = perf_counter() - + log.info("== Download performance:") log.info(f"- Total time: {t1-t0:.5f} s") diff --git a/examples/transfer_files/transfer_files.py b/examples/transfer_files/transfer_files.py index bd19f202..f9b8f5b3 100644 --- a/examples/transfer_files/transfer_files.py +++ b/examples/transfer_files/transfer_files.py @@ -70,7 +70,7 @@ def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[ log.info("== Removing remote directory if it exists ...") op = api.rmdir([StoragePath(path=remote_path)]) op = api.wait_for([op.id]) - + log.info("== Creating remote directory ...") 
mkdir_op = api.mkdir([StoragePath(path=remote_path)]) @@ -92,18 +92,18 @@ def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[ op = api.copy(copy_args) op = api.wait_for([op.id]) t1 = perf_counter() - + log.info(f"== Querying files and metadata in {remote_path} ...") op = api.list([StoragePath(path=remote_path)]) op = api.wait_for([op.id]) - + log.debug(f"Files in {remote_path}: {op[0].result}") fnames = op[0].result[f"any:{remote_path}"] op = api.get_metadata([StoragePath(path=f"{remote_path}/{fname}") for fname in fnames]) op = api.wait_for(op.id) - + log.debug(f"Metadata for {remote_path}: {op[0].result}") log.info("== List of uploaded files:") @@ -131,7 +131,7 @@ def transfer_files(api: DataTransferApi, local_path: str, remote_path: Optional[ op = api.copy(copy_args) op = api.wait_for([op.id]) t1 = perf_counter() - + log.info("== Download performance:") log.info(f"- Total time: {t1-t0:.5f} s") diff --git a/src/ansys/hps/data_transfer/client/api/__init__.py b/src/ansys/hps/data_transfer/client/api/__init__.py index 8eee6ef3..091cfa03 100644 --- a/src/ansys/hps/data_transfer/client/api/__init__.py +++ b/src/ansys/hps/data_transfer/client/api/__init__.py @@ -24,4 +24,4 @@ from .api import DataTransferApi from .async_api import AsyncDataTransferApi -from .handler import DefaultOperationHandler, AsyncOperationHandler +from .handler import AsyncOperationHandler, DefaultOperationHandler diff --git a/src/ansys/hps/data_transfer/client/api/api.py b/src/ansys/hps/data_transfer/client/api/api.py index db4da75a..de61f70a 100644 --- a/src/ansys/hps/data_transfer/client/api/api.py +++ b/src/ansys/hps/data_transfer/client/api/api.py @@ -29,12 +29,9 @@ import builtins from collections.abc import Callable import logging -import textwrap -import traceback import time import backoff -import humanfriendly as hf from ..client import Client from ..exceptions import TimeoutError @@ -53,12 +50,12 @@ from ..models.ops import Operation, OperationState from 
..models.permissions import RoleAssignment, RoleQuery from ..utils.jitter import get_expo_backoff -from .retry import retry -from dateutil import parser from .handler import DefaultOperationHandler +from .retry import retry log = logging.getLogger(__name__) + class DataTransferApi: """Provides the data transfer API. @@ -97,7 +94,7 @@ def _sleep(): return s @retry() - def operations(self, ids: list[str]): + def operations(self, ids: list[str], expand: bool = False): """Get a list of operations. Parameters @@ -105,7 +102,7 @@ def operations(self, ids: list[str]): ids: List[str] List of IDs. """ - return self._operations(ids) + return self._operations(ids, expand=expand) def storages(self): """Get types of storages available on the storage backend.""" @@ -188,9 +185,12 @@ def _exec_operation_req( r = OpIdResponse(**json) return r - def _operations(self, ids: builtins.list[str]): + def _operations(self, ids: builtins.list[str], expand: bool = False): url = "/operations" - resp = self.client.session.get(url, params={"ids": ids}) + params = {"ids": ids} + if expand: + params["expand"] = "true" + resp = self.client.session.get(url, params=params) json = resp.json() return OpsResponse(**json).operations @@ -314,20 +314,9 @@ def wait_for( while True: attempt += 1 try: - ops = self._operations(operation_ids) - - meta = getattr(operation_handler, "Meta", dict()) - expand_groups = getattr(meta, "expand_groups", False) - if expand_groups: - expanded = [] - for op in ops: - if not op.children: - continue - expanded.extend(self._operations(op.children)) - expanded.append(op) - if operation_handler is not None: - operation_handler(expanded) - + ops = self._operations(operation_ids, expand=True) + if operation_handler is not None: + operation_handler(ops) if all(op.state in [OperationState.Succeeded, OperationState.Failed] for op in ops): break except Exception as e: diff --git a/src/ansys/hps/data_transfer/client/api/async_api.py 
b/src/ansys/hps/data_transfer/client/api/async_api.py index 39f0e09c..7ad9b7c3 100644 --- a/src/ansys/hps/data_transfer/client/api/async_api.py +++ b/src/ansys/hps/data_transfer/client/api/async_api.py @@ -53,8 +53,8 @@ from ..models.ops import Operation, OperationState from ..models.permissions import RoleAssignment, RoleQuery from ..utils.jitter import get_expo_backoff -from .retry import retry from .handler import AsyncOperationHandler +from .retry import retry log = logging.getLogger(__name__) @@ -92,9 +92,9 @@ async def _sleep(): return s @retry() - async def operations(self, ids: list[str]): + async def operations(self, ids: list[str], expand: bool = False): """Provides an async interface to get a list of operations by their IDs.""" - return await self._operations(ids) + return await self._operations(ids, expand=expand) async def storages(self): """Provides an async interface to get the list of storage configurations.""" @@ -141,9 +141,12 @@ async def _exec_async_operation_req( json = resp.json() return OpIdResponse(**json) - async def _operations(self, ids: builtins.list[str]): + async def _operations(self, ids: builtins.list[str], expand: bool = False): url = "/operations" - resp = await self.client.session.get(url, params={"ids": ids}) + params = {"ids": ids} + if expand: + params["expand"] = "true" + resp = await self.client.session.get(url, params=params) json = resp.json() return OpsResponse(**json).operations @@ -239,20 +242,9 @@ async def wait_for( while True: attempt += 1 try: - ops = await self._operations(operation_ids) - - meta = getattr(operation_handler, "Meta", dict()) - expand_groups = getattr(meta, "expand_groups", False) - if expand_groups: - expanded = [] - for op in ops: - if not op.children: - continue - expanded.extend(await self._operations(op.children)) - expanded.append(op) - if operation_handler is not None: - await operation_handler(expanded) - + ops = await self._operations(operation_ids, expand=True) + if operation_handler is not 
None: + await operation_handler(ops) if all(op.state in [OperationState.Succeeded, OperationState.Failed] for op in ops): break except Exception as e: diff --git a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py index bf2ae3ef..634ae301 100644 --- a/src/ansys/hps/data_transfer/client/api/handler.py +++ b/src/ansys/hps/data_transfer/client/api/handler.py @@ -1,32 +1,35 @@ -import builtins -from collections.abc import Callable +# Copyright (C) 2025 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+ +"""Provides functionality for monitoring of operations.""" + import logging -import textwrap -import traceback import time -import backoff +from dateutil import parser import humanfriendly as hf +import datetime -from ..client import Client -from ..exceptions import TimeoutError -from ..models.metadata import DataAssignment -from ..models.msg import ( - CheckPermissionsResponse, - GetPermissionsResponse, - OpIdResponse, - OpsResponse, - SetMetadataRequest, - SrcDst, - Status, - StorageConfigResponse, - StoragePath, -) from ..models.ops import Operation, OperationState -from ..models.permissions import RoleAssignment, RoleQuery -from ..utils.jitter import get_expo_backoff -from .retry import retry -from dateutil import parser log = logging.getLogger(__name__) @@ -34,9 +37,7 @@ class DefaultOperationHandler: """Allows additional handling of the operations.""" - class Meta: - """Meta class for DefaultOperationHandler.""" - expand_groups = True + final = [OperationState.Succeeded, OperationState.Failed] def __init__(self): """Initializes the OperationHandler class object.""" @@ -49,51 +50,60 @@ def __call__(self, ops: list[Operation]): def _log_ops(self, ops: list[Operation]) -> str: so_far = time.time() - self.start - done_ops = [op.state for op in ops if op.state in [OperationState.Succeeded, OperationState.Failed]] + num_running = 0 for op in ops: - if op.state in [OperationState.Succeeded, OperationState.Failed]: - log.info(self._format_op(op)) - elif so_far > self.report_threshold: - self._format_op(op, progress=True) + for ch in op.children_detail or []: + if ch.state not in self.final: + num_running += 1 + self._log_op(logging.DEBUG, ch) + + if op.state not in self.final: + num_running += 1 + self._log_op(logging.INFO, op) - num_running = len(ops) - len(done_ops) - # num_completed = len(done_ops) if num_running > 0 and so_far > self.report_threshold: log.info(f"Waiting for {num_running} operations to complete. 
{hf.format_timespan(so_far)} so far ...") - # elif num_completed == len(ops): - # duration = hf.format_timespan(time.time() - self.start) - # log.debug(f"Completed {num_completed} operations after {duration}") - def _format_op(self, op: Operation) -> str: + def _log_op(self, lvl: int, op: Operation): """Format the operation description.""" op_type = "operation" if len(op.children) == 0 else "operation group" - + msg = f"{op_type.capitalize()} '{op.description}'({op.id})" + op_done = op.state in self.final try: start = parser.parse(op.started_at) - end = parser.parse(op.ended_at) - duration = hf.format_timespan(end - start) - except Exception: - duration = "unknown" + if op_done: + end = parser.parse(op.ended_at) + else: + end = datetime.datetime.now(start.tzinfo) + duration = (end - start).seconds + durationStr = hf.format_timespan(end - start) + except Exception as ex: + log.debug(f"Failed to parse operation duration: {ex}") + duration = 0 + durationStr = "unknown" state = op.state.value - if op.state in [OperationState.Succeeded, OperationState.Failed]: - msg += f" has {state} after {duration}" + if op_done: + msg += f" has {state} after {durationStr}" + if op.info is not None: + info = ", ".join([f"{k}={v}" for k, v in op.info.items()]) + msg += ", " + info else: - msg += f" is {state}, {duration} so far, progress {op.progress:.2f}%" + msg += f" is {state}, {durationStr} so far, progress {op.progress:.2f}%" + + if op_done or duration > self.report_threshold: + log.log(lvl, msg) - if op.info is not None: - info = ", ".join([f"{k}={v}" for k, v in op.info.items()]) - msg += ", " + info - return msg class AsyncOperationHandler(DefaultOperationHandler): """Asynchronous operation handler for operations.""" - def __init__(self): + def __init__(self): """Initializes the AsyncOperationHandler class object.""" super().__init__() - + async def __call__(self, ops: list[Operation]): - self._log_ops(ops) \ No newline at end of file + """Handle operations after they are 
fetched.""" + self._log_ops(ops) diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index bddbe470..779e3baa 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -154,7 +154,7 @@ def __init__( # Required data_transfer_url: str = "https://localhost:8443/hps/dt/api/v1", # Process related settings - log: bool = True, + # log: bool = True, log_to_file: bool = False, log_message: callable = default_log_message, monitor_interval: float = 0.5, @@ -164,7 +164,7 @@ def __init__( token: str = None, host: str = "127.0.0.1", port: int = None, - verbosity: int = 1, + verbosity: int = 3, insecure: bool = False, debug: bool = False, auth_type: str = None, @@ -174,7 +174,7 @@ def __init__( self.data_transfer_url = data_transfer_url # Process related settings - self.log = log + self.log = debug self.log_to_file = log_to_file self._log_message = log_message self.monitor_interval = monitor_interval @@ -252,7 +252,7 @@ def env(self, value): def log_message(self): """Get the log message handler.""" return self._log_message - + @log_message.setter def log_message(self, value): """Set the log message handler.""" @@ -466,12 +466,12 @@ def _build_args(self): ] ) - self._args.extend( - [ - "-v", - str(self._config.verbosity), - ] - ) + # self._args.extend( + # [ + # "-v", + # str(self._config.verbosity), + # ] + # ) if self._config.insecure: self._args.append("--insecure") diff --git a/src/ansys/hps/data_transfer/client/models/ops.py b/src/ansys/hps/data_transfer/client/models/ops.py index 73a82bd0..a8488eb5 100644 --- a/src/ansys/hps/data_transfer/client/models/ops.py +++ b/src/ansys/hps/data_transfer/client/models/ops.py @@ -40,11 +40,12 @@ class OperationState(Enum): class Operation(BaseModel): children: list[str] | None = None + children_detail: list[Operation] | None = Field(None, description="Used when expanding groups") description: str | None = None ended_at: str | None = 
None error: str | None = None id: str | None = None - info: dict[str, Any] | None = Field(None, description="Additional info about the operation, used for logging") + info: dict[str, Any] | None = Field(None, description="Additional information about the operation") messages: list[str] | None = None progress: float | None = None progress_current: int | None = None @@ -56,3 +57,6 @@ class Operation(BaseModel): state: OperationState | None = None succeeded_on: list[str] | None = Field(None, description="Remotes that the operation succeeded on") user_id: str | None = None + + +Operation.model_rebuild() From c7d186c86a73e765c2455d0f311b37e52602268e Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Wed, 30 Jul 2025 09:48:55 +0200 Subject: [PATCH 16/32] Adjust handler naming Reduce progress frequency --- .../hps/data_transfer/client/api/__init__.py | 2 +- src/ansys/hps/data_transfer/client/api/api.py | 5 +-- .../hps/data_transfer/client/api/async_api.py | 5 +-- .../hps/data_transfer/client/api/handler.py | 31 ++++++++++--------- 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/api/__init__.py b/src/ansys/hps/data_transfer/client/api/__init__.py index 091cfa03..dcc68e9b 100644 --- a/src/ansys/hps/data_transfer/client/api/__init__.py +++ b/src/ansys/hps/data_transfer/client/api/__init__.py @@ -24,4 +24,4 @@ from .api import DataTransferApi from .async_api import AsyncDataTransferApi -from .handler import AsyncOperationHandler, DefaultOperationHandler +from .handler import AsyncWaitHandler, WaitHandler diff --git a/src/ansys/hps/data_transfer/client/api/api.py b/src/ansys/hps/data_transfer/client/api/api.py index de61f70a..d86072b9 100644 --- a/src/ansys/hps/data_transfer/client/api/api.py +++ b/src/ansys/hps/data_transfer/client/api/api.py @@ -50,7 +50,7 @@ from ..models.ops import Operation, OperationState from ..models.permissions import RoleAssignment, RoleQuery from ..utils.jitter import get_expo_backoff -from .handler 
import DefaultOperationHandler +from .handler import WaitHandler from .retry import retry log = logging.getLogger(__name__) @@ -69,6 +69,7 @@ def __init__(self, client: Client): """Initializes the DataTransferApi class object.""" self.dump_mode = "json" self.client = client + self.wait_handler_class = WaitHandler @retry() def status(self, wait=False, sleep=5, jitter=True, timeout: float | None = 20.0): @@ -304,7 +305,7 @@ def wait_for( A callable that will be called with the list of operations when they are fetched. """ if operation_handler is None: - operation_handler = DefaultOperationHandler() + operation_handler = self.wait_handler_class() if not isinstance(operation_ids, list): operation_ids = [operation_ids] diff --git a/src/ansys/hps/data_transfer/client/api/async_api.py b/src/ansys/hps/data_transfer/client/api/async_api.py index 7ad9b7c3..365c416d 100644 --- a/src/ansys/hps/data_transfer/client/api/async_api.py +++ b/src/ansys/hps/data_transfer/client/api/async_api.py @@ -53,7 +53,7 @@ from ..models.ops import Operation, OperationState from ..models.permissions import RoleAssignment, RoleQuery from ..utils.jitter import get_expo_backoff -from .handler import AsyncOperationHandler +from .handler import AsyncWaitHandler from .retry import retry log = logging.getLogger(__name__) @@ -66,6 +66,7 @@ def __init__(self, client: AsyncClient): """Initialize the async data transfer API with the client object.""" self.dump_mode = "json" self.client = client + self.wait_handler_class = AsyncWaitHandler @retry() async def status(self, wait=False, sleep=5, jitter=True, timeout: float | None = 20.0): @@ -230,7 +231,7 @@ async def wait_for( A callable that will be called with the list of operations when they are fetched. 
""" if operation_handler is None: - operation_handler = AsyncOperationHandler() + operation_handler = self.wait_handler_class() if not isinstance(operation_ids, list): operation_ids = [operation_ids] diff --git a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py index 634ae301..b1c60f26 100644 --- a/src/ansys/hps/data_transfer/client/api/handler.py +++ b/src/ansys/hps/data_transfer/client/api/handler.py @@ -22,27 +22,29 @@ """Provides functionality for monitoring of operations.""" +import datetime import logging import time from dateutil import parser import humanfriendly as hf -import datetime from ..models.ops import Operation, OperationState log = logging.getLogger(__name__) -class DefaultOperationHandler: - """Allows additional handling of the operations.""" +class WaitHandler: + """Allows additional handling of operation status on wait.""" final = [OperationState.Succeeded, OperationState.Failed] def __init__(self): - """Initializes the OperationHandler class object.""" + """Initializes the WaitHandler class object.""" self.start = time.time() self.report_threshold = 10.0 # seconds + self.min_progress_interval = 2.0 # seconds + self.last_progress = self.start def __call__(self, ops: list[Operation]): """Handle operations after they are fetched.""" @@ -78,27 +80,28 @@ def _log_op(self, lvl: int, op: Operation): else: end = datetime.datetime.now(start.tzinfo) duration = (end - start).seconds - durationStr = hf.format_timespan(end - start) + duration_str = hf.format_timespan(end - start) except Exception as ex: log.debug(f"Failed to parse operation duration: {ex}") duration = 0 - durationStr = "unknown" + duration_str = "unknown" state = op.state.value if op_done: - msg += f" has {state} after {durationStr}" + msg += f" has {state} after {duration_str}" if op.info is not None: info = ", ".join([f"{k}={v}" for k, v in op.info.items()]) msg += ", " + info - else: - msg += f" is {state}, {durationStr} so far, 
progress {op.progress:.2f}%" - - if op_done or duration > self.report_threshold: + # if op.messages: + # msg += f', messages="{"; ".join(op.messages)}"' + log.log(lvl, msg) + elif duration > self.report_threshold and time.time() - self.last_progress > self.min_progress_interval: + self.last_progress = time.time() + msg += f" is {state}, {duration_str} so far, progress {op.progress:.2f}%" log.log(lvl, msg) - -class AsyncOperationHandler(DefaultOperationHandler): - """Asynchronous operation handler for operations.""" +class AsyncWaitHandler(WaitHandler): + """Allows additional, asynchronous handling of operation status on wait.""" def __init__(self): """Initializes the AsyncOperationHandler class object.""" From 0879d175e0927bc9f759b1632e97bfa87d3df8a8 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Wed, 30 Jul 2025 10:10:00 +0200 Subject: [PATCH 17/32] Adjust log settings --- examples/transfer_files/transfer_files.py | 3 +-- src/ansys/hps/data_transfer/client/api/handler.py | 9 +++++---- src/ansys/hps/data_transfer/client/binary.py | 2 ++ 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/examples/transfer_files/transfer_files.py b/examples/transfer_files/transfer_files.py index f9b8f5b3..9609804d 100644 --- a/examples/transfer_files/transfer_files.py +++ b/examples/transfer_files/transfer_files.py @@ -53,7 +53,7 @@ from ansys.hps.data_transfer.client.authenticate import authenticate from ansys.hps.data_transfer.client.models.msg import SrcDst, StoragePath -log = logging.getLogger(__name__) +log = logging.getLogger() ######################################################################## # Define a method to transfer files using the data transfer service @@ -158,7 +158,6 @@ def main( str, typer.Option(prompt=True, hide_input=True, help="Password to authenticate with") ] = "repadmin", ): - logging.basicConfig() logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) dt_url = f"{url}/dt/api/v1" diff --git 
a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py index b1c60f26..36174bb1 100644 --- a/src/ansys/hps/data_transfer/client/api/handler.py +++ b/src/ansys/hps/data_transfer/client/api/handler.py @@ -42,7 +42,7 @@ class WaitHandler: def __init__(self): """Initializes the WaitHandler class object.""" self.start = time.time() - self.report_threshold = 10.0 # seconds + self.report_threshold = 1.0 # seconds self.min_progress_interval = 2.0 # seconds self.last_progress = self.start @@ -51,7 +51,7 @@ def __call__(self, ops: list[Operation]): self._log_ops(ops) def _log_ops(self, ops: list[Operation]) -> str: - so_far = time.time() - self.start + # so_far = time.time() - self.start num_running = 0 for op in ops: for ch in op.children_detail or []: @@ -63,8 +63,8 @@ def _log_ops(self, ops: list[Operation]) -> str: num_running += 1 self._log_op(logging.INFO, op) - if num_running > 0 and so_far > self.report_threshold: - log.info(f"Waiting for {num_running} operations to complete. {hf.format_timespan(so_far)} so far ...") + # if num_running > 0 and so_far > self.report_threshold: + # log.info(f"Waiting for {num_running} operations to complete. 
{hf.format_timespan(so_far)} so far ...") def _log_op(self, lvl: int, op: Operation): """Format the operation description.""" @@ -97,6 +97,7 @@ def _log_op(self, lvl: int, op: Operation): log.log(lvl, msg) elif duration > self.report_threshold and time.time() - self.last_progress > self.min_progress_interval: self.last_progress = time.time() + log.warning(f"{op.progress} {op.progress_current} {op.progress_total}") msg += f" is {state}, {duration_str} so far, progress {op.progress:.2f}%" log.log(lvl, msg) diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index 779e3baa..fdbc0613 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -203,6 +203,8 @@ def update(self, **kwargs): setattr(self, key, value) else: raise AttributeError(f"Unknown attribute {key}") + if self.debug: + self.log = True @property def port(self): From 4ecf4cc224af13d5e0170e797de22e6da1d6f8db Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Wed, 30 Jul 2025 10:19:09 +0200 Subject: [PATCH 18/32] Remove tmp message --- src/ansys/hps/data_transfer/client/api/handler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py index 36174bb1..641b1278 100644 --- a/src/ansys/hps/data_transfer/client/api/handler.py +++ b/src/ansys/hps/data_transfer/client/api/handler.py @@ -42,8 +42,8 @@ class WaitHandler: def __init__(self): """Initializes the WaitHandler class object.""" self.start = time.time() - self.report_threshold = 1.0 # seconds - self.min_progress_interval = 2.0 # seconds + self.report_threshold = 2.0 # seconds + self.min_progress_interval = 3.0 # seconds self.last_progress = self.start def __call__(self, ops: list[Operation]): @@ -97,7 +97,6 @@ def _log_op(self, lvl: int, op: Operation): log.log(lvl, msg) elif duration > self.report_threshold and time.time() - 
self.last_progress > self.min_progress_interval: self.last_progress = time.time() - log.warning(f"{op.progress} {op.progress_current} {op.progress_total}") msg += f" is {state}, {duration_str} so far, progress {op.progress:.2f}%" log.log(lvl, msg) From 0bf64af72dac8a3b9b6d0487f656188b1c85937b Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Thu, 31 Jul 2025 14:00:33 +0200 Subject: [PATCH 19/32] Get rid of progress mode --- src/ansys/hps/data_transfer/client/api/handler.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py index 641b1278..07f2518d 100644 --- a/src/ansys/hps/data_transfer/client/api/handler.py +++ b/src/ansys/hps/data_transfer/client/api/handler.py @@ -63,9 +63,6 @@ def _log_ops(self, ops: list[Operation]) -> str: num_running += 1 self._log_op(logging.INFO, op) - # if num_running > 0 and so_far > self.report_threshold: - # log.info(f"Waiting for {num_running} operations to complete. 
{hf.format_timespan(so_far)} so far ...") - def _log_op(self, lvl: int, op: Operation): """Format the operation description.""" op_type = "operation" if len(op.children) == 0 else "operation group" @@ -93,13 +90,16 @@ def _log_op(self, lvl: int, op: Operation): info = ", ".join([f"{k}={v}" for k, v in op.info.items()]) msg += ", " + info # if op.messages: - # msg += f', messages="{"; ".join(op.messages)}"' + # msg += f', messages="{"; ".join(op.messages)}"' log.log(lvl, msg) elif duration > self.report_threshold and time.time() - self.last_progress > self.min_progress_interval: self.last_progress = time.time() - msg += f" is {state}, {duration_str} so far, progress {op.progress:.2f}%" + msg += f" is {state}, {duration_str} so far" + if op.progress_current > 0: + msg += f", progress {op.progress * 100.0:.1f}%" log.log(lvl, msg) + class AsyncWaitHandler(WaitHandler): """Allows additional, asynchronous handling of operation status on wait.""" From af6c15f64e63a1d6fe7376ec0e9afca3557ac58f Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Mon, 4 Aug 2025 13:46:36 +0200 Subject: [PATCH 20/32] Working --- src/ansys/hps/data_transfer/client/api/handler.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py index 07f2518d..79bc888f 100644 --- a/src/ansys/hps/data_transfer/client/api/handler.py +++ b/src/ansys/hps/data_transfer/client/api/handler.py @@ -86,9 +86,7 @@ def _log_op(self, lvl: int, op: Operation): state = op.state.value if op_done: msg += f" has {state} after {duration_str}" - if op.info is not None: - info = ", ".join([f"{k}={v}" for k, v in op.info.items()]) - msg += ", " + info + msg += self._info_str(op) # if op.messages: # msg += f', messages="{"; ".join(op.messages)}"' log.log(lvl, msg) @@ -96,9 +94,18 @@ def _log_op(self, lvl: int, op: Operation): self.last_progress = time.time() msg += f" is {state}, {duration_str} so far" 
if op.progress_current > 0: - msg += f", progress {op.progress * 100.0:.1f}%" + msg += f", progress {op.progress*100.0:.1f}%" + msg += self._info_str(op) log.log(lvl, msg) + + def _info_str(self, op: Operation) -> str: + """Format the operation info.""" + if not op.info: + return "" + info = ", ".join([f"{k}={v}" for k, v in op.info.items()]) + return f", {info}" + class AsyncWaitHandler(WaitHandler): """Allows additional, asynchronous handling of operation status on wait.""" From 382dc3e48af9bf72696e5151e63722b04fba664e Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Mon, 4 Aug 2025 14:15:38 +0200 Subject: [PATCH 21/32] Clean up async client shutdown --- src/ansys/hps/data_transfer/client/client.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/client.py b/src/ansys/hps/data_transfer/client/client.py index a25ba9e2..f50be752 100644 --- a/src/ansys/hps/data_transfer/client/client.py +++ b/src/ansys/hps/data_transfer/client/client.py @@ -528,11 +528,12 @@ def __init__(self, *args, **kwargs): """Initializes the AsyncClient class object.""" super().__init__(*args, **kwargs) self._bin_config._on_token_update = self._update_token + self._monitor_task = None async def start(self): """Start the async binary worker.""" super().start() - asyncio.create_task(self._monitor()) + self._monitor_task = asyncio.create_task(self._monitor()) async def stop(self, wait=5.0): """Stop the async binary worker.""" @@ -540,6 +541,8 @@ async def stop(self, wait=5.0): try: await self._session.post(self.base_api_url + "/shutdown") await asyncio.sleep(0.1) + if self._monitor_task is not None: + self._monitor_task.cancel() except Exception as ex: log.warning(f"Failed to send shutdown request: {ex}") super().stop(wait=wait) @@ -575,18 +578,21 @@ def _update_token(self): log.debug(f"Error updating token: {e}") async def _monitor(self): - while not self._monitor_stop.is_set(): - await 
asyncio.sleep(self._monitor_state.sleep_for) - - if self._session is None or self.binary is None: - continue + while True: try: + await asyncio.sleep(self._monitor_state.sleep_for) + if self._session is None or self.binary is None: + continue + resp = await self._session.get(self.base_api_url) if resp.status_code == 200: ready = resp.json().get("ready", False) self._monitor_state.mark_ready(ready) continue + except asyncio.CancelledError: + log.debug("Monitor task cancelled") + return except Exception as ex: if self.binary_config.debug: log.debug("URL: %s", self.base_api_url) @@ -595,7 +601,6 @@ async def _monitor(self): continue self._monitor_state.report(self.binary) - log.debug("Worker status monitor stopped") class Client(ClientBase): From 648a67c984f381af3fe06385fc8d24e093cf4667 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Mon, 4 Aug 2025 14:27:20 +0200 Subject: [PATCH 22/32] Test adjustments --- src/ansys/hps/data_transfer/client/api/api.py | 15 ++--- .../hps/data_transfer/client/api/async_api.py | 16 ++--- .../hps/data_transfer/client/api/handler.py | 4 ++ tests/large_file_test.py | 63 +++++++++++-------- 4 files changed, 58 insertions(+), 40 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/api/api.py b/src/ansys/hps/data_transfer/client/api/api.py index d86072b9..1e2bec3f 100644 --- a/src/ansys/hps/data_transfer/client/api/api.py +++ b/src/ansys/hps/data_transfer/client/api/api.py @@ -69,7 +69,7 @@ def __init__(self, client: Client): """Initializes the DataTransferApi class object.""" self.dump_mode = "json" self.client = client - self.wait_handler_class = WaitHandler + self.wait_handler_factory = WaitHandler @retry() def status(self, wait=False, sleep=5, jitter=True, timeout: float | None = 20.0): @@ -285,7 +285,7 @@ def wait_for( interval: float = 0.1, cap: float = 2.0, raise_on_error: bool = False, - operation_handler: Callable[[builtins.list[Operation]], None] = None, + handler: Callable[[builtins.list[Operation]], None] = None, ): 
"""Wait for operations to complete. @@ -304,8 +304,8 @@ def wait_for( operation_handler: Callable[[builtins.list[Operation]], None] A callable that will be called with the list of operations when they are fetched. """ - if operation_handler is None: - operation_handler = self.wait_handler_class() + if handler is None: + handler = self.wait_handler_factory() if not isinstance(operation_ids, list): operation_ids = [operation_ids] @@ -315,9 +315,10 @@ def wait_for( while True: attempt += 1 try: - ops = self._operations(operation_ids, expand=True) - if operation_handler is not None: - operation_handler(ops) + expand = getattr(handler.Meta, "expand_group", False) if hasattr(handler, "Meta") else False + ops = self._operations(operation_ids, expand=expand) + if handler is not None: + handler(ops) if all(op.state in [OperationState.Succeeded, OperationState.Failed] for op in ops): break except Exception as e: diff --git a/src/ansys/hps/data_transfer/client/api/async_api.py b/src/ansys/hps/data_transfer/client/api/async_api.py index 365c416d..e3bc1452 100644 --- a/src/ansys/hps/data_transfer/client/api/async_api.py +++ b/src/ansys/hps/data_transfer/client/api/async_api.py @@ -66,7 +66,7 @@ def __init__(self, client: AsyncClient): """Initialize the async data transfer API with the client object.""" self.dump_mode = "json" self.client = client - self.wait_handler_class = AsyncWaitHandler + self.wait_handler_factory = AsyncWaitHandler @retry() async def status(self, wait=False, sleep=5, jitter=True, timeout: float | None = 20.0): @@ -210,8 +210,7 @@ async def wait_for( interval: float = 0.1, cap: float = 2.0, raise_on_error: bool = False, - # progress_handler: Callable[[str, float], Awaitable[None]] = None, - operation_handler: Callable[[builtins.list[Operation]], Awaitable[None]] = None, + handler: Callable[[builtins.list[Operation]], Awaitable[None]] = None, ): """Provides an async interface to wait for a list of operations to complete. 
@@ -230,8 +229,8 @@ async def wait_for( operation_handler: Callable[[builtins.list[Operation]], None] A callable that will be called with the list of operations when they are fetched. """ - if operation_handler is None: - operation_handler = self.wait_handler_class() + if handler is None: + handler = self.wait_handler_factory() if not isinstance(operation_ids, list): operation_ids = [operation_ids] @@ -243,9 +242,10 @@ async def wait_for( while True: attempt += 1 try: - ops = await self._operations(operation_ids, expand=True) - if operation_handler is not None: - await operation_handler(ops) + expand = getattr(handler.Meta, "expand_group", False) if hasattr(handler, "Meta") else False + ops = await self._operations(operation_ids, expand=expand) + if handler is not None: + await handler(ops) if all(op.state in [OperationState.Succeeded, OperationState.Failed] for op in ops): break except Exception as e: diff --git a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py index 79bc888f..eff83982 100644 --- a/src/ansys/hps/data_transfer/client/api/handler.py +++ b/src/ansys/hps/data_transfer/client/api/handler.py @@ -37,6 +37,10 @@ class WaitHandler: """Allows additional handling of operation status on wait.""" + class Meta: + """Meta class for WaitHandler.""" + expand_group = True + final = [OperationState.Succeeded, OperationState.Failed] def __init__(self): diff --git a/tests/large_file_test.py b/tests/large_file_test.py index 6e74ef61..b1467998 100644 --- a/tests/large_file_test.py +++ b/tests/large_file_test.py @@ -132,7 +132,7 @@ def test_large_batch(storage_path, client): def test_batch_with_wait_parameters(storage_path, client): - """Test copying a large file to a remote storage with wait parameter progress_handler.""" + """Test copying a large file to a remote storage with wait parameter handler.""" api = DataTransferApi(client) log.info("Copy with progress handler") op, manager = sync_copy(storage_path, api, 1, 
1) @@ -142,12 +142,13 @@ def test_batch_with_wait_parameters(storage_path, client): progress_data = [] # test progress handler - def handler(id, current_progress): + def handler(ops): + current_progress = ops[0].progress progress_data.append(current_progress) - log.info(f"{current_progress * 100.0}% completed for operation id: {id}") + log.info(f"{current_progress * 100.0}% completed for operation id: {ops[0].id}") # Wait for the operation to complete with progress handler - op = api.wait_for(op.id, progress_handler=handler) + op = api.wait_for(op.id, handler=handler) assert op[0].state == OperationState.Succeeded, op[0].messages # Check if progress data is collected assert len(progress_data) > 0, "No progress data collected" @@ -157,7 +158,7 @@ def handler(id, current_progress): def test_batch_with_multiple_operations_to_wait(storage_path, client): - """Test copying a large file to a remote storage with wait parameter progress_handler.""" + """Test copying a large file to a remote storage with wait parameter handler.""" api = DataTransferApi(client) log.info("Copy with progress handler") op1, manager1 = sync_copy(storage_path, api, 1, 1) @@ -166,22 +167,27 @@ def test_batch_with_multiple_operations_to_wait(storage_path, client): assert op2.id is not None # List to store progress data - progress_data = [] + progress_data = { + op1.id: [], + op2.id: [] + } # test progress handler - def handler(id, current_progress): - progress_data.append(current_progress) - log.info(f"{current_progress * 100.0}% completed for operation id: {id}") + def handler(ops): + for op in ops: + progress_data[op.id].append(op.progress) + log.info(f"{op.progress * 100.0}% completed for operation id: {op.id}") # Wait for the operation to complete with progress handler - op = api.wait_for([op1.id, op2.id], progress_handler=handler) + op = api.wait_for([op1.id, op2.id], handler=handler) assert op[0].state == OperationState.Succeeded, op[0].messages assert op[1].state == OperationState.Succeeded, 
op[1].messages # Check if progress data is collected at least twice - assert len(progress_data) > 2, "No progress data collected" + assert len(progress_data[op1.id]) > 2, "No progress data collected" + assert len(progress_data[op2.id]) > 2, "No progress data collected" # Check if the last progress is 100% - assert progress_data[-1] == 1.0, "Last progress is not 100%" - assert progress_data[-2] == 1.0, "Last progress is not 100%" + assert progress_data[op1.id][-1] == 1.0, "Last progress is not 100%" + assert progress_data[op2.id][-1] == 1.0, "Last progress is not 100%" manager1.delete_file() manager2.delete_file() @@ -198,7 +204,7 @@ async def test_async_large_batch(storage_path, async_client): async def test_async_batch_with_wait_parameters(storage_path, async_client): """Test copying a large file to a remote storage using the AsyncDataTransferApi - with wait parameter progress_handler.""" + with wait parameter handler.""" api = AsyncDataTransferApi(async_client) log.info("Copy with progress handler") op, manager = await async_copy(storage_path, api, 1, 1) @@ -208,12 +214,13 @@ async def test_async_batch_with_wait_parameters(storage_path, async_client): progress_data = [] # test progress handler - async def handler(id, current_progress): + async def handler(ops): + current_progress = ops[0].progress progress_data.append(current_progress) - log.info(f"{current_progress * 100.0}% completed for operation id: {id}") + log.info(f"{current_progress * 100.0}% completed for operation id: {ops[0].id}") # Wait for the operation to complete with progress handler - op = await api.wait_for(op.id, progress_handler=handler) + op = await api.wait_for(op.id, handler=handler) assert op[0].state == OperationState.Succeeded, op[0].messages # Check if progress data is collected assert len(progress_data) > 0, "No progress data collected" @@ -224,7 +231,7 @@ async def handler(id, current_progress): async def test_async_batch_with_multiple_operations_to_wait(storage_path, async_client): 
"""Test copying a large file to a remote storage using the AsyncDataTransferApi - with wait parameter progress_handler.""" + with wait parameter handler.""" api = AsyncDataTransferApi(async_client) log.info("Copy with progress handler") op1, manager1 = await async_copy(storage_path, api, 1, 1) @@ -233,20 +240,26 @@ async def test_async_batch_with_multiple_operations_to_wait(storage_path, async_ assert op2.id is not None # List to store progress data - progress_data = [] + progress_data = { + op1.id: [], + op2.id: [] + } # test progress handler - async def handler(id, current_progress): - progress_data.append(current_progress) - log.info(f"{current_progress * 100.0}% completed for operation id: {id}") + async def handler(ops): + for op in ops: + progress_data.append(op.progress) + log.info(f"{op.progress * 100.0}% completed for operation id: {op.id}") # Wait for the operation to complete with progress handler - op = await api.wait_for([op1.id, op2.id], progress_handler=handler) + op = await api.wait_for([op1.id, op2.id], handler=handler) assert op[0].state == OperationState.Succeeded, op[0].messages assert op[1].state == OperationState.Succeeded, op[1].messages # Check if progress data is collected at least twice - assert len(progress_data) > 2, "No progress data collected" + assert len(progress_data[op1.id]) > 2, "No progress data collected" + assert len(progress_data[op2.id]) > 2, "No progress data collected" # Check if the last progress is 100% - assert progress_data[-1] == 1.0, "Last progress is not 100%" + assert progress_data[op1.id][-1] == 1.0, "Last progress is not 100%" + assert progress_data[op2.id][-1] == 1.0, "Last progress is not 100%" manager1.delete_file() manager2.delete_file() From 9113995c213b74bca9fc4521a8e830544109b4be Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Mon, 4 Aug 2025 14:29:04 +0200 Subject: [PATCH 23/32] Adjust log output for tests --- examples/transfer_files/_downloaded/transfer_files.py | 1 - tests/conftest.py | 3 +++ 2 files 
changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/transfer_files/_downloaded/transfer_files.py b/examples/transfer_files/_downloaded/transfer_files.py index f9b8f5b3..c7ac98ab 100644 --- a/examples/transfer_files/_downloaded/transfer_files.py +++ b/examples/transfer_files/_downloaded/transfer_files.py @@ -158,7 +158,6 @@ def main( str, typer.Option(prompt=True, hide_input=True, help="Password to authenticate with") ] = "repadmin", ): - logging.basicConfig() logging.basicConfig(format="%(levelname)8s > %(message)s", level=get_log_level(verbosity, debug)) dt_url = f"{url}/dt/api/v1" diff --git a/tests/conftest.py b/tests/conftest.py index eda64021..63dda171 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,6 +36,9 @@ from ansys.hps.data_transfer.client.binary import BinaryConfig from ansys.hps.data_transfer.client.models.msg import StoragePath +log = logging.getLogger() +logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG) + log = logging.getLogger(__name__) From 640649dbadef34f3e1f86dd8375a76d02615fdc9 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Mon, 4 Aug 2025 14:39:54 +0200 Subject: [PATCH 24/32] Revert last change --- tests/conftest.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 63dda171..eda64021 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,9 +36,6 @@ from ansys.hps.data_transfer.client.binary import BinaryConfig from ansys.hps.data_transfer.client.models.msg import StoragePath -log = logging.getLogger() -logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG) - log = logging.getLogger(__name__) From 7620e019e1327c8f07ee2f00bd8fd06aa36aa4a8 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Mon, 4 Aug 2025 14:57:35 +0200 Subject: [PATCH 25/32] Extra output in case of handler failures --- src/ansys/hps/data_transfer/client/api/api.py | 8 +++++++- src/ansys/hps/data_transfer/client/api/async_api.py | 8 +++++++- 2 
files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/api/api.py b/src/ansys/hps/data_transfer/client/api/api.py index 1e2bec3f..4db26e9a 100644 --- a/src/ansys/hps/data_transfer/client/api/api.py +++ b/src/ansys/hps/data_transfer/client/api/api.py @@ -30,6 +30,7 @@ from collections.abc import Callable import logging import time +import traceback import backoff @@ -318,7 +319,12 @@ def wait_for( expand = getattr(handler.Meta, "expand_group", False) if hasattr(handler, "Meta") else False ops = self._operations(operation_ids, expand=expand) if handler is not None: - handler(ops) + try: + handler(ops) + except Exception as e: + log.warning(f"Handler error: {e}") + log.debug(traceback.format_exc()) + if all(op.state in [OperationState.Succeeded, OperationState.Failed] for op in ops): break except Exception as e: diff --git a/src/ansys/hps/data_transfer/client/api/async_api.py b/src/ansys/hps/data_transfer/client/api/async_api.py index e3bc1452..abd2e13f 100644 --- a/src/ansys/hps/data_transfer/client/api/async_api.py +++ b/src/ansys/hps/data_transfer/client/api/async_api.py @@ -32,6 +32,7 @@ import logging import textwrap import time +import traceback import backoff import humanfriendly as hf @@ -245,7 +246,12 @@ async def wait_for( expand = getattr(handler.Meta, "expand_group", False) if hasattr(handler, "Meta") else False ops = await self._operations(operation_ids, expand=expand) if handler is not None: - await handler(ops) + try: + await handler(ops) + except Exception as e: + log.warning(f"Handler error: {e}") + log.debug(traceback.format_exc()) + if all(op.state in [OperationState.Succeeded, OperationState.Failed] for op in ops): break except Exception as e: From 9d9a492e75a50a50a5bee60bea0da62be81fae56 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Mon, 4 Aug 2025 14:57:47 +0200 Subject: [PATCH 26/32] Test corrections --- tests/conftest.py | 3 +++ tests/large_file_test.py | 2 +- 2 files changed, 4 insertions(+), 1 
deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index eda64021..63dda171 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,6 +36,9 @@ from ansys.hps.data_transfer.client.binary import BinaryConfig from ansys.hps.data_transfer.client.models.msg import StoragePath +log = logging.getLogger() +logging.basicConfig(format="%(levelname)8s > %(message)s", level=logging.DEBUG) + log = logging.getLogger(__name__) diff --git a/tests/large_file_test.py b/tests/large_file_test.py index b1467998..413fca81 100644 --- a/tests/large_file_test.py +++ b/tests/large_file_test.py @@ -248,7 +248,7 @@ async def test_async_batch_with_multiple_operations_to_wait(storage_path, async_ # test progress handler async def handler(ops): for op in ops: - progress_data.append(op.progress) + progress_data[op.id].append(op.progress) log.info(f"{op.progress * 100.0}% completed for operation id: {op.id}") # Wait for the operation to complete with progress handler From e72c7243f8043cd0135352a225a1dc314ac054ea Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Mon, 4 Aug 2025 15:34:56 +0200 Subject: [PATCH 27/32] Fix async client --- src/ansys/hps/data_transfer/client/client.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/ansys/hps/data_transfer/client/client.py b/src/ansys/hps/data_transfer/client/client.py index f50be752..5e8b3b48 100644 --- a/src/ansys/hps/data_transfer/client/client.py +++ b/src/ansys/hps/data_transfer/client/client.py @@ -530,6 +530,18 @@ def __init__(self, *args, **kwargs): self._bin_config._on_token_update = self._update_token self._monitor_task = None + def __getstate__(self): + """Return pickled state of the object.""" + state = super().__getstate__() + del state["_monitor_task"] + return state + + def __setstate__(self, state): + """Restore state from pickled state.""" + super().__setstate__(state) + self.__dict__.update(state) + self._monitor_task = None + async def start(self): """Start the async binary worker.""" 
super().start() From d01125f73eb4029cc205b621f61e0823fdc37882 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Tue, 5 Aug 2025 10:02:56 +0200 Subject: [PATCH 28/32] Remove duplicate log type --- src/ansys/hps/data_transfer/client/binary.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index fdbc0613..f2ef90c7 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -463,8 +463,6 @@ def _build_args(self): [ "--dt-url", self._config.data_transfer_url, - "--log-types", - "console", ] ) From 13aa4969e147480d87e452be7499e71ba00abe51 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Tue, 5 Aug 2025 10:21:03 +0200 Subject: [PATCH 29/32] Fix stderr redirect --- src/ansys/hps/data_transfer/client/binary.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index f2ef90c7..339e6825 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -409,7 +409,9 @@ def _monitor(self): with PrepareSubprocess(): self._process = subprocess.Popen( - args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env + args, shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + env=env ) else: ret_code = self._process.poll() From 9a1d1be867b5094b3a390d0a0f76772494fd631f Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Tue, 5 Aug 2025 13:38:06 +0200 Subject: [PATCH 30/32] Style corrections --- src/ansys/hps/data_transfer/client/api/handler.py | 4 ++-- src/ansys/hps/data_transfer/client/binary.py | 4 +--- src/ansys/hps/data_transfer/client/client.py | 2 +- tests/large_file_test.py | 10 ++-------- 4 files changed, 6 insertions(+), 14 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py 
index eff83982..fce03583 100644 --- a/src/ansys/hps/data_transfer/client/api/handler.py +++ b/src/ansys/hps/data_transfer/client/api/handler.py @@ -39,6 +39,7 @@ class WaitHandler: class Meta: """Meta class for WaitHandler.""" + expand_group = True final = [OperationState.Succeeded, OperationState.Failed] @@ -98,11 +99,10 @@ def _log_op(self, lvl: int, op: Operation): self.last_progress = time.time() msg += f" is {state}, {duration_str} so far" if op.progress_current > 0: - msg += f", progress {op.progress*100.0:.1f}%" + msg += f", progress {op.progress * 100.0:.1f}%" msg += self._info_str(op) log.log(lvl, msg) - def _info_str(self, op: Operation) -> str: """Format the operation info.""" if not op.info: diff --git a/src/ansys/hps/data_transfer/client/binary.py b/src/ansys/hps/data_transfer/client/binary.py index 339e6825..ea3ff740 100644 --- a/src/ansys/hps/data_transfer/client/binary.py +++ b/src/ansys/hps/data_transfer/client/binary.py @@ -409,9 +409,7 @@ def _monitor(self): with PrepareSubprocess(): self._process = subprocess.Popen( - args, shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env=env + args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env ) else: ret_code = self._process.poll() diff --git a/src/ansys/hps/data_transfer/client/client.py b/src/ansys/hps/data_transfer/client/client.py index 5e8b3b48..e11c1f69 100644 --- a/src/ansys/hps/data_transfer/client/client.py +++ b/src/ansys/hps/data_transfer/client/client.py @@ -595,7 +595,7 @@ async def _monitor(self): await asyncio.sleep(self._monitor_state.sleep_for) if self._session is None or self.binary is None: continue - + resp = await self._session.get(self.base_api_url) if resp.status_code == 200: diff --git a/tests/large_file_test.py b/tests/large_file_test.py index 413fca81..00b0dfdf 100644 --- a/tests/large_file_test.py +++ b/tests/large_file_test.py @@ -167,10 +167,7 @@ def test_batch_with_multiple_operations_to_wait(storage_path, client): assert op2.id is not 
None # List to store progress data - progress_data = { - op1.id: [], - op2.id: [] - } + progress_data = {op1.id: [], op2.id: []} # test progress handler def handler(ops): @@ -240,10 +237,7 @@ async def test_async_batch_with_multiple_operations_to_wait(storage_path, async_ assert op2.id is not None # List to store progress data - progress_data = { - op1.id: [], - op2.id: [] - } + progress_data = {op1.id: [], op2.id: []} # test progress handler async def handler(ops): From 04a70ed8b61dcbc21688137ae2a326ba087f8579 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Wed, 6 Aug 2025 14:44:03 +0200 Subject: [PATCH 31/32] Add binary download progress information --- .../hps/data_transfer/client/api/handler.py | 2 +- src/ansys/hps/data_transfer/client/client.py | 29 ++++++++++++++++--- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/ansys/hps/data_transfer/client/api/handler.py b/src/ansys/hps/data_transfer/client/api/handler.py index fce03583..4dd2f144 100644 --- a/src/ansys/hps/data_transfer/client/api/handler.py +++ b/src/ansys/hps/data_transfer/client/api/handler.py @@ -72,7 +72,7 @@ def _log_op(self, lvl: int, op: Operation): """Format the operation description.""" op_type = "operation" if len(op.children) == 0 else "operation group" - msg = f"{op_type.capitalize()} '{op.description}'({op.id})" + msg = f"Data transfer {op_type} '{op.description}'({op.id})" op_done = op.state in self.final try: diff --git a/src/ansys/hps/data_transfer/client/client.py b/src/ansys/hps/data_transfer/client/client.py index e11c1f69..7ffb931d 100644 --- a/src/ansys/hps/data_transfer/client/client.py +++ b/src/ansys/hps/data_transfer/client/client.py @@ -38,6 +38,7 @@ import backoff import filelock import httpx +import humanfriendly as hf import psutil import urllib3 @@ -227,6 +228,8 @@ def __init__( self._monitor_stop = None self._monitor_state = MonitorState() + self.progress_interval = 5.0 # seconds + def __getstate__(self): """Return pickled state of the object.""" 
state = self.__dict__.copy() @@ -436,17 +439,35 @@ def _prepare_platform_binary(self): log.warning(f"Failed to create directory {bin_dir}: {ex}") platform_str = self._platform() - log.debug( - f"Downloading binary for platform '{platform_str}' from {dt_url} to {bin_path}, reason: {reason}" - ) + log.info(f"Downloading binary for platform '{platform_str}' from {dt_url}, reason: {reason}") + log.debug(f"Binary download path: {bin_path}") url = f"/binaries/worker/{platform_str}/hpsdata{bin_ext}" try: + start = time.time() + last_progress = start + written = 0 with open(bin_path, "wb") as f, session.stream("GET", url) as resp: resp.read() if resp.status_code != 200: raise BinaryError(f"Failed to download binary: {resp.text}") + for chunk in resp.iter_bytes(): - f.write(chunk) + now = time.time() + written += f.write(chunk) + + since_last_progress = now - last_progress + if since_last_progress > self.progress_interval: + msg = f"Downloading binary, {hf.format_timespan(now - start)} so far ..." 
+ content_length = resp.headers.get("Content-Length", None) + if content_length is not None: + content_length = int(content_length) + prog = float(written) / float(content_length) * 100.0 + msg += f" {prog:.1f}%, {hf.format_size(written)}/{hf.format_size(content_length)}" + else: + msg += f" {hf.format_size(written)} downloaded" + log.info(msg) + last_progress = now + self._bin_config.path = bin_path except Exception as ex: if self._bin_config.debug: From d11013ea77d62a85beee8a257724fe927cf97750 Mon Sep 17 00:00:00 2001 From: Michal Pawlik Date: Wed, 6 Aug 2025 15:29:42 +0200 Subject: [PATCH 32/32] Adjust worker download message --- src/ansys/hps/data_transfer/client/client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ansys/hps/data_transfer/client/client.py b/src/ansys/hps/data_transfer/client/client.py index 7ffb931d..b8b381c6 100644 --- a/src/ansys/hps/data_transfer/client/client.py +++ b/src/ansys/hps/data_transfer/client/client.py @@ -439,7 +439,9 @@ def _prepare_platform_binary(self): log.warning(f"Failed to create directory {bin_dir}: {ex}") platform_str = self._platform() - log.info(f"Downloading binary for platform '{platform_str}' from {dt_url}, reason: {reason}") + log.info( + f"Downloading data transfer worker for platform '{platform_str}' from {dt_url}, reason: {reason}" + ) log.debug(f"Binary download path: {bin_path}") url = f"/binaries/worker/{platform_str}/hpsdata{bin_ext}" try: