61 changes: 53 additions & 8 deletions deepset_cloud_sdk/_s3/upload.py
@@ -4,10 +4,11 @@
import os
import re
from dataclasses import dataclass
from datetime import timedelta
from http import HTTPStatus
from pathlib import Path
from types import TracebackType
from typing import Any, Coroutine, List, Optional, Sequence, Type, Union
from typing import Any, Coroutine, List, Optional, Sequence, Type, Union, cast

import aiofiles
import aiohttp
@@ -62,6 +63,7 @@ class S3UploadSummary:
successful_upload_count: int
failed_upload_count: int
failed: List[S3UploadResult]
cancelled: bool = False


def make_safe_file_name(file_name: str) -> str:
@@ -277,18 +279,41 @@ async def _process_results(
self,
tasks: List[Coroutine[Any, Any, S3UploadResult]],
show_progress: bool = True,
timeout: timedelta = timedelta(hours=1),
) -> S3UploadSummary:
"""Summarize the results of the uploads to S3.

:param tasks: List of upload tasks.
:param show_progress: Whether to show a progress bar.
:param timeout: Timeout for the upload. Uploads still pending when the timeout elapses are cancelled.
:return: S3UploadSummary object.
"""
results: List[S3UploadResult] = []
cancelled = False

if show_progress:
results = await tqdm.gather(*tasks, desc="Upload to S3")
else:
results = await asyncio.gather(*tasks)
async def process_with_timeout() -> List[S3UploadResult]:
try:
if show_progress:
return cast(
List[S3UploadResult],
await tqdm.gather(*tasks, desc="Upload to S3"),
)
return cast(List[S3UploadResult], await asyncio.gather(*tasks))
except asyncio.TimeoutError:
task_objects = [asyncio.create_task(task) for task in tasks]
results = [task.result() for task in task_objects if task.done()]
return results

try:
results = await asyncio.wait_for(process_with_timeout(), timeout=timeout.total_seconds())
except asyncio.TimeoutError:
cancelled = True
logger.warning(
"Upload timeout reached. Returning partial results.",
timeout_s=timeout.total_seconds(),
)
task_objects = [asyncio.create_task(task) for task in tasks]
results = [task.result() for task in task_objects if task.done()]

logger.info(
"Finished uploading files.",
@@ -309,6 +334,7 @@ async def _process_results(
failed_upload_count=len(failed),
failed=failed,
total_files=len(tasks),
cancelled=cancelled,
)
if result_summary.successful_upload_count == 0:
logger.error("Could not upload any files to S3.")
@@ -320,34 +346,53 @@ async def upload_files_from_paths(
upload_session: UploadSession,
file_paths: List[Path],
show_progress: bool = True,
timeout: timedelta = timedelta(hours=1),
) -> S3UploadSummary:
"""Upload a set of files to the prefixed S3 namespace given a list of paths.

The timeout can be set to automatically cancel the remaining uploads if
the upload takes too long. The method returns a summary of the files processed
before the timeout was reached.

:param upload_session: UploadSession to associate the upload with.
:param file_paths: A list of paths to upload.
:param show_progress: Whether to show a progress bar on the upload.
:param timeout: Timeout for the upload.
:return: S3UploadSummary object.
"""
async with aiohttp.ClientSession(connector=self.connector) as client_session:
async with aiohttp.ClientSession(
connector=self.connector,
) as client_session:
tasks = []

for file_path in file_paths:
tasks.append(self.upload_from_file(file_path, upload_session, client_session))

result_summary = await self._process_results(tasks, show_progress=show_progress)
result_summary = await self._process_results(tasks, show_progress=show_progress, timeout=timeout)
if result_summary.cancelled:
logger.warning(
"Upload timeout reached. Returning partial results.",
timeout_s=timeout.total_seconds(),
)
return result_summary

async def upload_in_memory(
self,
upload_session: UploadSession,
files: Sequence[DeepsetCloudFileBase],
show_progress: bool = True,
timeout: timedelta = timedelta(hours=1),
) -> S3UploadSummary:
"""Upload a set of files to the prefixed S3 namespace given a list of paths.

The timeout can be set to automatically canceling the remaining uploads if
the upload takes too long. The method will return the summary of all processed
files until the timeout is reached.

:param upload_session: UploadSession to associate the upload with.
:param files: A list of DeepsetCloudFileBase to upload.
:param show_progress: Whether to show a progress bar on the upload.
:param timeout: Timeout for the upload.
:return: S3UploadSummary object.
"""
async with aiohttp.ClientSession(
@@ -373,6 +418,6 @@ async def upload_in_memory(
)
)

result_summary = await self._process_results(tasks, show_progress=show_progress)
result_summary = await self._process_results(tasks, show_progress=show_progress, timeout=timeout)

return result_summary
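
For reference, a hypothetical caller-side sketch of the new timeout parameter. `client` below stands in for the uploader object defined in this module, and `upload_session`/`file_paths` for objects created elsewhere in the SDK; the 30-minute deadline is only an example value:

import asyncio
from datetime import timedelta
from pathlib import Path
from typing import List


async def upload_with_deadline(client, upload_session, file_paths: List[Path]) -> None:
    summary = await client.upload_files_from_paths(
        upload_session=upload_session,
        file_paths=file_paths,
        show_progress=False,
        timeout=timedelta(minutes=30),  # cancel uploads still pending after 30 minutes
    )
    if summary.cancelled:
        # Only files that finished before the deadline are reflected in the counts.
        print(f"Timed out after {summary.successful_upload_count} successful uploads.")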
82 changes: 58 additions & 24 deletions deepset_cloud_sdk/_service/files_service.py
@@ -135,15 +135,21 @@ async def _create_upload_session(
:return: Upload session ID.
"""
upload_session = await self._upload_sessions.create(
workspace_name=workspace_name, write_mode=write_mode, enable_parallel_processing=enable_parallel_processing
workspace_name=workspace_name,
write_mode=write_mode,
enable_parallel_processing=enable_parallel_processing,
)
try:
yield upload_session
finally:
await self._upload_sessions.close(workspace_name=workspace_name, session_id=upload_session.session_id)

async def _wrapped_direct_upload_path(
self, workspace_name: str, file_path: Path, meta: Dict[str, Any], write_mode: WriteMode
self,
workspace_name: str,
file_path: Path,
meta: Dict[str, Any],
write_mode: WriteMode,
) -> S3UploadResult:
try:
await self._files.direct_upload_path(
@@ -226,7 +232,10 @@ async def upload_file_paths(

_coroutines.append(
self._wrapped_direct_upload_path(
workspace_name=workspace_name, file_path=file_path, meta=meta, write_mode=write_mode
workspace_name=workspace_name,
file_path=file_path,
meta=meta,
write_mode=write_mode,
)
)
result = await asyncio.gather(*_coroutines)
@@ -242,25 +251,37 @@ async def upload_file_paths(
failed=[r for r in result if r.success is False],
)

# create session to upload files to
async with self._create_upload_session(
workspace_name=workspace_name, write_mode=write_mode, enable_parallel_processing=enable_parallel_processing
) as upload_session:
# upload file paths to session

upload_summary = await self._s3.upload_files_from_paths(
upload_session=upload_session, file_paths=file_paths, show_progress=show_progress
)
logger.info(
"Summary of S3 Uploads",
successful_uploads=upload_summary.successful_upload_count,
failed_uploads=upload_summary.failed_upload_count,
failed=upload_summary.failed,
)
cancelled = False
while not cancelled:
# create session to upload files to
async with self._create_upload_session(
workspace_name=workspace_name,
write_mode=write_mode,
enable_parallel_processing=enable_parallel_processing,
) as upload_session:
upload_summary = await self._s3.upload_files_from_paths(
upload_session=upload_session,
file_paths=file_paths,
show_progress=show_progress,
)
cancelled = upload_summary.cancelled
logger.info(
"Summary of S3 Uploads",
successful_uploads=upload_summary.successful_upload_count,
failed_uploads=upload_summary.failed_upload_count,
failed=upload_summary.failed,
)

# wait for ingestion to finish
if blocking:
total_files = len(list(filter(lambda x: not os.path.basename(x).endswith(META_SUFFIX), file_paths)))
total_files = len(
list(
filter(
lambda x: not os.path.basename(x).endswith(META_SUFFIX),
file_paths,
)
)
)
await self._wait_for_finished(
workspace_name=workspace_name,
session_id=upload_session.session_id,
@@ -336,7 +357,8 @@ def _remove_duplicates(file_paths: List[Path]) -> List[Path]:
for file_name, file_group in files_by_name.items():
if len(file_group) > 1:
logger.warning(
"Multiple files with the same name found. Keeping the most recent one.", file_name=file_name
"Multiple files with the same name found. Keeping the most recent one.",
file_name=file_name,
)
most_recent_file = sorted(file_group, key=lambda x: x.stat().st_mtime, reverse=True)[0]
most_recent_files.append(most_recent_file)
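
The deduplication above keeps the most recently modified file per file name; a self-contained sketch of the same idea (the helper name is illustrative):

from collections import defaultdict
from pathlib import Path
from typing import Dict, List


def keep_most_recent(file_paths: List[Path]) -> List[Path]:
    by_name: Dict[str, List[Path]] = defaultdict(list)
    for path in file_paths:
        by_name[path.name].append(path)
    # max() over st_mtime keeps the newest duplicate, matching the sorted(..., reverse=True)[0] above
    return [max(group, key=lambda p: p.stat().st_mtime) for group in by_name.values()]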
Expand Down Expand Up @@ -417,13 +439,19 @@ async def upload(
Use this to speed up the upload process and if you are not running concurrent uploads for the same files.
:raises TimeoutError: If blocking is True and the ingestion takes longer than timeout_s.
"""
logger.info("Getting valid files from file path. This may take a few minutes.", recursive=recursive)
logger.info(
"Getting valid files from file path. This may take a few minutes.",
recursive=recursive,
)

if show_progress:
with yaspin().arc as sp:
sp.text = "Finding uploadable files in the given paths."
file_paths = self._preprocess_paths(
paths, spinner=sp, recursive=recursive, desired_file_types=desired_file_types
paths,
spinner=sp,
recursive=recursive,
desired_file_types=desired_file_types,
)
else:
file_paths = self._preprocess_paths(paths, recursive=recursive, desired_file_types=desired_file_types)
@@ -455,7 +483,11 @@ async def _download_and_log_errors(
include_meta=include_meta,
)
except FileNotFoundInDeepsetCloudException as e:
logger.error("File was listed in deepset Cloud but could not be downloaded.", file_id=file_id, error=e)
logger.error(
"File was listed in deepset Cloud but could not be downloaded.",
file_id=file_id,
error=e,
)
except Exception as e:
logger.error("Failed to download file.", file_id=file_id, error=e)

@@ -598,7 +630,9 @@ async def upload_in_memory(

# create session to upload files to
async with self._create_upload_session(
workspace_name=workspace_name, write_mode=write_mode, enable_parallel_processing=enable_parallel_processing
workspace_name=workspace_name,
write_mode=write_mode,
enable_parallel_processing=enable_parallel_processing,
) as upload_session:
upload_summary = await self._s3.upload_in_memory(
upload_session=upload_session, files=files, show_progress=show_progress