from typing import Dict, List, Optional, Set

from temporalio import workflow
-from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError

from updates_and_signals.safe_message_handlers.activities import (
    AssignNodesToJobInput,
-    FindBadNodesInput,
    UnassignNodesForJobInput,
    assign_nodes_to_job,
-    find_bad_nodes,
    unassign_nodes_for_job,
)

@@ -37,7 +34,6 @@ class ClusterManagerInput:
@dataclass
class ClusterManagerResult:
    num_currently_assigned_nodes: int
-    num_bad_nodes: int


# Be in the habit of storing message inputs and outputs in serializable structures.
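For reference, a minimal sketch of what such serializable message structures look like. The field names and types are inferred from how input.total_num_nodes, input.job_name, and nodes_assigned are used later in this diff; the sample defines its own versions elsewhere, so treat this as illustrative only:

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class ClusterManagerAssignNodesToJobInput:
    # Inferred from the input.total_num_nodes / input.job_name usage below.
    total_num_nodes: int
    job_name: str


@dataclass
class ClusterManagerAssignNodesToJobResult:
    # Inferred from the nodes_assigned= keyword argument used below.
    nodes_assigned: Set[str] = field(default_factory=set)
```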
@@ -116,7 +112,7 @@ async def assign_nodes_to_job(
                )
            nodes_to_assign = unassigned_nodes[: input.total_num_nodes]
            # This await would be dangerous without nodes_lock because it yields control and allows interleaving
-            # with delete_job and perform_health_checks, which both touch self.state.nodes.
+            # with delete_job, which touches self.state.nodes.
            await self._assign_nodes_to_job(nodes_to_assign, input.job_name)
            return ClusterManagerAssignNodesToJobResult(
                nodes_assigned=self.get_assigned_nodes(job_name=input.job_name)
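The "dangerous without nodes_lock" comments in this hunk carry the key idea of the sample: every await inside a message handler is a point where control can yield to another handler that mutates the same state. Below is a standalone asyncio sketch of that hazard and the lock that prevents it; it is an illustration only, not code from the sample, and a plain sleep stands in for the activity the workflow awaits:

```python
# Illustration of the interleaving hazard described in the comments above.
import asyncio
from typing import Dict, Optional

nodes: Dict[str, Optional[str]] = {f"node-{i}": None for i in range(4)}
nodes_lock = asyncio.Lock()


async def assign(job_name: str, num: int) -> None:
    async with nodes_lock:
        # Read shared state: pick currently free nodes.
        free = [k for k, v in nodes.items() if v is None][:num]
        # Simulates awaiting an activity; without the lock, another handler
        # could pick the same free nodes while we are suspended here.
        await asyncio.sleep(0.01)
        # Write shared state after the await.
        for k in free:
            nodes[k] = job_name


async def main() -> None:
    # With the lock, the two assignments are serialized and never double-book a node.
    await asyncio.gather(assign("job-a", 2), assign("job-b", 2))
    print(nodes)


if __name__ == "__main__":
    asyncio.run(main())
```

Inside the workflow, the awaited call is workflow.execute_activity(...) rather than a sleep, but the interleaving risk at the await point is the same.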
@@ -150,7 +146,7 @@ async def delete_job(self, input: ClusterManagerDeleteJobInput) -> None:
                k for k, v in self.state.nodes.items() if v == input.job_name
            ]
            # This await would be dangerous without nodes_lock because it yields control and allows interleaving
-            # with assign_nodes_to_job and perform_health_checks, which all touch self.state.nodes.
+            # with assign_nodes_to_job, which touches self.state.nodes.
            await self._unassign_nodes_for_job(nodes_to_unassign, input.job_name)

    async def _unassign_nodes_for_job(
@@ -167,40 +163,11 @@ async def _unassign_nodes_for_job(
    def get_unassigned_nodes(self) -> List[str]:
        return [k for k, v in self.state.nodes.items() if v is None]

-    def get_bad_nodes(self) -> Set[str]:
-        return set([k for k, v in self.state.nodes.items() if v == "BAD!"])
-
    def get_assigned_nodes(self, *, job_name: Optional[str] = None) -> Set[str]:
        if job_name:
            return set([k for k, v in self.state.nodes.items() if v == job_name])
        else:
-            return set(
-                [
-                    k
-                    for k, v in self.state.nodes.items()
-                    if v is not None and v != "BAD!"
-                ]
-            )
-
-    async def perform_health_checks(self) -> None:
-        async with self.nodes_lock:
-            assigned_nodes = self.get_assigned_nodes()
-            try:
-                # This await would be dangerous without nodes_lock because it yields control and allows interleaving
-                # with assign_nodes_to_job and delete_job, which both touch self.state.nodes.
-                bad_nodes = await workflow.execute_activity(
-                    find_bad_nodes,
-                    FindBadNodesInput(nodes_to_check=assigned_nodes),
-                    start_to_close_timeout=timedelta(seconds=10),
-                    # This health check is optional, and our lock would block the whole workflow if we let it retry forever.
-                    retry_policy=RetryPolicy(maximum_attempts=1),
-                )
-                for node in bad_nodes:
-                    self.state.nodes[node] = "BAD!"
-            except Exception as e:
-                workflow.logger.warn(
-                    f"Health check failed with error {type(e).__name__}:{e}"
-                )
+            return set([k for k, v in self.state.nodes.items() if v is not None])

    # The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint its state and
    # continue-as-new.
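A minimal sketch of the checkpoint-and-continue-as-new pattern this comment refers to, under assumed names (EntityWorkflow and EntityState are hypothetical); the sample's actual should_continue_as_new logic and state carry-over are richer than this:

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

from temporalio import workflow


@dataclass
class EntityState:
    nodes: Dict[str, Optional[str]] = field(default_factory=dict)


@workflow.defn
class EntityWorkflow:
    @workflow.run
    async def run(self, state: EntityState) -> None:
        self.state = state
        # Signal/update handlers (not shown) mutate self.state while we wait here.
        await workflow.wait_condition(
            lambda: workflow.info().is_continue_as_new_suggested()
        )
        # Let in-flight handlers finish so no state is lost in the handoff.
        await workflow.wait_condition(lambda: workflow.all_handlers_finished())
        # Restart with the accumulated state as the new run's input.
        workflow.continue_as_new(self.state)
```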
@@ -229,9 +196,7 @@ def should_continue_as_new(self) -> bool:
    async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
        self.init(input)
        await workflow.wait_condition(lambda: self.state.cluster_started)
-        # Perform health checks at intervals.
        while True:
-            await self.perform_health_checks()
            try:
                await workflow.wait_condition(
                    lambda: self.state.cluster_shutdown
@@ -250,7 +215,4 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
                        test_continue_as_new=input.test_continue_as_new,
                    )
                )
-        return ClusterManagerResult(
-            len(self.get_assigned_nodes()),
-            len(self.get_bad_nodes()),
-        )
+        return ClusterManagerResult(len(self.get_assigned_nodes()))
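To round things off, a hedged client-side usage sketch for the assign_nodes_to_job update shown above. The workflow class name, the start_cluster signal name, the task queue string, and the import path are assumptions not shown in this diff; the input fields mirror the total_num_nodes and job_name usage above:

```python
import asyncio
import uuid

from temporalio.client import Client

# Assumed import path and names; defined elsewhere in the sample, not in this diff.
from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerAssignNodesToJobInput,
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def main() -> None:
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        ClusterManagerWorkflow.run,
        ClusterManagerInput(),  # assumes the input dataclass has defaults
        id=f"cluster-manager-{uuid.uuid4()}",
        task_queue="safe-message-handlers-task-queue",  # assumed task queue name
    )
    # Assumed signal name: the handlers above wait on state.cluster_started.
    await handle.signal(ClusterManagerWorkflow.start_cluster)
    # execute_update blocks until the handler returns, so the caller sees the result.
    result = await handle.execute_update(
        ClusterManagerWorkflow.assign_nodes_to_job,
        ClusterManagerAssignNodesToJobInput(total_num_nodes=5, job_name="job-1"),
    )
    print(result.nodes_assigned)


if __name__ == "__main__":
    asyncio.run(main())
```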