From 98512fb9b3e3b74fae6cf8e3dac648c95d2ab2b3 Mon Sep 17 00:00:00 2001 From: LynnLiu <47371559@163.com> Date: Fri, 29 Nov 2024 17:11:20 +0800 Subject: [PATCH 01/15] Sync model infos when reconnect to supervisor --- xinference/core/worker.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/xinference/core/worker.py b/xinference/core/worker.py index 2a380bdf41..3859cddf31 100644 --- a/xinference/core/worker.py +++ b/xinference/core/worker.py @@ -335,7 +335,17 @@ async def get_supervisor_ref(self, add_worker: bool = True) -> xo.ActorRefType: # Newly started (or restarted), has no model, notify supervisor await self._supervisor_ref.add_worker(self.address) logger.info("Connected to supervisor as a fresh worker") - + + # Reconnect to Newly started supervisor, has running models + if add_worker and len(self._model_uid_to_model) > 0: + # Reconnect to Newly started supervisor, notify supervisor + await self._supervisor_ref.add_worker(self.address) + # Sync replical model infos + running_models = {} + running_models.update(await self.list_models()) + await self._supervisor_ref.sync_models(self.address, running_models) + logger.info(f"Connected to supervisor as a old worker with {len(running_models)} models") + self._status_guard_ref = await xo.actor_ref( address=self._supervisor_address, uid=StatusGuardActor.default_uid() ) @@ -1049,6 +1059,8 @@ async def _periodical_report_status(self): except ( Exception ) as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except + # Disconnect from supervisor, which maybe restart + self._supervisor_ref = None logger.error(f"Failed to upload node info: {ex}") try: await asyncio.sleep(XINFERENCE_HEALTH_CHECK_INTERVAL) From d93c81b58c4f16fc05ce6edb1ca953b46975aebf Mon Sep 17 00:00:00 2001 From: LynnLiu <47371559@163.com> Date: Fri, 29 Nov 2024 17:19:37 +0800 Subject: [PATCH 02/15] Receive model infos of workers --- xinference/core/supervisor.py | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/xinference/core/supervisor.py b/xinference/core/supervisor.py index c8f2f59ff6..a3607d8847 100644 --- a/xinference/core/supervisor.py +++ b/xinference/core/supervisor.py @@ -1178,6 +1178,44 @@ async def list_models(self) -> Dict[str, Dict[str, Any]]: v["replica"] = self._model_uid_to_replica_info[k].replica return running_model_info + # Receive model infos of workers + @log_async(logger=logger) + async def sync_models(self, worker_address: str, model_desc: Dict[str, Dict[str, Any]]): # model_uid : ModelDescription{"address"} + for replica_model_uid, desc_dict in model_desc.items(): + # Rebuild self._replica_model_uid_to_worker + if replica_model_uid in self._replica_model_uid_to_worker: + continue + + model_name = desc_dict["model_name"] if "model_name" in desc_dict else "" + model_version = desc_dict["model_version"] if "model_version" in desc_dict else "" + logger.debug(f"Receive model replica: {replica_model_uid} {worker_address} {model_name}") + + assert (worker_address in self._worker_address_to_worker), f"Worker {worker_address} not exists when sync_models" + + self._replica_model_uid_to_worker[replica_model_uid] = self._worker_address_to_worker[worker_address] + + + # Rebuild self._model_uid_to_replica_info + model_uid, rep_id = parse_replica_model_uid(replica_model_uid) + replica = rep_id+1 + if model_uid not in self._model_uid_to_replica_info: + self._model_uid_to_replica_info[model_uid] = ReplicaInfo(replica=replica, scheduler=itertools.cycle(range(replica))) + else: + if replica > self._model_uid_to_replica_info[model_uid].replica: + self._model_uid_to_replica_info[model_uid] = ReplicaInfo(replica=replica, scheduler=itertools.cycle(range(replica))) + + # Rebuild self._status_guard_ref + instance_info = InstanceInfo( + model_name=model_name, + model_uid=model_uid, + model_version=model_version, + model_ability=[], + replica=replica, + status=LaunchStatus.READY.name, + instance_created_ts=int(time.time()), + ) + await self._status_guard_ref.set_instance_info(model_uid, instance_info) + def is_local_deployment(self) -> bool: # TODO: temporary. return ( From 467f06b1c52134f57d8b80a4235b6cafc5a946f5 Mon Sep 17 00:00:00 2001 From: paradin <47371559@163.com> Date: Sat, 30 Nov 2024 17:34:40 +0800 Subject: [PATCH 03/15] repo black-pre-commit-mirror --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 61d19c86ae..895ea70dc6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ files: xinference repos: - - repo: https://github.com/psf/black + - repo: https://github.com/psf/black-pre-commit-mirror rev: 23.12.0 hooks: - id: black From 03172c8677eb29520d38d6704649257f161b0478 Mon Sep 17 00:00:00 2001 From: paradin <47371559@163.com> Date: Sat, 30 Nov 2024 17:35:13 +0800 Subject: [PATCH 04/15] fix lint --- xinference/core/supervisor.py | 45 ++++++++++++++++++++++------------- xinference/core/worker.py | 10 ++++---- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/xinference/core/supervisor.py b/xinference/core/supervisor.py index a3607d8847..1ab08a54fc 100644 --- a/xinference/core/supervisor.py +++ b/xinference/core/supervisor.py @@ -1180,30 +1180,43 @@ async def list_models(self) -> Dict[str, Dict[str, Any]]: # Receive model infos of workers @log_async(logger=logger) - async def sync_models(self, worker_address: str, model_desc: Dict[str, Dict[str, Any]]): # model_uid : ModelDescription{"address"} + async def sync_models( + self, worker_address: str, model_desc: Dict[str, Dict[str, Any]] + ): # model_uid : ModelDescription{"address"} for replica_model_uid, desc_dict in model_desc.items(): # Rebuild self._replica_model_uid_to_worker if replica_model_uid in self._replica_model_uid_to_worker: continue - + model_name = desc_dict["model_name"] if "model_name" in desc_dict else "" - model_version = desc_dict["model_version"] if "model_version" in desc_dict else "" - logger.debug(f"Receive model replica: {replica_model_uid} {worker_address} {model_name}") - - assert (worker_address in self._worker_address_to_worker), f"Worker {worker_address} not exists when sync_models" - - self._replica_model_uid_to_worker[replica_model_uid] = self._worker_address_to_worker[worker_address] - - + model_version = ( + desc_dict["model_version"] if "model_version" in desc_dict else "" + ) + logger.debug( + f"Receive model replica: {replica_model_uid} {worker_address} {model_name}" + ) + + assert ( + worker_address in self._worker_address_to_worker + ), f"Worker {worker_address} not exists when sync_models" + + self._replica_model_uid_to_worker[ + replica_model_uid + ] = self._worker_address_to_worker[worker_address] + # Rebuild self._model_uid_to_replica_info model_uid, rep_id = parse_replica_model_uid(replica_model_uid) - replica = rep_id+1 + replica = rep_id + 1 if model_uid not in self._model_uid_to_replica_info: - self._model_uid_to_replica_info[model_uid] = ReplicaInfo(replica=replica, scheduler=itertools.cycle(range(replica))) + self._model_uid_to_replica_info[model_uid] = ReplicaInfo( + replica=replica, scheduler=itertools.cycle(range(replica)) + ) else: if replica > self._model_uid_to_replica_info[model_uid].replica: - self._model_uid_to_replica_info[model_uid] = ReplicaInfo(replica=replica, scheduler=itertools.cycle(range(replica))) - + self._model_uid_to_replica_info[model_uid] = ReplicaInfo( + replica=replica, scheduler=itertools.cycle(range(replica)) + ) + # Rebuild self._status_guard_ref instance_info = InstanceInfo( model_name=model_name, @@ -1213,9 +1226,9 @@ async def sync_models(self, worker_address: str, model_desc: Dict[str, Dict[str, replica=replica, status=LaunchStatus.READY.name, instance_created_ts=int(time.time()), - ) + ) await self._status_guard_ref.set_instance_info(model_uid, instance_info) - + def is_local_deployment(self) -> bool: # TODO: temporary. return ( diff --git a/xinference/core/worker.py b/xinference/core/worker.py index 3859cddf31..368b1238ca 100644 --- a/xinference/core/worker.py +++ b/xinference/core/worker.py @@ -335,17 +335,19 @@ async def get_supervisor_ref(self, add_worker: bool = True) -> xo.ActorRefType: # Newly started (or restarted), has no model, notify supervisor await self._supervisor_ref.add_worker(self.address) logger.info("Connected to supervisor as a fresh worker") - + # Reconnect to Newly started supervisor, has running models if add_worker and len(self._model_uid_to_model) > 0: # Reconnect to Newly started supervisor, notify supervisor await self._supervisor_ref.add_worker(self.address) - # Sync replical model infos + # Sync replica model infos running_models = {} running_models.update(await self.list_models()) await self._supervisor_ref.sync_models(self.address, running_models) - logger.info(f"Connected to supervisor as a old worker with {len(running_models)} models") - + logger.info( + f"Connected to supervisor as a old worker with {len(running_models)} models" + ) + self._status_guard_ref = await xo.actor_ref( address=self._supervisor_address, uid=StatusGuardActor.default_uid() ) From 467a877d65998d4ba9a7af5b15e25c64d7a24a54 Mon Sep 17 00:00:00 2001 From: paradin <47371559@163.com> Date: Mon, 2 Dec 2024 17:24:27 +0800 Subject: [PATCH 05/15] add test file --- xinference/core/tests/test_restart_supervisor.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 xinference/core/tests/test_restart_supervisor.py diff --git a/xinference/core/tests/test_restart_supervisor.py b/xinference/core/tests/test_restart_supervisor.py new file mode 100644 index 0000000000..83066a989c --- /dev/null +++ b/xinference/core/tests/test_restart_supervisor.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + From 23b7632978735c50120fa07cb6ec5de802414da4 Mon Sep 17 00:00:00 2001 From: paradin <47371559@163.com> Date: Mon, 2 Dec 2024 19:34:10 +0800 Subject: [PATCH 06/15] test restart supervisor --- .../core/tests/test_restart_supervisor.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/xinference/core/tests/test_restart_supervisor.py b/xinference/core/tests/test_restart_supervisor.py index 83066a989c..5103c7b227 100644 --- a/xinference/core/tests/test_restart_supervisor.py +++ b/xinference/core/tests/test_restart_supervisor.py @@ -12,3 +12,52 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Dict +import pytest +import xoscar as xo +from ...core.supervisor import SupervisorActor + +# test restart supervisor +@pytest.mark.asyncio +async def test_restart_supervisor(): + + from ...deploy.supervisor import run_in_subprocess as supervisor_run_in_subprocess + from ...deploy.worker import main as worker_run_in_subprocess + + # start supervisor + supervisor_address = "localhost:19034" + proc_supervisor = supervisor_run_in_subprocess( + supervisor_address + ) + + # start worker + proc_worker = worker_run_in_subprocess( + address="localhost:9998", + supervisor_address=supervisor_address + ) + + # load model + supervisor_ref = await xo.actor_ref( + supervisor_address, SupervisorActor.default_uid() + ) + + await supervisor_ref.launch_builtin_model( + model_uid="bge-m3", + model_name="bge-m3" + ) + + # query replica info + bge_m3_info = await supervisor_ref.describe_model("bge-m3") + + # kill supervisor + proc_supervisor.kill() + + # restart supervisor + proc_supervisor = supervisor_run_in_subprocess( + supervisor_address + ) + + # check replica info + bge_m3_info_check = await supervisor_ref.describe_model("bge-m3") + + assert(bge_m3_info["replica"]==bge_m3_info_check["replica"]) \ No newline at end of file From b4c9da267dca3018226cf270e9d54f073b6843f4 Mon Sep 17 00:00:00 2001 From: paradin <47371559@163.com> Date: Mon, 2 Dec 2024 21:06:05 +0800 Subject: [PATCH 07/15] fix lint --- .../core/tests/test_restart_supervisor.py | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/xinference/core/tests/test_restart_supervisor.py b/xinference/core/tests/test_restart_supervisor.py index 5103c7b227..38b602d348 100644 --- a/xinference/core/tests/test_restart_supervisor.py +++ b/xinference/core/tests/test_restart_supervisor.py @@ -12,28 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict import pytest import xoscar as xo + from ...core.supervisor import SupervisorActor + # test restart supervisor @pytest.mark.asyncio async def test_restart_supervisor(): - from ...deploy.supervisor import run_in_subprocess as supervisor_run_in_subprocess from ...deploy.worker import main as worker_run_in_subprocess # start supervisor supervisor_address = "localhost:19034" - proc_supervisor = supervisor_run_in_subprocess( - supervisor_address - ) + proc_supervisor = supervisor_run_in_subprocess(supervisor_address) # start worker - proc_worker = worker_run_in_subprocess( - address="localhost:9998", - supervisor_address=supervisor_address + worker_run_in_subprocess( + address="localhost:9998", supervisor_address=supervisor_address ) # load model @@ -41,10 +38,7 @@ async def test_restart_supervisor(): supervisor_address, SupervisorActor.default_uid() ) - await supervisor_ref.launch_builtin_model( - model_uid="bge-m3", - model_name="bge-m3" - ) + await supervisor_ref.launch_builtin_model(model_uid="bge-m3", model_name="bge-m3") # query replica info bge_m3_info = await supervisor_ref.describe_model("bge-m3") @@ -53,11 +47,9 @@ async def test_restart_supervisor(): proc_supervisor.kill() # restart supervisor - proc_supervisor = supervisor_run_in_subprocess( - supervisor_address - ) + proc_supervisor = supervisor_run_in_subprocess(supervisor_address) # check replica info bge_m3_info_check = await supervisor_ref.describe_model("bge-m3") - assert(bge_m3_info["replica"]==bge_m3_info_check["replica"]) \ No newline at end of file + assert bge_m3_info["replica"] == bge_m3_info_check["replica"] From 18df2d0f81228b010c06e4438e82bbf3a257311d Mon Sep 17 00:00:00 2001 From: paradin <47371559@163.com> Date: Mon, 2 Dec 2024 21:13:02 +0800 Subject: [PATCH 08/15] change test smaller model --- xinference/core/tests/test_restart_supervisor.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/xinference/core/tests/test_restart_supervisor.py b/xinference/core/tests/test_restart_supervisor.py index 38b602d348..401a0fd020 100644 --- a/xinference/core/tests/test_restart_supervisor.py +++ b/xinference/core/tests/test_restart_supervisor.py @@ -38,10 +38,16 @@ async def test_restart_supervisor(): supervisor_address, SupervisorActor.default_uid() ) - await supervisor_ref.launch_builtin_model(model_uid="bge-m3", model_name="bge-m3") + model_uid = "qwen1.5-chat" + await supervisor_ref.launch_builtin_model( + model_uid=model_uid, + model_name="qwen1.5-chat", + model_size_in_billions="0_5", + quantization="q4_0", + ) # query replica info - bge_m3_info = await supervisor_ref.describe_model("bge-m3") + bge_m3_info = await supervisor_ref.describe_model(model_uid) # kill supervisor proc_supervisor.kill() @@ -50,6 +56,6 @@ async def test_restart_supervisor(): proc_supervisor = supervisor_run_in_subprocess(supervisor_address) # check replica info - bge_m3_info_check = await supervisor_ref.describe_model("bge-m3") + bge_m3_info_check = await supervisor_ref.describe_model(model_uid) assert bge_m3_info["replica"] == bge_m3_info_check["replica"] From 34e05330ab11da773bfe35aad0ba00c38ada3ded Mon Sep 17 00:00:00 2001 From: paradin <47371559@163.com> Date: Wed, 4 Dec 2024 13:57:19 +0800 Subject: [PATCH 09/15] test --- .../core/tests/test_restart_supervisor.py | 40 +++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/xinference/core/tests/test_restart_supervisor.py b/xinference/core/tests/test_restart_supervisor.py index 401a0fd020..3a26b2ec5e 100644 --- a/xinference/core/tests/test_restart_supervisor.py +++ b/xinference/core/tests/test_restart_supervisor.py @@ -12,26 +12,44 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import pytest import xoscar as xo +from typing import List, Optional, Union, Dict +import multiprocessing from ...core.supervisor import SupervisorActor + # test restart supervisor @pytest.mark.asyncio async def test_restart_supervisor(): from ...deploy.supervisor import run_in_subprocess as supervisor_run_in_subprocess - from ...deploy.worker import main as worker_run_in_subprocess + from ...deploy.worker import main as _start_worker + + def worker_run_in_subprocess( + address: str, + supervisor_address: str, + logging_conf: Optional[Dict] = None + ) -> multiprocessing.Process: + p = multiprocessing.Process(target=_start_worker, args=(address, supervisor_address, None, None, logging_conf)) + p.start() + return p # start supervisor - supervisor_address = "localhost:19034" + supervisor_address = f"localhost:{xo.utils.get_next_port()}" proc_supervisor = supervisor_run_in_subprocess(supervisor_address) + await asyncio.sleep(5) + # start worker worker_run_in_subprocess( - address="localhost:9998", supervisor_address=supervisor_address + address=f"localhost:{xo.utils.get_next_port()}", + supervisor_address=supervisor_address ) + + await asyncio.sleep(10) # load model supervisor_ref = await xo.actor_ref( @@ -44,18 +62,26 @@ async def test_restart_supervisor(): model_name="qwen1.5-chat", model_size_in_billions="0_5", quantization="q4_0", + model_engine="vLLM" ) # query replica info - bge_m3_info = await supervisor_ref.describe_model(model_uid) + model_replica_info = await supervisor_ref.describe_model(model_uid) # kill supervisor - proc_supervisor.kill() + proc_supervisor.terminate() + proc_supervisor.join() # restart supervisor proc_supervisor = supervisor_run_in_subprocess(supervisor_address) + await asyncio.sleep(5) + + supervisor_ref = await xo.actor_ref( + supervisor_address, SupervisorActor.default_uid() + ) + # check replica info - bge_m3_info_check = await supervisor_ref.describe_model(model_uid) + model_replic_info_check = await supervisor_ref.describe_model(model_uid) - assert bge_m3_info["replica"] == bge_m3_info_check["replica"] + assert model_replica_info["replica"] == model_replic_info_check["replica"] From 1ffb392e77d9e9608d883b0f31dafe1e2515153f Mon Sep 17 00:00:00 2001 From: paradin <47371559@163.com> Date: Wed, 4 Dec 2024 21:58:16 +0800 Subject: [PATCH 10/15] fix lint --- .../core/tests/test_restart_supervisor.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/xinference/core/tests/test_restart_supervisor.py b/xinference/core/tests/test_restart_supervisor.py index 3a26b2ec5e..7bb4b87249 100644 --- a/xinference/core/tests/test_restart_supervisor.py +++ b/xinference/core/tests/test_restart_supervisor.py @@ -13,15 +13,15 @@ # limitations under the License. import asyncio +import multiprocessing +from typing import Dict, Optional + import pytest import xoscar as xo -from typing import List, Optional, Union, Dict -import multiprocessing from ...core.supervisor import SupervisorActor - # test restart supervisor @pytest.mark.asyncio async def test_restart_supervisor(): @@ -29,11 +29,12 @@ async def test_restart_supervisor(): from ...deploy.worker import main as _start_worker def worker_run_in_subprocess( - address: str, - supervisor_address: str, - logging_conf: Optional[Dict] = None + address: str, supervisor_address: str, logging_conf: Optional[Dict] = None ) -> multiprocessing.Process: - p = multiprocessing.Process(target=_start_worker, args=(address, supervisor_address, None, None, logging_conf)) + p = multiprocessing.Process( + target=_start_worker, + args=(address, supervisor_address, None, None, logging_conf), + ) p.start() return p @@ -45,10 +46,10 @@ def worker_run_in_subprocess( # start worker worker_run_in_subprocess( - address=f"localhost:{xo.utils.get_next_port()}", - supervisor_address=supervisor_address + address=f"localhost:{xo.utils.get_next_port()}", + supervisor_address=supervisor_address, ) - + await asyncio.sleep(10) # load model @@ -62,7 +63,7 @@ def worker_run_in_subprocess( model_name="qwen1.5-chat", model_size_in_billions="0_5", quantization="q4_0", - model_engine="vLLM" + model_engine="llama.cpp", ) # query replica info From e2f184eee6e2e4aa7c78404e3cc5dd5b3fb22092 Mon Sep 17 00:00:00 2001 From: qinxuye Date: Fri, 13 Dec 2024 02:14:27 +0800 Subject: [PATCH 11/15] fix tests --- .../core/tests/test_restart_supervisor.py | 83 ++++++++++--------- xinference/deploy/supervisor.py | 3 +- 2 files changed, 47 insertions(+), 39 deletions(-) diff --git a/xinference/core/tests/test_restart_supervisor.py b/xinference/core/tests/test_restart_supervisor.py index 7bb4b87249..791718f480 100644 --- a/xinference/core/tests/test_restart_supervisor.py +++ b/xinference/core/tests/test_restart_supervisor.py @@ -12,19 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import asyncio import multiprocessing +import time from typing import Dict, Optional -import pytest import xoscar as xo -from ...core.supervisor import SupervisorActor +from ...api import restful_api +from ...client import Client -# test restart supervisor -@pytest.mark.asyncio -async def test_restart_supervisor(): +def test_restart_supervisor(): from ...deploy.supervisor import run_in_subprocess as supervisor_run_in_subprocess from ...deploy.worker import main as _start_worker @@ -39,50 +37,59 @@ def worker_run_in_subprocess( return p # start supervisor - supervisor_address = f"localhost:{xo.utils.get_next_port()}" + web_port, supervisor_port = xo.utils.get_next_port(), xo.utils.get_next_port() + supervisor_address = f"127.0.0.1:{supervisor_port}" proc_supervisor = supervisor_run_in_subprocess(supervisor_address) + rest_api_proc = multiprocessing.Process( + target=restful_api.run, + kwargs=dict( + supervisor_address=supervisor_address, host="127.0.0.1", port=web_port + ), + ) + rest_api_proc.start() - await asyncio.sleep(5) + time.sleep(5) # start worker - worker_run_in_subprocess( - address=f"localhost:{xo.utils.get_next_port()}", + proc_worker = worker_run_in_subprocess( + address=f"127.0.0.1:{xo.utils.get_next_port()}", supervisor_address=supervisor_address, ) - await asyncio.sleep(10) - - # load model - supervisor_ref = await xo.actor_ref( - supervisor_address, SupervisorActor.default_uid() - ) + time.sleep(10) - model_uid = "qwen1.5-chat" - await supervisor_ref.launch_builtin_model( - model_uid=model_uid, - model_name="qwen1.5-chat", - model_size_in_billions="0_5", - quantization="q4_0", - model_engine="llama.cpp", - ) + client = Client(f"http://127.0.0.1:{web_port}") - # query replica info - model_replica_info = await supervisor_ref.describe_model(model_uid) + try: + model_uid = "qwen1.5-chat" + client.launch_model( + model_uid=model_uid, + model_name="qwen1.5-chat", + model_size_in_billions="0_5", + quantization="q4_0", + model_engine="llama.cpp", + ) - # kill supervisor - proc_supervisor.terminate() - proc_supervisor.join() + # query replica info + model_replica_info = client.describe_model(model_uid) + assert model_replica_info is not None - # restart supervisor - proc_supervisor = supervisor_run_in_subprocess(supervisor_address) + # kill supervisor + proc_supervisor.terminate() + proc_supervisor.join() - await asyncio.sleep(5) + # restart supervisor + supervisor_run_in_subprocess(supervisor_address) - supervisor_ref = await xo.actor_ref( - supervisor_address, SupervisorActor.default_uid() - ) + time.sleep(5) - # check replica info - model_replic_info_check = await supervisor_ref.describe_model(model_uid) + # check replica info + model_replic_info_check = client.describe_model(model_uid) + assert model_replica_info["replica"] == model_replic_info_check["replica"] - assert model_replica_info["replica"] == model_replic_info_check["replica"] + finally: + client.abort_cluster() + proc_supervisor.terminate() + proc_worker.terminate() + proc_supervisor.join() + proc_worker.join() diff --git a/xinference/deploy/supervisor.py b/xinference/deploy/supervisor.py index ed12a9f7c2..41a4381b54 100644 --- a/xinference/deploy/supervisor.py +++ b/xinference/deploy/supervisor.py @@ -33,7 +33,8 @@ async def _start_supervisor(address: str, logging_conf: Optional[Dict] = None): - logging.config.dictConfig(logging_conf) # type: ignore + if logging_conf: + logging.config.dictConfig(logging_conf) # type: ignore pool = None try: From 2c6d05a4b3c92a7535f3556508d03dbad1580829 Mon Sep 17 00:00:00 2001 From: qinxuye Date: Fri, 13 Dec 2024 02:18:50 +0800 Subject: [PATCH 12/15] only run restart test for gpu ci --- .github/workflows/python.yaml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index 75136ec76a..7798bb0530 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -208,7 +208,10 @@ jobs: --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/model/audio/tests/test_cosyvoice.py && \ ${{ env.SELF_HOST_PYTHON }} -m pytest --timeout=1500 \ -W ignore::PendingDeprecationWarning \ - --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/model/audio/tests/test_fish_speech.py + --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/model/audio/tests/test_fish_speech.py && \ + ${{ env.SELF_HOST_PYTHON }} -m pytest --timeout=1500 \ + -W ignore::PendingDeprecationWarning \ + --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/core/tests/test_restart_supervisor.py elif [ "$MODULE" == "metal" ]; then pytest --timeout=1500 \ -W ignore::PendingDeprecationWarning \ @@ -222,6 +225,6 @@ jobs: --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/client/tests/test_client.py pytest --timeout=1500 \ -W ignore::PendingDeprecationWarning \ - --cov-config=setup.cfg --cov-report=xml --cov=xinference --ignore xinference/core/tests/test_continuous_batching.py --ignore xinference/client/tests/test_client.py --ignore xinference/model/image/tests/test_stable_diffusion.py --ignore xinference/model/image/tests/test_got_ocr2.py --ignore xinference/model/audio/tests xinference + --cov-config=setup.cfg --cov-report=xml --cov=xinference --ignore xinference/core/tests/test_restart_supervisor.py --ignore xinference/core/tests/test_continuous_batching.py --ignore xinference/client/tests/test_client.py --ignore xinference/model/image/tests/test_stable_diffusion.py --ignore xinference/model/image/tests/test_got_ocr2.py --ignore xinference/model/audio/tests xinference fi working-directory: . From 252584035029fd1e15e4e3d916f0a613a899478d Mon Sep 17 00:00:00 2001 From: qinxuye Date: Fri, 13 Dec 2024 02:20:29 +0800 Subject: [PATCH 13/15] only run restart test for gpu ci --- .github/workflows/python.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index 840c639ab4..8e41a69f69 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -232,7 +232,10 @@ jobs: --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/model/audio/tests/test_fish_speech.py && \ ${{ env.SELF_HOST_PYTHON }} -m pytest --timeout=1500 \ -W ignore::PendingDeprecationWarning \ - --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/model/embedding/tests/test_integrated_embedding.py + --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/model/embedding/tests/test_integrated_embedding.py && \ + ${{ env.SELF_HOST_PYTHON }} -m pytest --timeout=1500 \ + -W ignore::PendingDeprecationWarning \ + --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/core/tests/test_restart_supervisor.py elif [ "$MODULE" == "metal" ]; then pytest --timeout=1500 \ -W ignore::PendingDeprecationWarning \ From 85d99bb19bd8c7c8aff2023d0fd5f03e494004b5 Mon Sep 17 00:00:00 2001 From: qinxuye Date: Fri, 13 Dec 2024 11:54:53 +0800 Subject: [PATCH 14/15] fix --- .github/workflows/python.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index 8e41a69f69..951999c712 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -200,6 +200,7 @@ jobs: ${{ env.SELF_HOST_PYTHON }} -m pip install -U soundfile ${{ env.SELF_HOST_PYTHON }} -m pip install -U sentence-transformers ${{ env.SELF_HOST_PYTHON }} -m pip install -U FlagEmbedding + ${{ env.SELF_HOST_PYTHON }} -m pip install "llama-cpp-python>=0.2.82" -i https://abetlen.github.io/llama-cpp-python/whl/cu124 ${{ env.SELF_HOST_PYTHON }} -m pytest --timeout=1500 \ --disable-warnings \ --cov-config=setup.cfg --cov-report=xml --cov=xinference xinference/core/tests/test_continuous_batching.py && \ From 10afc174c05ccfbec093f013d0da0094b0e51cdc Mon Sep 17 00:00:00 2001 From: paradin <47371559@163.com> Date: Sun, 15 Dec 2024 15:35:43 +0800 Subject: [PATCH 15/15] wait for a moment, then the worker can detect disconnect from supervisor --- xinference/core/tests/test_restart_supervisor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/xinference/core/tests/test_restart_supervisor.py b/xinference/core/tests/test_restart_supervisor.py index 791718f480..249dcfdbf4 100644 --- a/xinference/core/tests/test_restart_supervisor.py +++ b/xinference/core/tests/test_restart_supervisor.py @@ -78,6 +78,8 @@ def worker_run_in_subprocess( proc_supervisor.terminate() proc_supervisor.join() + time.sleep(5) + # restart supervisor supervisor_run_in_subprocess(supervisor_address)