From: Jakob Cornell Date: Thu, 20 Jan 2022 01:53:11 +0000 (-0600) Subject: Demote Disk Jumble tree to subdirectory for EROS repo merge X-Git-Url: https://jcornell.net/gitweb/gitweb.cgi?a=commitdiff_plain;h=1a2ae836f5602651ffe4deea6f4b6d07a82c0b8f;p=eros.git Demote Disk Jumble tree to subdirectory for EROS repo merge --- diff --git a/disk_jumble/pyproject.toml b/disk_jumble/pyproject.toml new file mode 100644 index 0000000..8fe2f47 --- /dev/null +++ b/disk_jumble/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" diff --git a/disk_jumble/setup.cfg b/disk_jumble/setup.cfg new file mode 100644 index 0000000..3fc8d70 --- /dev/null +++ b/disk_jumble/setup.cfg @@ -0,0 +1,11 @@ +[metadata] +name = disk_jumble +version = 0.0.1 + +[options] +package_dir = + = src +packages = disk_jumble +python_requires ~= "3.7" +install_requires = + psycopg2-binary ~= 2.8 diff --git a/disk_jumble/setup.py b/disk_jumble/setup.py new file mode 100644 index 0000000..b024da8 --- /dev/null +++ b/disk_jumble/setup.py @@ -0,0 +1,4 @@ +from setuptools import setup + + +setup() diff --git a/disk_jumble/src/disk_jumble/__init__.py b/disk_jumble/src/disk_jumble/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/disk_jumble/src/disk_jumble/bencode.py b/disk_jumble/src/disk_jumble/bencode.py new file mode 100644 index 0000000..7883b53 --- /dev/null +++ b/disk_jumble/src/disk_jumble/bencode.py @@ -0,0 +1,124 @@ +from __future__ import annotations +from typing import Union +import itertools + + +Bdict = dict[bytes, 'Type'] +Type = Union[bytes, int, list['Type'], Bdict] + + +class CodecError(Exception): + pass + + +def _pop_bytes(vals: list[bytes]) -> bytes: + len_parts = [] + while vals and vals[0].isdigit(): + len_parts.append(vals.pop(0)) + + try: + length = int(b"".join(len_parts).decode("ascii")) + except ValueError: + raise CodecError() + + try: + if vals.pop(0) != b":": + raise CodecError() + except IndexError: + raise CodecError() + + if length > len(vals): + raise CodecError() + + out = b"".join(vals[:length]) + del vals[:length] + return out + + +def _pop_int(vals: list[bytes]) -> int: + assert vals.pop(0) == b"i" + + try: + end = vals.index(b"e") + except ValueError: + raise CodecError() + + try: + out = int(b"".join(vals[:end]).decode("ascii")) + except ValueError: + raise CodecError() + + del vals[slice(end + 1)] + return out + + +def _pop_list(vals: list[bytes]) -> list[Type]: + assert vals.pop(0) == b"l" + + out = [] + while vals and vals[0] != b"e": + out.append(_pop_value(vals)) + + if vals: + del vals[0] + return out + else: + raise CodecError() + + +def _pop_dict(vals: list[bytes]) -> Bdict: + assert vals.pop(0) == b"d" + + out = {} + while vals and vals[0] != b"e": + key = _pop_bytes(vals) + out[key] = _pop_value(vals) + + if vals: + del vals[0] + return out + else: + raise CodecError() + + +def _pop_value(vals: list[bytes]) -> Type: + if vals: + if vals[0].isdigit(): + return _pop_bytes(vals) + elif vals[0] == b"i": + return _pop_int(vals) + elif vals[0] == b"l": + return _pop_list(vals) + elif vals[0] == b"d": + return _pop_dict(vals) + else: + raise CodecError() + else: + raise CodecError() + + +def decode(data: bytes) -> Type: + vals = [bytes([v]) for v in data] + out = _pop_value(vals) + if vals: + raise CodecError() + else: + return out + + +def _encode_helper(data: Type) -> list[bytes]: + if isinstance(data, bytes): + return [str(len(data)).encode("ascii"), b":", data] + elif isinstance(data, int): + return 
[b"i", str(data).encode("ascii"), b"e"] + elif isinstance(data, list): + return [b"l", *itertools.chain.from_iterable(map(_encode_helper, data)), b"e"] + elif isinstance(data, dict): + contents = itertools.chain.from_iterable(data.items()) + return [b"d", *itertools.chain.from_iterable(map(_encode_helper, contents)), b"e"] + else: + raise CodecError() + + +def encode(data: Type) -> bytes: + return b"".join(_encode_helper(data)) diff --git a/disk_jumble/src/disk_jumble/db.py b/disk_jumble/src/disk_jumble/db.py new file mode 100644 index 0000000..68488f0 --- /dev/null +++ b/disk_jumble/src/disk_jumble/db.py @@ -0,0 +1,223 @@ +from __future__ import annotations +from dataclasses import dataclass +from typing import Any, Iterable, Optional +import datetime as dt +import itertools + +from psycopg2.extras import execute_batch, Json, NumericRange + + +@dataclass +class Disk: + sector_size: int + + +@dataclass +class Slab: + id: int + disk_id: int + sectors: range + entity_id: bytes + entity_offset: int + crypt_key: bytes + + +@dataclass +class HasherRef: + id: int + seq: int + entity_offset: int + state: dict + + +@dataclass +class Wrapper: + conn: Any + + def get_passkey(self, tracker_id: int) -> str: + with self.conn.cursor() as cursor: + cursor.execute("select passkey from gazelle.passkey where gazelle_tracker_id = %s;", (tracker_id,)) + [(passkey,)] = cursor.fetchall() + + return passkey + + def get_torrents(self, tracker_id: int, batch_size: Optional[int] = None) -> Iterable[bytes]: + """Iterate the info hashes for the specified tracker which haven't been marked deleted.""" + + stmt = """ + select infohash from gazelle.torrent + where gazelle_tracker_id = %s and not is_deleted + order by infohash asc + ; + """ + with self.conn.cursor() as cursor: + if batch_size is not None: + cursor.itersize = batch_size + + cursor.execute(stmt, (tracker_id,)) + for row in cursor: + (info_hash_mem,) = row + info_hash = bytes(info_hash_mem) + assert len(info_hash) == 20 + yield info_hash + + def insert_swarm_info(self, tracker_id: int, infos: Iterable["disk_jumble.scrape.ScrapeInfo"]) -> None: + stmt = """ + insert into gazelle.tracker_stat (gazelle_tracker_id, infohash, ts, complete, incomplete, downloaded) + values (%s, %s, %s, %s, %s, %s) + ; + """ + with self.conn.cursor() as cursor: + param_sets = [ + (tracker_id, i.info_hash, i.timestamp, i.complete, i.incomplete, i.downloaded) + for i in infos + ] + execute_batch(cursor, stmt, param_sets) + + def get_disk(self, id_: int) -> Disk: + with self.conn.cursor() as cursor: + cursor.execute("select sector_size from diskjumble.disk where disk_id = %s;", (id_,)) + [(sector_size,)] = cursor.fetchall() + + return Disk(sector_size) + + def get_slabs_and_hashers(self, disk_id: int) -> Iterable[tuple[Slab, Optional[HasherRef]]]: + """ + Find slabs on the specified disk, and for each also return any lowest-offset saved hasher that left off directly + before or within the slab's entity data. 
+ """ + + stmt = """ + with + incomplete_edge as ( + -- join up incomplete piece info and precompute where the hasher left off within the entity + select + verify_id, seq, slab.entity_id, hasher_state, + entity_offset + (upper(c.disk_sectors) - lower(slab.disk_sectors)) * sector_size as end_off + from + diskjumble.verify_piece_incomplete + natural left join diskjumble.verify_piece p + natural join diskjumble.verify_piece_content c + natural left join diskjumble.disk + left join diskjumble.slab on ( + c.disk_id = slab.disk_id + and upper(c.disk_sectors) <@ int8range(lower(slab.disk_sectors), upper(slab.disk_sectors), '[]') + ) + where seq >= all (select seq from diskjumble.verify_piece_content where verify_id = p.verify_id) + ) + select + slab_id, disk_id, disk_sectors, slab.entity_id, entity_offset, crypt_key, verify_id, seq, end_off, + hasher_state + from + diskjumble.slab + natural left join diskjumble.disk + left join incomplete_edge on + incomplete_edge.entity_id = slab.entity_id + and incomplete_edge.end_off <@ int8range( + slab.entity_offset, + slab.entity_offset + (upper(disk_sectors) - lower(disk_sectors)) * sector_size + ) + and (incomplete_edge.end_off - slab.entity_offset) %% sector_size = 0 + where disk_id = %s + order by slab.entity_id, entity_offset, slab_id + ; + """ + with self.conn.cursor() as cursor: + cursor.execute(stmt, (disk_id,)) + for (_, rows_iter) in itertools.groupby(cursor, lambda r: r[0]): + rows = list(rows_iter) + [(slab_id, disk_id, sectors_pg, entity_id, entity_off, key)] = {r[:6] for r in rows} + sectors = range(sectors_pg.lower, sectors_pg.upper) + slab = Slab(slab_id, disk_id, sectors, bytes(entity_id), entity_off, key) + + # `None' if no hasher match in outer join, otherwise earliest match + (*_, id_, seq, end_off, state) = min(rows, key = lambda r: r[-2]) + hasher_ref = None if id_ is None else HasherRef(id_, seq, end_off, state) + + yield (slab, hasher_ref) + + def get_torrent_info(self, disk_id: int) -> Iterable[tuple[bytes, bytes]]: + stmt = """ + with hashed as ( + select digest(info, 'sha1') as info_hash, info + from bittorrent.torrent_info + ) + select + distinct on (info_hash) + info_hash, info + from diskjumble.slab left outer join hashed on entity_id = info_hash + where disk_id = %s + ; + """ + with self.conn.cursor() as cursor: + cursor.execute(stmt, (disk_id,)) + for (info_hash, info) in cursor: + yield (bytes(info_hash), bytes(info)) + + def insert_verify_piece(self, ts: dt.datetime, entity_id: bytes, piece_num: int) -> int: + """Insert new verify piece, returning the ID of the inserted row.""" + + with self.conn.cursor() as cursor: + stmt = "insert into diskjumble.verify_piece values (default, %s, %s, %s) returning verify_id;" + cursor.execute(stmt, (ts, entity_id, piece_num)) + [(row_id,)] = cursor.fetchall() + return row_id + + def insert_verify_piece_content(self, verify_id: int, seq_start: int, disk_id: int, ranges: Iterable[range]) -> None: + with self.conn.cursor() as cursor: + execute_batch( + cursor, + "insert into diskjumble.verify_piece_content values (%s, %s, %s, %s);", + [ + (verify_id, seq, disk_id, NumericRange(r.start, r.stop)) + for (seq, r) in enumerate(ranges, start = seq_start) + ] + ) + + def mark_verify_piece_failed(self, verify_id: int) -> None: + with self.conn.cursor() as cursor: + cursor.execute("insert into diskjumble.verify_piece_fail values (%s);", (verify_id,)) + + def upsert_hasher_state(self, verify_id: int, state: dict) -> None: + stmt = """ + insert into diskjumble.verify_piece_incomplete values (%s, %s) + 
on conflict (verify_id) do update set hasher_state = excluded.hasher_state + ; + """ + with self.conn.cursor() as cursor: + cursor.execute(stmt, (verify_id, Json(state))) + + def delete_verify_piece(self, verify_id: int) -> None: + with self.conn.cursor() as cursor: + cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,)) + cursor.execute("delete from diskjumble.verify_piece_content where verify_id = %s;", (verify_id,)) + cursor.execute("delete from diskjumble.verify_piece where verify_id = %s", (verify_id,)) + + def move_piece_content_for_pass(self, verify_id: int) -> None: + stmt = """ + with content_out as ( + delete from diskjumble.verify_piece_content c + using diskjumble.verify_piece p + where ( + c.verify_id = p.verify_id + and p.verify_id = %s + ) + returning at, disk_id, disk_sectors + ) + insert into diskjumble.verify_pass (at, disk_id, disk_sectors) + select at, disk_id, disk_sectors from content_out + ; + """ + with self.conn.cursor() as cursor: + cursor.execute(stmt, (verify_id,)) + + def insert_pass_data(self, ts: dt.datetime, disk_id: int, sectors: range) -> None: + with self.conn.cursor() as cursor: + cursor.execute( + "insert into diskjumble.verify_pass values (default, %s, %s, %s);", + (ts, disk_id, NumericRange(sectors.start, sectors.stop)) + ) + + def clear_incomplete(self, verify_id: int) -> None: + with self.conn.cursor() as cursor: + cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,)) diff --git a/disk_jumble/src/disk_jumble/nettle.py b/disk_jumble/src/disk_jumble/nettle.py new file mode 100644 index 0000000..dfabbca --- /dev/null +++ b/disk_jumble/src/disk_jumble/nettle.py @@ -0,0 +1,66 @@ +"""Python wrappers for some of GnuTLS Nettle.""" + +from ctypes.util import find_library +from typing import Optional +import ctypes + + +_LIB = ctypes.CDLL(find_library("nettle")) + + +class _Sha1Defs: + _DIGEST_SIZE = 20 # in bytes + _BLOCK_SIZE = 64 # in bytes + _DIGEST_LENGTH = 5 + + _StateArr = ctypes.c_uint32 * _DIGEST_LENGTH + + _BlockArr = ctypes.c_uint8 * _BLOCK_SIZE + + +class Sha1Hasher(_Sha1Defs): + class Context(ctypes.Structure): + _fields_ = [ + ("state", _Sha1Defs._StateArr), + ("count", ctypes.c_uint64), + ("index", ctypes.c_uint), + ("block", _Sha1Defs._BlockArr), + ] + + @classmethod + def deserialize(cls, data): + return cls( + _Sha1Defs._StateArr(*data["state"]), + data["count"], + data["index"], + _Sha1Defs._BlockArr(*data["block"]), + ) + + def serialize(self): + return { + "state": list(self.state), + "count": self.count, + "index": self.index, + "block": list(self.block), + } + + @classmethod + def _new_context(cls): + ctx = cls.Context() + _LIB.nettle_sha1_init(ctypes.byref(ctx)) + return ctx + + def __init__(self, ctx_dict: Optional[dict]): + if ctx_dict: + self.ctx = self.Context.deserialize(ctx_dict) + else: + self.ctx = self._new_context() + + def update(self, data): + _LIB.nettle_sha1_update(ctypes.byref(self.ctx), len(data), data) + + def digest(self): + """Return the current digest and reset the hasher state.""" + out = (ctypes.c_uint8 * self._DIGEST_SIZE)() + _LIB.nettle_sha1_digest(ctypes.byref(self.ctx), self._DIGEST_SIZE, out) + return bytes(out) diff --git a/disk_jumble/src/disk_jumble/scrape.py b/disk_jumble/src/disk_jumble/scrape.py new file mode 100644 index 0000000..4143b8a --- /dev/null +++ b/disk_jumble/src/disk_jumble/scrape.py @@ -0,0 +1,157 @@ +from dataclasses import dataclass +from typing import Iterable, Union +from urllib.error import 
URLError +from urllib.parse import urlencode, urlparse, urlunparse +from urllib.request import HTTPHandler, HTTPRedirectHandler, HTTPSHandler, OpenerDirector, UnknownHandler +import argparse +import contextlib +import datetime as dt +import itertools +import re +import sys +import time + +import psycopg2 + +from disk_jumble import bencode +from disk_jumble.db import Wrapper as DbWrapper +from disk_jumble.trackers import Tracker, TRACKERS + + +class Result: + pass + + +@dataclass +class OkResult(Result): + data: bencode.Type + + +@dataclass +class ErrorResult(Result): + status: int + body: Union[bencode.Type, bytes] + + +@dataclass +class ScrapeInfo: + info_hash: bytes + timestamp: dt.datetime + complete: int + incomplete: int + downloaded: int + + +def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) -> Result: + qs = urlencode({"info_hash": list(info_hashes)}, doseq = True) + url_parts = urlparse(tracker.scrape_url_format.format(passkey = passkey)) + assert not url_parts.query + url = urlunparse(url_parts._replace(query = qs)) + + # we handle HTTP errors ourself + opener = OpenerDirector() + for handler in [UnknownHandler(), HTTPHandler(), HTTPSHandler(), HTTPRedirectHandler()]: + opener.add_handler(handler) + + with opener.open(url) as resp: + body = resp.read() + + try: + data = bencode.decode(body) + except bencode.DecodeError: + data = body + else: + if resp.getcode() == 200 and b"files" in data: + return OkResult(data) + + return ErrorResult(resp.getcode(), data) + + +if __name__ == "__main__": + PSQL_PARAMS = ["dbname", "user", "password", "host", "port"] + + def tracker_name(val): + matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()] + if matches: + [name] = matches + return name + else: + raise ValueError() + + def batch_size(val): + out = int(val) + if out > 0: + return out + else: + raise ValueError() + + def delay(val): + out = int(val) + if out >= 0: + return out + else: + raise ValueError() + + parser = argparse.ArgumentParser() + parser.add_argument( + "tracker", + choices = sorted(TRACKERS.keys()), + type = tracker_name, + help = "name of tracker to scrape", + ) + parser.add_argument( + "batch_size", + type = batch_size, + help = "number of torrents per batch", + ) + parser.add_argument( + "delay", + type = delay, + help = "delay between batches, in seconds", + ) + + for arg_name in PSQL_PARAMS: + parser.add_argument("--" + arg_name, nargs = "?") + + args = parser.parse_args() + tracker = TRACKERS[args.tracker] + delay = dt.timedelta(seconds = args.delay) + + params = {n: getattr(args, n) for n in PSQL_PARAMS if getattr(args, n)} + with contextlib.closing(psycopg2.connect(**params)) as conn: + conn.autocommit = True + db_wrapper = DbWrapper(conn) + passkey = db_wrapper.get_passkey(tracker.gazelle_id) + info_hashes = iter(db_wrapper.get_torrents(tracker.gazelle_id, args.batch_size)) + batches = iter(lambda: list(itertools.islice(info_hashes, args.batch_size)), []) + for (i, batch) in enumerate(batches): + if i != 0: + time.sleep(delay.total_seconds()) + timestamp = dt.datetime.now(dt.timezone.utc) + try: + result = scrape_batch(tracker, batch, passkey) + except URLError as e: + print("[ERR] couldn't connect: {}".format(e), file = sys.stderr, flush = True) + else: + if isinstance(result, OkResult): + assert set(result.data[b"files"].keys()) <= set(batch), "unexpected torrent in respose" + infos = [ + ScrapeInfo( + info_hash, + timestamp, + complete = meta_dict[b"complete"], + incomplete = meta_dict[b"incomplete"], + downloaded = 
meta_dict[b"downloaded"], + ) + for (info_hash, meta_dict) in result.data[b"files"].items() + ] + db_wrapper.insert_swarm_info(tracker.gazelle_id, infos) + print("[OK] finished batch ({} of {} torrents)".format(len(infos), len(batch)), file = sys.stderr, flush = True) + elif isinstance(result, ErrorResult): + full_disp = result.body.decode("ascii", "ignore") if isinstance(result.body, bytes) else str(result.body) + clean_disp = re.sub(r"\s", " ", full_disp) + display_size = 100 + disp = clean_disp if len(clean_disp) <= display_size else clean_disp[:display_size] + "…" + print("[ERR] tracker responded {}: {}".format(result.status, disp), file = sys.stderr, flush = True) + else: + raise Exception() diff --git a/disk_jumble/src/disk_jumble/tests/__init__.py b/disk_jumble/src/disk_jumble/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/disk_jumble/src/disk_jumble/tests/test_verify.py b/disk_jumble/src/disk_jumble/tests/test_verify.py new file mode 100644 index 0000000..eea5007 --- /dev/null +++ b/disk_jumble/src/disk_jumble/tests/test_verify.py @@ -0,0 +1,451 @@ +""" +Tests for the verification program `disk_jumble.verify' + +Like the verification program itself, these tests take database connection information from the environment. The +necessary schemas and tables are set up from scratch by the test code, so environment variables should point to a +database that's not hosting a live instance of Disk Jumble. Ideally, this is a completely empty local database created +for the purposes of testing, but other options are possible. + +The tests need access to an SQL source file containing the definitions for the required tables and other Postgres +objects; see `test_util/dump_db.py'. +""" + +from dataclasses import dataclass +from importlib import resources +from random import Random +from typing import Optional +import hashlib +import io +import tempfile +import unittest +import uuid + +from psycopg2.extras import NumericRange +import psycopg2 +import psycopg2.extras + +from disk_jumble import bencode +from disk_jumble.nettle import Sha1Hasher +from disk_jumble.verify import do_verify + + +_BUF_SIZE = 16 * 1024 ** 2 # in bytes + + +class Tests(unittest.TestCase): + _SCHEMAS = {"public", "diskjumble", "bittorrent"} + + def _basic_fresh_verify_helper(self, read_size): + sector_size = 32 + piece_size = 64 + + torrent_len = 3 * piece_size + disk = self._write_disk(sector_size, torrent_len // sector_size) + with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file: + torrent = _Torrent(torrent_file, piece_size) + self._write_torrent(torrent) + with self._conn.cursor() as cursor: + cursor.execute( + "insert into diskjumble.slab values (default, %s, %s, %s, 0, null);", + (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash) + ) + + do_verify(self._conn, disk.id, torrent_file, read_size, read_tries = 1) + + cursor.execute("select * from diskjumble.verify_pass;") + self.assertEqual(cursor.rowcount, 1) + (_, _, disk_id, sectors) = cursor.fetchone() + self.assertEqual(disk_id, disk.id) + self.assertEqual(sectors, NumericRange(0, torrent_len // sector_size)) + + def test_basic_fresh_verify_small_read_size(self): + self._basic_fresh_verify_helper(16) + + def test_basic_fresh_verify_large_read_size(self): + self._basic_fresh_verify_helper(128) + + def test_resume_fragmentation_unaligned_end(self): + """ + Test a run where a cached hash state is used, a piece is split on disk, and the end of the torrent isn't + sector-aligned. 
+ """ + read_size = 8 + piece_size = 64 + + other_disk = self._write_disk(16, 1) + disk = self._write_disk(32, 3) + with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: + torrent = _Torrent(torrent_file, piece_size) + self._write_torrent(torrent) + with self._conn.cursor() as cursor: + cursor.executemany( + "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", + [ + (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0), + (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size), + (disk.id, NumericRange(2, 3), torrent.info_hash, other_disk.sector_size + disk.sector_size), + ] + ) + + # Prepare the saved hasher state by running a verify + do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1) + torrent_file.seek(0) + + cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 1) + + disk_file = io.BytesIO() + torrent_file.seek(other_disk.sector_size) + disk_file.write(torrent_file.read(disk.sector_size)) + disk_file.seek(disk_file.tell() + disk.sector_size) + disk_file.write(torrent_file.read()) + disk_file.seek(0) + do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1) + + # Check that there are no verify pieces in the database. Because of integrity constraints, this also + # guarantees there aren't any stray saved hasher states, failed verifies, or piece contents. + cursor.execute("select count(*) from diskjumble.verify_piece;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 0) + + cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;") + self.assertEqual( + cursor.fetchall(), + [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1)), (disk.id, NumericRange(2, 3))] + ) + + def test_resume_no_completion(self): + """ + Test a run where a saved hasher state is used and the target disk has subsequent entity data but not the full + remainder of the piece. 
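+
+		The run should extend the existing incomplete piece with the newly read sectors and store the updated
+		hasher state, recording neither a pass nor a failure.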
+ """ + read_size = 7 + piece_size = 64 + + other_disk = self._write_disk(16, 1) + disk = self._write_disk(32, 1) + with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: + torrent = _Torrent(torrent_file, piece_size) + self._write_torrent(torrent) + with self._conn.cursor() as cursor: + cursor.executemany( + "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", + [ + (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0), + (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size), + ] + ) + + do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1) + + cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 1) + + disk_file = io.BytesIO() + torrent_file.seek(other_disk.sector_size) + disk_file.write(torrent_file.read(disk.sector_size)) + disk_file.seek(0) + do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1) + + cursor.execute("select count(*) from diskjumble.verify_pass;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 0) + + cursor.execute("select entity_id, piece from diskjumble.verify_piece;") + [(entity_id, piece_num)] = cursor.fetchall() + self.assertEqual(bytes(entity_id), torrent.info_hash) + self.assertEqual(piece_num, 0) + + cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;") + self.assertCountEqual( + cursor.fetchall(), + [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1))] + ) + + cursor.execute("select count(*) from diskjumble.verify_piece_fail;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 0) + + hasher = Sha1Hasher(None) + torrent_file.seek(0) + hasher.update(torrent_file.read(other_disk.sector_size + disk.sector_size)) + cursor.execute("select hasher_state from diskjumble.verify_piece_incomplete;") + self.assertEqual(cursor.fetchall(), [(hasher.ctx.serialize(),)]) + + def test_ignore_hasher_beginning_on_disk(self): + """ + Test a run where a saved hasher state is available for use but isn't used due to the beginning of the piece + being on disk. 
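+
+		Because the target disk holds the piece from its first byte, hashing starts fresh; the stale incomplete
+		piece from the other disk is left untouched and the target disk's sectors get a verify pass.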
+ """ + piece_size = 64 + + other_disk = self._write_disk(16, 1) + disk = self._write_disk(16, 4) + with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file: + torrent = _Torrent(torrent_file, piece_size) + self._write_torrent(torrent) + with self._conn.cursor() as cursor: + cursor.executemany( + "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", + [ + (other_disk.id, NumericRange(0, other_disk.sector_count), torrent.info_hash, piece_size), + (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash, piece_size), + ] + ) + + do_verify(self._conn, other_disk.id, torrent_file, read_size = 128, read_tries = 1) + + cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 1) + + torrent_file.seek(piece_size) + disk_file = io.BytesIO(torrent_file.read()) + do_verify(self._conn, disk.id, disk_file, read_size = 128, read_tries = 1) + + cursor.execute( + "select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;" + ) + self.assertEqual(cursor.fetchall(), [(other_disk.id,)]) + + cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;") + self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, disk.sector_count))]) + + def test_ignore_hasher_unaligned(self): + """ + Test a run where a saved hasher isn't used because its entity data offset isn't sector-aligned on the target + disk. + + 0 16 32 48 64 80 96 112 128 + pieces: [-------------- 0 -------------] + other disk: [--][--][--][--][--] + disk: [------][------] + """ + piece_size = 128 + + other_disk = self._write_disk(16, 5) + disk = self._write_disk(32, 2) + with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: + torrent = _Torrent(torrent_file, piece_size) + self._write_torrent(torrent) + with self._conn.cursor() as cursor: + cursor.executemany( + "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", + [ + (other_disk.id, NumericRange(0, 5), torrent.info_hash, 0), + (disk.id, NumericRange(0, 2), torrent.info_hash, 64), + ] + ) + + do_verify(self._conn, other_disk.id, torrent_file, read_size = 16, read_tries = 1) + cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 1) + + disk_file = io.BytesIO(torrent_file.getvalue()[64:]) + do_verify(self._conn, disk.id, disk_file, read_size = 16, read_tries = 1) + + cursor.execute(""" + select disk_id, disk_sectors + from diskjumble.verify_piece_incomplete natural join diskjumble.verify_piece_content; + """) + self.assertEqual( + cursor.fetchall(), + [(other_disk.id, NumericRange(0, 5))] + ) + + cursor.execute("select count(*) from diskjumble.verify_pass;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 0) + + cursor.execute("select count(*) from diskjumble.verify_piece_fail;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 0) + + def test_transient_read_errors(self): + """ + Test a run where a read to the disk fails but fewer times than needed to mark the sector bad. 
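+
+		With read_tries = 3 and only two simulated failures, the retry loop eventually reads the data and the
+		piece is still recorded as a verify pass.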
+ """ + piece_size = 32 + + disk = self._write_disk(32, 1) + with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: + torrent = _Torrent(torrent_file, piece_size) + self._write_torrent(torrent) + with self._conn.cursor() as cursor: + cursor.execute( + "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", + (disk.id, NumericRange(0, 1), torrent.info_hash, 0) + ) + + disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2) + do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3) + + self.assertEqual(disk_file.triggered, 2) + + cursor.execute("select count(*) from diskjumble.verify_piece;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 0) + + cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;") + self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, 1))]) + + def test_persistent_read_errors(self): + """ + Test a run where a disk read fails enough times to trigger the bad sector logic. + """ + piece_size = 64 + + other_a = self._write_disk(16, 1) + other_b = self._write_disk(16, 2) + disk = self._write_disk(16, 1) + with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: + torrent = _Torrent(torrent_file, piece_size) + self._write_torrent(torrent) + with self._conn.cursor() as cursor: + cursor.executemany( + "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", + [ + (other_a.id, NumericRange(0, 1), torrent.info_hash, 0), + (other_b.id, NumericRange(0, 2), torrent.info_hash, 16), + (disk.id, NumericRange(0, 1), torrent.info_hash, 48), + ] + ) + + do_verify(self._conn, other_a.id, torrent_file, read_size = 16, read_tries = 1) + other_b_file = io.BytesIO(torrent_file.getvalue()[16:48]) + do_verify(self._conn, other_b.id, other_b_file, read_size = 16, read_tries = 1) + + cursor.execute("select verify_id from diskjumble.verify_piece;") + [(verify_id,)] = cursor.fetchall() + + data = torrent_file.getvalue()[48:] + disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None) + do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3) + + cursor.execute("select count(*) from diskjumble.verify_pass;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 0) + + cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;") + self.assertCountEqual( + cursor.fetchall(), + [(other_a.id, NumericRange(0, 1)), (other_b.id, NumericRange(0, 2)), (disk.id, NumericRange(0, 1))] + ) + + cursor.execute("select verify_id from diskjumble.verify_piece_fail;") + self.assertEqual(cursor.fetchall(), [(verify_id,)]) + + cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") + [(row_count,)] = cursor.fetchall() + self.assertEqual(row_count, 0) + + def _write_torrent(self, torrent: "_Torrent") -> None: + with self._conn.cursor() as cursor: + cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,)) + + def _write_disk(self, sector_size: int, sector_count: int) -> "_Disk": + with self._conn.cursor() as cursor: + cursor.execute( + "insert into diskjumble.disk values (default, %s, null, %s, %s, false) returning disk_id;", + (uuid.uuid4(), sector_size, sector_count) + ) + [(id_,)] = cursor.fetchall() + return _Disk(id_, sector_size, sector_count) + + @classmethod + def setUpClass(cls): + psycopg2.extras.register_uuid() + prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text() + schema_sql = "\n".join( + l for l in 
prod_schema_sql.splitlines() + if ( + not l.startswith("SET") + and not l.startswith("SELECT") # ignore server settings + and "OWNER TO" not in l # and ownership changes + ) + ) + cls._conn = psycopg2.connect("") + with cls._conn, cls._conn.cursor() as cursor: + for name in cls._SCHEMAS: + cursor.execute(f"create schema {name};") + cursor.execute(schema_sql) + + @classmethod + def tearDownClass(self): + try: + with self._conn, self._conn.cursor() as cursor: + for name in self._SCHEMAS: + cursor.execute(f"drop schema {name} cascade;") + finally: + self._conn.close() + + def tearDown(self): + self._conn.rollback() + + +@dataclass +class _Disk: + id: int + sector_size: int + sector_count: int + + +class _Torrent: + def __init__(self, data: io.BufferedIOBase, piece_size: int) -> None: + data.seek(0) + hashes = [] + while True: + piece = data.read(piece_size) + if piece: + hashes.append(hashlib.sha1(piece).digest()) + else: + break + + info_dict = { + b"piece length": piece_size, + b"length": data.tell(), + b"pieces": b"".join(hashes), + } + self.data = data + self.info = bencode.encode(info_dict) + self.info_hash = hashlib.sha1(self.info).digest() + + +def _random_file(size: int, rand_src: Random, on_disk: bool) -> tempfile.NamedTemporaryFile: + f = tempfile.NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else io.BytesIO() + try: + while f.tell() < size: + write_size = min(size - f.tell(), _BUF_SIZE) + f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size))) + f.seek(0) + return f + except Exception: + f.close() + raise + + +@dataclass +class _ReadErrorProxy(io.BufferedIOBase): + wrapped: io.BufferedIOBase + error_pos: int + error_count: Optional[int] + + def __post_init__(self): + self.triggered = 0 + + def read(self, size: int = -1) -> bytes: + pre_pos = self.wrapped.tell() + data = self.wrapped.read(size) + erroring = self.error_count is None or self.triggered < self.error_count + in_range = 0 <= pre_pos - self.error_pos < len(data) + if erroring and in_range: + self.triggered += 1 + raise OSError("simulated") + else: + return data + + def seek(self, *args, **kwargs) -> int: + return self.wrapped.seek(*args, **kwargs) diff --git a/disk_jumble/src/disk_jumble/tests/verify_setup.sql b/disk_jumble/src/disk_jumble/tests/verify_setup.sql new file mode 100644 index 0000000..aa0c5fb --- /dev/null +++ b/disk_jumble/src/disk_jumble/tests/verify_setup.sql @@ -0,0 +1,383 @@ +create extension btree_gist; +CREATE OR REPLACE FUNCTION public.digest(bytea, text) + RETURNS bytea + LANGUAGE c + IMMUTABLE PARALLEL SAFE STRICT +AS '$libdir/pgcrypto', $function$pg_digest$function$ +; +-- +-- PostgreSQL database dump +-- + +-- Dumped from database version 13.2 (Debian 13.2-1.pgdg100+1) +-- Dumped by pg_dump version 13.4 (Debian 13.4-0+deb11u1) + +SET statement_timeout = 0; +SET lock_timeout = 0; +SET idle_in_transaction_session_timeout = 0; +SET client_encoding = 'UTF8'; +SET standard_conforming_strings = on; +SELECT pg_catalog.set_config('search_path', '', false); +SET check_function_bodies = false; +SET xmloption = content; +SET client_min_messages = warning; +SET row_security = off; + +SET default_tablespace = ''; + +SET default_table_access_method = heap; + +-- +-- Name: torrent_info; Type: TABLE; Schema: bittorrent; Owner: eros +-- + +CREATE TABLE bittorrent.torrent_info ( + info bytea NOT NULL +); + + +ALTER TABLE bittorrent.torrent_info OWNER TO eros; + +-- +-- Name: disk; Type: TABLE; Schema: diskjumble; Owner: eros +-- + +CREATE TABLE diskjumble.disk ( + disk_id integer NOT NULL, + 
dev_uuid uuid NOT NULL, + dev_serial text, + sector_size integer NOT NULL, + num_sectors bigint NOT NULL, + failed boolean DEFAULT false NOT NULL +); + + +ALTER TABLE diskjumble.disk OWNER TO eros; + +-- +-- Name: disk_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros +-- + +CREATE SEQUENCE diskjumble.disk_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +ALTER TABLE diskjumble.disk_id_seq OWNER TO eros; + +-- +-- Name: disk_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros +-- + +ALTER SEQUENCE diskjumble.disk_id_seq OWNED BY diskjumble.disk.disk_id; + + +-- +-- Name: slab; Type: TABLE; Schema: diskjumble; Owner: eros +-- + +CREATE TABLE diskjumble.slab ( + slab_id integer NOT NULL, + disk_id integer NOT NULL, + disk_sectors int8range NOT NULL, + entity_id bytea NOT NULL, + entity_offset bigint NOT NULL, + crypt_key bytea +); + + +ALTER TABLE diskjumble.slab OWNER TO eros; + +-- +-- Name: slab_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros +-- + +CREATE SEQUENCE diskjumble.slab_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +ALTER TABLE diskjumble.slab_id_seq OWNER TO eros; + +-- +-- Name: slab_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros +-- + +ALTER SEQUENCE diskjumble.slab_id_seq OWNED BY diskjumble.slab.slab_id; + + +-- +-- Name: verify_pass; Type: TABLE; Schema: diskjumble; Owner: eros +-- + +CREATE TABLE diskjumble.verify_pass ( + verify_pass_id integer NOT NULL, + at timestamp with time zone, + disk_id integer NOT NULL, + disk_sectors int8range NOT NULL +); + + +ALTER TABLE diskjumble.verify_pass OWNER TO eros; + +-- +-- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros +-- + +CREATE SEQUENCE diskjumble.verify_pass_verify_pass_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +ALTER TABLE diskjumble.verify_pass_verify_pass_id_seq OWNER TO eros; + +-- +-- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros +-- + +ALTER SEQUENCE diskjumble.verify_pass_verify_pass_id_seq OWNED BY diskjumble.verify_pass.verify_pass_id; + + +-- +-- Name: verify_piece; Type: TABLE; Schema: diskjumble; Owner: eros +-- + +CREATE TABLE diskjumble.verify_piece ( + verify_id integer NOT NULL, + at timestamp with time zone, + entity_id bytea NOT NULL, + piece integer NOT NULL +); + + +ALTER TABLE diskjumble.verify_piece OWNER TO eros; + +-- +-- Name: verify_piece_content; Type: TABLE; Schema: diskjumble; Owner: eros +-- + +CREATE TABLE diskjumble.verify_piece_content ( + verify_id integer NOT NULL, + seq integer NOT NULL, + disk_id integer NOT NULL, + disk_sectors int8range NOT NULL +); + + +ALTER TABLE diskjumble.verify_piece_content OWNER TO eros; + +-- +-- Name: verify_piece_fail; Type: TABLE; Schema: diskjumble; Owner: eros +-- + +CREATE TABLE diskjumble.verify_piece_fail ( + verify_id integer NOT NULL +); + + +ALTER TABLE diskjumble.verify_piece_fail OWNER TO eros; + +-- +-- Name: verify_piece_incomplete; Type: TABLE; Schema: diskjumble; Owner: eros +-- + +CREATE TABLE diskjumble.verify_piece_incomplete ( + verify_id integer NOT NULL, + hasher_state json +); + + +ALTER TABLE diskjumble.verify_piece_incomplete OWNER TO eros; + +-- +-- Name: verify_piece_verify_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros +-- + +CREATE SEQUENCE diskjumble.verify_piece_verify_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 
1; + + +ALTER TABLE diskjumble.verify_piece_verify_id_seq OWNER TO eros; + +-- +-- Name: verify_piece_verify_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros +-- + +ALTER SEQUENCE diskjumble.verify_piece_verify_id_seq OWNED BY diskjumble.verify_piece.verify_id; + + +-- +-- Name: disk disk_id; Type: DEFAULT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.disk ALTER COLUMN disk_id SET DEFAULT nextval('diskjumble.disk_id_seq'::regclass); + + +-- +-- Name: slab slab_id; Type: DEFAULT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.slab ALTER COLUMN slab_id SET DEFAULT nextval('diskjumble.slab_id_seq'::regclass); + + +-- +-- Name: verify_pass verify_pass_id; Type: DEFAULT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_pass ALTER COLUMN verify_pass_id SET DEFAULT nextval('diskjumble.verify_pass_verify_pass_id_seq'::regclass); + + +-- +-- Name: verify_piece verify_id; Type: DEFAULT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_piece ALTER COLUMN verify_id SET DEFAULT nextval('diskjumble.verify_piece_verify_id_seq'::regclass); + + +-- +-- Name: disk disk_dev_uuid_key; Type: CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.disk + ADD CONSTRAINT disk_dev_uuid_key UNIQUE (dev_uuid); + + +-- +-- Name: disk disk_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.disk + ADD CONSTRAINT disk_pkey PRIMARY KEY (disk_id); + + +-- +-- Name: slab slab_disk_id_disk_sectors_excl; Type: CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.slab + ADD CONSTRAINT slab_disk_id_disk_sectors_excl EXCLUDE USING gist (disk_id WITH =, disk_sectors WITH &&); + + +-- +-- Name: slab slab_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.slab + ADD CONSTRAINT slab_pkey PRIMARY KEY (slab_id); + + +-- +-- Name: verify_pass verify_pass_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_pass + ADD CONSTRAINT verify_pass_pkey PRIMARY KEY (verify_pass_id); + + +-- +-- Name: verify_piece_content verify_piece_content_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_piece_content + ADD CONSTRAINT verify_piece_content_pkey PRIMARY KEY (verify_id, seq); + + +-- +-- Name: verify_piece_fail verify_piece_fail_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_piece_fail + ADD CONSTRAINT verify_piece_fail_pkey PRIMARY KEY (verify_id); + + +-- +-- Name: verify_piece_incomplete verify_piece_incomplete_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_piece_incomplete + ADD CONSTRAINT verify_piece_incomplete_pkey PRIMARY KEY (verify_id); + + +-- +-- Name: verify_piece verify_piece_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_piece + ADD CONSTRAINT verify_piece_pkey PRIMARY KEY (verify_id); + + +-- +-- Name: torrent_info_digest_idx; Type: INDEX; Schema: bittorrent; Owner: eros +-- + +CREATE UNIQUE INDEX torrent_info_digest_idx ON bittorrent.torrent_info USING btree (public.digest(info, 'sha1'::text)); + + +-- +-- Name: slab slab_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.slab + ADD CONSTRAINT slab_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id); + + +-- +-- Name: verify_pass 
verify_pass_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_pass + ADD CONSTRAINT verify_pass_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id); + + +-- +-- Name: verify_piece_content verify_piece_content_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_piece_content + ADD CONSTRAINT verify_piece_content_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id); + + +-- +-- Name: verify_piece_content verify_piece_content_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_piece_content + ADD CONSTRAINT verify_piece_content_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id); + + +-- +-- Name: verify_piece_fail verify_piece_fail_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_piece_fail + ADD CONSTRAINT verify_piece_fail_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id); + + +-- +-- Name: verify_piece_incomplete verify_piece_incomplete_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros +-- + +ALTER TABLE ONLY diskjumble.verify_piece_incomplete + ADD CONSTRAINT verify_piece_incomplete_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id); + + +-- +-- PostgreSQL database dump complete +-- + diff --git a/disk_jumble/src/disk_jumble/trackers.py b/disk_jumble/src/disk_jumble/trackers.py new file mode 100644 index 0000000..ba65344 --- /dev/null +++ b/disk_jumble/src/disk_jumble/trackers.py @@ -0,0 +1,13 @@ +from typing import NamedTuple + + +class Tracker(NamedTuple): + gazelle_id: int + scrape_url_format: str + + +TRACKERS = { + "PTP": Tracker(1, "http://please.passthepopcorn.me:2710/{passkey}/scrape"), + "BTN": Tracker(3, "http://landof.tv/{passkey}/scrape"), + "Redacted": Tracker(4, "https://flacsfor.me/{passkey}/scrape"), +} diff --git a/disk_jumble/src/disk_jumble/verify.py b/disk_jumble/src/disk_jumble/verify.py new file mode 100644 index 0000000..0adbb03 --- /dev/null +++ b/disk_jumble/src/disk_jumble/verify.py @@ -0,0 +1,237 @@ +from __future__ import annotations +from abc import ABCMeta, abstractmethod +from dataclasses import dataclass +from typing import Optional +import argparse +import contextlib +import datetime as dt +import io +import itertools +import math + +import psycopg2 + +from disk_jumble import bencode +from disk_jumble.db import HasherRef, Slab, Wrapper as DbWrapper +from disk_jumble.nettle import Sha1Hasher + + +_READ_BUFFER_SIZE = 16 * 1024 ** 2 # in bytes + + +@dataclass +class _SlabChunk: + """A slice of a slab; comprising all or part of a piece to be hashed.""" + slab: Slab + slice: slice + + +@dataclass +class _PieceTask: + """The chunks needed to hash as fully as possible an entity piece.""" + entity_id: bytes + piece_num: int + hasher_ref: Optional[HasherRef] + chunks: list[_SlabChunk] + complete: bool # do these chunks complete the piece? 
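+
+	# Each task is hashed chunk by chunk; when `complete` is true the digest is checked against the
+	# piece hash from the torrent's info dict, otherwise the intermediate hasher state is saved so a
+	# slab on another disk can resume the piece later.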
+ + +class _BadSector(Exception): + pass + + +def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, read_tries: int) -> None: + db = DbWrapper(conn) + disk = db.get_disk(disk_id) + + info_dicts = { + info_hash: bencode.decode(info) + for (info_hash, info) in db.get_torrent_info(disk_id) + } + + tasks = [] + slabs_and_hashers = db.get_slabs_and_hashers(disk_id) + for (entity_id, group) in itertools.groupby(slabs_and_hashers, lambda t: t[0].entity_id): + info = info_dicts[entity_id] + piece_len = info[b"piece length"] + assert piece_len % disk.sector_size == 0 + if b"length" in info: + torrent_len = info[b"length"] + else: + torrent_len = sum(d[b"length"] for d in info[b"files"]) + + offset = None + use_hasher = None + chunks = [] + for (slab, hasher_ref) in group: + slab_end = min(slab.entity_offset + len(slab.sectors) * disk.sector_size, torrent_len) + + while offset is None or offset < slab_end: + if offset is not None and slab.entity_offset > offset: + if chunks: + tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False)) + offset = None + use_hasher = None + chunks = [] + + if offset is None: + aligned = math.ceil(slab.entity_offset / piece_len) * piece_len + if hasher_ref and hasher_ref.entity_offset < aligned: + assert hasher_ref.entity_offset < torrent_len + use_hasher = hasher_ref + offset = hasher_ref.entity_offset + elif aligned < slab_end: + offset = aligned + else: + break # no usable data in this slab + + if offset is not None: + piece_end = min(offset + piece_len - offset % piece_len, torrent_len) + chunk_end = min(piece_end, slab_end) + chunks.append(_SlabChunk(slab, slice(offset - slab.entity_offset, chunk_end - slab.entity_offset))) + if chunk_end == piece_end: + tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, True)) + use_hasher = None + chunks = [] + offset = chunk_end + + if chunks: + tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False)) + + @dataclass + class NewVerifyPiece: + entity_id: bytes + piece_num: int + sector_ranges: list[range] + hasher_state: Optional[dict] + failed: bool + + @dataclass + class VerifyUpdate: + seq_start: int + new_sector_ranges: list[range] + hasher_state: Optional[dict] + + passed_verifies = set() + failed_verifies = set() + new_pass_ranges = [] + vp_updates = {} + new_vps = [] + + run_ts = dt.datetime.now(dt.timezone.utc) + for task in tasks: + hasher = Sha1Hasher(task.hasher_ref.state if task.hasher_ref else None) + sector_ranges = [ + range( + chunk.slab.sectors.start + chunk.slice.start // disk.sector_size, + chunk.slab.sectors.start + math.ceil(chunk.slice.stop / disk.sector_size) + ) + for chunk in task.chunks + ] + + try: + for chunk in task.chunks: + slab_off = chunk.slab.sectors.start * disk.sector_size + disk_file.seek(slab_off + chunk.slice.start) + end_pos = slab_off + chunk.slice.stop + while disk_file.tell() < end_pos: + pos = disk_file.tell() + for _ in range(read_tries): + try: + data = disk_file.read(min(end_pos - pos, read_size)) + except OSError as e: + disk_file.seek(pos) + else: + break + else: + raise _BadSector() + + assert data + hasher.update(data) + except _BadSector: + if task.hasher_ref: + failed_verifies.add(task.hasher_ref.id) + vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, None) + else: + new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, None, True)) + else: + hasher_state = hasher.ctx.serialize() + if task.complete: + s = 
slice(task.piece_num * 20, task.piece_num * 20 + 20) + expected_hash = info_dicts[task.entity_id][b"pieces"][s] + if hasher.digest() == expected_hash: + write_piece_data = False + new_pass_ranges.extend(sector_ranges) + if task.hasher_ref: + passed_verifies.add(task.hasher_ref.id) + else: + failed = True + write_piece_data = True + if task.hasher_ref: + failed_verifies.add(task.hasher_ref.id) + else: + failed = False + write_piece_data = True + + if write_piece_data: + if task.hasher_ref: + assert task.hasher_ref.id not in vp_updates + vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, hasher_state) + else: + new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, hasher_state, failed)) + + new_pass_ranges.sort(key = lambda r: r.start) + merged_ranges = [] + for r in new_pass_ranges: + if merged_ranges and r.start == merged_ranges[-1].stop: + merged_ranges[-1] = range(merged_ranges[-1].start, r.stop) + else: + merged_ranges.append(r) + + for vp in new_vps: + verify_id = db.insert_verify_piece(run_ts, vp.entity_id, vp.piece_num) + db.insert_verify_piece_content(verify_id, 0, disk_id, vp.sector_ranges) + if vp.failed: + db.mark_verify_piece_failed(verify_id) + else: + db.upsert_hasher_state(verify_id, vp.hasher_state) + + for (verify_id, update) in vp_updates.items(): + db.insert_verify_piece_content(verify_id, update.seq_start, disk_id, update.new_sector_ranges) + if update.hasher_state: + db.upsert_hasher_state(verify_id, update.hasher_state) + + for verify_id in passed_verifies: + db.move_piece_content_for_pass(verify_id) + db.delete_verify_piece(verify_id) + + for r in merged_ranges: + db.insert_pass_data(run_ts, disk_id, r) + + for verify_id in failed_verifies: + db.clear_incomplete(verify_id) + db.mark_verify_piece_failed(verify_id) + + +if __name__ == "__main__": + def read_tries(raw_arg): + val = int(raw_arg) + if val > 0: + return val + else: + raise ValueError() + + parser = argparse.ArgumentParser() + parser.add_argument("disk_id", type = int) + parser.add_argument( + "read_tries", + type = read_tries, + help = "number of times to attempt a particular disk read before giving up on the sector", + ) + args = parser.parse_args() + + with contextlib.closing(psycopg2.connect("")) as conn: + conn.autocommit = True + path = f"/dev/mapper/diskjumble-{args.disk_id}" + with open(path, "rb", buffering = _READ_BUFFER_SIZE) as disk_file: + do_verify(conn, args.disk_id, disk_file, _READ_BUFFER_SIZE, args.read_tries) diff --git a/disk_jumble/test_util/dump_db.py b/disk_jumble/test_util/dump_db.py new file mode 100644 index 0000000..9551bf9 --- /dev/null +++ b/disk_jumble/test_util/dump_db.py @@ -0,0 +1,46 @@ +""" +Using the live database, dump creation code for extensions, tables, and functions needed for local testing + +For testing the verification script, write the output of this script to: + + src/disk_jumble/tests/verify_setup.sql +""" + +import contextlib +import itertools +import os +import subprocess + +import psycopg2 + + +procedures = [ + "public.digest(bytea, text)", +] + +extensions = [ + "btree_gist", +] + +with contextlib.closing(psycopg2.connect("")) as conn: + conn.autocommit = True + with conn.cursor() as cursor: + for ext in extensions: + print(f"create extension {ext};", flush = True) + for sig in procedures: + cursor.execute("select pg_get_functiondef(to_regprocedure(%s));", (sig,)) + [(sql,)] = cursor.fetchall() + print(sql + ";", flush = True) + +tables = [ + "diskjumble.disk", + "diskjumble.slab", + 
"diskjumble.verify_pass", + "diskjumble.verify_piece", + "diskjumble.verify_piece_content", + "diskjumble.verify_piece_fail", + "diskjumble.verify_piece_incomplete", + "bittorrent.torrent_info", +] +argv = ["pg_dump", *itertools.chain.from_iterable(["-t", table] for table in tables), "--schema-only", os.environ["PGDATABASE"]] +subprocess.run(argv, check = True) diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index 8fe2f47..0000000 --- a/pyproject.toml +++ /dev/null @@ -1,3 +0,0 @@ -[build-system] -requires = ["setuptools>=42", "wheel"] -build-backend = "setuptools.build_meta" diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 3fc8d70..0000000 --- a/setup.cfg +++ /dev/null @@ -1,11 +0,0 @@ -[metadata] -name = disk_jumble -version = 0.0.1 - -[options] -package_dir = - = src -packages = disk_jumble -python_requires ~= "3.7" -install_requires = - psycopg2-binary ~= 2.8 diff --git a/setup.py b/setup.py deleted file mode 100644 index b024da8..0000000 --- a/setup.py +++ /dev/null @@ -1,4 +0,0 @@ -from setuptools import setup - - -setup() diff --git a/src/disk_jumble/__init__.py b/src/disk_jumble/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/disk_jumble/bencode.py b/src/disk_jumble/bencode.py deleted file mode 100644 index 7883b53..0000000 --- a/src/disk_jumble/bencode.py +++ /dev/null @@ -1,124 +0,0 @@ -from __future__ import annotations -from typing import Union -import itertools - - -Bdict = dict[bytes, 'Type'] -Type = Union[bytes, int, list['Type'], Bdict] - - -class CodecError(Exception): - pass - - -def _pop_bytes(vals: list[bytes]) -> bytes: - len_parts = [] - while vals and vals[0].isdigit(): - len_parts.append(vals.pop(0)) - - try: - length = int(b"".join(len_parts).decode("ascii")) - except ValueError: - raise CodecError() - - try: - if vals.pop(0) != b":": - raise CodecError() - except IndexError: - raise CodecError() - - if length > len(vals): - raise CodecError() - - out = b"".join(vals[:length]) - del vals[:length] - return out - - -def _pop_int(vals: list[bytes]) -> int: - assert vals.pop(0) == b"i" - - try: - end = vals.index(b"e") - except ValueError: - raise CodecError() - - try: - out = int(b"".join(vals[:end]).decode("ascii")) - except ValueError: - raise CodecError() - - del vals[slice(end + 1)] - return out - - -def _pop_list(vals: list[bytes]) -> list[Type]: - assert vals.pop(0) == b"l" - - out = [] - while vals and vals[0] != b"e": - out.append(_pop_value(vals)) - - if vals: - del vals[0] - return out - else: - raise CodecError() - - -def _pop_dict(vals: list[bytes]) -> Bdict: - assert vals.pop(0) == b"d" - - out = {} - while vals and vals[0] != b"e": - key = _pop_bytes(vals) - out[key] = _pop_value(vals) - - if vals: - del vals[0] - return out - else: - raise CodecError() - - -def _pop_value(vals: list[bytes]) -> Type: - if vals: - if vals[0].isdigit(): - return _pop_bytes(vals) - elif vals[0] == b"i": - return _pop_int(vals) - elif vals[0] == b"l": - return _pop_list(vals) - elif vals[0] == b"d": - return _pop_dict(vals) - else: - raise CodecError() - else: - raise CodecError() - - -def decode(data: bytes) -> Type: - vals = [bytes([v]) for v in data] - out = _pop_value(vals) - if vals: - raise CodecError() - else: - return out - - -def _encode_helper(data: Type) -> list[bytes]: - if isinstance(data, bytes): - return [str(len(data)).encode("ascii"), b":", data] - elif isinstance(data, int): - return [b"i", str(data).encode("ascii"), b"e"] - elif isinstance(data, list): - return [b"l", 
*itertools.chain.from_iterable(map(_encode_helper, data)), b"e"] - elif isinstance(data, dict): - contents = itertools.chain.from_iterable(data.items()) - return [b"d", *itertools.chain.from_iterable(map(_encode_helper, contents)), b"e"] - else: - raise CodecError() - - -def encode(data: Type) -> bytes: - return b"".join(_encode_helper(data)) diff --git a/src/disk_jumble/db.py b/src/disk_jumble/db.py deleted file mode 100644 index 68488f0..0000000 --- a/src/disk_jumble/db.py +++ /dev/null @@ -1,223 +0,0 @@ -from __future__ import annotations -from dataclasses import dataclass -from typing import Any, Iterable, Optional -import datetime as dt -import itertools - -from psycopg2.extras import execute_batch, Json, NumericRange - - -@dataclass -class Disk: - sector_size: int - - -@dataclass -class Slab: - id: int - disk_id: int - sectors: range - entity_id: bytes - entity_offset: int - crypt_key: bytes - - -@dataclass -class HasherRef: - id: int - seq: int - entity_offset: int - state: dict - - -@dataclass -class Wrapper: - conn: Any - - def get_passkey(self, tracker_id: int) -> str: - with self.conn.cursor() as cursor: - cursor.execute("select passkey from gazelle.passkey where gazelle_tracker_id = %s;", (tracker_id,)) - [(passkey,)] = cursor.fetchall() - - return passkey - - def get_torrents(self, tracker_id: int, batch_size: Optional[int] = None) -> Iterable[bytes]: - """Iterate the info hashes for the specified tracker which haven't been marked deleted.""" - - stmt = """ - select infohash from gazelle.torrent - where gazelle_tracker_id = %s and not is_deleted - order by infohash asc - ; - """ - with self.conn.cursor() as cursor: - if batch_size is not None: - cursor.itersize = batch_size - - cursor.execute(stmt, (tracker_id,)) - for row in cursor: - (info_hash_mem,) = row - info_hash = bytes(info_hash_mem) - assert len(info_hash) == 20 - yield info_hash - - def insert_swarm_info(self, tracker_id: int, infos: Iterable["disk_jumble.scrape.ScrapeInfo"]) -> None: - stmt = """ - insert into gazelle.tracker_stat (gazelle_tracker_id, infohash, ts, complete, incomplete, downloaded) - values (%s, %s, %s, %s, %s, %s) - ; - """ - with self.conn.cursor() as cursor: - param_sets = [ - (tracker_id, i.info_hash, i.timestamp, i.complete, i.incomplete, i.downloaded) - for i in infos - ] - execute_batch(cursor, stmt, param_sets) - - def get_disk(self, id_: int) -> Disk: - with self.conn.cursor() as cursor: - cursor.execute("select sector_size from diskjumble.disk where disk_id = %s;", (id_,)) - [(sector_size,)] = cursor.fetchall() - - return Disk(sector_size) - - def get_slabs_and_hashers(self, disk_id: int) -> Iterable[tuple[Slab, Optional[HasherRef]]]: - """ - Find slabs on the specified disk, and for each also return any lowest-offset saved hasher that left off directly - before or within the slab's entity data. 
- """ - - stmt = """ - with - incomplete_edge as ( - -- join up incomplete piece info and precompute where the hasher left off within the entity - select - verify_id, seq, slab.entity_id, hasher_state, - entity_offset + (upper(c.disk_sectors) - lower(slab.disk_sectors)) * sector_size as end_off - from - diskjumble.verify_piece_incomplete - natural left join diskjumble.verify_piece p - natural join diskjumble.verify_piece_content c - natural left join diskjumble.disk - left join diskjumble.slab on ( - c.disk_id = slab.disk_id - and upper(c.disk_sectors) <@ int8range(lower(slab.disk_sectors), upper(slab.disk_sectors), '[]') - ) - where seq >= all (select seq from diskjumble.verify_piece_content where verify_id = p.verify_id) - ) - select - slab_id, disk_id, disk_sectors, slab.entity_id, entity_offset, crypt_key, verify_id, seq, end_off, - hasher_state - from - diskjumble.slab - natural left join diskjumble.disk - left join incomplete_edge on - incomplete_edge.entity_id = slab.entity_id - and incomplete_edge.end_off <@ int8range( - slab.entity_offset, - slab.entity_offset + (upper(disk_sectors) - lower(disk_sectors)) * sector_size - ) - and (incomplete_edge.end_off - slab.entity_offset) %% sector_size = 0 - where disk_id = %s - order by slab.entity_id, entity_offset, slab_id - ; - """ - with self.conn.cursor() as cursor: - cursor.execute(stmt, (disk_id,)) - for (_, rows_iter) in itertools.groupby(cursor, lambda r: r[0]): - rows = list(rows_iter) - [(slab_id, disk_id, sectors_pg, entity_id, entity_off, key)] = {r[:6] for r in rows} - sectors = range(sectors_pg.lower, sectors_pg.upper) - slab = Slab(slab_id, disk_id, sectors, bytes(entity_id), entity_off, key) - - # `None' if no hasher match in outer join, otherwise earliest match - (*_, id_, seq, end_off, state) = min(rows, key = lambda r: r[-2]) - hasher_ref = None if id_ is None else HasherRef(id_, seq, end_off, state) - - yield (slab, hasher_ref) - - def get_torrent_info(self, disk_id: int) -> Iterable[tuple[bytes, bytes]]: - stmt = """ - with hashed as ( - select digest(info, 'sha1') as info_hash, info - from bittorrent.torrent_info - ) - select - distinct on (info_hash) - info_hash, info - from diskjumble.slab left outer join hashed on entity_id = info_hash - where disk_id = %s - ; - """ - with self.conn.cursor() as cursor: - cursor.execute(stmt, (disk_id,)) - for (info_hash, info) in cursor: - yield (bytes(info_hash), bytes(info)) - - def insert_verify_piece(self, ts: dt.datetime, entity_id: bytes, piece_num: int) -> int: - """Insert new verify piece, returning the ID of the inserted row.""" - - with self.conn.cursor() as cursor: - stmt = "insert into diskjumble.verify_piece values (default, %s, %s, %s) returning verify_id;" - cursor.execute(stmt, (ts, entity_id, piece_num)) - [(row_id,)] = cursor.fetchall() - return row_id - - def insert_verify_piece_content(self, verify_id: int, seq_start: int, disk_id: int, ranges: Iterable[range]) -> None: - with self.conn.cursor() as cursor: - execute_batch( - cursor, - "insert into diskjumble.verify_piece_content values (%s, %s, %s, %s);", - [ - (verify_id, seq, disk_id, NumericRange(r.start, r.stop)) - for (seq, r) in enumerate(ranges, start = seq_start) - ] - ) - - def mark_verify_piece_failed(self, verify_id: int) -> None: - with self.conn.cursor() as cursor: - cursor.execute("insert into diskjumble.verify_piece_fail values (%s);", (verify_id,)) - - def upsert_hasher_state(self, verify_id: int, state: dict) -> None: - stmt = """ - insert into diskjumble.verify_piece_incomplete values (%s, %s) - 
on conflict (verify_id) do update set hasher_state = excluded.hasher_state - ; - """ - with self.conn.cursor() as cursor: - cursor.execute(stmt, (verify_id, Json(state))) - - def delete_verify_piece(self, verify_id: int) -> None: - with self.conn.cursor() as cursor: - cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,)) - cursor.execute("delete from diskjumble.verify_piece_content where verify_id = %s;", (verify_id,)) - cursor.execute("delete from diskjumble.verify_piece where verify_id = %s", (verify_id,)) - - def move_piece_content_for_pass(self, verify_id: int) -> None: - stmt = """ - with content_out as ( - delete from diskjumble.verify_piece_content c - using diskjumble.verify_piece p - where ( - c.verify_id = p.verify_id - and p.verify_id = %s - ) - returning at, disk_id, disk_sectors - ) - insert into diskjumble.verify_pass (at, disk_id, disk_sectors) - select at, disk_id, disk_sectors from content_out - ; - """ - with self.conn.cursor() as cursor: - cursor.execute(stmt, (verify_id,)) - - def insert_pass_data(self, ts: dt.datetime, disk_id: int, sectors: range) -> None: - with self.conn.cursor() as cursor: - cursor.execute( - "insert into diskjumble.verify_pass values (default, %s, %s, %s);", - (ts, disk_id, NumericRange(sectors.start, sectors.stop)) - ) - - def clear_incomplete(self, verify_id: int) -> None: - with self.conn.cursor() as cursor: - cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,)) diff --git a/src/disk_jumble/nettle.py b/src/disk_jumble/nettle.py deleted file mode 100644 index dfabbca..0000000 --- a/src/disk_jumble/nettle.py +++ /dev/null @@ -1,66 +0,0 @@ -"""Python wrappers for some of GnuTLS Nettle.""" - -from ctypes.util import find_library -from typing import Optional -import ctypes - - -_LIB = ctypes.CDLL(find_library("nettle")) - - -class _Sha1Defs: - _DIGEST_SIZE = 20 # in bytes - _BLOCK_SIZE = 64 # in bytes - _DIGEST_LENGTH = 5 - - _StateArr = ctypes.c_uint32 * _DIGEST_LENGTH - - _BlockArr = ctypes.c_uint8 * _BLOCK_SIZE - - -class Sha1Hasher(_Sha1Defs): - class Context(ctypes.Structure): - _fields_ = [ - ("state", _Sha1Defs._StateArr), - ("count", ctypes.c_uint64), - ("index", ctypes.c_uint), - ("block", _Sha1Defs._BlockArr), - ] - - @classmethod - def deserialize(cls, data): - return cls( - _Sha1Defs._StateArr(*data["state"]), - data["count"], - data["index"], - _Sha1Defs._BlockArr(*data["block"]), - ) - - def serialize(self): - return { - "state": list(self.state), - "count": self.count, - "index": self.index, - "block": list(self.block), - } - - @classmethod - def _new_context(cls): - ctx = cls.Context() - _LIB.nettle_sha1_init(ctypes.byref(ctx)) - return ctx - - def __init__(self, ctx_dict: Optional[dict]): - if ctx_dict: - self.ctx = self.Context.deserialize(ctx_dict) - else: - self.ctx = self._new_context() - - def update(self, data): - _LIB.nettle_sha1_update(ctypes.byref(self.ctx), len(data), data) - - def digest(self): - """Return the current digest and reset the hasher state.""" - out = (ctypes.c_uint8 * self._DIGEST_SIZE)() - _LIB.nettle_sha1_digest(ctypes.byref(self.ctx), self._DIGEST_SIZE, out) - return bytes(out) diff --git a/src/disk_jumble/scrape.py b/src/disk_jumble/scrape.py deleted file mode 100644 index 4143b8a..0000000 --- a/src/disk_jumble/scrape.py +++ /dev/null @@ -1,157 +0,0 @@ -from dataclasses import dataclass -from typing import Iterable, Union -from urllib.error import URLError -from urllib.parse import urlencode, urlparse, 
urlunparse -from urllib.request import HTTPHandler, HTTPRedirectHandler, HTTPSHandler, OpenerDirector, UnknownHandler -import argparse -import contextlib -import datetime as dt -import itertools -import re -import sys -import time - -import psycopg2 - -from disk_jumble import bencode -from disk_jumble.db import Wrapper as DbWrapper -from disk_jumble.trackers import Tracker, TRACKERS - - -class Result: - pass - - -@dataclass -class OkResult(Result): - data: bencode.Type - - -@dataclass -class ErrorResult(Result): - status: int - body: Union[bencode.Type, bytes] - - -@dataclass -class ScrapeInfo: - info_hash: bytes - timestamp: dt.datetime - complete: int - incomplete: int - downloaded: int - - -def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) -> Result: - qs = urlencode({"info_hash": list(info_hashes)}, doseq = True) - url_parts = urlparse(tracker.scrape_url_format.format(passkey = passkey)) - assert not url_parts.query - url = urlunparse(url_parts._replace(query = qs)) - - # we handle HTTP errors ourself - opener = OpenerDirector() - for handler in [UnknownHandler(), HTTPHandler(), HTTPSHandler(), HTTPRedirectHandler()]: - opener.add_handler(handler) - - with opener.open(url) as resp: - body = resp.read() - - try: - data = bencode.decode(body) - except bencode.DecodeError: - data = body - else: - if resp.getcode() == 200 and b"files" in data: - return OkResult(data) - - return ErrorResult(resp.getcode(), data) - - -if __name__ == "__main__": - PSQL_PARAMS = ["dbname", "user", "password", "host", "port"] - - def tracker_name(val): - matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()] - if matches: - [name] = matches - return name - else: - raise ValueError() - - def batch_size(val): - out = int(val) - if out > 0: - return out - else: - raise ValueError() - - def delay(val): - out = int(val) - if out >= 0: - return out - else: - raise ValueError() - - parser = argparse.ArgumentParser() - parser.add_argument( - "tracker", - choices = sorted(TRACKERS.keys()), - type = tracker_name, - help = "name of tracker to scrape", - ) - parser.add_argument( - "batch_size", - type = batch_size, - help = "number of torrents per batch", - ) - parser.add_argument( - "delay", - type = delay, - help = "delay between batches, in seconds", - ) - - for arg_name in PSQL_PARAMS: - parser.add_argument("--" + arg_name, nargs = "?") - - args = parser.parse_args() - tracker = TRACKERS[args.tracker] - delay = dt.timedelta(seconds = args.delay) - - params = {n: getattr(args, n) for n in PSQL_PARAMS if getattr(args, n)} - with contextlib.closing(psycopg2.connect(**params)) as conn: - conn.autocommit = True - db_wrapper = DbWrapper(conn) - passkey = db_wrapper.get_passkey(tracker.gazelle_id) - info_hashes = iter(db_wrapper.get_torrents(tracker.gazelle_id, args.batch_size)) - batches = iter(lambda: list(itertools.islice(info_hashes, args.batch_size)), []) - for (i, batch) in enumerate(batches): - if i != 0: - time.sleep(delay.total_seconds()) - timestamp = dt.datetime.now(dt.timezone.utc) - try: - result = scrape_batch(tracker, batch, passkey) - except URLError as e: - print("[ERR] couldn't connect: {}".format(e), file = sys.stderr, flush = True) - else: - if isinstance(result, OkResult): - assert set(result.data[b"files"].keys()) <= set(batch), "unexpected torrent in respose" - infos = [ - ScrapeInfo( - info_hash, - timestamp, - complete = meta_dict[b"complete"], - incomplete = meta_dict[b"incomplete"], - downloaded = meta_dict[b"downloaded"], - ) - for (info_hash, meta_dict) in 
result.data[b"files"].items() - ] - db_wrapper.insert_swarm_info(tracker.gazelle_id, infos) - print("[OK] finished batch ({} of {} torrents)".format(len(infos), len(batch)), file = sys.stderr, flush = True) - elif isinstance(result, ErrorResult): - full_disp = result.body.decode("ascii", "ignore") if isinstance(result.body, bytes) else str(result.body) - clean_disp = re.sub(r"\s", " ", full_disp) - display_size = 100 - disp = clean_disp if len(clean_disp) <= display_size else clean_disp[:display_size] + "…" - print("[ERR] tracker responded {}: {}".format(result.status, disp), file = sys.stderr, flush = True) - else: - raise Exception() diff --git a/src/disk_jumble/tests/__init__.py b/src/disk_jumble/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/disk_jumble/tests/test_verify.py b/src/disk_jumble/tests/test_verify.py deleted file mode 100644 index eea5007..0000000 --- a/src/disk_jumble/tests/test_verify.py +++ /dev/null @@ -1,451 +0,0 @@ -""" -Tests for the verification program `disk_jumble.verify' - -Like the verification program itself, these tests take database connection information from the environment. The -necessary schemas and tables are set up from scratch by the test code, so environment variables should point to a -database that's not hosting a live instance of Disk Jumble. Ideally, this is a completely empty local database created -for the purposes of testing, but other options are possible. - -The tests need access to an SQL source file containing the definitions for the required tables and other Postgres -objects; see `test_util/dump_db.py'. -""" - -from dataclasses import dataclass -from importlib import resources -from random import Random -from typing import Optional -import hashlib -import io -import tempfile -import unittest -import uuid - -from psycopg2.extras import NumericRange -import psycopg2 -import psycopg2.extras - -from disk_jumble import bencode -from disk_jumble.nettle import Sha1Hasher -from disk_jumble.verify import do_verify - - -_BUF_SIZE = 16 * 1024 ** 2 # in bytes - - -class Tests(unittest.TestCase): - _SCHEMAS = {"public", "diskjumble", "bittorrent"} - - def _basic_fresh_verify_helper(self, read_size): - sector_size = 32 - piece_size = 64 - - torrent_len = 3 * piece_size - disk = self._write_disk(sector_size, torrent_len // sector_size) - with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) - self._write_torrent(torrent) - with self._conn.cursor() as cursor: - cursor.execute( - "insert into diskjumble.slab values (default, %s, %s, %s, 0, null);", - (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash) - ) - - do_verify(self._conn, disk.id, torrent_file, read_size, read_tries = 1) - - cursor.execute("select * from diskjumble.verify_pass;") - self.assertEqual(cursor.rowcount, 1) - (_, _, disk_id, sectors) = cursor.fetchone() - self.assertEqual(disk_id, disk.id) - self.assertEqual(sectors, NumericRange(0, torrent_len // sector_size)) - - def test_basic_fresh_verify_small_read_size(self): - self._basic_fresh_verify_helper(16) - - def test_basic_fresh_verify_large_read_size(self): - self._basic_fresh_verify_helper(128) - - def test_resume_fragmentation_unaligned_end(self): - """ - Test a run where a cached hash state is used, a piece is split on disk, and the end of the torrent isn't - sector-aligned. 
- """ - read_size = 8 - piece_size = 64 - - other_disk = self._write_disk(16, 1) - disk = self._write_disk(32, 3) - with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) - self._write_torrent(torrent) - with self._conn.cursor() as cursor: - cursor.executemany( - "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", - [ - (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0), - (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size), - (disk.id, NumericRange(2, 3), torrent.info_hash, other_disk.sector_size + disk.sector_size), - ] - ) - - # Prepare the saved hasher state by running a verify - do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1) - torrent_file.seek(0) - - cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 1) - - disk_file = io.BytesIO() - torrent_file.seek(other_disk.sector_size) - disk_file.write(torrent_file.read(disk.sector_size)) - disk_file.seek(disk_file.tell() + disk.sector_size) - disk_file.write(torrent_file.read()) - disk_file.seek(0) - do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1) - - # Check that there are no verify pieces in the database. Because of integrity constraints, this also - # guarantees there aren't any stray saved hasher states, failed verifies, or piece contents. - cursor.execute("select count(*) from diskjumble.verify_piece;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 0) - - cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;") - self.assertEqual( - cursor.fetchall(), - [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1)), (disk.id, NumericRange(2, 3))] - ) - - def test_resume_no_completion(self): - """ - Test a run where a saved hasher state is used and the target disk has subsequent entity data but not the full - remainder of the piece. 
- """ - read_size = 7 - piece_size = 64 - - other_disk = self._write_disk(16, 1) - disk = self._write_disk(32, 1) - with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) - self._write_torrent(torrent) - with self._conn.cursor() as cursor: - cursor.executemany( - "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", - [ - (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0), - (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size), - ] - ) - - do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1) - - cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 1) - - disk_file = io.BytesIO() - torrent_file.seek(other_disk.sector_size) - disk_file.write(torrent_file.read(disk.sector_size)) - disk_file.seek(0) - do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1) - - cursor.execute("select count(*) from diskjumble.verify_pass;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 0) - - cursor.execute("select entity_id, piece from diskjumble.verify_piece;") - [(entity_id, piece_num)] = cursor.fetchall() - self.assertEqual(bytes(entity_id), torrent.info_hash) - self.assertEqual(piece_num, 0) - - cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;") - self.assertCountEqual( - cursor.fetchall(), - [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1))] - ) - - cursor.execute("select count(*) from diskjumble.verify_piece_fail;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 0) - - hasher = Sha1Hasher(None) - torrent_file.seek(0) - hasher.update(torrent_file.read(other_disk.sector_size + disk.sector_size)) - cursor.execute("select hasher_state from diskjumble.verify_piece_incomplete;") - self.assertEqual(cursor.fetchall(), [(hasher.ctx.serialize(),)]) - - def test_ignore_hasher_beginning_on_disk(self): - """ - Test a run where a saved hasher state is available for use but isn't used due to the beginning of the piece - being on disk. 
- """ - piece_size = 64 - - other_disk = self._write_disk(16, 1) - disk = self._write_disk(16, 4) - with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) - self._write_torrent(torrent) - with self._conn.cursor() as cursor: - cursor.executemany( - "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", - [ - (other_disk.id, NumericRange(0, other_disk.sector_count), torrent.info_hash, piece_size), - (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash, piece_size), - ] - ) - - do_verify(self._conn, other_disk.id, torrent_file, read_size = 128, read_tries = 1) - - cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 1) - - torrent_file.seek(piece_size) - disk_file = io.BytesIO(torrent_file.read()) - do_verify(self._conn, disk.id, disk_file, read_size = 128, read_tries = 1) - - cursor.execute( - "select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;" - ) - self.assertEqual(cursor.fetchall(), [(other_disk.id,)]) - - cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;") - self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, disk.sector_count))]) - - def test_ignore_hasher_unaligned(self): - """ - Test a run where a saved hasher isn't used because its entity data offset isn't sector-aligned on the target - disk. - - 0 16 32 48 64 80 96 112 128 - pieces: [-------------- 0 -------------] - other disk: [--][--][--][--][--] - disk: [------][------] - """ - piece_size = 128 - - other_disk = self._write_disk(16, 5) - disk = self._write_disk(32, 2) - with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) - self._write_torrent(torrent) - with self._conn.cursor() as cursor: - cursor.executemany( - "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", - [ - (other_disk.id, NumericRange(0, 5), torrent.info_hash, 0), - (disk.id, NumericRange(0, 2), torrent.info_hash, 64), - ] - ) - - do_verify(self._conn, other_disk.id, torrent_file, read_size = 16, read_tries = 1) - cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 1) - - disk_file = io.BytesIO(torrent_file.getvalue()[64:]) - do_verify(self._conn, disk.id, disk_file, read_size = 16, read_tries = 1) - - cursor.execute(""" - select disk_id, disk_sectors - from diskjumble.verify_piece_incomplete natural join diskjumble.verify_piece_content; - """) - self.assertEqual( - cursor.fetchall(), - [(other_disk.id, NumericRange(0, 5))] - ) - - cursor.execute("select count(*) from diskjumble.verify_pass;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 0) - - cursor.execute("select count(*) from diskjumble.verify_piece_fail;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 0) - - def test_transient_read_errors(self): - """ - Test a run where a read to the disk fails but fewer times than needed to mark the sector bad. 
- """ - piece_size = 32 - - disk = self._write_disk(32, 1) - with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) - self._write_torrent(torrent) - with self._conn.cursor() as cursor: - cursor.execute( - "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", - (disk.id, NumericRange(0, 1), torrent.info_hash, 0) - ) - - disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2) - do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3) - - self.assertEqual(disk_file.triggered, 2) - - cursor.execute("select count(*) from diskjumble.verify_piece;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 0) - - cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;") - self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, 1))]) - - def test_persistent_read_errors(self): - """ - Test a run where a disk read fails enough times to trigger the bad sector logic. - """ - piece_size = 64 - - other_a = self._write_disk(16, 1) - other_b = self._write_disk(16, 2) - disk = self._write_disk(16, 1) - with _random_file(piece_size, Random(0), on_disk = False) as torrent_file: - torrent = _Torrent(torrent_file, piece_size) - self._write_torrent(torrent) - with self._conn.cursor() as cursor: - cursor.executemany( - "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);", - [ - (other_a.id, NumericRange(0, 1), torrent.info_hash, 0), - (other_b.id, NumericRange(0, 2), torrent.info_hash, 16), - (disk.id, NumericRange(0, 1), torrent.info_hash, 48), - ] - ) - - do_verify(self._conn, other_a.id, torrent_file, read_size = 16, read_tries = 1) - other_b_file = io.BytesIO(torrent_file.getvalue()[16:48]) - do_verify(self._conn, other_b.id, other_b_file, read_size = 16, read_tries = 1) - - cursor.execute("select verify_id from diskjumble.verify_piece;") - [(verify_id,)] = cursor.fetchall() - - data = torrent_file.getvalue()[48:] - disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None) - do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3) - - cursor.execute("select count(*) from diskjumble.verify_pass;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 0) - - cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;") - self.assertCountEqual( - cursor.fetchall(), - [(other_a.id, NumericRange(0, 1)), (other_b.id, NumericRange(0, 2)), (disk.id, NumericRange(0, 1))] - ) - - cursor.execute("select verify_id from diskjumble.verify_piece_fail;") - self.assertEqual(cursor.fetchall(), [(verify_id,)]) - - cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;") - [(row_count,)] = cursor.fetchall() - self.assertEqual(row_count, 0) - - def _write_torrent(self, torrent: "_Torrent") -> None: - with self._conn.cursor() as cursor: - cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,)) - - def _write_disk(self, sector_size: int, sector_count: int) -> "_Disk": - with self._conn.cursor() as cursor: - cursor.execute( - "insert into diskjumble.disk values (default, %s, null, %s, %s, false) returning disk_id;", - (uuid.uuid4(), sector_size, sector_count) - ) - [(id_,)] = cursor.fetchall() - return _Disk(id_, sector_size, sector_count) - - @classmethod - def setUpClass(cls): - psycopg2.extras.register_uuid() - prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text() - schema_sql = "\n".join( - l for l in 
prod_schema_sql.splitlines() - if ( - not l.startswith("SET") - and not l.startswith("SELECT") # ignore server settings - and "OWNER TO" not in l # and ownership changes - ) - ) - cls._conn = psycopg2.connect("") - with cls._conn, cls._conn.cursor() as cursor: - for name in cls._SCHEMAS: - cursor.execute(f"create schema {name};") - cursor.execute(schema_sql) - - @classmethod - def tearDownClass(self): - try: - with self._conn, self._conn.cursor() as cursor: - for name in self._SCHEMAS: - cursor.execute(f"drop schema {name} cascade;") - finally: - self._conn.close() - - def tearDown(self): - self._conn.rollback() - - -@dataclass -class _Disk: - id: int - sector_size: int - sector_count: int - - -class _Torrent: - def __init__(self, data: io.BufferedIOBase, piece_size: int) -> None: - data.seek(0) - hashes = [] - while True: - piece = data.read(piece_size) - if piece: - hashes.append(hashlib.sha1(piece).digest()) - else: - break - - info_dict = { - b"piece length": piece_size, - b"length": data.tell(), - b"pieces": b"".join(hashes), - } - self.data = data - self.info = bencode.encode(info_dict) - self.info_hash = hashlib.sha1(self.info).digest() - - -def _random_file(size: int, rand_src: Random, on_disk: bool) -> tempfile.NamedTemporaryFile: - f = tempfile.NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else io.BytesIO() - try: - while f.tell() < size: - write_size = min(size - f.tell(), _BUF_SIZE) - f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size))) - f.seek(0) - return f - except Exception: - f.close() - raise - - -@dataclass -class _ReadErrorProxy(io.BufferedIOBase): - wrapped: io.BufferedIOBase - error_pos: int - error_count: Optional[int] - - def __post_init__(self): - self.triggered = 0 - - def read(self, size: int = -1) -> bytes: - pre_pos = self.wrapped.tell() - data = self.wrapped.read(size) - erroring = self.error_count is None or self.triggered < self.error_count - in_range = 0 <= pre_pos - self.error_pos < len(data) - if erroring and in_range: - self.triggered += 1 - raise OSError("simulated") - else: - return data - - def seek(self, *args, **kwargs) -> int: - return self.wrapped.seek(*args, **kwargs) diff --git a/src/disk_jumble/tests/verify_setup.sql b/src/disk_jumble/tests/verify_setup.sql deleted file mode 100644 index aa0c5fb..0000000 --- a/src/disk_jumble/tests/verify_setup.sql +++ /dev/null @@ -1,383 +0,0 @@ -create extension btree_gist; -CREATE OR REPLACE FUNCTION public.digest(bytea, text) - RETURNS bytea - LANGUAGE c - IMMUTABLE PARALLEL SAFE STRICT -AS '$libdir/pgcrypto', $function$pg_digest$function$ -; --- --- PostgreSQL database dump --- - --- Dumped from database version 13.2 (Debian 13.2-1.pgdg100+1) --- Dumped by pg_dump version 13.4 (Debian 13.4-0+deb11u1) - -SET statement_timeout = 0; -SET lock_timeout = 0; -SET idle_in_transaction_session_timeout = 0; -SET client_encoding = 'UTF8'; -SET standard_conforming_strings = on; -SELECT pg_catalog.set_config('search_path', '', false); -SET check_function_bodies = false; -SET xmloption = content; -SET client_min_messages = warning; -SET row_security = off; - -SET default_tablespace = ''; - -SET default_table_access_method = heap; - --- --- Name: torrent_info; Type: TABLE; Schema: bittorrent; Owner: eros --- - -CREATE TABLE bittorrent.torrent_info ( - info bytea NOT NULL -); - - -ALTER TABLE bittorrent.torrent_info OWNER TO eros; - --- --- Name: disk; Type: TABLE; Schema: diskjumble; Owner: eros --- - -CREATE TABLE diskjumble.disk ( - disk_id integer NOT NULL, - dev_uuid uuid NOT NULL, - dev_serial 
text, - sector_size integer NOT NULL, - num_sectors bigint NOT NULL, - failed boolean DEFAULT false NOT NULL -); - - -ALTER TABLE diskjumble.disk OWNER TO eros; - --- --- Name: disk_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros --- - -CREATE SEQUENCE diskjumble.disk_id_seq - AS integer - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - -ALTER TABLE diskjumble.disk_id_seq OWNER TO eros; - --- --- Name: disk_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros --- - -ALTER SEQUENCE diskjumble.disk_id_seq OWNED BY diskjumble.disk.disk_id; - - --- --- Name: slab; Type: TABLE; Schema: diskjumble; Owner: eros --- - -CREATE TABLE diskjumble.slab ( - slab_id integer NOT NULL, - disk_id integer NOT NULL, - disk_sectors int8range NOT NULL, - entity_id bytea NOT NULL, - entity_offset bigint NOT NULL, - crypt_key bytea -); - - -ALTER TABLE diskjumble.slab OWNER TO eros; - --- --- Name: slab_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros --- - -CREATE SEQUENCE diskjumble.slab_id_seq - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - -ALTER TABLE diskjumble.slab_id_seq OWNER TO eros; - --- --- Name: slab_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros --- - -ALTER SEQUENCE diskjumble.slab_id_seq OWNED BY diskjumble.slab.slab_id; - - --- --- Name: verify_pass; Type: TABLE; Schema: diskjumble; Owner: eros --- - -CREATE TABLE diskjumble.verify_pass ( - verify_pass_id integer NOT NULL, - at timestamp with time zone, - disk_id integer NOT NULL, - disk_sectors int8range NOT NULL -); - - -ALTER TABLE diskjumble.verify_pass OWNER TO eros; - --- --- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros --- - -CREATE SEQUENCE diskjumble.verify_pass_verify_pass_id_seq - AS integer - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - -ALTER TABLE diskjumble.verify_pass_verify_pass_id_seq OWNER TO eros; - --- --- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros --- - -ALTER SEQUENCE diskjumble.verify_pass_verify_pass_id_seq OWNED BY diskjumble.verify_pass.verify_pass_id; - - --- --- Name: verify_piece; Type: TABLE; Schema: diskjumble; Owner: eros --- - -CREATE TABLE diskjumble.verify_piece ( - verify_id integer NOT NULL, - at timestamp with time zone, - entity_id bytea NOT NULL, - piece integer NOT NULL -); - - -ALTER TABLE diskjumble.verify_piece OWNER TO eros; - --- --- Name: verify_piece_content; Type: TABLE; Schema: diskjumble; Owner: eros --- - -CREATE TABLE diskjumble.verify_piece_content ( - verify_id integer NOT NULL, - seq integer NOT NULL, - disk_id integer NOT NULL, - disk_sectors int8range NOT NULL -); - - -ALTER TABLE diskjumble.verify_piece_content OWNER TO eros; - --- --- Name: verify_piece_fail; Type: TABLE; Schema: diskjumble; Owner: eros --- - -CREATE TABLE diskjumble.verify_piece_fail ( - verify_id integer NOT NULL -); - - -ALTER TABLE diskjumble.verify_piece_fail OWNER TO eros; - --- --- Name: verify_piece_incomplete; Type: TABLE; Schema: diskjumble; Owner: eros --- - -CREATE TABLE diskjumble.verify_piece_incomplete ( - verify_id integer NOT NULL, - hasher_state json -); - - -ALTER TABLE diskjumble.verify_piece_incomplete OWNER TO eros; - --- --- Name: verify_piece_verify_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros --- - -CREATE SEQUENCE diskjumble.verify_piece_verify_id_seq - AS integer - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - -ALTER TABLE 
diskjumble.verify_piece_verify_id_seq OWNER TO eros; - --- --- Name: verify_piece_verify_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros --- - -ALTER SEQUENCE diskjumble.verify_piece_verify_id_seq OWNED BY diskjumble.verify_piece.verify_id; - - --- --- Name: disk disk_id; Type: DEFAULT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.disk ALTER COLUMN disk_id SET DEFAULT nextval('diskjumble.disk_id_seq'::regclass); - - --- --- Name: slab slab_id; Type: DEFAULT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.slab ALTER COLUMN slab_id SET DEFAULT nextval('diskjumble.slab_id_seq'::regclass); - - --- --- Name: verify_pass verify_pass_id; Type: DEFAULT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_pass ALTER COLUMN verify_pass_id SET DEFAULT nextval('diskjumble.verify_pass_verify_pass_id_seq'::regclass); - - --- --- Name: verify_piece verify_id; Type: DEFAULT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_piece ALTER COLUMN verify_id SET DEFAULT nextval('diskjumble.verify_piece_verify_id_seq'::regclass); - - --- --- Name: disk disk_dev_uuid_key; Type: CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.disk - ADD CONSTRAINT disk_dev_uuid_key UNIQUE (dev_uuid); - - --- --- Name: disk disk_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.disk - ADD CONSTRAINT disk_pkey PRIMARY KEY (disk_id); - - --- --- Name: slab slab_disk_id_disk_sectors_excl; Type: CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.slab - ADD CONSTRAINT slab_disk_id_disk_sectors_excl EXCLUDE USING gist (disk_id WITH =, disk_sectors WITH &&); - - --- --- Name: slab slab_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.slab - ADD CONSTRAINT slab_pkey PRIMARY KEY (slab_id); - - --- --- Name: verify_pass verify_pass_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_pass - ADD CONSTRAINT verify_pass_pkey PRIMARY KEY (verify_pass_id); - - --- --- Name: verify_piece_content verify_piece_content_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_piece_content - ADD CONSTRAINT verify_piece_content_pkey PRIMARY KEY (verify_id, seq); - - --- --- Name: verify_piece_fail verify_piece_fail_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_piece_fail - ADD CONSTRAINT verify_piece_fail_pkey PRIMARY KEY (verify_id); - - --- --- Name: verify_piece_incomplete verify_piece_incomplete_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_piece_incomplete - ADD CONSTRAINT verify_piece_incomplete_pkey PRIMARY KEY (verify_id); - - --- --- Name: verify_piece verify_piece_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_piece - ADD CONSTRAINT verify_piece_pkey PRIMARY KEY (verify_id); - - --- --- Name: torrent_info_digest_idx; Type: INDEX; Schema: bittorrent; Owner: eros --- - -CREATE UNIQUE INDEX torrent_info_digest_idx ON bittorrent.torrent_info USING btree (public.digest(info, 'sha1'::text)); - - --- --- Name: slab slab_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.slab - ADD CONSTRAINT slab_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id); - - --- --- Name: verify_pass 
verify_pass_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_pass - ADD CONSTRAINT verify_pass_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id); - - --- --- Name: verify_piece_content verify_piece_content_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_piece_content - ADD CONSTRAINT verify_piece_content_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id); - - --- --- Name: verify_piece_content verify_piece_content_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_piece_content - ADD CONSTRAINT verify_piece_content_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id); - - --- --- Name: verify_piece_fail verify_piece_fail_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_piece_fail - ADD CONSTRAINT verify_piece_fail_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id); - - --- --- Name: verify_piece_incomplete verify_piece_incomplete_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros --- - -ALTER TABLE ONLY diskjumble.verify_piece_incomplete - ADD CONSTRAINT verify_piece_incomplete_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id); - - --- --- PostgreSQL database dump complete --- - diff --git a/src/disk_jumble/trackers.py b/src/disk_jumble/trackers.py deleted file mode 100644 index ba65344..0000000 --- a/src/disk_jumble/trackers.py +++ /dev/null @@ -1,13 +0,0 @@ -from typing import NamedTuple - - -class Tracker(NamedTuple): - gazelle_id: int - scrape_url_format: str - - -TRACKERS = { - "PTP": Tracker(1, "http://please.passthepopcorn.me:2710/{passkey}/scrape"), - "BTN": Tracker(3, "http://landof.tv/{passkey}/scrape"), - "Redacted": Tracker(4, "https://flacsfor.me/{passkey}/scrape"), -} diff --git a/src/disk_jumble/verify.py b/src/disk_jumble/verify.py deleted file mode 100644 index 0adbb03..0000000 --- a/src/disk_jumble/verify.py +++ /dev/null @@ -1,237 +0,0 @@ -from __future__ import annotations -from abc import ABCMeta, abstractmethod -from dataclasses import dataclass -from typing import Optional -import argparse -import contextlib -import datetime as dt -import io -import itertools -import math - -import psycopg2 - -from disk_jumble import bencode -from disk_jumble.db import HasherRef, Slab, Wrapper as DbWrapper -from disk_jumble.nettle import Sha1Hasher - - -_READ_BUFFER_SIZE = 16 * 1024 ** 2 # in bytes - - -@dataclass -class _SlabChunk: - """A slice of a slab; comprising all or part of a piece to be hashed.""" - slab: Slab - slice: slice - - -@dataclass -class _PieceTask: - """The chunks needed to hash as fully as possible an entity piece.""" - entity_id: bytes - piece_num: int - hasher_ref: Optional[HasherRef] - chunks: list[_SlabChunk] - complete: bool # do these chunks complete the piece? 
- - -class _BadSector(Exception): - pass - - -def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, read_tries: int) -> None: - db = DbWrapper(conn) - disk = db.get_disk(disk_id) - - info_dicts = { - info_hash: bencode.decode(info) - for (info_hash, info) in db.get_torrent_info(disk_id) - } - - tasks = [] - slabs_and_hashers = db.get_slabs_and_hashers(disk_id) - for (entity_id, group) in itertools.groupby(slabs_and_hashers, lambda t: t[0].entity_id): - info = info_dicts[entity_id] - piece_len = info[b"piece length"] - assert piece_len % disk.sector_size == 0 - if b"length" in info: - torrent_len = info[b"length"] - else: - torrent_len = sum(d[b"length"] for d in info[b"files"]) - - offset = None - use_hasher = None - chunks = [] - for (slab, hasher_ref) in group: - slab_end = min(slab.entity_offset + len(slab.sectors) * disk.sector_size, torrent_len) - - while offset is None or offset < slab_end: - if offset is not None and slab.entity_offset > offset: - if chunks: - tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False)) - offset = None - use_hasher = None - chunks = [] - - if offset is None: - aligned = math.ceil(slab.entity_offset / piece_len) * piece_len - if hasher_ref and hasher_ref.entity_offset < aligned: - assert hasher_ref.entity_offset < torrent_len - use_hasher = hasher_ref - offset = hasher_ref.entity_offset - elif aligned < slab_end: - offset = aligned - else: - break # no usable data in this slab - - if offset is not None: - piece_end = min(offset + piece_len - offset % piece_len, torrent_len) - chunk_end = min(piece_end, slab_end) - chunks.append(_SlabChunk(slab, slice(offset - slab.entity_offset, chunk_end - slab.entity_offset))) - if chunk_end == piece_end: - tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, True)) - use_hasher = None - chunks = [] - offset = chunk_end - - if chunks: - tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False)) - - @dataclass - class NewVerifyPiece: - entity_id: bytes - piece_num: int - sector_ranges: list[range] - hasher_state: Optional[dict] - failed: bool - - @dataclass - class VerifyUpdate: - seq_start: int - new_sector_ranges: list[range] - hasher_state: Optional[dict] - - passed_verifies = set() - failed_verifies = set() - new_pass_ranges = [] - vp_updates = {} - new_vps = [] - - run_ts = dt.datetime.now(dt.timezone.utc) - for task in tasks: - hasher = Sha1Hasher(task.hasher_ref.state if task.hasher_ref else None) - sector_ranges = [ - range( - chunk.slab.sectors.start + chunk.slice.start // disk.sector_size, - chunk.slab.sectors.start + math.ceil(chunk.slice.stop / disk.sector_size) - ) - for chunk in task.chunks - ] - - try: - for chunk in task.chunks: - slab_off = chunk.slab.sectors.start * disk.sector_size - disk_file.seek(slab_off + chunk.slice.start) - end_pos = slab_off + chunk.slice.stop - while disk_file.tell() < end_pos: - pos = disk_file.tell() - for _ in range(read_tries): - try: - data = disk_file.read(min(end_pos - pos, read_size)) - except OSError as e: - disk_file.seek(pos) - else: - break - else: - raise _BadSector() - - assert data - hasher.update(data) - except _BadSector: - if task.hasher_ref: - failed_verifies.add(task.hasher_ref.id) - vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, None) - else: - new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, None, True)) - else: - hasher_state = hasher.ctx.serialize() - if task.complete: - s = 
slice(task.piece_num * 20, task.piece_num * 20 + 20) - expected_hash = info_dicts[task.entity_id][b"pieces"][s] - if hasher.digest() == expected_hash: - write_piece_data = False - new_pass_ranges.extend(sector_ranges) - if task.hasher_ref: - passed_verifies.add(task.hasher_ref.id) - else: - failed = True - write_piece_data = True - if task.hasher_ref: - failed_verifies.add(task.hasher_ref.id) - else: - failed = False - write_piece_data = True - - if write_piece_data: - if task.hasher_ref: - assert task.hasher_ref.id not in vp_updates - vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, hasher_state) - else: - new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, hasher_state, failed)) - - new_pass_ranges.sort(key = lambda r: r.start) - merged_ranges = [] - for r in new_pass_ranges: - if merged_ranges and r.start == merged_ranges[-1].stop: - merged_ranges[-1] = range(merged_ranges[-1].start, r.stop) - else: - merged_ranges.append(r) - - for vp in new_vps: - verify_id = db.insert_verify_piece(run_ts, vp.entity_id, vp.piece_num) - db.insert_verify_piece_content(verify_id, 0, disk_id, vp.sector_ranges) - if vp.failed: - db.mark_verify_piece_failed(verify_id) - else: - db.upsert_hasher_state(verify_id, vp.hasher_state) - - for (verify_id, update) in vp_updates.items(): - db.insert_verify_piece_content(verify_id, update.seq_start, disk_id, update.new_sector_ranges) - if update.hasher_state: - db.upsert_hasher_state(verify_id, update.hasher_state) - - for verify_id in passed_verifies: - db.move_piece_content_for_pass(verify_id) - db.delete_verify_piece(verify_id) - - for r in merged_ranges: - db.insert_pass_data(run_ts, disk_id, r) - - for verify_id in failed_verifies: - db.clear_incomplete(verify_id) - db.mark_verify_piece_failed(verify_id) - - -if __name__ == "__main__": - def read_tries(raw_arg): - val = int(raw_arg) - if val > 0: - return val - else: - raise ValueError() - - parser = argparse.ArgumentParser() - parser.add_argument("disk_id", type = int) - parser.add_argument( - "read_tries", - type = read_tries, - help = "number of times to attempt a particular disk read before giving up on the sector", - ) - args = parser.parse_args() - - with contextlib.closing(psycopg2.connect("")) as conn: - conn.autocommit = True - path = f"/dev/mapper/diskjumble-{args.disk_id}" - with open(path, "rb", buffering = _READ_BUFFER_SIZE) as disk_file: - do_verify(conn, args.disk_id, disk_file, _READ_BUFFER_SIZE, args.read_tries) diff --git a/test_util/dump_db.py b/test_util/dump_db.py deleted file mode 100644 index 9551bf9..0000000 --- a/test_util/dump_db.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -Using the live database, dump creation code for extensions, tables, and functions needed for local testing - -For testing the verification script, write the output of this script to: - - src/disk_jumble/tests/verify_setup.sql -""" - -import contextlib -import itertools -import os -import subprocess - -import psycopg2 - - -procedures = [ - "public.digest(bytea, text)", -] - -extensions = [ - "btree_gist", -] - -with contextlib.closing(psycopg2.connect("")) as conn: - conn.autocommit = True - with conn.cursor() as cursor: - for ext in extensions: - print(f"create extension {ext};", flush = True) - for sig in procedures: - cursor.execute("select pg_get_functiondef(to_regprocedure(%s));", (sig,)) - [(sql,)] = cursor.fetchall() - print(sql + ";", flush = True) - -tables = [ - "diskjumble.disk", - "diskjumble.slab", - "diskjumble.verify_pass", - 
"diskjumble.verify_piece", - "diskjumble.verify_piece_content", - "diskjumble.verify_piece_fail", - "diskjumble.verify_piece_incomplete", - "bittorrent.torrent_info", -] -argv = ["pg_dump", *itertools.chain.from_iterable(["-t", table] for table in tables), "--schema-only", os.environ["PGDATABASE"]] -subprocess.run(argv, check = True)