1 import subprocess as _subprocess
3 from . import error as _error
6 def invoke(args, stdin=None, universal_newlines=False, timeout=None,
7 expect=None, **kwargs):
9 stdin_pipe = _subprocess.PIPE
13 p = _subprocess.Popen(
14 args, stdin=stdin_pipe, stdout=_subprocess.PIPE,
15 stderr=_subprocess.PIPE, universal_newlines=universal_newlines,
17 except FileNotFoundError as e:
18 raise _error.CommandError(arguments=args, stdin=stdin) from e
20 stdout,stderr = p.communicate(input=stdin, timeout=timeout)
21 except _subprocess.TimeoutExpired as e:
23 stdout,stderr = p.communicate()
25 raise _error.CommandError(
26 msg='timeout ({}s) expired'.format(timeout),
27 arguments=args, stdin=stdin, stdout=stdout, stderr=stderr,
30 if expect and status not in expect:
31 raise _error.CommandError(
32 msg='unexpected exit status ({} not in {})'.format(status, expect),
33 args=args, stdin=stdin, stdout=stdout, stderr=stderr,
35 return (status, stdout, stderr)