Refactor and add tests for DJ verify (WIP)
author Jakob Cornell <jakob+gpg@jcornell.net>
Thu, 30 Jun 2022 02:52:53 +0000 (21:52 -0500)
committer Jakob Cornell <jakob+gpg@jcornell.net>
Thu, 30 Jun 2022 02:52:53 +0000 (21:52 -0500)
disk_jumble/src/disk_jumble/tests/__init__.py
disk_jumble/src/disk_jumble/tests/test_verify_v1.py
disk_jumble/src/disk_jumble/tests/test_worklist.py [new file with mode: 0644]

index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0af8ba6caee1905e05cf8464413795398dc72b76 100644 (file)
@@ -0,0 +1,85 @@
+from hashlib import sha1
+from importlib import resources
+from io import BufferedIOBase, BytesIO
+from random import Random
+from tempfile import NamedTemporaryFile
+from unittest import TestCase
+
+from psycopg2.extras import register_uuid
+import psycopg2
+
+from disk_jumble import bencode
+
+
+_SCHEMAS = {"public", "diskjumble", "bittorrent"}
+
+_BUF_SIZE = 16 * 1024 ** 2  # in bytes
+
+
+def random_file(size: int, rand_src: Random, on_disk: bool) -> BufferedIOBase:
+       f = NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else BytesIO()
+       try:
+               while f.tell() < size:
+                       write_size = min(size - f.tell(), _BUF_SIZE)
+                       f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size)))
+               f.seek(0)
+               return f
+       except Exception:
+               f.close()
+               raise
+
+
+class DjTestCase(TestCase):
+       """
+       Base class for Disk Jumble tests that handles setup and teardown of a testing database.
+       """
+       @classmethod
+       def setUpClass(cls):
+               register_uuid()
+               prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text()
+               schema_sql = "\n".join(
+                       l for l in prod_schema_sql.splitlines()
+                       if (
+                               not l.startswith("SET")
+                               and not l.startswith("SELECT")  # ignore server settings
+                               and "OWNER TO" not in l  # and ownership changes
+                       )
+               )
+               cls._conn = psycopg2.connect("")
+               with cls._conn, cls._conn.cursor() as cursor:
+                       for name in _SCHEMAS:
+                               cursor.execute(f"create schema {name};")
+                       cursor.execute(schema_sql)
+
+       @classmethod
+       def tearDownClass(cls):
+               try:
+                       with cls._conn, cls._conn.cursor() as cursor:
+                               for name in _SCHEMAS:
+                                       cursor.execute(f"drop schema {name} cascade;")
+               finally:
+                       cls._conn.close()
+
+       def tearDown(self):
+               self._conn.rollback()
+
+
+class Torrent:
+       def __init__(self, data: BufferedIOBase, piece_size: int) -> None:
+               data.seek(0)
+               hashes = []
+               while True:
+                       piece = data.read(piece_size)
+                       if piece:
+                               hashes.append(sha1(piece).digest())
+                       else:
+                               break
+
+               info_dict = {
+                       b"piece length": piece_size,
+                       b"length": data.tell(),
+                       b"pieces": b"".join(hashes),
+               }
+               self.data = data
+               self.info = bencode.encode(info_dict)
+               self.info_hash = sha1(self.info).digest()
index 10a468ab41f3815ef525b7d1232f77e504b3e8e9..74235ad39d56440b3d4753716f128405314d9d9b 100644 (file)
@@ -11,37 +11,26 @@ objects; see `test_util/dump_db.py'.
 """
 
 from dataclasses import dataclass
-from importlib import resources
 from random import Random
 from typing import Optional
-import hashlib
 import io
-import tempfile
-import unittest
 import uuid
 
 from psycopg2.extras import NumericRange
-import psycopg2
-import psycopg2.extras
 
-from disk_jumble import bencode
+from disk_jumble.tests import DjTestCase, random_file, Torrent
 from disk_jumble.verify import _do_verify
 
 
-_BUF_SIZE = 16 * 1024 ** 2  # in bytes
-
-
-class Tests(unittest.TestCase):
-       _SCHEMAS = {"public", "diskjumble", "bittorrent"}
-
+class Tests(DjTestCase):
        def _basic_fresh_verify_helper(self, read_size):
                sector_size = 32
                piece_size = 64
 
                torrent_len = 3 * piece_size
                disk = self._write_disk(torrent_len // sector_size)
-               with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
+               with random_file(torrent_len, Random(0), on_disk = False) as torrent_file:
+                       torrent = Torrent(torrent_file, piece_size)
                        self._write_torrent(torrent)
                        with self._conn.cursor() as cursor:
                                cursor.execute(
@@ -49,7 +38,7 @@ class Tests(unittest.TestCase):
                                        (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash)
                                )
 
-                               do_verify(self._conn, disk.id, sector_size, torrent_file, read_size, read_tries = 1)
+                               _do_verify(self._conn, disk.id, sector_size, torrent_file, read_size, read_tries = 1)
 
                                cursor.execute("select * from diskjumble.verify_pass;")
                                self.assertEqual(cursor.rowcount, 1)
@@ -74,8 +63,8 @@ class Tests(unittest.TestCase):
                od_ss = 16
                disk = self._write_disk(4)
                d_ss = 16
-               with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
+               with random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file:
+                       torrent = Torrent(torrent_file, piece_size)
                        self._write_torrent(torrent)
                        with self._conn.cursor() as cursor:
                                cursor.executemany(
@@ -86,7 +75,7 @@ class Tests(unittest.TestCase):
                                        ]
                                )
 
-                               do_verify(self._conn, other_disk.id, od_ss, torrent_file, read_size = 128, read_tries = 1)
+                               _do_verify(self._conn, other_disk.id, od_ss, torrent_file, read_size = 128, read_tries = 1)
 
                                cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
                                [(row_count,)] = cursor.fetchall()
@@ -94,7 +83,7 @@ class Tests(unittest.TestCase):
 
                                torrent_file.seek(piece_size)
                                disk_file = io.BytesIO(torrent_file.read())
-                               do_verify(self._conn, disk.id, d_ss, disk_file, read_size = 128, read_tries = 1)
+                               _do_verify(self._conn, disk.id, d_ss, disk_file, read_size = 128, read_tries = 1)
 
                                cursor.execute(
                                        "select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;"
@@ -112,8 +101,8 @@ class Tests(unittest.TestCase):
                piece_size = 32
 
                disk = self._write_disk(1)
-               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
+               with random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+                       torrent = Torrent(torrent_file, piece_size)
                        self._write_torrent(torrent)
                        with self._conn.cursor() as cursor:
                                cursor.execute(
@@ -122,7 +111,7 @@ class Tests(unittest.TestCase):
                                )
 
                                disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2)
-                               do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3)
+                               _do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3)
 
                                self.assertEqual(disk_file.triggered, 2)
 
@@ -143,8 +132,8 @@ class Tests(unittest.TestCase):
                other_a = self._write_disk(1)
                other_b = self._write_disk(2)
                disk = self._write_disk(1)
-               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
+               with random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+                       torrent = Torrent(torrent_file, piece_size)
                        self._write_torrent(torrent)
                        with self._conn.cursor() as cursor:
                                cursor.executemany(
@@ -156,16 +145,16 @@ class Tests(unittest.TestCase):
                                        ]
                                )
 
-                               do_verify(self._conn, other_a.id, sector_size, torrent_file, read_size = 16, read_tries = 1)
+                               _do_verify(self._conn, other_a.id, sector_size, torrent_file, read_size = 16, read_tries = 1)
                                other_b_file = io.BytesIO(torrent_file.getvalue()[16:48])
-                               do_verify(self._conn, other_b.id, sector_size, other_b_file, read_size = 16, read_tries = 1)
+                               _do_verify(self._conn, other_b.id, sector_size, other_b_file, read_size = 16, read_tries = 1)
 
                                cursor.execute("select verify_id from diskjumble.verify_piece;")
                                [(verify_id,)] = cursor.fetchall()
 
                                data = torrent_file.getvalue()[48:]
                                disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None)
-                               do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3)
+                               _do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3)
 
                                cursor.execute("select count(*) from diskjumble.verify_pass;")
                                [(row_count,)] = cursor.fetchall()
@@ -184,7 +173,7 @@ class Tests(unittest.TestCase):
                                [(row_count,)] = cursor.fetchall()
                                self.assertEqual(row_count, 0)
 
-       def _write_torrent(self, torrent: "_Torrent") -> None:
+       def _write_torrent(self, torrent: Torrent) -> None:
                with self._conn.cursor() as cursor:
                        cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,))
 
@@ -197,36 +186,6 @@ class Tests(unittest.TestCase):
                        [(id_,)] = cursor.fetchall()
                return _Disk(id_, sector_count)
 
-       @classmethod
-       def setUpClass(cls):
-               psycopg2.extras.register_uuid()
-               prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text()
-               schema_sql = "\n".join(
-                       l for l in prod_schema_sql.splitlines()
-                       if (
-                               not l.startswith("SET")
-                               and not l.startswith("SELECT")  # ignore server settings
-                               and "OWNER TO" not in l  # and ownership changes
-                       )
-               )
-               cls._conn = psycopg2.connect("")
-               with cls._conn, cls._conn.cursor() as cursor:
-                       for name in cls._SCHEMAS:
-                               cursor.execute(f"create schema {name};")
-                       cursor.execute(schema_sql)
-
-       @classmethod
-       def tearDownClass(self):
-               try:
-                       with self._conn, self._conn.cursor() as cursor:
-                               for name in self._SCHEMAS:
-                                       cursor.execute(f"drop schema {name} cascade;")
-               finally:
-                       self._conn.close()
-
-       def tearDown(self):
-               self._conn.rollback()
-
 
 @dataclass
 class _Disk:
@@ -234,40 +193,6 @@ class _Disk:
        sector_count: int
 
 
-class _Torrent:
-       def __init__(self, data: io.BufferedIOBase, piece_size: int) -> None:
-               data.seek(0)
-               hashes = []
-               while True:
-                       piece = data.read(piece_size)
-                       if piece:
-                               hashes.append(hashlib.sha1(piece).digest())
-                       else:
-                               break
-
-               info_dict = {
-                       b"piece length": piece_size,
-                       b"length": data.tell(),
-                       b"pieces": b"".join(hashes),
-               }
-               self.data = data
-               self.info = bencode.encode(info_dict)
-               self.info_hash = hashlib.sha1(self.info).digest()
-
-
-def _random_file(size: int, rand_src: Random, on_disk: bool) -> io.BufferedIOBase:
-       f = tempfile.NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else io.BytesIO()
-       try:
-               while f.tell() < size:
-                       write_size = min(size - f.tell(), _BUF_SIZE)
-                       f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size)))
-               f.seek(0)
-               return f
-       except Exception:
-               f.close()
-               raise
-
-
 @dataclass
 class _ReadErrorProxy(io.BufferedIOBase):
        wrapped: io.BufferedIOBase
diff --git a/disk_jumble/src/disk_jumble/tests/test_worklist.py b/disk_jumble/src/disk_jumble/tests/test_worklist.py
new file mode 100644 (file)
index 0000000..ac4b58a
--- /dev/null
@@ -0,0 +1,78 @@
+from io import BytesIO
+from random import Random
+from unittest.mock import patch
+
+from psycopg2.extras import NumericRange
+
+from disk_jumble.tests import DjTestCase, Torrent
+from disk_jumble.verify import _do_verify
+
+
+class Tests(DjTestCase):
+       def test_v1_worklist_unaligned_piece(self):
+               """
+               In this setup we have a single torrent with two pieces, the second of which is unaligned due to the piece length
+               not being a multiple of the block size. Three slabs cover the second piece, to be verified.
+               """
+               random = Random(0)
+               torrent_data = bytes(random.getrandbits(8) for _ in range(120))
+               torrent = Torrent(BytesIO(torrent_data), 66)
+
+               with self._conn.cursor() as cursor:
+                       cursor.execute("insert into bittorrent.torrent_info values (%s)", (torrent.info,))
+                       cursor.execute("insert into diskjumble.disk values (1, gen_random_uuid(), null, 4, default)")
+                       cursor.executemany(
+                               "insert into diskjumble.slab values (default, 1, %s, %s, %s, null)",
+                               [
+                                       (NumericRange(0, 1), torrent.info_hash, 64),
+                                       (NumericRange(1, 3), torrent.info_hash, 80),
+                                       (NumericRange(3, 4), torrent.info_hash, 112),
+                               ],
+                       )
+
+               def v2_worklist_mock(*args, **kwargs):
+                       return []
+
+               # TODO remove patch when v2 local schemas are available
+               with patch("disk_jumble.verify._get_v2_worklist", v2_worklist_mock):
+                       _do_verify(
+                               conn = self._conn,
+                               disk_id = 1,
+                               target_ranges = [range(1, 2)],
+                               disk_file = BytesIO(torrent_data[64 : 120] + b"\0" * 8),
+                               read_size = 8,
+                               read_tries = 1,
+                               block_size = 16,
+                       )
+
+               with self._conn.cursor() as cursor:
+                       cursor.execute("select unnest(verified_map) from diskjumble.disk_maps")
+                       self.assertEqual(cursor.rowcount, 1)
+
+       @classmethod
+       def setUpClass(cls):
+               super().setUpClass()
+
+               # TODO This setup should be done automatically by executing a dump of the live database schemas, but I can't
+               # create a full dump currently due to incompatibility between my Postgres 13 pgdump and the version 14 server.
+               with cls._conn, cls._conn.cursor() as cursor:
+                       cursor.execute(
+                               """
+                                       create table bittorrent.torrent_info (info bytea not null);
+                                       create table diskjumble.disk (
+                                               disk_id integer,
+                                               dev_uuid uuid,
+                                               dev_serial text,
+                                               num_blocks bigint,
+                                               failed boolean default false
+                                       );
+                                       create table diskjumble.slab (
+                                               slab_id integer,
+                                               disk_id integer,
+                                               disk_blocks int8range,
+                                               entity_id bytea,
+                                               entity_offset bigint,
+                                               crypt_key bytea
+                                       );
+                               """
+                       )