From 7aae9af401a9bb42f1bac413559edc774344f232 Mon Sep 17 00:00:00 2001 From: Jakob Cornell Date: Sun, 20 Feb 2022 23:57:25 -0600 Subject: [PATCH] Update verify v1 for database schema changes Also renamed test module for clarity --- disk_jumble/src/disk_jumble/__init__.py | 1 + disk_jumble/src/disk_jumble/db.py | 28 ++---- .../{test_verify.py => test_verify_v1.py} | 96 ++++++++++--------- .../src/disk_jumble/tests/verify_setup.sql | 14 +-- disk_jumble/src/disk_jumble/verify.py | 23 ++--- 5 files changed, 79 insertions(+), 83 deletions(-) rename disk_jumble/src/disk_jumble/tests/{test_verify.py => test_verify_v1.py} (84%) diff --git a/disk_jumble/src/disk_jumble/__init__.py b/disk_jumble/src/disk_jumble/__init__.py index e69de29..4b7b490 100644 --- a/disk_jumble/src/disk_jumble/__init__.py +++ b/disk_jumble/src/disk_jumble/__init__.py @@ -0,0 +1 @@ +SECTOR_SIZE = 16 * 1024 # in bytes diff --git a/disk_jumble/src/disk_jumble/db.py b/disk_jumble/src/disk_jumble/db.py index 68488f0..5b7e2f8 100644 --- a/disk_jumble/src/disk_jumble/db.py +++ b/disk_jumble/src/disk_jumble/db.py @@ -7,11 +7,6 @@ import itertools from psycopg2.extras import execute_batch, Json, NumericRange -@dataclass -class Disk: - sector_size: int - - @dataclass class Slab: id: int @@ -74,14 +69,7 @@ class Wrapper: ] execute_batch(cursor, stmt, param_sets) - def get_disk(self, id_: int) -> Disk: - with self.conn.cursor() as cursor: - cursor.execute("select sector_size from diskjumble.disk where disk_id = %s;", (id_,)) - [(sector_size,)] = cursor.fetchall() - - return Disk(sector_size) - - def get_slabs_and_hashers(self, disk_id: int) -> Iterable[tuple[Slab, Optional[HasherRef]]]: + def get_slabs_and_hashers(self, disk_id: int, sector_size: int) -> Iterable[tuple[Slab, Optional[HasherRef]]]: """ Find slabs on the specified disk, and for each also return any lowest-offset saved hasher that left off directly before or within the slab's entity data. @@ -93,7 +81,7 @@ class Wrapper: -- join up incomplete piece info and precompute where the hasher left off within the entity select verify_id, seq, slab.entity_id, hasher_state, - entity_offset + (upper(c.disk_sectors) - lower(slab.disk_sectors)) * sector_size as end_off + entity_offset + (upper(c.disk_sectors) - lower(slab.disk_blocks)) * %(sector_size)s as end_off from diskjumble.verify_piece_incomplete natural left join diskjumble.verify_piece p @@ -101,12 +89,12 @@ class Wrapper: natural left join diskjumble.disk left join diskjumble.slab on ( c.disk_id = slab.disk_id - and upper(c.disk_sectors) <@ int8range(lower(slab.disk_sectors), upper(slab.disk_sectors), '[]') + and upper(c.disk_sectors) <@ int8range(lower(slab.disk_blocks), upper(slab.disk_blocks), '[]') ) where seq >= all (select seq from diskjumble.verify_piece_content where verify_id = p.verify_id) ) select - slab_id, disk_id, disk_sectors, slab.entity_id, entity_offset, crypt_key, verify_id, seq, end_off, + slab_id, disk_id, disk_blocks, slab.entity_id, entity_offset, crypt_key, verify_id, seq, end_off, hasher_state from diskjumble.slab @@ -115,15 +103,15 @@ class Wrapper: incomplete_edge.entity_id = slab.entity_id and incomplete_edge.end_off <@ int8range( slab.entity_offset, - slab.entity_offset + (upper(disk_sectors) - lower(disk_sectors)) * sector_size + slab.entity_offset + (upper(disk_blocks) - lower(disk_blocks)) * %(sector_size)s ) - and (incomplete_edge.end_off - slab.entity_offset) %% sector_size = 0 - where disk_id = %s + and (incomplete_edge.end_off - slab.entity_offset) %% %(sector_size)s = 0 + where disk_id = %(disk_id)s order by slab.entity_id, entity_offset, slab_id ; """ with self.conn.cursor() as cursor: - cursor.execute(stmt, (disk_id,)) + cursor.execute(stmt, {"disk_id": disk_id, "sector_size": sector_size}) for (_, rows_iter) in itertools.groupby(cursor, lambda r: r[0]): rows = list(rows_iter) [(slab_id, disk_id, sectors_pg, entity_id, entity_off, key)] = {r[:6] for r in rows} diff --git a/disk_jumble/src/disk_jumble/tests/test_verify.py b/disk_jumble/src/disk_jumble/tests/test_verify_v1.py similarity index 84% rename from disk_jumble/src/disk_jumble/tests/test_verify.py rename to disk_jumble/src/disk_jumble/tests/test_verify_v1.py index 75f166d..bc18e28 100644 --- a/disk_jumble/src/disk_jumble/tests/test_verify.py +++ b/disk_jumble/src/disk_jumble/tests/test_verify_v1.py @@ -40,7 +40,7 @@ class Tests(unittest.TestCase): piece_size = 64 torrent_len = 3 * piece_size - disk = self._write_disk(sector_size, torrent_len // sector_size) + disk = self._write_disk(torrent_len // sector_size) with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file: torrent = _Torrent(torrent_file, piece_size) self._write_torrent(torrent) @@ -50,7 +50,7 @@ class Tests(unittest.TestCase): (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash) ) - do_verify(self._conn, disk.id, torrent_file, read_size, read_tries = 1) + do_verify(self._conn, disk.id, sector_size, torrent_file, read_size, read_tries = 1) cursor.execute("select * from diskjumble.verify_pass;") self.assertEqual(cursor.rowcount, 1) @@ -69,26 +69,26 @@ class Tests(unittest.TestCase): Test a run where a cached hash state is used, a piece is split on disk, and the end of the torrent isn't sector-aligned. """ + sector_size = 16 read_size = 8 - piece_size = 64 - other_disk = self._write_disk(16, 1) - disk = self._write_disk(32, 3) - with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) + other_disk = self._write_disk(1) + disk = self._write_disk(5) + with _random_file(60, Random(0), on_disk = False) as torrent_file: + torrent = _Torrent(torrent_file, piece_size = 64) self._write_torrent(torrent) with self._conn.cursor() as cursor: cursor.executemany( "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", [ (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0), - (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size), - (disk.id, NumericRange(2, 3), torrent.info_hash, other_disk.sector_size + disk.sector_size), + (disk.id, NumericRange(0, 2), torrent.info_hash, 16), + (disk.id, NumericRange(4, 5), torrent.info_hash, 48), ] ) # Prepare the saved hasher state by running a verify - do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1) + do_verify(self._conn, other_disk.id, sector_size, torrent_file, read_size, read_tries = 1) torrent_file.seek(0) cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") @@ -96,12 +96,12 @@ class Tests(unittest.TestCase): self.assertEqual(row_count, 1) disk_file = io.BytesIO() - torrent_file.seek(other_disk.sector_size) - disk_file.write(torrent_file.read(disk.sector_size)) - disk_file.seek(disk_file.tell() + disk.sector_size) + torrent_file.seek(sector_size) + disk_file.write(torrent_file.read(sector_size * 2)) + disk_file.seek(disk_file.tell() + sector_size * 2) disk_file.write(torrent_file.read()) disk_file.seek(0) - do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1) + do_verify(self._conn, disk.id, sector_size, disk_file, read_size, read_tries = 1) # Check that there are no verify pieces in the database. Because of integrity constraints, this also # guarantees there aren't any stray saved hasher states, failed verifies, or piece contents. @@ -112,7 +112,7 @@ class Tests(unittest.TestCase): cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;") self.assertEqual( cursor.fetchall(), - [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1)), (disk.id, NumericRange(2, 3))] + [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 2)), (disk.id, NumericRange(4, 5))] ) def test_resume_no_completion(self): @@ -120,11 +120,12 @@ class Tests(unittest.TestCase): Test a run where a saved hasher state is used and the target disk has subsequent entity data but not the full remainder of the piece. """ + sector_size = 16 read_size = 7 piece_size = 64 - other_disk = self._write_disk(16, 1) - disk = self._write_disk(32, 1) + other_disk = self._write_disk(1) + disk = self._write_disk(2) with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: torrent = _Torrent(torrent_file, piece_size) self._write_torrent(torrent) @@ -133,21 +134,21 @@ class Tests(unittest.TestCase): "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", [ (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0), - (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size), + (disk.id, NumericRange(0, 2), torrent.info_hash, sector_size), ] ) - do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1) + do_verify(self._conn, other_disk.id, sector_size, torrent_file, read_size, read_tries = 1) cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") [(row_count,)] = cursor.fetchall() self.assertEqual(row_count, 1) disk_file = io.BytesIO() - torrent_file.seek(other_disk.sector_size) - disk_file.write(torrent_file.read(disk.sector_size)) + torrent_file.seek(sector_size) + disk_file.write(torrent_file.read(sector_size * 2)) disk_file.seek(0) - do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1) + do_verify(self._conn, disk.id, sector_size, disk_file, read_size, read_tries = 1) cursor.execute("select count(*) from diskjumble.verify_pass;") [(row_count,)] = cursor.fetchall() @@ -161,7 +162,7 @@ class Tests(unittest.TestCase): cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;") self.assertCountEqual( cursor.fetchall(), - [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1))] + [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 2))] ) cursor.execute("select count(*) from diskjumble.verify_piece_fail;") @@ -170,7 +171,7 @@ class Tests(unittest.TestCase): hasher = Sha1Hasher(None) torrent_file.seek(0) - hasher.update(torrent_file.read(other_disk.sector_size + disk.sector_size)) + hasher.update(torrent_file.read(sector_size * 3)) cursor.execute("select hasher_state from diskjumble.verify_piece_incomplete;") self.assertEqual(cursor.fetchall(), [(hasher.ctx.serialize(),)]) @@ -181,8 +182,10 @@ class Tests(unittest.TestCase): """ piece_size = 64 - other_disk = self._write_disk(16, 1) - disk = self._write_disk(16, 4) + other_disk = self._write_disk(1) + od_ss = 16 + disk = self._write_disk(4) + d_ss = 16 with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file: torrent = _Torrent(torrent_file, piece_size) self._write_torrent(torrent) @@ -195,7 +198,7 @@ class Tests(unittest.TestCase): ] ) - do_verify(self._conn, other_disk.id, torrent_file, read_size = 128, read_tries = 1) + do_verify(self._conn, other_disk.id, od_ss, torrent_file, read_size = 128, read_tries = 1) cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") [(row_count,)] = cursor.fetchall() @@ -203,7 +206,7 @@ class Tests(unittest.TestCase): torrent_file.seek(piece_size) disk_file = io.BytesIO(torrent_file.read()) - do_verify(self._conn, disk.id, disk_file, read_size = 128, read_tries = 1) + do_verify(self._conn, disk.id, d_ss, disk_file, read_size = 128, read_tries = 1) cursor.execute( "select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;" @@ -225,8 +228,10 @@ class Tests(unittest.TestCase): """ piece_size = 128 - other_disk = self._write_disk(16, 5) - disk = self._write_disk(32, 2) + other_disk = self._write_disk(5) + od_ss = 16 + disk = self._write_disk(2) + d_ss = 32 with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: torrent = _Torrent(torrent_file, piece_size) self._write_torrent(torrent) @@ -239,13 +244,13 @@ class Tests(unittest.TestCase): ] ) - do_verify(self._conn, other_disk.id, torrent_file, read_size = 16, read_tries = 1) + do_verify(self._conn, other_disk.id, od_ss, torrent_file, read_size = 16, read_tries = 1) cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") [(row_count,)] = cursor.fetchall() self.assertEqual(row_count, 1) disk_file = io.BytesIO(torrent_file.getvalue()[64:]) - do_verify(self._conn, disk.id, disk_file, read_size = 16, read_tries = 1) + do_verify(self._conn, disk.id, d_ss, disk_file, read_size = 16, read_tries = 1) cursor.execute(""" select disk_id, disk_sectors @@ -268,9 +273,10 @@ class Tests(unittest.TestCase): """ Test a run where a read to the disk fails but fewer times than needed to mark the sector bad. """ + sector_size = 32 piece_size = 32 - disk = self._write_disk(32, 1) + disk = self._write_disk(1) with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: torrent = _Torrent(torrent_file, piece_size) self._write_torrent(torrent) @@ -281,7 +287,7 @@ class Tests(unittest.TestCase): ) disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2) - do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3) + do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3) self.assertEqual(disk_file.triggered, 2) @@ -296,11 +302,12 @@ class Tests(unittest.TestCase): """ Test a run where a disk read fails enough times to trigger the bad sector logic. """ + sector_size = 16 piece_size = 64 - other_a = self._write_disk(16, 1) - other_b = self._write_disk(16, 2) - disk = self._write_disk(16, 1) + other_a = self._write_disk(1) + other_b = self._write_disk(2) + disk = self._write_disk(1) with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: torrent = _Torrent(torrent_file, piece_size) self._write_torrent(torrent) @@ -314,16 +321,16 @@ class Tests(unittest.TestCase): ] ) - do_verify(self._conn, other_a.id, torrent_file, read_size = 16, read_tries = 1) + do_verify(self._conn, other_a.id, sector_size, torrent_file, read_size = 16, read_tries = 1) other_b_file = io.BytesIO(torrent_file.getvalue()[16:48]) - do_verify(self._conn, other_b.id, other_b_file, read_size = 16, read_tries = 1) + do_verify(self._conn, other_b.id, sector_size, other_b_file, read_size = 16, read_tries = 1) cursor.execute("select verify_id from diskjumble.verify_piece;") [(verify_id,)] = cursor.fetchall() data = torrent_file.getvalue()[48:] disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None) - do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3) + do_verify(self._conn, disk.id, sector_size, disk_file, read_size = 4, read_tries = 3) cursor.execute("select count(*) from diskjumble.verify_pass;") [(row_count,)] = cursor.fetchall() @@ -346,14 +353,14 @@ class Tests(unittest.TestCase): with self._conn.cursor() as cursor: cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,)) - def _write_disk(self, sector_size: int, sector_count: int) -> "_Disk": + def _write_disk(self, sector_count: int) -> "_Disk": with self._conn.cursor() as cursor: cursor.execute( - "insert into diskjumble.disk values (default, %s, null, %s, %s, false) returning disk_id;", - (uuid.uuid4(), sector_size, sector_count) + "insert into diskjumble.disk values (default, %s, null, %s, false) returning disk_id;", + (uuid.uuid4(), sector_count) ) [(id_,)] = cursor.fetchall() - return _Disk(id_, sector_size, sector_count) + return _Disk(id_, sector_count) @classmethod def setUpClass(cls): @@ -389,7 +396,6 @@ class Tests(unittest.TestCase): @dataclass class _Disk: id: int - sector_size: int sector_count: int diff --git a/disk_jumble/src/disk_jumble/tests/verify_setup.sql b/disk_jumble/src/disk_jumble/tests/verify_setup.sql index aa0c5fb..9189c4e 100644 --- a/disk_jumble/src/disk_jumble/tests/verify_setup.sql +++ b/disk_jumble/src/disk_jumble/tests/verify_setup.sql @@ -9,8 +9,8 @@ AS '$libdir/pgcrypto', $function$pg_digest$function$ -- PostgreSQL database dump -- --- Dumped from database version 13.2 (Debian 13.2-1.pgdg100+1) --- Dumped by pg_dump version 13.4 (Debian 13.4-0+deb11u1) +-- Dumped from database version 13.5 (Debian 13.5-1.pgdg100+1) +-- Dumped by pg_dump version 13.5 (Debian 13.5-0+deb11u1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -46,8 +46,7 @@ CREATE TABLE diskjumble.disk ( disk_id integer NOT NULL, dev_uuid uuid NOT NULL, dev_serial text, - sector_size integer NOT NULL, - num_sectors bigint NOT NULL, + num_blocks bigint NOT NULL, failed boolean DEFAULT false NOT NULL ); @@ -83,10 +82,11 @@ ALTER SEQUENCE diskjumble.disk_id_seq OWNED BY diskjumble.disk.disk_id; CREATE TABLE diskjumble.slab ( slab_id integer NOT NULL, disk_id integer NOT NULL, - disk_sectors int8range NOT NULL, + disk_blocks int8range NOT NULL, entity_id bytea NOT NULL, entity_offset bigint NOT NULL, - crypt_key bytea + crypt_key bytea, + realized boolean DEFAULT false ); @@ -271,7 +271,7 @@ ALTER TABLE ONLY diskjumble.disk -- ALTER TABLE ONLY diskjumble.slab - ADD CONSTRAINT slab_disk_id_disk_sectors_excl EXCLUDE USING gist (disk_id WITH =, disk_sectors WITH &&); + ADD CONSTRAINT slab_disk_id_disk_sectors_excl EXCLUDE USING gist (disk_id WITH =, disk_blocks WITH &&); -- diff --git a/disk_jumble/src/disk_jumble/verify.py b/disk_jumble/src/disk_jumble/verify.py index f644d9a..6c15143 100644 --- a/disk_jumble/src/disk_jumble/verify.py +++ b/disk_jumble/src/disk_jumble/verify.py @@ -12,7 +12,7 @@ import math from psycopg2.extras import NumericRange import psycopg2 -from disk_jumble import bencode +from disk_jumble import bencode, SECTOR_SIZE from disk_jumble.db import HasherRef, Slab, Wrapper as DbWrapper from disk_jumble.nettle import Sha1Hasher @@ -43,9 +43,8 @@ class _BadSector(Exception): pass -def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, read_tries: int) -> None: +def do_verify(conn, disk_id: int, sector_size: int, disk_file: io.BufferedIOBase, read_size: int, read_tries: int) -> None: db = DbWrapper(conn) - disk = db.get_disk(disk_id) info_dicts = { info_hash: bencode.decode(info) @@ -53,11 +52,11 @@ def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, } tasks = [] - slabs_and_hashers = db.get_slabs_and_hashers(disk_id) + slabs_and_hashers = db.get_slabs_and_hashers(disk_id, sector_size) for (entity_id, group) in itertools.groupby(slabs_and_hashers, lambda t: t[0].entity_id): info = info_dicts[entity_id] piece_len = info[b"piece length"] - assert piece_len % disk.sector_size == 0 + assert piece_len % sector_size == 0 if b"length" in info: torrent_len = info[b"length"] else: @@ -67,7 +66,7 @@ def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, use_hasher = None chunks = [] for (slab, hasher_ref) in group: - slab_end = min(slab.entity_offset + len(slab.sectors) * disk.sector_size, torrent_len) + slab_end = min(slab.entity_offset + len(slab.sectors) * sector_size, torrent_len) while offset is None or offset < slab_end: if offset is not None and slab.entity_offset > offset: @@ -126,15 +125,15 @@ def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, hasher = Sha1Hasher(task.hasher_ref.state if task.hasher_ref else None) sector_ranges = [ range( - chunk.slab.sectors.start + chunk.slice.start // disk.sector_size, - chunk.slab.sectors.start + math.ceil(chunk.slice.stop / disk.sector_size) + chunk.slab.sectors.start + chunk.slice.start // sector_size, + chunk.slab.sectors.start + math.ceil(chunk.slice.stop / sector_size) ) for chunk in task.chunks ] try: for chunk in task.chunks: - slab_off = chunk.slab.sectors.start * disk.sector_size + slab_off = chunk.slab.sectors.start * sector_size disk_file.seek(slab_off + chunk.slice.start) end_pos = slab_off + chunk.slice.stop while disk_file.tell() < end_pos: @@ -379,5 +378,7 @@ if __name__ == "__main__": conn.autocommit = True path = f"/dev/mapper/diskjumble-{args.disk_id}" with open(path, "rb", buffering = _READ_BUFFER_SIZE) as disk_file: - verify_func = do_verify_v2 if args.entity_version == "2" else do_verify - verify_func(conn, args.disk_id, disk_file, _READ_BUFFER_SIZE, args.read_tries) + if args.entity_version == "1": + do_verify(conn, args.disk_id, SECTOR_SIZE, disk_file, _READ_BUFFER_SIZE, args.read_tries) + else: + do_verify_v2(conn, args.disk_id, disk_file, _READ_BUFFER_SIZE, args.read_tries) -- 2.30.2