summaryrefslogtreecommitdiffstats
path: root/venv/lib/python3.9/site-packages/trio/_windows_pipes.py
diff options
context:
space:
mode:
authornoptuno <repollo.marrero@gmail.com>2023-04-28 02:40:47 +0200
committernoptuno <repollo.marrero@gmail.com>2023-04-28 02:40:47 +0200
commit6f6a73987201c9c303047c61389b82ad98b15597 (patch)
treebf67eb590d49979d6740bc1e94b4018df48bce98 /venv/lib/python3.9/site-packages/trio/_windows_pipes.py
parentResolved merge conflicts and merged pr_218 into STREAMLIT_CHAT_IMPLEMENTATION (diff)
parentMerging PR_218 openai_rev package with new streamlit chat app (diff)
downloadgpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar
gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.gz
gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.bz2
gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.lz
gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.xz
gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.tar.zst
gpt4free-6f6a73987201c9c303047c61389b82ad98b15597.zip
Diffstat (limited to 'venv/lib/python3.9/site-packages/trio/_windows_pipes.py')
-rw-r--r--venv/lib/python3.9/site-packages/trio/_windows_pipes.py138
1 files changed, 138 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/trio/_windows_pipes.py b/venv/lib/python3.9/site-packages/trio/_windows_pipes.py
new file mode 100644
index 00000000..693792ba
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/trio/_windows_pipes.py
@@ -0,0 +1,138 @@
+import sys
+from typing import TYPE_CHECKING
+from . import _core
+from ._abc import SendStream, ReceiveStream
+from ._util import ConflictDetector, Final
+from ._core._windows_cffi import _handle, raise_winerror, kernel32, ffi
+
+assert sys.platform == "win32" or not TYPE_CHECKING
+
+# XX TODO: don't just make this up based on nothing.
+DEFAULT_RECEIVE_SIZE = 65536
+
+
+# See the comments on _unix_pipes._FdHolder for discussion of why we set the
+# handle to -1 when it's closed.
+class _HandleHolder:
+ def __init__(self, handle: int) -> None:
+ self.handle = -1
+ if not isinstance(handle, int):
+ raise TypeError("handle must be an int")
+ self.handle = handle
+ _core.register_with_iocp(self.handle)
+
+ @property
+ def closed(self):
+ return self.handle == -1
+
+ def close(self):
+ if self.closed:
+ return
+ handle = self.handle
+ self.handle = -1
+ if not kernel32.CloseHandle(_handle(handle)):
+ raise_winerror()
+
+ def __del__(self):
+ self.close()
+
+
+class PipeSendStream(SendStream, metaclass=Final):
+ """Represents a send stream over a Windows named pipe that has been
+ opened in OVERLAPPED mode.
+ """
+
+ def __init__(self, handle: int) -> None:
+ self._handle_holder = _HandleHolder(handle)
+ self._conflict_detector = ConflictDetector(
+ "another task is currently using this pipe"
+ )
+
+ async def send_all(self, data: bytes):
+ with self._conflict_detector:
+ if self._handle_holder.closed:
+ raise _core.ClosedResourceError("this pipe is already closed")
+
+ if not data:
+ await _core.checkpoint()
+ return
+
+ try:
+ written = await _core.write_overlapped(self._handle_holder.handle, data)
+ except BrokenPipeError as ex:
+ raise _core.BrokenResourceError from ex
+ # By my reading of MSDN, this assert is guaranteed to pass so long
+ # as the pipe isn't in nonblocking mode, but... let's just
+ # double-check.
+ assert written == len(data)
+
+ async def wait_send_all_might_not_block(self) -> None:
+ with self._conflict_detector:
+ if self._handle_holder.closed:
+ raise _core.ClosedResourceError("This pipe is already closed")
+
+ # not implemented yet, and probably not needed
+ await _core.checkpoint()
+
+ def close(self):
+ self._handle_holder.close()
+
+ async def aclose(self):
+ self.close()
+ await _core.checkpoint()
+
+
+class PipeReceiveStream(ReceiveStream, metaclass=Final):
+ """Represents a receive stream over an os.pipe object."""
+
+ def __init__(self, handle: int) -> None:
+ self._handle_holder = _HandleHolder(handle)
+ self._conflict_detector = ConflictDetector(
+ "another task is currently using this pipe"
+ )
+
+ async def receive_some(self, max_bytes=None) -> bytes:
+ with self._conflict_detector:
+ if self._handle_holder.closed:
+ raise _core.ClosedResourceError("this pipe is already closed")
+
+ if max_bytes is None:
+ max_bytes = DEFAULT_RECEIVE_SIZE
+ else:
+ if not isinstance(max_bytes, int):
+ raise TypeError("max_bytes must be integer >= 1")
+ if max_bytes < 1:
+ raise ValueError("max_bytes must be integer >= 1")
+
+ buffer = bytearray(max_bytes)
+ try:
+ size = await _core.readinto_overlapped(
+ self._handle_holder.handle, buffer
+ )
+ except BrokenPipeError:
+ if self._handle_holder.closed:
+ raise _core.ClosedResourceError(
+ "another task closed this pipe"
+ ) from None
+
+ # Windows raises BrokenPipeError on one end of a pipe
+ # whenever the other end closes, regardless of direction.
+ # Convert this to the Unix behavior of returning EOF to the
+ # reader when the writer closes.
+ #
+ # And since we're not raising an exception, we have to
+ # checkpoint. But readinto_overlapped did raise an exception,
+ # so it might not have checkpointed for us. So we have to
+ # checkpoint manually.
+ await _core.checkpoint()
+ return b""
+ else:
+ del buffer[size:]
+ return buffer
+
+ def close(self):
+ self._handle_holder.close()
+
+ async def aclose(self):
+ self.close()
+ await _core.checkpoint()