| | |
| | |
| |
|
| | """Standalone functions to accompany the index implementation and make it more |
| | versatile.""" |
| |
|
# Names exported by a star-import of this module.
__all__ = [
    "write_cache",
    "read_cache",
    "write_tree_from_cache",
    "entry_key",
    "stat_mode_to_index_mode",
    "S_IFGITLINK",
    "run_commit_hook",
    "hook_path",
]
| |
|
| | from io import BytesIO |
| | import os |
| | import os.path as osp |
| | from pathlib import Path |
| | from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_ISDIR, S_ISLNK, S_IXUSR |
| | import subprocess |
| | import sys |
| |
|
| | from gitdb.base import IStream |
| | from gitdb.typ import str_tree_type |
| |
|
| | from git.cmd import handle_process_output, safer_popen |
| | from git.compat import defenc, force_bytes, force_text, safe_decode |
| | from git.exc import HookExecutionError, UnmergedEntriesError |
| | from git.objects.fun import ( |
| | traverse_tree_recursive, |
| | traverse_trees_recursive, |
| | tree_to_stream, |
| | ) |
| | from git.util import IndexFileSHA1Writer, finalize_process |
| |
|
| | from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT |
| | from .util import pack, unpack |
| |
|
| | |
| |
|
| | from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast |
| |
|
| | from git.types import PathLike |
| |
|
| | if TYPE_CHECKING: |
| | from git.db import GitCmdObjectDB |
| | from git.objects.tree import TreeCacheTup |
| |
|
| | from .base import IndexFile |
| |
|
| | |
| |
|
# File-type bits for a submodule entry: the symlink and directory type bits combined.
S_IFGITLINK = S_IFLNK | S_IFDIR
"""Flags for a submodule."""

# Bitwise complement of CE_NAMEMASK. write_cache() ANDs an entry's flags with it so
# that every flag bit is kept except the path-length bits, which it recomputes.
CE_NAMEMASK_INV = ~CE_NAMEMASK
| |
|
| |
|
def hook_path(name: str, git_dir: PathLike) -> str:
    """Build the path of a named hook inside a git repository directory.

    :param name:
        Name of the hook, like ``pre-commit``.

    :param git_dir:
        Path to the repository's git directory.

    :return:
        ``git_dir``, ``"hooks"`` and ``name`` joined into a single path.
    """
    components = (git_dir, "hooks", name)
    return osp.join(*components)
| |
|
| |
|
def _has_file_extension(path: str) -> str:
    """Return the extension of `path`, dot included, or ``""`` when it has none.

    The result is a string rather than a bool; callers rely on its truthiness.
    """
    _stem, extension = osp.splitext(path)
    return extension
| |
|
| |
|
def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
    """Execute a commit hook of the repository the index belongs to.

    Hooks that are missing or not executable are silently skipped.

    :param name:
        Name of hook, like ``pre-commit``.

    :param index:
        :class:`~git.index.base.IndexFile` instance. Its path is exported to the hook
        as ``GIT_INDEX_FILE``, and its repository's working directory is used as the
        hook's current directory.

    :param args:
        Arguments passed to hook file.

    :raise git.exc.HookExecutionError:
        If the hook process cannot be started, or if it exits with a non-zero status.
    """
    hp = hook_path(name, index.repo.git_dir)
    if not os.access(hp, os.X_OK):
        return

    hook_env = {
        **os.environ,
        "GIT_INDEX_FILE": safe_decode(str(index.path)),
        "GIT_EDITOR": ":",
    }
    command = [hp]
    try:
        if sys.platform == "win32" and not _has_file_extension(hp):
            # On Windows a hook file without an extension is handed to bash.exe,
            # addressed by its POSIX-style path relative to the working directory.
            relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
            command = ["bash.exe", relative_hp]

        process = safer_popen(
            [*command, *args],
            env=hook_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=index.repo.working_dir,
        )
    except Exception as ex:
        raise HookExecutionError(hp, ex) from ex

    # The hook started; collect its output and wait for it to finish.
    out_chunks: List[str] = []
    err_chunks: List[str] = []
    handle_process_output(process, out_chunks.append, err_chunks.append, finalize_process)
    stdout = "".join(out_chunks)
    stderr = "".join(err_chunks)
    if process.returncode == 0:
        return

    raise HookExecutionError(
        hp,
        process.returncode,
        force_text(stderr, defenc),
        force_text(stdout, defenc),
    )
| | |
| |
|
| |
|
def stat_mode_to_index_mode(mode: int) -> int:
    """Translate a mode obtained from a stat call into the mode stored in the index.

    :param mode:
        Mode bits as returned by a stat call.

    :return:
        ``S_IFLNK`` for symbolic links, ``S_IFGITLINK`` for directories and gitlinks,
        and otherwise a regular-file mode whose permissions are ``0o755`` when the
        owner-executable bit is set and ``0o644`` when it is not.
    """
    if S_ISLNK(mode):
        return S_IFLNK
    if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK:
        return S_IFGITLINK
    permissions = 0o755 if mode & S_IXUSR else 0o644
    return S_IFREG | permissions
| |
|
| |
|
def write_cache(
    entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
    stream: IO[bytes],
    extension_data: Union[None, bytes] = None,
    ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
) -> None:
    """Serialize index entries to a stream in the index file format.

    :param entries:
        **Sorted** list of entries.

    :param stream:
        Destination stream. It is wrapped into an instance of `ShaStreamCls`, through
        which all output is written.

    :param extension_data:
        Optional trailer written verbatim after the entries. It must begin with a
        4 byte identifier, followed by its size (4 bytes).

    :param ShaStreamCls:
        Type to use when writing to the stream. It produces a sha while writing to it,
        before the data is passed on to the wrapped stream.
    """
    sha_writer = ShaStreamCls(stream)
    tell = sha_writer.tell
    write = sha_writer.write

    # Header: signature, format version, number of entries.
    index_version = 2
    write(b"DIRC")
    write(pack(">LL", index_version, len(entries)))

    # Body: one record per entry.
    for entry in entries:
        entry_start = tell()
        write(entry.ctime_bytes)
        write(entry.mtime_bytes)

        path: bytes = force_bytes(str(entry.path), encoding=defenc)
        path_len = len(path)
        masked_len = path_len & CE_NAMEMASK
        assert masked_len == path_len, "Path %s too long to fit into index" % entry.path

        # The low bits of the flags carry the path length; all other bits are the
        # entry's own.
        flags = masked_len | (entry.flags & CE_NAMEMASK_INV)
        fixed_fields = pack(
            ">LLLLLL20sH",
            entry.dev,
            entry.inode,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            entry.binsha,
            flags,
        )
        write(fixed_fields)
        write(path)

        # NUL-pad the record to the next multiple of 8 bytes; at least one NUL is
        # always written.
        entry_len = tell() - entry_start
        padded_len = (entry_len + 8) & ~7
        write(b"\0" * (padded_len - entry_len))

    # Optional extension trailer.
    if extension_data is not None:
        write(extension_data)

    # Finish with the sha accumulated over everything written above.
    sha_writer.write_sha()
| |
|
| |
|
def read_header(stream: IO[bytes]) -> Tuple[int, int]:
    """Read and validate the header of an index file.

    :param stream:
        Stream to read the 12 header bytes from.

    :return:
        tuple(version_long, num_entries)

    :raise AssertionError:
        If the signature is not ``DIRC`` or the version is neither 1 nor 2.
    """
    type_id = stream.read(4)
    if type_id != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % type_id)
    version, num_entries = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))

    # Raise explicitly rather than via an ``assert`` statement, so the check is not
    # stripped under ``python -O`` and matches the signature check above.
    if version not in (1, 2):
        raise AssertionError("Unsupported index file version: %r" % version)
    return version, num_entries
| |
|
| |
|
def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
    """Compute the key under which an entry is stored in the index entries dictionary.

    :param entry:
        One instance of type BaseIndexEntry or the path and the stage.

    :return:
        Key suitable to be used for the
        :attr:`index.entries <git.index.base.IndexFile.entries>` dictionary.
    """
    if len(entry) != 1:
        # Path and stage were passed separately: the argument tuple already is the key.
        return cast(Tuple[PathLike, int], entry)

    (index_entry,) = entry
    assert isinstance(index_entry, BaseIndexEntry)
    return (index_entry.path, index_entry.stage)
| | |
| |
|
| |
|
def read_cache(
    stream: IO[bytes],
) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
    """Parse a cache (index) file from the given stream.

    :return:
        tuple(version, entries_dict, extension_data, content_sha)

        * *version* is the integer version number.
        * *entries_dict* is a dictionary which maps ``(path, stage)`` keys to their
          IndexEntry instances.
        * *extension_data* is everything between the last entry and the trailing sha;
          it is empty bytes when there are no extensions.
        * *content_sha* is the final 20 bytes of the stream.
    """
    version, num_entries = read_header(stream)
    entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}

    read = stream.read
    tell = stream.tell
    fixed_fields_size = 4 * 6 + 20 + 2

    for _ in range(num_entries):
        entry_start = tell()
        ctime = unpack(">8s", read(8))[0]
        mtime = unpack(">8s", read(8))[0]
        (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", read(fixed_fields_size))

        # The low bits of the flags hold the length of the path that follows.
        path = read(flags & CE_NAMEMASK).decode(defenc)

        # Skip the NUL padding that rounds the record up to a multiple of 8 bytes.
        entry_len = tell() - entry_start
        padded_len = (entry_len + 8) & ~7
        read(padded_len - entry_len)

        entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size))
        entries[(path, entry.stage)] = entry

    # Whatever remains is the extension data followed by the 20 byte content sha.
    trailer = stream.read(-1)
    assert len(trailer) > 19, (
        "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(trailer)
    )

    extension_data, content_sha = trailer[:-20], trailer[-20:]
    return (version, entries, extension_data, content_sha)
| |
|
| |
|
def write_tree_from_cache(
    entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
) -> Tuple[bytes, List["TreeCacheTup"]]:
    """Build tree objects from sorted index entries and store them in a database.

    Entries whose path has a ``/`` at or after `si` are grouped by the directory name
    in front of it, and each group is written recursively as a subtree.

    :param entries:
        **Sorted** list of :class:`~git.index.typ.IndexEntry` instances.

    :param odb:
        Object database to store the trees in.

    :param sl:
        Slice indicating the range we should process on the entries list.

    :param si:
        Offset into each entry's path at which the names of this tree level start.

    :return:
        tuple(binsha, list(tree_entry, ...))

        The sha of the stored tree and the list of its direct items, each a tuple of
        (binsha, mode, name).

    :raise git.exc.UnmergedEntriesError:
        If an entry in the processed range is not at stage 0.
    """
    tree_items: List["TreeCacheTup"] = []

    pos = sl.start
    stop = sl.stop
    while pos < stop:
        entry = entries[pos]
        if entry.stage != 0:
            raise UnmergedEntriesError(entry)
        pos += 1

        sep = entry.path.find("/", si)
        if sep == -1:
            # No further directory component: a direct item of this tree.
            tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
            continue

        # Find the run of following entries that share the same directory name.
        base = entry.path[si:sep]
        run_end = pos
        while run_end < stop:
            other = entries[run_end]
            other_sep = other.path.find("/", si)
            if other_sep == -1 or other.path[si:other_sep] != base:
                break
            run_end += 1

        # Write the whole run, including the current entry, as a subtree, then
        # continue after it.
        sha, _subtree_items = write_tree_from_cache(entries, odb, slice(pos - 1, run_end), sep + 1)
        tree_items.append((sha, S_IFDIR, base))
        pos = run_end

    # Serialize the collected items and store them as a tree object.
    buffer = BytesIO()
    tree_to_stream(tree_items, buffer.write)
    buffer.seek(0)

    stored = odb.store(IStream(str_tree_type, len(buffer.getvalue()), buffer))
    return (stored.binsha, tree_items)
| |
|
| |
|
def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry:
    """Convert a tree entry tuple into a :class:`BaseIndexEntry` at the given stage.

    The first three items of `tree_entry` are used as sha, mode and path, and the
    stage is shifted into its position within the flags field.
    """
    binsha = tree_entry[0]
    mode = tree_entry[1]
    path = tree_entry[2]
    flags = stage << CE_STAGESHIFT
    return BaseIndexEntry((mode, binsha, flags, path))
| |
|
| |
|
def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
    """Merge up to three trees into a flat list of index entries.

    :param odb:
        Object database the trees are read from.

    :param tree_shas:
        1, 2 or 3 trees as identified by their binary 20 byte shas. If 1 or two, the
        entries will effectively correspond to the last given tree. If 3 are given, a 3
        way merge is performed, with the trees taken as base, ours and theirs.

    :return:
        List of :class:`~git.index.typ.BaseIndexEntry` instances representing the
        aggressive merge of the given trees. Resolved entries are on stage 0, whereas
        conflicting ones are left on stage 1 (base), 2 (ours) or 3 (theirs).

    :raise ValueError:
        If more than three trees are given.
    """
    tree_count = len(tree_shas)

    # One or two trees: no merge, just take the last tree as is.
    if tree_count in (1, 2):
        return [
            _tree_entry_to_baseindexentry(tree_entry, 0)
            for tree_entry in traverse_tree_recursive(odb, tree_shas[-1], "")
        ]

    if tree_count > 3:
        raise ValueError("Cannot handle %i trees at once" % len(tree_shas))

    def differs(left: "TreeCacheTup", right: "TreeCacheTup") -> bool:
        """Tell whether two tree entries differ in sha (item 0) or mode (item 1)."""
        return left[0] != right[0] or left[1] != right[1]

    out: List[BaseIndexEntry] = []
    to_entry = _tree_entry_to_baseindexentry

    # Three-way merge.
    for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
        if base is None:
            # No base version: the path was added on at least one side.
            if ours is None:
                assert theirs is not None
                out.append(to_entry(theirs, 0))
            elif theirs is None:
                out.append(to_entry(ours, 0))
            elif differs(ours, theirs):
                # Added differently on both sides: conflict.
                out.append(to_entry(ours, 2))
                out.append(to_entry(theirs, 3))
            else:
                # Added identically on both sides.
                out.append(to_entry(ours, 0))
            continue

        if ours is None:
            # Gone in ours: a conflict only if theirs still has it and changed it.
            if theirs is not None and differs(theirs, base):
                out.append(to_entry(base, 1))
                out.append(to_entry(theirs, 3))
            continue

        if theirs is None:
            # Gone in theirs: a conflict only if ours changed it.
            if differs(ours, base):
                out.append(to_entry(base, 1))
                out.append(to_entry(ours, 2))
            continue

        # Present in all three trees.
        if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or (
            base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]
        ):
            # All three shas, or all three modes, are pairwise different: conflict.
            out.append(to_entry(base, 1))
            out.append(to_entry(ours, 2))
            out.append(to_entry(theirs, 3))
        elif differs(base, ours):
            # Ours differs from base: keep ours.
            out.append(to_entry(ours, 0))
        else:
            # Ours equals base: take theirs, whether it changed or not.
            out.append(to_entry(theirs, 0))

    return out
| |
|