diff --git a/.gitignore b/.gitignore index ba74660..86c28a4 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,9 @@ nosetests.xml coverage.xml *,cover +# virtual environment +venv + # Translations *.mo *.pot diff --git a/.travis.yml b/.travis.yml index 1acac4a..8454dd0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,6 @@ cache: - pip python: - - 2.7 - 3.5 - 3.6 # - 3.7 diff --git a/README.md b/README.md index e9dc7ad..8b6530b 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,12 @@ but (want to) use it for automated testing because it gets the job done... ## What is it? -Python configuring and launching the infamous -[SIPp](http://sipp.sourceforge.net/) using an api inspired by -[requests](http://docs.python-requests.org/) +Python 3.6+ configuring and launching the infamous +[SIPp](http://sipp.sourceforge.net/) using a simple API to +generate commands and spawn them in subprocesses. + +Command subprocess launching now uses +[`trio`](https://trio.readthedocs.io/en/stable/reference-io.html#spawning-subprocesses)! ## It definitely lets you @@ -25,7 +28,7 @@ Python configuring and launching the infamous ## Basic Usage -Launching the default UAC scenario is a short line: +Launching the default UAC script is a short line: ```python import pysipp diff --git a/pysipp/__init__.py b/pysipp/__init__.py index 15c780f..5476559 100644 --- a/pysipp/__init__.py +++ b/pysipp/__init__.py @@ -20,15 +20,11 @@ """ import sys from os.path import dirname -from . import launch, report, plugin, netplug, agent +from . import plugin, netplug, agent from .load import iter_scen_dirs from .agent import client, server -class SIPpFailure(RuntimeError): - """SIPp commands failed""" - - __package__ = "pysipp" __author__ = "Tyler Goodlet (tgoodlet@gmail.com)" @@ -97,10 +93,12 @@ def scenario(dirpath=None, proxyaddr=None, autolocalsocks=True, **scenkwargs): # same as above scen = plugin.mng.hook.pysipp_conf_scen_protocol( - agents=[uas, uac], confpy=None, scenkwargs=scenkwargs + agents=[uas, uac], + confpy=None, + scenkwargs=scenkwargs, ) - if proxyaddr: + if proxyaddr is not None: assert isinstance(proxyaddr, tuple), "proxyaddr must be a (addr, port) tuple" scen.clientdefaults.proxyaddr = proxyaddr @@ -188,59 +186,5 @@ def pysipp_conf_scen(agents, scen): ua.rtp_echo = True -@plugin.hookimpl -def pysipp_new_runner(): - """Provision and assign a default cmd runner""" - return launch.PopenRunner() - - -@plugin.hookimpl -def pysipp_run_protocol(scen, runner, block, timeout, raise_exc): - """ "Run all rendered commands with the provided runner or the built-in - PopenRunner which runs commands locally. - """ - # use provided runner or default provided by hook - runner = runner or plugin.mng.hook.pysipp_new_runner() - agents = scen.prepare() - - def finalize(cmds2procs=None, timeout=180, raise_exc=True): - """Wait for all remaining agents in the scenario to finish executing - and perform error and logfile reporting. - """ - cmds2procs = cmds2procs or runner.get(timeout=timeout) - agents2procs = list(zip(agents, cmds2procs.values())) - msg = report.err_summary(agents2procs) - if msg: - # report logs and stderr - report.emit_logfiles(agents2procs) - if raise_exc: - # raise RuntimeError on agent failure(s) - # (HINT: to rerun type `scen()` from the debugger) - raise SIPpFailure(msg) - - return cmds2procs - - try: - # run all agents (raises RuntimeError on timeout) - cmds2procs = runner( - (ua.render() for ua in agents), block=block, timeout=timeout - ) - except launch.TimeoutError: # sucessful timeout - cmds2procs = finalize(timeout=0, raise_exc=False) - if raise_exc: - raise - else: - # async - if not block: - # XXX async run must bundle up results for later processing - scen.finalize = finalize - return finalize - - # sync - finalize(cmds2procs, raise_exc=raise_exc) - - return runner - - # register the default hook set plugin.mng.register(sys.modules[__name__]) diff --git a/pysipp/agent.py b/pysipp/agent.py index 3ebd1e0..84a9f60 100644 --- a/pysipp/agent.py +++ b/pysipp/agent.py @@ -5,15 +5,21 @@ import re import itertools import tempfile +from functools import partial from copy import deepcopy from distutils import spawn from collections import namedtuple, OrderedDict -from . import command, plugin, utils + +import trio + +from . import command, plugin, utils, launch, report log = utils.get_logger() SocketAddr = namedtuple("SocketAddr", "ip port") +DEFAULT_RUNNER_TIMEOUT = 180 + def tuple_property(attrs): def getter(self): @@ -61,7 +67,10 @@ def name(self): ipcaddr = tuple_property(("ipc_host", "ipc_port")) call_load = tuple_property(("rate", "limit", "call_count")) - def __call__(self, block=True, timeout=180, runner=None, raise_exc=True, **kwargs): + def __call__(self, *args, **kwargs): + return self.run(*args, **kwargs) + + def run(self, timeout=180, **kwargs): # create and configure a temp scenario scen = plugin.mng.hook.pysipp_conf_scen_protocol( @@ -69,16 +78,7 @@ def __call__(self, block=True, timeout=180, runner=None, raise_exc=True, **kwarg confpy=None, scenkwargs={}, ) - # run the standard protocol - # (attach allocted runner for reuse/post-portem) - return plugin.mng.hook.pysipp_run_protocol( - scen=scen, - block=block, - timeout=timeout, - runner=runner, - raise_exc=raise_exc, - **kwargs - ) + return scen.run(timeout=timeout, **kwargs) def is_client(self): return "uac" in self.name.lower() @@ -262,8 +262,12 @@ def __init__( confpy=None, enable_screen_file=True, ): + # placeholder for process "runner" + self._runner = None + # agents iterable in launch-order self._agents = agents + self._prepared_agents = None ua_attrs = UserAgent.keys() # default settings @@ -431,21 +435,34 @@ def from_agents(self, agents=None, autolocalsocks=True, **scenkwargs): """Create a new scenario from prepared agents.""" return type(self)(self.prepare(agents), self._defaults, confpy=self.mod) - def __call__( + async def arun( self, - agents=None, - block=True, - timeout=180, + timeout=DEFAULT_RUNNER_TIMEOUT, runner=None, - raise_exc=True, - copy_agents=False, - **kwargs + block=True, ): - return plugin.mng.hook.pysipp_run_protocol( - scen=self, - block=block, - timeout=timeout, - runner=runner, - raise_exc=raise_exc, - **kwargs + self._prepared_agents = agents = self.prepare() + self._runner = runner = runner or launch.TrioRunner() + + return await launch.run_all_agents(runner, agents, timeout=timeout, block=block) + + def finalize(self, *, timeout=DEFAULT_RUNNER_TIMEOUT): + assert ( + self._prepared_agents and self._runner + ), "Must run scenario before finalizing." + return trio.run( + partial( + launch.finalize, + self._runner, + self._prepared_agents, + timeout=timeout, + ) ) + + def run(self, timeout=180, **kwargs): + """Run scenario blocking to completion.""" + return trio.run(partial(self.arun, timeout=timeout, **kwargs)) + + def __call__(self, *args, **kwargs): + # TODO: deprecation warning here + return self.run(*args, **kwargs) diff --git a/pysipp/hookspec.py b/pysipp/hookspec.py index 5bc9e57..ddec3a5 100644 --- a/pysipp/hookspec.py +++ b/pysipp/hookspec.py @@ -65,18 +65,3 @@ def pysipp_conf_scen(agents, scen): socket arguments. It it the recommended hook for applying a default scenario configuration. """ - - -@hookspec(firstresult=True) -def pysipp_new_runner(): - """Create and return a runner instance to be used for invoking - multiple SIPp commands. The runner must be callable and support both a - `block` and `timeout` kwarg. - """ - - -@hookspec(firstresult=True) -def pysipp_run_protocol(scen, runner, block, timeout, raise_exc): - """Perform steps to execute all SIPp commands usually by calling a - preconfigured command launcher/runner. - """ diff --git a/pysipp/launch.py b/pysipp/launch.py index 68bd197..ff74ce8 100644 --- a/pysipp/launch.py +++ b/pysipp/launch.py @@ -1,17 +1,18 @@ """ Launchers for invoking SIPp user agents """ -import subprocess -import os import shlex -import select -import threading import signal +import subprocess import time from . import utils from pprint import pformat from collections import OrderedDict, namedtuple +import trio + +from . import report + log = utils.get_logger() Streams = namedtuple("Streams", "stdout stderr") @@ -21,110 +22,102 @@ class TimeoutError(Exception): "SIPp process timeout exception" -class PopenRunner(object): - """Run a sequence of SIPp agents asynchronously. If any process terminates - with a non-zero exit code, immediately kill all remaining processes and - collect std streams. +class SIPpFailure(RuntimeError): + """SIPp commands failed""" + - Adheres to an interface similar to `multiprocessing.pool.AsyncResult`. +class TrioRunner(object): + """Run a sequence of SIPp cmds asynchronously. If any process terminates + with a non-zero exit code, immediately canacel all remaining processes and + collect std streams. """ def __init__( self, - subprocmod=subprocess, - osmod=os, - poller=select.epoll, ): - # these could optionally be rpyc proxy objs - self.spm = subprocmod - self.osm = osmod - self.poller = poller() - # collector thread placeholder - self._waiter = None # store proc results self._procs = OrderedDict() - def __call__(self, cmds, block=True, rate=300, **kwargs): - if self._waiter and self._waiter.is_alive(): + async def run(self, cmds, rate=300, **kwargs): + if self.is_alive(): raise RuntimeError("Not all processes from a prior run have completed") if self._procs: raise RuntimeError( "Process results have not been cleared from previous run" ) - sp = self.spm - os = self.osm - DEVNULL = open(os.devnull, "wb") - fds2procs = OrderedDict() - # run agent commands in sequence for cmd in cmds: log.debug('launching cmd:\n"{}"\n'.format(cmd)) - proc = sp.Popen(shlex.split(cmd), stdout=DEVNULL, stderr=sp.PIPE) - fd = proc.stderr.fileno() - log.debug("registering fd '{}' for pid '{}'".format(fd, proc.pid)) - fds2procs[fd] = self._procs[cmd] = proc - # register for stderr hangup events - self.poller.register(proc.stderr.fileno(), select.EPOLLHUP) + proc = await trio.open_process( + shlex.split(cmd), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE + ) + self._procs[cmd] = proc + # limit launch rate time.sleep(1.0 / rate) - # launch waiter - self._waiter = threading.Thread(target=self._wait, args=(fds2procs,)) - self._waiter.daemon = True - self._waiter.start() - - return self.get(**kwargs) if block else self._procs + return self._procs - def _wait(self, fds2procs): - log.debug("started waiter for procs {}".format(fds2procs)) + async def get(self, timeout=180): + """Block up to `timeout` seconds for all agents to complete. + Either return (cmd, proc) pairs or raise `TimeoutError` on timeout + """ signalled = None - left = len(fds2procs) - collected = 0 - while collected < left: - pairs = self.poller.poll() # wait on hangup events - log.debug("received hangup for pairs '{}'".format(pairs)) - for fd, status in pairs: - collected += 1 - proc = fds2procs[fd] - # attach streams so they can be read more then once - log.debug("collecting streams for {}".format(proc)) - proc.streams = Streams(*proc.communicate()) # timeout=2)) - if proc.returncode != 0 and not signalled: + + # taken mostly verbatim from ``trio.run_process()`` + async def read_output(stream): + chunks = [] + async with stream: + try: + while True: + chunk = await stream.receive_some(32768) + if not chunk: + break + chunks.append(chunk) + except trio.ClosedResourceError: + pass + + return b"".join(chunks) + + async def wait_on_proc(proc): + nonlocal signalled + async with proc as proc: + rc = await proc.wait() + if rc != 0 and not signalled: # stop all other agents if there is a failure signalled = self.stop() - log.debug("terminating waiter thread") - - def get(self, timeout=180): - """Block up to `timeout` seconds for all agents to complete. - Either return (cmd, proc) pairs or raise `TimeoutError` on timeout - """ - if self._waiter.is_alive(): - self._waiter.join(timeout=timeout) - - if self._waiter.is_alive(): - # kill them mfin SIPps - signalled = self.stop() - self._waiter.join(timeout=10) - - if self._waiter.is_alive(): - # try to stop a few more times - for _ in range(3): - signalled = self.stop() - self._waiter.join(timeout=1) - - if self._waiter.is_alive(): - # some procs failed to terminate via signalling - raise RuntimeError("Unable to kill all agents!?") - - # all procs were killed by SIGUSR1 - raise TimeoutError( - "pids '{}' failed to complete after '{}' seconds".format( - pformat([p.pid for p in signalled.values()]), timeout - ) + # collect stderr output + proc.stderr_output = await read_output(proc.stderr) + + try: + with trio.fail_after(timeout): + async with trio.open_nursery() as n: + for cmd, proc in self._procs.items(): + # async wait on each process to complete + n.start_soon(wait_on_proc, proc) + + return self._procs + + except trio.TooSlowError: + # kill all SIPp processes + signalled = self.stop() + # all procs were killed by SIGUSR1 + raise TimeoutError( + "pids '{}' failed to complete after '{}' seconds".format( + pformat([p.pid for p in signalled.values()]), timeout ) + ) - return self._procs + def iterprocs(self): + """Iterate all processes which are still alive yielding + (cmd, proc) pairs + """ + return ( + (cmd, proc) + for cmd, proc in self._procs.items() + if proc and proc.poll() is None + ) def stop(self): """Stop all agents with SIGUSR1 as per SIPp's signal handling""" @@ -146,25 +139,42 @@ def _signalall(self, signum): signalled[cmd] = proc return signalled - def iterprocs(self): - """Iterate all processes which are still alive yielding - (cmd, proc) pairs - """ - return ( - (cmd, proc) - for cmd, proc in self._procs.items() - if proc and proc.poll() is None - ) - def is_alive(self): """Return bool indicating whether some agents are still alive""" return any(self.iterprocs()) - def ready(self): - """Return bool indicating whether all agents have completed""" - return not self.is_alive() - def clear(self): """Clear all processes from the last run""" - assert self.ready(), "Not all processes have completed" + assert not self.is_alive(), "Not all processes have completed" self._procs.clear() + + +async def run_all_agents(runner, agents, timeout=180, block=True): + """Run a sequencec of agents using a ``TrioRunner``.""" + + try: + await runner.run((ua.render() for ua in agents), timeout=timeout) + if block: + await finalize(runner, agents, timeout) + return runner + except TimeoutError as terr: + # print error logs even when we timeout + try: + await finalize(runner, agents, timeout) + except SIPpFailure as err: + assert "exit code -9" in str(err) + raise terr + + +async def finalize(runner, agents, timeout): + """Block up to `timeout` seconds for all agents to complete.""" + # this might raise TimeoutError + cmds2procs = await runner.get(timeout=timeout) + agents2procs = list(zip(agents, cmds2procs.values())) + msg = report.err_summary(agents2procs) + if msg: + # report logs and stderr + await report.emit_logfiles(agents2procs) + raise SIPpFailure(msg) + + return cmds2procs diff --git a/pysipp/report.py b/pysipp/report.py index 9a69513..0dac395 100644 --- a/pysipp/report.py +++ b/pysipp/report.py @@ -16,6 +16,7 @@ 99: "Normal exit without calls processed", -1: "Fatal error", -2: "Fatal error binding a socket", + -9: "Signalled to stop with SIGUSR1", -10: "Signalled to stop with SIGUSR1", 254: "Connection Error: socket already in use", 255: "Command or syntax error: check stderr output", @@ -43,16 +44,14 @@ def err_summary(agents2procs): return msg -def emit_logfiles(agents2procs, level="warn", max_lines=100): +async def emit_logfiles(agents2procs, level="warn", max_lines=100): """Log all available SIPp log-file contents""" emit = getattr(log, level) for ua, proc in agents2procs: # print stderr emit( - "stderr for '{}' @ {}\n{}\n".format( - ua.name, ua.srcaddr, proc.streams.stderr - ) + "stderr for '{}' @ {}\n{}\n".format(ua.name, ua.srcaddr, proc.stderr_output) ) # FIXME: no idea, but some logs are not being printed without this # logging mod bug? diff --git a/setup.py b/setup.py index ced8544..a9590a7 100755 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( name="pysipp", - version='0.1.0', + version='1.0.0.dev0', description="A SIPp scenario launcher", long_description=readme, long_description_content_type="text/markdown", @@ -36,7 +36,10 @@ url='https://github.com/SIPp/pysipp', platforms=['linux'], packages=['pysipp', 'pysipp.cli'], - install_requires=['pluggy>=0.11.0'], + install_requires=[ + 'pluggy >= 0.11.0', + 'trio>=0.11.0' + ], tests_require=['pytest'], entry_points={ 'console_scripts': [ @@ -48,7 +51,7 @@ 'Intended Audience :: Developers', 'License :: OSI Approved :: GNU General Public License v2 (GPLv2)', 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.6', 'Topic :: Software Development', 'Topic :: Software Development :: Testing', 'Topic :: Software Development :: Quality Assurance', diff --git a/tests/test_agent.py b/tests/test_agent.py index 5f1872c..2f49760 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -114,15 +114,22 @@ def test_server(): # test client failure on bad remote destination (agent.client(destaddr=("99.99.99.99", 5060)), 1, {}, RuntimeError), # test if server times out it is signalled - (agent.server(), 0, {"timeout": 1}, launch.TimeoutError), + (agent.server(), -9, {"timeout": 1}, launch.TimeoutError), ], ids=["ua", "uac", "uas"], ) def test_failures(ua, retcode, kwargs, exc): """Test failure cases for all types of agents""" + runner = launch.TrioRunner() + # run it without raising - runner = ua(raise_exc=False, **kwargs) - cmds2procs = runner.get(timeout=0) + if exc: + with pytest.raises(exc): + ua(runner=runner, **kwargs) + + # runner = ua(raise_exc=False, **kwargs) + + cmds2procs = runner._procs assert not runner.is_alive() assert len(list(runner.iterprocs())) == 0 # tests transparency of the defaults config pipeline diff --git a/tests/test_launcher.py b/tests/test_launcher.py index bf3bdc8..12b13d1 100644 --- a/tests/test_launcher.py +++ b/tests/test_launcher.py @@ -1,14 +1,16 @@ """ Basic agent/scenario launching """ +import trio +import pytest + from pysipp.agent import client, server -from pysipp.launch import PopenRunner +from pysipp.launch import TrioRunner, run_all_agents, SIPpFailure -def run_blocking(*agents): - runner = PopenRunner() +def run_blocking(runner, agents): assert not runner.is_alive() - runner(ua.render() for ua in agents) + trio.run(run_all_agents, runner, agents) assert not runner.is_alive() return runner @@ -22,24 +24,28 @@ def test_agent_fails(): uac.recv_timeout = 1 # avoids SIPp issue #176 uac.call_count = 1 # avoids SIPp issue #176 - runner = run_blocking(uas, uac) + runner = TrioRunner() + with pytest.raises(SIPpFailure): + run_blocking(runner, (uas, uac)) # fails due to invalid ip - uasproc = runner.get(timeout=0)[uas.render()] - assert uasproc.streams.stderr + uasproc = runner._procs[uas.render()] + print(uasproc.stderr_output) + assert uasproc.stderr_output assert uasproc.returncode == 255, uasproc.streams.stderr # killed by signal - uacproc = runner.get(timeout=0)[uac.render()] - # assert not uacproc.streams.stderr # sometimes this has a log msg? + uacproc = runner._procs[uac.render()] + # assert not uacproc.stderr_output # sometimes this has a log msg? ret = uacproc.returncode # killed by SIGUSR1 or terminates before it starts (racy) assert ret == -10 or ret == 0 def test_default_scen(default_agents): - runner = run_blocking(*default_agents) + runner = TrioRunner() + runner = run_blocking(runner, default_agents) # both agents should be successful - for cmd, proc in runner.get(timeout=0).items(): + for cmd, proc in runner._procs.items(): assert not proc.returncode diff --git a/tests/test_scenario.py b/tests/test_scenario.py new file mode 100644 index 0000000..60c7d65 --- /dev/null +++ b/tests/test_scenario.py @@ -0,0 +1,3 @@ +''' +pysipp.agent module tests +''' diff --git a/tests/test_stack.py b/tests/test_stack.py index c808c27..842cbf8 100644 --- a/tests/test_stack.py +++ b/tests/test_stack.py @@ -74,7 +74,7 @@ def test_sync_run(scenwalk): """Ensure all scenarios in the test run to completion in synchronous mode""" for path, scen in scenwalk(): runner = scen(timeout=6) - for cmd, proc in runner.get(timeout=0).items(): + for cmd, proc in runner._procs.items(): assert proc.returncode == 0 diff --git a/tox.ini b/tox.ini index cd9502a..5397180 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py27, py35, py36, pypy +envlist = py36, py37 [testenv] deps =