| | |
| | |
| | import re |
| | from abc import ABCMeta, abstractmethod |
| | from pathlib import Path |
| | from typing import Optional, Union |
| |
|
| |
|
class BaseStorageBackend(metaclass=ABCMeta):
    """Abstract class of storage backends.

    Every backend must implement ``get()``, which reads a file as a byte
    stream. ``get()`` is the only abstract method, so it is the only API a
    subclass has to provide to become instantiable.
    """

    @property
    def name(self) -> str:
        """str: Name of the concrete backend class."""
        return self.__class__.__name__

    @abstractmethod
    def get(self, filepath: Union[str, Path]) -> bytes:
        """Read the file at ``filepath`` as bytes.

        Args:
            filepath (str or Path): Path to read data.

        Returns:
            bytes: The loaded bytes.
        """
        pass
| |
|
| |
|
class PetrelBackend(BaseStorageBackend):
    """Petrel storage backend (for internal use).

    PetrelBackend can address multiple clusters. If the file path contains
    the cluster name, the data is read from the specified cluster.
    Otherwise, the default cluster is accessed. Only reading (``get()``) is
    implemented by this class.

    Args:
        path_mapping (dict, optional): Path mapping dict from local path to
            Petrel path. When ``path_mapping={'src': 'dst'}``, ``src`` in
            ``filepath`` will be replaced by ``dst``. Default: None.
        enable_mc (bool, optional): Whether to enable memcached support.
            Default: False.
        conf_path (str, optional): Config path of Petrel client. Default: None.
            `New in version 1.7.1`.

    Raises:
        ImportError: If ``petrel_client`` is not installed.
        AssertionError: If ``path_mapping`` is neither a dict nor None.

    Examples:
        >>> filepath1 = 's3://path/of/file'
        >>> filepath2 = 'cluster-name:s3://path/of/file'
        >>> client = PetrelBackend()
        >>> client.get(filepath1)  # get data from default cluster
        >>> client.get(filepath2)  # get data from 'cluster-name' cluster
    """

    def __init__(self,
                 path_mapping: Optional[dict] = None,
                 enable_mc: bool = False,
                 conf_path: Optional[str] = None):
        # Imported lazily so that the module can be loaded without
        # petrel_client being installed.
        try:
            from petrel_client import client
        except ImportError as err:
            raise ImportError('Please install petrel_client to enable '
                              'PetrelBackend.') from err

        # Validate the mapping before the client is constructed so that a bad
        # argument fails without first setting up a client. The check stays
        # an assertion to keep the exception type unchanged for callers.
        assert isinstance(path_mapping, dict) or path_mapping is None, (
            'path_mapping must be a dict or None')

        self._client = client.Client(conf_path=conf_path, enable_mc=enable_mc)
        self.path_mapping = path_mapping

    def _map_path(self, filepath: Union[str, Path]) -> str:
        """Apply :attr:`self.path_mapping` to ``filepath``.

        For every ``(src, dst)`` pair of the mapping, the first occurrence of
        ``src`` in the path is replaced by ``dst``. The match is not limited
        to the beginning of the path, and the pairs are applied one after
        another in dict iteration order, so a later pair operates on the
        result of the earlier ones.

        Args:
            filepath (str or Path): Path to be mapped.

        Returns:
            str: The mapped path, or ``str(filepath)`` unchanged when no
            mapping is configured.
        """
        filepath = str(filepath)
        if self.path_mapping is not None:
            for k, v in self.path_mapping.items():
                filepath = filepath.replace(k, v, 1)
        return filepath

    def _format_path(self, filepath: str) -> str:
        """Convert a ``filepath`` to standard format of petrel oss.

        If the ``filepath`` is concatenated by ``os.path.join``, in a Windows
        environment, the ``filepath`` will be the format of
        's3://bucket_name\\image.jpg'. By invoking :meth:`_format_path`, the
        above ``filepath`` will be converted to 's3://bucket_name/image.jpg'.

        Args:
            filepath (str): Path to be formatted.

        Returns:
            str: ``filepath`` with every run of backslashes replaced by a
            single forward slash.
        """
        return re.sub(r'\\+', '/', filepath)

    def get(self, filepath: Union[str, Path]) -> bytes:
        """Read data from a given ``filepath`` with 'rb' mode.

        The path is first mapped with :meth:`_map_path`, then normalized with
        :meth:`_format_path`, and finally handed to the Petrel client.

        Args:
            filepath (str or Path): Path to read data.

        Returns:
            bytes: The loaded bytes, as returned by the client's ``Get``.
        """
        filepath = self._map_path(filepath)
        filepath = self._format_path(filepath)
        value = self._client.Get(filepath)
        return value
| |
|
| |
|
class HardDiskBackend(BaseStorageBackend):
    """Storage backend that reads files through the built-in ``open``."""

    def get(self, filepath: Union[str, Path]) -> bytes:
        """Load the whole content of ``filepath``, opened in 'rb' mode.

        Args:
            filepath (str or Path): Location of the file to read.

        Returns:
            bytes: Everything read from the file.
        """
        with open(filepath, mode='rb') as stream:
            return stream.read()
| |
|