From fdab5aed91b64f139381873e297f28acd2cfa109 Mon Sep 17 00:00:00 2001 From: Jakob Cornell Date: Wed, 29 Jun 2022 21:52:53 -0500 Subject: [PATCH] Refactor and add tests for DJ verify (WIP) --- disk_jumble/src/disk_jumble/tests/__init__.py | 85 ++++++++++++++ .../src/disk_jumble/tests/test_verify_v1.py | 111 +++--------------- .../src/disk_jumble/tests/test_worklist.py | 78 ++++++++++++ 3 files changed, 181 insertions(+), 93 deletions(-) create mode 100644 disk_jumble/src/disk_jumble/tests/test_worklist.py diff --git a/disk_jumble/src/disk_jumble/tests/__init__.py b/disk_jumble/src/disk_jumble/tests/__init__.py index e69de29..0af8ba6 100644 --- a/disk_jumble/src/disk_jumble/tests/__init__.py +++ b/disk_jumble/src/disk_jumble/tests/__init__.py @@ -0,0 +1,85 @@ +from hashlib import sha1 +from importlib import resources +from io import BufferedIOBase, BytesIO +from random import Random +from tempfile import NamedTemporaryFile +from unittest import TestCase + +from psycopg2.extras import register_uuid +import psycopg2 + +from disk_jumble import bencode + + +_SCHEMAS = {"public", "diskjumble", "bittorrent"} + +_BUF_SIZE = 16 * 1024 ** 2 # in bytes + + +def random_file(size: int, rand_src: Random, on_disk: bool) -> BufferedIOBase: + f = NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else BytesIO() + try: + while f.tell() < size: + write_size = min(size - f.tell(), _BUF_SIZE) + f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size))) + f.seek(0) + return f + except Exception: + f.close() + raise + + +class DjTestCase(TestCase): + """ + Base class for Disk Jumble tests that handles setup and teardown of a testing database. + """ + @classmethod + def setUpClass(cls): + register_uuid() + prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text() + schema_sql = "\n".join( + l for l in prod_schema_sql.splitlines() + if ( + not l.startswith("SET") + and not l.startswith("SELECT") # ignore server settings + and "OWNER TO" not in l # and ownership changes + ) + ) + cls._conn = psycopg2.connect("") + with cls._conn, cls._conn.cursor() as cursor: + for name in _SCHEMAS: + cursor.execute(f"create schema {name};") + cursor.execute(schema_sql) + + @classmethod + def tearDownClass(cls): + try: + with cls._conn, cls._conn.cursor() as cursor: + for name in _SCHEMAS: + cursor.execute(f"drop schema {name} cascade;") + finally: + cls._conn.close() + + def tearDown(self): + self._conn.rollback() + + +class Torrent: + def __init__(self, data: BufferedIOBase, piece_size: int) -> None: + data.seek(0) + hashes = [] + while True: + piece = data.read(piece_size) + if piece: + hashes.append(sha1(piece).digest()) + else: + break + + info_dict = { + b"piece length": piece_size, + b"length": data.tell(), + b"pieces": b"".join(hashes), + } + self.data = data + self.info = bencode.encode(info_dict) + self.info_hash = sha1(self.info).digest() diff --git a/disk_jumble/src/disk_jumble/tests/test_verify_v1.py b/disk_jumble/src/disk_jumble/tests/test_verify_v1.py index 10a468a..74235ad 100644 --- a/disk_jumble/src/disk_jumble/tests/test_verify_v1.py +++ b/disk_jumble/src/disk_jumble/tests/test_verify_v1.py @@ -11,37 +11,26 @@ objects; see `test_util/dump_db.py'. """ from dataclasses import dataclass -from importlib import resources from random import Random from typing import Optional -import hashlib import io -import tempfile -import unittest import uuid from psycopg2.extras import NumericRange -import psycopg2 -import psycopg2.extras -from disk_jumble import bencode +from disk_jumble.tests import DjTestCase, random_file, Torrent from disk_jumble.verify import _do_verify -_BUF_SIZE = 16 * 1024 ** 2 # in bytes - - -class Tests(unittest.TestCase): - _SCHEMAS = {"public", "diskjumble", "bittorrent"} - +class Tests(DjTestCase): def _basic_fresh_verify_helper(self, read_size): sector_size = 32 piece_size = 64 torrent_len = 3 * piece_size disk = self._write_disk(torrent_len // sector_size) - with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) + with random_file(torrent_len, Random(0), on_disk = False) as torrent_file: + torrent = Torrent(torrent_file, piece_size) self._write_torrent(torrent) with self._conn.cursor() as cursor: cursor.execute( @@ -49,7 +38,7 @@ class Tests(unittest.TestCase): (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash) ) - do_verify(self._conn, disk.id, sector_size, torrent_file, read_size, read_tries = 1) + _do_verify(self._conn, disk.id, sector_size, torrent_file, read_size, read_tries = 1) cursor.execute("select * from diskjumble.verify_pass;") self.assertEqual(cursor.rowcount, 1) @@ -74,8 +63,8 @@ class Tests(unittest.TestCase): od_ss = 16 disk = self._write_disk(4) d_ss = 16 - with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) + with random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file: + torrent = Torrent(torrent_file, piece_size) self._write_torrent(torrent) with self._conn.cursor() as cursor: cursor.executemany( @@ -86,7 +75,7 @@ class Tests(unittest.TestCase): ] ) - do_verify(self._conn, other_disk.id, od_ss, torrent_file, read_size = 128, read_tries = 1) + _do_verify(self._conn, other_disk.id, od_ss, torrent_file, read_size = 128, read_tries = 1) cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") [(row_count,)] = cursor.fetchall() @@ -94,7 +83,7 @@ class Tests(unittest.TestCase): torrent_file.seek(piece_size) disk_file = io.BytesIO(torrent_file.read()) - do_verify(self._conn, disk.id, d_ss, disk_file, read_size = 128, read_tries = 1) + _do_verify(self._conn, disk.id, d_ss, disk_file, read_size = 128, read_tries = 1) cursor.execute( "select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;" @@ -112,8 +101,8 @@ class Tests(unittest.TestCase): piece_size = 32 disk = self._write_disk(1) - with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) + with random_file(piece_size, Random(0), on_disk = False) as torrent_file: + torrent = Torrent(torrent_file, piece_size) self._write_torrent(torrent) with self._conn.cursor() as cursor: cursor.execute( @@ -122,7 +111,7 @@ class Tests(unittest.TestCase): ) disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2) - do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3) + _do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3) self.assertEqual(disk_file.triggered, 2) @@ -143,8 +132,8 @@ class Tests(unittest.TestCase): other_a = self._write_disk(1) other_b = self._write_disk(2) disk = self._write_disk(1) - with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) + with random_file(piece_size, Random(0), on_disk = False) as torrent_file: + torrent = Torrent(torrent_file, piece_size) self._write_torrent(torrent) with self._conn.cursor() as cursor: cursor.executemany( @@ -156,16 +145,16 @@ class Tests(unittest.TestCase): ] ) - do_verify(self._conn, other_a.id, sector_size, torrent_file, read_size = 16, read_tries = 1) + _do_verify(self._conn, other_a.id, sector_size, torrent_file, read_size = 16, read_tries = 1) other_b_file = io.BytesIO(torrent_file.getvalue()[16:48]) - do_verify(self._conn, other_b.id, sector_size, other_b_file, read_size = 16, read_tries = 1) + _do_verify(self._conn, other_b.id, sector_size, other_b_file, read_size = 16, read_tries = 1) cursor.execute("select verify_id from diskjumble.verify_piece;") [(verify_id,)] = cursor.fetchall() data = torrent_file.getvalue()[48:] disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None) - do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3) + _do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3) cursor.execute("select count(*) from diskjumble.verify_pass;") [(row_count,)] = cursor.fetchall() @@ -184,7 +173,7 @@ class Tests(unittest.TestCase): [(row_count,)] = cursor.fetchall() self.assertEqual(row_count, 0) - def _write_torrent(self, torrent: "_Torrent") -> None: + def _write_torrent(self, torrent: Torrent) -> None: with self._conn.cursor() as cursor: cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,)) @@ -197,36 +186,6 @@ class Tests(unittest.TestCase): [(id_,)] = cursor.fetchall() return _Disk(id_, sector_count) - @classmethod - def setUpClass(cls): - psycopg2.extras.register_uuid() - prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text() - schema_sql = "\n".join( - l for l in prod_schema_sql.splitlines() - if ( - not l.startswith("SET") - and not l.startswith("SELECT") # ignore server settings - and "OWNER TO" not in l # and ownership changes - ) - ) - cls._conn = psycopg2.connect("") - with cls._conn, cls._conn.cursor() as cursor: - for name in cls._SCHEMAS: - cursor.execute(f"create schema {name};") - cursor.execute(schema_sql) - - @classmethod - def tearDownClass(self): - try: - with self._conn, self._conn.cursor() as cursor: - for name in self._SCHEMAS: - cursor.execute(f"drop schema {name} cascade;") - finally: - self._conn.close() - - def tearDown(self): - self._conn.rollback() - @dataclass class _Disk: @@ -234,40 +193,6 @@ class _Disk: sector_count: int -class _Torrent: - def __init__(self, data: io.BufferedIOBase, piece_size: int) -> None: - data.seek(0) - hashes = [] - while True: - piece = data.read(piece_size) - if piece: - hashes.append(hashlib.sha1(piece).digest()) - else: - break - - info_dict = { - b"piece length": piece_size, - b"length": data.tell(), - b"pieces": b"".join(hashes), - } - self.data = data - self.info = bencode.encode(info_dict) - self.info_hash = hashlib.sha1(self.info).digest() - - -def _random_file(size: int, rand_src: Random, on_disk: bool) -> io.BufferedIOBase: - f = tempfile.NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else io.BytesIO() - try: - while f.tell() < size: - write_size = min(size - f.tell(), _BUF_SIZE) - f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size))) - f.seek(0) - return f - except Exception: - f.close() - raise - - @dataclass class _ReadErrorProxy(io.BufferedIOBase): wrapped: io.BufferedIOBase diff --git a/disk_jumble/src/disk_jumble/tests/test_worklist.py b/disk_jumble/src/disk_jumble/tests/test_worklist.py new file mode 100644 index 0000000..ac4b58a --- /dev/null +++ b/disk_jumble/src/disk_jumble/tests/test_worklist.py @@ -0,0 +1,78 @@ +from io import BytesIO +from random import Random +from unittest.mock import patch + +from psycopg2.extras import NumericRange + +from disk_jumble.tests import DjTestCase, Torrent +from disk_jumble.verify import _do_verify + + +class Tests(DjTestCase): + def test_v1_worklist_unaligned_piece(self): + """ + In this setup we have a single torrent with two pieces, the second of which is unaligned due to the piece length + not being a multiple of the block size. Three slabs cover the second piece, to be verified. + """ + random = Random(0) + torrent_data = bytes(random.getrandbits(8) for _ in range(120)) + torrent = Torrent(BytesIO(torrent_data), 66) + + with self._conn.cursor() as cursor: + cursor.execute("insert into bittorrent.torrent_info values (%s)", (torrent.info,)) + cursor.execute("insert into diskjumble.disk values (1, gen_random_uuid(), null, 4, default)") + cursor.executemany( + "insert into diskjumble.slab values (default, 1, %s, %s, %s, null)", + [ + (NumericRange(0, 1), torrent.info_hash, 64), + (NumericRange(1, 3), torrent.info_hash, 80), + (NumericRange(3, 4), torrent.info_hash, 112), + ], + ) + + def v2_worklist_mock(*args, **kwargs): + return [] + + # TODO remove patch when v2 local schemas are available + with patch("disk_jumble.verify._get_v2_worklist", v2_worklist_mock): + _do_verify( + conn = self._conn, + disk_id = 1, + target_ranges = [range(1, 2)], + disk_file = BytesIO(torrent_data[64 : 120] + b"\0" * 8), + read_size = 8, + read_tries = 1, + block_size = 16, + ) + + with self._conn.cursor() as cursor: + cursor.execute("select unnest(verified_map) from diskjumble.disk_maps") + self.assertEqual(cursor.rowcount, 1) + + @classmethod + def setUpClass(cls): + super().setUpClass() + + # TODO This setup should be done automatically by executing a dump of the live database schemas, but I can't + # create a full dump currently due to incompatibility between my Postgres 13 pgdump and the version 14 server. + with cls._conn, cls._conn.cursor() as cursor: + cursor.execute( + """ + create table bittorrent.torrent_info (info bytea not null); + create table diskjumble.disk ( + disk_id integer, + dev_uuid uuid, + dev_serial text, + num_blocks bigint, + failed boolean default false + ); + create table diskjumble.slab ( + slab_id integer, + disk_id integer, + disk_blocks int8range, + entity_id bytea, + entity_offset bigint, + crypt_key bytea + ); + """ + ) -- 2.30.2