diff options
author | noptuno <repollo.marrero@gmail.com> | 2023-04-28 02:29:30 +0200 |
---|---|---|
committer | noptuno <repollo.marrero@gmail.com> | 2023-04-28 02:29:30 +0200 |
commit | 355dee533bb34a571b9367820a63cccb668cf866 (patch) | |
tree | 838af886b4fec07320aeb10f0d1e74ba79e79b5c /venv/lib/python3.9/site-packages/git/util.py | |
parent | added pyproject.toml file (diff) | |
download | gpt4free-355dee533bb34a571b9367820a63cccb668cf866.tar gpt4free-355dee533bb34a571b9367820a63cccb668cf866.tar.gz gpt4free-355dee533bb34a571b9367820a63cccb668cf866.tar.bz2 gpt4free-355dee533bb34a571b9367820a63cccb668cf866.tar.lz gpt4free-355dee533bb34a571b9367820a63cccb668cf866.tar.xz gpt4free-355dee533bb34a571b9367820a63cccb668cf866.tar.zst gpt4free-355dee533bb34a571b9367820a63cccb668cf866.zip |
Diffstat (limited to 'venv/lib/python3.9/site-packages/git/util.py')
-rw-r--r-- | venv/lib/python3.9/site-packages/git/util.py | 1206 |
1 files changed, 1206 insertions, 0 deletions
diff --git a/venv/lib/python3.9/site-packages/git/util.py b/venv/lib/python3.9/site-packages/git/util.py new file mode 100644 index 00000000..30028b1c --- /dev/null +++ b/venv/lib/python3.9/site-packages/git/util.py @@ -0,0 +1,1206 @@ +# utils.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php + +from abc import abstractmethod +import os.path as osp +from .compat import is_win +import contextlib +from functools import wraps +import getpass +import logging +import os +import platform +import subprocess +import re +import shutil +import stat +from sys import maxsize +import time +from urllib.parse import urlsplit, urlunsplit +import warnings + +# from git.objects.util import Traversable + +# typing --------------------------------------------------------- + +from typing import ( + Any, + AnyStr, + BinaryIO, + Callable, + Dict, + Generator, + IO, + Iterator, + List, + Optional, + Pattern, + Sequence, + Tuple, + TypeVar, + Union, + cast, + TYPE_CHECKING, + overload, +) + +import pathlib + +if TYPE_CHECKING: + from git.remote import Remote + from git.repo.base import Repo + from git.config import GitConfigParser, SectionConstraint + from git import Git + + # from git.objects.base import IndexObject + + +from .types import ( + Literal, + SupportsIndex, + Protocol, + runtime_checkable, # because behind py version guards + PathLike, + HSH_TD, + Total_TD, + Files_TD, # aliases + Has_id_attribute, +) + +T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True) +# So IterableList[Head] is subtype of IterableList[IterableObj] + +# --------------------------------------------------------------------- + + +from gitdb.util import ( # NOQA @IgnorePep8 + make_sha, + LockedFD, # @UnusedImport + file_contents_ro, # @UnusedImport + file_contents_ro_filepath, # @UnusedImport + LazyMixin, # @UnusedImport + to_hex_sha, # @UnusedImport + to_bin_sha, # @UnusedImport + bin_to_hex, # @UnusedImport + hex_to_bin, # @UnusedImport +) + + +# NOTE: Some of the unused imports might be used/imported by others. +# Handle once test-cases are back up and running. +# Most of these are unused here, but are for use by git-python modules so these +# don't see gitdb all the time. Flake of course doesn't like it. +__all__ = [ + "stream_copy", + "join_path", + "to_native_path_linux", + "join_path_native", + "Stats", + "IndexFileSHA1Writer", + "IterableObj", + "IterableList", + "BlockingLockFile", + "LockFile", + "Actor", + "get_user_id", + "assure_directory_exists", + "RemoteProgress", + "CallableRemoteProgress", + "rmtree", + "unbare_repo", + "HIDE_WINDOWS_KNOWN_ERRORS", +] + +log = logging.getLogger(__name__) + +# types############################################################ + + +#: We need an easy way to see if Appveyor TCs start failing, +#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, +#: till then, we wish to hide them. +HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True) +HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_FREEZE_ERRORS", True) + +# { Utility Methods + +T = TypeVar("T") + + +def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: + """Methods with this decorator raise :class:`.exc.InvalidGitRepositoryError` if they + encounter a bare repository""" + + from .exc import InvalidGitRepositoryError + + @wraps(func) + def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T: + if self.repo.bare: + raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) + # END bare method + return func(self, *args, **kwargs) + + # END wrapper + + return wrapper + + +@contextlib.contextmanager +def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: + old_dir = os.getcwd() + os.chdir(new_dir) + try: + yield new_dir + finally: + os.chdir(old_dir) + + +def rmtree(path: PathLike) -> None: + """Remove the given recursively. + + :note: we use shutil rmtree but adjust its behaviour to see whether files that + couldn't be deleted are read-only. Windows will not remove them in that case""" + + def onerror(func: Callable, path: PathLike, exc_info: str) -> None: + # Is the error an access error ? + os.chmod(path, stat.S_IWUSR) + + try: + func(path) # Will scream if still not possible to delete. + except Exception as ex: + if HIDE_WINDOWS_KNOWN_ERRORS: + from unittest import SkipTest + + raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex + raise + + return shutil.rmtree(path, False, onerror) + + +def rmfile(path: PathLike) -> None: + """Ensure file deleted also on *Windows* where read-only files need special treatment.""" + if osp.isfile(path): + if is_win: + os.chmod(path, 0o777) + os.remove(path) + + +def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: + """Copy all data from the source stream into the destination stream in chunks + of size chunk_size + + :return: amount of bytes written""" + br = 0 + while True: + chunk = source.read(chunk_size) + destination.write(chunk) + br += len(chunk) + if len(chunk) < chunk_size: + break + # END reading output stream + return br + + +def join_path(a: PathLike, *p: PathLike) -> PathLike: + """Join path tokens together similar to osp.join, but always use + '/' instead of possibly '\' on windows.""" + path = str(a) + for b in p: + b = str(b) + if not b: + continue + if b.startswith("/"): + path += b[1:] + elif path == "" or path.endswith("/"): + path += b + else: + path += "/" + b + # END for each path token to add + return path + + +if is_win: + + def to_native_path_windows(path: PathLike) -> PathLike: + path = str(path) + return path.replace("/", "\\") + + def to_native_path_linux(path: PathLike) -> str: + path = str(path) + return path.replace("\\", "/") + + __all__.append("to_native_path_windows") + to_native_path = to_native_path_windows +else: + # no need for any work on linux + def to_native_path_linux(path: PathLike) -> str: + return str(path) + + to_native_path = to_native_path_linux + + +def join_path_native(a: PathLike, *p: PathLike) -> PathLike: + """ + As join path, but makes sure an OS native path is returned. This is only + needed to play it safe on my dear windows and to assure nice paths that only + use '\'""" + return to_native_path(join_path(a, *p)) + + +def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: + """Assure that the directory pointed to by path exists. + + :param is_file: If True, path is assumed to be a file and handled correctly. + Otherwise it must be a directory + :return: True if the directory was created, False if it already existed""" + if is_file: + path = osp.dirname(path) + # END handle file + if not osp.isdir(path): + os.makedirs(path, exist_ok=True) + return True + return False + + +def _get_exe_extensions() -> Sequence[str]: + PATHEXT = os.environ.get("PATHEXT", None) + return ( + tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT else (".BAT", "COM", ".EXE") if is_win else ("") + ) + + +def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: + # From: http://stackoverflow.com/a/377028/548792 + winprog_exts = _get_exe_extensions() + + def is_exec(fpath: str) -> bool: + return ( + osp.isfile(fpath) + and os.access(fpath, os.X_OK) + and (os.name != "nt" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts)) + ) + + progs = [] + if not path: + path = os.environ["PATH"] + for folder in str(path).split(os.pathsep): + folder = folder.strip('"') + if folder: + exe_path = osp.join(folder, program) + for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]: + if is_exec(f): + progs.append(f) + return progs + + +def _cygexpath(drive: Optional[str], path: str) -> str: + if osp.isabs(path) and not drive: + # Invoked from `cygpath()` directly with `D:Apps\123`? + # It's an error, leave it alone just slashes) + p = path # convert to str if AnyPath given + else: + p = path and osp.normpath(osp.expandvars(osp.expanduser(path))) + if osp.isabs(p): + if drive: + # Confusing, maybe a remote system should expand vars. + p = path + else: + p = cygpath(p) + elif drive: + p = "/proc/cygdrive/%s/%s" % (drive.lower(), p) + p_str = str(p) # ensure it is a str and not AnyPath + return p_str.replace("\\", "/") + + +_cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = ( + # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx + # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths + ( + re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), + (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))), + False, + ), + (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False), + (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False), + (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True), + (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), # remote URL, do nothing +) + + +def cygpath(path: str) -> str: + """Use :meth:`git.cmd.Git.polish_url()` instead, that works on any environment.""" + path = str(path) # ensure is str and not AnyPath. + # Fix to use Paths when 3.5 dropped. or to be just str if only for urls? + if not path.startswith(("/cygdrive", "//", "/proc/cygdrive")): + for regex, parser, recurse in _cygpath_parsers: + match = regex.match(path) + if match: + path = parser(*match.groups()) + if recurse: + path = cygpath(path) + break + else: + path = _cygexpath(None, path) + + return path + + +_decygpath_regex = re.compile(r"(?:/proc)?/cygdrive/(\w)(/.*)?") + + +def decygpath(path: PathLike) -> str: + path = str(path) + m = _decygpath_regex.match(path) + if m: + drive, rest_path = m.groups() + path = "%s:%s" % (drive.upper(), rest_path or "") + + return path.replace("/", "\\") + + +#: Store boolean flags denoting if a specific Git executable +#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2). +_is_cygwin_cache: Dict[str, Optional[bool]] = {} + + +@overload +def is_cygwin_git(git_executable: None) -> Literal[False]: + ... + + +@overload +def is_cygwin_git(git_executable: PathLike) -> bool: + ... + + +def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: + if is_win: + # is_win seems to be true only for Windows-native pythons + # cygwin has os.name = posix, I think + return False + + if git_executable is None: + return False + + git_executable = str(git_executable) + is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool] + if is_cygwin is None: + is_cygwin = False + try: + git_dir = osp.dirname(git_executable) + if not git_dir: + res = py_where(git_executable) + git_dir = osp.dirname(res[0]) if res else "" + + # Just a name given, not a real path. + uname_cmd = osp.join(git_dir, "uname") + process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True) + uname_out, _ = process.communicate() + # retcode = process.poll() + is_cygwin = "CYGWIN" in uname_out + except Exception as ex: + log.debug("Failed checking if running in CYGWIN due to: %r", ex) + _is_cygwin_cache[git_executable] = is_cygwin + + return is_cygwin + + +def get_user_id() -> str: + """:return: string identifying the currently active system user as name@node""" + return "%s@%s" % (getpass.getuser(), platform.node()) + + +def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None: + """Wait for the process (clone, fetch, pull or push) and handle its errors accordingly""" + # TODO: No close proc-streams?? + proc.wait(**kwargs) + + +@overload +def expand_path(p: None, expand_vars: bool = ...) -> None: + ... + + +@overload +def expand_path(p: PathLike, expand_vars: bool = ...) -> str: + # improve these overloads when 3.5 dropped + ... + + +def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: + if isinstance(p, pathlib.Path): + return p.resolve() + try: + p = osp.expanduser(p) # type: ignore + if expand_vars: + p = osp.expandvars(p) # type: ignore + return osp.normpath(osp.abspath(p)) # type: ignore + except Exception: + return None + + +def remove_password_if_present(cmdline: Sequence[str]) -> List[str]: + """ + Parse any command line argument and if on of the element is an URL with a + username and/or password, replace them by stars (in-place). + + If nothing found just returns the command line as-is. + + This should be used for every log line that print a command line, as well as + exception messages. + """ + new_cmdline = [] + for index, to_parse in enumerate(cmdline): + new_cmdline.append(to_parse) + try: + url = urlsplit(to_parse) + # Remove password from the URL if present + if url.password is None and url.username is None: + continue + + if url.password is not None: + url = url._replace(netloc=url.netloc.replace(url.password, "*****")) + if url.username is not None: + url = url._replace(netloc=url.netloc.replace(url.username, "*****")) + new_cmdline[index] = urlunsplit(url) + except ValueError: + # This is not a valid URL + continue + return new_cmdline + + +# } END utilities + +# { Classes + + +class RemoteProgress(object): + """ + Handler providing an interface to parse progress information emitted by git-push + and git-fetch and to dispatch callbacks allowing subclasses to react to the progress. + """ + + _num_op_codes: int = 9 + ( + BEGIN, + END, + COUNTING, + COMPRESSING, + WRITING, + RECEIVING, + RESOLVING, + FINDING_SOURCES, + CHECKING_OUT, + ) = [1 << x for x in range(_num_op_codes)] + STAGE_MASK = BEGIN | END + OP_MASK = ~STAGE_MASK + + DONE_TOKEN = "done." + TOKEN_SEPARATOR = ", " + + __slots__ = ( + "_cur_line", + "_seen_ops", + "error_lines", # Lines that started with 'error:' or 'fatal:'. + "other_lines", + ) # Lines not denoting progress (i.e.g. push-infos). + re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)") + re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") + + def __init__(self) -> None: + self._seen_ops: List[int] = [] + self._cur_line: Optional[str] = None + self.error_lines: List[str] = [] + self.other_lines: List[str] = [] + + def _parse_progress_line(self, line: AnyStr) -> None: + """Parse progress information from the given line as retrieved by git-push + or git-fetch. + + - Lines that do not contain progress info are stored in :attr:`other_lines`. + - Lines that seem to contain an error (i.e. start with error: or fatal:) are stored + in :attr:`error_lines`.""" + # handle + # Counting objects: 4, done. + # Compressing objects: 50% (1/2) + # Compressing objects: 100% (2/2) + # Compressing objects: 100% (2/2), done. + if isinstance(line, bytes): # mypy argues about ternary assignment + line_str = line.decode("utf-8") + else: + line_str = line + self._cur_line = line_str + + if self._cur_line.startswith(("error:", "fatal:")): + self.error_lines.append(self._cur_line) + return + + # find escape characters and cut them away - regex will not work with + # them as they are non-ascii. As git might expect a tty, it will send them + last_valid_index = None + for i, c in enumerate(reversed(line_str)): + if ord(c) < 32: + # its a slice index + last_valid_index = -i - 1 + # END character was non-ascii + # END for each character in line + if last_valid_index is not None: + line_str = line_str[:last_valid_index] + # END cut away invalid part + line_str = line_str.rstrip() + + cur_count, max_count = None, None + match = self.re_op_relative.match(line_str) + if match is None: + match = self.re_op_absolute.match(line_str) + + if not match: + self.line_dropped(line_str) + self.other_lines.append(line_str) + return + # END could not get match + + op_code = 0 + _remote, op_name, _percent, cur_count, max_count, message = match.groups() + + # get operation id + if op_name == "Counting objects": + op_code |= self.COUNTING + elif op_name == "Compressing objects": + op_code |= self.COMPRESSING + elif op_name == "Writing objects": + op_code |= self.WRITING + elif op_name == "Receiving objects": + op_code |= self.RECEIVING + elif op_name == "Resolving deltas": + op_code |= self.RESOLVING + elif op_name == "Finding sources": + op_code |= self.FINDING_SOURCES + elif op_name == "Checking out files": + op_code |= self.CHECKING_OUT + else: + # Note: On windows it can happen that partial lines are sent + # Hence we get something like "CompreReceiving objects", which is + # a blend of "Compressing objects" and "Receiving objects". + # This can't really be prevented, so we drop the line verbosely + # to make sure we get informed in case the process spits out new + # commands at some point. + self.line_dropped(line_str) + # Note: Don't add this line to the other lines, as we have to silently + # drop it + return None + # END handle op code + + # figure out stage + if op_code not in self._seen_ops: + self._seen_ops.append(op_code) + op_code |= self.BEGIN + # END begin opcode + + if message is None: + message = "" + # END message handling + + message = message.strip() + if message.endswith(self.DONE_TOKEN): + op_code |= self.END + message = message[: -len(self.DONE_TOKEN)] + # END end message handling + message = message.strip(self.TOKEN_SEPARATOR) + + self.update( + op_code, + cur_count and float(cur_count), + max_count and float(max_count), + message, + ) + + def new_message_handler(self) -> Callable[[str], None]: + """ + :return: + a progress handler suitable for handle_process_output(), passing lines on to this Progress + handler in a suitable format""" + + def handler(line: AnyStr) -> None: + return self._parse_progress_line(line.rstrip()) + + # end + return handler + + def line_dropped(self, line: str) -> None: + """Called whenever a line could not be understood and was therefore dropped.""" + pass + + def update( + self, + op_code: int, + cur_count: Union[str, float], + max_count: Union[str, float, None] = None, + message: str = "", + ) -> None: + """Called whenever the progress changes + + :param op_code: + Integer allowing to be compared against Operation IDs and stage IDs. + + Stage IDs are BEGIN and END. BEGIN will only be set once for each Operation + ID as well as END. It may be that BEGIN and END are set at once in case only + one progress message was emitted due to the speed of the operation. + Between BEGIN and END, none of these flags will be set + + Operation IDs are all held within the OP_MASK. Only one Operation ID will + be active per call. + :param cur_count: Current absolute count of items + + :param max_count: + The maximum count of items we expect. It may be None in case there is + no maximum number of items or if it is (yet) unknown. + + :param message: + In case of the 'WRITING' operation, it contains the amount of bytes + transferred. It may possibly be used for other purposes as well. + + You may read the contents of the current line in self._cur_line""" + pass + + +class CallableRemoteProgress(RemoteProgress): + """An implementation forwarding updates to any callable""" + + __slots__ = "_callable" + + def __init__(self, fn: Callable) -> None: + self._callable = fn + super(CallableRemoteProgress, self).__init__() + + def update(self, *args: Any, **kwargs: Any) -> None: + self._callable(*args, **kwargs) + + +class Actor(object): + """Actors hold information about a person acting on the repository. They + can be committers and authors or anything with a name and an email as + mentioned in the git log entries.""" + + # PRECOMPILED REGEX + name_only_regex = re.compile(r"<(.*)>") + name_email_regex = re.compile(r"(.*) <(.*?)>") + + # ENVIRONMENT VARIABLES + # read when creating new commits + env_author_name = "GIT_AUTHOR_NAME" + env_author_email = "GIT_AUTHOR_EMAIL" + env_committer_name = "GIT_COMMITTER_NAME" + env_committer_email = "GIT_COMMITTER_EMAIL" + + # CONFIGURATION KEYS + conf_name = "name" + conf_email = "email" + + __slots__ = ("name", "email") + + def __init__(self, name: Optional[str], email: Optional[str]) -> None: + self.name = name + self.email = email + + def __eq__(self, other: Any) -> bool: + return self.name == other.name and self.email == other.email + + def __ne__(self, other: Any) -> bool: + return not (self == other) + + def __hash__(self) -> int: + return hash((self.name, self.email)) + + def __str__(self) -> str: + return self.name if self.name else "" + + def __repr__(self) -> str: + return '<git.Actor "%s <%s>">' % (self.name, self.email) + + @classmethod + def _from_string(cls, string: str) -> "Actor": + """Create an Actor from a string. + :param string: is the string, which is expected to be in regular git format + + John Doe <jdoe@example.com> + + :return: Actor""" + m = cls.name_email_regex.search(string) + if m: + name, email = m.groups() + return Actor(name, email) + else: + m = cls.name_only_regex.search(string) + if m: + return Actor(m.group(1), None) + # assume best and use the whole string as name + return Actor(string, None) + # END special case name + # END handle name/email matching + + @classmethod + def _main_actor( + cls, + env_name: str, + env_email: str, + config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None, + ) -> "Actor": + actor = Actor("", "") + user_id = None # We use this to avoid multiple calls to getpass.getuser() + + def default_email() -> str: + nonlocal user_id + if not user_id: + user_id = get_user_id() + return user_id + + def default_name() -> str: + return default_email().split("@")[0] + + for attr, evar, cvar, default in ( + ("name", env_name, cls.conf_name, default_name), + ("email", env_email, cls.conf_email, default_email), + ): + try: + val = os.environ[evar] + setattr(actor, attr, val) + except KeyError: + if config_reader is not None: + try: + val = config_reader.get("user", cvar) + except Exception: + val = default() + setattr(actor, attr, val) + # END config-reader handling + if not getattr(actor, attr): + setattr(actor, attr, default()) + # END handle name + # END for each item to retrieve + return actor + + @classmethod + def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": + """ + :return: Actor instance corresponding to the configured committer. It behaves + similar to the git implementation, such that the environment will override + configuration values of config_reader. If no value is set at all, it will be + generated + :param config_reader: ConfigReader to use to retrieve the values from in case + they are not set in the environment""" + return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) + + @classmethod + def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": + """Same as committer(), but defines the main author. It may be specified in the environment, + but defaults to the committer""" + return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) + + +class Stats(object): + + """ + Represents stat information as presented by git at the end of a merge. It is + created from the output of a diff operation. + + ``Example``:: + + c = Commit( sha1 ) + s = c.stats + s.total # full-stat-dict + s.files # dict( filepath : stat-dict ) + + ``stat-dict`` + + A dictionary with the following keys and values:: + + deletions = number of deleted lines as int + insertions = number of inserted lines as int + lines = total number of lines changed as int, or deletions + insertions + + ``full-stat-dict`` + + In addition to the items in the stat-dict, it features additional information:: + + files = number of changed files as int""" + + __slots__ = ("total", "files") + + def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]): + self.total = total + self.files = files + + @classmethod + def _list_from_string(cls, repo: "Repo", text: str) -> "Stats": + """Create a Stat object from output retrieved by git-diff. + + :return: git.Stat""" + + hsh: HSH_TD = { + "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0}, + "files": {}, + } + for line in text.splitlines(): + (raw_insertions, raw_deletions, filename) = line.split("\t") + insertions = raw_insertions != "-" and int(raw_insertions) or 0 + deletions = raw_deletions != "-" and int(raw_deletions) or 0 + hsh["total"]["insertions"] += insertions + hsh["total"]["deletions"] += deletions + hsh["total"]["lines"] += insertions + deletions + hsh["total"]["files"] += 1 + files_dict: Files_TD = { + "insertions": insertions, + "deletions": deletions, + "lines": insertions + deletions, + } + hsh["files"][filename.strip()] = files_dict + return Stats(hsh["total"], hsh["files"]) + + +class IndexFileSHA1Writer(object): + + """Wrapper around a file-like object that remembers the SHA1 of + the data written to it. It will write a sha when the stream is closed + or if the asked for explicitly using write_sha. + + Only useful to the indexfile + + :note: Based on the dulwich project""" + + __slots__ = ("f", "sha1") + + def __init__(self, f: IO) -> None: + self.f = f + self.sha1 = make_sha(b"") + + def write(self, data: AnyStr) -> int: + self.sha1.update(data) + return self.f.write(data) + + def write_sha(self) -> bytes: + sha = self.sha1.digest() + self.f.write(sha) + return sha + + def close(self) -> bytes: + sha = self.write_sha() + self.f.close() + return sha + + def tell(self) -> int: + return self.f.tell() + + +class LockFile(object): + + """Provides methods to obtain, check for, and release a file based lock which + should be used to handle concurrent access to the same file. + + As we are a utility class to be derived from, we only use protected methods. + + Locks will automatically be released on destruction""" + + __slots__ = ("_file_path", "_owns_lock") + + def __init__(self, file_path: PathLike) -> None: + self._file_path = file_path + self._owns_lock = False + + def __del__(self) -> None: + self._release_lock() + + def _lock_file_path(self) -> str: + """:return: Path to lockfile""" + return "%s.lock" % (self._file_path) + + def _has_lock(self) -> bool: + """:return: True if we have a lock and if the lockfile still exists + :raise AssertionError: if our lock-file does not exist""" + return self._owns_lock + + def _obtain_lock_or_raise(self) -> None: + """Create a lock file as flag for other instances, mark our instance as lock-holder + + :raise IOError: if a lock was already present or a lock file could not be written""" + if self._has_lock(): + return + lock_file = self._lock_file_path() + if osp.isfile(lock_file): + raise IOError( + "Lock for file %r did already exist, delete %r in case the lock is illegal" + % (self._file_path, lock_file) + ) + + try: + flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL + if is_win: + flags |= os.O_SHORT_LIVED + fd = os.open(lock_file, flags, 0) + os.close(fd) + except OSError as e: + raise IOError(str(e)) from e + + self._owns_lock = True + + def _obtain_lock(self) -> None: + """The default implementation will raise if a lock cannot be obtained. + Subclasses may override this method to provide a different implementation""" + return self._obtain_lock_or_raise() + + def _release_lock(self) -> None: + """Release our lock if we have one""" + if not self._has_lock(): + return + + # if someone removed our file beforhand, lets just flag this issue + # instead of failing, to make it more usable. + lfp = self._lock_file_path() + try: + rmfile(lfp) + except OSError: + pass + self._owns_lock = False + + +class BlockingLockFile(LockFile): + + """The lock file will block until a lock could be obtained, or fail after + a specified timeout. + + :note: If the directory containing the lock was removed, an exception will + be raised during the blocking period, preventing hangs as the lock + can never be obtained.""" + + __slots__ = ("_check_interval", "_max_block_time") + + def __init__( + self, + file_path: PathLike, + check_interval_s: float = 0.3, + max_block_time_s: int = maxsize, + ) -> None: + """Configure the instance + + :param check_interval_s: + Period of time to sleep until the lock is checked the next time. + By default, it waits a nearly unlimited time + + :param max_block_time_s: Maximum amount of seconds we may lock""" + super(BlockingLockFile, self).__init__(file_path) + self._check_interval = check_interval_s + self._max_block_time = max_block_time_s + + def _obtain_lock(self) -> None: + """This method blocks until it obtained the lock, or raises IOError if + it ran out of time or if the parent directory was not available anymore. + If this method returns, you are guaranteed to own the lock""" + starttime = time.time() + maxtime = starttime + float(self._max_block_time) + while True: + try: + super(BlockingLockFile, self)._obtain_lock() + except IOError as e: + # synity check: if the directory leading to the lockfile is not + # readable anymore, raise an exception + curtime = time.time() + if not osp.isdir(osp.dirname(self._lock_file_path())): + msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % ( + self._lock_file_path(), + curtime - starttime, + ) + raise IOError(msg) from e + # END handle missing directory + + if curtime >= maxtime: + msg = "Waited %g seconds for lock at %r" % ( + maxtime - starttime, + self._lock_file_path(), + ) + raise IOError(msg) from e + # END abort if we wait too long + time.sleep(self._check_interval) + else: + break + # END endless loop + + +class IterableList(List[T_IterableObj]): + + """ + List of iterable objects allowing to query an object by id or by named index:: + + heads = repo.heads + heads.master + heads['master'] + heads[0] + + Iterable parent objects = [Commit, SubModule, Reference, FetchInfo, PushInfo] + Iterable via inheritance = [Head, TagReference, RemoteReference] + ] + It requires an id_attribute name to be set which will be queried from its + contained items to have a means for comparison. + + A prefix can be specified which is to be used in case the id returned by the + items always contains a prefix that does not matter to the user, so it + can be left out.""" + + __slots__ = ("_id_attr", "_prefix") + + def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[IterableObj]": + return super(IterableList, cls).__new__(cls) + + def __init__(self, id_attr: str, prefix: str = "") -> None: + self._id_attr = id_attr + self._prefix = prefix + + def __contains__(self, attr: object) -> bool: + # first try identity match for performance + try: + rval = list.__contains__(self, attr) + if rval: + return rval + except (AttributeError, TypeError): + pass + # END handle match + + # otherwise make a full name search + try: + getattr(self, cast(str, attr)) # use cast to silence mypy + return True + except (AttributeError, TypeError): + return False + # END handle membership + + def __getattr__(self, attr: str) -> T_IterableObj: + attr = self._prefix + attr + for item in self: + if getattr(item, self._id_attr) == attr: + return item + # END for each item + return list.__getattribute__(self, attr) + + def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore + + assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str" + + if isinstance(index, int): + return list.__getitem__(self, index) + elif isinstance(index, slice): + raise ValueError("Index should be an int or str") + else: + try: + return getattr(self, index) + except AttributeError as e: + raise IndexError("No item found with id %r" % (self._prefix + index)) from e + # END handle getattr + + def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: + + assert isinstance(index, (int, str)), "Index of IterableList should be an int or str" + + delindex = cast(int, index) + if not isinstance(index, int): + delindex = -1 + name = self._prefix + index + for i, item in enumerate(self): + if getattr(item, self._id_attr) == name: + delindex = i + break + # END search index + # END for each item + if delindex == -1: + raise IndexError("Item with name %s not found" % name) + # END handle error + # END get index to delete + list.__delitem__(self, delindex) + + +class IterableClassWatcher(type): + """Metaclass that watches""" + + def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None: + for base in bases: + if type(base) == IterableClassWatcher: + warnings.warn( + f"GitPython Iterable subclassed by {name}. " + "Iterable is deprecated due to naming clash since v3.1.18" + " and will be removed in 3.1.20, " + "Use IterableObj instead \n", + DeprecationWarning, + stacklevel=2, + ) + + +class Iterable(metaclass=IterableClassWatcher): + + """Defines an interface for iterable items which is to assure a uniform + way to retrieve and iterate items within the git repository""" + + __slots__ = () + _id_attribute_ = "attribute that most suitably identifies your instance" + + @classmethod + def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: + """ + Deprecated, use IterableObj instead. + Find all items of this type - subclasses can specify args and kwargs differently. + If no args are given, subclasses are obliged to return all items if no additional + arguments arg given. + + :note: Favor the iter_items method as it will + + :return: list(Item,...) list of item instances""" + out_list: Any = IterableList(cls._id_attribute_) + out_list.extend(cls.iter_items(repo, *args, **kwargs)) + return out_list + + @classmethod + def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: + # return typed to be compatible with subtypes e.g. Remote + """For more information about the arguments, see list_items + :return: iterator yielding Items""" + raise NotImplementedError("To be implemented by Subclass") + + +@runtime_checkable +class IterableObj(Protocol): + """Defines an interface for iterable items which is to assure a uniform + way to retrieve and iterate items within the git repository + + Subclasses = [Submodule, Commit, Reference, PushInfo, FetchInfo, Remote]""" + + __slots__ = () + _id_attribute_: str + + @classmethod + def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]: + """ + Find all items of this type - subclasses can specify args and kwargs differently. + If no args are given, subclasses are obliged to return all items if no additional + arguments arg given. + + :note: Favor the iter_items method as it will + + :return: list(Item,...) list of item instances""" + out_list: IterableList = IterableList(cls._id_attribute_) + out_list.extend(cls.iter_items(repo, *args, **kwargs)) + return out_list + + @classmethod + @abstractmethod + def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]: + # return typed to be compatible with subtypes e.g. Remote + """For more information about the arguments, see list_items + :return: iterator yielding Items""" + raise NotImplementedError("To be implemented by Subclass") + + +# } END classes + + +class NullHandler(logging.Handler): + def emit(self, record: object) -> None: + pass |