1
1
from dataclasses import asdict , dataclass , field , fields , is_dataclass
2
2
from functools import cached_property
3
3
from json import loads
4
- from logging import warning
4
+ from logging import getLogger , warning
5
5
from os import PathLike
6
6
from platform import system
7
7
from re import split
8
- from subprocess import CompletedProcess
8
+ from subprocess import CalledProcessError , CompletedProcess
9
9
from subprocess import run as subprocess_run
10
10
from types import TracebackType
11
11
from typing import Any , Callable , Literal , Optional , TypeVar , Union , cast
12
- from urllib .error import HTTPError , URLError
13
- from urllib .request import urlopen
14
12
15
13
from testcontainers .core .exceptions import ContainerIsNotRunning , NoSuchPortExposed
16
- from testcontainers .core .waiting_utils import wait_container_is_ready
14
+ from testcontainers .core .waiting_utils import WaitStrategy
17
15
18
16
_IPT = TypeVar ("_IPT" )
19
17
_WARNINGS = {"DOCKER_COMPOSE_GET_CONFIG" : "get_config is experimental, see testcontainers/testcontainers-python#669" }
20
18
19
+ logger = getLogger (__name__ )
20
+
21
21
22
22
def _ignore_properties (cls : type [_IPT ], dict_ : Any ) -> _IPT :
23
23
"""omits extra fields like @JsonIgnoreProperties(ignoreUnknown = true)
@@ -80,6 +80,7 @@ class ComposeContainer:
80
80
Health : Optional [str ] = None
81
81
ExitCode : Optional [int ] = None
82
82
Publishers : list [PublishedPortModel ] = field (default_factory = list )
83
+ _docker_compose : Optional ["DockerCompose" ] = field (default = None , init = False , repr = False )
83
84
84
85
def __post_init__ (self ) -> None :
85
86
if self .Publishers :
@@ -116,6 +117,41 @@ def _matches_protocol(prefer_ip_version: str, r: PublishedPortModel) -> bool:
116
117
r_url = r .URL
117
118
return (r_url is not None and ":" in r_url ) is (prefer_ip_version == "IPv6" )
118
119
120
+ # WaitStrategy compatibility methods
121
+ def get_container_host_ip (self ) -> str :
122
+ """Get the host IP for the container."""
123
+ # Simplified implementation - wait strategies don't use this yet
124
+ return "127.0.0.1"
125
+
126
+ def get_exposed_port (self , port : int ) -> int :
127
+ """Get the exposed port mapping for the given internal port."""
128
+ # Simplified implementation - wait strategies don't use this yet
129
+ return port
130
+
131
+ def get_logs (self ) -> tuple [bytes , bytes ]:
132
+ """Get container logs."""
133
+ if not self ._docker_compose :
134
+ raise RuntimeError ("DockerCompose reference not set on ComposeContainer" )
135
+ if not self .Service :
136
+ raise RuntimeError ("Service name not set on ComposeContainer" )
137
+ stdout , stderr = self ._docker_compose .get_logs (self .Service )
138
+ return stdout .encode (), stderr .encode ()
139
+
140
+ def get_wrapped_container (self ) -> "ComposeContainer" :
141
+ """Get the underlying container object for compatibility."""
142
+ return self
143
+
144
+ def reload (self ) -> None :
145
+ """Reload container information for compatibility with wait strategies."""
146
+ # ComposeContainer doesn't need explicit reloading as it's fetched fresh
147
+ # each time through get_container(), but we need this method for compatibility
148
+ pass
149
+
150
+ @property
151
+ def status (self ) -> str :
152
+ """Get container status for compatibility with wait strategies."""
153
+ return self .State or "unknown"
154
+
119
155
120
156
@dataclass
121
157
class DockerCompose :
@@ -178,6 +214,7 @@ class DockerCompose:
178
214
services : Optional [list [str ]] = None
179
215
docker_command_path : Optional [str ] = None
180
216
profiles : Optional [list [str ]] = None
217
+ _wait_strategies : Optional [dict [str , Any ]] = field (default = None , init = False , repr = False )
181
218
182
219
def __post_init__ (self ) -> None :
183
220
if isinstance (self .compose_file_name , str ):
@@ -213,6 +250,15 @@ def compose_command_property(self) -> list[str]:
213
250
docker_compose_cmd += ["--env-file" , self .env_file ]
214
251
return docker_compose_cmd
215
252
253
+ def waiting_for (self , strategies : dict [str , WaitStrategy ]) -> "DockerCompose" :
254
+ """
255
+ Set wait strategies for specific services.
256
+ Args:
257
+ strategies: Dictionary mapping service names to wait strategies
258
+ """
259
+ self ._wait_strategies = strategies
260
+ return self
261
+
216
262
def start (self ) -> None :
217
263
"""
218
264
Starts the docker compose environment.
@@ -241,6 +287,11 @@ def start(self) -> None:
241
287
242
288
self ._run_command (cmd = up_cmd )
243
289
290
+ if self ._wait_strategies :
291
+ for service , strategy in self ._wait_strategies .items ():
292
+ container = self .get_container (service_name = service )
293
+ strategy .wait_until_ready (container )
294
+
244
295
def stop (self , down : bool = True ) -> None :
245
296
"""
246
297
Stops the docker compose environment.
@@ -317,7 +368,7 @@ def get_containers(self, include_all: bool = False) -> list[ComposeContainer]:
317
368
result = self ._run_command (cmd = cmd )
318
369
stdout = split (r"\r?\n" , result .stdout .decode ("utf-8" ))
319
370
320
- containers = []
371
+ containers : list [ ComposeContainer ] = []
321
372
# one line per service in docker 25, single array for docker 24.0.2
322
373
for line in stdout :
323
374
if not line :
@@ -328,6 +379,10 @@ def get_containers(self, include_all: bool = False) -> list[ComposeContainer]:
328
379
else :
329
380
containers .append (_ignore_properties (ComposeContainer , data ))
330
381
382
+ # Set the docker_compose reference on each container
383
+ for container in containers :
384
+ container ._docker_compose = self
385
+
331
386
return containers
332
387
333
388
def get_container (
@@ -352,6 +407,7 @@ def get_container(
352
407
if not matching_containers :
353
408
raise ContainerIsNotRunning (f"{ service_name } is not running in the compose context" )
354
409
410
+ matching_containers [0 ]._docker_compose = self
355
411
return matching_containers [0 ]
356
412
357
413
def exec_in_container (
@@ -388,12 +444,18 @@ def _run_command(
388
444
context : Optional [str ] = None ,
389
445
) -> CompletedProcess [bytes ]:
390
446
context = context or str (self .context )
391
- return subprocess_run (
392
- cmd ,
393
- capture_output = True ,
394
- check = True ,
395
- cwd = context ,
396
- )
447
+ try :
448
+ return subprocess_run (
449
+ cmd ,
450
+ capture_output = True ,
451
+ check = True ,
452
+ cwd = context ,
453
+ )
454
+ except CalledProcessError as e :
455
+ logger .error (f"Command '{ e .cmd } ' failed with exit code { e .returncode } " )
456
+ logger .error (f"STDOUT:\n { e .stdout .decode (errors = 'ignore' )} " )
457
+ logger .error (f"STDERR:\n { e .stderr .decode (errors = 'ignore' )} " )
458
+ raise e from e
397
459
398
460
def get_service_port (
399
461
self ,
@@ -452,16 +514,54 @@ def get_service_host_and_port(
452
514
publisher = self .get_container (service_name ).get_publisher (by_port = port ).normalize ()
453
515
return publisher .URL , publisher .PublishedPort
454
516
455
- @wait_container_is_ready (HTTPError , URLError )
456
517
def wait_for (self , url : str ) -> "DockerCompose" :
457
518
"""
458
519
Waits for a response from a given URL. This is typically used to block until a service in
459
520
the environment has started and is responding. Note that it does not assert any sort of
460
521
return code, only check that the connection was successful.
461
522
523
+ This is a convenience method that internally uses HttpWaitStrategy. For more complex
524
+ wait scenarios, consider using the structured wait strategies with `waiting_for()`.
525
+
462
526
Args:
463
527
url: URL from one of the services in the environment to use to wait on.
528
+
529
+ Example:
530
+ # Simple URL wait (legacy style)
531
+ compose.wait_for("http://localhost:8080") \
532
+ \
533
+ # For more complex scenarios, use structured wait strategies:
534
+ from testcontainers.core.waiting_utils import HttpWaitStrategy, LogMessageWaitStrategy \
535
+ \
536
+ compose.waiting_for({ \
537
+ "web": HttpWaitStrategy(8080).for_status_code(200), \
538
+ "db": LogMessageWaitStrategy("database system is ready to accept connections") \
539
+ })
464
540
"""
541
+ import time
542
+ from urllib .error import HTTPError , URLError
543
+ from urllib .request import Request , urlopen
544
+
545
+ # For simple URL waiting when we have multiple containers,
546
+ # we'll do a direct HTTP check instead of using the container-based strategy
547
+ start_time = time .time ()
548
+ timeout = 120 # Default timeout
549
+
550
+ while True :
551
+ if time .time () - start_time > timeout :
552
+ raise TimeoutError (f"URL { url } not ready within { timeout } seconds" )
553
+
554
+ try :
555
+ request = Request (url , method = "GET" )
556
+ with urlopen (request , timeout = 1 ) as response :
557
+ if 200 <= response .status < 400 :
558
+ return self
559
+ except (URLError , HTTPError , ConnectionResetError , ConnectionRefusedError , BrokenPipeError , OSError ):
560
+ # Any connection error means we should keep waiting
561
+ pass
562
+
563
+ time .sleep (1 )
564
+
465
565
with urlopen (url ) as response :
466
566
response .read ()
467
567
return self
0 commit comments