diff options
Diffstat (limited to 'venv/lib/python3.9/site-packages/trio/_subprocess_platform')
4 files changed, 276 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/trio/_subprocess_platform/__init__.py b/venv/lib/python3.9/site-packages/trio/_subprocess_platform/__init__.py new file mode 100644 index 00000000..7a131e09 --- /dev/null +++ b/venv/lib/python3.9/site-packages/trio/_subprocess_platform/__init__.py @@ -0,0 +1,122 @@ +# Platform-specific subprocess bits'n'pieces. + +import os +import sys +from typing import Optional, Tuple, TYPE_CHECKING + +import trio +from .. import _core, _subprocess +from .._abc import SendStream, ReceiveStream + + +_wait_child_exiting_error: Optional[ImportError] = None +_create_child_pipe_error: Optional[ImportError] = None + + +if TYPE_CHECKING: + # internal types for the pipe representations used in type checking only + class ClosableSendStream(SendStream): + def close(self) -> None: + ... + + class ClosableReceiveStream(ReceiveStream): + def close(self) -> None: + ... + + +# Fallback versions of the functions provided -- implementations +# per OS are imported atop these at the bottom of the module. +async def wait_child_exiting(process: "_subprocess.Process") -> None: + """Block until the child process managed by ``process`` is exiting. + + It is invalid to call this function if the process has already + been waited on; that is, ``process.returncode`` must be None. + + When this function returns, it indicates that a call to + :meth:`subprocess.Popen.wait` will immediately be able to + return the process's exit status. The actual exit status is not + consumed by this call, since :class:`~subprocess.Popen` wants + to be able to do that itself. + """ + raise NotImplementedError from _wait_child_exiting_error # pragma: no cover + + +def create_pipe_to_child_stdin() -> Tuple["ClosableSendStream", int]: + """Create a new pipe suitable for sending data from this + process to the standard input of a child we're about to spawn. + + Returns: + A pair ``(trio_end, subprocess_end)`` where ``trio_end`` is a + :class:`~trio.abc.SendStream` and ``subprocess_end`` is + something suitable for passing as the ``stdin`` argument of + :class:`subprocess.Popen`. + """ + raise NotImplementedError from _create_child_pipe_error # pragma: no cover + + +def create_pipe_from_child_output() -> Tuple["ClosableReceiveStream", int]: + """Create a new pipe suitable for receiving data into this + process from the standard output or error stream of a child + we're about to spawn. + + Returns: + A pair ``(trio_end, subprocess_end)`` where ``trio_end`` is a + :class:`~trio.abc.ReceiveStream` and ``subprocess_end`` is + something suitable for passing as the ``stdin`` argument of + :class:`subprocess.Popen`. + """ + raise NotImplementedError from _create_child_pipe_error # pragma: no cover + + +try: + if sys.platform == "win32": + from .windows import wait_child_exiting # noqa: F811 + elif sys.platform != "linux" and (TYPE_CHECKING or hasattr(_core, "wait_kevent")): + from .kqueue import wait_child_exiting # noqa: F811 + else: + from .waitid import wait_child_exiting # noqa: F811 +except ImportError as ex: # pragma: no cover + _wait_child_exiting_error = ex + +try: + if TYPE_CHECKING: + # Not worth type checking these definitions + pass + + elif os.name == "posix": + + def create_pipe_to_child_stdin(): # noqa: F811 + rfd, wfd = os.pipe() + return trio.lowlevel.FdStream(wfd), rfd + + def create_pipe_from_child_output(): # noqa: F811 + rfd, wfd = os.pipe() + return trio.lowlevel.FdStream(rfd), wfd + + elif os.name == "nt": + from .._windows_pipes import PipeSendStream, PipeReceiveStream + + # This isn't exported or documented, but it's also not + # underscore-prefixed, and seems kosher to use. The asyncio docs + # for 3.5 included an example that imported socketpair from + # windows_utils (before socket.socketpair existed on Windows), and + # when asyncio.windows_utils.socketpair was removed in 3.7, the + # removal was mentioned in the release notes. + from asyncio.windows_utils import pipe as windows_pipe + import msvcrt + + def create_pipe_to_child_stdin(): # noqa: F811 + # for stdin, we want the write end (our end) to use overlapped I/O + rh, wh = windows_pipe(overlapped=(False, True)) + return PipeSendStream(wh), msvcrt.open_osfhandle(rh, os.O_RDONLY) + + def create_pipe_from_child_output(): # noqa: F811 + # for stdout/err, it's the read end that's overlapped + rh, wh = windows_pipe(overlapped=(True, False)) + return PipeReceiveStream(rh), msvcrt.open_osfhandle(wh, 0) + + else: # pragma: no cover + raise ImportError("pipes not implemented on this platform") + +except ImportError as ex: # pragma: no cover + _create_child_pipe_error = ex diff --git a/venv/lib/python3.9/site-packages/trio/_subprocess_platform/kqueue.py b/venv/lib/python3.9/site-packages/trio/_subprocess_platform/kqueue.py new file mode 100644 index 00000000..412ccf87 --- /dev/null +++ b/venv/lib/python3.9/site-packages/trio/_subprocess_platform/kqueue.py @@ -0,0 +1,41 @@ +import sys +import select +from typing import TYPE_CHECKING +from .. import _core, _subprocess + +assert (sys.platform != "win32" and sys.platform != "linux") or not TYPE_CHECKING + + +async def wait_child_exiting(process: "_subprocess.Process") -> None: + kqueue = _core.current_kqueue() + try: + from select import KQ_NOTE_EXIT + except ImportError: # pragma: no cover + # pypy doesn't define KQ_NOTE_EXIT: + # https://bitbucket.org/pypy/pypy/issues/2921/ + # I verified this value against both Darwin and FreeBSD + KQ_NOTE_EXIT = 0x80000000 + + make_event = lambda flags: select.kevent( + process.pid, filter=select.KQ_FILTER_PROC, flags=flags, fflags=KQ_NOTE_EXIT + ) + + try: + kqueue.control([make_event(select.KQ_EV_ADD | select.KQ_EV_ONESHOT)], 0) + except ProcessLookupError: # pragma: no cover + # This can supposedly happen if the process is in the process + # of exiting, and it can even be the case that kqueue says the + # process doesn't exist before waitpid(WNOHANG) says it hasn't + # exited yet. See the discussion in https://chromium.googlesource.com/ + # chromium/src/base/+/master/process/kill_mac.cc . + # We haven't actually seen this error occur since we added + # locking to prevent multiple calls to wait_child_exiting() + # for the same process simultaneously, but given the explanation + # in Chromium it seems we should still keep the check. + return + + def abort(_): + kqueue.control([make_event(select.KQ_EV_DELETE)], 0) + return _core.Abort.SUCCEEDED + + await _core.wait_kevent(process.pid, select.KQ_FILTER_PROC, abort) diff --git a/venv/lib/python3.9/site-packages/trio/_subprocess_platform/waitid.py b/venv/lib/python3.9/site-packages/trio/_subprocess_platform/waitid.py new file mode 100644 index 00000000..ad690172 --- /dev/null +++ b/venv/lib/python3.9/site-packages/trio/_subprocess_platform/waitid.py @@ -0,0 +1,107 @@ +import errno +import math +import os +import sys + +from .. import _core, _subprocess +from .._sync import CapacityLimiter, Event +from .._threads import to_thread_run_sync + +try: + from os import waitid + + def sync_wait_reapable(pid): + waitid(os.P_PID, pid, os.WEXITED | os.WNOWAIT) + +except ImportError: + # pypy doesn't define os.waitid so we need to pull it out ourselves + # using cffi: https://bitbucket.org/pypy/pypy/issues/2922/ + import cffi + + waitid_ffi = cffi.FFI() + + # Believe it or not, siginfo_t starts with fields in the + # same layout on both Linux and Darwin. The Linux structure + # is bigger so that's what we use to size `pad`; while + # there are a few extra fields in there, most of it is + # true padding which would not be written by the syscall. + waitid_ffi.cdef( + """ +typedef struct siginfo_s { + int si_signo; + int si_errno; + int si_code; + int si_pid; + int si_uid; + int si_status; + int pad[26]; +} siginfo_t; +int waitid(int idtype, int id, siginfo_t* result, int options); +""" + ) + waitid = waitid_ffi.dlopen(None).waitid + + def sync_wait_reapable(pid): + P_PID = 1 + WEXITED = 0x00000004 + if sys.platform == "darwin": # pragma: no cover + # waitid() is not exposed on Python on Darwin but does + # work through CFFI; note that we typically won't get + # here since Darwin also defines kqueue + WNOWAIT = 0x00000020 + else: + WNOWAIT = 0x01000000 + result = waitid_ffi.new("siginfo_t *") + while waitid(P_PID, pid, result, WEXITED | WNOWAIT) < 0: + got_errno = waitid_ffi.errno + if got_errno == errno.EINTR: + continue + raise OSError(got_errno, os.strerror(got_errno)) + + +# adapted from +# https://github.com/python-trio/trio/issues/4#issuecomment-398967572 + +waitid_limiter = CapacityLimiter(math.inf) + + +async def _waitid_system_task(pid: int, event: Event) -> None: + """Spawn a thread that waits for ``pid`` to exit, then wake any tasks + that were waiting on it. + """ + # cancellable=True: if this task is cancelled, then we abandon the + # thread to keep running waitpid in the background. Since this is + # always run as a system task, this will only happen if the whole + # call to trio.run is shutting down. + + try: + await to_thread_run_sync( + sync_wait_reapable, pid, cancellable=True, limiter=waitid_limiter + ) + except OSError: + # If waitid fails, waitpid will fail too, so it still makes + # sense to wake up the callers of wait_process_exiting(). The + # most likely reason for this error in practice is a child + # exiting when wait() is not possible because SIGCHLD is + # ignored. + pass + finally: + event.set() + + +async def wait_child_exiting(process: "_subprocess.Process") -> None: + # Logic of this function: + # - The first time we get called, we create an Event and start + # an instance of _waitid_system_task that will set the Event + # when waitid() completes. If that Event is set before + # we get cancelled, we're good. + # - Otherwise, a following call after the cancellation must + # reuse the Event created during the first call, lest we + # create an arbitrary number of threads waiting on the same + # process. + + if process._wait_for_exit_data is None: + process._wait_for_exit_data = event = Event() # type: ignore + _core.spawn_system_task(_waitid_system_task, process.pid, event) + assert isinstance(process._wait_for_exit_data, Event) + await process._wait_for_exit_data.wait() diff --git a/venv/lib/python3.9/site-packages/trio/_subprocess_platform/windows.py b/venv/lib/python3.9/site-packages/trio/_subprocess_platform/windows.py new file mode 100644 index 00000000..958be867 --- /dev/null +++ b/venv/lib/python3.9/site-packages/trio/_subprocess_platform/windows.py @@ -0,0 +1,6 @@ +from .. import _subprocess +from .._wait_for_object import WaitForSingleObject + + +async def wait_child_exiting(process: "_subprocess.Process") -> None: + await WaitForSingleObject(int(process._proc._handle)) |