+from hashlib import sha1
+from importlib import resources
+from io import BufferedIOBase, BytesIO
+from random import Random
+from tempfile import NamedTemporaryFile
+from unittest import TestCase
+
+from psycopg2.extras import register_uuid
+import psycopg2
+
+from disk_jumble import bencode
+
+
+_SCHEMAS = {"public", "diskjumble", "bittorrent"}
+
+_BUF_SIZE = 16 * 1024 ** 2 # in bytes
+
+
+def random_file(size: int, rand_src: Random, on_disk: bool) -> BufferedIOBase:
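+ """
+ Return a seekable stream of ``size`` random bytes: a named temporary file if ``on_disk``, else an
+ in-memory buffer. The stream is positioned at the start; the caller is responsible for closing it.
+ """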
+ f = NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else BytesIO()
+ try:
+ while f.tell() < size:
+ write_size = min(size - f.tell(), _BUF_SIZE)
+ f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size)))
+ f.seek(0)
+ return f
+ except Exception:
+ f.close()
+ raise
+
+
+class DjTestCase(TestCase):
+ """
+ Base class for Disk Jumble tests that handles setup and teardown of a testing database.
+ """
+ @classmethod
+ def setUpClass(cls):
+ register_uuid()
+ prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text()
+ schema_sql = "\n".join(
+ l for l in prod_schema_sql.splitlines()
+ if (
+ not l.startswith("SET")
+ and not l.startswith("SELECT") # ignore server settings
+ and "OWNER TO" not in l # and ownership changes
+ )
+ )
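+ # an empty DSN makes libpq take the connection settings from the environment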
+ cls._conn = psycopg2.connect("")
+ with cls._conn, cls._conn.cursor() as cursor:
+ for name in _SCHEMAS:
+ cursor.execute(f"create schema {name};")
+ cursor.execute(schema_sql)
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ with cls._conn, cls._conn.cursor() as cursor:
+ for name in _SCHEMAS:
+ cursor.execute(f"drop schema {name} cascade;")
+ finally:
+ cls._conn.close()
+
+ def tearDown(self):
+ self._conn.rollback()
+
+
+class Torrent:
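+ """
+ Minimal single-file torrent built from a seekable data stream: the stream is hashed in ``piece_size``
+ chunks, and a bencoded v1 info dict is produced along with its SHA-1 info hash.
+ """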
+ def __init__(self, data: BufferedIOBase, piece_size: int) -> None:
+ data.seek(0)
+ hashes = []
+ while True:
+ piece = data.read(piece_size)
+ if piece:
+ hashes.append(sha1(piece).digest())
+ else:
+ break
+
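+ # the read loop above consumed the stream, so tell() now gives the total length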
+ info_dict = {
+ b"piece length": piece_size,
+ b"length": data.tell(),
+ b"pieces": b"".join(hashes),
+ }
+ self.data = data
+ self.info = bencode.encode(info_dict)
+ self.info_hash = sha1(self.info).digest()
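+
+
+# A usage sketch (illustrative only, not part of this module): a concrete test module subclasses
+# DjTestCase, which supplies ``cls._conn`` and the schema setup, and builds fixtures with random_file
+# and Torrent, roughly like so:
+#
+#     class ExampleTests(DjTestCase):
+#         def test_info_hash_length(self):
+#             with random_file(128, Random(0), on_disk = False) as f:
+#                 torrent = Torrent(f, piece_size = 64)
+#                 self.assertEqual(len(torrent.info_hash), 20)  # SHA-1 digests are 20 bytes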
"""
from dataclasses import dataclass
-from importlib import resources
from random import Random
from typing import Optional
-import hashlib
import io
-import tempfile
-import unittest
import uuid
from psycopg2.extras import NumericRange
-import psycopg2
-import psycopg2.extras
-from disk_jumble import bencode
+from disk_jumble.tests import DjTestCase, random_file, Torrent
from disk_jumble.verify import _do_verify
-_BUF_SIZE = 16 * 1024 ** 2 # in bytes
-
-
-class Tests(unittest.TestCase):
- _SCHEMAS = {"public", "diskjumble", "bittorrent"}
-
+class Tests(DjTestCase):
def _basic_fresh_verify_helper(self, read_size):
sector_size = 32
piece_size = 64
torrent_len = 3 * piece_size
disk = self._write_disk(torrent_len // sector_size)
- with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
+ with random_file(torrent_len, Random(0), on_disk = False) as torrent_file:
+ torrent = Torrent(torrent_file, piece_size)
self._write_torrent(torrent)
with self._conn.cursor() as cursor:
cursor.execute(
(disk.id, NumericRange(0, disk.sector_count), torrent.info_hash)
)
- do_verify(self._conn, disk.id, sector_size, torrent_file, read_size, read_tries = 1)
+ _do_verify(self._conn, disk.id, sector_size, torrent_file, read_size, read_tries = 1)
cursor.execute("select * from diskjumble.verify_pass;")
self.assertEqual(cursor.rowcount, 1)
od_ss = 16
disk = self._write_disk(4)
d_ss = 16
- with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
+ with random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file:
+ torrent = Torrent(torrent_file, piece_size)
self._write_torrent(torrent)
with self._conn.cursor() as cursor:
cursor.executemany(
]
)
- do_verify(self._conn, other_disk.id, od_ss, torrent_file, read_size = 128, read_tries = 1)
+ _do_verify(self._conn, other_disk.id, od_ss, torrent_file, read_size = 128, read_tries = 1)
cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
[(row_count,)] = cursor.fetchall()
torrent_file.seek(piece_size)
disk_file = io.BytesIO(torrent_file.read())
- do_verify(self._conn, disk.id, d_ss, disk_file, read_size = 128, read_tries = 1)
+ _do_verify(self._conn, disk.id, d_ss, disk_file, read_size = 128, read_tries = 1)
cursor.execute(
"select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;"
piece_size = 32
disk = self._write_disk(1)
- with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
+ with random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+ torrent = Torrent(torrent_file, piece_size)
self._write_torrent(torrent)
with self._conn.cursor() as cursor:
cursor.execute(
)
disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2)
- do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3)
+ _do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3)
self.assertEqual(disk_file.triggered, 2)
other_a = self._write_disk(1)
other_b = self._write_disk(2)
disk = self._write_disk(1)
- with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
+ with random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+ torrent = Torrent(torrent_file, piece_size)
self._write_torrent(torrent)
with self._conn.cursor() as cursor:
cursor.executemany(
]
)
- do_verify(self._conn, other_a.id, sector_size, torrent_file, read_size = 16, read_tries = 1)
+ _do_verify(self._conn, other_a.id, sector_size, torrent_file, read_size = 16, read_tries = 1)
other_b_file = io.BytesIO(torrent_file.getvalue()[16:48])
- do_verify(self._conn, other_b.id, sector_size, other_b_file, read_size = 16, read_tries = 1)
+ _do_verify(self._conn, other_b.id, sector_size, other_b_file, read_size = 16, read_tries = 1)
cursor.execute("select verify_id from diskjumble.verify_piece;")
[(verify_id,)] = cursor.fetchall()
data = torrent_file.getvalue()[48:]
disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None)
- do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3)
+ _do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3)
cursor.execute("select count(*) from diskjumble.verify_pass;")
[(row_count,)] = cursor.fetchall()
[(row_count,)] = cursor.fetchall()
self.assertEqual(row_count, 0)
- def _write_torrent(self, torrent: "_Torrent") -> None:
+ def _write_torrent(self, torrent: Torrent) -> None:
with self._conn.cursor() as cursor:
cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,))
[(id_,)] = cursor.fetchall()
return _Disk(id_, sector_count)
- @classmethod
- def setUpClass(cls):
- psycopg2.extras.register_uuid()
- prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text()
- schema_sql = "\n".join(
- l for l in prod_schema_sql.splitlines()
- if (
- not l.startswith("SET")
- and not l.startswith("SELECT") # ignore server settings
- and "OWNER TO" not in l # and ownership changes
- )
- )
- cls._conn = psycopg2.connect("")
- with cls._conn, cls._conn.cursor() as cursor:
- for name in cls._SCHEMAS:
- cursor.execute(f"create schema {name};")
- cursor.execute(schema_sql)
-
- @classmethod
- def tearDownClass(self):
- try:
- with self._conn, self._conn.cursor() as cursor:
- for name in self._SCHEMAS:
- cursor.execute(f"drop schema {name} cascade;")
- finally:
- self._conn.close()
-
- def tearDown(self):
- self._conn.rollback()
-
@dataclass
class _Disk:
sector_count: int
-class _Torrent:
- def __init__(self, data: io.BufferedIOBase, piece_size: int) -> None:
- data.seek(0)
- hashes = []
- while True:
- piece = data.read(piece_size)
- if piece:
- hashes.append(hashlib.sha1(piece).digest())
- else:
- break
-
- info_dict = {
- b"piece length": piece_size,
- b"length": data.tell(),
- b"pieces": b"".join(hashes),
- }
- self.data = data
- self.info = bencode.encode(info_dict)
- self.info_hash = hashlib.sha1(self.info).digest()
-
-
-def _random_file(size: int, rand_src: Random, on_disk: bool) -> io.BufferedIOBase:
- f = tempfile.NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else io.BytesIO()
- try:
- while f.tell() < size:
- write_size = min(size - f.tell(), _BUF_SIZE)
- f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size)))
- f.seek(0)
- return f
- except Exception:
- f.close()
- raise
-
-
@dataclass
class _ReadErrorProxy(io.BufferedIOBase):
wrapped: io.BufferedIOBase
--- /dev/null
+from io import BytesIO
+from random import Random
+from unittest.mock import patch
+
+from psycopg2.extras import NumericRange
+
+from disk_jumble.tests import DjTestCase, Torrent
+from disk_jumble.verify import _do_verify
+
+
+class Tests(DjTestCase):
+ def test_v1_worklist_unaligned_piece(self):
+ """
+ In this setup we have a single torrent with two pieces, the second of which is unaligned because the piece
+ length is not a multiple of the block size. The second piece, which is the one verified here, is covered by
+ three slabs.
+ """
+ random = Random(0)
+ torrent_data = bytes(random.getrandbits(8) for _ in range(120))
+ torrent = Torrent(BytesIO(torrent_data), 66)
+
+ with self._conn.cursor() as cursor:
+ cursor.execute("insert into bittorrent.torrent_info values (%s)", (torrent.info,))
+ cursor.execute("insert into diskjumble.disk values (1, gen_random_uuid(), null, 4, default)")
+ cursor.executemany(
+ "insert into diskjumble.slab values (default, 1, %s, %s, %s, null)",
+ [
+ (NumericRange(0, 1), torrent.info_hash, 64),
+ (NumericRange(1, 3), torrent.info_hash, 80),
+ (NumericRange(3, 4), torrent.info_hash, 112),
+ ],
+ )
+
+ def v2_worklist_mock(*args, **kwargs):
+ return []
+
+ # TODO remove patch when v2 local schemas are available
+ with patch("disk_jumble.verify._get_v2_worklist", v2_worklist_mock):
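+ # the disk image holds entity bytes [64, 120) plus zero padding, since the last slab extends
+ # 8 bytes past the end of the torrent data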
+ _do_verify(
+ conn = self._conn,
+ disk_id = 1,
+ target_ranges = [range(1, 2)],
+ disk_file = BytesIO(torrent_data[64 : 120] + b"\0" * 8),
+ read_size = 8,
+ read_tries = 1,
+ block_size = 16,
+ )
+
+ with self._conn.cursor() as cursor:
+ cursor.execute("select unnest(verified_map) from diskjumble.disk_maps")
+ self.assertEqual(cursor.rowcount, 1)
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ # TODO This setup should be done automatically by executing a dump of the live database schemas, but I can't
+ # create a full dump currently due to an incompatibility between my Postgres 13 pg_dump and the version 14
+ # server.
+ with cls._conn, cls._conn.cursor() as cursor:
+ cursor.execute(
+ """
+ create table bittorrent.torrent_info (info bytea not null);
+ create table diskjumble.disk (
+ disk_id integer,
+ dev_uuid uuid,
+ dev_serial text,
+ num_blocks bigint,
+ failed boolean default false
+ );
+ create table diskjumble.slab (
+ slab_id integer,
+ disk_id integer,
+ disk_blocks int8range,
+ entity_id bytea,
+ entity_offset bigint,
+ crypt_key bytea
+ );
+ """
+ )