diff options
author | noptuno <repollo.marrero@gmail.com> | 2023-04-28 02:40:47 +0200 |
---|---|---|
committer | noptuno <repollo.marrero@gmail.com> | 2023-04-28 02:40:47 +0200 |
commit | 6f6a73987201c9c303047c61389b82ad98b15597 (patch) | |
tree | bf67eb590d49979d6740bc1e94b4018df48bce98 /venv/lib/python3.9/site-packages/trio/_windows_pipes.py | |
parent | Resolved merge conflicts and merged pr_218 into STREAMLIT_CHAT_IMPLEMENTATION (diff) | |
parent | Merging PR_218 openai_rev package with new streamlit chat app (diff) | |
download | gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.gz gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.bz2 gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.lz gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.xz gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.zst gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.zip |
Diffstat (limited to 'venv/lib/python3.9/site-packages/trio/_windows_pipes.py')
-rw-r--r-- | venv/lib/python3.9/site-packages/trio/_windows_pipes.py | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/trio/_windows_pipes.py b/venv/lib/python3.9/site-packages/trio/_windows_pipes.py new file mode 100644 index 00000000..693792ba --- /dev/null +++ b/venv/lib/python3.9/site-packages/trio/_windows_pipes.py @@ -0,0 +1,138 @@ +import sys +from typing import TYPE_CHECKING +from . import _core +from ._abc import SendStream, ReceiveStream +from ._util import ConflictDetector, Final +from ._core._windows_cffi import _handle, raise_winerror, kernel32, ffi + +assert sys.platform == "win32" or not TYPE_CHECKING + +# XX TODO: don't just make this up based on nothing. +DEFAULT_RECEIVE_SIZE = 65536 + + +# See the comments on _unix_pipes._FdHolder for discussion of why we set the +# handle to -1 when it's closed. +class _HandleHolder: + def __init__(self, handle: int) -> None: + self.handle = -1 + if not isinstance(handle, int): + raise TypeError("handle must be an int") + self.handle = handle + _core.register_with_iocp(self.handle) + + @property + def closed(self): + return self.handle == -1 + + def close(self): + if self.closed: + return + handle = self.handle + self.handle = -1 + if not kernel32.CloseHandle(_handle(handle)): + raise_winerror() + + def __del__(self): + self.close() + + +class PipeSendStream(SendStream, metaclass=Final): + """Represents a send stream over a Windows named pipe that has been + opened in OVERLAPPED mode. + """ + + def __init__(self, handle: int) -> None: + self._handle_holder = _HandleHolder(handle) + self._conflict_detector = ConflictDetector( + "another task is currently using this pipe" + ) + + async def send_all(self, data: bytes): + with self._conflict_detector: + if self._handle_holder.closed: + raise _core.ClosedResourceError("this pipe is already closed") + + if not data: + await _core.checkpoint() + return + + try: + written = await _core.write_overlapped(self._handle_holder.handle, data) + except BrokenPipeError as ex: + raise _core.BrokenResourceError from ex + # By my reading of MSDN, this assert is guaranteed to pass so long + # as the pipe isn't in nonblocking mode, but... let's just + # double-check. + assert written == len(data) + + async def wait_send_all_might_not_block(self) -> None: + with self._conflict_detector: + if self._handle_holder.closed: + raise _core.ClosedResourceError("This pipe is already closed") + + # not implemented yet, and probably not needed + await _core.checkpoint() + + def close(self): + self._handle_holder.close() + + async def aclose(self): + self.close() + await _core.checkpoint() + + +class PipeReceiveStream(ReceiveStream, metaclass=Final): + """Represents a receive stream over an os.pipe object.""" + + def __init__(self, handle: int) -> None: + self._handle_holder = _HandleHolder(handle) + self._conflict_detector = ConflictDetector( + "another task is currently using this pipe" + ) + + async def receive_some(self, max_bytes=None) -> bytes: + with self._conflict_detector: + if self._handle_holder.closed: + raise _core.ClosedResourceError("this pipe is already closed") + + if max_bytes is None: + max_bytes = DEFAULT_RECEIVE_SIZE + else: + if not isinstance(max_bytes, int): + raise TypeError("max_bytes must be integer >= 1") + if max_bytes < 1: + raise ValueError("max_bytes must be integer >= 1") + + buffer = bytearray(max_bytes) + try: + size = await _core.readinto_overlapped( + self._handle_holder.handle, buffer + ) + except BrokenPipeError: + if self._handle_holder.closed: + raise _core.ClosedResourceError( + "another task closed this pipe" + ) from None + + # Windows raises BrokenPipeError on one end of a pipe + # whenever the other end closes, regardless of direction. + # Convert this to the Unix behavior of returning EOF to the + # reader when the writer closes. + # + # And since we're not raising an exception, we have to + # checkpoint. But readinto_overlapped did raise an exception, + # so it might not have checkpointed for us. So we have to + # checkpoint manually. + await _core.checkpoint() + return b"" + else: + del buffer[size:] + return buffer + + def close(self): + self._handle_holder.close() + + async def aclose(self): + self.close() + await _core.checkpoint() |