| |
| |
| |
| |
|
|
| |
| |
| |
| |
| from __future__ import annotations |
|
|
| import logging |
| import os |
| import sys |
| from collections import OrderedDict |
| from typing import Any, Literal, NamedTuple, TypeVar, Union |
|
|
| import numpy as np |
| import numpy.typing as npt |
|
|
| from .quants import quant_shape_to_byte_shape |
|
|
if __name__ == "__main__":
    from pathlib import Path

    # Allow running this file directly as a script: put the package root on
    # sys.path so the absolute `gguf.constants` import below resolves.
    sys.path.insert(0, str(Path(__file__).parent.parent))

from gguf.constants import (
    GGML_QUANT_SIZES,
    GGUF_DEFAULT_ALIGNMENT,
    GGUF_MAGIC,
    GGUF_VERSION,
    GGMLQuantizationType,
    GGUFValueType,
    GGUFEndian,
)

logger = logging.getLogger(__name__)

# GGUF container versions this reader accepts (version 2 plus the current one).
READER_SUPPORTED_VERSIONS = [2, GGUF_VERSION]
|
|
|
|
class ReaderField(NamedTuple):
    """One parsed GGUF metadata field: its file offset, name, raw numpy
    parts, and bookkeeping describing which parts hold the payload."""

    # Offset to the start of this field within the file.
    offset: int

    # Name of the field (not necessarily from file data).
    name: str

    # Raw data parts. Some types span multiple components, e.g. a string
    # is a length part followed by a bytes part.
    parts: list[npt.NDArray[Any]] = []

    # Indexes into `parts` that hold the actual payload. For example an
    # array of strings is populated with indexes to each string's bytes.
    data: list[int] = [-1]

    # Value types for this field: first entry is the field's own type;
    # for arrays the last entry is the element type.
    types: list[GGUFValueType] = []

    def contents(self, index_or_slice: int | slice = slice(None)) -> Any:
        """Decode the field payload into plain Python values.

        Strings decode as UTF-8, scalars become Python numbers, arrays
        become lists; `index_or_slice` selects array elements. Returns
        None when the field carries no type information.
        """
        if not self.types:
            return None

        def decode(part: npt.NDArray[Any]) -> str:
            return str(part.tobytes(), encoding='utf-8')

        main_type = self.types[0]

        if main_type == GGUFValueType.ARRAY:
            sub_type = self.types[-1]

            if sub_type == GGUFValueType.STRING:
                indices = self.data[index_or_slice]
                if isinstance(index_or_slice, int):
                    return decode(self.parts[indices])
                return [decode(self.parts[idx]) for idx in indices]

            # Scalar array: every data index points at a one-element part.
            # NOTE(review): assumes one-dimensional arrays — nested arrays
            # would need dedicated handling here; confirm with _get_field_parts.
            if isinstance(index_or_slice, int):
                return self.parts[self.data[index_or_slice]].tolist()[0]
            return [pv for idx in self.data[index_or_slice] for pv in self.parts[idx].tolist()]

        if main_type == GGUFValueType.STRING:
            return decode(self.parts[-1])
        return self.parts[-1].tolist()[0]
|
|
|
|
class ReaderTensor(NamedTuple):
    """One tensor entry parsed from a GGUF file, with a memory-mapped view
    of its data."""

    # Tensor name as stored in the file.
    name: str
    # GGML quantization / storage type of the tensor data.
    tensor_type: GGMLQuantizationType
    # Dimensions exactly as stored in the file (GGML order; `data` below is
    # reshaped with the reversed order — see GGUFReader._build_tensors).
    # NOTE(review): values are read as uint64 in _get_tensor_info_field,
    # so the uint32 annotation looks stale — confirm before relying on it.
    shape: npt.NDArray[np.uint32]
    # Total number of elements (product of shape).
    n_elements: int
    # Size of the tensor data in bytes (accounts for quantization blocks).
    n_bytes: int
    # Absolute byte offset of the tensor data within the file.
    data_offset: int
    # Memory-mapped view of the tensor data (raw uint8 for quantized types).
    data: npt.NDArray[Any]
    # The tensor-info field this entry was built from.
    field: ReaderField
|
|
|
|
class GGUFReader:
    """Parser for GGUF model files.

    Memory-maps the file, validates magic/version, then walks the header:
    metadata key/value fields, tensor-info records, and finally builds
    zero-copy numpy views onto the aligned tensor data region.
    """

    # Byte-order code passed to numpy's dtype.newbyteorder:
    # 'I' = ignore (keep native order), 'S' = byteswap on access.
    byte_order: Literal['I', 'S'] = 'I'
    # Tensor-data alignment; may be overridden by the general.alignment field.
    alignment: int = GGUF_DEFAULT_ALIGNMENT
    # Absolute byte offset where the tensor data region starts.
    data_offset: int

    # Maps GGUF scalar value types to the numpy dtype used to read them.
    gguf_scalar_to_np: dict[GGUFValueType, type[np.generic]] = {
        GGUFValueType.UINT8: np.uint8,
        GGUFValueType.INT8: np.int8,
        GGUFValueType.UINT16: np.uint16,
        GGUFValueType.INT16: np.int16,
        GGUFValueType.UINT32: np.uint32,
        GGUFValueType.INT32: np.int32,
        GGUFValueType.FLOAT32: np.float32,
        GGUFValueType.UINT64: np.uint64,
        GGUFValueType.INT64: np.int64,
        GGUFValueType.FLOAT64: np.float64,
        GGUFValueType.BOOL: np.bool_,
    }

    def __init__(self, path: os.PathLike[str] | str, mode: Literal['r', 'r+', 'c'] = 'r'):
        """Open and parse the GGUF file at `path`.

        Parameters:
            path: Path of the GGUF file to read.
            mode: numpy.memmap mode — 'r' read-only, 'r+' read/write,
                'c' copy-on-write.

        Raises:
            ValueError: on a bad magic number, unsupported version, or a
                general.alignment field of the wrong type.
        """
        self.data = np.memmap(path, mode = mode)
        offs = 0

        # Check for GGUF magic number (always little-endian in the file).
        if self._get(offs, np.uint32, override_order = '<')[0] != GGUF_MAGIC:
            raise ValueError('GGUF magic invalid')
        offs += 4

        # Check GGUF version.
        temp_version = self._get(offs, np.uint32)
        if temp_version[0] & 65535 == 0:
            # A zero low half-word means the file was (probably) written with
            # the opposite byte order of this machine: read everything swapped.
            self.byte_order = 'S'
            temp_version = temp_version.view(temp_version.dtype.newbyteorder(self.byte_order))
        version = temp_version[0]
        if version not in READER_SUPPORTED_VERSIONS:
            raise ValueError(f'Sorry, file appears to be version {version} which we cannot handle')
        if sys.byteorder == "little":
            # Host is little-endian.
            host_endian = GGUFEndian.LITTLE
            swapped_endian = GGUFEndian.BIG
        else:
            # Host is big-endian.
            host_endian = GGUFEndian.BIG
            swapped_endian = GGUFEndian.LITTLE
        # The file's endianness: swapped relative to the host when 'S'.
        self.endianess = swapped_endian if self.byte_order == "S" else host_endian
        self.fields: OrderedDict[str, ReaderField] = OrderedDict()
        self.tensors: list[ReaderTensor] = []
        offs += self._push_field(ReaderField(offs, 'GGUF.version', [temp_version], [0], [GGUFValueType.UINT32]))

        # Read tensor count and metadata key/value count, then the KV fields.
        temp_counts = self._get(offs, np.uint64, 2)
        offs += self._push_field(ReaderField(offs, 'GGUF.tensor_count', [temp_counts[:1]], [0], [GGUFValueType.UINT64]))
        offs += self._push_field(ReaderField(offs, 'GGUF.kv_count', [temp_counts[1:]], [0], [GGUFValueType.UINT64]))
        tensor_count, kv_count = temp_counts
        offs = self._build_fields(offs, kv_count)

        # Build tensor-info fields, then resolve alignment and tensor data.
        offs, tensors_fields = self._build_tensor_info(offs, tensor_count)
        new_align = self.fields.get('general.alignment')
        if new_align is not None:
            if new_align.types != [GGUFValueType.UINT32]:
                raise ValueError('Bad type for general.alignment field')
            self.alignment = new_align.parts[-1][0]
        # Tensor data starts at the next alignment boundary after the header.
        padding = offs % self.alignment
        if padding != 0:
            offs += self.alignment - padding
        self.data_offset = offs
        self._build_tensors(offs, tensors_fields)

    _DT = TypeVar('_DT', bound = npt.DTypeLike)

    def get_field(self, key: str) -> Union[ReaderField, None]:
        """Fetch a metadata field by key, or None if not present."""
        return self.fields.get(key, None)

    def get_tensor(self, idx: int) -> ReaderTensor:
        """Fetch a tensor by its index in the file's tensor list."""
        return self.tensors[idx]

    def _get(
        self, offset: int, dtype: npt.DTypeLike, count: int = 1, override_order: None | Literal['I', 'S', '<'] = None,
    ) -> npt.NDArray[Any]:
        # Return a zero-copy view of `count` items of `dtype` starting at
        # byte `offset`, with the reader's byte order (or `override_order`)
        # applied via the dtype — the mapped bytes are never copied.
        count = int(count)
        itemsize = int(np.empty([], dtype = dtype).itemsize)
        end_offs = offset + itemsize * count
        arr = self.data[offset:end_offs].view(dtype=dtype)[:count]
        return arr.view(arr.dtype.newbyteorder(self.byte_order if override_order is None else override_order))

    def _push_field(self, field: ReaderField, skip_sum: bool = False) -> int:
        # Register a parsed field under its name; duplicates are kept under a
        # name suffixed with their offset rather than overwriting or raising.
        # Returns the field's size in bytes (0 when skip_sum) so the caller
        # can advance its read offset.
        if field.name in self.fields:
            logger.warning(f'Duplicate key {field.name} at offset {field.offset}')
            self.fields[field.name + '_{}'.format(field.offset)] = field
        else:
            self.fields[field.name] = field
        return 0 if skip_sum else sum(int(part.nbytes) for part in field.parts)

    def _get_str(self, offset: int) -> tuple[npt.NDArray[np.uint64], npt.NDArray[np.uint8]]:
        # A GGUF string is a uint64 byte length followed by that many bytes.
        slen = self._get(offset, np.uint64)
        return slen, self._get(offset + 8, np.uint8, slen[0])

    def _get_field_parts(
        self, orig_offs: int, raw_type: int,
    ) -> tuple[int, list[npt.NDArray[Any]], list[int], list[GGUFValueType]]:
        # Parse one metadata value at `orig_offs`. Returns (size in bytes,
        # raw parts, indexes into parts holding the payload, value types).
        offs = orig_offs
        types: list[GGUFValueType] = []
        gtype = GGUFValueType(raw_type)
        types.append(gtype)
        # Handle strings.
        if gtype == GGUFValueType.STRING:
            sparts: list[npt.NDArray[Any]] = list(self._get_str(offs))
            size = sum(int(part.nbytes) for part in sparts)
            return size, sparts, [1], types
        # Check if it's a simple scalar type.
        nptype = self.gguf_scalar_to_np.get(gtype)
        if nptype is not None:
            val = self._get(offs, nptype)
            return int(val.nbytes), [val], [0], types
        # Handle arrays: element type (uint32) + length (uint64) + elements.
        if gtype == GGUFValueType.ARRAY:
            raw_itype = self._get(offs, np.uint32)
            offs += int(raw_itype.nbytes)
            alen = self._get(offs, np.uint64)
            offs += int(alen.nbytes)
            aparts: list[npt.NDArray[Any]] = [raw_itype, alen]
            data_idxs: list[int] = []
            # Parse each element recursively; all elements share raw_itype,
            # so only the first element's types are recorded.
            for idx in range(int(alen[0])):
                curr_size, curr_parts, curr_idxs, curr_types = self._get_field_parts(offs, raw_itype[0])
                if idx == 0:
                    types += curr_types
                idxs_offs = len(aparts)
                aparts += curr_parts
                data_idxs += [i + idxs_offs for i in curr_idxs]
                offs += curr_size
            return offs - orig_offs, aparts, data_idxs, types
        # We can't deal with this one.
        raise ValueError(f'Unknown/unhandled field type {gtype}')

    def _get_tensor_info_field(self, orig_offs: int) -> ReaderField:
        # Parse one tensor-info record: name, dim count, dims, dtype, offset.
        offs = orig_offs

        # Get tensor name.
        name_len, name_data = self._get_str(offs)
        offs += int(name_len.nbytes + name_data.nbytes)

        # Get tensor dimension count.
        n_dims = self._get(offs, np.uint32)
        offs += int(n_dims.nbytes)

        # Get tensor dimension array.
        dims = self._get(offs, np.uint64, n_dims[0])
        offs += int(dims.nbytes)

        # Get tensor encoding (quantization) type.
        raw_dtype = self._get(offs, np.uint32)
        offs += int(raw_dtype.nbytes)

        # Get tensor offset (relative to the start of the tensor data region).
        offset_tensor = self._get(offs, np.uint64)
        offs += int(offset_tensor.nbytes)

        # data indexes [1, 3, 4, 5] = name bytes, dims, dtype, offset.
        return ReaderField(
            orig_offs,
            str(bytes(name_data), encoding = 'utf-8'),
            [name_len, name_data, n_dims, dims, raw_dtype, offset_tensor],
            [1, 3, 4, 5],
        )

    def _build_fields(self, offs: int, count: int) -> int:
        # Parse `count` key/value metadata pairs starting at `offs`;
        # returns the offset just past the last pair.
        for _ in range(count):
            orig_offs = offs
            kv_klen, kv_kdata = self._get_str(offs)
            offs += int(kv_klen.nbytes + kv_kdata.nbytes)
            raw_kv_type = self._get(offs, np.uint32)
            offs += int(raw_kv_type.nbytes)
            parts: list[npt.NDArray[Any]] = [kv_klen, kv_kdata, raw_kv_type]
            idxs_offs = len(parts)
            field_size, field_parts, field_idxs, field_types = self._get_field_parts(offs, raw_kv_type[0])
            parts += field_parts
            # skip_sum: the offset is advanced with field_size below instead
            # of the sum of part sizes.
            self._push_field(ReaderField(
                orig_offs,
                str(bytes(kv_kdata), encoding = 'utf-8'),
                parts,
                [idx + idxs_offs for idx in field_idxs],
                field_types,
            ), skip_sum = True)
            offs += field_size
        return offs

    def _build_tensor_info(self, offs: int, count: int) -> tuple[int, list[ReaderField]]:
        # Parse `count` tensor-info records; returns (new offset, fields).
        tensor_fields = []
        for _ in range(count):
            field = self._get_tensor_info_field(offs)
            offs += sum(int(part.nbytes) for part in field.parts)
            tensor_fields.append(field)
        return offs, tensor_fields

    def _build_tensors(self, start_offs: int, fields: list[ReaderField]) -> None:
        # Build ReaderTensor entries with memory-mapped data views from the
        # parsed tensor-info fields; populates self.tensors.
        tensors = []
        tensor_names = set()
        for field in fields:
            _name_len, name_data, _n_dims, dims, raw_dtype, offset_tensor = field.parts
            # Reject files containing two tensors with the same name.
            tensor_name = str(bytes(name_data), encoding = 'utf-8')
            if tensor_name in tensor_names:
                raise ValueError(f'Found duplicated tensor with name {tensor_name}')
            tensor_names.add(tensor_name)
            ggml_type = GGMLQuantizationType(raw_dtype[0])
            n_elems = int(np.prod(dims))
            # numpy shape is the reverse of the on-file dimension order.
            np_dims = tuple(reversed(dims.tolist()))
            # Quantized types store type_size bytes per block_size elements.
            block_size, type_size = GGML_QUANT_SIZES[ggml_type]
            n_bytes = n_elems * type_size // block_size
            data_offs = int(start_offs + offset_tensor[0])
            item_type: npt.DTypeLike
            if ggml_type == GGMLQuantizationType.F16:
                item_count = n_elems
                item_type = np.float16
            elif ggml_type == GGMLQuantizationType.F32:
                item_count = n_elems
                item_type = np.float32
            elif ggml_type == GGMLQuantizationType.F64:
                item_count = n_elems
                item_type = np.float64
            elif ggml_type == GGMLQuantizationType.I8:
                item_count = n_elems
                item_type = np.int8
            elif ggml_type == GGMLQuantizationType.I16:
                item_count = n_elems
                item_type = np.int16
            elif ggml_type == GGMLQuantizationType.I32:
                item_count = n_elems
                item_type = np.int32
            elif ggml_type == GGMLQuantizationType.I64:
                item_count = n_elems
                item_type = np.int64
            else:
                # Quantized types are exposed as raw bytes, with the last
                # shape axis converted to a byte count.
                item_count = n_bytes
                item_type = np.uint8
                np_dims = quant_shape_to_byte_shape(np_dims, ggml_type)
            tensors.append(ReaderTensor(
                name = tensor_name,
                tensor_type = ggml_type,
                shape = dims,
                n_elements = n_elems,
                n_bytes = n_bytes,
                data_offset = data_offs,
                data = self._get(data_offs, item_type, item_count).reshape(np_dims),
                field = field,
            ))
        self.tensors = tensors
|
|