34 changes: 29 additions & 5 deletions backend/workflow_manager/endpoint_v2/destination.py
@@ -131,17 +131,27 @@ def _get_source_endpoint_for_workflow(
return endpoint

def validate(self) -> None:
self.workflow_log.log_info(logger, "Starting destination connector validation")

connection_type = self.endpoint.connection_type
-        connector: ConnectorInstance = self.endpoint.connector_instance
+        connector: ConnectorInstance | None = self.endpoint.connector_instance

if connection_type is None:
error_msg = "Missing destination connection type"
self.workflow_log.log_error(logger, error_msg)
raise MissingDestinationConnectionType()
if connection_type not in WorkflowEndpoint.ConnectionType.values:
error_msg = f"Invalid destination connection type: {connection_type}"
self.workflow_log.log_error(logger, error_msg)
raise InvalidDestinationConnectionType()
-        if (
+        # Check if connector is required but missing
+        requires_connector = (
            connection_type != WorkflowEndpoint.ConnectionType.API
            and connection_type != WorkflowEndpoint.ConnectionType.MANUALREVIEW
-            and connector is None
-        ):
+        )
+        if requires_connector and connector is None:
error_msg = "Destination connector not configured"
self.workflow_log.log_error(logger, error_msg)
raise DestinationConnectorNotConfigured()

# Validate database connection if it's a database destination
@@ -153,12 +163,26 @@ def validate(self) -> None:
connector_settings=connector.connector_metadata,
)
engine = db_class.get_engine()
self.workflow_log.log_info(logger, "Database connection test successful")
if hasattr(engine, "close"):
engine.close()
except ConnectorError as e:
# Handle specific connector errors with detailed logging
error_msg = f"Database connector validation failed: {str(e)}"
self.workflow_log.log_error(logger, error_msg)
logger.error(error_msg)
raise
except Exception as e:
logger.error(f"Database connection failed: {str(e)}")
# Handle unexpected errors
error_msg = f"Unexpected error during database validation: {str(e)}"
self.workflow_log.log_error(logger, error_msg)
logger.error(error_msg)
raise

self.workflow_log.log_info(
logger, "Destination connector validation completed successfully"
)

def _should_handle_hitl(
self,
file_name: str,
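Note on the refactor above: extracting the boolean into `requires_connector` makes the intent explicit, since only API and MANUALREVIEW destinations may run without a connector instance. A minimal sketch of the resulting decision table, using stand-in enum values rather than the real `WorkflowEndpoint.ConnectionType`:

    from enum import Enum

    class ConnectionType(str, Enum):
        # Stand-ins for WorkflowEndpoint.ConnectionType values
        FILESYSTEM = "FILESYSTEM"
        DATABASE = "DATABASE"
        API = "API"
        MANUALREVIEW = "MANUALREVIEW"

    def requires_connector(connection_type: str) -> bool:
        # Mirrors the condition in validate(): API and MANUALREVIEW carry no connector
        return connection_type not in (ConnectionType.API, ConnectionType.MANUALREVIEW)

    for ct in ConnectionType:
        status = "connector required" if requires_connector(ct) else "no connector needed"
        print(ct.value, "->", status)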
16 changes: 16 additions & 0 deletions backend/workflow_manager/endpoint_v2/source.py
@@ -127,15 +127,31 @@ def _get_endpoint_for_workflow(
return endpoint

def validate(self) -> None:
self.workflow_log.log_info(logger, "Starting source connector validation")

connection_type = self.endpoint.connection_type
connector: ConnectorInstance = self.endpoint.connector_instance

if connection_type is None:
error_msg = "Missing source connection type"
self.workflow_log.log_error(logger, error_msg)
raise MissingSourceConnectionType()

if connection_type not in WorkflowEndpoint.ConnectionType.values:
error_msg = f"Invalid source connection type: {connection_type}"
self.workflow_log.log_error(logger, error_msg)
raise InvalidSourceConnectionType()

if connection_type != WorkflowEndpoint.ConnectionType.API and connector is None:
error_msg = "Source connector not configured"
self.workflow_log.log_error(logger, error_msg)
raise SourceConnectorNotConfigured()

# If we reach here, validation passed
self.workflow_log.log_info(
logger, "Source connector validation completed successfully"
)

def valid_file_patterns(self, required_patterns: list[Any]) -> list[str]:
patterns = {
FileType.PDF_DOCUMENTS: FilePattern.PDF_DOCUMENTS,
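The `connection_type not in WorkflowEndpoint.ConnectionType.values` guard relies on the `.values` property that Django `Choices` enums expose. A standalone sketch of the same pattern with a hypothetical subset of choices, assuming Django is installed:

    from django.db import models

    class ConnectionType(models.TextChoices):
        # Hypothetical subset of WorkflowEndpoint.ConnectionType
        FILESYSTEM = "FILESYSTEM", "Filesystem"
        API = "API", "API"

    connection_type = "DATABASE"
    if connection_type not in ConnectionType.values:  # ["FILESYSTEM", "API"]
        raise ValueError(f"Invalid source connection type: {connection_type}")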
10 changes: 3 additions & 7 deletions backend/workflow_manager/utils/workflow_log.py
@@ -4,12 +4,7 @@
from utils.local_context import StateStore

from unstract.core.pubsub_helper import LogPublisher
-from unstract.workflow_execution.enums import (
-    LogComponent,
-    LogLevel,
-    LogStage,
-    LogState,
-)
+from unstract.workflow_execution.enums import LogComponent, LogLevel, LogStage, LogState


class WorkflowLog:
@@ -22,7 +17,8 @@ def __init__(
pipeline_id: str | None = None,
):
log_events_id: str | None = StateStore.get(Common.LOG_EVENTS_ID)
-        self.messaging_channel = log_events_id if log_events_id else pipeline_id
+        # Ensure messaging_channel is never None - use execution_id as fallback
+        self.messaging_channel = log_events_id or pipeline_id or str(execution_id)
self.execution_id = str(execution_id)
self.file_execution_id = str(file_execution_id) if file_execution_id else None
self.organization_id = str(organization_id) if organization_id else None
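The new `or` chain differs from the old conditional expression in two ways: it adds `str(execution_id)` as a final fallback, and it also treats an empty string from `StateStore` as missing, since `or` tests truthiness rather than `is not None`. A quick illustration:

    from uuid import uuid4

    def resolve_channel(log_events_id, pipeline_id, execution_id):
        # Mirrors: log_events_id or pipeline_id or str(execution_id)
        return log_events_id or pipeline_id or str(execution_id)

    execution_id = uuid4()
    print(resolve_channel("events-123", None, execution_id))  # events-123
    print(resolve_channel(None, "pipe-1", execution_id))      # pipe-1
    print(resolve_channel("", None, execution_id))            # falls through to str(execution_id)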
70 changes: 63 additions & 7 deletions backend/workflow_manager/workflow_v2/execution_log_utils.py
@@ -63,6 +63,15 @@ def process_log_history_from_cache(

logger.info(f"Processing {logs_count} logs from queue '{queue_name}'")

# Count error logs for priority reporting
error_logs_count = sum(
1 for log in logs_to_process if log.data.get("level") == "ERROR"
)
if error_logs_count > 0:
logger.error(
f"Found {error_logs_count} ERROR level logs in batch - will be processed with priority"
)

# Preload required WorkflowExecution and WorkflowFileExecution objects
execution_ids = {log.execution_id for log in logs_to_process}
file_execution_ids = {
@@ -77,14 +86,40 @@
for obj in WorkflowFileExecution.objects.filter(id__in=file_execution_ids)
}

-    # Process logs with preloaded data
-    for log_data in logs_to_process:
+    # Process logs with preloaded data, prioritizing error logs
+    # Sort to process ERROR level logs first
+    sorted_logs = sorted(
+        logs_to_process, key=lambda log: 0 if log.data.get("level") == "ERROR" else 1
+    )

for log_data in sorted_logs:
# Display log content for all levels in workflow-logging service
log_level = log_data.data.get("level", "INFO")
log_message = log_data.data.get("log") or log_data.data.get(
"message", "No message"
)
log_stage = log_data.data.get("stage", "UNKNOWN_STAGE")

# Display log with appropriate level
if log_level == "ERROR":
logger.error(f"Processing ERROR log [{log_stage}]: {log_message}")
elif log_level == "WARNING":
logger.warning(f"Processing WARNING log [{log_stage}]: {log_message}")
elif log_level == "INFO":
logger.info(f"Processing INFO log [{log_stage}]: {log_message}")
elif log_level == "DEBUG":
logger.debug(f"Processing DEBUG log [{log_stage}]: {log_message}")
else:
logger.info(f"Processing {log_level} log [{log_stage}]: {log_message}")

execution = execution_map.get(log_data.execution_id)
if not execution:
-            logger.warning(
-                f"Execution not found for execution_id: {log_data.execution_id}, "
-                "skipping log push"
-            )
+            error_msg = f"Execution not found for execution_id: {log_data.execution_id}, skipping log push"
+            # Use error level if the original log was an error
+            if log_data.data.get("level") == "ERROR":
+                logger.error(f"ERROR LOG SKIPPED: {error_msg}")
+            else:
+                logger.warning(error_msg)
skipped_count += 1
continue

@@ -111,7 +146,28 @@
# Bulk insert logs for each organization
processed_count = 0
for organization_id, logs in organization_logs.items():
logger.info(f"Storing {len(logs)} logs for org: {organization_id}")
# Count logs by level for better visibility
log_counts: dict[str, int] = {}
for log in logs:
level = log.data.get("level", "INFO")
log_counts[level] = log_counts.get(level, 0) + 1

# Display log count summary
if "ERROR" in log_counts:
count_summary = ", ".join(
[f"{count} {level}" for level, count in log_counts.items()]
)
logger.error(
f"Storing logs for org {organization_id}: {count_summary} (total: {len(logs)})"
)
else:
count_summary = ", ".join(
[f"{count} {level}" for level, count in log_counts.items()]
)
logger.info(
f"Storing logs for org {organization_id}: {count_summary} (total: {len(logs)})"
)

ExecutionLog.objects.bulk_create(objs=logs, ignore_conflicts=True)
processed_count += len(logs)

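The ERROR-first ordering works because Python's `sorted` is stable: keying ERROR entries to 0 moves them ahead of everything else while preserving arrival order within each level. A self-contained sketch with plain dicts standing in for the cached queue entries:

    logs = [
        {"level": "INFO", "msg": "started"},
        {"level": "ERROR", "msg": "db timeout"},
        {"level": "DEBUG", "msg": "payload parsed"},
        {"level": "ERROR", "msg": "retry failed"},
    ]

    # ERROR entries first; stability keeps each group's original order
    sorted_logs = sorted(logs, key=lambda log: 0 if log["level"] == "ERROR" else 1)
    print([log["msg"] for log in sorted_logs])
    # ['db timeout', 'retry failed', 'started', 'payload parsed']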
9 changes: 8 additions & 1 deletion unstract/core/src/unstract/core/pubsub_helper.py
@@ -152,7 +152,14 @@ def publish(cls, channel_id: str, payload: dict[str, Any]) -> bool:
compression=None,
retry=True,
)
logging.debug(f"Published '{channel_id}' <= {payload}")

# Enhanced logging for error messages
if payload.get("level") == "ERROR":
logging.error(
f"ERROR log published to '{channel_id}': {payload.get('log', payload.get('message', ''))}"
)
else:
logging.debug(f"Published '{channel_id}' <= {payload}")

# Persisting messages for unified notification
if payload.get("type") == "LOG":
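The message text is resolved with nested `dict.get` calls, preferring the `log` key and falling back to `message`, then to an empty string. A small sketch of that branch in isolation, with hypothetical payloads:

    import logging

    def publish_log(channel_id: str, payload: dict) -> None:
        # Prefer "log", fall back to "message", then ""
        text = payload.get("log", payload.get("message", ""))
        if payload.get("level") == "ERROR":
            logging.error(f"ERROR log published to '{channel_id}': {text}")
        else:
            logging.debug(f"Published '{channel_id}' <= {payload}")

    publish_log("channel-1", {"level": "ERROR", "message": "connector failed"})
    publish_log("channel-1", {"level": "INFO", "log": "file processed"})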
95 changes: 72 additions & 23 deletions workers/shared/workflow/source_connector.py
@@ -110,27 +110,50 @@ def get_fsspec_fs(self):

This method replicates backend logic for getting filesystem access.
"""
-        if self.connection_type == self.ConnectionType.API_STORAGE:
-            # API storage uses workflow execution storage
-            from unstract.filesystem import FileStorageType, FileSystem
+        try:
+            if self.workflow_log:
+                self.workflow_log.log_info(
+                    logger, "Initializing source connector filesystem"
+                )
+
+            if self.connection_type == self.ConnectionType.API_STORAGE:
+                # API storage uses workflow execution storage
+                from unstract.filesystem import FileStorageType, FileSystem

-            file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
-            return file_system.get_file_storage()
+                file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
+                return file_system.get_file_storage()

-        if not self.connector_id or not self.connector_settings:
-            raise Exception("Source connector not configured")
+            if not self.connector_id or not self.connector_settings:
+                error_msg = (
+                    "Source connector not configured - missing connector_id or settings"
+                )
+                if self.workflow_log:
+                    self.workflow_log.log_error(logger, error_msg)
+                raise Exception(error_msg)

-        # Get the connector instance using connectorkit
-        from unstract.connectors.connectorkit import Connectorkit
+            # Get the connector instance using connectorkit
+            from unstract.connectors.connectorkit import Connectorkit

-        connectorkit = Connectorkit()
-        connector_class = connectorkit.get_connector_class_by_connector_id(
-            self.connector_id
-        )
-        connector_instance = connector_class(self.connector_settings)
+            connectorkit = Connectorkit()
+            connector_class = connectorkit.get_connector_class_by_connector_id(
+                self.connector_id
+            )
+            connector_instance = connector_class(self.connector_settings)

+            # Get fsspec filesystem
+            fs = connector_instance.get_fsspec_fs()
+            if self.workflow_log:
+                self.workflow_log.log_info(
+                    logger, "Source connector filesystem initialized successfully"
+                )
+            return fs

-        # Get fsspec filesystem
-        return connector_instance.get_fsspec_fs()
+        except Exception as e:
+            error_msg = f"Failed to initialize source connector filesystem: {str(e)}"
+            if self.workflow_log:
+                self.workflow_log.log_error(logger, error_msg)
+            logger.error(error_msg)
+            raise

def read_file_content(self, file_path: str) -> bytes:
"""Read file content from source connector.
@@ -163,11 +186,16 @@ def list_files(
Returns:
List of file information dictionaries
"""
-        fs = self.get_fsspec_fs()
-
-        # Implementation would list files using fsspec
-        # This is a simplified version
+        try:
+            if self.workflow_log:
+                self.workflow_log.log_info(
+                    logger, f"Listing files from source directory: {input_directory}"
+                )
+
+            fs = self.get_fsspec_fs()
+
+            # Implementation would list files using fsspec
+            # This is a simplified version
files = []
if self.connection_type == self.ConnectionType.API_STORAGE:
# Use filesystem listing
@@ -184,25 +212,46 @@
}
)

if self.workflow_log:
self.workflow_log.log_info(
logger, f"Found {len(files)} files in source directory"
)
return files
except Exception as e:
logger.error(f"Failed to list files from source: {e}")
error_msg = f"Failed to list files from source connector directory '{input_directory}': {str(e)}"
if self.workflow_log:
self.workflow_log.log_error(logger, error_msg)
logger.error(error_msg)
return []

def validate(self) -> None:
"""Validate source connector configuration."""
if self.workflow_log:
self.workflow_log.log_info(logger, "Starting source connector validation")

connection_type = self.connection_type

if connection_type not in [
self.ConnectionType.FILESYSTEM,
self.ConnectionType.API,
self.ConnectionType.API_STORAGE,
]:
raise Exception(f"Invalid source connection type: {connection_type}")
error_msg = f"Invalid source connection type: {connection_type}"
if self.workflow_log:
self.workflow_log.log_error(logger, error_msg)
raise Exception(error_msg)

if connection_type == self.ConnectionType.FILESYSTEM:
if not self.connector_id or not self.connector_settings:
raise Exception("Filesystem source requires connector configuration")
error_msg = "Filesystem source requires connector configuration"
if self.workflow_log:
self.workflow_log.log_error(logger, error_msg)
raise Exception(error_msg)

if self.workflow_log:
self.workflow_log.log_info(
logger, "Source connector validation completed successfully"
)

def get_config(self) -> SourceConfig:
"""Get serializable configuration for the source connector."""
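Because workers may construct this connector without a `WorkflowLog`, every log call above is wrapped in an `if self.workflow_log:` guard. One way to avoid repeating that guard (an optional sketch, not part of this diff) is a no-op stand-in that satisfies the same `log_info`/`log_error` interface:

    import logging

    logger = logging.getLogger(__name__)

    class NullWorkflowLog:
        """No-op WorkflowLog stand-in; call sites can drop the guard."""

        def log_info(self, log: logging.Logger, message: str) -> None:
            pass

        def log_error(self, log: logging.Logger, message: str) -> None:
            pass

    # Hypothetical usage at construction time:
    workflow_log = None  # e.g. nothing was passed in
    workflow_log = workflow_log or NullWorkflowLog()
    workflow_log.log_info(logger, "Starting source connector validation")  # safe no-op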