Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,24 @@ def _get_source_endpoint_for_workflow(

def validate(self) -> None:
connection_type = self.endpoint.connection_type
connector: ConnectorInstance = self.endpoint.connector_instance
connector: ConnectorInstance | None = self.endpoint.connector_instance

if connection_type is None:
error_msg = "Missing destination connection type"
self.workflow_log.log_error(logger, error_msg)
raise MissingDestinationConnectionType()
if connection_type not in WorkflowEndpoint.ConnectionType.values:
error_msg = f"Invalid destination connection type: {connection_type}"
self.workflow_log.log_error(logger, error_msg)
raise InvalidDestinationConnectionType()
if (
# Check if connector is required but missing
requires_connector = (
connection_type != WorkflowEndpoint.ConnectionType.API
and connection_type != WorkflowEndpoint.ConnectionType.MANUALREVIEW
and connector is None
):
)
if requires_connector and connector is None:
error_msg = "Destination connector not configured"
self.workflow_log.log_error(logger, error_msg)
raise DestinationConnectorNotConfigured()

# Validate database connection if it's a database destination
Expand All @@ -147,10 +155,18 @@ def validate(self) -> None:
connector_settings=connector.connector_metadata,
)
engine = db_class.get_engine()
self.workflow_log.log_info(logger, "Database connection test successful")
if hasattr(engine, "close"):
engine.close()
except ConnectorError as e:
error_msg = f"Database connector validation failed: {e}"
self.workflow_log.log_error(logger, error_msg)
logger.exception(error_msg)
raise
except Exception as e:
logger.error(f"Database connection failed: {str(e)}")
error_msg = f"Unexpected error during database validation: {e}"
self.workflow_log.log_error(logger, error_msg)
logger.exception(error_msg)
raise

def _should_handle_hitl(
Expand Down
9 changes: 9 additions & 0 deletions backend/workflow_manager/endpoint_v2/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,20 @@ def _get_endpoint_for_workflow(
def validate(self) -> None:
connection_type = self.endpoint.connection_type
connector: ConnectorInstance = self.endpoint.connector_instance

if connection_type is None:
error_msg = "Missing source connection type"
self.workflow_log.log_error(logger, error_msg)
raise MissingSourceConnectionType()

if connection_type not in WorkflowEndpoint.ConnectionType.values:
error_msg = f"Invalid source connection type: {connection_type}"
self.workflow_log.log_error(logger, error_msg)
raise InvalidSourceConnectionType()

if connection_type != WorkflowEndpoint.ConnectionType.API and connector is None:
error_msg = "Source connector not configured"
self.workflow_log.log_error(logger, error_msg)
raise SourceConnectorNotConfigured()

def valid_file_patterns(self, required_patterns: list[Any]) -> list[str]:
Expand Down
10 changes: 3 additions & 7 deletions backend/workflow_manager/utils/workflow_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@
from utils.local_context import StateStore

from unstract.core.pubsub_helper import LogPublisher
from unstract.workflow_execution.enums import (
LogComponent,
LogLevel,
LogStage,
LogState,
)
from unstract.workflow_execution.enums import LogComponent, LogLevel, LogStage, LogState


class WorkflowLog:
Expand All @@ -22,7 +17,8 @@ def __init__(
pipeline_id: str | None = None,
):
log_events_id: str | None = StateStore.get(Common.LOG_EVENTS_ID)
self.messaging_channel = log_events_id if log_events_id else pipeline_id
# Ensure messaging_channel is never None - use execution_id as fallback
self.messaging_channel = log_events_id or pipeline_id or str(execution_id)
self.execution_id = str(execution_id)
self.file_execution_id = str(file_execution_id) if file_execution_id else None
self.organization_id = str(organization_id) if organization_id else None
Expand Down
26 changes: 14 additions & 12 deletions workers/shared/workflow/destination_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional

from shared.enums import QueueResultStatus
from shared.enums import DestinationConfigKey, QueueResultStatus

# Import database utils (stable path)
from shared.infrastructure.database.utils import WorkerDatabaseUtils
from shared.infrastructure.logging import WorkerLogger
from shared.infrastructure.logging.helpers import log_file_error, log_file_info
from shared.models.result_models import QueueResult
from shared.utils.api_result_cache import get_api_cache_manager
from shared.utils.manual_review_factory import (
get_manual_review_service,
has_manual_review_plugin,
)
from shared.workflow.connectors.service import WorkerConnectorService
from shared.workflow.logger_helper import WorkflowLoggerHelper

from unstract.connectors.connectorkit import Connectorkit
from unstract.connectors.exceptions import ConnectorError
Expand All @@ -53,12 +58,6 @@
ExecutionFileHandler,
)

from ..enums import DestinationConfigKey
from ..infrastructure.logging import WorkerLogger
from ..infrastructure.logging.helpers import log_file_error, log_file_info
from ..utils.api_result_cache import get_api_cache_manager
from .connectors.service import WorkerConnectorService

if TYPE_CHECKING:
from ..api_client import InternalAPIClient

Expand Down Expand Up @@ -198,6 +197,9 @@ def __init__(self, config: DestinationConfig, workflow_log=None):
self.settings = config.settings
self.workflow_log = workflow_log

# Initialize logger helper for safe logging operations
self.logger_helper = WorkflowLoggerHelper(workflow_log)

# Store destination connector instance details
self.connector_id = config.connector_id
self.connector_settings = config.connector_settings
Expand Down Expand Up @@ -927,7 +929,7 @@ def insert_into_db(
logger.info(f"Successfully inserted data into database table {table_name}")

# Log to UI with file_execution_id for better correlation
if self.workflow_log and hasattr(self, "current_file_execution_id"):
if hasattr(self, "current_file_execution_id"):
log_file_info(
self.workflow_log,
self.current_file_execution_id,
Expand Down Expand Up @@ -1175,7 +1177,7 @@ def copy_output_to_output_directory(
logger.error(error_message)

# Log to UI with file_execution_id
if self.workflow_log and hasattr(self, "current_file_execution_id"):
if hasattr(self, "current_file_execution_id"):
log_file_info(
self.workflow_log,
self.current_file_execution_id,
Expand All @@ -1189,7 +1191,7 @@ def copy_output_to_output_directory(
logger.info(success_message)

# Log to UI
if self.workflow_log and hasattr(self, "current_file_execution_id"):
if hasattr(self, "current_file_execution_id"):
log_file_info(
self.workflow_log,
self.current_file_execution_id,
Expand All @@ -1202,7 +1204,7 @@ def copy_output_to_output_directory(
logger.info(success_message)

# Log to UI
if self.workflow_log and hasattr(self, "current_file_execution_id"):
if hasattr(self, "current_file_execution_id"):
log_file_info(
self.workflow_log,
self.current_file_execution_id,
Expand All @@ -1214,7 +1216,7 @@ def copy_output_to_output_directory(
logger.error(error_msg, exc_info=True)

# Log error to UI
if self.workflow_log and hasattr(self, "current_file_execution_id"):
if hasattr(self, "current_file_execution_id"):
log_file_info(
self.workflow_log,
self.current_file_execution_id,
Expand Down
48 changes: 48 additions & 0 deletions workers/shared/workflow/logger_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Workflow logger helper for safe logging operations.
This module provides a helper class to handle workflow logging operations
safely, eliminating the need for repetitive conditional checks throughout
the codebase.
"""

import logging

from shared.infrastructure.logging.workflow_logger import WorkerWorkflowLogger


class WorkflowLoggerHelper:
"""Helper class for safe workflow logging operations.
This class encapsulates the logic for safely logging messages when a
workflow_log instance is available, eliminating repetitive conditional
checks throughout the connector classes.
"""

def __init__(self, workflow_log: WorkerWorkflowLogger | None = None) -> None:
"""Initialize the logger helper.
Args:
workflow_log: Optional workflow log instance that provides
log_info and log_error methods.
"""
self.workflow_log = workflow_log

def log_info(self, logger: logging.Logger, message: str) -> None:
"""Safely log info message if workflow_log is available.
Args:
logger: The standard Python logger instance
message: The message to log
"""
if self.workflow_log:
self.workflow_log.log_info(logger, message)

def log_error(self, logger: logging.Logger, message: str) -> None:
"""Safely log error message if workflow_log is available.
Args:
logger: The standard Python logger instance
message: The error message to log
"""
if self.workflow_log:
self.workflow_log.log_error(logger, message)
63 changes: 41 additions & 22 deletions workers/shared/workflow/source_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
from dataclasses import dataclass
from typing import Any

from unstract.core.data_models import ConnectionType as CoreConnectionType
from shared.infrastructure.logging.logger import WorkerLogger
from shared.workflow.logger_helper import WorkflowLoggerHelper

from ..infrastructure.logging.logger import WorkerLogger
from unstract.core.data_models import ConnectionType as CoreConnectionType

logger = WorkerLogger.get_logger(__name__)

Expand Down Expand Up @@ -94,6 +95,7 @@
self.connection_type = config.connection_type
self.settings = config.settings
self.workflow_log = workflow_log
self.logger_helper = WorkflowLoggerHelper(workflow_log)

# Store connector instance details
self.connector_id = config.connector_id
Expand All @@ -110,27 +112,39 @@
This method replicates backend logic for getting filesystem access.
"""
if self.connection_type == self.ConnectionType.API_STORAGE:
# API storage uses workflow execution storage
from unstract.filesystem import FileStorageType, FileSystem
try:
if self.connection_type == self.ConnectionType.API_STORAGE:
# API storage uses workflow execution storage
from unstract.filesystem import FileStorageType, FileSystem

file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
return file_system.get_file_storage()

file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
return file_system.get_file_storage()
if not self.connector_id or not self.connector_settings:
error_msg = (
"Source connector not configured - missing connector_id or settings"
)
self.logger_helper.log_error(logger, error_msg)
raise Exception(error_msg)

Check warning on line 128 in workers/shared/workflow/source_connector.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this generic exception class with a more specific one.

See more on https://sonarcloud.io/project/issues?id=Zipstack_unstract&issues=AZpo0mZIIaUyjjGwWlCv&open=AZpo0mZIIaUyjjGwWlCv&pullRequest=1650

if not self.connector_id or not self.connector_settings:
raise Exception("Source connector not configured")
# Get the connector instance using connectorkit
from unstract.connectors.connectorkit import Connectorkit

# Get the connector instance using connectorkit
from unstract.connectors.connectorkit import Connectorkit
connectorkit = Connectorkit()
connector_class = connectorkit.get_connector_class_by_connector_id(
self.connector_id
)
connector_instance = connector_class(self.connector_settings)

connectorkit = Connectorkit()
connector_class = connectorkit.get_connector_class_by_connector_id(
self.connector_id
)
connector_instance = connector_class(self.connector_settings)
# Get fsspec filesystem
fs = connector_instance.get_fsspec_fs()
return fs

# Get fsspec filesystem
return connector_instance.get_fsspec_fs()
except Exception as e:
error_msg = f"Failed to initialize source connector filesystem: {str(e)}"
self.logger_helper.log_error(logger, error_msg)
logger.error(error_msg)
raise

def read_file_content(self, file_path: str) -> bytes:
"""Read file content from source connector.
Expand Down Expand Up @@ -164,7 +178,6 @@
List of file information dictionaries
"""
fs = self.get_fsspec_fs()

# Implementation would list files using fsspec
# This is a simplified version
try:
Expand All @@ -186,7 +199,9 @@

return files
except Exception as e:
logger.error(f"Failed to list files from source: {e}")
error_msg = f"Failed to list files from source connector directory '{input_directory}': {str(e)}"
self.logger_helper.log_error(logger, error_msg)
logger.error(error_msg)
return []

def validate(self) -> None:
Expand All @@ -198,11 +213,15 @@
self.ConnectionType.API,
self.ConnectionType.API_STORAGE,
]:
raise Exception(f"Invalid source connection type: {connection_type}")
error_msg = f"Invalid source connection type: {connection_type}"
self.logger_helper.log_error(logger, error_msg)
raise Exception(error_msg)

Check warning on line 218 in workers/shared/workflow/source_connector.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this generic exception class with a more specific one.

See more on https://sonarcloud.io/project/issues?id=Zipstack_unstract&issues=AZpo0mZIIaUyjjGwWlCw&open=AZpo0mZIIaUyjjGwWlCw&pullRequest=1650

if connection_type == self.ConnectionType.FILESYSTEM:
if not self.connector_id or not self.connector_settings:
raise Exception("Filesystem source requires connector configuration")
error_msg = "Filesystem source requires connector configuration"
self.logger_helper.log_error(logger, error_msg)
raise Exception(error_msg)

Check warning on line 224 in workers/shared/workflow/source_connector.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this generic exception class with a more specific one.

See more on https://sonarcloud.io/project/issues?id=Zipstack_unstract&issues=AZpo0mZIIaUyjjGwWlCx&open=AZpo0mZIIaUyjjGwWlCx&pullRequest=1650

def get_config(self) -> SourceConfig:
"""Get serializable configuration for the source connector."""
Expand Down