Description
📝 SECURITY FEATURE: Audit Logging System
Summary: Implement a tamper-resistant audit logging system that tracks all security-relevant actions, administrative changes, and data access patterns. This provides forensic capabilities, compliance support, and real-time security monitoring.
Background: Audit logging is critical for:
- Security incident investigation
- Compliance requirements (SOC2, GDPR, HIPAA)
- Detecting insider threats
- Tracking configuration changes
- Monitoring data access patterns
- Forensic analysis
Scope:
- Create structured audit log format with immutable records
- Track all CRUD operations on tools, resources, prompts, and servers
- Log authentication events and authorization decisions
- Monitor API access patterns and rate limit violations
- Implement log integrity verification
- Add log retention and rotation policies
- Create audit log query API
- Add real-time alerting for suspicious patterns
- Configurable hashing algorithm (SHA-256 by default)
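The configurable hashing item in the scope could be sketched as follows. This is only an illustration: `compute_checksum` and `ALLOWED_ALGORITHMS` are hypothetical names, not part of the proposal, and the allow-list is an assumption.

```python
import hashlib
import json

# Hypothetical allow-list; every name here is guaranteed to exist in hashlib
ALLOWED_ALGORITHMS = {"sha256", "sha384", "sha512", "sha3_256"}


def compute_checksum(record: dict, algorithm: str = "sha256") -> str:
    """Hash the canonical JSON form of a record with a configurable algorithm."""
    if algorithm not in ALLOWED_ALGORITHMS:
        raise ValueError(f"Unsupported hash algorithm: {algorithm}")
    # sort_keys plus fixed separators yields the same bytes for equal records
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.new(algorithm, canonical.encode("utf-8")).hexdigest()
```

Note that a 64-character checksum column only fits 256-bit digests; choosing SHA-384 or SHA-512 would need a wider column.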
Implementation Details:
1. Create Audit Log Models
Add to mcpgateway/models/audit.py:
from sqlalchemy import Column, Integer, String, DateTime, JSON, Text, Index, BigInteger
from sqlalchemy.dialects.postgresql import UUID
import uuid
from datetime import datetime
import hashlib
import json
from typing import Optional, Dict, Any
from mcpgateway.models.base import Base
class AuditLog(Base):
    """Immutable audit log entries."""

    __tablename__ = "audit_logs"

    # Primary fields
    id = Column(BigInteger, primary_key=True, autoincrement=True)
    event_id = Column(UUID(as_uuid=True), default=uuid.uuid4, unique=True, nullable=False)
    timestamp = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)

    # Event classification
    event_type = Column(String(50), nullable=False, index=True)  # auth, crud, access, security
    event_action = Column(String(50), nullable=False, index=True)  # login, create, update, delete, view
    event_result = Column(String(20), nullable=False, index=True)  # success, failure, error
    severity = Column(String(20), nullable=False, default="info")  # debug, info, warning, error, critical

    # Actor information
    actor_id = Column(String(255), index=True)  # User ID or system
    actor_type = Column(String(50))  # user, system, api_key
    actor_ip = Column(String(45), index=True)  # Support IPv6
    actor_user_agent = Column(Text)

    # Target information
    target_type = Column(String(50), index=True)  # tool, resource, prompt, server, user
    target_id = Column(String(255), index=True)
    target_name = Column(String(255))

    # Request context
    request_id = Column(String(255), index=True)  # X-Request-ID
    session_id = Column(String(255), index=True)
    api_endpoint = Column(String(255))
    http_method = Column(String(10))

    # Event details
    details = Column(JSON)  # Flexible additional data
    error_message = Column(Text)  # For failed operations

    # Change tracking
    old_values = Column(JSON)  # For update operations
    new_values = Column(JSON)  # For create/update operations

    # Integrity
    checksum = Column(String(64), nullable=False)  # SHA-256 hash
    previous_checksum = Column(String(64))  # Chain logs together

    # Indexes for common queries
    __table_args__ = (
        Index('idx_audit_timestamp_type', 'timestamp', 'event_type'),
        Index('idx_audit_actor_timestamp', 'actor_id', 'timestamp'),
        Index('idx_audit_target', 'target_type', 'target_id'),
        Index('idx_audit_severity_timestamp', 'severity', 'timestamp'),
    )

    def calculate_checksum(self) -> str:
        """Calculate SHA-256 checksum of the log entry."""
        # Create deterministic string representation
        data = {
            'event_id': str(self.event_id),
            'timestamp': self.timestamp.isoformat() if self.timestamp else '',
            'event_type': self.event_type,
            'event_action': self.event_action,
            'event_result': self.event_result,
            'actor_id': self.actor_id or '',
            'target_type': self.target_type or '',
            'target_id': self.target_id or '',
            'details': json.dumps(self.details, sort_keys=True) if self.details else '',
            'previous_checksum': self.previous_checksum or ''
        }
        # Create checksum
        content = json.dumps(data, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def verify_integrity(self) -> bool:
        """Verify the integrity of this log entry."""
        return self.checksum == self.calculate_checksum()
class AuditLogArchive(Base):
    """Archived audit logs for long-term storage."""

    __tablename__ = "audit_log_archives"

    id = Column(BigInteger, primary_key=True)
    archived_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    start_date = Column(DateTime, nullable=False)
    end_date = Column(DateTime, nullable=False)
    record_count = Column(Integer, nullable=False)
    compressed_data = Column(Text)  # JSON compressed with zlib
    checksum = Column(String(64), nullable=False)

    __table_args__ = (
        Index('idx_archive_dates', 'start_date', 'end_date'),
    )
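The archive model stores zlib-compressed JSON in a Text column, but the proposal does not show how records get packed. A minimal sketch, assuming the compressed bytes are base64-encoded so they fit a text column; `pack_archive` and `unpack_archive` are hypothetical helpers, not part of the proposal:

```python
import base64
import hashlib
import json
import zlib


def pack_archive(records: list[dict]) -> tuple[str, str]:
    """Return (compressed_data, checksum) for a batch of audit records."""
    raw = json.dumps(records, sort_keys=True, default=str).encode("utf-8")
    checksum = hashlib.sha256(raw).hexdigest()
    # zlib output is binary, so base64-encode it for storage in a Text column
    compressed = base64.b64encode(zlib.compress(raw, level=9)).decode("ascii")
    return compressed, checksum


def unpack_archive(compressed: str, checksum: str) -> list[dict]:
    """Decompress an archive and verify it against the stored checksum."""
    raw = zlib.decompress(base64.b64decode(compressed))
    if hashlib.sha256(raw).hexdigest() != checksum:
        raise ValueError("Archive checksum mismatch")
    return json.loads(raw)
```

The checksum is taken over the uncompressed JSON, so it stays valid if the compression level changes later.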
2. Create Audit Logger Service
Create mcpgateway/services/audit_logger.py:
import asyncio
import uuid
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
import json
import zlib
from sqlalchemy import select, and_, func
from sqlalchemy.ext.asyncio import AsyncSession
from mcpgateway.models.audit import AuditLog, AuditLogArchive
from mcpgateway.database import get_db
from mcpgateway.config import settings
from mcpgateway.logging import get_logger

logger = get_logger(__name__)


class AuditLogger:
    """Service for creating and managing audit logs."""

    def __init__(self):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
        self._batch_size = settings.audit_log_batch_size
        self._flush_interval = settings.audit_log_flush_interval
        self._worker_task = None
        self._last_checksum = None

    async def start(self):
        """Start the audit logger background worker."""
        # Resume the checksum chain from the most recent stored entry
        async with get_db() as db:
            result = await db.execute(
                select(AuditLog.checksum).order_by(AuditLog.id.desc()).limit(1)
            )
            self._last_checksum = result.scalar_one_or_none()
        self._worker_task = asyncio.create_task(self._process_queue())
        logger.info("Audit logger started")

    async def stop(self):
        """Stop the audit logger."""
        if self._worker_task:
            self._worker_task.cancel()
            try:
                await self._worker_task
            except asyncio.CancelledError:
                pass
        logger.info("Audit logger stopped")

    async def log(
        self,
        event_type: str,
        event_action: str,
        event_result: str = "success",
        severity: str = "info",
        actor_id: Optional[str] = None,
        actor_type: Optional[str] = None,
        actor_ip: Optional[str] = None,
        actor_user_agent: Optional[str] = None,
        target_type: Optional[str] = None,
        target_id: Optional[str] = None,
        target_name: Optional[str] = None,
        request_id: Optional[str] = None,
        session_id: Optional[str] = None,
        api_endpoint: Optional[str] = None,
        http_method: Optional[str] = None,
        details: Optional[Dict[Any, Any]] = None,
        error_message: Optional[str] = None,
        old_values: Optional[Dict[Any, Any]] = None,
        new_values: Optional[Dict[Any, Any]] = None,
    ):
        """Create an audit log entry."""
        entry = AuditLog(
            # Set these explicitly: column defaults are only applied at INSERT,
            # after the checksum has been calculated, so relying on them would
            # make every later integrity check fail.
            event_id=uuid.uuid4(),
            timestamp=datetime.utcnow(),
            event_type=event_type,
            event_action=event_action,
            event_result=event_result,
            severity=severity,
            actor_id=actor_id,
            actor_type=actor_type,
            actor_ip=actor_ip,
            actor_user_agent=actor_user_agent,
            target_type=target_type,
            target_id=target_id,
            target_name=target_name,
            request_id=request_id,
            session_id=session_id,
            api_endpoint=api_endpoint,
            http_method=http_method,
            details=details,
            error_message=error_message,
            old_values=old_values,
            new_values=new_values,
            previous_checksum=self._last_checksum
        )
        # Calculate the checksum and advance the chain right away, so the next
        # entry links to this one even when both end up in the same batch
        entry.checksum = entry.calculate_checksum()
        self._last_checksum = entry.checksum
        # Queue for batch processing
        try:
            await asyncio.wait_for(
                self._queue.put(entry),
                timeout=1.0
            )
        except asyncio.TimeoutError:
            # If queue is full, log directly (fallback)
            logger.warning("Audit queue full, logging directly")
            await self._write_logs([entry])

    async def _process_queue(self):
        """Background worker to process audit log queue."""
        batch = []
        last_flush = datetime.utcnow()
        while True:
            try:
                # Wait for items with timeout
                timeout = self._flush_interval - (datetime.utcnow() - last_flush).total_seconds()
                timeout = max(0.1, timeout)
                try:
                    entry = await asyncio.wait_for(
                        self._queue.get(),
                        timeout=timeout
                    )
                    batch.append(entry)
                except asyncio.TimeoutError:
                    pass
                # Flush if batch is full or timeout reached
                should_flush = (
                    len(batch) >= self._batch_size or
                    (datetime.utcnow() - last_flush).total_seconds() >= self._flush_interval
                )
                if should_flush and batch:
                    await self._write_logs(batch)
                    batch = []
                    last_flush = datetime.utcnow()
            except asyncio.CancelledError:
                # Drain entries still waiting in the queue (such as the
                # shutdown event), then flush everything before exiting
                while not self._queue.empty():
                    batch.append(self._queue.get_nowait())
                if batch:
                    await self._write_logs(batch)
                raise
            except Exception as e:
                logger.error(f"Error in audit logger: {e}")
                await asyncio.sleep(1)

    async def _write_logs(self, entries: List[AuditLog]):
        """Write audit logs to database."""
        async with get_db() as db:
            try:
                db.add_all(entries)
                await db.commit()
                # The chain pointer is advanced in log(); a failed write
                # therefore shows up later as a chain break
                logger.debug(f"Wrote {len(entries)} audit logs")
            except Exception as e:
                logger.error(f"Failed to write audit logs: {e}")
                await db.rollback()
                # TODO: Write to fallback file
    async def query(
        self,
        event_type: Optional[str] = None,
        event_action: Optional[str] = None,
        actor_id: Optional[str] = None,
        target_type: Optional[str] = None,
        target_id: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        severity: Optional[str] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[AuditLog]:
        """Query audit logs with filters."""
        async with get_db() as db:
            query = select(AuditLog)
            # Apply filters
            conditions = []
            if event_type:
                conditions.append(AuditLog.event_type == event_type)
            if event_action:
                conditions.append(AuditLog.event_action == event_action)
            if actor_id:
                conditions.append(AuditLog.actor_id == actor_id)
            if target_type:
                conditions.append(AuditLog.target_type == target_type)
            if target_id:
                conditions.append(AuditLog.target_id == target_id)
            if start_time:
                conditions.append(AuditLog.timestamp >= start_time)
            if end_time:
                conditions.append(AuditLog.timestamp <= end_time)
            if severity:
                conditions.append(AuditLog.severity == severity)
            if conditions:
                query = query.where(and_(*conditions))
            # Order and paginate
            query = query.order_by(AuditLog.timestamp.desc())
            query = query.limit(limit).offset(offset)
            result = await db.execute(query)
            return result.scalars().all()
    async def verify_integrity(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> Dict[str, Any]:
        """Verify integrity of audit logs in a time range."""
        async with get_db() as db:
            # Tie-break on id so entries with equal timestamps keep a stable order
            query = select(AuditLog).where(
                and_(
                    AuditLog.timestamp >= start_time,
                    AuditLog.timestamp <= end_time
                )
            ).order_by(AuditLog.timestamp, AuditLog.id)
            result = await db.execute(query)
            logs = result.scalars().all()
            total = len(logs)
            valid = 0
            invalid = []
            chain_breaks = []
            previous_checksum = None
            for log in logs:
                # Verify individual log integrity
                if log.verify_integrity():
                    valid += 1
                else:
                    invalid.append({
                        'event_id': str(log.event_id),
                        'timestamp': log.timestamp.isoformat()
                    })
                # Verify chain integrity
                if previous_checksum and log.previous_checksum != previous_checksum:
                    chain_breaks.append({
                        'event_id': str(log.event_id),
                        'timestamp': log.timestamp.isoformat(),
                        'expected': previous_checksum,
                        'actual': log.previous_checksum
                    })
                previous_checksum = log.checksum
            return {
                'total_logs': total,
                'valid_logs': valid,
                'invalid_logs': len(invalid),
                'chain_breaks': len(chain_breaks),
                'integrity_percentage': (valid / total * 100) if total > 0 else 100,
                'invalid_entries': invalid[:10],  # First 10
                'chain_break_points': chain_breaks[:10]
            }


# Global audit logger instance
audit_logger = AuditLogger()
3. Create Audit Middleware
Create mcpgateway/middleware/audit_middleware.py:
from typing import Optional  # used by _extract_target_id
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
import time
import json
import uuid
from mcpgateway.services.audit_logger import audit_logger
from mcpgateway.config import settings
from mcpgateway.auth import get_current_user
from mcpgateway.logging import get_logger

logger = get_logger(__name__)
class AuditMiddleware(BaseHTTPMiddleware):
    """Middleware to audit all API requests."""

    # Endpoints that should be audited
    AUDIT_PATHS = {
        '/admin/': 'admin',
        '/api/': 'api',
        '/rpc': 'rpc',
        '/auth/': 'auth'
    }

    # Sensitive paths that need detailed logging
    SENSITIVE_PATHS = {
        '/admin/tools': 'tool',
        '/admin/resources': 'resource',
        '/admin/prompts': 'prompt',
        '/admin/servers': 'server',
        '/admin/gateways': 'gateway',
        '/auth/login': 'auth',
        '/auth/logout': 'auth'
    }
    async def dispatch(self, request: Request, call_next):
        # Skip if audit logging is disabled
        if not settings.audit_log_enabled:
            return await call_next(request)
        # Check if this path should be audited
        should_audit = any(
            request.url.path.startswith(path)
            for path in self.AUDIT_PATHS
        )
        if not should_audit:
            return await call_next(request)
        # Generate request ID if not present
        request_id = request.headers.get('X-Request-ID', str(uuid.uuid4()))
        request.state.request_id = request_id
        # Track request timing
        start_time = time.time()
        # Get user info
        user_info = await self._get_user_info(request)
        # Capture request body for POST/PUT/PATCH/DELETE
        request_body = None
        if request.method in ['POST', 'PUT', 'PATCH', 'DELETE']:
            try:
                body = await request.body()
            except Exception:
                body = b""
            if body:
                try:
                    request_body = json.loads(body)
                except ValueError:
                    # Not JSON (for example form data): audit without the body
                    request_body = None
                # Some Starlette versions do not replay a body consumed in
                # middleware, so hand it to downstream handlers once and then
                # defer to the original receive channel (e.g. for disconnects)
                original_receive = request.receive
                body_replayed = False

                async def receive():
                    nonlocal body_replayed
                    if not body_replayed:
                        body_replayed = True
                        return {"type": "http.request", "body": body, "more_body": False}
                    return await original_receive()

                request = Request(request.scope, receive=receive)
        # Process request
        response = await call_next(request)
        # Calculate response time
        response_time = time.time() - start_time
        # Determine event details
        event_type, target_type = self._determine_event_type(request.url.path)
        event_action = self._determine_action(request.method, request.url.path)
        event_result = "success" if 200 <= response.status_code < 400 else "failure"
        severity = self._determine_severity(response.status_code, event_type)
        # Extract target information
        target_id = self._extract_target_id(request.url.path)
        # Build audit details
        details = {
            'response_time_ms': round(response_time * 1000, 2),
            'status_code': response.status_code,
            'path': request.url.path,
            'query_params': dict(request.query_params) if request.query_params else None
        }
        # Add body for sensitive operations
        if event_type in ['admin', 'auth'] and request_body:
            # Sanitize sensitive data
            sanitized_body = self._sanitize_request_body(request_body)
            details['request_body'] = sanitized_body
        # Log the request
        await audit_logger.log(
            event_type=event_type,
            event_action=event_action,
            event_result=event_result,
            severity=severity,
            actor_id=user_info.get('user_id'),
            actor_type=user_info.get('user_type'),
            actor_ip=request.client.host if request.client else None,
            actor_user_agent=request.headers.get('User-Agent'),
            target_type=target_type,
            target_id=target_id,
            request_id=request_id,
            session_id=user_info.get('session_id'),
            api_endpoint=request.url.path,
            http_method=request.method,
            details=details,
            error_message=None if event_result == "success" else f"HTTP {response.status_code}"
        )
        # Add request ID to response headers
        response.headers['X-Request-ID'] = request_id
        return response
    async def _get_user_info(self, request: Request) -> dict:
        """Extract user information from request."""
        try:
            # Try to get authenticated user
            user = await get_current_user(request)
            if user:
                return {
                    'user_id': user.id,
                    'user_type': 'user',
                    'session_id': getattr(request.state, 'session_id', None)
                }
        except Exception:
            # Unauthenticated request; a bare except would also swallow task cancellation
            pass
        # Check for API key
        if 'X-API-Key' in request.headers:
            return {
                'user_id': request.headers['X-API-Key'][:8] + '...',
                'user_type': 'api_key',
                'session_id': None
            }
        return {
            'user_id': 'anonymous',
            'user_type': 'anonymous',
            'session_id': None
        }
    def _determine_event_type(self, path: str) -> tuple[str, str]:
        """Determine event type and target type from path."""
        if path.startswith('/auth/'):
            return 'auth', 'user'
        elif path.startswith('/admin/tools'):
            return 'admin', 'tool'
        elif path.startswith('/admin/resources'):
            return 'admin', 'resource'
        elif path.startswith('/admin/prompts'):
            return 'admin', 'prompt'
        elif path.startswith('/admin/servers'):
            return 'admin', 'server'
        elif path.startswith('/admin/gateways'):
            return 'admin', 'gateway'
        elif path.startswith('/admin/'):
            return 'admin', 'system'
        elif path.startswith('/api/'):
            return 'api', 'data'
        elif path.startswith('/rpc'):
            return 'rpc', 'method'
        else:
            return 'access', 'resource'

    def _determine_action(self, method: str, path: str) -> str:
        """Determine action from HTTP method and path."""
        if '/login' in path:
            return 'login'
        elif '/logout' in path:
            return 'logout'
        method_map = {
            'GET': 'view',
            'POST': 'create',
            'PUT': 'update',
            'PATCH': 'update',
            'DELETE': 'delete'
        }
        return method_map.get(method, 'access')

    def _determine_severity(self, status_code: int, event_type: str) -> str:
        """Determine severity based on status code and event type."""
        if status_code >= 500:
            return 'error'
        elif status_code >= 400:
            return 'warning'
        elif event_type in ['auth', 'admin']:
            return 'info'
        else:
            return 'debug'

    def _extract_target_id(self, path: str) -> Optional[str]:
        """Extract target ID from path."""
        parts = path.strip('/').split('/')
        if len(parts) >= 3:
            # Assume ID is after resource type (e.g., /admin/tools/{id})
            return parts[2]
        return None
    def _sanitize_request_body(self, body):
        """Remove sensitive data from a request body (dict, list, or scalar)."""
        sensitive_fields = {
            'password', 'secret', 'token', 'api_key',
            'private_key', 'auth', 'authorization'
        }
        # JSON bodies can be arrays; sanitize each element instead of failing
        if isinstance(body, list):
            return [self._sanitize_request_body(item) for item in body]
        if not isinstance(body, dict):
            return body
        sanitized = {}
        for key, value in body.items():
            if any(field in key.lower() for field in sensitive_fields):
                sanitized[key] = '***REDACTED***'
            elif isinstance(value, (dict, list)):
                sanitized[key] = self._sanitize_request_body(value)
            else:
                sanitized[key] = value
        return sanitized
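The middleware above records the first eight characters of the X-API-Key header as the actor ID. One alternative that keeps entries for the same key correlated without writing any part of the secret to the log is a hash fingerprint. A minimal sketch; `api_key_fingerprint` is a hypothetical helper, not part of the proposal:

```python
import hashlib


def api_key_fingerprint(api_key: str, length: int = 12) -> str:
    """Return a short, stable identifier derived from an API key."""
    digest = hashlib.sha256(api_key.encode("utf-8")).hexdigest()
    # Only a truncated digest is logged; the key itself never reaches the audit trail
    return f"key:{digest[:length]}"
```

The same key always maps to the same fingerprint, so queries by actor_id keep working.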
4. Add Audit Log API Endpoints
Create mcpgateway/api/audit.py:
from fastapi import APIRouter, Depends, Query, HTTPException
from datetime import datetime, timedelta
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel
from mcpgateway.services.audit_logger import audit_logger
from mcpgateway.auth import require_admin
from mcpgateway.models.audit import AuditLog

router = APIRouter(prefix="/api/audit", tags=["audit"])


class AuditLogResponse(BaseModel):
    """Audit log response model."""
    event_id: UUID  # AuditLog.event_id is a UUID, which a str field would reject
    timestamp: datetime
    event_type: str
    event_action: str
    event_result: str
    severity: str
    actor_id: Optional[str]
    actor_ip: Optional[str]
    target_type: Optional[str]
    target_id: Optional[str]
    target_name: Optional[str]
    api_endpoint: Optional[str]
    details: Optional[dict]

    class Config:
        from_attributes = True


class AuditIntegrityResponse(BaseModel):
    """Audit integrity check response."""
    total_logs: int
    valid_logs: int
    invalid_logs: int
    chain_breaks: int
    integrity_percentage: float
    invalid_entries: List[dict]
    chain_break_points: List[dict]
@router.get("/logs", response_model=List[AuditLogResponse])
async def get_audit_logs(
    event_type: Optional[str] = None,
    event_action: Optional[str] = None,
    actor_id: Optional[str] = None,
    target_type: Optional[str] = None,
    target_id: Optional[str] = None,
    severity: Optional[str] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    _: None = Depends(require_admin)
):
    """Query audit logs with filters."""
    # Default time range if not specified
    if not end_time:
        end_time = datetime.utcnow()
    if not start_time:
        start_time = end_time - timedelta(days=7)
    logs = await audit_logger.query(
        event_type=event_type,
        event_action=event_action,
        actor_id=actor_id,
        target_type=target_type,
        target_id=target_id,
        start_time=start_time,
        end_time=end_time,
        severity=severity,
        limit=limit,
        offset=offset
    )
    # model_validate is the Pydantic v2 counterpart of from_orm and matches
    # the from_attributes setting on the response model
    return [AuditLogResponse.model_validate(log) for log in logs]
@router.get("/integrity", response_model=AuditIntegrityResponse)
async def check_audit_integrity(
    start_time: datetime = Query(..., description="Start time for integrity check"),
    end_time: datetime = Query(..., description="End time for integrity check"),
    _: None = Depends(require_admin)
):
    """Verify integrity of audit logs in a time range."""
    if end_time <= start_time:
        raise HTTPException(400, "End time must be after start time")
    # Compare full timedeltas; .days alone would let 31 days 23 hours through
    if end_time - start_time > timedelta(days=31):
        raise HTTPException(400, "Time range cannot exceed 31 days")
    result = await audit_logger.verify_integrity(start_time, end_time)
    return AuditIntegrityResponse(**result)
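@router.get("/export")
async def export_audit_logs(
    format: str = Query("json", pattern="^(json|csv)$"),  # regex= is deprecated in recent FastAPI
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    _: None = Depends(require_admin)
):
    """Export audit logs in specified format."""
    # Implementation for CSV/JSON export
    # Until then, fail explicitly rather than returning an empty 200 response
    raise HTTPException(501, "Audit log export is not implemented yet")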
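The export endpoint is left unimplemented in the proposal. One possible shape for the rendering step is sketched below, assuming rows have already been fetched and converted to plain dicts; `render_export` and `EXPORT_FIELDS` are hypothetical names.

```python
import csv
import io
import json

# Hypothetical column selection for CSV output
EXPORT_FIELDS = [
    "event_id", "timestamp", "event_type", "event_action", "event_result",
    "severity", "actor_id", "target_type", "target_id",
]


def render_export(rows: list[dict], fmt: str = "json") -> str:
    """Render audit rows as a JSON document or a CSV table."""
    if fmt == "json":
        # default=str covers datetimes and UUIDs
        return json.dumps(rows, default=str, indent=2)
    if fmt == "csv":
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=EXPORT_FIELDS, extrasaction="ignore")
        writer.writeheader()
        for row in rows:
            # Missing keys become empty cells; the csv module writes None as ""
            writer.writerow({name: row.get(name, "") for name in EXPORT_FIELDS})
        return buffer.getvalue()
    raise ValueError(f"Unsupported export format: {fmt}")
```

For large ranges a streaming response would be preferable to building the whole string in memory.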
5. Add Configuration Settings
Add to mcpgateway/config.py:
# Audit Logging Settings
audit_log_enabled: bool = True
audit_log_batch_size: int = 100
audit_log_flush_interval: float = 5.0 # seconds
audit_log_retention_days: int = 90
audit_log_archive_after_days: int = 30
audit_log_integrity_checks: bool = True
audit_log_encryption: bool = False
audit_log_encryption_key: Optional[str] = None
# Audit Alert Settings
audit_alert_enabled: bool = True
audit_alert_failed_logins: int = 5 # Alert after N failed logins
audit_alert_privilege_escalation: bool = True
audit_alert_data_exfiltration: bool = True
audit_alert_config_changes: bool = True
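The proposal defines audit_log_archive_after_days and audit_log_retention_days but does not show how they are applied. A minimal sketch of the decision logic, assuming records older than the archive threshold are archived and records older than the retention threshold are deleted; `retention_cutoffs` and `classify_record` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone


def retention_cutoffs(now: datetime, archive_after_days: int = 30,
                      retention_days: int = 90) -> tuple[datetime, datetime]:
    """Return (archive_before, delete_before) timestamps."""
    if archive_after_days > retention_days:
        raise ValueError("archive_after_days must not exceed retention_days")
    return now - timedelta(days=archive_after_days), now - timedelta(days=retention_days)


def classify_record(timestamp: datetime, now: datetime,
                    archive_after_days: int = 30, retention_days: int = 90) -> str:
    """Decide whether a record is kept, archived, or deleted."""
    archive_before, delete_before = retention_cutoffs(now, archive_after_days, retention_days)
    if timestamp < delete_before:
        return "delete"
    if timestamp < archive_before:
        return "archive"
    return "keep"
```

A scheduled job could use the two cutoffs directly in its WHERE clauses.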
6. Add to .env.example
#####################################
# Audit Logging Configuration
#####################################
# Enable audit logging
AUDIT_LOG_ENABLED=true
# Batch settings for performance
AUDIT_LOG_BATCH_SIZE=100
AUDIT_LOG_FLUSH_INTERVAL=5.0
# Retention settings
AUDIT_LOG_RETENTION_DAYS=90
AUDIT_LOG_ARCHIVE_AFTER_DAYS=30
# Security settings
AUDIT_LOG_INTEGRITY_CHECKS=true
AUDIT_LOG_ENCRYPTION=false
# AUDIT_LOG_ENCRYPTION_KEY=your-32-byte-key-here
# Alert settings
AUDIT_ALERT_ENABLED=true
AUDIT_ALERT_FAILED_LOGINS=5
AUDIT_ALERT_PRIVILEGE_ESCALATION=true
AUDIT_ALERT_DATA_EXFILTRATION=true
AUDIT_ALERT_CONFIG_CHANGES=true
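The alert settings are declared, but no alerting logic appears in the proposal. A failed-login threshold such as AUDIT_ALERT_FAILED_LOGINS=5 might be evaluated with a sliding window, as in this sketch. `FailedLoginMonitor` and the `window_seconds` parameter are assumptions; the proposal defines no window setting.

```python
from collections import defaultdict, deque


class FailedLoginMonitor:
    """Flag actors with too many failed logins inside a sliding time window."""

    def __init__(self, threshold: int = 5, window_seconds: float = 300.0):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._failures: dict[str, deque] = defaultdict(deque)

    def record_failure(self, actor_key: str, now: float) -> bool:
        """Record one failure; return True when the threshold is reached."""
        events = self._failures[actor_key]
        events.append(now)
        # Drop failures that have fallen out of the window
        while events and now - events[0] > self.window_seconds:
            events.popleft()
        return len(events) >= self.threshold
```

The actor key could be an IP address, a user ID, or both, depending on which pattern should trigger the alert.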
7. Create Audit Dashboard Component
Create mcpgateway/templates/audit_dashboard.html:
<div id="audit-dashboard" class="space-y-6">
<h2 class="text-2xl font-bold dark:text-gray-200">Audit Log Dashboard</h2>
<!-- Summary Cards -->
<div class="grid grid-cols-1 md:grid-cols-4 gap-4">
<div class="bg-white rounded-lg shadow p-4 dark:bg-gray-800">
<h3 class="text-lg font-medium text-gray-700 dark:text-gray-300">Total Events</h3>
<p class="text-3xl font-bold text-gray-900 dark:text-gray-100" id="total-events">0</p>
</div>
<div class="bg-white rounded-lg shadow p-4 dark:bg-gray-800">
<h3 class="text-lg font-medium text-gray-700 dark:text-gray-300">Failed Logins</h3>
<p class="text-3xl font-bold text-red-600" id="failed-logins">0</p>
</div>
<div class="bg-white rounded-lg shadow p-4 dark:bg-gray-800">
<h3 class="text-lg font-medium text-gray-700 dark:text-gray-300">Config Changes</h3>
<p class="text-3xl font-bold text-yellow-600" id="config-changes">0</p>
</div>
<div class="bg-white rounded-lg shadow p-4 dark:bg-gray-800">
<h3 class="text-lg font-medium text-gray-700 dark:text-gray-300">Active Users</h3>
<p class="text-3xl font-bold text-green-600" id="active-users">0</p>
</div>
</div>
<!-- Filters -->
<div class="bg-white rounded-lg shadow p-4 dark:bg-gray-800">
<h3 class="text-lg font-medium mb-4 dark:text-gray-200">Filter Logs</h3>
<div class="grid grid-cols-1 md:grid-cols-4 gap-4">
<select id="event-type-filter" class="rounded-md border-gray-300 dark:bg-gray-700 dark:border-gray-600">
<option value="">All Event Types</option>
<option value="auth">Authentication</option>
<option value="admin">Administration</option>
<option value="api">API Access</option>
<option value="security">Security</option>
</select>
<select id="severity-filter" class="rounded-md border-gray-300 dark:bg-gray-700 dark:border-gray-600">
<option value="">All Severities</option>
<option value="critical">Critical</option>
<option value="error">Error</option>
<option value="warning">Warning</option>
<option value="info">Info</option>
</select>
<input type="datetime-local" id="start-time-filter" class="rounded-md border-gray-300 dark:bg-gray-700 dark:border-gray-600">
<button onclick="refreshAuditLogs()" class="bg-indigo-600 text-white rounded-md px-4 py-2 hover:bg-indigo-700">
Refresh
</button>
</div>
</div>
<!-- Log Table -->
<div class="bg-white rounded-lg shadow overflow-hidden dark:bg-gray-800">
<table class="min-w-full divide-y divide-gray-200 dark:divide-gray-700">
<thead class="bg-gray-50 dark:bg-gray-700">
<tr>
<th class="px-6 py-3 text-left text-xs font-medium text-gray-500 dark:text-gray-300 uppercase">Timestamp</th>
<th class="px-6 py-3 text-left text-xs font-medium text-gray-500 dark:text-gray-300 uppercase">Event</th>
<th class="px-6 py-3 text-left text-xs font-medium text-gray-500 dark:text-gray-300 uppercase">Actor</th>
<th class="px-6 py-3 text-left text-xs font-medium text-gray-500 dark:text-gray-300 uppercase">Target</th>
<th class="px-6 py-3 text-left text-xs font-medium text-gray-500 dark:text-gray-300 uppercase">Result</th>
<th class="px-6 py-3 text-left text-xs font-medium text-gray-500 dark:text-gray-300 uppercase">Details</th>
</tr>
</thead>
<tbody id="audit-log-tbody" class="bg-white divide-y divide-gray-200 dark:bg-gray-900 dark:divide-gray-700">
<!-- Logs will be populated here -->
</tbody>
</table>
</div>
</div>
<script>
async function refreshAuditLogs() {
const eventType = document.getElementById('event-type-filter').value;
const severity = document.getElementById('severity-filter').value;
const startTime = document.getElementById('start-time-filter').value;
const params = new URLSearchParams();
if (eventType) params.append('event_type', eventType);
if (severity) params.append('severity', severity);
if (startTime) params.append('start_time', startTime);
try {
const response = await fetch(`${window.ROOT_PATH}/api/audit/logs?${params}`);
const logs = await response.json();
const tbody = document.getElementById('audit-log-tbody');
tbody.innerHTML = '';
logs.forEach(log => {
  const row = tbody.insertRow();
  // Build cells with textContent so values from the log (user agents, IDs,
  // request data) are never parsed as HTML or script
  const textClass = 'px-6 py-4 whitespace-nowrap text-sm text-gray-900 dark:text-gray-100';
  const addCell = (text, className = textClass) => {
    const cell = row.insertCell();
    cell.className = className;
    cell.textContent = text;
    return cell;
  };
  addCell(new Date(log.timestamp).toLocaleString());
  addCell(`${log.event_type}/${log.event_action}`);
  addCell(log.actor_id || 'System');
  addCell(`${log.target_type}/${log.target_id || 'N/A'}`);
  const resultCell = addCell('', 'px-6 py-4 whitespace-nowrap');
  const badge = document.createElement('span');
  badge.className = 'px-2 inline-flex text-xs leading-5 font-semibold rounded-full ' +
    (log.event_result === 'success' ? 'bg-green-100 text-green-800' : 'bg-red-100 text-red-800');
  badge.textContent = log.event_result;
  resultCell.appendChild(badge);
  const detailsCell = addCell('', 'px-6 py-4 text-sm text-gray-900 dark:text-gray-100');
  const button = document.createElement('button');
  button.className = 'text-indigo-600 hover:text-indigo-900 dark:text-indigo-400';
  button.textContent = 'View';
  // Attach the handler directly instead of serializing the log into an onclick attribute
  button.addEventListener('click', () => viewAuditDetails(log));
  detailsCell.appendChild(button);
});
updateDashboardStats(logs);
} catch (error) {
console.error('Failed to fetch audit logs:', error);
}
}
function updateDashboardStats(logs) {
document.getElementById('total-events').textContent = logs.length;
const failedLogins = logs.filter(l =>
l.event_action === 'login' && l.event_result === 'failure'
).length;
document.getElementById('failed-logins').textContent = failedLogins;
const configChanges = logs.filter(l =>
l.event_type === 'admin' && ['create', 'update', 'delete'].includes(l.event_action)
).length;
document.getElementById('config-changes').textContent = configChanges;
const uniqueUsers = new Set(logs.map(l => l.actor_id).filter(Boolean)).size;
document.getElementById('active-users').textContent = uniqueUsers;
}
function viewAuditDetails(log) {
// Show detailed log information in modal
console.log('View details:', log);
}
// Initial load
refreshAuditLogs();
</script>
8. Add Startup Integration
Update mcpgateway/main.py:
from mcpgateway.services.audit_logger import audit_logger


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager."""
    # Startup
    try:
        # Start audit logger
        await audit_logger.start()
        # Log startup
        await audit_logger.log(
            event_type="system",
            event_action="startup",
            event_result="success",
            severity="info",
            actor_id="system",
            actor_type="system",
            details={"version": settings.app_version}
        )
        yield
    finally:
        # Log shutdown
        await audit_logger.log(
            event_type="system",
            event_action="shutdown",
            event_result="success",
            severity="info",
            actor_id="system",
            actor_type="system"
        )
        # Stop audit logger
        await audit_logger.stop()
Testing Checklist:
- Test audit log creation for all CRUD operations
- Verify authentication events are logged
- Test batch processing performance
- Verify log integrity checks
- Test log retention and archival
- Verify sensitive data sanitization
- Test audit log queries and filtering
- Verify real-time dashboard updates
- Test export functionality
- Verify checksum chain integrity
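As an illustration of the sanitization item in the checklist, a test might look like the sketch below. It uses a standalone `sanitize` function that mirrors the middleware's field list; both the function and the test name are hypothetical, and the real test would exercise the middleware method instead.

```python
from typing import Any

SENSITIVE_FIELDS = {
    "password", "secret", "token", "api_key",
    "private_key", "auth", "authorization",
}


def sanitize(value: Any) -> Any:
    """Redact values whose key contains a sensitive field name."""
    if isinstance(value, dict):
        return {
            key: "***REDACTED***"
            if any(field in str(key).lower() for field in SENSITIVE_FIELDS)
            else sanitize(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [sanitize(item) for item in value]
    return value


def test_sanitize_redacts_nested_secrets():
    body = {
        "username": "alice",
        "password": "hunter2",
        "profile": {"api_key": "abc", "city": "Oslo"},
        "items": [{"token": "t", "n": 1}],
    }
    assert sanitize(body) == {
        "username": "alice",
        "password": "***REDACTED***",
        "profile": {"api_key": "***REDACTED***", "city": "Oslo"},
        "items": [{"token": "***REDACTED***", "n": 1}],
    }
```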
Security Considerations:
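- Audit logs are append-only: the application never updates or deletes an entry once written
- Checksum chain makes tampering detectable; an unkeyed hash cannot stop an attacker with database write access from recomputing the chain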
- Sensitive data is sanitized
- Admin access required for viewing logs
- Optional encryption for logs at rest
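A plain hash chain can be recomputed by anyone who can write to the table. If the chain should also resist that, one option is to key it with HMAC and keep the key outside the database. This is a sketch of that variant, not the proposal's design; all function names are hypothetical.

```python
import hashlib
import hmac
import json
from typing import Optional


def sign_entry(entry: dict, previous_mac: str, key: bytes) -> str:
    """Compute an HMAC-SHA256 over the entry and the previous entry's MAC."""
    payload = json.dumps({"entry": entry, "previous": previous_mac},
                         sort_keys=True, default=str)
    return hmac.new(key, payload.encode("utf-8"), hashlib.sha256).hexdigest()


def build_chain(entries: list[dict], key: bytes) -> list[str]:
    """Return one MAC per entry, each linked to the one before it."""
    macs, previous = [], ""
    for entry in entries:
        previous = sign_entry(entry, previous, key)
        macs.append(previous)
    return macs


def first_broken_index(entries: list[dict], macs: list[str], key: bytes) -> Optional[int]:
    """Return the index of the first entry whose MAC does not verify, or None."""
    previous = ""
    for index, (entry, mac) in enumerate(zip(entries, macs)):
        expected = sign_entry(entry, previous, key)
        # compare_digest avoids leaking match length through timing
        if not hmac.compare_digest(expected, mac):
            return index
        previous = mac
    return None
```

Without the key, an attacker who edits a row cannot produce a MAC that verifies, so the edit stays detectable.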
Performance Impact:
- Asynchronous batch processing
- Minimal request latency impact
- Configurable batch size and flush interval
- Database indexes for fast queries
Compliance Features:
- Tamper-evident logging
- Complete audit trail
- Data retention policies
- Export capabilities
- Integrity verification
Files to Create/Modify:
- mcpgateway/models/audit.py - Audit log database models
- mcpgateway/services/audit_logger.py - Core audit logging service
- mcpgateway/middleware/audit_middleware.py - Request auditing
- mcpgateway/api/audit.py - Audit log query API
- mcpgateway/templates/audit_dashboard.html - Admin UI component
- mcpgateway/config.py - Add audit configuration
- .env.example - Add audit environment variables
- mcpgateway/main.py - Integrate audit logger startup
Priority: High — Audit logging is essential for:
- Security incident investigation
- Compliance requirements
- Detecting insider threats
- Forensic analysis