
[FEATURE]: Async Operations: Concurrent data fetching with semaphore control #294

@itsomg134

Description


Motivation

"""
Comprehensive Python Application: Data Analysis and Web Scraping Tool
This module demonstrates various programming concepts including OOP, decorators,
context managers, async operations, and data processing.
"""

import asyncio
import json
import logging
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple, Any
from functools import wraps, lru_cache
from collections import defaultdict, Counter
from enum import Enum
import hashlib
import time

# Configure logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ============================================================================
# ENUMS AND CONSTANTS
# ============================================================================

class DataFormat(Enum):
    """Supported data formats"""
    JSON = "json"
    CSV = "csv"
    XML = "xml"
    PARQUET = "parquet"

class ProcessingStatus(Enum):
    """Data processing status"""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

# Constants

MAX_RETRIES = 3
TIMEOUT_SECONDS = 30
CACHE_SIZE = 128
DEFAULT_BATCH_SIZE = 100

# ============================================================================
# DECORATORS
# ============================================================================

def timer(func):
    """Decorator to measure function execution time"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        logger.info(f"{func.__name__} executed in {end - start:.4f} seconds")
        return result
    return wrapper

def retry(max_attempts=MAX_RETRIES, delay=1):
    """Decorator to retry function on failure"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        logger.error(f"Failed after {max_attempts} attempts: {e}")
                        raise
                    logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying...")
                    time.sleep(delay)
            return None
        return wrapper
    return decorator
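
For illustration, a minimal usage sketch of the retry factory; flaky_request is a hypothetical function invented here purely to show where a retry would trigger:

@retry(max_attempts=2, delay=0.5)
def flaky_request():
    # Hypothetical example function (not part of the module): it fails
    # randomly, so roughly half the calls exercise the retry path.
    import random
    if random.random() < 0.5:
        raise RuntimeError("transient error")
    return "ok"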

def validate_input(validator_func):
    """Decorator to validate function inputs"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not validator_func(*args, **kwargs):
                raise ValueError(f"Invalid input for {func.__name__}")
            return func(*args, **kwargs)
        return wrapper
    return decorator

# ============================================================================
# DATA CLASSES
# ============================================================================

@dataclass
class DataRecord:
    """Represents a single data record"""
    id: str
    timestamp: datetime
    data: Dict[str, Any]
    status: ProcessingStatus = ProcessingStatus.PENDING
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Validate and process data after initialization"""
        if not self.id:
            self.id = self._generate_id()
        if not isinstance(self.timestamp, datetime):
            self.timestamp = datetime.now()

    def _generate_id(self) -> str:
        """Generate unique ID based on data content"""
        content = json.dumps(self.data, sort_keys=True)
        return hashlib.md5(content.encode()).hexdigest()

    def to_dict(self) -> Dict[str, Any]:
        """Convert record to dictionary"""
        return {
            'id': self.id,
            'timestamp': self.timestamp.isoformat(),
            'data': self.data,
            'status': self.status.value,
            'metadata': self.metadata
        }

@dataclass
class ProcessingResult:
    """Results from data processing operation"""
    total_records: int
    successful: int
    failed: int
    duration: float
    errors: List[str] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        """Calculate success rate percentage"""
        return (self.successful / self.total_records * 100) if self.total_records > 0 else 0.0

# ============================================================================
# ABSTRACT BASE CLASSES
# ============================================================================

class DataProcessor(ABC):
    """Abstract base class for data processors"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.processed_count = 0
        self._cache = {}

    @abstractmethod
    def process(self, data: Any) -> Any:
        """Process data - must be implemented by subclasses"""
        pass

    @abstractmethod
    def validate(self, data: Any) -> bool:
        """Validate data - must be implemented by subclasses"""
        pass

    def reset(self):
        """Reset processor state"""
        self.processed_count = 0
        self._cache.clear()

class DataStorage(ABC):
    """Abstract base class for data storage"""

    @abstractmethod
    def save(self, data: Any, key: str) -> bool:
        """Save data to storage"""
        pass

    @abstractmethod
    def load(self, key: str) -> Any:
        """Load data from storage"""
        pass

    @abstractmethod
    def delete(self, key: str) -> bool:
        """Delete data from storage"""
        pass

# ============================================================================
# CONCRETE IMPLEMENTATIONS
# ============================================================================

class TextProcessor(DataProcessor):
    """Process text data with various transformations"""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.stop_words = set(config.get('stop_words', []))
        self.min_length = config.get('min_length', 3)

    def validate(self, data: Any) -> bool:
        """Validate text data"""
        return isinstance(data, str) and len(data) > 0

    @timer
    def process(self, data: str) -> Dict[str, Any]:
        """Process text with cleaning and analysis"""
        if not self.validate(data):
            raise ValueError("Invalid text data")

        # Clean text
        cleaned = self._clean_text(data)

        # Tokenize
        tokens = self._tokenize(cleaned)

        # Filter tokens
        filtered_tokens = self._filter_tokens(tokens)

        # Analyze (pass a tuple so the lru_cache-decorated helper receives a hashable argument)
        analysis = self._analyze(tuple(filtered_tokens))

        self.processed_count += 1

        return {
            'original': data,
            'cleaned': cleaned,
            'tokens': filtered_tokens,
            'analysis': analysis
        }

    def _clean_text(self, text: str) -> str:
        """Clean text by removing special characters and extra whitespace"""
        text = re.sub(r'[^\w\s]', ' ', text)
        text = re.sub(r'\s+', ' ', text)
        return text.strip().lower()

    def _tokenize(self, text: str) -> List[str]:
        """Split text into tokens"""
        return text.split()

    def _filter_tokens(self, tokens: List[str]) -> List[str]:
        """Filter tokens based on length and stop words"""
        return [
            token for token in tokens
            if len(token) >= self.min_length and token not in self.stop_words
        ]

    @lru_cache(maxsize=CACHE_SIZE)
    def _analyze(self, tokens: Tuple[str, ...]) -> Dict[str, Any]:
        """Analyze tokens to generate statistics"""
        counter = Counter(tokens)
        return {
            'total_tokens': len(tokens),
            'unique_tokens': len(counter),
            'most_common': counter.most_common(10),
            'avg_token_length': sum(len(t) for t in tokens) / len(tokens) if tokens else 0
        }

class NumericProcessor(DataProcessor):
    """Process numeric data with statistical analysis"""

    def validate(self, data: Any) -> bool:
        """Validate numeric data"""
        return isinstance(data, (list, tuple)) and all(isinstance(x, (int, float)) for x in data)

    @timer
    def process(self, data: List[float]) -> Dict[str, Any]:
        """Process numeric data and compute statistics"""
        if not self.validate(data):
            raise ValueError("Invalid numeric data")

        sorted_data = sorted(data)
        n = len(sorted_data)

        stats = {
            'count': n,
            'sum': sum(data),
            'mean': sum(data) / n,
            'min': min(data),
            'max': max(data),
            'range': max(data) - min(data),
            'median': self._calculate_median(sorted_data),
            'std_dev': self._calculate_std_dev(data),
            'variance': self._calculate_variance(data)
        }

        self.processed_count += 1
        return stats

    def _calculate_median(self, sorted_data: List[float]) -> float:
        """Calculate median of sorted data"""
        n = len(sorted_data)
        if n % 2 == 0:
            return (sorted_data[n//2 - 1] + sorted_data[n//2]) / 2
        return sorted_data[n//2]

    def _calculate_variance(self, data: List[float]) -> float:
        """Calculate variance"""
        mean = sum(data) / len(data)
        return sum((x - mean) ** 2 for x in data) / len(data)

    def _calculate_std_dev(self, data: List[float]) -> float:
        """Calculate standard deviation"""
        return self._calculate_variance(data) ** 0.5

class InMemoryStorage(DataStorage):
    """Simple in-memory storage implementation"""

    def __init__(self):
        self._storage: Dict[str, Any] = {}
        self._access_count: Dict[str, int] = defaultdict(int)

    def save(self, data: Any, key: str) -> bool:
        """Save data to memory"""
        try:
            self._storage[key] = data
            logger.info(f"Saved data with key: {key}")
            return True
        except Exception as e:
            logger.error(f"Failed to save data: {e}")
            return False

    def load(self, key: str) -> Any:
        """Load data from memory"""
        self._access_count[key] += 1
        data = self._storage.get(key)
        if data is None:
            logger.warning(f"No data found for key: {key}")
        return data

    def delete(self, key: str) -> bool:
        """Delete data from memory"""
        if key in self._storage:
            del self._storage[key]
            logger.info(f"Deleted data with key: {key}")
            return True
        return False

    def get_stats(self) -> Dict[str, Any]:
        """Get storage statistics"""
        return {
            'total_keys': len(self._storage),
            'most_accessed': sorted(
                self._access_count.items(),
                key=lambda x: x[1],
                reverse=True
            )[:5]
        }

# ============================================================================
# CONTEXT MANAGERS
# ============================================================================

class DataPipeline:
    """Context manager for data processing pipeline"""

    def __init__(self, name: str, processors: List[DataProcessor]):
        self.name = name
        self.processors = processors
        self.start_time = None
        self.records_processed = 0

    def __enter__(self):
        """Setup pipeline"""
        self.start_time = time.time()
        logger.info(f"Starting pipeline: {self.name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup and report results"""
        duration = time.time() - self.start_time
        logger.info(f"Pipeline {self.name} completed in {duration:.2f}s")
        logger.info(f"Processed {self.records_processed} records")

        if exc_type is not None:
            logger.error(f"Pipeline failed with error: {exc_val}")
            return False
        return True

    def process_batch(self, records: List[DataRecord]) -> ProcessingResult:
        """Process a batch of records through all processors"""
        successful = 0
        failed = 0
        errors = []

        for record in records:
            try:
                for processor in self.processors:
                    record.data = processor.process(record.data)
                record.status = ProcessingStatus.COMPLETED
                successful += 1
            except Exception as e:
                record.status = ProcessingStatus.FAILED
                failed += 1
                errors.append(str(e))
                logger.error(f"Failed to process record {record.id}: {e}")

        self.records_processed += len(records)

        return ProcessingResult(
            total_records=len(records),
            successful=successful,
            failed=failed,
            duration=time.time() - self.start_time,
            errors=errors
        )

# ============================================================================
# ASYNC OPERATIONS
# ============================================================================

class AsyncDataFetcher:
    """Async data fetcher with concurrency control"""

    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_data(self, url: str) -> Dict[str, Any]:
        """Simulate async data fetching"""
        async with self.semaphore:
            logger.info(f"Fetching data from: {url}")
            await asyncio.sleep(0.1)  # Simulate network delay
            return {
                'url': url,
                'data': f"Data from {url}",
                'timestamp': datetime.now().isoformat()
            }

    async def fetch_multiple(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Fetch data from multiple URLs concurrently"""
        tasks = [self.fetch_data(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        successful = [r for r in results if not isinstance(r, Exception)]
        failed = [r for r in results if isinstance(r, Exception)]

        logger.info(f"Fetched {len(successful)} items, {len(failed)} failed")
        return successful
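
As a rough sketch of how the semaphore-gated fetcher might be driven (the URLs below are placeholders; with max_concurrent=2 at most two fetch_data coroutines hold the semaphore at a time):

async def demo_fetch():
    # Sketch only: exercises fetch_multiple with placeholder URLs.
    fetcher = AsyncDataFetcher(max_concurrent=2)
    results = await fetcher.fetch_multiple([
        "https://example.com/a",
        "https://example.com/b",
        "https://example.com/c",
    ])
    for item in results:
        print(item['url'], item['timestamp'])

# asyncio.run(demo_fetch())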

# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================

def chunk_list(lst: List[Any], chunk_size: int) -> List[List[Any]]:
    """Split list into chunks of specified size"""
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]

def merge_dicts(*dicts: Dict) -> Dict:
    """Merge multiple dictionaries"""
    result = {}
    for d in dicts:
        result.update(d)
    return result

def format_duration(seconds: float) -> str:
    """Format duration in seconds to human-readable string"""
    if seconds < 60:
        return f"{seconds:.2f}s"
    elif seconds < 3600:
        minutes = seconds / 60
        return f"{minutes:.2f}m"
    else:
        hours = seconds / 3600
        return f"{hours:.2f}h"

def calculate_percentile(data: List[float], percentile: float) -> float:
    """Calculate percentile value from data"""
    if not 0 <= percentile <= 100:
        raise ValueError("Percentile must be between 0 and 100")

    sorted_data = sorted(data)
    index = (len(sorted_data) - 1) * percentile / 100
    floor_index = int(index)

    if floor_index == len(sorted_data) - 1:
        return sorted_data[floor_index]

    fraction = index - floor_index
    return sorted_data[floor_index] * (1 - fraction) + sorted_data[floor_index + 1] * fraction
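
A quick worked check of the interpolation, using the function above:

# calculate_percentile([1.0, 2.0, 3.0, 4.0], 50)
# index = (4 - 1) * 50 / 100 = 1.5 -> 2.0 * 0.5 + 3.0 * 0.5 = 2.5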

# ============================================================================
# MAIN APPLICATION
# ============================================================================

class DataAnalysisApp:
    """Main application for data analysis"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.storage = InMemoryStorage()
        self.processors = self._initialize_processors()
        self.fetcher = AsyncDataFetcher(max_concurrent=config.get('max_concurrent', 5))

    def _initialize_processors(self) -> List[DataProcessor]:
        """Initialize data processors based on configuration"""
        processors = []

        if self.config.get('enable_text_processing'):
            text_config = self.config.get('text_processor_config', {})
            processors.append(TextProcessor(text_config))

        if self.config.get('enable_numeric_processing'):
            numeric_config = self.config.get('numeric_processor_config', {})
            processors.append(NumericProcessor(numeric_config))

        return processors

    @timer
    def process_data(self, records: List[DataRecord], batch_size: int = DEFAULT_BATCH_SIZE):
        """Process data records in batches"""
        batches = chunk_list(records, batch_size)
        total_results = []

        with DataPipeline("main_pipeline", self.processors) as pipeline:
            for i, batch in enumerate(batches):
                logger.info(f"Processing batch {i + 1}/{len(batches)}")
                result = pipeline.process_batch(batch)
                total_results.append(result)

                # Store processed records
                for record in batch:
                    self.storage.save(record.to_dict(), record.id)

        return self._aggregate_results(total_results)

    def _aggregate_results(self, results: List[ProcessingResult]) -> ProcessingResult:
        """Aggregate results from multiple batches"""
        total = sum(r.total_records for r in results)
        successful = sum(r.successful for r in results)
        failed = sum(r.failed for r in results)
        duration = sum(r.duration for r in results)
        errors = [e for r in results for e in r.errors]

        return ProcessingResult(
            total_records=total,
            successful=successful,
            failed=failed,
            duration=duration,
            errors=errors
        )

    async def fetch_and_process(self, urls: List[str]):
        """Fetch data from URLs and process it"""
        fetched_data = await self.fetcher.fetch_multiple(urls)

        records = [
            DataRecord(
                id="",
                timestamp=datetime.now(),
                data=item
            )
            for item in fetched_data
        ]

        return self.process_data(records)

    def generate_report(self) -> Dict[str, Any]:
        """Generate analysis report"""
        storage_stats = self.storage.get_stats()

        return {
            'timestamp': datetime.now().isoformat(),
            'storage_stats': storage_stats,
            'processor_stats': {
                f"processor_{i}": {
                    'type': type(p).__name__,
                    'processed_count': p.processed_count
                }
                for i, p in enumerate(self.processors)
            }
        }

# ============================================================================
# EXAMPLE USAGE
# ============================================================================

def main():
    """Example usage of the data analysis application"""

    # Configuration
    config = {
        'enable_text_processing': True,
        'enable_numeric_processing': True,
        'max_concurrent': 10,
        'text_processor_config': {
            'stop_words': ['the', 'a', 'an', 'and', 'or'],
            'min_length': 3
        },
        'numeric_processor_config': {}
    }

    # Create application
    app = DataAnalysisApp(config)

    # Sample data
    sample_records = [
        DataRecord(
            id="",
            timestamp=datetime.now(),
            data="This is a sample text for processing and analysis"
        ),
        DataRecord(
            id="",
            timestamp=datetime.now(),
            data=[1.5, 2.3, 3.7, 4.2, 5.8, 6.1, 7.4, 8.9, 9.2, 10.5]
        )
    ]

    # Process data
    result = app.process_data(sample_records)

    # Print results
    print(f"\nProcessing Results:")
    print(f"Total Records: {result.total_records}")
    print(f"Successful: {result.successful}")
    print(f"Failed: {result.failed}")
    print(f"Success Rate: {result.success_rate:.2f}%")
    print(f"Duration: {format_duration(result.duration)}")

    # Generate report
    report = app.generate_report()
    print(f"\nReport: {json.dumps(report, indent=2)}")

if __name__ == "__main__":
    main()
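
main() only exercises the synchronous path; a minimal sketch of driving the async fetch-and-process flow this request is about might look like the following (placeholder URLs; with no processors enabled in the config, the fetched records pass through the pipeline unmodified):

async def async_main():
    # Sketch only: fetches placeholder URLs and runs them through the pipeline.
    app = DataAnalysisApp({'max_concurrent': 5})
    result = await app.fetch_and_process([
        "https://example.com/data/1",
        "https://example.com/data/2",
    ])
    print(f"Fetched and processed {result.total_records} records")

# asyncio.run(async_main())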

Examples

No response

Possible workarounds

No response
