| | |
| | |
| | |
| | |
| | """Test for object db""" |
| |
|
| | from gitdb.test.lib import ( |
| | TestBase, |
| | DummyStream, |
| | make_bytes, |
| | make_object, |
| | fixture_path |
| | ) |
| |
|
| | from gitdb import ( |
| | DecompressMemMapReader, |
| | FDCompressedSha1Writer, |
| | LooseObjectDB, |
| | Sha1Writer, |
| | MemoryDB, |
| | IStream, |
| | ) |
| | from gitdb.util import hex_to_bin |
| |
|
| | import zlib |
| | from gitdb.typ import ( |
| | str_blob_type |
| | ) |
| |
|
| | import tempfile |
| | import os |
| | from io import BytesIO |
| |
|
| |
|
| | class TestStream(TestBase): |
| |
|
| | """Test stream classes""" |
| |
|
| | data_sizes = (15, 10000, 1000 * 1024 + 512) |
| |
|
| | def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None): |
| | """Make stream tests - the orig_stream is seekable, allowing it to be |
| | rewound and reused |
| | :param cdata: the data we expect to read from stream, the contents |
| | :param rewind_stream: function called to rewind the stream to make it ready |
| | for reuse""" |
| | ns = 10 |
| | assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata)) |
| |
|
| | |
| | ss = len(cdata) // ns |
| | for i in range(ns): |
| | data = stream.read(ss) |
| | chunk = cdata[i * ss:(i + 1) * ss] |
| | assert data == chunk |
| | |
| | rest = stream.read() |
| | if rest: |
| | assert rest == cdata[-len(rest):] |
| | |
| |
|
| | if isinstance(stream, DecompressMemMapReader): |
| | assert len(stream.data()) == stream.compressed_bytes_read() |
| | |
| |
|
| | rewind_stream(stream) |
| |
|
| | |
| | rdata = stream.read() |
| | assert rdata == cdata |
| |
|
| | if isinstance(stream, DecompressMemMapReader): |
| | assert len(stream.data()) == stream.compressed_bytes_read() |
| | |
| |
|
| | def test_decompress_reader(self): |
| | for close_on_deletion in range(2): |
| | for with_size in range(2): |
| | for ds in self.data_sizes: |
| | cdata = make_bytes(ds, randomize=False) |
| |
|
| | |
| | |
| |
|
| | |
| | if with_size: |
| | |
| | zdata = zlib.compress(make_object(str_blob_type, cdata)) |
| | typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion) |
| | assert size == len(cdata) |
| | assert typ == str_blob_type |
| |
|
| | |
| | test_reader = DecompressMemMapReader(zdata, close_on_deletion=False) |
| | assert test_reader._s == len(cdata) |
| | else: |
| | |
| | zdata = zlib.compress(cdata) |
| | reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata)) |
| | assert reader._s == len(cdata) |
| | |
| |
|
| | self._assert_stream_reader(reader, cdata, lambda r: r.seek(0)) |
| |
|
| | |
| | dummy = DummyStream() |
| | reader._m = dummy |
| |
|
| | assert not dummy.closed |
| | del(reader) |
| | assert dummy.closed == close_on_deletion |
| | |
| | |
| | |
| |
|
| | def test_sha_writer(self): |
| | writer = Sha1Writer() |
| | assert 2 == writer.write(b"hi") |
| | assert len(writer.sha(as_hex=1)) == 40 |
| | assert len(writer.sha(as_hex=0)) == 20 |
| |
|
| | |
| | prev_sha = writer.sha() |
| | writer.write(b"hi again") |
| | assert writer.sha() != prev_sha |
| |
|
| | def test_compressed_writer(self): |
| | for ds in self.data_sizes: |
| | fd, path = tempfile.mkstemp() |
| | ostream = FDCompressedSha1Writer(fd) |
| | data = make_bytes(ds, randomize=False) |
| |
|
| | |
| | assert len(data) == ostream.write(data) |
| | ostream.close() |
| |
|
| | |
| | self.assertRaises(OSError, os.close, fd) |
| |
|
| | |
| | fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0)) |
| | written_data = os.read(fd, os.path.getsize(path)) |
| | assert len(written_data) == os.path.getsize(path) |
| | os.close(fd) |
| | assert written_data == zlib.compress(data, 1) |
| |
|
| | os.remove(path) |
| | |
| |
|
| | def test_decompress_reader_special_case(self): |
| | odb = LooseObjectDB(fixture_path('objects')) |
| | mdb = MemoryDB() |
| | for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911', |
| | b'7bb839852ed5e3a069966281bb08d50012fb309b',): |
| | ostream = odb.stream(hex_to_bin(sha)) |
| |
|
| | |
| | data = ostream.read() |
| | assert len(data) == ostream.size |
| |
|
| | |
| | dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data))) |
| | assert dump.hexsha == sha |
| | |
| |
|