@@ -2888,64 +2888,96 @@ def get_optimization_report(self, script_path: str, days: int = 30) -> str:
28882888
28892889class ScheduledTask :
28902890 """Represents a scheduled task"""
2891-
2891+
28922892 def __init__ (self , task_id : str , script_path : str , schedule : Optional [str ] = None ,
28932893 cron_expr : Optional [str ] = None , trigger_events : Optional [List [str ]] = None ,
2894- enabled : bool = True ):
2894+ enabled : bool = True , script_args : Optional [List [str ]] = None ,
2895+ dependencies : Optional [List [str ]] = None ):
28952896 """Initialize scheduled task
2896-
2897+
28972898 Args:
28982899 task_id: Unique task identifier
28992900 script_path: Path to script to execute
29002901 schedule: Simple schedule (e.g., 'daily', 'hourly', 'every_5min')
29012902 cron_expr: Cron expression for complex schedules
29022903 trigger_events: Event names that trigger execution
29032904 enabled: Whether task is enabled
2905+ script_args: Arguments to pass to the script during execution
2906+ dependencies: Other task IDs that must complete successfully first
29042907 """
29052908 self .task_id = task_id
29062909 self .script_path = script_path
29072910 self .schedule = schedule
29082911 self .cron_expr = cron_expr
29092912 self .trigger_events = trigger_events or []
29102913 self .enabled = enabled
2914+ self .script_args = script_args or []
2915+ self .dependencies = dependencies or []
29112916 self .last_run : Optional [datetime ] = None
29122917 self .next_run : Optional [datetime ] = None
29132918 self .run_count = 0
2914- self .last_status = None
2919+ self .last_status : Optional [str ] = None
2920+ self .last_error : Optional [str ] = None
29152921
29162922
29172923class TaskScheduler :
29182924 """Manages scheduled script execution and event-driven triggers"""
2919-
2920- def __init__ (self , logger : Optional [logging .Logger ] = None ):
2925+
2926+ def __init__ (self , logger : Optional [logging .Logger ] = None , history_db : Optional [ str ] = None ):
29212927 """Initialize scheduler
2922-
2928+
29232929 Args:
29242930 logger: Logger instance
2931+ history_db: Optional history database path passed to ScriptRunner
29252932 """
29262933 self .logger = logger or logging .getLogger (__name__ )
29272934 self .tasks = {}
29282935 self .events = {}
2929- self .triggered_tasks = []
2930-
2936+ self .triggered_tasks : List [str ] = []
2937+ self .history_db = history_db
2938+ self .execution_log : List [Dict [str , Any ]] = []
2939+
29312940 def add_scheduled_task (self , task_id : str , script_path : str ,
2932- schedule : Optional [str ] = None , cron_expr : Optional [str ] = None ) -> ScheduledTask :
2941+ schedule : Optional [str ] = None , cron_expr : Optional [str ] = None ,
2942+ script_args : Optional [List [str ]] = None ,
2943+ dependencies : Optional [List [str ]] = None ) -> ScheduledTask :
29332944 """Add a scheduled task
2934-
2945+
29352946 Args:
29362947 task_id: Unique identifier
29372948 script_path: Script to run
29382949 schedule: Simple schedule string
29392950 cron_expr: Cron expression
2940-
2951+ script_args: Arguments for the script
2952+ dependencies: List of prerequisite task IDs
2953+
29412954 Returns:
29422955 ScheduledTask object
29432956 """
2944- task = ScheduledTask (task_id , script_path , schedule , cron_expr )
2957+ task = ScheduledTask (task_id , script_path , schedule , cron_expr , script_args = script_args ,
2958+ dependencies = dependencies )
29452959 self .tasks [task_id ] = task
29462960 self ._calculate_next_run (task )
29472961 self .logger .info (f"Added task '{ task_id } ': { script_path } " )
29482962 return task
2963+
2964+ def add_dependencies (self , task_id : str , dependencies : List [str ]) -> bool :
2965+ """Register dependencies for an existing task.
2966+
2967+ Args:
2968+ task_id: Task that should wait on dependencies
2969+ dependencies: Other task IDs that must complete successfully
2970+
2971+ Returns:
2972+ bool: True if dependencies were added
2973+ """
2974+ if task_id not in self .tasks :
2975+ self .logger .error (f"Task '{ task_id } ' not found" )
2976+ return False
2977+
2978+ self .tasks [task_id ].dependencies = list (dependencies )
2979+ self .logger .info (f"Task '{ task_id } ' dependencies set: { ', ' .join (dependencies )} " )
2980+ return True
29492981
29502982 def add_event_trigger (self , task_id : str , event_name : str ) -> bool :
29512983 """Add event trigger for a task
@@ -2982,7 +3014,15 @@ def trigger_event(self, event_name: str) -> List[str]:
29823014 tasks = self .events .get (event_name , [])
29833015 self .logger .info (f"Event '{ event_name } ' triggered: { len (tasks )} tasks" )
29843016 return tasks
2985-
3017+
3018+ def _dependencies_satisfied (self , task : ScheduledTask ) -> bool :
3019+ """Check whether all dependencies for a task are successful."""
3020+ for dep_id in task .dependencies :
3021+ dep = self .tasks .get (dep_id )
3022+ if not dep or dep .last_status != "success" :
3023+ return False
3024+ return True
3025+
29863026 def get_due_tasks (self ) -> List [ScheduledTask ]:
29873027 """Get tasks that are due for execution
29883028
@@ -3015,6 +3055,89 @@ def mark_executed(self, task_id: str, status: str = "success"):
30153055 task .run_count += 1
30163056 self ._calculate_next_run (task )
30173057 self .logger .info (f"Task '{ task_id } ' executed: { status } " )
3058+
3059+ def run_task (self , task_id : str , runner_factory : Optional [Callable [..., Any ]] = None ,
3060+ runner_kwargs : Optional [Dict [str , Any ]] = None ) -> Dict [str , Any ]:
3061+ """Execute a task immediately using the provided runner factory."""
3062+ if task_id not in self .tasks :
3063+ raise ValueError (f"Task '{ task_id } ' not found" )
3064+
3065+ task = self .tasks [task_id ]
3066+ runner_kwargs = runner_kwargs or {}
3067+ runner_factory = runner_factory or ScriptRunner
3068+
3069+ if not self ._dependencies_satisfied (task ):
3070+ self .logger .info (f"Task '{ task_id } ' skipped: waiting on dependencies" )
3071+ return {"task_id" : task_id , "status" : "skipped" , "reason" : "dependencies_pending" }
3072+
3073+ try :
3074+ runner = runner_factory (
3075+ task .script_path ,
3076+ script_args = task .script_args ,
3077+ history_db = self .history_db ,
3078+ ** runner_kwargs ,
3079+ )
3080+ execution = runner .run_script ()
3081+ status = "success" if execution .get ("returncode" ) == 0 else "failed"
3082+ error = execution .get ("stderr" ) if status == "failed" else None
3083+ except Exception as exc :
3084+ execution = None
3085+ status = "failed"
3086+ error = str (exc )
3087+
3088+ task .last_error = error
3089+ self .mark_executed (task_id , status )
3090+
3091+ log_entry = {
3092+ "task_id" : task_id ,
3093+ "status" : status ,
3094+ "timestamp" : datetime .now ().isoformat (),
3095+ "error" : error ,
3096+ "next_run" : task .next_run .isoformat () if task .next_run else None ,
3097+ }
3098+ self .execution_log .append (log_entry )
3099+
3100+ if status != "success" :
3101+ self .logger .error (f"Task '{ task_id } ' failed: { error } " )
3102+ else :
3103+ self .logger .info (f"Task '{ task_id } ' completed successfully" )
3104+
3105+ return {
3106+ "task_id" : task_id ,
3107+ "status" : status ,
3108+ "error" : error ,
3109+ "metrics" : execution .get ("metrics" ) if execution else None ,
3110+ }
3111+
3112+ def run_due_tasks (self , runner_factory : Optional [Callable [..., Any ]] = None ,
3113+ runner_kwargs : Optional [Dict [str , Any ]] = None ,
3114+ stop_on_error : bool = False ) -> List [Dict [str , Any ]]:
3115+ """Execute all due tasks whose dependencies are satisfied."""
3116+ results : List [Dict [str , Any ]] = []
3117+ pending = {task .task_id : task for task in self .get_due_tasks ()}
3118+
3119+ runner_factory = runner_factory or ScriptRunner
3120+ runner_kwargs = runner_kwargs or {}
3121+
3122+ while pending :
3123+ progressed = False
3124+ for task_id , task in list (pending .items ()):
3125+ if not self ._dependencies_satisfied (task ):
3126+ continue
3127+
3128+ result = self .run_task (task_id , runner_factory = runner_factory , runner_kwargs = runner_kwargs )
3129+ results .append (result )
3130+ progressed = True
3131+ pending .pop (task_id , None )
3132+
3133+ if stop_on_error and result .get ("status" ) != "success" :
3134+ return results
3135+
3136+ if not progressed :
3137+ self .logger .info ("No further progress possible; remaining tasks waiting on dependencies" )
3138+ break
3139+
3140+ return results
30183141
30193142 def _calculate_next_run (self , task : ScheduledTask ):
30203143 """Calculate next run time for task
@@ -3045,6 +3168,14 @@ def _calculate_next_run(self, task: ScheduledTask):
30453168 task .next_run = now + timedelta (seconds = amount )
30463169 except Exception as e :
30473170 self .logger .error (f"Error parsing schedule '{ task .schedule } ': { e } " )
3171+ elif task .cron_expr :
3172+ try :
3173+ from croniter import croniter # type: ignore
3174+
3175+ iterator = croniter (task .cron_expr , now )
3176+ task .next_run = iterator .get_next (datetime )
3177+ except Exception as e :
3178+ self .logger .error (f"Error parsing cron expression '{ task .cron_expr } ': { e } " )
30483179 else :
30493180 task .next_run = now + timedelta (hours = 1 ) # Default to 1 hour
30503181
@@ -3069,7 +3200,9 @@ def get_task_status(self, task_id: str) -> Dict:
30693200 "next_run" : task .next_run .isoformat () if task .next_run else None ,
30703201 "run_count" : task .run_count ,
30713202 "last_status" : task .last_status ,
3072- "triggers" : task .trigger_events
3203+ "triggers" : task .trigger_events ,
3204+ "dependencies" : task .dependencies ,
3205+ "last_error" : task .last_error ,
30733206 }
30743207
30753208 def list_tasks (self ) -> List [Dict ]:
@@ -7853,6 +7986,8 @@ def main():
78537986 print (f" Enabled: { task ['enabled' ]} " )
78547987 print (f" Runs: { task ['run_count' ]} " )
78557988 print (f" Last status: { task ['last_status' ]} " )
7989+ if task .get ('dependencies' ):
7990+ print (f" Depends on: { ', ' .join (task ['dependencies' ])} " )
78567991 if task ['triggers' ]:
78577992 print (f" Triggers: { ', ' .join (task ['triggers' ])} " )
78587993 else :
0 commit comments