From 346883120326b7eca8ad6c37a4f368659b47138b Mon Sep 17 00:00:00 2001
From: "hey@jiaweing.com"
Date: Sat, 30 Nov 2024 20:15:59 +0800
Subject: [PATCH] feat: support url convert

---
 .gitignore                    |  1 +
 README.md                     | 39 +++++++++++---
 document_converter/route.py   | 82 +++++++++++++++++++-----------
 document_converter/service.py | 95 ++++++++++++++++++++++-------------
 worker/tasks.py               |  6 +--
 5 files changed, 150 insertions(+), 73 deletions(-)

diff --git a/.gitignore b/.gitignore
index 76da708..74593d2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
 # Datasource local storage ignored files
 /dataSources/
 /dataSources.local.xml
+__pycache__/

 # Additional IDE files
 *.iml

diff --git a/README.md b/README.md
index 788b99f..b410a20 100644
--- a/README.md
+++ b/README.md
@@ -5,20 +5,22 @@

 ## Comparison to Other Parsing Libraries

-| Original PDF |
-|--------------|
+| Original PDF |
+| -------------------------------------------------------------------------------------------------------------------- |
 |  |

-| Docling-API | Marker |
-|-------------|--------|
+| Docling-API | Marker |
+| ------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------ |
 |  |  |

-| PyPDF | PyMuPDF4LLM |
-|-------|-------------|
+| PyPDF | PyMuPDF4LLM |
+| ----------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
 |  |  |

 ## Features
+
 - **Multiple Format Support**: Converts various document types including:
+
   - PDF files
   - Microsoft Word documents (DOCX)
   - PowerPoint presentations (PPTX)
@@ -26,8 +28,10 @@
   - HTML files
   - Images (JPG, PNG, TIFF, BMP)
   - AsciiDoc files
   - Markdown files
+  - URLs

 - **Conversion Capabilities**:
+
   - Text extraction and formatting
   - Table detection, extraction and conversion
   - Image extraction and processing
@@ -35,6 +39,7 @@
   - Configurable image resolution scaling

 - **API Endpoints**:
+
   - Synchronous single document conversion
   - Synchronous batch document conversion
   - Asynchronous single document conversion with job tracking
@@ -49,16 +54,19 @@
 ## Environment Setup (Running Locally)

 ### Prerequisites
+
 - Python 3.8 or higher
 - Poetry (Python package manager)
 - Redis server (for task queue)

 ### 1. Install Poetry (if not already installed)
+
 ```bash
 curl -sSL https://install.python-poetry.org | python3 -
 ```

 ### 2. Clone and Setup Project
+
 ```bash
 git clone https://github.com/drmingler/docling-api.git
 cd document-converter
@@ -66,22 +74,27 @@ poetry install
 ```

 ### 3. Configure Environment
+
 Create a `.env` file in the project root:
+
 ```bash
 REDIS_HOST=redis://localhost:6379/0
 ENV=development
 ```

 ### 4. Start Redis Server
+
 Start Redis locally (install if not already installed):

 #### For MacOS:
+
 ```bash
 brew install redis
 brew services start redis
 ```

 #### For Ubuntu/Debian:
+
 ```bash
 sudo apt-get install redis-server
 sudo service redis-server start
@@ -90,16 +103,19 @@ sudo service redis-server start
 ```

 ### 5. Start the Application Components

 1. Start the FastAPI server:
+
 ```bash
 poetry run uvicorn main:app --reload --port 8080
 ```

 2. Start Celery worker (in a new terminal):
+
 ```bash
 poetry run celery -A worker.celery_config worker --pool=solo -n worker_primary --loglevel=info
 ```
 3. Start Flower dashboard for monitoring (optional, in a new terminal):
+
 ```bash
 poetry run celery -A worker.celery_config flower --port=5555
 ```
@@ -107,11 +123,13 @@ poetry run celery -A worker.celery_config flower --port=5555
 ```

 ### 6. Verify Installation

 1. Check if the API server is running:
+
 ```bash
 curl http://localhost:8080/docs
 ```

 2. Test Celery worker:
+
 ```bash
 curl -X POST "http://localhost:8080/documents/convert" \
   -H "accept: application/json" \
   -H "Content-Type: multipart/form-data" \
   -F "document=@/path/to/test.pdf"
 ```

 3. Access monitoring dashboard:
+
 - Open http://localhost:5555 in your browser to view the Flower dashboard

 ### Development Notes
@@ -132,25 +151,31 @@ curl -X POST "http://localhost:8080/documents/convert" \
 ## Environment Setup (Running in Docker)

 1. Clone the repository:
+
 ```bash
 git clone https://github.com/drmingler/docling-api.git
 cd document-converter
 ```

 2. Create a `.env` file:
+
 ```bash
 REDIS_HOST=redis://redis:6379/0
 ENV=production
 ```

 ### CPU Mode
+
 To start the service using CPU-only processing, use the following command. You can adjust the number of Celery workers by specifying the --scale option. In this example, 1 worker will be created:
+
 ```bash
 docker-compose -f docker-compose.cpu.yml up --build --scale celery_worker=1
 ```

 ### GPU Mode (Recommend for production)
+
 For production, it is recommended to enable GPU acceleration, as it significantly improves performance. Use the command below to start the service with GPU support. You can also scale the number of Celery workers using the --scale option; here, 3 workers will be launched:
+
 ```bash
 docker-compose -f docker-compose.gpu.yml up --build --scale celery_worker=3
 ```
@@ -237,9 +262,11 @@ The service uses a distributed architecture with the following components:
 - Multiple workers can be scaled horizontally for increased throughput

 ## License
+
 The codebase is under MIT license. See LICENSE for more information

 ## Acknowledgements
+
 - [Docling](https://github.com/DS4SD/docling) the state-of-the-art document conversion library by IBM
 - [FastAPI](https://fastapi.tiangolo.com/) the web framework
 - [Celery](https://docs.celeryq.dev/en/stable/) for distributed task processing

diff --git a/document_converter/route.py b/document_converter/route.py
index e04ce6b..83797a0 100644
--- a/document_converter/route.py
+++ b/document_converter/route.py
@@ -1,7 +1,8 @@
 from io import BytesIO
 from multiprocessing.pool import AsyncResult
 from typing import List
-from fastapi import APIRouter, File, HTTPException, UploadFile, Query
+from fastapi import APIRouter, File, HTTPException, UploadFile, Query, Body
+from pydantic import HttpUrl

 from document_converter.schema import BatchConversionJobResult, ConversationJobResult, ConversionResult
 from document_converter.service import DocumentConverterService, DoclingDocumentConversion
@@ -23,16 +24,23 @@
     description="Convert a single document synchronously",
 )
 async def convert_single_document(
-    document: UploadFile = File(...),
+    document: UploadFile = File(None),
+    url: HttpUrl = Body(None),
     extract_tables_as_images: bool = False,
     image_resolution_scale: int = Query(4, ge=1, le=4),
 ):
-    file_bytes = await document.read()
-    if not is_file_format_supported(file_bytes, document.filename):
-        raise HTTPException(status_code=400, detail=f"Unsupported file format: {document.filename}")
+    if document:
+        file_bytes = await document.read()
+        if not is_file_format_supported(file_bytes, document.filename):
+            raise HTTPException(status_code=400, detail=f"Unsupported file format: {document.filename}")
+        doc_input = (document.filename, BytesIO(file_bytes))
+    elif url:
+        doc_input = str(url)
+    else:
+        raise HTTPException(status_code=400, detail="Either document or url must be provided")

     return document_converter_service.convert_document(
-        (document.filename, BytesIO(file_bytes)),
+        doc_input,
         extract_tables=extract_tables_as_images,
         image_resolution_scale=image_resolution_scale,
     )
@@ -45,19 +53,25 @@ async def convert_single_document(
     description="Convert multiple documents synchronously",
 )
 async def convert_multiple_documents(
-    documents: List[UploadFile] = File(...),
+    documents: List[UploadFile] = File(None),
+    urls: List[HttpUrl] = Body(None),
     extract_tables_as_images: bool = False,
     image_resolution_scale: int = Query(4, ge=1, le=4),
 ):
-    doc_streams = []
-    for document in documents:
-        file_bytes = await document.read()
-        if not is_file_format_supported(file_bytes, document.filename):
-            raise HTTPException(status_code=400, detail=f"Unsupported file format: {document.filename}")
-        doc_streams.append((document.filename, BytesIO(file_bytes)))
+    if documents:
+        doc_inputs = []
+        for document in documents:
+            file_bytes = await document.read()
+            if not is_file_format_supported(file_bytes, document.filename):
+                raise HTTPException(status_code=400, detail=f"Unsupported file format: {document.filename}")
+            doc_inputs.append((document.filename, BytesIO(file_bytes)))
+    elif urls:
+        doc_inputs = [str(url) for url in urls]
+    else:
+        raise HTTPException(status_code=400, detail="Either documents or urls must be provided")

     return document_converter_service.convert_documents(
-        doc_streams,
+        doc_inputs,
         extract_tables=extract_tables_as_images,
         image_resolution_scale=image_resolution_scale,
     )
@@ -70,16 +84,23 @@ async def convert_multiple_documents(
     description="Create a conversion job for a single document",
 )
 async def create_single_document_conversion_job(
-    document: UploadFile = File(...),
+    document: UploadFile = File(None),
+    url: HttpUrl = Body(None),
     extract_tables_as_images: bool = False,
     image_resolution_scale: int = Query(4, ge=1, le=4),
 ):
-    file_bytes = await document.read()
-    if not is_file_format_supported(file_bytes, document.filename):
-        raise HTTPException(status_code=400, detail=f"Unsupported file format: {document.filename}")
+    if document:
+        file_bytes = await document.read()
+        if not is_file_format_supported(file_bytes, document.filename):
+            raise HTTPException(status_code=400, detail=f"Unsupported file format: {document.filename}")
+        doc_input = (document.filename, file_bytes)
+    elif url:
+        doc_input = str(url)
+    else:
+        raise HTTPException(status_code=400, detail="Either document or url must be provided")

     task = convert_document_task.delay(
-        (document.filename, file_bytes),
+        doc_input,
         extract_tables=extract_tables_as_images,
         image_resolution_scale=image_resolution_scale,
     )
@@ -104,20 +125,25 @@ async def get_conversion_job_status(job_id: str):
     description="Create a conversion job for multiple documents",
 )
 async def create_batch_conversion_job(
-    documents: List[UploadFile] = File(...),
+    documents: List[UploadFile] = File(None),
+    urls: List[HttpUrl] = Body(None),
     extract_tables_as_images: bool = False,
     image_resolution_scale: int = Query(4, ge=1, le=4),
 ):
-    """Create a batch conversion job for multiple documents."""
-    doc_data = []
-    for document in documents:
-        file_bytes = await document.read()
-        if not is_file_format_supported(file_bytes, document.filename):
-            raise HTTPException(status_code=400, detail=f"Unsupported file format: {document.filename}")
-        doc_data.append((document.filename, file_bytes))
+    if documents:
+        doc_inputs = []
+        for document in documents:
+            file_bytes = await document.read()
+            if not is_file_format_supported(file_bytes, document.filename):
+                raise HTTPException(status_code=400, detail=f"Unsupported file format: {document.filename}")
+            doc_inputs.append((document.filename, file_bytes))
+    elif urls:
+        doc_inputs = [str(url) for url in urls]
+    else:
+        raise HTTPException(status_code=400, detail="Either documents or urls must be provided")

     task = convert_documents_task.delay(
-        doc_data,
+        doc_inputs,
         extract_tables=extract_tables_as_images,
         image_resolution_scale=image_resolution_scale,
     )

diff --git a/document_converter/service.py b/document_converter/service.py
index ec3bc39..688136f 100644
--- a/document_converter/service.py
+++ b/document_converter/service.py
@@ -2,7 +2,7 @@
 import logging
 from abc import ABC, abstractmethod
 from io import BytesIO
-from typing import List, Tuple
+from typing import List, Tuple, Union

 from celery.result import AsyncResult
 from docling.datamodel.base_models import InputFormat, DocumentStream
@@ -19,11 +19,11 @@

 class DocumentConversionBase(ABC):
     @abstractmethod
-    def convert(self, document: Tuple[str, BytesIO], **kwargs) -> ConversionResult:
+    def convert(self, document: Union[Tuple[str, BytesIO], str], **kwargs) -> ConversionResult:
         pass

     @abstractmethod
-    def convert_batch(self, documents: List[Tuple[str, BytesIO]], **kwargs) -> List[ConversionResult]:
+    def convert_batch(self, documents: List[Union[Tuple[str, BytesIO], str]], **kwargs) -> List[ConversionResult]:
         pass


@@ -67,29 +67,39 @@ def _process_document_images(conv_res) -> Tuple[str, List[ImageData]]:

     def convert(
         self,
-        document: Tuple[str, BytesIO],
+        document: Union[Tuple[str, BytesIO], str],
         extract_tables: bool = False,
         image_resolution_scale: int = IMAGE_RESOLUTION_SCALE,
     ) -> ConversionResult:
-        filename, file = document
         pipeline_options = self._setup_pipeline_options(extract_tables, image_resolution_scale)
         doc_converter = DocumentConverter(
             format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)}
         )

-        conv_res = doc_converter.convert(DocumentStream(name=filename, stream=file), raises_on_error=False)
-        doc_filename = conv_res.input.file.stem
+        try:
+            if isinstance(document, tuple):
+                filename, file = document
+                conv_res = doc_converter.convert(DocumentStream(name=filename, stream=file), raises_on_error=False)
+                doc_filename = conv_res.input.file.stem
+            else:
+                # Handle URL or file path directly
+                conv_res = doc_converter.convert(document, raises_on_error=False)
+                doc_filename = document.split('/')[-1]  # Extract filename from URL or path

-        if conv_res.errors:
-            logging.error(f"Failed to convert {filename}: {conv_res.errors[0].error_message}")
-            return ConversionResult(filename=doc_filename, error=conv_res.errors[0].error_message)
+            if conv_res.errors:
+                logging.error(f"Failed to convert {doc_filename}: {conv_res.errors[0].error_message}")
+                return ConversionResult(filename=doc_filename, error=conv_res.errors[0].error_message)

-        content_md, images = self._process_document_images(conv_res)
-        return ConversionResult(filename=doc_filename, markdown=content_md, images=images)
+            content_md, images = self._process_document_images(conv_res)
+            return ConversionResult(filename=doc_filename, markdown=content_md, images=images)
+        except Exception as e:
+            error_msg = f"Failed to convert document: {str(e)}"
+            logging.error(error_msg)
+            return ConversionResult(filename=doc_filename if 'doc_filename' in locals() else "unknown", error=error_msg)

     def convert_batch(
         self,
-        documents: List[Tuple[str, BytesIO]],
+        documents: List[Union[Tuple[str, BytesIO], str]],
         extract_tables: bool = False,
         image_resolution_scale: int = IMAGE_RESOLUTION_SCALE,
     ) -> List[ConversionResult]:
@@ -98,22 +108,29 @@ def convert_batch(
             format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)}
         )

-        conv_results = doc_converter.convert_all(
-            [DocumentStream(name=filename, stream=file) for filename, file in documents],
-            raises_on_error=False,
-        )
-
         results = []
-        for conv_res in conv_results:
-            doc_filename = conv_res.input.file.stem
-
-            if conv_res.errors:
-                logging.error(f"Failed to convert {conv_res.input.name}: {conv_res.errors[0].error_message}")
-                results.append(ConversionResult(filename=conv_res.input.name, error=conv_res.errors[0].error_message))
-                continue
-
-            content_md, images = self._process_document_images(conv_res)
-            results.append(ConversionResult(filename=doc_filename, markdown=content_md, images=images))
+        for doc in documents:
+            try:
+                if isinstance(doc, tuple):
+                    filename, file = doc
+                    conv_res = doc_converter.convert(DocumentStream(name=filename, stream=file), raises_on_error=False)
+                    doc_filename = conv_res.input.file.stem
+                else:
+                    # Handle URL or file path directly
+                    conv_res = doc_converter.convert(doc, raises_on_error=False)
+                    doc_filename = doc.split('/')[-1]  # Extract filename from URL or path
+
+                if conv_res.errors:
+                    logging.error(f"Failed to convert {doc_filename}: {conv_res.errors[0].error_message}")
+                    results.append(ConversionResult(filename=doc_filename, error=conv_res.errors[0].error_message))
+                    continue
+
+                content_md, images = self._process_document_images(conv_res)
+                results.append(ConversionResult(filename=doc_filename, markdown=content_md, images=images))
+            except Exception as e:
document: {str(e)}" + logging.error(error_msg) + results.append(ConversionResult(filename="unknown", error=error_msg)) return results @@ -122,31 +139,37 @@ class DocumentConverterService: def __init__(self, document_converter: DocumentConversionBase): self.document_converter = document_converter - def convert_document(self, document: Tuple[str, BytesIO], **kwargs) -> ConversionResult: + def convert_document(self, document: Union[Tuple[str, BytesIO], str], **kwargs) -> ConversionResult: result = self.document_converter.convert(document, **kwargs) if result.error: - logging.error(f"Failed to convert {document[0]}: {result.error}") + logging.error(f"Failed to convert document: {result.error}") raise HTTPException(status_code=500, detail=result.error) return result - def convert_documents(self, documents: List[Tuple[str, BytesIO]], **kwargs) -> List[ConversionResult]: + def convert_documents(self, documents: List[Union[Tuple[str, BytesIO], str]], **kwargs) -> List[ConversionResult]: return self.document_converter.convert_batch(documents, **kwargs) def convert_document_task( self, - document: Tuple[str, bytes], + document: Union[Tuple[str, bytes], str], **kwargs, ) -> ConversionResult: - document = (document[0], BytesIO(document[1])) + if isinstance(document, tuple): + document = (document[0], BytesIO(document[1])) return self.document_converter.convert(document, **kwargs) def convert_documents_task( self, - documents: List[Tuple[str, bytes]], + documents: List[Union[Tuple[str, bytes], str]], **kwargs, ) -> List[ConversionResult]: - documents = [(filename, BytesIO(file)) for filename, file in documents] - return self.document_converter.convert_batch(documents, **kwargs) + processed_docs = [] + for doc in documents: + if isinstance(doc, tuple): + processed_docs.append((doc[0], BytesIO(doc[1]))) + else: + processed_docs.append(doc) + return self.document_converter.convert_batch(processed_docs, **kwargs) def get_single_document_task_result(self, job_id: str) -> ConversationJobResult: """Get the status and result of a document conversion job. diff --git a/worker/tasks.py b/worker/tasks.py index d0d34ae..55fc805 100644 --- a/worker/tasks.py +++ b/worker/tasks.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Tuple, Union from document_converter.service import IMAGE_RESOLUTION_SCALE, DoclingDocumentConversion, DocumentConverterService from worker.celery_config import celery_app @@ -12,7 +12,7 @@ def ping(): @celery_app.task(bind=True, name="convert_document") def convert_document_task( self, - document: Tuple[str, bytes], + document: Union[Tuple[str, bytes], str], extract_tables: bool = False, image_resolution_scale: int = IMAGE_RESOLUTION_SCALE, ) -> Dict[str, Any]: @@ -26,7 +26,7 @@ def convert_document_task( @celery_app.task(bind=True, name="convert_documents") def convert_documents_task( self, - documents: List[Tuple[str, bytes]], + documents: List[Union[Tuple[str, bytes], str]], extract_tables: bool = False, image_resolution_scale: int = IMAGE_RESOLUTION_SCALE, ) -> List[Dict[str, Any]]: