| | |
| | |
| | |
| | |
| |
|
| | """Parser for reading and writing configuration files.""" |
| |
|
| | __all__ = ["GitConfigParser", "SectionConstraint"] |
| |
|
| | import abc |
| | import configparser as cp |
| | import fnmatch |
| | from functools import wraps |
| | import inspect |
| | from io import BufferedReader, IOBase |
| | import logging |
| | import os |
| | import os.path as osp |
| | import re |
| | import sys |
| |
|
| | from git.compat import defenc, force_text |
| | from git.util import LockFile |
| |
|
| | |
| |
|
| | from typing import ( |
| | Any, |
| | Callable, |
| | Generic, |
| | IO, |
| | List, |
| | Dict, |
| | Sequence, |
| | TYPE_CHECKING, |
| | Tuple, |
| | TypeVar, |
| | Union, |
| | cast, |
| | ) |
| |
|
| | from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T |
| |
|
| | if TYPE_CHECKING: |
| | from io import BytesIO |
| |
|
| | from git.repo.base import Repo |
| |
|
| | T_ConfigParser = TypeVar("T_ConfigParser", bound="GitConfigParser") |
| | T_OMD_value = TypeVar("T_OMD_value", str, bytes, int, float, bool) |
| |
|
| | if sys.version_info[:3] < (3, 7, 2): |
| | |
| | from collections import OrderedDict |
| |
|
| | OrderedDict_OMD = OrderedDict |
| | else: |
| | from typing import OrderedDict |
| |
|
| | OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] |
| |
|
| | |
| |
|
| | _logger = logging.getLogger(__name__) |
| |
|
| | CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository") |
| | """The configuration level of a configuration file.""" |
| |
|
| | CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch):(.+)\"") |
| | """Section pattern to detect conditional includes. |
| | |
| | See: https://git-scm.com/docs/git-config#_conditional_includes |
| | """ |
| |
|
| |
|
| | class MetaParserBuilder(abc.ABCMeta): |
| | """Utility class wrapping base-class methods into decorators that assure read-only |
| | properties.""" |
| |
|
| | def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> "MetaParserBuilder": |
| | """Equip all base-class methods with a needs_values decorator, and all non-const |
| | methods with a :func:`set_dirty_and_flush_changes` decorator in addition to |
| | that. |
| | """ |
| | kmm = "_mutating_methods_" |
| | if kmm in clsdict: |
| | mutating_methods = clsdict[kmm] |
| | for base in bases: |
| | methods = (t for t in inspect.getmembers(base, inspect.isroutine) if not t[0].startswith("_")) |
| | for name, method in methods: |
| | if name in clsdict: |
| | continue |
| | method_with_values = needs_values(method) |
| | if name in mutating_methods: |
| | method_with_values = set_dirty_and_flush_changes(method_with_values) |
| | |
| |
|
| | clsdict[name] = method_with_values |
| | |
| | |
| | |
| |
|
| | new_type = super().__new__(cls, name, bases, clsdict) |
| | return new_type |
| |
|
| |
|
| | def needs_values(func: Callable[..., _T]) -> Callable[..., _T]: |
| | """Return a method for ensuring we read values (on demand) before we try to access |
| | them.""" |
| |
|
| | @wraps(func) |
| | def assure_data_present(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: |
| | self.read() |
| | return func(self, *args, **kwargs) |
| |
|
| | |
| | return assure_data_present |
| |
|
| |
|
| | def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[..., _T]: |
| | """Return a method that checks whether given non constant function may be called. |
| | |
| | If so, the instance will be set dirty. Additionally, we flush the changes right to |
| | disk. |
| | """ |
| |
|
| | def flush_changes(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: |
| | rval = non_const_func(self, *args, **kwargs) |
| | self._dirty = True |
| | self.write() |
| | return rval |
| |
|
| | |
| | flush_changes.__name__ = non_const_func.__name__ |
| | return flush_changes |
| |
|
| |
|
| | class SectionConstraint(Generic[T_ConfigParser]): |
| | """Constrains a ConfigParser to only option commands which are constrained to |
| | always use the section we have been initialized with. |
| | |
| | It supports all ConfigParser methods that operate on an option. |
| | |
| | :note: |
| | If used as a context manager, will release the wrapped ConfigParser. |
| | """ |
| |
|
| | __slots__ = ("_config", "_section_name") |
| |
|
| | _valid_attrs_ = ( |
| | "get_value", |
| | "set_value", |
| | "get", |
| | "set", |
| | "getint", |
| | "getfloat", |
| | "getboolean", |
| | "has_option", |
| | "remove_section", |
| | "remove_option", |
| | "options", |
| | ) |
| |
|
| | def __init__(self, config: T_ConfigParser, section: str) -> None: |
| | self._config = config |
| | self._section_name = section |
| |
|
| | def __del__(self) -> None: |
| | |
| | |
| | |
| | self._config.release() |
| |
|
| | def __getattr__(self, attr: str) -> Any: |
| | if attr in self._valid_attrs_: |
| | return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs) |
| | return super().__getattribute__(attr) |
| |
|
| | def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any: |
| | """Call the configuration at the given method which must take a section name as |
| | first argument.""" |
| | return getattr(self._config, method)(self._section_name, *args, **kwargs) |
| |
|
| | @property |
| | def config(self) -> T_ConfigParser: |
| | """return: ConfigParser instance we constrain""" |
| | return self._config |
| |
|
| | def release(self) -> None: |
| | """Equivalent to :meth:`GitConfigParser.release`, which is called on our |
| | underlying parser instance.""" |
| | return self._config.release() |
| |
|
| | def __enter__(self) -> "SectionConstraint[T_ConfigParser]": |
| | self._config.__enter__() |
| | return self |
| |
|
| | def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None: |
| | self._config.__exit__(exception_type, exception_value, traceback) |
| |
|
| |
|
| | class _OMD(OrderedDict_OMD): |
| | """Ordered multi-dict.""" |
| |
|
| | def __setitem__(self, key: str, value: _T) -> None: |
| | super().__setitem__(key, [value]) |
| |
|
| | def add(self, key: str, value: Any) -> None: |
| | if key not in self: |
| | super().__setitem__(key, [value]) |
| | return |
| |
|
| | super().__getitem__(key).append(value) |
| |
|
| | def setall(self, key: str, values: List[_T]) -> None: |
| | super().__setitem__(key, values) |
| |
|
| | def __getitem__(self, key: str) -> Any: |
| | return super().__getitem__(key)[-1] |
| |
|
| | def getlast(self, key: str) -> Any: |
| | return super().__getitem__(key)[-1] |
| |
|
| | def setlast(self, key: str, value: Any) -> None: |
| | if key not in self: |
| | super().__setitem__(key, [value]) |
| | return |
| |
|
| | prior = super().__getitem__(key) |
| | prior[-1] = value |
| |
|
| | def get(self, key: str, default: Union[_T, None] = None) -> Union[_T, None]: |
| | return super().get(key, [default])[-1] |
| |
|
| | def getall(self, key: str) -> List[_T]: |
| | return super().__getitem__(key) |
| |
|
| | def items(self) -> List[Tuple[str, _T]]: |
| | """List of (key, last value for key).""" |
| | return [(k, self[k]) for k in self] |
| |
|
| | def items_all(self) -> List[Tuple[str, List[_T]]]: |
| | """List of (key, list of values for key).""" |
| | return [(k, self.getall(k)) for k in self] |
| |
|
| |
|
| | def get_config_path(config_level: Lit_config_levels) -> str: |
| | |
| | |
| | if sys.platform == "win32" and config_level == "system": |
| | config_level = "global" |
| |
|
| | if config_level == "system": |
| | return "/etc/gitconfig" |
| | elif config_level == "user": |
| | config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", "~"), ".config") |
| | return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config"))) |
| | elif config_level == "global": |
| | return osp.normpath(osp.expanduser("~/.gitconfig")) |
| | elif config_level == "repository": |
| | raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path") |
| | else: |
| | |
| | |
| | assert_never( |
| | config_level, |
| | ValueError(f"Invalid configuration level: {config_level!r}"), |
| | ) |
| |
|
| |
|
| | class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): |
| | """Implements specifics required to read git style configuration files. |
| | |
| | This variation behaves much like the :manpage:`git-config(1)` command, such that the |
| | configuration will be read on demand based on the filepath given during |
| | initialization. |
| | |
| | The changes will automatically be written once the instance goes out of scope, but |
| | can be triggered manually as well. |
| | |
| | The configuration file will be locked if you intend to change values preventing |
| | other instances to write concurrently. |
| | |
| | :note: |
| | The config is case-sensitive even when queried, hence section and option names |
| | must match perfectly. |
| | |
| | :note: |
| | If used as a context manager, this will release the locked file. |
| | """ |
| |
|
| | |
| | t_lock = LockFile |
| | """The lock type determines the type of lock to use in new configuration readers. |
| | |
| | They must be compatible to the :class:`~git.util.LockFile` interface. |
| | A suitable alternative would be the :class:`~git.util.BlockingLockFile`. |
| | """ |
| |
|
| | re_comment = re.compile(r"^\s*[#;]") |
| | |
| |
|
| | optvalueonly_source = r"\s*(?P<option>[^:=\s][^:=]*)" |
| |
|
| | OPTVALUEONLY = re.compile(optvalueonly_source) |
| |
|
| | OPTCRE = re.compile(optvalueonly_source + r"\s*(?P<vi>[:=])\s*" + r"(?P<value>.*)$") |
| |
|
| | del optvalueonly_source |
| |
|
| | _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set") |
| | """Names of :class:`~configparser.RawConfigParser` methods able to change the |
| | instance.""" |
| |
|
| | def __init__( |
| | self, |
| | file_or_files: Union[None, PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = None, |
| | read_only: bool = True, |
| | merge_includes: bool = True, |
| | config_level: Union[Lit_config_levels, None] = None, |
| | repo: Union["Repo", None] = None, |
| | ) -> None: |
| | """Initialize a configuration reader to read the given `file_or_files` and to |
| | possibly allow changes to it by setting `read_only` False. |
| | |
| | :param file_or_files: |
| | A file path or file object, or a sequence of possibly more than one of them. |
| | |
| | :param read_only: |
| | If ``True``, the ConfigParser may only read the data, but not change it. |
| | If ``False``, only a single file path or file object may be given. We will |
| | write back the changes when they happen, or when the ConfigParser is |
| | released. This will not happen if other configuration files have been |
| | included. |
| | |
| | :param merge_includes: |
| | If ``True``, we will read files mentioned in ``[include]`` sections and |
| | merge their contents into ours. This makes it impossible to write back an |
| | individual configuration file. Thus, if you want to modify a single |
| | configuration file, turn this off to leave the original dataset unaltered |
| | when reading it. |
| | |
| | :param repo: |
| | Reference to repository to use if ``[includeIf]`` sections are found in |
| | configuration files. |
| | """ |
| | cp.RawConfigParser.__init__(self, dict_type=_OMD) |
| | self._dict: Callable[..., _OMD] |
| | self._defaults: _OMD |
| | self._sections: _OMD |
| |
|
| | |
| | |
| | if not hasattr(self, "_proxies"): |
| | self._proxies = self._dict() |
| |
|
| | if file_or_files is not None: |
| | self._file_or_files: Union[PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = file_or_files |
| | else: |
| | if config_level is None: |
| | if read_only: |
| | self._file_or_files = [ |
| | get_config_path(cast(Lit_config_levels, f)) for f in CONFIG_LEVELS if f != "repository" |
| | ] |
| | else: |
| | raise ValueError("No configuration level or configuration files specified") |
| | else: |
| | self._file_or_files = [get_config_path(config_level)] |
| |
|
| | self._read_only = read_only |
| | self._dirty = False |
| | self._is_initialized = False |
| | self._merge_includes = merge_includes |
| | self._repo = repo |
| | self._lock: Union["LockFile", None] = None |
| | self._acquire_lock() |
| |
|
| | def _acquire_lock(self) -> None: |
| | if not self._read_only: |
| | if not self._lock: |
| | if isinstance(self._file_or_files, (str, os.PathLike)): |
| | file_or_files = self._file_or_files |
| | elif isinstance(self._file_or_files, (tuple, list, Sequence)): |
| | raise ValueError( |
| | "Write-ConfigParsers can operate on a single file only, multiple files have been passed" |
| | ) |
| | else: |
| | file_or_files = self._file_or_files.name |
| |
|
| | |
| | |
| | self._lock = self.t_lock(file_or_files) |
| | |
| |
|
| | self._lock._obtain_lock() |
| | |
| |
|
| | def __del__(self) -> None: |
| | """Write pending changes if required and release locks.""" |
| | |
| | self.release() |
| |
|
| | def __enter__(self) -> "GitConfigParser": |
| | self._acquire_lock() |
| | return self |
| |
|
| | def __exit__(self, *args: Any) -> None: |
| | self.release() |
| |
|
| | def release(self) -> None: |
| | """Flush changes and release the configuration write lock. This instance must |
| | not be used anymore afterwards. |
| | |
| | In Python 3, it's required to explicitly release locks and flush changes, as |
| | ``__del__`` is not called deterministically anymore. |
| | """ |
| | |
| | |
| | if self.read_only or (self._lock and not self._lock._has_lock()): |
| | return |
| |
|
| | try: |
| | self.write() |
| | except IOError: |
| | _logger.error("Exception during destruction of GitConfigParser", exc_info=True) |
| | except ReferenceError: |
| | |
| | |
| | |
| | pass |
| | finally: |
| | if self._lock is not None: |
| | self._lock._release_lock() |
| |
|
| | def optionxform(self, optionstr: str) -> str: |
| | """Do not transform options in any way when writing.""" |
| | return optionstr |
| |
|
| | def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None: |
| | """Originally a direct copy of the Python 2.4 version of |
| | :meth:`RawConfigParser._read <configparser.RawConfigParser._read>`, to ensure it |
| | uses ordered dicts. |
| | |
| | The ordering bug was fixed in Python 2.4, and dict itself keeps ordering since |
| | Python 3.7. This has some other changes, especially that it ignores initial |
| | whitespace, since git uses tabs. (Big comments are removed to be more compact.) |
| | """ |
| | cursect = None |
| | optname = None |
| | lineno = 0 |
| | is_multi_line = False |
| | e = None |
| |
|
| | def string_decode(v: str) -> str: |
| | if v and v.endswith("\\"): |
| | v = v[:-1] |
| | |
| |
|
| | return v.encode(defenc).decode("unicode_escape") |
| |
|
| | |
| |
|
| | while True: |
| | |
| | line = fp.readline().decode(defenc) |
| | if not line: |
| | break |
| | lineno = lineno + 1 |
| | |
| | if line.strip() == "" or self.re_comment.match(line): |
| | continue |
| | if line.split(None, 1)[0].lower() == "rem" and line[0] in "rR": |
| | |
| | continue |
| |
|
| | |
| | mo = self.SECTCRE.match(line.strip()) |
| | if not is_multi_line and mo: |
| | sectname: str = mo.group("header").strip() |
| | if sectname in self._sections: |
| | cursect = self._sections[sectname] |
| | elif sectname == cp.DEFAULTSECT: |
| | cursect = self._defaults |
| | else: |
| | cursect = self._dict((("__name__", sectname),)) |
| | self._sections[sectname] = cursect |
| | self._proxies[sectname] = None |
| | |
| | optname = None |
| | |
| | elif cursect is None: |
| | raise cp.MissingSectionHeaderError(fpname, lineno, line) |
| | |
| | elif not is_multi_line: |
| | mo = self.OPTCRE.match(line) |
| | if mo: |
| | |
| | optname, vi, optval = mo.group("option", "vi", "value") |
| | if vi in ("=", ":") and ";" in optval and not optval.strip().startswith('"'): |
| | pos = optval.find(";") |
| | if pos != -1 and optval[pos - 1].isspace(): |
| | optval = optval[:pos] |
| | optval = optval.strip() |
| | if optval == '""': |
| | optval = "" |
| | |
| | optname = self.optionxform(optname.rstrip()) |
| | if len(optval) > 1 and optval[0] == '"' and optval[-1] != '"': |
| | is_multi_line = True |
| | optval = string_decode(optval[1:]) |
| | |
| | |
| | cursect.add(optname, optval) |
| | else: |
| | |
| | if not self.OPTVALUEONLY.match(line): |
| | if not e: |
| | e = cp.ParsingError(fpname) |
| | e.append(lineno, repr(line)) |
| | continue |
| | else: |
| | line = line.rstrip() |
| | if line.endswith('"'): |
| | is_multi_line = False |
| | line = line[:-1] |
| | |
| | optval = cursect.getlast(optname) |
| | cursect.setlast(optname, optval + string_decode(line)) |
| | |
| | |
| |
|
| | |
| | if e: |
| | raise e |
| |
|
| | def _has_includes(self) -> Union[bool, int]: |
| | return self._merge_includes and len(self._included_paths()) |
| |
|
| | def _included_paths(self) -> List[Tuple[str, str]]: |
| | """List all paths that must be included to configuration. |
| | |
| | :return: |
| | The list of paths, where each path is a tuple of (option, value). |
| | """ |
| | paths = [] |
| |
|
| | for section in self.sections(): |
| | if section == "include": |
| | paths += self.items(section) |
| |
|
| | match = CONDITIONAL_INCLUDE_REGEXP.search(section) |
| | if match is None or self._repo is None: |
| | continue |
| |
|
| | keyword = match.group(1) |
| | value = match.group(2).strip() |
| |
|
| | if keyword in ["gitdir", "gitdir/i"]: |
| | value = osp.expanduser(value) |
| |
|
| | if not any(value.startswith(s) for s in ["./", "/"]): |
| | value = "**/" + value |
| | if value.endswith("/"): |
| | value += "**" |
| |
|
| | |
| | if keyword.endswith("/i"): |
| | value = re.sub( |
| | r"[a-zA-Z]", |
| | lambda m: "[{}{}]".format(m.group().lower(), m.group().upper()), |
| | value, |
| | ) |
| | if self._repo.git_dir: |
| | if fnmatch.fnmatchcase(str(self._repo.git_dir), value): |
| | paths += self.items(section) |
| |
|
| | elif keyword == "onbranch": |
| | try: |
| | branch_name = self._repo.active_branch.name |
| | except TypeError: |
| | |
| | continue |
| |
|
| | if fnmatch.fnmatchcase(branch_name, value): |
| | paths += self.items(section) |
| |
|
| | return paths |
| |
|
| | def read(self) -> None: |
| | """Read the data stored in the files we have been initialized with. |
| | |
| | This will ignore files that cannot be read, possibly leaving an empty |
| | configuration. |
| | |
| | :raise IOError: |
| | If a file cannot be handled. |
| | """ |
| | if self._is_initialized: |
| | return |
| | self._is_initialized = True |
| |
|
| | files_to_read: List[Union[PathLike, IO]] = [""] |
| | if isinstance(self._file_or_files, (str, os.PathLike)): |
| | |
| | files_to_read = [self._file_or_files] |
| | elif not isinstance(self._file_or_files, (tuple, list, Sequence)): |
| | |
| | files_to_read = [self._file_or_files] |
| | else: |
| | files_to_read = list(self._file_or_files) |
| | |
| |
|
| | seen = set(files_to_read) |
| | num_read_include_files = 0 |
| | while files_to_read: |
| | file_path = files_to_read.pop(0) |
| | file_ok = False |
| |
|
| | if hasattr(file_path, "seek"): |
| | |
| | |
| | file_path = cast(IO[bytes], file_path) |
| | self._read(file_path, file_path.name) |
| | else: |
| | |
| | file_path = cast(PathLike, file_path) |
| | try: |
| | with open(file_path, "rb") as fp: |
| | file_ok = True |
| | self._read(fp, fp.name) |
| | except IOError: |
| | continue |
| |
|
| | |
| | |
| | if self._has_includes(): |
| | for _, include_path in self._included_paths(): |
| | if include_path.startswith("~"): |
| | include_path = osp.expanduser(include_path) |
| | if not osp.isabs(include_path): |
| | if not file_ok: |
| | continue |
| | |
| | file_path = cast(PathLike, file_path) |
| | assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work" |
| | include_path = osp.join(osp.dirname(file_path), include_path) |
| | |
| | include_path = osp.normpath(include_path) |
| | if include_path in seen or not os.access(include_path, os.R_OK): |
| | continue |
| | seen.add(include_path) |
| | |
| | files_to_read.insert(0, include_path) |
| | num_read_include_files += 1 |
| | |
| | |
| | |
| |
|
| | |
| | |
| | if num_read_include_files == 0: |
| | self._merge_includes = False |
| |
|
| | def _write(self, fp: IO) -> None: |
| | """Write an .ini-format representation of the configuration state in |
| | git compatible format.""" |
| |
|
| | def write_section(name: str, section_dict: _OMD) -> None: |
| | fp.write(("[%s]\n" % name).encode(defenc)) |
| |
|
| | values: Sequence[str] |
| | v: str |
| | for key, values in section_dict.items_all(): |
| | if key == "__name__": |
| | continue |
| |
|
| | for v in values: |
| | fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace("\n", "\n\t"))).encode(defenc)) |
| | |
| |
|
| | |
| |
|
| | if self._defaults: |
| | write_section(cp.DEFAULTSECT, self._defaults) |
| | value: _OMD |
| |
|
| | for name, value in self._sections.items(): |
| | write_section(name, value) |
| |
|
| | def items(self, section_name: str) -> List[Tuple[str, str]]: |
| | """:return: list((option, value), ...) pairs of all items in the given section""" |
| | return [(k, v) for k, v in super().items(section_name) if k != "__name__"] |
| |
|
| | def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]: |
| | """:return: list((option, [values...]), ...) pairs of all items in the given section""" |
| | rv = _OMD(self._defaults) |
| |
|
| | for k, vs in self._sections[section_name].items_all(): |
| | if k == "__name__": |
| | continue |
| |
|
| | if k in rv and rv.getall(k) == vs: |
| | continue |
| |
|
| | for v in vs: |
| | rv.add(k, v) |
| |
|
| | return rv.items_all() |
| |
|
| | @needs_values |
| | def write(self) -> None: |
| | """Write changes to our file, if there are changes at all. |
| | |
| | :raise IOError: |
| | If this is a read-only writer instance or if we could not obtain a file |
| | lock. |
| | """ |
| | self._assure_writable("write") |
| | if not self._dirty: |
| | return |
| |
|
| | if isinstance(self._file_or_files, (list, tuple)): |
| | raise AssertionError( |
| | "Cannot write back if there is not exactly a single file to write to, have %i files" |
| | % len(self._file_or_files) |
| | ) |
| | |
| |
|
| | if self._has_includes(): |
| | _logger.debug( |
| | "Skipping write-back of configuration file as include files were merged in." |
| | + "Set merge_includes=False to prevent this." |
| | ) |
| | return |
| | |
| |
|
| | fp = self._file_or_files |
| |
|
| | |
| | is_file_lock = isinstance(fp, (str, os.PathLike, IOBase)) |
| | if is_file_lock and self._lock is not None: |
| | self._lock._obtain_lock() |
| |
|
| | if not hasattr(fp, "seek"): |
| | fp = cast(PathLike, fp) |
| | with open(fp, "wb") as fp_open: |
| | self._write(fp_open) |
| | else: |
| | fp = cast("BytesIO", fp) |
| | fp.seek(0) |
| | |
| | if hasattr(fp, "truncate"): |
| | fp.truncate() |
| | self._write(fp) |
| |
|
| | def _assure_writable(self, method_name: str) -> None: |
| | if self.read_only: |
| | raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name)) |
| |
|
| | def add_section(self, section: str) -> None: |
| | """Assures added options will stay in order.""" |
| | return super().add_section(section) |
| |
|
| | @property |
| | def read_only(self) -> bool: |
| | """:return: ``True`` if this instance may change the configuration file""" |
| | return self._read_only |
| |
|
| | |
| | def get_value( |
| | self, |
| | section: str, |
| | option: str, |
| | default: Union[int, float, str, bool, None] = None, |
| | ) -> Union[int, float, str, bool]: |
| | """Get an option's value. |
| | |
| | If multiple values are specified for this option in the section, the last one |
| | specified is returned. |
| | |
| | :param default: |
| | If not ``None``, the given default value will be returned in case the option |
| | did not exist. |
| | |
| | :return: |
| | A properly typed value, either int, float or string |
| | |
| | :raise TypeError: |
| | In case the value could not be understood. |
| | Otherwise the exceptions known to the ConfigParser will be raised. |
| | """ |
| | try: |
| | valuestr = self.get(section, option) |
| | except Exception: |
| | if default is not None: |
| | return default |
| | raise |
| |
|
| | return self._string_to_value(valuestr) |
| |
|
| | def get_values( |
| | self, |
| | section: str, |
| | option: str, |
| | default: Union[int, float, str, bool, None] = None, |
| | ) -> List[Union[int, float, str, bool]]: |
| | """Get an option's values. |
| | |
| | If multiple values are specified for this option in the section, all are |
| | returned. |
| | |
| | :param default: |
| | If not ``None``, a list containing the given default value will be returned |
| | in case the option did not exist. |
| | |
| | :return: |
| | A list of properly typed values, either int, float or string |
| | |
| | :raise TypeError: |
| | In case the value could not be understood. |
| | Otherwise the exceptions known to the ConfigParser will be raised. |
| | """ |
| | try: |
| | self.sections() |
| | lst = self._sections[section].getall(option) |
| | except Exception: |
| | if default is not None: |
| | return [default] |
| | raise |
| |
|
| | return [self._string_to_value(valuestr) for valuestr in lst] |
| |
|
| | def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]: |
| | types = (int, float) |
| | for numtype in types: |
| | try: |
| | val = numtype(valuestr) |
| | |
| | if val != float(valuestr): |
| | continue |
| | return val |
| | except (ValueError, TypeError): |
| | continue |
| | |
| |
|
| | |
| | vl = valuestr.lower() |
| | if vl == "false": |
| | return False |
| | if vl == "true": |
| | return True |
| |
|
| | if not isinstance(valuestr, str): |
| | raise TypeError( |
| | "Invalid value type: only int, long, float and str are allowed", |
| | valuestr, |
| | ) |
| |
|
| | return valuestr |
| |
|
| | def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str: |
| | if isinstance(value, (int, float, bool)): |
| | return str(value) |
| | return force_text(value) |
| |
|
| | @needs_values |
| | @set_dirty_and_flush_changes |
| | def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser": |
| | """Set the given option in section to the given value. |
| | |
| | This will create the section if required, and will not throw as opposed to the |
| | default ConfigParser ``set`` method. |
| | |
| | :param section: |
| | Name of the section in which the option resides or should reside. |
| | |
| | :param option: |
| | Name of the options whose value to set. |
| | |
| | :param value: |
| | Value to set the option to. It must be a string or convertible to a string. |
| | |
| | :return: |
| | This instance |
| | """ |
| | if not self.has_section(section): |
| | self.add_section(section) |
| | self.set(section, option, self._value_to_string(value)) |
| | return self |
| |
|
| | @needs_values |
| | @set_dirty_and_flush_changes |
| | def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser": |
| | """Add a value for the given option in section. |
| | |
| | This will create the section if required, and will not throw as opposed to the |
| | default ConfigParser ``set`` method. The value becomes the new value of the |
| | option as returned by :meth:`get_value`, and appends to the list of values |
| | returned by :meth:`get_values`. |
| | |
| | :param section: |
| | Name of the section in which the option resides or should reside. |
| | |
| | :param option: |
| | Name of the option. |
| | |
| | :param value: |
| | Value to add to option. It must be a string or convertible to a string. |
| | |
| | :return: |
| | This instance |
| | """ |
| | if not self.has_section(section): |
| | self.add_section(section) |
| | self._sections[section].add(option, self._value_to_string(value)) |
| | return self |
| |
|
| | def rename_section(self, section: str, new_name: str) -> "GitConfigParser": |
| | """Rename the given section to `new_name`. |
| | |
| | :raise ValueError: |
| | If: |
| | |
| | * `section` doesn't exist. |
| | * A section with `new_name` does already exist. |
| | |
| | :return: |
| | This instance |
| | """ |
| | if not self.has_section(section): |
| | raise ValueError("Source section '%s' doesn't exist" % section) |
| | if self.has_section(new_name): |
| | raise ValueError("Destination section '%s' already exists" % new_name) |
| |
|
| | super().add_section(new_name) |
| | new_section = self._sections[new_name] |
| | for k, vs in self.items_all(section): |
| | new_section.setall(k, vs) |
| | |
| |
|
| | |
| | |
| | self.remove_section(section) |
| | return self |
| |
|