summaryrefslogtreecommitdiffstats
path: root/venv/lib/python3.9/site-packages/git/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.9/site-packages/git/util.py')
-rw-r--r--venv/lib/python3.9/site-packages/git/util.py1206
1 files changed, 1206 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/git/util.py b/venv/lib/python3.9/site-packages/git/util.py
new file mode 100644
index 00000000..30028b1c
--- /dev/null
+++ b/venv/lib/python3.9/site-packages/git/util.py
@@ -0,0 +1,1206 @@
+# utils.py
+# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
+#
+# This module is part of GitPython and is released under
+# the BSD License: http://www.opensource.org/licenses/bsd-license.php
+
+from abc import abstractmethod
+import os.path as osp
+from .compat import is_win
+import contextlib
+from functools import wraps
+import getpass
+import logging
+import os
+import platform
+import subprocess
+import re
+import shutil
+import stat
+from sys import maxsize
+import time
+from urllib.parse import urlsplit, urlunsplit
+import warnings
+
+# from git.objects.util import Traversable
+
+# typing ---------------------------------------------------------
+
+from typing import (
+ Any,
+ AnyStr,
+ BinaryIO,
+ Callable,
+ Dict,
+ Generator,
+ IO,
+ Iterator,
+ List,
+ Optional,
+ Pattern,
+ Sequence,
+ Tuple,
+ TypeVar,
+ Union,
+ cast,
+ TYPE_CHECKING,
+ overload,
+)
+
+import pathlib
+
+if TYPE_CHECKING:
+ from git.remote import Remote
+ from git.repo.base import Repo
+ from git.config import GitConfigParser, SectionConstraint
+ from git import Git
+
+ # from git.objects.base import IndexObject
+
+
+from .types import (
+ Literal,
+ SupportsIndex,
+ Protocol,
+ runtime_checkable, # because behind py version guards
+ PathLike,
+ HSH_TD,
+ Total_TD,
+ Files_TD, # aliases
+ Has_id_attribute,
+)
+
+T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True)
+# So IterableList[Head] is subtype of IterableList[IterableObj]
+
+# ---------------------------------------------------------------------
+
+
+from gitdb.util import ( # NOQA @IgnorePep8
+ make_sha,
+ LockedFD, # @UnusedImport
+ file_contents_ro, # @UnusedImport
+ file_contents_ro_filepath, # @UnusedImport
+ LazyMixin, # @UnusedImport
+ to_hex_sha, # @UnusedImport
+ to_bin_sha, # @UnusedImport
+ bin_to_hex, # @UnusedImport
+ hex_to_bin, # @UnusedImport
+)
+
+
+# NOTE: Some of the unused imports might be used/imported by others.
+# Handle once test-cases are back up and running.
+# Most of these are unused here, but are for use by git-python modules so these
+# don't see gitdb all the time. Flake of course doesn't like it.
+__all__ = [
+ "stream_copy",
+ "join_path",
+ "to_native_path_linux",
+ "join_path_native",
+ "Stats",
+ "IndexFileSHA1Writer",
+ "IterableObj",
+ "IterableList",
+ "BlockingLockFile",
+ "LockFile",
+ "Actor",
+ "get_user_id",
+ "assure_directory_exists",
+ "RemoteProgress",
+ "CallableRemoteProgress",
+ "rmtree",
+ "unbare_repo",
+ "HIDE_WINDOWS_KNOWN_ERRORS",
+]
+
+log = logging.getLogger(__name__)
+
+# types############################################################
+
+
+#: We need an easy way to see if Appveyor TCs start failing,
+#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy,
+#: till then, we wish to hide them.
+HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True)
+HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_FREEZE_ERRORS", True)
+
+# { Utility Methods
+
+T = TypeVar("T")
+
+
+def unbare_repo(func: Callable[..., T]) -> Callable[..., T]:
+ """Methods with this decorator raise :class:`.exc.InvalidGitRepositoryError` if they
+ encounter a bare repository"""
+
+ from .exc import InvalidGitRepositoryError
+
+ @wraps(func)
+ def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T:
+ if self.repo.bare:
+ raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__)
+ # END bare method
+ return func(self, *args, **kwargs)
+
+ # END wrapper
+
+ return wrapper
+
+
+@contextlib.contextmanager
+def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]:
+ old_dir = os.getcwd()
+ os.chdir(new_dir)
+ try:
+ yield new_dir
+ finally:
+ os.chdir(old_dir)
+
+
+def rmtree(path: PathLike) -> None:
+ """Remove the given recursively.
+
+ :note: we use shutil rmtree but adjust its behaviour to see whether files that
+ couldn't be deleted are read-only. Windows will not remove them in that case"""
+
+ def onerror(func: Callable, path: PathLike, exc_info: str) -> None:
+ # Is the error an access error ?
+ os.chmod(path, stat.S_IWUSR)
+
+ try:
+ func(path) # Will scream if still not possible to delete.
+ except Exception as ex:
+ if HIDE_WINDOWS_KNOWN_ERRORS:
+ from unittest import SkipTest
+
+ raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex
+ raise
+
+ return shutil.rmtree(path, False, onerror)
+
+
+def rmfile(path: PathLike) -> None:
+ """Ensure file deleted also on *Windows* where read-only files need special treatment."""
+ if osp.isfile(path):
+ if is_win:
+ os.chmod(path, 0o777)
+ os.remove(path)
+
+
+def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int:
+ """Copy all data from the source stream into the destination stream in chunks
+ of size chunk_size
+
+ :return: amount of bytes written"""
+ br = 0
+ while True:
+ chunk = source.read(chunk_size)
+ destination.write(chunk)
+ br += len(chunk)
+ if len(chunk) < chunk_size:
+ break
+ # END reading output stream
+ return br
+
+
+def join_path(a: PathLike, *p: PathLike) -> PathLike:
+ """Join path tokens together similar to osp.join, but always use
+ '/' instead of possibly '\' on windows."""
+ path = str(a)
+ for b in p:
+ b = str(b)
+ if not b:
+ continue
+ if b.startswith("/"):
+ path += b[1:]
+ elif path == "" or path.endswith("/"):
+ path += b
+ else:
+ path += "/" + b
+ # END for each path token to add
+ return path
+
+
+if is_win:
+
+ def to_native_path_windows(path: PathLike) -> PathLike:
+ path = str(path)
+ return path.replace("/", "\\")
+
+ def to_native_path_linux(path: PathLike) -> str:
+ path = str(path)
+ return path.replace("\\", "/")
+
+ __all__.append("to_native_path_windows")
+ to_native_path = to_native_path_windows
+else:
+ # no need for any work on linux
+ def to_native_path_linux(path: PathLike) -> str:
+ return str(path)
+
+ to_native_path = to_native_path_linux
+
+
+def join_path_native(a: PathLike, *p: PathLike) -> PathLike:
+ """
+ As join path, but makes sure an OS native path is returned. This is only
+ needed to play it safe on my dear windows and to assure nice paths that only
+ use '\'"""
+ return to_native_path(join_path(a, *p))
+
+
+def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool:
+ """Assure that the directory pointed to by path exists.
+
+ :param is_file: If True, path is assumed to be a file and handled correctly.
+ Otherwise it must be a directory
+ :return: True if the directory was created, False if it already existed"""
+ if is_file:
+ path = osp.dirname(path)
+ # END handle file
+ if not osp.isdir(path):
+ os.makedirs(path, exist_ok=True)
+ return True
+ return False
+
+
+def _get_exe_extensions() -> Sequence[str]:
+ PATHEXT = os.environ.get("PATHEXT", None)
+ return (
+ tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT else (".BAT", "COM", ".EXE") if is_win else ("")
+ )
+
+
+def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
+ # From: http://stackoverflow.com/a/377028/548792
+ winprog_exts = _get_exe_extensions()
+
+ def is_exec(fpath: str) -> bool:
+ return (
+ osp.isfile(fpath)
+ and os.access(fpath, os.X_OK)
+ and (os.name != "nt" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts))
+ )
+
+ progs = []
+ if not path:
+ path = os.environ["PATH"]
+ for folder in str(path).split(os.pathsep):
+ folder = folder.strip('"')
+ if folder:
+ exe_path = osp.join(folder, program)
+ for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]:
+ if is_exec(f):
+ progs.append(f)
+ return progs
+
+
+def _cygexpath(drive: Optional[str], path: str) -> str:
+ if osp.isabs(path) and not drive:
+ # Invoked from `cygpath()` directly with `D:Apps\123`?
+ # It's an error, leave it alone just slashes)
+ p = path # convert to str if AnyPath given
+ else:
+ p = path and osp.normpath(osp.expandvars(osp.expanduser(path)))
+ if osp.isabs(p):
+ if drive:
+ # Confusing, maybe a remote system should expand vars.
+ p = path
+ else:
+ p = cygpath(p)
+ elif drive:
+ p = "/proc/cygdrive/%s/%s" % (drive.lower(), p)
+ p_str = str(p) # ensure it is a str and not AnyPath
+ return p_str.replace("\\", "/")
+
+
+_cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = (
+ # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
+ # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths
+ (
+ re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"),
+ (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))),
+ False,
+ ),
+ (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False),
+ (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False),
+ (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True),
+ (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), # remote URL, do nothing
+)
+
+
+def cygpath(path: str) -> str:
+ """Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment."""
+ path = str(path) # ensure is str and not AnyPath.
+ # Fix to use Paths when 3.5 dropped. or to be just str if only for urls?
+ if not path.startswith(("/cygdrive", "//", "/proc/cygdrive")):
+ for regex, parser, recurse in _cygpath_parsers:
+ match = regex.match(path)
+ if match:
+ path = parser(*match.groups())
+ if recurse:
+ path = cygpath(path)
+ break
+ else:
+ path = _cygexpath(None, path)
+
+ return path
+
+
+_decygpath_regex = re.compile(r"(?:/proc)?/cygdrive/(\w)(/.*)?")
+
+
+def decygpath(path: PathLike) -> str:
+ path = str(path)
+ m = _decygpath_regex.match(path)
+ if m:
+ drive, rest_path = m.groups()
+ path = "%s:%s" % (drive.upper(), rest_path or "")
+
+ return path.replace("/", "\\")
+
+
+#: Store boolean flags denoting if a specific Git executable
+#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2).
+_is_cygwin_cache: Dict[str, Optional[bool]] = {}
+
+
+@overload
+def is_cygwin_git(git_executable: None) -> Literal[False]:
+ ...
+
+
+@overload
+def is_cygwin_git(git_executable: PathLike) -> bool:
+ ...
+
+
+def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool:
+ if is_win:
+ # is_win seems to be true only for Windows-native pythons
+ # cygwin has os.name = posix, I think
+ return False
+
+ if git_executable is None:
+ return False
+
+ git_executable = str(git_executable)
+ is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool]
+ if is_cygwin is None:
+ is_cygwin = False
+ try:
+ git_dir = osp.dirname(git_executable)
+ if not git_dir:
+ res = py_where(git_executable)
+ git_dir = osp.dirname(res[0]) if res else ""
+
+ # Just a name given, not a real path.
+ uname_cmd = osp.join(git_dir, "uname")
+ process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True)
+ uname_out, _ = process.communicate()
+ # retcode = process.poll()
+ is_cygwin = "CYGWIN" in uname_out
+ except Exception as ex:
+ log.debug("Failed checking if running in CYGWIN due to: %r", ex)
+ _is_cygwin_cache[git_executable] = is_cygwin
+
+ return is_cygwin
+
+
+def get_user_id() -> str:
+ """:return: string identifying the currently active system user as name@node"""
+ return "%s@%s" % (getpass.getuser(), platform.node())
+
+
+def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None:
+ """Wait for the process (clone, fetch, pull or push) and handle its errors accordingly"""
+ # TODO: No close proc-streams??
+ proc.wait(**kwargs)
+
+
+@overload
+def expand_path(p: None, expand_vars: bool = ...) -> None:
+ ...
+
+
+@overload
+def expand_path(p: PathLike, expand_vars: bool = ...) -> str:
+ # improve these overloads when 3.5 dropped
+ ...
+
+
+def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]:
+ if isinstance(p, pathlib.Path):
+ return p.resolve()
+ try:
+ p = osp.expanduser(p) # type: ignore
+ if expand_vars:
+ p = osp.expandvars(p) # type: ignore
+ return osp.normpath(osp.abspath(p)) # type: ignore
+ except Exception:
+ return None
+
+
+def remove_password_if_present(cmdline: Sequence[str]) -> List[str]:
+ """
+ Parse any command line argument and if on of the element is an URL with a
+ username and/or password, replace them by stars (in-place).
+
+ If nothing found just returns the command line as-is.
+
+ This should be used for every log line that print a command line, as well as
+ exception messages.
+ """
+ new_cmdline = []
+ for index, to_parse in enumerate(cmdline):
+ new_cmdline.append(to_parse)
+ try:
+ url = urlsplit(to_parse)
+ # Remove password from the URL if present
+ if url.password is None and url.username is None:
+ continue
+
+ if url.password is not None:
+ url = url._replace(netloc=url.netloc.replace(url.password, "*****"))
+ if url.username is not None:
+ url = url._replace(netloc=url.netloc.replace(url.username, "*****"))
+ new_cmdline[index] = urlunsplit(url)
+ except ValueError:
+ # This is not a valid URL
+ continue
+ return new_cmdline
+
+
+# } END utilities
+
+# { Classes
+
+
+class RemoteProgress(object):
+ """
+ Handler providing an interface to parse progress information emitted by git-push
+ and git-fetch and to dispatch callbacks allowing subclasses to react to the progress.
+ """
+
+ _num_op_codes: int = 9
+ (
+ BEGIN,
+ END,
+ COUNTING,
+ COMPRESSING,
+ WRITING,
+ RECEIVING,
+ RESOLVING,
+ FINDING_SOURCES,
+ CHECKING_OUT,
+ ) = [1 << x for x in range(_num_op_codes)]
+ STAGE_MASK = BEGIN | END
+ OP_MASK = ~STAGE_MASK
+
+ DONE_TOKEN = "done."
+ TOKEN_SEPARATOR = ", "
+
+ __slots__ = (
+ "_cur_line",
+ "_seen_ops",
+ "error_lines", # Lines that started with 'error:' or 'fatal:'.
+ "other_lines",
+ ) # Lines not denoting progress (i.e.g. push-infos).
+ re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)")
+ re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)")
+
+ def __init__(self) -> None:
+ self._seen_ops: List[int] = []
+ self._cur_line: Optional[str] = None
+ self.error_lines: List[str] = []
+ self.other_lines: List[str] = []
+
+ def _parse_progress_line(self, line: AnyStr) -> None:
+ """Parse progress information from the given line as retrieved by git-push
+ or git-fetch.
+
+ - Lines that do not contain progress info are stored in :attr:`other_lines`.
+ - Lines that seem to contain an error (i.e. start with error: or fatal:) are stored
+ in :attr:`error_lines`."""
+ # handle
+ # Counting objects: 4, done.
+ # Compressing objects: 50% (1/2)
+ # Compressing objects: 100% (2/2)
+ # Compressing objects: 100% (2/2), done.
+ if isinstance(line, bytes): # mypy argues about ternary assignment
+ line_str = line.decode("utf-8")
+ else:
+ line_str = line
+ self._cur_line = line_str
+
+ if self._cur_line.startswith(("error:", "fatal:")):
+ self.error_lines.append(self._cur_line)
+ return
+
+ # find escape characters and cut them away - regex will not work with
+ # them as they are non-ascii. As git might expect a tty, it will send them
+ last_valid_index = None
+ for i, c in enumerate(reversed(line_str)):
+ if ord(c) < 32:
+ # its a slice index
+ last_valid_index = -i - 1
+ # END character was non-ascii
+ # END for each character in line
+ if last_valid_index is not None:
+ line_str = line_str[:last_valid_index]
+ # END cut away invalid part
+ line_str = line_str.rstrip()
+
+ cur_count, max_count = None, None
+ match = self.re_op_relative.match(line_str)
+ if match is None:
+ match = self.re_op_absolute.match(line_str)
+
+ if not match:
+ self.line_dropped(line_str)
+ self.other_lines.append(line_str)
+ return
+ # END could not get match
+
+ op_code = 0
+ _remote, op_name, _percent, cur_count, max_count, message = match.groups()
+
+ # get operation id
+ if op_name == "Counting objects":
+ op_code |= self.COUNTING
+ elif op_name == "Compressing objects":
+ op_code |= self.COMPRESSING
+ elif op_name == "Writing objects":
+ op_code |= self.WRITING
+ elif op_name == "Receiving objects":
+ op_code |= self.RECEIVING
+ elif op_name == "Resolving deltas":
+ op_code |= self.RESOLVING
+ elif op_name == "Finding sources":
+ op_code |= self.FINDING_SOURCES
+ elif op_name == "Checking out files":
+ op_code |= self.CHECKING_OUT
+ else:
+ # Note: On windows it can happen that partial lines are sent
+ # Hence we get something like "CompreReceiving objects", which is
+ # a blend of "Compressing objects" and "Receiving objects".
+ # This can't really be prevented, so we drop the line verbosely
+ # to make sure we get informed in case the process spits out new
+ # commands at some point.
+ self.line_dropped(line_str)
+ # Note: Don't add this line to the other lines, as we have to silently
+ # drop it
+ return None
+ # END handle op code
+
+ # figure out stage
+ if op_code not in self._seen_ops:
+ self._seen_ops.append(op_code)
+ op_code |= self.BEGIN
+ # END begin opcode
+
+ if message is None:
+ message = ""
+ # END message handling
+
+ message = message.strip()
+ if message.endswith(self.DONE_TOKEN):
+ op_code |= self.END
+ message = message[: -len(self.DONE_TOKEN)]
+ # END end message handling
+ message = message.strip(self.TOKEN_SEPARATOR)
+
+ self.update(
+ op_code,
+ cur_count and float(cur_count),
+ max_count and float(max_count),
+ message,
+ )
+
+ def new_message_handler(self) -> Callable[[str], None]:
+ """
+ :return:
+ a progress handler suitable for handle_process_output(), passing lines on to this Progress
+ handler in a suitable format"""
+
+ def handler(line: AnyStr) -> None:
+ return self._parse_progress_line(line.rstrip())
+
+ # end
+ return handler
+
+ def line_dropped(self, line: str) -> None:
+ """Called whenever a line could not be understood and was therefore dropped."""
+ pass
+
+ def update(
+ self,
+ op_code: int,
+ cur_count: Union[str, float],
+ max_count: Union[str, float, None] = None,
+ message: str = "",
+ ) -> None:
+ """Called whenever the progress changes
+
+ :param op_code:
+ Integer allowing to be compared against Operation IDs and stage IDs.
+
+ Stage IDs are BEGIN and END. BEGIN will only be set once for each Operation
+ ID as well as END. It may be that BEGIN and END are set at once in case only
+ one progress message was emitted due to the speed of the operation.
+ Between BEGIN and END, none of these flags will be set
+
+ Operation IDs are all held within the OP_MASK. Only one Operation ID will
+ be active per call.
+ :param cur_count: Current absolute count of items
+
+ :param max_count:
+ The maximum count of items we expect. It may be None in case there is
+ no maximum number of items or if it is (yet) unknown.
+
+ :param message:
+ In case of the 'WRITING' operation, it contains the amount of bytes
+ transferred. It may possibly be used for other purposes as well.
+
+ You may read the contents of the current line in self._cur_line"""
+ pass
+
+
+class CallableRemoteProgress(RemoteProgress):
+ """An implementation forwarding updates to any callable"""
+
+ __slots__ = "_callable"
+
+ def __init__(self, fn: Callable) -> None:
+ self._callable = fn
+ super(CallableRemoteProgress, self).__init__()
+
+ def update(self, *args: Any, **kwargs: Any) -> None:
+ self._callable(*args, **kwargs)
+
+
+class Actor(object):
+ """Actors hold information about a person acting on the repository. They
+ can be committers and authors or anything with a name and an email as
+ mentioned in the git log entries."""
+
+ # PRECOMPILED REGEX
+ name_only_regex = re.compile(r"<(.*)>")
+ name_email_regex = re.compile(r"(.*) <(.*?)>")
+
+ # ENVIRONMENT VARIABLES
+ # read when creating new commits
+ env_author_name = "GIT_AUTHOR_NAME"
+ env_author_email = "GIT_AUTHOR_EMAIL"
+ env_committer_name = "GIT_COMMITTER_NAME"
+ env_committer_email = "GIT_COMMITTER_EMAIL"
+
+ # CONFIGURATION KEYS
+ conf_name = "name"
+ conf_email = "email"
+
+ __slots__ = ("name", "email")
+
+ def __init__(self, name: Optional[str], email: Optional[str]) -> None:
+ self.name = name
+ self.email = email
+
+ def __eq__(self, other: Any) -> bool:
+ return self.name == other.name and self.email == other.email
+
+ def __ne__(self, other: Any) -> bool:
+ return not (self == other)
+
+ def __hash__(self) -> int:
+ return hash((self.name, self.email))
+
+ def __str__(self) -> str:
+ return self.name if self.name else ""
+
+ def __repr__(self) -> str:
+ return '<git.Actor "%s <%s>">' % (self.name, self.email)
+
+ @classmethod
+ def _from_string(cls, string: str) -> "Actor":
+ """Create an Actor from a string.
+ :param string: is the string, which is expected to be in regular git format
+
+ John Doe <jdoe@example.com>
+
+ :return: Actor"""
+ m = cls.name_email_regex.search(string)
+ if m:
+ name, email = m.groups()
+ return Actor(name, email)
+ else:
+ m = cls.name_only_regex.search(string)
+ if m:
+ return Actor(m.group(1), None)
+ # assume best and use the whole string as name
+ return Actor(string, None)
+ # END special case name
+ # END handle name/email matching
+
+ @classmethod
+ def _main_actor(
+ cls,
+ env_name: str,
+ env_email: str,
+ config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None,
+ ) -> "Actor":
+ actor = Actor("", "")
+ user_id = None # We use this to avoid multiple calls to getpass.getuser()
+
+ def default_email() -> str:
+ nonlocal user_id
+ if not user_id:
+ user_id = get_user_id()
+ return user_id
+
+ def default_name() -> str:
+ return default_email().split("@")[0]
+
+ for attr, evar, cvar, default in (
+ ("name", env_name, cls.conf_name, default_name),
+ ("email", env_email, cls.conf_email, default_email),
+ ):
+ try:
+ val = os.environ[evar]
+ setattr(actor, attr, val)
+ except KeyError:
+ if config_reader is not None:
+ try:
+ val = config_reader.get("user", cvar)
+ except Exception:
+ val = default()
+ setattr(actor, attr, val)
+ # END config-reader handling
+ if not getattr(actor, attr):
+ setattr(actor, attr, default())
+ # END handle name
+ # END for each item to retrieve
+ return actor
+
+ @classmethod
+ def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
+ """
+ :return: Actor instance corresponding to the configured committer. It behaves
+ similar to the git implementation, such that the environment will override
+ configuration values of config_reader. If no value is set at all, it will be
+ generated
+ :param config_reader: ConfigReader to use to retrieve the values from in case
+ they are not set in the environment"""
+ return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader)
+
+ @classmethod
+ def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
+ """Same as committer(), but defines the main author. It may be specified in the environment,
+ but defaults to the committer"""
+ return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader)
+
+
+class Stats(object):
+
+ """
+ Represents stat information as presented by git at the end of a merge. It is
+ created from the output of a diff operation.
+
+ ``Example``::
+
+ c = Commit( sha1 )
+ s = c.stats
+ s.total # full-stat-dict
+ s.files # dict( filepath : stat-dict )
+
+ ``stat-dict``
+
+ A dictionary with the following keys and values::
+
+ deletions = number of deleted lines as int
+ insertions = number of inserted lines as int
+ lines = total number of lines changed as int, or deletions + insertions
+
+ ``full-stat-dict``
+
+ In addition to the items in the stat-dict, it features additional information::
+
+ files = number of changed files as int"""
+
+ __slots__ = ("total", "files")
+
+ def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]):
+ self.total = total
+ self.files = files
+
+ @classmethod
+ def _list_from_string(cls, repo: "Repo", text: str) -> "Stats":
+ """Create a Stat object from output retrieved by git-diff.
+
+ :return: git.Stat"""
+
+ hsh: HSH_TD = {
+ "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0},
+ "files": {},
+ }
+ for line in text.splitlines():
+ (raw_insertions, raw_deletions, filename) = line.split("\t")
+ insertions = raw_insertions != "-" and int(raw_insertions) or 0
+ deletions = raw_deletions != "-" and int(raw_deletions) or 0
+ hsh["total"]["insertions"] += insertions
+ hsh["total"]["deletions"] += deletions
+ hsh["total"]["lines"] += insertions + deletions
+ hsh["total"]["files"] += 1
+ files_dict: Files_TD = {
+ "insertions": insertions,
+ "deletions": deletions,
+ "lines": insertions + deletions,
+ }
+ hsh["files"][filename.strip()] = files_dict
+ return Stats(hsh["total"], hsh["files"])
+
+
+class IndexFileSHA1Writer(object):
+
+ """Wrapper around a file-like object that remembers the SHA1 of
+ the data written to it. It will write a sha when the stream is closed
+ or if the asked for explicitly using write_sha.
+
+ Only useful to the indexfile
+
+ :note: Based on the dulwich project"""
+
+ __slots__ = ("f", "sha1")
+
+ def __init__(self, f: IO) -> None:
+ self.f = f
+ self.sha1 = make_sha(b"")
+
+ def write(self, data: AnyStr) -> int:
+ self.sha1.update(data)
+ return self.f.write(data)
+
+ def write_sha(self) -> bytes:
+ sha = self.sha1.digest()
+ self.f.write(sha)
+ return sha
+
+ def close(self) -> bytes:
+ sha = self.write_sha()
+ self.f.close()
+ return sha
+
+ def tell(self) -> int:
+ return self.f.tell()
+
+
+class LockFile(object):
+
+ """Provides methods to obtain, check for, and release a file based lock which
+ should be used to handle concurrent access to the same file.
+
+ As we are a utility class to be derived from, we only use protected methods.
+
+ Locks will automatically be released on destruction"""
+
+ __slots__ = ("_file_path", "_owns_lock")
+
+ def __init__(self, file_path: PathLike) -> None:
+ self._file_path = file_path
+ self._owns_lock = False
+
+ def __del__(self) -> None:
+ self._release_lock()
+
+ def _lock_file_path(self) -> str:
+ """:return: Path to lockfile"""
+ return "%s.lock" % (self._file_path)
+
+ def _has_lock(self) -> bool:
+ """:return: True if we have a lock and if the lockfile still exists
+ :raise AssertionError: if our lock-file does not exist"""
+ return self._owns_lock
+
+ def _obtain_lock_or_raise(self) -> None:
+ """Create a lock file as flag for other instances, mark our instance as lock-holder
+
+ :raise IOError: if a lock was already present or a lock file could not be written"""
+ if self._has_lock():
+ return
+ lock_file = self._lock_file_path()
+ if osp.isfile(lock_file):
+ raise IOError(
+ "Lock for file %r did already exist, delete %r in case the lock is illegal"
+ % (self._file_path, lock_file)
+ )
+
+ try:
+ flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
+ if is_win:
+ flags |= os.O_SHORT_LIVED
+ fd = os.open(lock_file, flags, 0)
+ os.close(fd)
+ except OSError as e:
+ raise IOError(str(e)) from e
+
+ self._owns_lock = True
+
+ def _obtain_lock(self) -> None:
+ """The default implementation will raise if a lock cannot be obtained.
+ Subclasses may override this method to provide a different implementation"""
+ return self._obtain_lock_or_raise()
+
+ def _release_lock(self) -> None:
+ """Release our lock if we have one"""
+ if not self._has_lock():
+ return
+
+ # if someone removed our file beforhand, lets just flag this issue
+ # instead of failing, to make it more usable.
+ lfp = self._lock_file_path()
+ try:
+ rmfile(lfp)
+ except OSError:
+ pass
+ self._owns_lock = False
+
+
+class BlockingLockFile(LockFile):
+
+ """The lock file will block until a lock could be obtained, or fail after
+ a specified timeout.
+
+ :note: If the directory containing the lock was removed, an exception will
+ be raised during the blocking period, preventing hangs as the lock
+ can never be obtained."""
+
+ __slots__ = ("_check_interval", "_max_block_time")
+
+ def __init__(
+ self,
+ file_path: PathLike,
+ check_interval_s: float = 0.3,
+ max_block_time_s: int = maxsize,
+ ) -> None:
+ """Configure the instance
+
+ :param check_interval_s:
+ Period of time to sleep until the lock is checked the next time.
+ By default, it waits a nearly unlimited time
+
+ :param max_block_time_s: Maximum amount of seconds we may lock"""
+ super(BlockingLockFile, self).__init__(file_path)
+ self._check_interval = check_interval_s
+ self._max_block_time = max_block_time_s
+
+ def _obtain_lock(self) -> None:
+ """This method blocks until it obtained the lock, or raises IOError if
+ it ran out of time or if the parent directory was not available anymore.
+ If this method returns, you are guaranteed to own the lock"""
+ starttime = time.time()
+ maxtime = starttime + float(self._max_block_time)
+ while True:
+ try:
+ super(BlockingLockFile, self)._obtain_lock()
+ except IOError as e:
+ # synity check: if the directory leading to the lockfile is not
+ # readable anymore, raise an exception
+ curtime = time.time()
+ if not osp.isdir(osp.dirname(self._lock_file_path())):
+ msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % (
+ self._lock_file_path(),
+ curtime - starttime,
+ )
+ raise IOError(msg) from e
+ # END handle missing directory
+
+ if curtime >= maxtime:
+ msg = "Waited %g seconds for lock at %r" % (
+ maxtime - starttime,
+ self._lock_file_path(),
+ )
+ raise IOError(msg) from e
+ # END abort if we wait too long
+ time.sleep(self._check_interval)
+ else:
+ break
+ # END endless loop
+
+
+class IterableList(List[T_IterableObj]):
+
+ """
+ List of iterable objects allowing to query an object by id or by named index::
+
+ heads = repo.heads
+ heads.master
+ heads['master']
+ heads[0]
+
+ Iterable parent objects = [Commit, SubModule, Reference, FetchInfo, PushInfo]
+ Iterable via inheritance = [Head, TagReference, RemoteReference]
+ ]
+ It requires an id_attribute name to be set which will be queried from its
+ contained items to have a means for comparison.
+
+ A prefix can be specified which is to be used in case the id returned by the
+ items always contains a prefix that does not matter to the user, so it
+ can be left out."""
+
+ __slots__ = ("_id_attr", "_prefix")
+
+ def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[IterableObj]":
+ return super(IterableList, cls).__new__(cls)
+
+ def __init__(self, id_attr: str, prefix: str = "") -> None:
+ self._id_attr = id_attr
+ self._prefix = prefix
+
+ def __contains__(self, attr: object) -> bool:
+ # first try identity match for performance
+ try:
+ rval = list.__contains__(self, attr)
+ if rval:
+ return rval
+ except (AttributeError, TypeError):
+ pass
+ # END handle match
+
+ # otherwise make a full name search
+ try:
+ getattr(self, cast(str, attr)) # use cast to silence mypy
+ return True
+ except (AttributeError, TypeError):
+ return False
+ # END handle membership
+
+ def __getattr__(self, attr: str) -> T_IterableObj:
+ attr = self._prefix + attr
+ for item in self:
+ if getattr(item, self._id_attr) == attr:
+ return item
+ # END for each item
+ return list.__getattribute__(self, attr)
+
+ def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore
+
+ assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str"
+
+ if isinstance(index, int):
+ return list.__getitem__(self, index)
+ elif isinstance(index, slice):
+ raise ValueError("Index should be an int or str")
+ else:
+ try:
+ return getattr(self, index)
+ except AttributeError as e:
+ raise IndexError("No item found with id %r" % (self._prefix + index)) from e
+ # END handle getattr
+
+ def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None:
+
+ assert isinstance(index, (int, str)), "Index of IterableList should be an int or str"
+
+ delindex = cast(int, index)
+ if not isinstance(index, int):
+ delindex = -1
+ name = self._prefix + index
+ for i, item in enumerate(self):
+ if getattr(item, self._id_attr) == name:
+ delindex = i
+ break
+ # END search index
+ # END for each item
+ if delindex == -1:
+ raise IndexError("Item with name %s not found" % name)
+ # END handle error
+ # END get index to delete
+ list.__delitem__(self, delindex)
+
+
+class IterableClassWatcher(type):
+ """Metaclass that watches"""
+
+ def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None:
+ for base in bases:
+ if type(base) == IterableClassWatcher:
+ warnings.warn(
+ f"GitPython Iterable subclassed by {name}. "
+ "Iterable is deprecated due to naming clash since v3.1.18"
+ " and will be removed in 3.1.20, "
+ "Use IterableObj instead \n",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+
+
+class Iterable(metaclass=IterableClassWatcher):
+
+ """Defines an interface for iterable items which is to assure a uniform
+ way to retrieve and iterate items within the git repository"""
+
+ __slots__ = ()
+ _id_attribute_ = "attribute that most suitably identifies your instance"
+
+ @classmethod
+ def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any:
+ """
+ Deprecated, use IterableObj instead.
+ Find all items of this type - subclasses can specify args and kwargs differently.
+ If no args are given, subclasses are obliged to return all items if no additional
+ arguments arg given.
+
+ :note: Favor the iter_items method as it will
+
+ :return: list(Item,...) list of item instances"""
+ out_list: Any = IterableList(cls._id_attribute_)
+ out_list.extend(cls.iter_items(repo, *args, **kwargs))
+ return out_list
+
+ @classmethod
+ def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any:
+ # return typed to be compatible with subtypes e.g. Remote
+ """For more information about the arguments, see list_items
+ :return: iterator yielding Items"""
+ raise NotImplementedError("To be implemented by Subclass")
+
+
+@runtime_checkable
+class IterableObj(Protocol):
+ """Defines an interface for iterable items which is to assure a uniform
+ way to retrieve and iterate items within the git repository
+
+ Subclasses = [Submodule, Commit, Reference, PushInfo, FetchInfo, Remote]"""
+
+ __slots__ = ()
+ _id_attribute_: str
+
+ @classmethod
+ def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]:
+ """
+ Find all items of this type - subclasses can specify args and kwargs differently.
+ If no args are given, subclasses are obliged to return all items if no additional
+ arguments arg given.
+
+ :note: Favor the iter_items method as it will
+
+ :return: list(Item,...) list of item instances"""
+ out_list: IterableList = IterableList(cls._id_attribute_)
+ out_list.extend(cls.iter_items(repo, *args, **kwargs))
+ return out_list
+
+ @classmethod
+ @abstractmethod
+ def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]:
+ # return typed to be compatible with subtypes e.g. Remote
+ """For more information about the arguments, see list_items
+ :return: iterator yielding Items"""
+ raise NotImplementedError("To be implemented by Subclass")
+
+
+# } END classes
+
+
+class NullHandler(logging.Handler):
+ def emit(self, record: object) -> None:
+ pass