From ee5a712b999199d7c6c3a6fd4e2bb8887e494522 Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Fri, 13 Jun 2025 15:50:17 +0200
Subject: [PATCH] feat: timeout reopen session

---
 deepset_cloud_sdk/_s3/upload.py             | 61 +++++++++++++--
 deepset_cloud_sdk/_service/files_service.py | 82 +++++++++++++++------
 2 files changed, 111 insertions(+), 32 deletions(-)

diff --git a/deepset_cloud_sdk/_s3/upload.py b/deepset_cloud_sdk/_s3/upload.py
index d047b277..58c16c49 100644
--- a/deepset_cloud_sdk/_s3/upload.py
+++ b/deepset_cloud_sdk/_s3/upload.py
@@ -4,10 +4,11 @@
 import os
 import re
 from dataclasses import dataclass
+from datetime import timedelta
 from http import HTTPStatus
 from pathlib import Path
 from types import TracebackType
-from typing import Any, Coroutine, List, Optional, Sequence, Type, Union
+from typing import Any, Coroutine, List, Optional, Sequence, Type, Union, cast
 
 import aiofiles
 import aiohttp
@@ -62,6 +63,7 @@ class S3UploadSummary:
     successful_upload_count: int
     failed_upload_count: int
     failed: List[S3UploadResult]
+    cancelled: bool = False
 
 
 def make_safe_file_name(file_name: str) -> str:
@@ -277,18 +279,41 @@ async def _process_results(
         self,
         tasks: List[Coroutine[Any, Any, S3UploadResult]],
         show_progress: bool = True,
+        timeout: timedelta = timedelta(hours=1),
     ) -> S3UploadSummary:
         """Summarize the results of the uploads to S3.
 
         :param tasks: List of upload tasks.
+        :param show_progress: Whether to show a progress bar.
+        :param timeout: Timeout for the upload. Remaining uploads are cancelled once the timeout is reached.
         :return: S3UploadResult object.
         """
         results: List[S3UploadResult] = []
+        cancelled = False
 
-        if show_progress:
-            results = await tqdm.gather(*tasks, desc="Upload to S3")
-        else:
-            results = await asyncio.gather(*tasks)
+        # Wrap the coroutines in tasks up front so that uploads which finished
+        # before the timeout can still be collected afterwards.
+        task_objects = [asyncio.ensure_future(task) for task in tasks]
+        gathered = (
+            tqdm.gather(*task_objects, desc="Upload to S3")
+            if show_progress
+            else asyncio.gather(*task_objects)
+        )
+        try:
+            results = cast(
+                List[S3UploadResult],
+                await asyncio.wait_for(gathered, timeout=timeout.total_seconds()),
+            )
+        except asyncio.TimeoutError:
+            cancelled = True
+            logger.warning(
+                "Upload timeout reached. Returning partial results.",
+                timeout_s=timeout.total_seconds(),
+            )
+            # Cancel pending uploads and keep the results that completed in time.
+            for task in task_objects:
+                task.cancel()
+            results = [task.result() for task in task_objects if task.done() and not task.cancelled()]
 
         logger.info(
             "Finished uploading files.",
@@ -309,6 +334,7 @@ async def _process_results(
             failed_upload_count=len(failed),
             failed=failed,
             total_files=len(tasks),
+            cancelled=cancelled,
         )
         if result_summary.successful_upload_count == 0:
             logger.error("Could not upload any files to S3.")
@@ -320,21 +346,34 @@ async def upload_files_from_paths(
         self,
         upload_session: UploadSession,
         file_paths: List[Path],
         show_progress: bool = True,
+        timeout: timedelta = timedelta(hours=1),
     ) -> S3UploadSummary:
         """Upload a set of files to the prefixed S3 namespace given a list of paths.
 
+        The timeout can be used to automatically cancel the remaining uploads if
+        the upload takes too long. The method returns a summary of the files
+        processed before the timeout was reached.
+
        :param upload_session: UploadSession to associate the upload with.
        :param file_paths: A list of paths to upload.
        :param show_progress: Whether to show a progress bar on the upload.
+        :param timeout: Timeout for the upload.
         :return: S3UploadSummary object.
         """
-        async with aiohttp.ClientSession(connector=self.connector) as client_session:
+        async with aiohttp.ClientSession(
+            connector=self.connector,
+        ) as client_session:
             tasks = []
             for file_path in file_paths:
                 tasks.append(self.upload_from_file(file_path, upload_session, client_session))
 
-            result_summary = await self._process_results(tasks, show_progress=show_progress)
+            result_summary = await self._process_results(tasks, show_progress=show_progress, timeout=timeout)
+            if result_summary.cancelled:
+                logger.warning(
+                    "Upload timeout reached. Returning partial results.",
+                    timeout_s=timeout.total_seconds(),
+                )
             return result_summary
 
@@ -342,12 +381,18 @@ async def upload_in_memory(
         self,
         upload_session: UploadSession,
         files: Sequence[DeepsetCloudFileBase],
         show_progress: bool = True,
+        timeout: timedelta = timedelta(hours=1),
     ) -> S3UploadSummary:
         """Upload a set of files to the prefixed S3 namespace given a list of paths.
 
+        The timeout can be used to automatically cancel the remaining uploads if
+        the upload takes too long. The method returns a summary of the files
+        processed before the timeout was reached.
+
         :param upload_session: UploadSession to associate the upload with.
         :param files: A list of DeepsetCloudFileBase to upload.
         :param show_progress: Whether to show a progress bar on the upload.
+        :param timeout: Timeout for the upload.
         :return: S3UploadSummary object.
         """
         async with aiohttp.ClientSession(
@@ -373,6 +418,6 @@ async def upload_in_memory(
                 )
             )
 
-            result_summary = await self._process_results(tasks, show_progress=show_progress)
+            result_summary = await self._process_results(tasks, show_progress=show_progress, timeout=timeout)
             return result_summary
 
diff --git a/deepset_cloud_sdk/_service/files_service.py b/deepset_cloud_sdk/_service/files_service.py
index d1b5ada6..9eddad67 100644
--- a/deepset_cloud_sdk/_service/files_service.py
+++ b/deepset_cloud_sdk/_service/files_service.py
@@ -135,7 +135,9 @@ async def _create_upload_session(
        :return: Upload session ID.
""" upload_session = await self._upload_sessions.create( - workspace_name=workspace_name, write_mode=write_mode, enable_parallel_processing=enable_parallel_processing + workspace_name=workspace_name, + write_mode=write_mode, + enable_parallel_processing=enable_parallel_processing, ) try: yield upload_session @@ -143,7 +145,11 @@ async def _create_upload_session( await self._upload_sessions.close(workspace_name=workspace_name, session_id=upload_session.session_id) async def _wrapped_direct_upload_path( - self, workspace_name: str, file_path: Path, meta: Dict[str, Any], write_mode: WriteMode + self, + workspace_name: str, + file_path: Path, + meta: Dict[str, Any], + write_mode: WriteMode, ) -> S3UploadResult: try: await self._files.direct_upload_path( @@ -226,7 +232,10 @@ async def upload_file_paths( _coroutines.append( self._wrapped_direct_upload_path( - workspace_name=workspace_name, file_path=file_path, meta=meta, write_mode=write_mode + workspace_name=workspace_name, + file_path=file_path, + meta=meta, + write_mode=write_mode, ) ) result = await asyncio.gather(*_coroutines) @@ -242,25 +251,37 @@ async def upload_file_paths( failed=[r for r in result if r.success is False], ) - # create session to upload files to - async with self._create_upload_session( - workspace_name=workspace_name, write_mode=write_mode, enable_parallel_processing=enable_parallel_processing - ) as upload_session: - # upload file paths to session - - upload_summary = await self._s3.upload_files_from_paths( - upload_session=upload_session, file_paths=file_paths, show_progress=show_progress - ) - logger.info( - "Summary of S3 Uploads", - successful_uploads=upload_summary.successful_upload_count, - failed_uploads=upload_summary.failed_upload_count, - failed=upload_summary.failed, - ) + cancelled = False + while not cancelled: + # create session to upload files to + async with self._create_upload_session( + workspace_name=workspace_name, + write_mode=write_mode, + enable_parallel_processing=enable_parallel_processing, + ) as upload_session: + upload_summary = await self._s3.upload_files_from_paths( + upload_session=upload_session, + file_paths=file_paths, + show_progress=show_progress, + ) + cancelled = upload_summary.cancelled + logger.info( + "Summary of S3 Uploads", + successful_uploads=upload_summary.successful_upload_count, + failed_uploads=upload_summary.failed_upload_count, + failed=upload_summary.failed, + ) # wait for ingestion to finish if blocking: - total_files = len(list(filter(lambda x: not os.path.basename(x).endswith(META_SUFFIX), file_paths))) + total_files = len( + list( + filter( + lambda x: not os.path.basename(x).endswith(META_SUFFIX), + file_paths, + ) + ) + ) await self._wait_for_finished( workspace_name=workspace_name, session_id=upload_session.session_id, @@ -336,7 +357,8 @@ def _remove_duplicates(file_paths: List[Path]) -> List[Path]: for file_name, file_group in files_by_name.items(): if len(file_group) > 1: logger.warning( - "Multiple files with the same name found. Keeping the most recent one.", file_name=file_name + "Multiple files with the same name found. Keeping the most recent one.", + file_name=file_name, ) most_recent_file = sorted(file_group, key=lambda x: x.stat().st_mtime, reverse=True)[0] most_recent_files.append(most_recent_file) @@ -417,13 +439,19 @@ async def upload( Use this to speed up the upload process and if you are not running concurrent uploads for the same files. :raises TimeoutError: If blocking is True and the ingestion takes longer than timeout_s. 
""" - logger.info("Getting valid files from file path. This may take a few minutes.", recursive=recursive) + logger.info( + "Getting valid files from file path. This may take a few minutes.", + recursive=recursive, + ) if show_progress: with yaspin().arc as sp: sp.text = "Finding uploadable files in the given paths." file_paths = self._preprocess_paths( - paths, spinner=sp, recursive=recursive, desired_file_types=desired_file_types + paths, + spinner=sp, + recursive=recursive, + desired_file_types=desired_file_types, ) else: file_paths = self._preprocess_paths(paths, recursive=recursive, desired_file_types=desired_file_types) @@ -455,7 +483,11 @@ async def _download_and_log_errors( include_meta=include_meta, ) except FileNotFoundInDeepsetCloudException as e: - logger.error("File was listed in deepset Cloud but could not be downloaded.", file_id=file_id, error=e) + logger.error( + "File was listed in deepset Cloud but could not be downloaded.", + file_id=file_id, + error=e, + ) except Exception as e: logger.error("Failed to download file.", file_id=file_id, error=e) @@ -598,7 +630,9 @@ async def upload_in_memory( # create session to upload files to async with self._create_upload_session( - workspace_name=workspace_name, write_mode=write_mode, enable_parallel_processing=enable_parallel_processing + workspace_name=workspace_name, + write_mode=write_mode, + enable_parallel_processing=enable_parallel_processing, ) as upload_session: upload_summary = await self._s3.upload_in_memory( upload_session=upload_session, files=files, show_progress=show_progress