Demote Disk Jumble tree to subdirectory for EROS repo merge
author Jakob Cornell <jakob+gpg@jcornell.net>
Thu, 20 Jan 2022 01:53:11 +0000 (19:53 -0600)
committer Jakob Cornell <jakob+gpg@jcornell.net>
Thu, 20 Jan 2022 01:53:11 +0000 (19:53 -0600)
28 files changed:
disk_jumble/pyproject.toml [new file with mode: 0644]
disk_jumble/setup.cfg [new file with mode: 0644]
disk_jumble/setup.py [new file with mode: 0644]
disk_jumble/src/disk_jumble/__init__.py [new file with mode: 0644]
disk_jumble/src/disk_jumble/bencode.py [new file with mode: 0644]
disk_jumble/src/disk_jumble/db.py [new file with mode: 0644]
disk_jumble/src/disk_jumble/nettle.py [new file with mode: 0644]
disk_jumble/src/disk_jumble/scrape.py [new file with mode: 0644]
disk_jumble/src/disk_jumble/tests/__init__.py [new file with mode: 0644]
disk_jumble/src/disk_jumble/tests/test_verify.py [new file with mode: 0644]
disk_jumble/src/disk_jumble/tests/verify_setup.sql [new file with mode: 0644]
disk_jumble/src/disk_jumble/trackers.py [new file with mode: 0644]
disk_jumble/src/disk_jumble/verify.py [new file with mode: 0644]
disk_jumble/test_util/dump_db.py [new file with mode: 0644]
pyproject.toml [deleted file]
setup.cfg [deleted file]
setup.py [deleted file]
src/disk_jumble/__init__.py [deleted file]
src/disk_jumble/bencode.py [deleted file]
src/disk_jumble/db.py [deleted file]
src/disk_jumble/nettle.py [deleted file]
src/disk_jumble/scrape.py [deleted file]
src/disk_jumble/tests/__init__.py [deleted file]
src/disk_jumble/tests/test_verify.py [deleted file]
src/disk_jumble/tests/verify_setup.sql [deleted file]
src/disk_jumble/trackers.py [deleted file]
src/disk_jumble/verify.py [deleted file]
test_util/dump_db.py [deleted file]

diff --git a/disk_jumble/pyproject.toml b/disk_jumble/pyproject.toml
new file mode 100644 (file)
index 0000000..8fe2f47
--- /dev/null
@@ -0,0 +1,3 @@
+[build-system]
+requires = ["setuptools>=42", "wheel"]
+build-backend = "setuptools.build_meta"
diff --git a/disk_jumble/setup.cfg b/disk_jumble/setup.cfg
new file mode 100644 (file)
index 0000000..3fc8d70
--- /dev/null
@@ -0,0 +1,11 @@
+[metadata]
+name = disk_jumble
+version = 0.0.1
+
+[options]
+package_dir =
+       = src
+packages = disk_jumble
+python_requires = ~= 3.7
+install_requires =
+       psycopg2-binary ~= 2.8
diff --git a/disk_jumble/setup.py b/disk_jumble/setup.py
new file mode 100644 (file)
index 0000000..b024da8
--- /dev/null
@@ -0,0 +1,4 @@
+from setuptools import setup
+
+
+setup()
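
With the tree now under disk_jumble/, an editable install would presumably be run from the repository root against the subdirectory (a sketch; the path reflects the new layout above):

    pip install -e disk_jumble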
diff --git a/disk_jumble/src/disk_jumble/__init__.py b/disk_jumble/src/disk_jumble/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/disk_jumble/src/disk_jumble/bencode.py b/disk_jumble/src/disk_jumble/bencode.py
new file mode 100644 (file)
index 0000000..7883b53
--- /dev/null
@@ -0,0 +1,124 @@
+from __future__ import annotations
+from typing import Dict, List, Union
+import itertools
+
+
+Bdict = Dict[bytes, 'Type']
+Type = Union[bytes, int, List['Type'], Bdict]
+
+
+class CodecError(Exception):
+       pass
+
+
+def _pop_bytes(vals: list[bytes]) -> bytes:
+       len_parts = []
+       while vals and vals[0].isdigit():
+               len_parts.append(vals.pop(0))
+
+       try:
+               length = int(b"".join(len_parts).decode("ascii"))
+       except ValueError:
+               raise CodecError()
+
+       try:
+               if vals.pop(0) != b":":
+                       raise CodecError()
+       except IndexError:
+               raise CodecError()
+
+       if length > len(vals):
+               raise CodecError()
+
+       out = b"".join(vals[:length])
+       del vals[:length]
+       return out
+
+
+def _pop_int(vals: list[bytes]) -> int:
+       assert vals.pop(0) == b"i"
+
+       try:
+               end = vals.index(b"e")
+       except ValueError:
+               raise CodecError()
+
+       try:
+               out = int(b"".join(vals[:end]).decode("ascii"))
+       except ValueError:
+               raise CodecError()
+
+       del vals[:end + 1]
+       return out
+
+
+def _pop_list(vals: list[bytes]) -> list[Type]:
+       assert vals.pop(0) == b"l"
+
+       out = []
+       while vals and vals[0] != b"e":
+               out.append(_pop_value(vals))
+
+       if vals:
+               del vals[0]
+               return out
+       else:
+               raise CodecError()
+
+
+def _pop_dict(vals: list[bytes]) -> Bdict:
+       assert vals.pop(0) == b"d"
+
+       out = {}
+       while vals and vals[0] != b"e":
+               key = _pop_bytes(vals)
+               out[key] = _pop_value(vals)
+
+       if vals:
+               del vals[0]
+               return out
+       else:
+               raise CodecError()
+
+
+def _pop_value(vals: list[bytes]) -> Type:
+       if vals:
+               if vals[0].isdigit():
+                       return _pop_bytes(vals)
+               elif vals[0] == b"i":
+                       return _pop_int(vals)
+               elif vals[0] == b"l":
+                       return _pop_list(vals)
+               elif vals[0] == b"d":
+                       return _pop_dict(vals)
+               else:
+                       raise CodecError()
+       else:
+               raise CodecError()
+
+
+def decode(data: bytes) -> Type:
+       vals = [bytes([v]) for v in data]
+       out = _pop_value(vals)
+       if vals:
+               raise CodecError()
+       else:
+               return out
+
+
+def _encode_helper(data: Type) -> list[bytes]:
+       if isinstance(data, bytes):
+               return [str(len(data)).encode("ascii"), b":", data]
+       elif isinstance(data, int):
+               return [b"i", str(data).encode("ascii"), b"e"]
+       elif isinstance(data, list):
+               return [b"l", *itertools.chain.from_iterable(map(_encode_helper, data)), b"e"]
+       elif isinstance(data, dict):
+               contents = itertools.chain.from_iterable(data.items())
+               return [b"d", *itertools.chain.from_iterable(map(_encode_helper, contents)), b"e"]
+       else:
+               raise CodecError()
+
+
+def encode(data: Type) -> bytes:
+       return b"".join(_encode_helper(data))
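
A minimal round trip with the codec above, for reference (assumes the package is installed so that disk_jumble.bencode is importable):

    from disk_jumble import bencode

    assert bencode.encode({b"foo": 42}) == b"d3:fooi42ee"
    assert bencode.decode(b"d3:fooi42ee") == {b"foo": 42}
    assert bencode.decode(b"4:spam") == b"spam"  # bare byte strings decode too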
diff --git a/disk_jumble/src/disk_jumble/db.py b/disk_jumble/src/disk_jumble/db.py
new file mode 100644 (file)
index 0000000..68488f0
--- /dev/null
@@ -0,0 +1,223 @@
+from __future__ import annotations
+from dataclasses import dataclass
+from typing import Any, Iterable, Optional
+import datetime as dt
+import itertools
+
+from psycopg2.extras import execute_batch, Json, NumericRange
+
+
+@dataclass
+class Disk:
+       sector_size: int
+
+
+@dataclass
+class Slab:
+       id: int
+       disk_id: int
+       sectors: range
+       entity_id: bytes
+       entity_offset: int
+       crypt_key: bytes
+
+
+@dataclass
+class HasherRef:
+       id: int
+       seq: int
+       entity_offset: int
+       state: dict
+
+
+@dataclass
+class Wrapper:
+       conn: Any
+
+       def get_passkey(self, tracker_id: int) -> str:
+               with self.conn.cursor() as cursor:
+                       cursor.execute("select passkey from gazelle.passkey where gazelle_tracker_id = %s;", (tracker_id,))
+                       [(passkey,)] = cursor.fetchall()
+
+               return passkey
+
+       def get_torrents(self, tracker_id: int, batch_size: Optional[int] = None) -> Iterable[bytes]:
+               """Iterate over the info hashes of the specified tracker's torrents that haven't been marked deleted."""
+
+               stmt = """
+                       select infohash from gazelle.torrent
+                       where gazelle_tracker_id = %s and not is_deleted
+                       order by infohash asc
+                       ;
+               """
+               with self.conn.cursor() as cursor:
+                       if batch_size is not None:
+                               cursor.itersize = batch_size
+
+                       cursor.execute(stmt, (tracker_id,))
+                       for row in cursor:
+                               (info_hash_mem,) = row
+                               info_hash = bytes(info_hash_mem)
+                               assert len(info_hash) == 20
+                               yield info_hash
+
+       def insert_swarm_info(self, tracker_id: int, infos: Iterable["disk_jumble.scrape.ScrapeInfo"]) -> None:
+               stmt = """
+                       insert into gazelle.tracker_stat (gazelle_tracker_id, infohash, ts, complete, incomplete, downloaded)
+                       values (%s, %s, %s, %s, %s, %s)
+                       ;
+               """
+               with self.conn.cursor() as cursor:
+                       param_sets = [
+                               (tracker_id, i.info_hash, i.timestamp, i.complete, i.incomplete, i.downloaded)
+                               for i in infos
+                       ]
+                       execute_batch(cursor, stmt, param_sets)
+
+       def get_disk(self, id_: int) -> Disk:
+               with self.conn.cursor() as cursor:
+                       cursor.execute("select sector_size from diskjumble.disk where disk_id = %s;", (id_,))
+                       [(sector_size,)] = cursor.fetchall()
+
+               return Disk(sector_size)
+
+       def get_slabs_and_hashers(self, disk_id: int) -> Iterable[tuple[Slab, Optional[HasherRef]]]:
+               """
+               Find slabs on the specified disk, and for each also return any lowest-offset saved hasher that left off directly
+               before or within the slab's entity data.
+               """
+
+               stmt = """
+                       with
+                               incomplete_edge as (
+                                       -- join up incomplete piece info and precompute where the hasher left off within the entity
+                                       select
+                                               verify_id, seq, slab.entity_id, hasher_state,
+                                               entity_offset + (upper(c.disk_sectors) - lower(slab.disk_sectors)) * sector_size as end_off
+                                       from
+                                               diskjumble.verify_piece_incomplete
+                                               natural left join diskjumble.verify_piece p
+                                               natural join diskjumble.verify_piece_content c
+                                               natural left join diskjumble.disk
+                                               left join diskjumble.slab on (
+                                                       c.disk_id = slab.disk_id
+                                                       and upper(c.disk_sectors) <@ int8range(lower(slab.disk_sectors), upper(slab.disk_sectors), '[]')
+                                               )
+                                       where seq >= all (select seq from diskjumble.verify_piece_content where verify_id = p.verify_id)
+                               )
+                       select
+                               slab_id, disk_id, disk_sectors, slab.entity_id, entity_offset, crypt_key, verify_id, seq, end_off,
+                               hasher_state
+                       from
+                               diskjumble.slab
+                               natural left join diskjumble.disk
+                               left join incomplete_edge on
+                                       incomplete_edge.entity_id = slab.entity_id
+                                       and incomplete_edge.end_off <@ int8range(
+                                               slab.entity_offset,
+                                               slab.entity_offset + (upper(disk_sectors) - lower(disk_sectors)) * sector_size
+                                       )
+                                       and (incomplete_edge.end_off - slab.entity_offset) %% sector_size = 0
+                       where disk_id = %s
+                       order by slab.entity_id, entity_offset, slab_id
+                       ;
+               """
+               with self.conn.cursor() as cursor:
+                       cursor.execute(stmt, (disk_id,))
+                       for (_, rows_iter) in itertools.groupby(cursor, lambda r: r[0]):
+                               rows = list(rows_iter)
+                               [(slab_id, disk_id, sectors_pg, entity_id, entity_off, key)] = {r[:6] for r in rows}
+                               sectors = range(sectors_pg.lower, sectors_pg.upper)
+                               slab = Slab(slab_id, disk_id, sectors, bytes(entity_id), entity_off, key)
+
+                               # `None' if no hasher match in outer join, otherwise earliest match
+                               (*_, id_, seq, end_off, state) = min(rows, key = lambda r: r[-2])
+                               hasher_ref = None if id_ is None else HasherRef(id_, seq, end_off, state)
+
+                               yield (slab, hasher_ref)
+
+       def get_torrent_info(self, disk_id: int) -> Iterable[tuple[bytes, bytes]]:
+               stmt = """
+                       with hashed as (
+                               select digest(info, 'sha1') as info_hash, info
+                               from bittorrent.torrent_info
+                       )
+                       select
+                               distinct on (info_hash)
+                               info_hash, info
+                               from diskjumble.slab left outer join hashed on entity_id = info_hash
+                               where disk_id = %s
+                       ;
+               """
+               with self.conn.cursor() as cursor:
+                       cursor.execute(stmt, (disk_id,))
+                       for (info_hash, info) in cursor:
+                               yield (bytes(info_hash), bytes(info))
+
+       def insert_verify_piece(self, ts: dt.datetime, entity_id: bytes, piece_num: int) -> int:
+               """Insert new verify piece, returning the ID of the inserted row."""
+
+               with self.conn.cursor() as cursor:
+                       stmt = "insert into diskjumble.verify_piece values (default, %s, %s, %s) returning verify_id;"
+                       cursor.execute(stmt, (ts, entity_id, piece_num))
+                       [(row_id,)] = cursor.fetchall()
+               return row_id
+
+       def insert_verify_piece_content(self, verify_id: int, seq_start: int, disk_id: int, ranges: Iterable[range]) -> None:
+               with self.conn.cursor() as cursor:
+                       execute_batch(
+                               cursor,
+                               "insert into diskjumble.verify_piece_content values (%s, %s, %s, %s);",
+                               [
+                                       (verify_id, seq, disk_id, NumericRange(r.start, r.stop))
+                                       for (seq, r) in enumerate(ranges, start = seq_start)
+                               ]
+                       )
+
+       def mark_verify_piece_failed(self, verify_id: int) -> None:
+               with self.conn.cursor() as cursor:
+                       cursor.execute("insert into diskjumble.verify_piece_fail values (%s);", (verify_id,))
+
+       def upsert_hasher_state(self, verify_id: int, state: dict) -> None:
+               stmt = """
+                       insert into diskjumble.verify_piece_incomplete values (%s, %s)
+                       on conflict (verify_id) do update set hasher_state = excluded.hasher_state
+                       ;
+               """
+               with self.conn.cursor() as cursor:
+                       cursor.execute(stmt, (verify_id, Json(state)))
+
+       def delete_verify_piece(self, verify_id: int) -> None:
+               with self.conn.cursor() as cursor:
+                       cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,))
+                       cursor.execute("delete from diskjumble.verify_piece_content where verify_id = %s;", (verify_id,))
+                       cursor.execute("delete from diskjumble.verify_piece where verify_id = %s", (verify_id,))
+
+       def move_piece_content_for_pass(self, verify_id: int) -> None:
+               stmt = """
+                       with content_out as (
+                               delete from diskjumble.verify_piece_content c
+                               using diskjumble.verify_piece p
+                               where (
+                                       c.verify_id = p.verify_id
+                                       and p.verify_id = %s
+                               )
+                               returning at, disk_id, disk_sectors
+                       )
+                       insert into diskjumble.verify_pass (at, disk_id, disk_sectors)
+                       select at, disk_id, disk_sectors from content_out
+                       ;
+               """
+               with self.conn.cursor() as cursor:
+                       cursor.execute(stmt, (verify_id,))
+
+       def insert_pass_data(self, ts: dt.datetime, disk_id: int, sectors: range) -> None:
+               with self.conn.cursor() as cursor:
+                       cursor.execute(
+                               "insert into diskjumble.verify_pass values (default, %s, %s, %s);",
+                               (ts, disk_id, NumericRange(sectors.start, sectors.stop))
+                       )
+
+       def clear_incomplete(self, verify_id: int) -> None:
+               with self.conn.cursor() as cursor:
+                       cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,))
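
A usage sketch for the wrapper above (the connection parameters and disk_id are hypothetical; the wrapper leaves transaction control to the caller):

    import psycopg2
    from disk_jumble.db import Wrapper

    conn = psycopg2.connect(dbname="eros")  # hypothetical DSN
    db = Wrapper(conn)
    disk = db.get_disk(1)  # hypothetical disk_id; returns Disk(sector_size=...)
    for (slab, hasher_ref) in db.get_slabs_and_hashers(1):
        print(slab.sectors, hasher_ref)
    conn.commit()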
diff --git a/disk_jumble/src/disk_jumble/nettle.py b/disk_jumble/src/disk_jumble/nettle.py
new file mode 100644 (file)
index 0000000..dfabbca
--- /dev/null
@@ -0,0 +1,66 @@
+"""Python wrappers for some of GnuTLS Nettle."""
+
+from ctypes.util import find_library
+from typing import Optional
+import ctypes
+
+
+_LIB = ctypes.CDLL(find_library("nettle"))
+
+
+class _Sha1Defs:
+       _DIGEST_SIZE = 20  # in bytes
+       _BLOCK_SIZE = 64  # in bytes
+       _DIGEST_LENGTH = 5
+
+       _StateArr = ctypes.c_uint32 * _DIGEST_LENGTH
+
+       _BlockArr = ctypes.c_uint8 * _BLOCK_SIZE
+
+
+class Sha1Hasher(_Sha1Defs):
+       class Context(ctypes.Structure):
+               _fields_ = [
+                       ("state", _Sha1Defs._StateArr),
+                       ("count", ctypes.c_uint64),
+                       ("index", ctypes.c_uint),
+                       ("block", _Sha1Defs._BlockArr),
+               ]
+
+               @classmethod
+               def deserialize(cls, data):
+                       return cls(
+                               _Sha1Defs._StateArr(*data["state"]),
+                               data["count"],
+                               data["index"],
+                               _Sha1Defs._BlockArr(*data["block"]),
+                       )
+
+               def serialize(self):
+                       return {
+                               "state": list(self.state),
+                               "count": self.count,
+                               "index": self.index,
+                               "block": list(self.block),
+                       }
+
+       @classmethod
+       def _new_context(cls):
+               ctx = cls.Context()
+               _LIB.nettle_sha1_init(ctypes.byref(ctx))
+               return ctx
+
+       def __init__(self, ctx_dict: Optional[dict]):
+               if ctx_dict:
+                       self.ctx = self.Context.deserialize(ctx_dict)
+               else:
+                       self.ctx = self._new_context()
+
+       def update(self, data):
+               _LIB.nettle_sha1_update(ctypes.byref(self.ctx), len(data), data)
+
+       def digest(self):
+               """Return the current digest and reset the hasher state."""
+               out = (ctypes.c_uint8 * self._DIGEST_SIZE)()
+               _LIB.nettle_sha1_digest(ctypes.byref(self.ctx), self._DIGEST_SIZE, out)
+               return bytes(out)
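
A short sketch of saving and resuming hashing state with the wrapper above, checked against hashlib (assumes libnettle is installed and findable via ctypes.util.find_library):

    import hashlib
    from disk_jumble.nettle import Sha1Hasher

    h = Sha1Hasher(None)
    h.update(b"hello")
    state = h.ctx.serialize()  # JSON-friendly dict, as stored in hasher_state
    resumed = Sha1Hasher(state)
    resumed.update(b" world")
    assert resumed.digest() == hashlib.sha1(b"hello world").digest()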
diff --git a/disk_jumble/src/disk_jumble/scrape.py b/disk_jumble/src/disk_jumble/scrape.py
new file mode 100644 (file)
index 0000000..4143b8a
--- /dev/null
@@ -0,0 +1,157 @@
+from dataclasses import dataclass
+from typing import Iterable, Union
+from urllib.error import URLError
+from urllib.parse import urlencode, urlparse, urlunparse
+from urllib.request import HTTPHandler, HTTPRedirectHandler, HTTPSHandler, OpenerDirector, UnknownHandler
+import argparse
+import contextlib
+import datetime as dt
+import itertools
+import re
+import sys
+import time
+
+import psycopg2
+
+from disk_jumble import bencode
+from disk_jumble.db import Wrapper as DbWrapper
+from disk_jumble.trackers import Tracker, TRACKERS
+
+
+class Result:
+       pass
+
+
+@dataclass
+class OkResult(Result):
+       data: bencode.Type
+
+
+@dataclass
+class ErrorResult(Result):
+       status: int
+       body: Union[bencode.Type, bytes]
+
+
+@dataclass
+class ScrapeInfo:
+       info_hash: bytes
+       timestamp: dt.datetime
+       complete: int
+       incomplete: int
+       downloaded: int
+
+
+def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) -> Result:
+       qs = urlencode({"info_hash": list(info_hashes)}, doseq = True)
+       url_parts = urlparse(tracker.scrape_url_format.format(passkey = passkey))
+       assert not url_parts.query
+       url = urlunparse(url_parts._replace(query = qs))
+
+       # we handle HTTP errors ourselves
+       opener = OpenerDirector()
+       for handler in [UnknownHandler(), HTTPHandler(), HTTPSHandler(), HTTPRedirectHandler()]:
+               opener.add_handler(handler)
+
+       with opener.open(url) as resp:
+               body = resp.read()
+
+       try:
+               data = bencode.decode(body)
+       except bencode.CodecError:
+               data = body
+       else:
+               if resp.getcode() == 200 and b"files" in data:
+                       return OkResult(data)
+
+       return ErrorResult(resp.getcode(), data)
+
+
+if __name__ == "__main__":
+       PSQL_PARAMS = ["dbname", "user", "password", "host", "port"]
+
+       def tracker_name(val):
+               matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()]
+               if matches:
+                       [name] = matches
+                       return name
+               else:
+                       raise ValueError()
+
+       def batch_size(val):
+               out = int(val)
+               if out > 0:
+                       return out
+               else:
+                       raise ValueError()
+
+       def delay(val):
+               out = int(val)
+               if out >= 0:
+                       return out
+               else:
+                       raise ValueError()
+
+       parser = argparse.ArgumentParser()
+       parser.add_argument(
+               "tracker",
+               choices = sorted(TRACKERS.keys()),
+               type = tracker_name,
+               help = "name of tracker to scrape",
+       )
+       parser.add_argument(
+               "batch_size",
+               type = batch_size,
+               help = "number of torrents per batch",
+       )
+       parser.add_argument(
+               "delay",
+               type = delay,
+               help = "delay between batches, in seconds",
+       )
+
+       for arg_name in PSQL_PARAMS:
+               parser.add_argument("--" + arg_name, nargs = "?")
+
+       args = parser.parse_args()
+       tracker = TRACKERS[args.tracker]
+       delay = dt.timedelta(seconds = args.delay)
+
+       params = {n: getattr(args, n) for n in PSQL_PARAMS if getattr(args, n)}
+       with contextlib.closing(psycopg2.connect(**params)) as conn:
+               conn.autocommit = True
+               db_wrapper = DbWrapper(conn)
+               passkey = db_wrapper.get_passkey(tracker.gazelle_id)
+               info_hashes = iter(db_wrapper.get_torrents(tracker.gazelle_id, args.batch_size))
+               batches = iter(lambda: list(itertools.islice(info_hashes, args.batch_size)), [])
+               for (i, batch) in enumerate(batches):
+                       if i != 0:
+                               time.sleep(delay.total_seconds())
+                       timestamp = dt.datetime.now(dt.timezone.utc)
+                       try:
+                               result = scrape_batch(tracker, batch, passkey)
+                       except URLError as e:
+                               print("[ERR] couldn't connect: {}".format(e), file = sys.stderr, flush = True)
+                       else:
+                               if isinstance(result, OkResult):
+                                       assert set(result.data[b"files"].keys()) <= set(batch), "unexpected torrent in response"
+                                       infos = [
+                                               ScrapeInfo(
+                                                       info_hash,
+                                                       timestamp,
+                                                       complete = meta_dict[b"complete"],
+                                                       incomplete = meta_dict[b"incomplete"],
+                                                       downloaded = meta_dict[b"downloaded"],
+                                               )
+                                               for (info_hash, meta_dict) in result.data[b"files"].items()
+                                       ]
+                                       db_wrapper.insert_swarm_info(tracker.gazelle_id, infos)
+                                       print("[OK] finished batch ({} of {} torrents)".format(len(infos), len(batch)), file = sys.stderr, flush = True)
+                               elif isinstance(result, ErrorResult):
+                                       full_disp = result.body.decode("ascii", "ignore") if isinstance(result.body, bytes) else str(result.body)
+                                       clean_disp = re.sub(r"\s", " ", full_disp)
+                                       display_size = 100
+                                       disp = clean_disp if len(clean_disp) <= display_size else clean_disp[:display_size] + "…"
+                                       print("[ERR] tracker responded {}: {}".format(result.status, disp), file = sys.stderr, flush = True)
+                               else:
+                                       raise Exception()
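
An invocation sketch for the scraper above (the tracker name is a placeholder; valid names come from disk_jumble.trackers.TRACKERS, which isn't shown in this diff, and the connection flags are optional):

    python -m disk_jumble.scrape <tracker> 100 10 --dbname eros --host localhost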
diff --git a/disk_jumble/src/disk_jumble/tests/__init__.py b/disk_jumble/src/disk_jumble/tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/disk_jumble/src/disk_jumble/tests/test_verify.py b/disk_jumble/src/disk_jumble/tests/test_verify.py
new file mode 100644 (file)
index 0000000..eea5007
--- /dev/null
@@ -0,0 +1,451 @@
+"""
+Tests for the verification program `disk_jumble.verify'
+
+Like the verification program itself, these tests take database connection information from the environment.  The
+necessary schemas and tables are set up from scratch by the test code, so environment variables should point to a
+database that's not hosting a live instance of Disk Jumble.  Ideally, this is a completely empty local database created
+for the purposes of testing, but other options are possible.
+
+The tests need access to an SQL source file containing the definitions for the required tables and other Postgres
+objects; see `test_util/dump_db.py'.
+"""
+
+from dataclasses import dataclass
+from importlib import resources
+from random import Random
+from typing import Optional
+import hashlib
+import io
+import tempfile
+import unittest
+import uuid
+
+from psycopg2.extras import NumericRange
+import psycopg2
+import psycopg2.extras
+
+from disk_jumble import bencode
+from disk_jumble.nettle import Sha1Hasher
+from disk_jumble.verify import do_verify
+
+
+_BUF_SIZE = 16 * 1024 ** 2  # in bytes
+
+
+class Tests(unittest.TestCase):
+       _SCHEMAS = {"public", "diskjumble", "bittorrent"}
+
+       def _basic_fresh_verify_helper(self, read_size):
+               sector_size = 32
+               piece_size = 64
+
+               torrent_len = 3 * piece_size
+               disk = self._write_disk(sector_size, torrent_len // sector_size)
+               with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file:
+                       torrent = _Torrent(torrent_file, piece_size)
+                       self._write_torrent(torrent)
+                       with self._conn.cursor() as cursor:
+                               cursor.execute(
+                                       "insert into diskjumble.slab values (default, %s, %s, %s, 0, null);",
+                                       (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash)
+                               )
+
+                               do_verify(self._conn, disk.id, torrent_file, read_size, read_tries = 1)
+
+                               cursor.execute("select * from diskjumble.verify_pass;")
+                               self.assertEqual(cursor.rowcount, 1)
+                               (_, _, disk_id, sectors) = cursor.fetchone()
+                               self.assertEqual(disk_id, disk.id)
+                               self.assertEqual(sectors, NumericRange(0, torrent_len // sector_size))
+
+       def test_basic_fresh_verify_small_read_size(self):
+               self._basic_fresh_verify_helper(16)
+
+       def test_basic_fresh_verify_large_read_size(self):
+               self._basic_fresh_verify_helper(128)
+
+       def test_resume_fragmentation_unaligned_end(self):
+               """
+               Test a run where a cached hash state is used, a piece is split on disk, and the end of the torrent isn't
+               sector-aligned.
+               """
+               read_size = 8
+               piece_size = 64
+
+               other_disk = self._write_disk(16, 1)
+               disk = self._write_disk(32, 3)
+               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+                       torrent = _Torrent(torrent_file, piece_size)
+                       self._write_torrent(torrent)
+                       with self._conn.cursor() as cursor:
+                               cursor.executemany(
+                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+                                       [
+                                               (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0),
+                                               (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size),
+                                               (disk.id, NumericRange(2, 3), torrent.info_hash, other_disk.sector_size + disk.sector_size),
+                                       ]
+                               )
+
+                               # Prepare the saved hasher state by running a verify
+                               do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1)
+                               torrent_file.seek(0)
+
+                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 1)
+
+                               disk_file = io.BytesIO()
+                               torrent_file.seek(other_disk.sector_size)
+                               disk_file.write(torrent_file.read(disk.sector_size))
+                               disk_file.seek(disk_file.tell() + disk.sector_size)
+                               disk_file.write(torrent_file.read())
+                               disk_file.seek(0)
+                               do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1)
+
+                               # Check that there are no verify pieces in the database.  Because of integrity constraints, this also
+                               # guarantees there aren't any stray saved hasher states, failed verifies, or piece contents.
+                               cursor.execute("select count(*) from diskjumble.verify_piece;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 0)
+
+                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
+                               self.assertEqual(
+                                       cursor.fetchall(),
+                                       [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1)), (disk.id, NumericRange(2, 3))]
+                               )
+
+       def test_resume_no_completion(self):
+               """
+               Test a run where a saved hasher state is used and the target disk has subsequent entity data but not the full
+               remainder of the piece.
+               """
+               read_size = 7
+               piece_size = 64
+
+               other_disk = self._write_disk(16, 1)
+               disk = self._write_disk(32, 1)
+               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+                       torrent = _Torrent(torrent_file, piece_size)
+                       self._write_torrent(torrent)
+                       with self._conn.cursor() as cursor:
+                               cursor.executemany(
+                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+                                       [
+                                               (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0),
+                                               (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size),
+                                       ]
+                               )
+
+                               do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1)
+
+                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 1)
+
+                               disk_file = io.BytesIO()
+                               torrent_file.seek(other_disk.sector_size)
+                               disk_file.write(torrent_file.read(disk.sector_size))
+                               disk_file.seek(0)
+                               do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1)
+
+                               cursor.execute("select count(*) from diskjumble.verify_pass;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 0)
+
+                               cursor.execute("select entity_id, piece from diskjumble.verify_piece;")
+                               [(entity_id, piece_num)] = cursor.fetchall()
+                               self.assertEqual(bytes(entity_id), torrent.info_hash)
+                               self.assertEqual(piece_num, 0)
+
+                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;")
+                               self.assertCountEqual(
+                                       cursor.fetchall(),
+                                       [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1))]
+                               )
+
+                               cursor.execute("select count(*) from diskjumble.verify_piece_fail;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 0)
+
+                               hasher = Sha1Hasher(None)
+                               torrent_file.seek(0)
+                               hasher.update(torrent_file.read(other_disk.sector_size + disk.sector_size))
+                               cursor.execute("select hasher_state from diskjumble.verify_piece_incomplete;")
+                               self.assertEqual(cursor.fetchall(), [(hasher.ctx.serialize(),)])
+
+       def test_ignore_hasher_beginning_on_disk(self):
+               """
+               Test a run where a saved hasher state is available for use but isn't used due to the beginning of the piece
+               being on disk.
+               """
+               piece_size = 64
+
+               other_disk = self._write_disk(16, 1)
+               disk = self._write_disk(16, 4)
+               with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file:
+                       torrent = _Torrent(torrent_file, piece_size)
+                       self._write_torrent(torrent)
+                       with self._conn.cursor() as cursor:
+                               cursor.executemany(
+                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+                                       [
+                                               (other_disk.id, NumericRange(0, other_disk.sector_count), torrent.info_hash, piece_size),
+                                               (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash, piece_size),
+                                       ]
+                               )
+
+                               do_verify(self._conn, other_disk.id, torrent_file, read_size = 128, read_tries = 1)
+
+                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 1)
+
+                               torrent_file.seek(piece_size)
+                               disk_file = io.BytesIO(torrent_file.read())
+                               do_verify(self._conn, disk.id, disk_file, read_size = 128, read_tries = 1)
+
+                               cursor.execute(
+                                       "select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;"
+                               )
+                               self.assertEqual(cursor.fetchall(), [(other_disk.id,)])
+
+                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
+                               self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, disk.sector_count))])
+
+       def test_ignore_hasher_unaligned(self):
+               """
+               Test a run where a saved hasher isn't used because its entity data offset isn't sector-aligned on the target
+               disk.
+
+                           0   16  32  48  64  80  96  112 128
+               pieces:     [-------------- 0 -------------]
+               other disk: [--][--][--][--][--]
+               disk:                       [------][------]
+               """
+               piece_size = 128
+
+               other_disk = self._write_disk(16, 5)
+               disk = self._write_disk(32, 2)
+               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+                       torrent = _Torrent(torrent_file, piece_size)
+                       self._write_torrent(torrent)
+                       with self._conn.cursor() as cursor:
+                               cursor.executemany(
+                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+                                       [
+                                               (other_disk.id, NumericRange(0, 5), torrent.info_hash, 0),
+                                               (disk.id, NumericRange(0, 2), torrent.info_hash, 64),
+                                       ]
+                               )
+
+                               do_verify(self._conn, other_disk.id, torrent_file, read_size = 16, read_tries = 1)
+                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 1)
+
+                               disk_file = io.BytesIO(torrent_file.getvalue()[64:])
+                               do_verify(self._conn, disk.id, disk_file, read_size = 16, read_tries = 1)
+
+                               cursor.execute("""
+                                       select disk_id, disk_sectors
+                                       from diskjumble.verify_piece_incomplete natural join diskjumble.verify_piece_content;
+                               """)
+                               self.assertEqual(
+                                       cursor.fetchall(),
+                                       [(other_disk.id, NumericRange(0, 5))]
+                               )
+
+                               cursor.execute("select count(*) from diskjumble.verify_pass;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 0)
+
+                               cursor.execute("select count(*) from diskjumble.verify_piece_fail;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 0)
+
+       def test_transient_read_errors(self):
+               """
+               Test a run where a disk read fails, but fewer times than needed to mark the sector bad.
+               """
+               piece_size = 32
+
+               disk = self._write_disk(32, 1)
+               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+                       torrent = _Torrent(torrent_file, piece_size)
+                       self._write_torrent(torrent)
+                       with self._conn.cursor() as cursor:
+                               cursor.execute(
+                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+                                       (disk.id, NumericRange(0, 1), torrent.info_hash, 0)
+                               )
+
+                               disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2)
+                               do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3)
+
+                               self.assertEqual(disk_file.triggered, 2)
+
+                               cursor.execute("select count(*) from diskjumble.verify_piece;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 0)
+
+                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
+                               self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, 1))])
+
+       def test_persistent_read_errors(self):
+               """
+               Test a run where a disk read fails enough times to trigger the bad sector logic.
+               """
+               piece_size = 64
+
+               other_a = self._write_disk(16, 1)
+               other_b = self._write_disk(16, 2)
+               disk = self._write_disk(16, 1)
+               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+                       torrent = _Torrent(torrent_file, piece_size)
+                       self._write_torrent(torrent)
+                       with self._conn.cursor() as cursor:
+                               cursor.executemany(
+                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+                                       [
+                                               (other_a.id, NumericRange(0, 1), torrent.info_hash, 0),
+                                               (other_b.id, NumericRange(0, 2), torrent.info_hash, 16),
+                                               (disk.id, NumericRange(0, 1), torrent.info_hash, 48),
+                                       ]
+                               )
+
+                               do_verify(self._conn, other_a.id, torrent_file, read_size = 16, read_tries = 1)
+                               other_b_file = io.BytesIO(torrent_file.getvalue()[16:48])
+                               do_verify(self._conn, other_b.id, other_b_file, read_size = 16, read_tries = 1)
+
+                               cursor.execute("select verify_id from diskjumble.verify_piece;")
+                               [(verify_id,)] = cursor.fetchall()
+
+                               data = torrent_file.getvalue()[48:]
+                               disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None)
+                               do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3)
+
+                               cursor.execute("select count(*) from diskjumble.verify_pass;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 0)
+
+                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;")
+                               self.assertCountEqual(
+                                       cursor.fetchall(),
+                                       [(other_a.id, NumericRange(0, 1)), (other_b.id, NumericRange(0, 2)), (disk.id, NumericRange(0, 1))]
+                               )
+
+                               cursor.execute("select verify_id from diskjumble.verify_piece_fail;")
+                               self.assertEqual(cursor.fetchall(), [(verify_id,)])
+
+                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+                               [(row_count,)] = cursor.fetchall()
+                               self.assertEqual(row_count, 0)
+
+       def _write_torrent(self, torrent: "_Torrent") -> None:
+               with self._conn.cursor() as cursor:
+                       cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,))
+
+       def _write_disk(self, sector_size: int, sector_count: int) -> "_Disk":
+               with self._conn.cursor() as cursor:
+                       cursor.execute(
+                               "insert into diskjumble.disk values (default, %s, null, %s, %s, false) returning disk_id;",
+                               (uuid.uuid4(), sector_size, sector_count)
+                       )
+                       [(id_,)] = cursor.fetchall()
+               return _Disk(id_, sector_size, sector_count)
+
+       @classmethod
+       def setUpClass(cls):
+               psycopg2.extras.register_uuid()
+               prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text()
+               schema_sql = "\n".join(
+                       l for l in prod_schema_sql.splitlines()
+                       if (
+                               not l.startswith("SET")
+                               and not l.startswith("SELECT")  # ignore server settings
+                               and "OWNER TO" not in l  # and ownership changes
+                       )
+               )
+               cls._conn = psycopg2.connect("")
+               with cls._conn, cls._conn.cursor() as cursor:
+                       for name in cls._SCHEMAS:
+                               cursor.execute(f"create schema {name};")
+                       cursor.execute(schema_sql)
+
+       @classmethod
+       def tearDownClass(cls):
+               try:
+                       with cls._conn, cls._conn.cursor() as cursor:
+                               for name in cls._SCHEMAS:
+                                       cursor.execute(f"drop schema {name} cascade;")
+               finally:
+                       cls._conn.close()
+
+       def tearDown(self):
+               self._conn.rollback()
+
+
+@dataclass
+class _Disk:
+       id: int
+       sector_size: int
+       sector_count: int
+
+
+class _Torrent:
+       def __init__(self, data: io.BufferedIOBase, piece_size: int) -> None:
+               data.seek(0)
+               hashes = []
+               while True:
+                       piece = data.read(piece_size)
+                       if piece:
+                               hashes.append(hashlib.sha1(piece).digest())
+                       else:
+                               break
+
+               info_dict = {
+                       b"piece length": piece_size,
+                       b"length": data.tell(),
+                       b"pieces": b"".join(hashes),
+               }
+               self.data = data
+               self.info = bencode.encode(info_dict)
+               self.info_hash = hashlib.sha1(self.info).digest()
+
+
+def _random_file(size: int, rand_src: Random, on_disk: bool) -> tempfile.NamedTemporaryFile:
+       f = tempfile.NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else io.BytesIO()
+       try:
+               while f.tell() < size:
+                       write_size = min(size - f.tell(), _BUF_SIZE)
+                       f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size)))
+               f.seek(0)
+               return f
+       except Exception:
+               f.close()
+               raise
+
+
+@dataclass
+class _ReadErrorProxy(io.BufferedIOBase):
+       wrapped: io.BufferedIOBase
+       error_pos: int
+       error_count: Optional[int]
+
+       def __post_init__(self):
+               self.triggered = 0
+
+       def read(self, size: int = -1) -> bytes:
+               pre_pos = self.wrapped.tell()
+               data = self.wrapped.read(size)
+               erroring = self.error_count is None or self.triggered < self.error_count
+               in_range = 0 <= pre_pos - self.error_pos < len(data)
+               if erroring and in_range:
+                       self.triggered += 1
+                       raise OSError("simulated")
+               else:
+                       return data
+
+       def seek(self, *args, **kwargs) -> int:
+               return self.wrapped.seek(*args, **kwargs)
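
The tests above read connection settings from the environment via psycopg2.connect(""), which honors the standard libpq variables; a sketch of a run against a scratch database (names are hypothetical):

    PGHOST=localhost PGDATABASE=disk_jumble_test PGUSER=postgres python -m unittest disk_jumble.tests.test_verify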
diff --git a/disk_jumble/src/disk_jumble/tests/verify_setup.sql b/disk_jumble/src/disk_jumble/tests/verify_setup.sql
new file mode 100644 (file)
index 0000000..aa0c5fb
--- /dev/null
@@ -0,0 +1,383 @@
+create extension btree_gist;
+CREATE OR REPLACE FUNCTION public.digest(bytea, text)
+ RETURNS bytea
+ LANGUAGE c
+ IMMUTABLE PARALLEL SAFE STRICT
+AS '$libdir/pgcrypto', $function$pg_digest$function$
+;
+--
+-- PostgreSQL database dump
+--
+
+-- Dumped from database version 13.2 (Debian 13.2-1.pgdg100+1)
+-- Dumped by pg_dump version 13.4 (Debian 13.4-0+deb11u1)
+
+SET statement_timeout = 0;
+SET lock_timeout = 0;
+SET idle_in_transaction_session_timeout = 0;
+SET client_encoding = 'UTF8';
+SET standard_conforming_strings = on;
+SELECT pg_catalog.set_config('search_path', '', false);
+SET check_function_bodies = false;
+SET xmloption = content;
+SET client_min_messages = warning;
+SET row_security = off;
+
+SET default_tablespace = '';
+
+SET default_table_access_method = heap;
+
+--
+-- Name: torrent_info; Type: TABLE; Schema: bittorrent; Owner: eros
+--
+
+CREATE TABLE bittorrent.torrent_info (
+    info bytea NOT NULL
+);
+
+
+ALTER TABLE bittorrent.torrent_info OWNER TO eros;
+
+--
+-- Name: disk; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.disk (
+    disk_id integer NOT NULL,
+    dev_uuid uuid NOT NULL,
+    dev_serial text,
+    sector_size integer NOT NULL,
+    num_sectors bigint NOT NULL,
+    failed boolean DEFAULT false NOT NULL
+);
+
+
+ALTER TABLE diskjumble.disk OWNER TO eros;
+
+--
+-- Name: disk_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
+--
+
+CREATE SEQUENCE diskjumble.disk_id_seq
+    AS integer
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE diskjumble.disk_id_seq OWNER TO eros;
+
+--
+-- Name: disk_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
+--
+
+ALTER SEQUENCE diskjumble.disk_id_seq OWNED BY diskjumble.disk.disk_id;
+
+
+--
+-- Name: slab; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.slab (
+    slab_id integer NOT NULL,
+    disk_id integer NOT NULL,
+    disk_sectors int8range NOT NULL,
+    entity_id bytea NOT NULL,
+    entity_offset bigint NOT NULL,
+    crypt_key bytea
+);
+
+
+ALTER TABLE diskjumble.slab OWNER TO eros;
+
+--
+-- Name: slab_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
+--
+
+CREATE SEQUENCE diskjumble.slab_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE diskjumble.slab_id_seq OWNER TO eros;
+
+--
+-- Name: slab_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
+--
+
+ALTER SEQUENCE diskjumble.slab_id_seq OWNED BY diskjumble.slab.slab_id;
+
+
+--
+-- Name: verify_pass; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_pass (
+    verify_pass_id integer NOT NULL,
+    at timestamp with time zone,
+    disk_id integer NOT NULL,
+    disk_sectors int8range NOT NULL
+);
+
+
+ALTER TABLE diskjumble.verify_pass OWNER TO eros;
+
+--
+-- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
+--
+
+CREATE SEQUENCE diskjumble.verify_pass_verify_pass_id_seq
+    AS integer
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE diskjumble.verify_pass_verify_pass_id_seq OWNER TO eros;
+
+--
+-- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
+--
+
+ALTER SEQUENCE diskjumble.verify_pass_verify_pass_id_seq OWNED BY diskjumble.verify_pass.verify_pass_id;
+
+
+--
+-- Name: verify_piece; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_piece (
+    verify_id integer NOT NULL,
+    at timestamp with time zone,
+    entity_id bytea NOT NULL,
+    piece integer NOT NULL
+);
+
+
+ALTER TABLE diskjumble.verify_piece OWNER TO eros;
+
+--
+-- Name: verify_piece_content; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_piece_content (
+    verify_id integer NOT NULL,
+    seq integer NOT NULL,
+    disk_id integer NOT NULL,
+    disk_sectors int8range NOT NULL
+);
+
+
+ALTER TABLE diskjumble.verify_piece_content OWNER TO eros;
+
+--
+-- Name: verify_piece_fail; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_piece_fail (
+    verify_id integer NOT NULL
+);
+
+
+ALTER TABLE diskjumble.verify_piece_fail OWNER TO eros;
+
+--
+-- Name: verify_piece_incomplete; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_piece_incomplete (
+    verify_id integer NOT NULL,
+    hasher_state json
+);
+
+
+ALTER TABLE diskjumble.verify_piece_incomplete OWNER TO eros;
+
+--
+-- Name: verify_piece_verify_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
+--
+
+CREATE SEQUENCE diskjumble.verify_piece_verify_id_seq
+    AS integer
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE diskjumble.verify_piece_verify_id_seq OWNER TO eros;
+
+--
+-- Name: verify_piece_verify_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
+--
+
+ALTER SEQUENCE diskjumble.verify_piece_verify_id_seq OWNED BY diskjumble.verify_piece.verify_id;
+
+
+--
+-- Name: disk disk_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.disk ALTER COLUMN disk_id SET DEFAULT nextval('diskjumble.disk_id_seq'::regclass);
+
+
+--
+-- Name: slab slab_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.slab ALTER COLUMN slab_id SET DEFAULT nextval('diskjumble.slab_id_seq'::regclass);
+
+
+--
+-- Name: verify_pass verify_pass_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_pass ALTER COLUMN verify_pass_id SET DEFAULT nextval('diskjumble.verify_pass_verify_pass_id_seq'::regclass);
+
+
+--
+-- Name: verify_piece verify_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece ALTER COLUMN verify_id SET DEFAULT nextval('diskjumble.verify_piece_verify_id_seq'::regclass);
+
+
+--
+-- Name: disk disk_dev_uuid_key; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.disk
+    ADD CONSTRAINT disk_dev_uuid_key UNIQUE (dev_uuid);
+
+
+--
+-- Name: disk disk_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.disk
+    ADD CONSTRAINT disk_pkey PRIMARY KEY (disk_id);
+
+
+--
+-- Name: slab slab_disk_id_disk_sectors_excl; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.slab
+    ADD CONSTRAINT slab_disk_id_disk_sectors_excl EXCLUDE USING gist (disk_id WITH =, disk_sectors WITH &&);
+
+
+--
+-- Name: slab slab_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.slab
+    ADD CONSTRAINT slab_pkey PRIMARY KEY (slab_id);
+
+
+--
+-- Name: verify_pass verify_pass_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_pass
+    ADD CONSTRAINT verify_pass_pkey PRIMARY KEY (verify_pass_id);
+
+
+--
+-- Name: verify_piece_content verify_piece_content_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_content
+    ADD CONSTRAINT verify_piece_content_pkey PRIMARY KEY (verify_id, seq);
+
+
+--
+-- Name: verify_piece_fail verify_piece_fail_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_fail
+    ADD CONSTRAINT verify_piece_fail_pkey PRIMARY KEY (verify_id);
+
+
+--
+-- Name: verify_piece_incomplete verify_piece_incomplete_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_incomplete
+    ADD CONSTRAINT verify_piece_incomplete_pkey PRIMARY KEY (verify_id);
+
+
+--
+-- Name: verify_piece verify_piece_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece
+    ADD CONSTRAINT verify_piece_pkey PRIMARY KEY (verify_id);
+
+
+--
+-- Name: torrent_info_digest_idx; Type: INDEX; Schema: bittorrent; Owner: eros
+--
+
+CREATE UNIQUE INDEX torrent_info_digest_idx ON bittorrent.torrent_info USING btree (public.digest(info, 'sha1'::text));
+
+
+--
+-- Name: slab slab_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.slab
+    ADD CONSTRAINT slab_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
+
+
+--
+-- Name: verify_pass verify_pass_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_pass
+    ADD CONSTRAINT verify_pass_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
+
+
+--
+-- Name: verify_piece_content verify_piece_content_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_content
+    ADD CONSTRAINT verify_piece_content_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
+
+
+--
+-- Name: verify_piece_content verify_piece_content_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_content
+    ADD CONSTRAINT verify_piece_content_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
+
+
+--
+-- Name: verify_piece_fail verify_piece_fail_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_fail
+    ADD CONSTRAINT verify_piece_fail_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
+
+
+--
+-- Name: verify_piece_incomplete verify_piece_incomplete_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_incomplete
+    ADD CONSTRAINT verify_piece_incomplete_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
+
+
+--
+-- PostgreSQL database dump complete
+--
+
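A note on how the verification tables above fit together: diskjumble.verify_piece records a piece-level check, verify_piece_content holds the disk sector ranges that fed it, verify_piece_incomplete keeps the serialized hasher state for resumption, verify_piece_fail marks a failed check, and fully verified sector ranges end up in verify_pass. A minimal sketch (not part of the dump) of recording one failed check against this schema, assuming an open psycopg2 connection `conn` and an existing diskjumble.disk row whose id is `disk_id` (both placeholders here):

    from psycopg2.extras import NumericRange

    with conn.cursor() as cursor:
        # One failed check of piece 0 of a placeholder 20-byte info hash.
        cursor.execute(
            "insert into diskjumble.verify_piece values (default, now(), %s, %s) returning verify_id;",
            (b"\x00" * 20, 0),
        )
        [(verify_id,)] = cursor.fetchall()
        # The piece's data lived in sectors [0, 8) of the disk.
        cursor.execute(
            "insert into diskjumble.verify_piece_content values (%s, 0, %s, %s);",
            (verify_id, disk_id, NumericRange(0, 8)),
        )
        cursor.execute("insert into diskjumble.verify_piece_fail values (%s);", (verify_id,))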
diff --git a/disk_jumble/src/disk_jumble/trackers.py b/disk_jumble/src/disk_jumble/trackers.py
new file mode 100644 (file)
index 0000000..ba65344
--- /dev/null
@@ -0,0 +1,13 @@
+from typing import NamedTuple
+
+
+class Tracker(NamedTuple):
+       gazelle_id: int
+       scrape_url_format: str
+
+
+TRACKERS = {
+       "PTP": Tracker(1, "http://please.passthepopcorn.me:2710/{passkey}/scrape"),
+       "BTN": Tracker(3, "http://landof.tv/{passkey}/scrape"),
+       "Redacted": Tracker(4, "https://flacsfor.me/{passkey}/scrape"),
+}
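Each Tracker pairs the Gazelle tracker id used in the database with a scrape URL template; the per-user passkey is substituted when a request is built (see disk_jumble.scrape). A small sketch with a made-up passkey:

    from disk_jumble.trackers import TRACKERS

    tracker = TRACKERS["Redacted"]
    url = tracker.scrape_url_format.format(passkey = "0123456789abcdef")
    # url == "https://flacsfor.me/0123456789abcdef/scrape"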
diff --git a/disk_jumble/src/disk_jumble/verify.py b/disk_jumble/src/disk_jumble/verify.py
new file mode 100644 (file)
index 0000000..0adbb03
--- /dev/null
@@ -0,0 +1,237 @@
+from __future__ import annotations
+from abc import ABCMeta, abstractmethod
+from dataclasses import dataclass
+from typing import Optional
+import argparse
+import contextlib
+import datetime as dt
+import io
+import itertools
+import math
+
+import psycopg2
+
+from disk_jumble import bencode
+from disk_jumble.db import HasherRef, Slab, Wrapper as DbWrapper
+from disk_jumble.nettle import Sha1Hasher
+
+
+_READ_BUFFER_SIZE = 16 * 1024 ** 2  # in bytes
+
+
+@dataclass
+class _SlabChunk:
+	"""A slice of a slab, comprising all or part of a piece to be hashed."""
+       slab: Slab
+       slice: slice
+
+
+@dataclass
+class _PieceTask:
+	"""The chunks needed to hash an entity piece as fully as possible."""
+       entity_id: bytes
+       piece_num: int
+       hasher_ref: Optional[HasherRef]
+       chunks: list[_SlabChunk]
+       complete: bool  # do these chunks complete the piece?
+
+
+class _BadSector(Exception):
+       pass
+
+
+def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, read_tries: int) -> None:
+       db = DbWrapper(conn)
+       disk = db.get_disk(disk_id)
+
+       info_dicts = {
+               info_hash: bencode.decode(info)
+               for (info_hash, info) in db.get_torrent_info(disk_id)
+       }
+
+       tasks = []
+       slabs_and_hashers = db.get_slabs_and_hashers(disk_id)
+       for (entity_id, group) in itertools.groupby(slabs_and_hashers, lambda t: t[0].entity_id):
+               info = info_dicts[entity_id]
+               piece_len = info[b"piece length"]
+               assert piece_len % disk.sector_size == 0
+               if b"length" in info:
+                       torrent_len = info[b"length"]
+               else:
+                       torrent_len = sum(d[b"length"] for d in info[b"files"])
+
+               offset = None
+               use_hasher = None
+               chunks = []
+               for (slab, hasher_ref) in group:
+                       slab_end = min(slab.entity_offset + len(slab.sectors) * disk.sector_size, torrent_len)
+
+                       while offset is None or offset < slab_end:
+                               if offset is not None and slab.entity_offset > offset:
+                                       if chunks:
+                                               tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False))
+                                       offset = None
+                                       use_hasher = None
+                                       chunks = []
+
+                               if offset is None:
+                                       aligned = math.ceil(slab.entity_offset / piece_len) * piece_len
+                                       if hasher_ref and hasher_ref.entity_offset < aligned:
+                                               assert hasher_ref.entity_offset < torrent_len
+                                               use_hasher = hasher_ref
+                                               offset = hasher_ref.entity_offset
+                                       elif aligned < slab_end:
+                                               offset = aligned
+                                       else:
+                                               break  # no usable data in this slab
+
+                               if offset is not None:
+                                       piece_end = min(offset + piece_len - offset % piece_len, torrent_len)
+                                       chunk_end = min(piece_end, slab_end)
+                                       chunks.append(_SlabChunk(slab, slice(offset - slab.entity_offset, chunk_end - slab.entity_offset)))
+                                       if chunk_end == piece_end:
+                                               tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, True))
+                                               use_hasher = None
+                                               chunks = []
+                                       offset = chunk_end
+
+               if chunks:
+                       tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False))
+
+       @dataclass
+       class NewVerifyPiece:
+               entity_id: bytes
+               piece_num: int
+               sector_ranges: list[range]
+               hasher_state: Optional[dict]
+               failed: bool
+
+       @dataclass
+       class VerifyUpdate:
+               seq_start: int
+               new_sector_ranges: list[range]
+               hasher_state: Optional[dict]
+
+       passed_verifies = set()
+       failed_verifies = set()
+       new_pass_ranges = []
+       vp_updates = {}
+       new_vps = []
+
+       run_ts = dt.datetime.now(dt.timezone.utc)
+       for task in tasks:
+               hasher = Sha1Hasher(task.hasher_ref.state if task.hasher_ref else None)
+               sector_ranges = [
+                       range(
+                               chunk.slab.sectors.start + chunk.slice.start // disk.sector_size,
+                               chunk.slab.sectors.start + math.ceil(chunk.slice.stop / disk.sector_size)
+                       )
+                       for chunk in task.chunks
+               ]
+
+               try:
+                       for chunk in task.chunks:
+                               slab_off = chunk.slab.sectors.start * disk.sector_size
+                               disk_file.seek(slab_off + chunk.slice.start)
+                               end_pos = slab_off + chunk.slice.stop
+                               while disk_file.tell() < end_pos:
+                                       pos = disk_file.tell()
+                                       for _ in range(read_tries):
+                                               try:
+                                                       data = disk_file.read(min(end_pos - pos, read_size))
+                                               except OSError as e:
+                                                       disk_file.seek(pos)
+                                               else:
+                                                       break
+                                       else:
+                                               raise _BadSector()
+
+                                       assert data
+                                       hasher.update(data)
+               except _BadSector:
+                       if task.hasher_ref:
+                               failed_verifies.add(task.hasher_ref.id)
+                               vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, None)
+                       else:
+                               new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, None, True))
+               else:
+                       hasher_state = hasher.ctx.serialize()
+                       if task.complete:
+                               s = slice(task.piece_num * 20, task.piece_num * 20 + 20)
+                               expected_hash = info_dicts[task.entity_id][b"pieces"][s]
+                               if hasher.digest() == expected_hash:
+                                       write_piece_data = False
+                                       new_pass_ranges.extend(sector_ranges)
+                                       if task.hasher_ref:
+                                               passed_verifies.add(task.hasher_ref.id)
+                               else:
+                                       failed = True
+                                       write_piece_data = True
+                                       if task.hasher_ref:
+                                               failed_verifies.add(task.hasher_ref.id)
+                       else:
+                               failed = False
+                               write_piece_data = True
+
+                       if write_piece_data:
+                               if task.hasher_ref:
+                                       assert task.hasher_ref.id not in vp_updates
+                                       vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, hasher_state)
+                               else:
+                                       new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, hasher_state, failed))
+
+       new_pass_ranges.sort(key = lambda r: r.start)
+       merged_ranges = []
+       for r in new_pass_ranges:
+               if merged_ranges and r.start == merged_ranges[-1].stop:
+                       merged_ranges[-1] = range(merged_ranges[-1].start, r.stop)
+               else:
+                       merged_ranges.append(r)
+
+       for vp in new_vps:
+               verify_id = db.insert_verify_piece(run_ts, vp.entity_id, vp.piece_num)
+               db.insert_verify_piece_content(verify_id, 0, disk_id, vp.sector_ranges)
+               if vp.failed:
+                       db.mark_verify_piece_failed(verify_id)
+               else:
+                       db.upsert_hasher_state(verify_id, vp.hasher_state)
+
+       for (verify_id, update) in vp_updates.items():
+               db.insert_verify_piece_content(verify_id, update.seq_start, disk_id, update.new_sector_ranges)
+               if update.hasher_state:
+                       db.upsert_hasher_state(verify_id, update.hasher_state)
+
+       for verify_id in passed_verifies:
+               db.move_piece_content_for_pass(verify_id)
+               db.delete_verify_piece(verify_id)
+
+       for r in merged_ranges:
+               db.insert_pass_data(run_ts, disk_id, r)
+
+       for verify_id in failed_verifies:
+               db.clear_incomplete(verify_id)
+               db.mark_verify_piece_failed(verify_id)
+
+
+if __name__ == "__main__":
+       def read_tries(raw_arg):
+               val = int(raw_arg)
+               if val > 0:
+                       return val
+               else:
+                       raise ValueError()
+
+       parser = argparse.ArgumentParser()
+       parser.add_argument("disk_id", type = int)
+       parser.add_argument(
+               "read_tries",
+               type = read_tries,
+               help = "number of times to attempt a particular disk read before giving up on the sector",
+       )
+       args = parser.parse_args()
+
+       with contextlib.closing(psycopg2.connect("")) as conn:
+               conn.autocommit = True
+               path = f"/dev/mapper/diskjumble-{args.disk_id}"
+               with open(path, "rb", buffering = _READ_BUFFER_SIZE) as disk_file:
+                       do_verify(conn, args.disk_id, disk_file, _READ_BUFFER_SIZE, args.read_tries)
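One detail of do_verify worth calling out: sector ranges that pass verification are sorted and coalesced before being written to diskjumble.verify_pass, and two ranges are merged only when one ends exactly where the next begins. A standalone sketch of that coalescing step with made-up ranges:

    ranges = [range(12, 16), range(0, 4), range(4, 8)]
    ranges.sort(key = lambda r: r.start)

    merged = []
    for r in ranges:
        if merged and r.start == merged[-1].stop:
            merged[-1] = range(merged[-1].start, r.stop)
        else:
            merged.append(r)

    assert merged == [range(0, 8), range(12, 16)]

The __main__ block then maps a disk id to /dev/mapper/diskjumble-<disk_id> and feeds the opened device into do_verify, reusing the buffer size as the read size.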
diff --git a/disk_jumble/test_util/dump_db.py b/disk_jumble/test_util/dump_db.py
new file mode 100644 (file)
index 0000000..9551bf9
--- /dev/null
@@ -0,0 +1,46 @@
+"""
+Using the live database, dump the creation code for extensions, tables, and functions needed for local testing.
+
+For testing the verification script, write the output of this script to:
+
+       src/disk_jumble/tests/verify_setup.sql
+"""
+
+import contextlib
+import itertools
+import os
+import subprocess
+
+import psycopg2
+
+
+procedures = [
+       "public.digest(bytea, text)",
+]
+
+extensions = [
+       "btree_gist",
+]
+
+with contextlib.closing(psycopg2.connect("")) as conn:
+       conn.autocommit = True
+       with conn.cursor() as cursor:
+               for ext in extensions:
+                       print(f"create extension {ext};", flush = True)
+               for sig in procedures:
+                       cursor.execute("select pg_get_functiondef(to_regprocedure(%s));", (sig,))
+                       [(sql,)] = cursor.fetchall()
+                       print(sql + ";", flush = True)
+
+tables = [
+       "diskjumble.disk",
+       "diskjumble.slab",
+       "diskjumble.verify_pass",
+       "diskjumble.verify_piece",
+       "diskjumble.verify_piece_content",
+       "diskjumble.verify_piece_fail",
+       "diskjumble.verify_piece_incomplete",
+       "bittorrent.torrent_info",
+]
+argv = ["pg_dump", *itertools.chain.from_iterable(["-t", table] for table in tables), "--schema-only", os.environ["PGDATABASE"]]
+subprocess.run(argv, check = True)
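A sketch of the intended workflow, assuming the usual libpq environment variables (PGDATABASE and friends) point at the live database and the script is run from the package root; it simply captures the script's stdout into the fixture path named in the docstring:

    import pathlib
    import subprocess
    import sys

    result = subprocess.run(
        [sys.executable, "test_util/dump_db.py"],
        check = True,
        stdout = subprocess.PIPE,
    )
    pathlib.Path("src/disk_jumble/tests/verify_setup.sql").write_bytes(result.stdout)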
diff --git a/pyproject.toml b/pyproject.toml
deleted file mode 100644 (file)
index 8fe2f47..0000000
+++ /dev/null
@@ -1,3 +0,0 @@
-[build-system]
-requires = ["setuptools>=42", "wheel"]
-build-backend = "setuptools.build_meta"
diff --git a/setup.cfg b/setup.cfg
deleted file mode 100644 (file)
index 3fc8d70..0000000
--- a/setup.cfg
+++ /dev/null
@@ -1,11 +0,0 @@
-[metadata]
-name = disk_jumble
-version = 0.0.1
-
-[options]
-package_dir =
-       = src
-packages = disk_jumble
-python_requires ~= "3.7"
-install_requires =
-       psycopg2-binary ~= 2.8
diff --git a/setup.py b/setup.py
deleted file mode 100644 (file)
index b024da8..0000000
--- a/setup.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from setuptools import setup
-
-
-setup()
diff --git a/src/disk_jumble/__init__.py b/src/disk_jumble/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/src/disk_jumble/bencode.py b/src/disk_jumble/bencode.py
deleted file mode 100644 (file)
index 7883b53..0000000
+++ /dev/null
@@ -1,124 +0,0 @@
-from __future__ import annotations
-from typing import Union
-import itertools
-
-
-Bdict = dict[bytes, 'Type']
-Type = Union[bytes, int, list['Type'], Bdict]
-
-
-class CodecError(Exception):
-       pass
-
-
-def _pop_bytes(vals: list[bytes]) -> bytes:
-       len_parts = []
-       while vals and vals[0].isdigit():
-               len_parts.append(vals.pop(0))
-
-       try:
-               length = int(b"".join(len_parts).decode("ascii"))
-       except ValueError:
-               raise CodecError()
-
-       try:
-               if vals.pop(0) != b":":
-                       raise CodecError()
-       except IndexError:
-               raise CodecError()
-
-       if length > len(vals):
-               raise CodecError()
-
-       out = b"".join(vals[:length])
-       del vals[:length]
-       return out
-
-
-def _pop_int(vals: list[bytes]) -> int:
-       assert vals.pop(0) == b"i"
-
-       try:
-               end = vals.index(b"e")
-       except ValueError:
-               raise CodecError()
-
-       try:
-               out = int(b"".join(vals[:end]).decode("ascii"))
-       except ValueError:
-               raise CodecError()
-
-       del vals[slice(end + 1)]
-       return out
-
-
-def _pop_list(vals: list[bytes]) -> list[Type]:
-       assert vals.pop(0) == b"l"
-
-       out = []
-       while vals and vals[0] != b"e":
-               out.append(_pop_value(vals))
-
-       if vals:
-               del vals[0]
-               return out
-       else:
-               raise CodecError()
-
-
-def _pop_dict(vals: list[bytes]) -> Bdict:
-       assert vals.pop(0) == b"d"
-
-       out = {}
-       while vals and vals[0] != b"e":
-               key = _pop_bytes(vals)
-               out[key] = _pop_value(vals)
-
-       if vals:
-               del vals[0]
-               return out
-       else:
-               raise CodecError()
-
-
-def _pop_value(vals: list[bytes]) -> Type:
-       if vals:
-               if vals[0].isdigit():
-                       return _pop_bytes(vals)
-               elif vals[0] == b"i":
-                       return _pop_int(vals)
-               elif vals[0] == b"l":
-                       return _pop_list(vals)
-               elif vals[0] == b"d":
-                       return _pop_dict(vals)
-               else:
-                       raise CodecError()
-       else:
-               raise CodecError()
-
-
-def decode(data: bytes) -> Type:
-       vals = [bytes([v]) for v in data]
-       out = _pop_value(vals)
-       if vals:
-               raise CodecError()
-       else:
-               return out
-
-
-def _encode_helper(data: Type) -> list[bytes]:
-       if isinstance(data, bytes):
-               return [str(len(data)).encode("ascii"), b":", data]
-       elif isinstance(data, int):
-               return [b"i", str(data).encode("ascii"), b"e"]
-       elif isinstance(data, list):
-               return [b"l", *itertools.chain.from_iterable(map(_encode_helper, data)), b"e"]
-       elif isinstance(data, dict):
-               contents = itertools.chain.from_iterable(data.items())
-               return [b"d", *itertools.chain.from_iterable(map(_encode_helper, contents)), b"e"]
-       else:
-               raise CodecError()
-
-
-def encode(data: Type) -> bytes:
-       return b"".join(_encode_helper(data))
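The bencode module removed here at the top level (an identical copy is added under disk_jumble/src/ earlier in this commit) round-trips the four bencode value shapes: bytes, ints, lists, and dicts keyed by bytes. A quick sketch of a round trip; note that encode writes dict keys in insertion order rather than sorting them:

    from disk_jumble import bencode

    blob = bencode.encode({b"piece length": 64, b"name": b"example"})
    assert blob == b"d12:piece lengthi64e4:name7:examplee"
    assert bencode.decode(blob) == {b"piece length": 64, b"name": b"example"}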
diff --git a/src/disk_jumble/db.py b/src/disk_jumble/db.py
deleted file mode 100644 (file)
index 68488f0..0000000
+++ /dev/null
@@ -1,223 +0,0 @@
-from __future__ import annotations
-from dataclasses import dataclass
-from typing import Any, Iterable, Optional
-import datetime as dt
-import itertools
-
-from psycopg2.extras import execute_batch, Json, NumericRange
-
-
-@dataclass
-class Disk:
-       sector_size: int
-
-
-@dataclass
-class Slab:
-       id: int
-       disk_id: int
-       sectors: range
-       entity_id: bytes
-       entity_offset: int
-       crypt_key: bytes
-
-
-@dataclass
-class HasherRef:
-       id: int
-       seq: int
-       entity_offset: int
-       state: dict
-
-
-@dataclass
-class Wrapper:
-       conn: Any
-
-       def get_passkey(self, tracker_id: int) -> str:
-               with self.conn.cursor() as cursor:
-                       cursor.execute("select passkey from gazelle.passkey where gazelle_tracker_id = %s;", (tracker_id,))
-                       [(passkey,)] = cursor.fetchall()
-
-               return passkey
-
-       def get_torrents(self, tracker_id: int, batch_size: Optional[int] = None) -> Iterable[bytes]:
-               """Iterate the info hashes for the specified tracker which haven't been marked deleted."""
-
-               stmt = """
-                       select infohash from gazelle.torrent
-                       where gazelle_tracker_id = %s and not is_deleted
-                       order by infohash asc
-                       ;
-               """
-               with self.conn.cursor() as cursor:
-                       if batch_size is not None:
-                               cursor.itersize = batch_size
-
-                       cursor.execute(stmt, (tracker_id,))
-                       for row in cursor:
-                               (info_hash_mem,) = row
-                               info_hash = bytes(info_hash_mem)
-                               assert len(info_hash) == 20
-                               yield info_hash
-
-       def insert_swarm_info(self, tracker_id: int, infos: Iterable["disk_jumble.scrape.ScrapeInfo"]) -> None:
-               stmt = """
-                       insert into gazelle.tracker_stat (gazelle_tracker_id, infohash, ts, complete, incomplete, downloaded)
-                       values (%s, %s, %s, %s, %s, %s)
-                       ;
-               """
-               with self.conn.cursor() as cursor:
-                       param_sets = [
-                               (tracker_id, i.info_hash, i.timestamp, i.complete, i.incomplete, i.downloaded)
-                               for i in infos
-                       ]
-                       execute_batch(cursor, stmt, param_sets)
-
-       def get_disk(self, id_: int) -> Disk:
-               with self.conn.cursor() as cursor:
-                       cursor.execute("select sector_size from diskjumble.disk where disk_id = %s;", (id_,))
-                       [(sector_size,)] = cursor.fetchall()
-
-               return Disk(sector_size)
-
-       def get_slabs_and_hashers(self, disk_id: int) -> Iterable[tuple[Slab, Optional[HasherRef]]]:
-               """
-               Find slabs on the specified disk, and for each also return any lowest-offset saved hasher that left off directly
-               before or within the slab's entity data.
-               """
-
-               stmt = """
-                       with
-                               incomplete_edge as (
-                                       -- join up incomplete piece info and precompute where the hasher left off within the entity
-                                       select
-                                               verify_id, seq, slab.entity_id, hasher_state,
-                                               entity_offset + (upper(c.disk_sectors) - lower(slab.disk_sectors)) * sector_size as end_off
-                                       from
-                                               diskjumble.verify_piece_incomplete
-                                               natural left join diskjumble.verify_piece p
-                                               natural join diskjumble.verify_piece_content c
-                                               natural left join diskjumble.disk
-                                               left join diskjumble.slab on (
-                                                       c.disk_id = slab.disk_id
-                                                       and upper(c.disk_sectors) <@ int8range(lower(slab.disk_sectors), upper(slab.disk_sectors), '[]')
-                                               )
-                                       where seq >= all (select seq from diskjumble.verify_piece_content where verify_id = p.verify_id)
-                               )
-                       select
-                               slab_id, disk_id, disk_sectors, slab.entity_id, entity_offset, crypt_key, verify_id, seq, end_off,
-                               hasher_state
-                       from
-                               diskjumble.slab
-                               natural left join diskjumble.disk
-                               left join incomplete_edge on
-                                       incomplete_edge.entity_id = slab.entity_id
-                                       and incomplete_edge.end_off <@ int8range(
-                                               slab.entity_offset,
-                                               slab.entity_offset + (upper(disk_sectors) - lower(disk_sectors)) * sector_size
-                                       )
-                                       and (incomplete_edge.end_off - slab.entity_offset) %% sector_size = 0
-                       where disk_id = %s
-                       order by slab.entity_id, entity_offset, slab_id
-                       ;
-               """
-               with self.conn.cursor() as cursor:
-                       cursor.execute(stmt, (disk_id,))
-                       for (_, rows_iter) in itertools.groupby(cursor, lambda r: r[0]):
-                               rows = list(rows_iter)
-                               [(slab_id, disk_id, sectors_pg, entity_id, entity_off, key)] = {r[:6] for r in rows}
-                               sectors = range(sectors_pg.lower, sectors_pg.upper)
-                               slab = Slab(slab_id, disk_id, sectors, bytes(entity_id), entity_off, key)
-
-                               # `None' if no hasher match in outer join, otherwise earliest match
-                               (*_, id_, seq, end_off, state) = min(rows, key = lambda r: r[-2])
-                               hasher_ref = None if id_ is None else HasherRef(id_, seq, end_off, state)
-
-                               yield (slab, hasher_ref)
-
-       def get_torrent_info(self, disk_id: int) -> Iterable[tuple[bytes, bytes]]:
-               stmt = """
-                       with hashed as (
-                               select digest(info, 'sha1') as info_hash, info
-                               from bittorrent.torrent_info
-                       )
-                       select
-                               distinct on (info_hash)
-                               info_hash, info
-                               from diskjumble.slab left outer join hashed on entity_id = info_hash
-                               where disk_id = %s
-                       ;
-               """
-               with self.conn.cursor() as cursor:
-                       cursor.execute(stmt, (disk_id,))
-                       for (info_hash, info) in cursor:
-                               yield (bytes(info_hash), bytes(info))
-
-       def insert_verify_piece(self, ts: dt.datetime, entity_id: bytes, piece_num: int) -> int:
-               """Insert new verify piece, returning the ID of the inserted row."""
-
-               with self.conn.cursor() as cursor:
-                       stmt = "insert into diskjumble.verify_piece values (default, %s, %s, %s) returning verify_id;"
-                       cursor.execute(stmt, (ts, entity_id, piece_num))
-                       [(row_id,)] = cursor.fetchall()
-               return row_id
-
-       def insert_verify_piece_content(self, verify_id: int, seq_start: int, disk_id: int, ranges: Iterable[range]) -> None:
-               with self.conn.cursor() as cursor:
-                       execute_batch(
-                               cursor,
-                               "insert into diskjumble.verify_piece_content values (%s, %s, %s, %s);",
-                               [
-                                       (verify_id, seq, disk_id, NumericRange(r.start, r.stop))
-                                       for (seq, r) in enumerate(ranges, start = seq_start)
-                               ]
-                       )
-
-       def mark_verify_piece_failed(self, verify_id: int) -> None:
-               with self.conn.cursor() as cursor:
-                       cursor.execute("insert into diskjumble.verify_piece_fail values (%s);", (verify_id,))
-
-       def upsert_hasher_state(self, verify_id: int, state: dict) -> None:
-               stmt = """
-                       insert into diskjumble.verify_piece_incomplete values (%s, %s)
-                       on conflict (verify_id) do update set hasher_state = excluded.hasher_state
-                       ;
-               """
-               with self.conn.cursor() as cursor:
-                       cursor.execute(stmt, (verify_id, Json(state)))
-
-       def delete_verify_piece(self, verify_id: int) -> None:
-               with self.conn.cursor() as cursor:
-                       cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,))
-                       cursor.execute("delete from diskjumble.verify_piece_content where verify_id = %s;", (verify_id,))
-                       cursor.execute("delete from diskjumble.verify_piece where verify_id = %s", (verify_id,))
-
-       def move_piece_content_for_pass(self, verify_id: int) -> None:
-               stmt = """
-                       with content_out as (
-                               delete from diskjumble.verify_piece_content c
-                               using diskjumble.verify_piece p
-                               where (
-                                       c.verify_id = p.verify_id
-                                       and p.verify_id = %s
-                               )
-                               returning at, disk_id, disk_sectors
-                       )
-                       insert into diskjumble.verify_pass (at, disk_id, disk_sectors)
-                       select at, disk_id, disk_sectors from content_out
-                       ;
-               """
-               with self.conn.cursor() as cursor:
-                       cursor.execute(stmt, (verify_id,))
-
-       def insert_pass_data(self, ts: dt.datetime, disk_id: int, sectors: range) -> None:
-               with self.conn.cursor() as cursor:
-                       cursor.execute(
-                               "insert into diskjumble.verify_pass values (default, %s, %s, %s);",
-                               (ts, disk_id, NumericRange(sectors.start, sectors.stop))
-                       )
-
-       def clear_incomplete(self, verify_id: int) -> None:
-               with self.conn.cursor() as cursor:
-                       cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,))
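The Wrapper above is the package's only database access layer; the verify code drives it roughly as in this sketch (connection parameters come from libpq environment variables, and the disk id is a placeholder):

    import contextlib

    import psycopg2

    from disk_jumble.db import Wrapper

    with contextlib.closing(psycopg2.connect("")) as conn:
        conn.autocommit = True
        db = Wrapper(conn)
        disk = db.get_disk(1)  # placeholder disk_id
        for (slab, hasher_ref) in db.get_slabs_and_hashers(1):
            # hasher_ref is None unless a saved SHA-1 state can resume within this slab
            print(slab.sectors, hasher_ref)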
diff --git a/src/disk_jumble/nettle.py b/src/disk_jumble/nettle.py
deleted file mode 100644 (file)
index dfabbca..0000000
+++ /dev/null
@@ -1,66 +0,0 @@
-"""Python wrappers for some of GnuTLS Nettle."""
-
-from ctypes.util import find_library
-from typing import Optional
-import ctypes
-
-
-_LIB = ctypes.CDLL(find_library("nettle"))
-
-
-class _Sha1Defs:
-       _DIGEST_SIZE = 20  # in bytes
-       _BLOCK_SIZE = 64  # in bytes
-       _DIGEST_LENGTH = 5
-
-       _StateArr = ctypes.c_uint32 * _DIGEST_LENGTH
-
-       _BlockArr = ctypes.c_uint8 * _BLOCK_SIZE
-
-
-class Sha1Hasher(_Sha1Defs):
-       class Context(ctypes.Structure):
-               _fields_ = [
-                       ("state", _Sha1Defs._StateArr),
-                       ("count", ctypes.c_uint64),
-                       ("index", ctypes.c_uint),
-                       ("block", _Sha1Defs._BlockArr),
-               ]
-
-               @classmethod
-               def deserialize(cls, data):
-                       return cls(
-                               _Sha1Defs._StateArr(*data["state"]),
-                               data["count"],
-                               data["index"],
-                               _Sha1Defs._BlockArr(*data["block"]),
-                       )
-
-               def serialize(self):
-                       return {
-                               "state": list(self.state),
-                               "count": self.count,
-                               "index": self.index,
-                               "block": list(self.block),
-                       }
-
-       @classmethod
-       def _new_context(cls):
-               ctx = cls.Context()
-               _LIB.nettle_sha1_init(ctypes.byref(ctx))
-               return ctx
-
-       def __init__(self, ctx_dict: Optional[dict]):
-               if ctx_dict:
-                       self.ctx = self.Context.deserialize(ctx_dict)
-               else:
-                       self.ctx = self._new_context()
-
-       def update(self, data):
-               _LIB.nettle_sha1_update(ctypes.byref(self.ctx), len(data), data)
-
-       def digest(self):
-               """Return the current digest and reset the hasher state."""
-               out = (ctypes.c_uint8 * self._DIGEST_SIZE)()
-               _LIB.nettle_sha1_digest(ctypes.byref(self.ctx), self._DIGEST_SIZE, out)
-               return bytes(out)
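The serialized context dict produced by Sha1Hasher is what do_verify stores in verify_piece_incomplete.hasher_state and later feeds back in to resume a piece. A sketch, assuming libnettle is installed (the module loads it with ctypes):

    import hashlib

    from disk_jumble.nettle import Sha1Hasher

    h = Sha1Hasher(None)
    h.update(b"first half of a piece ")
    saved = h.ctx.serialize()  # plain dict of ints, safe to store as JSON

    resumed = Sha1Hasher(saved)
    resumed.update(b"and the rest")
    assert resumed.digest() == hashlib.sha1(b"first half of a piece and the rest").digest()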
diff --git a/src/disk_jumble/scrape.py b/src/disk_jumble/scrape.py
deleted file mode 100644 (file)
index 4143b8a..0000000
+++ /dev/null
@@ -1,157 +0,0 @@
-from dataclasses import dataclass
-from typing import Iterable, Union
-from urllib.error import URLError
-from urllib.parse import urlencode, urlparse, urlunparse
-from urllib.request import HTTPHandler, HTTPRedirectHandler, HTTPSHandler, OpenerDirector, UnknownHandler
-import argparse
-import contextlib
-import datetime as dt
-import itertools
-import re
-import sys
-import time
-
-import psycopg2
-
-from disk_jumble import bencode
-from disk_jumble.db import Wrapper as DbWrapper
-from disk_jumble.trackers import Tracker, TRACKERS
-
-
-class Result:
-       pass
-
-
-@dataclass
-class OkResult(Result):
-       data: bencode.Type
-
-
-@dataclass
-class ErrorResult(Result):
-       status: int
-       body: Union[bencode.Type, bytes]
-
-
-@dataclass
-class ScrapeInfo:
-       info_hash: bytes
-       timestamp: dt.datetime
-       complete: int
-       incomplete: int
-       downloaded: int
-
-
-def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) -> Result:
-       qs = urlencode({"info_hash": list(info_hashes)}, doseq = True)
-       url_parts = urlparse(tracker.scrape_url_format.format(passkey = passkey))
-       assert not url_parts.query
-       url = urlunparse(url_parts._replace(query = qs))
-
-       # we handle HTTP errors ourself
-       opener = OpenerDirector()
-       for handler in [UnknownHandler(), HTTPHandler(), HTTPSHandler(), HTTPRedirectHandler()]:
-               opener.add_handler(handler)
-
-       with opener.open(url) as resp:
-               body = resp.read()
-
-       try:
-               data = bencode.decode(body)
-       except bencode.DecodeError:
-               data = body
-       else:
-               if resp.getcode() == 200 and b"files" in data:
-                       return OkResult(data)
-
-       return ErrorResult(resp.getcode(), data)
-
-
-if __name__ == "__main__":
-       PSQL_PARAMS = ["dbname", "user", "password", "host", "port"]
-
-       def tracker_name(val):
-               matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()]
-               if matches:
-                       [name] = matches
-                       return name
-               else:
-                       raise ValueError()
-
-       def batch_size(val):
-               out = int(val)
-               if out > 0:
-                       return out
-               else:
-                       raise ValueError()
-
-       def delay(val):
-               out = int(val)
-               if out >= 0:
-                       return out
-               else:
-                       raise ValueError()
-
-       parser = argparse.ArgumentParser()
-       parser.add_argument(
-               "tracker",
-               choices = sorted(TRACKERS.keys()),
-               type = tracker_name,
-               help = "name of tracker to scrape",
-       )
-       parser.add_argument(
-               "batch_size",
-               type = batch_size,
-               help = "number of torrents per batch",
-       )
-       parser.add_argument(
-               "delay",
-               type = delay,
-               help = "delay between batches, in seconds",
-       )
-
-       for arg_name in PSQL_PARAMS:
-               parser.add_argument("--" + arg_name, nargs = "?")
-
-       args = parser.parse_args()
-       tracker = TRACKERS[args.tracker]
-       delay = dt.timedelta(seconds = args.delay)
-
-       params = {n: getattr(args, n) for n in PSQL_PARAMS if getattr(args, n)}
-       with contextlib.closing(psycopg2.connect(**params)) as conn:
-               conn.autocommit = True
-               db_wrapper = DbWrapper(conn)
-               passkey = db_wrapper.get_passkey(tracker.gazelle_id)
-               info_hashes = iter(db_wrapper.get_torrents(tracker.gazelle_id, args.batch_size))
-               batches = iter(lambda: list(itertools.islice(info_hashes, args.batch_size)), [])
-               for (i, batch) in enumerate(batches):
-                       if i != 0:
-                               time.sleep(delay.total_seconds())
-                       timestamp = dt.datetime.now(dt.timezone.utc)
-                       try:
-                               result = scrape_batch(tracker, batch, passkey)
-                       except URLError as e:
-                               print("[ERR] couldn't connect: {}".format(e), file = sys.stderr, flush = True)
-                       else:
-                               if isinstance(result, OkResult):
-                                       assert set(result.data[b"files"].keys()) <= set(batch), "unexpected torrent in respose"
-                                       infos = [
-                                               ScrapeInfo(
-                                                       info_hash,
-                                                       timestamp,
-                                                       complete = meta_dict[b"complete"],
-                                                       incomplete = meta_dict[b"incomplete"],
-                                                       downloaded = meta_dict[b"downloaded"],
-                                               )
-                                               for (info_hash, meta_dict) in result.data[b"files"].items()
-                                       ]
-                                       db_wrapper.insert_swarm_info(tracker.gazelle_id, infos)
-                                       print("[OK] finished batch ({} of {} torrents)".format(len(infos), len(batch)), file = sys.stderr, flush = True)
-                               elif isinstance(result, ErrorResult):
-                                       full_disp = result.body.decode("ascii", "ignore") if isinstance(result.body, bytes) else str(result.body)
-                                       clean_disp = re.sub(r"\s", " ", full_disp)
-                                       display_size = 100
-                                       disp = clean_disp if len(clean_disp) <= display_size else clean_disp[:display_size] + "…"
-                                       print("[ERR] tracker responded {}: {}".format(result.status, disp), file = sys.stderr, flush = True)
-                               else:
-                                       raise Exception()
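One idiom in the scrape loop above deserves a note: the two-argument iter() call keeps slicing the info hash iterator into fixed-size batches until an empty list signals exhaustion. A standalone sketch with stand-in data:

    import itertools

    info_hashes = iter(range(7))  # stand-in for the database iterator
    batches = iter(lambda: list(itertools.islice(info_hashes, 3)), [])
    assert list(batches) == [[0, 1, 2], [3, 4, 5], [6]]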
diff --git a/src/disk_jumble/tests/__init__.py b/src/disk_jumble/tests/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/src/disk_jumble/tests/test_verify.py b/src/disk_jumble/tests/test_verify.py
deleted file mode 100644 (file)
index eea5007..0000000
+++ /dev/null
@@ -1,451 +0,0 @@
-"""
-Tests for the verification program `disk_jumble.verify'
-
-Like the verification program itself, these tests take database connection information from the environment.  The
-necessary schemas and tables are set up from scratch by the test code, so environment variables should point to a
-database that's not hosting a live instance of Disk Jumble.  Ideally, this is a completely empty local database created
-for the purposes of testing, but other options are possible.
-
-The tests need access to an SQL source file containing the definitions for the required tables and other Postgres
-objects; see `test_util/dump_db.py'.
-"""
-
-from dataclasses import dataclass
-from importlib import resources
-from random import Random
-from typing import Optional
-import hashlib
-import io
-import tempfile
-import unittest
-import uuid
-
-from psycopg2.extras import NumericRange
-import psycopg2
-import psycopg2.extras
-
-from disk_jumble import bencode
-from disk_jumble.nettle import Sha1Hasher
-from disk_jumble.verify import do_verify
-
-
-_BUF_SIZE = 16 * 1024 ** 2  # in bytes
-
-
-class Tests(unittest.TestCase):
-       _SCHEMAS = {"public", "diskjumble", "bittorrent"}
-
-       def _basic_fresh_verify_helper(self, read_size):
-               sector_size = 32
-               piece_size = 64
-
-               torrent_len = 3 * piece_size
-               disk = self._write_disk(sector_size, torrent_len // sector_size)
-               with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
-                       self._write_torrent(torrent)
-                       with self._conn.cursor() as cursor:
-                               cursor.execute(
-                                       "insert into diskjumble.slab values (default, %s, %s, %s, 0, null);",
-                                       (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash)
-                               )
-
-                               do_verify(self._conn, disk.id, torrent_file, read_size, read_tries = 1)
-
-                               cursor.execute("select * from diskjumble.verify_pass;")
-                               self.assertEqual(cursor.rowcount, 1)
-                               (_, _, disk_id, sectors) = cursor.fetchone()
-                               self.assertEqual(disk_id, disk.id)
-                               self.assertEqual(sectors, NumericRange(0, torrent_len // sector_size))
-
-       def test_basic_fresh_verify_small_read_size(self):
-               self._basic_fresh_verify_helper(16)
-
-       def test_basic_fresh_verify_large_read_size(self):
-               self._basic_fresh_verify_helper(128)
-
-       def test_resume_fragmentation_unaligned_end(self):
-               """
-               Test a run where a cached hash state is used, a piece is split on disk, and the end of the torrent isn't
-               sector-aligned.
-               """
-               read_size = 8
-               piece_size = 64
-
-               other_disk = self._write_disk(16, 1)
-               disk = self._write_disk(32, 3)
-               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
-                       self._write_torrent(torrent)
-                       with self._conn.cursor() as cursor:
-                               cursor.executemany(
-                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
-                                       [
-                                               (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0),
-                                               (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size),
-                                               (disk.id, NumericRange(2, 3), torrent.info_hash, other_disk.sector_size + disk.sector_size),
-                                       ]
-                               )
-
-                               # Prepare the saved hasher state by running a verify
-                               do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1)
-                               torrent_file.seek(0)
-
-                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 1)
-
-                               disk_file = io.BytesIO()
-                               torrent_file.seek(other_disk.sector_size)
-                               disk_file.write(torrent_file.read(disk.sector_size))
-                               disk_file.seek(disk_file.tell() + disk.sector_size)
-                               disk_file.write(torrent_file.read())
-                               disk_file.seek(0)
-                               do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1)
-
-                               # Check that there are no verify pieces in the database.  Because of integrity constraints, this also
-                               # guarantees there aren't any stray saved hasher states, failed verifies, or piece contents.
-                               cursor.execute("select count(*) from diskjumble.verify_piece;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 0)
-
-                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
-                               self.assertEqual(
-                                       cursor.fetchall(),
-                                       [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1)), (disk.id, NumericRange(2, 3))]
-                               )
-
-       def test_resume_no_completion(self):
-               """
-               Test a run where a saved hasher state is used and the target disk has subsequent entity data but not the full
-               remainder of the piece.
-               """
-               read_size = 7
-               piece_size = 64
-
-               other_disk = self._write_disk(16, 1)
-               disk = self._write_disk(32, 1)
-               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
-                       self._write_torrent(torrent)
-                       with self._conn.cursor() as cursor:
-                               cursor.executemany(
-                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
-                                       [
-                                               (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0),
-                                               (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size),
-                                       ]
-                               )
-
-                               do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1)
-
-                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 1)
-
-                               disk_file = io.BytesIO()
-                               torrent_file.seek(other_disk.sector_size)
-                               disk_file.write(torrent_file.read(disk.sector_size))
-                               disk_file.seek(0)
-                               do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1)
-
-                               cursor.execute("select count(*) from diskjumble.verify_pass;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 0)
-
-                               cursor.execute("select entity_id, piece from diskjumble.verify_piece;")
-                               [(entity_id, piece_num)] = cursor.fetchall()
-                               self.assertEqual(bytes(entity_id), torrent.info_hash)
-                               self.assertEqual(piece_num, 0)
-
-                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;")
-                               self.assertCountEqual(
-                                       cursor.fetchall(),
-                                       [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1))]
-                               )
-
-                               cursor.execute("select count(*) from diskjumble.verify_piece_fail;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 0)
-
-                               hasher = Sha1Hasher(None)
-                               torrent_file.seek(0)
-                               hasher.update(torrent_file.read(other_disk.sector_size + disk.sector_size))
-                               cursor.execute("select hasher_state from diskjumble.verify_piece_incomplete;")
-                               self.assertEqual(cursor.fetchall(), [(hasher.ctx.serialize(),)])
-
-       def test_ignore_hasher_beginning_on_disk(self):
-               """
-               Test a run where a saved hasher state is available for use but isn't used due to the beginning of the piece
-               being on disk.
-               """
-               piece_size = 64
-
-               other_disk = self._write_disk(16, 1)
-               disk = self._write_disk(16, 4)
-               with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
-                       self._write_torrent(torrent)
-                       with self._conn.cursor() as cursor:
-                               cursor.executemany(
-                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
-                                       [
-                                               (other_disk.id, NumericRange(0, other_disk.sector_count), torrent.info_hash, piece_size),
-                                               (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash, piece_size),
-                                       ]
-                               )
-
-                               do_verify(self._conn, other_disk.id, torrent_file, read_size = 128, read_tries = 1)
-
-                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 1)
-
-                               torrent_file.seek(piece_size)
-                               disk_file = io.BytesIO(torrent_file.read())
-                               do_verify(self._conn, disk.id, disk_file, read_size = 128, read_tries = 1)
-
-                               cursor.execute(
-                                       "select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;"
-                               )
-                               self.assertEqual(cursor.fetchall(), [(other_disk.id,)])
-
-                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
-                               self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, disk.sector_count))])
-
-       def test_ignore_hasher_unaligned(self):
-               """
-               Test a run where a saved hasher isn't used because its entity data offset isn't sector-aligned on the target
-               disk.
-
-                           0   16  32  48  64  80  96  112 128
-               pieces:     [-------------- 0 -------------]
-               other disk: [--][--][--][--][--]
-               disk:                       [------][------]
-               """
-               piece_size = 128
-
-               other_disk = self._write_disk(16, 5)
-               disk = self._write_disk(32, 2)
-               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
-                       self._write_torrent(torrent)
-                       with self._conn.cursor() as cursor:
-                               cursor.executemany(
-                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
-                                       [
-                                               (other_disk.id, NumericRange(0, 5), torrent.info_hash, 0),
-                                               (disk.id, NumericRange(0, 2), torrent.info_hash, 64),
-                                       ]
-                               )
-
-                               do_verify(self._conn, other_disk.id, torrent_file, read_size = 16, read_tries = 1)
-                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 1)
-
-                               disk_file = io.BytesIO(torrent_file.getvalue()[64:])
-                               do_verify(self._conn, disk.id, disk_file, read_size = 16, read_tries = 1)
-
-                               cursor.execute("""
-                                       select disk_id, disk_sectors
-                                       from diskjumble.verify_piece_incomplete natural join diskjumble.verify_piece_content;
-                               """)
-                               self.assertEqual(
-                                       cursor.fetchall(),
-                                       [(other_disk.id, NumericRange(0, 5))]
-                               )
-
-                               cursor.execute("select count(*) from diskjumble.verify_pass;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 0)
-
-                               cursor.execute("select count(*) from diskjumble.verify_piece_fail;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 0)
-
-       def test_transient_read_errors(self):
-               """
-               Test a run where a read to the disk fails but fewer times than needed to mark the sector bad.
-               """
-               piece_size = 32
-
-               disk = self._write_disk(32, 1)
-               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
-                       self._write_torrent(torrent)
-                       with self._conn.cursor() as cursor:
-                               cursor.execute(
-                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
-                                       (disk.id, NumericRange(0, 1), torrent.info_hash, 0)
-                               )
-
-                               disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2)
-                               do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3)
-
-                               self.assertEqual(disk_file.triggered, 2)
-
-                               cursor.execute("select count(*) from diskjumble.verify_piece;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 0)
-
-                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
-                               self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, 1))])
-
-       def test_persistent_read_errors(self):
-               """
-               Test a run where a disk read fails enough times to trigger the bad sector logic.
-               """
-               piece_size = 64
-
-               other_a = self._write_disk(16, 1)
-               other_b = self._write_disk(16, 2)
-               disk = self._write_disk(16, 1)
-               with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
-                       torrent = _Torrent(torrent_file, piece_size)
-                       self._write_torrent(torrent)
-                       with self._conn.cursor() as cursor:
-                               cursor.executemany(
-                                       "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
-                                       [
-                                               (other_a.id, NumericRange(0, 1), torrent.info_hash, 0),
-                                               (other_b.id, NumericRange(0, 2), torrent.info_hash, 16),
-                                               (disk.id, NumericRange(0, 1), torrent.info_hash, 48),
-                                       ]
-                               )
-
-                               do_verify(self._conn, other_a.id, torrent_file, read_size = 16, read_tries = 1)
-                               other_b_file = io.BytesIO(torrent_file.getvalue()[16:48])
-                               do_verify(self._conn, other_b.id, other_b_file, read_size = 16, read_tries = 1)
-
-                               cursor.execute("select verify_id from diskjumble.verify_piece;")
-                               [(verify_id,)] = cursor.fetchall()
-
-                               data = torrent_file.getvalue()[48:]
-                               disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None)
-                               do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3)
-
-                               cursor.execute("select count(*) from diskjumble.verify_pass;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 0)
-
-                               cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;")
-                               self.assertCountEqual(
-                                       cursor.fetchall(),
-                                       [(other_a.id, NumericRange(0, 1)), (other_b.id, NumericRange(0, 2)), (disk.id, NumericRange(0, 1))]
-                               )
-
-                               cursor.execute("select verify_id from diskjumble.verify_piece_fail;")
-                               self.assertEqual(cursor.fetchall(), [(verify_id,)])
-
-                               cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
-                               [(row_count,)] = cursor.fetchall()
-                               self.assertEqual(row_count, 0)
-
-       def _write_torrent(self, torrent: "_Torrent") -> None:
-               with self._conn.cursor() as cursor:
-                       cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,))
-
-       def _write_disk(self, sector_size: int, sector_count: int) -> "_Disk":
-               with self._conn.cursor() as cursor:
-                       cursor.execute(
-                               "insert into diskjumble.disk values (default, %s, null, %s, %s, false) returning disk_id;",
-                               (uuid.uuid4(), sector_size, sector_count)
-                       )
-                       [(id_,)] = cursor.fetchall()
-               return _Disk(id_, sector_size, sector_count)
-
-       @classmethod
-       def setUpClass(cls):
-               psycopg2.extras.register_uuid()
-               prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text()
-               schema_sql = "\n".join(
-                       l for l in prod_schema_sql.splitlines()
-                       if (
-                               not l.startswith("SET")
-                               and not l.startswith("SELECT")  # ignore server settings
-                               and "OWNER TO" not in l  # and ownership changes
-                       )
-               )
-               cls._conn = psycopg2.connect("")
-               with cls._conn, cls._conn.cursor() as cursor:
-                       for name in cls._SCHEMAS:
-                               cursor.execute(f"create schema {name};")
-                       cursor.execute(schema_sql)
-
-       @classmethod
-	def tearDownClass(cls):
-		try:
-			with cls._conn, cls._conn.cursor() as cursor:
-				for name in cls._SCHEMAS:
-					cursor.execute(f"drop schema {name} cascade;")
-		finally:
-			cls._conn.close()
-
-       def tearDown(self):
-               self._conn.rollback()
-
-
-@dataclass
-class _Disk:
-       id: int
-       sector_size: int
-       sector_count: int
-
-
-class _Torrent:
-       def __init__(self, data: io.BufferedIOBase, piece_size: int) -> None:
-               data.seek(0)
-               hashes = []
-               while True:
-                       piece = data.read(piece_size)
-                       if piece:
-                               hashes.append(hashlib.sha1(piece).digest())
-                       else:
-                               break
-
-               info_dict = {
-                       b"piece length": piece_size,
-                       b"length": data.tell(),
-                       b"pieces": b"".join(hashes),
-               }
-               self.data = data
-               self.info = bencode.encode(info_dict)
-               self.info_hash = hashlib.sha1(self.info).digest()
-
-
-def _random_file(size: int, rand_src: Random, on_disk: bool) -> tempfile.NamedTemporaryFile:
-       f = tempfile.NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else io.BytesIO()
-       try:
-               while f.tell() < size:
-                       write_size = min(size - f.tell(), _BUF_SIZE)
-                       f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size)))
-               f.seek(0)
-               return f
-       except Exception:
-               f.close()
-               raise
-
-
-@dataclass
-class _ReadErrorProxy(io.BufferedIOBase):
-       wrapped: io.BufferedIOBase
-       error_pos: int
-       error_count: Optional[int]
-
-       def __post_init__(self):
-               self.triggered = 0
-
-       def read(self, size: int = -1) -> bytes:
-               pre_pos = self.wrapped.tell()
-               data = self.wrapped.read(size)
-               erroring = self.error_count is None or self.triggered < self.error_count
-               in_range = 0 <= pre_pos - self.error_pos < len(data)
-               if erroring and in_range:
-                       self.triggered += 1
-                       raise OSError("simulated")
-               else:
-                       return data
-
-       def seek(self, *args, **kwargs) -> int:
-               return self.wrapped.seek(*args, **kwargs)
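
The _ReadErrorProxy helper above is what gives the read-retry tests their simulated failures: it raises OSError for reads that land on the configured error position until error_count errors have been delivered (or indefinitely when error_count is None). A minimal standalone sketch of that behaviour, assuming the class is imported from the test module and using illustrative offsets:

	import io

	# Wrap an in-memory file; the first two reads starting at offset 12 raise OSError.
	proxy = _ReadErrorProxy(io.BytesIO(bytes(range(32))), error_pos = 12, error_count = 2)
	for attempt in range(3):
		proxy.seek(12)
		try:
			data = proxy.read(4)
		except OSError:
			continue  # simulated failure; do_verify seeks back and retries the same way
		break
	assert proxy.triggered == 2
	assert data == bytes(range(12, 16))
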
diff --git a/src/disk_jumble/tests/verify_setup.sql b/src/disk_jumble/tests/verify_setup.sql
deleted file mode 100644 (file)
index aa0c5fb..0000000
+++ /dev/null
@@ -1,383 +0,0 @@
-create extension btree_gist;
-CREATE OR REPLACE FUNCTION public.digest(bytea, text)
- RETURNS bytea
- LANGUAGE c
- IMMUTABLE PARALLEL SAFE STRICT
-AS '$libdir/pgcrypto', $function$pg_digest$function$
-;
---
--- PostgreSQL database dump
---
-
--- Dumped from database version 13.2 (Debian 13.2-1.pgdg100+1)
--- Dumped by pg_dump version 13.4 (Debian 13.4-0+deb11u1)
-
-SET statement_timeout = 0;
-SET lock_timeout = 0;
-SET idle_in_transaction_session_timeout = 0;
-SET client_encoding = 'UTF8';
-SET standard_conforming_strings = on;
-SELECT pg_catalog.set_config('search_path', '', false);
-SET check_function_bodies = false;
-SET xmloption = content;
-SET client_min_messages = warning;
-SET row_security = off;
-
-SET default_tablespace = '';
-
-SET default_table_access_method = heap;
-
---
--- Name: torrent_info; Type: TABLE; Schema: bittorrent; Owner: eros
---
-
-CREATE TABLE bittorrent.torrent_info (
-    info bytea NOT NULL
-);
-
-
-ALTER TABLE bittorrent.torrent_info OWNER TO eros;
-
---
--- Name: disk; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.disk (
-    disk_id integer NOT NULL,
-    dev_uuid uuid NOT NULL,
-    dev_serial text,
-    sector_size integer NOT NULL,
-    num_sectors bigint NOT NULL,
-    failed boolean DEFAULT false NOT NULL
-);
-
-
-ALTER TABLE diskjumble.disk OWNER TO eros;
-
---
--- Name: disk_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
---
-
-CREATE SEQUENCE diskjumble.disk_id_seq
-    AS integer
-    START WITH 1
-    INCREMENT BY 1
-    NO MINVALUE
-    NO MAXVALUE
-    CACHE 1;
-
-
-ALTER TABLE diskjumble.disk_id_seq OWNER TO eros;
-
---
--- Name: disk_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
---
-
-ALTER SEQUENCE diskjumble.disk_id_seq OWNED BY diskjumble.disk.disk_id;
-
-
---
--- Name: slab; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.slab (
-    slab_id integer NOT NULL,
-    disk_id integer NOT NULL,
-    disk_sectors int8range NOT NULL,
-    entity_id bytea NOT NULL,
-    entity_offset bigint NOT NULL,
-    crypt_key bytea
-);
-
-
-ALTER TABLE diskjumble.slab OWNER TO eros;
-
---
--- Name: slab_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
---
-
-CREATE SEQUENCE diskjumble.slab_id_seq
-    START WITH 1
-    INCREMENT BY 1
-    NO MINVALUE
-    NO MAXVALUE
-    CACHE 1;
-
-
-ALTER TABLE diskjumble.slab_id_seq OWNER TO eros;
-
---
--- Name: slab_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
---
-
-ALTER SEQUENCE diskjumble.slab_id_seq OWNED BY diskjumble.slab.slab_id;
-
-
---
--- Name: verify_pass; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_pass (
-    verify_pass_id integer NOT NULL,
-    at timestamp with time zone,
-    disk_id integer NOT NULL,
-    disk_sectors int8range NOT NULL
-);
-
-
-ALTER TABLE diskjumble.verify_pass OWNER TO eros;
-
---
--- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
---
-
-CREATE SEQUENCE diskjumble.verify_pass_verify_pass_id_seq
-    AS integer
-    START WITH 1
-    INCREMENT BY 1
-    NO MINVALUE
-    NO MAXVALUE
-    CACHE 1;
-
-
-ALTER TABLE diskjumble.verify_pass_verify_pass_id_seq OWNER TO eros;
-
---
--- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
---
-
-ALTER SEQUENCE diskjumble.verify_pass_verify_pass_id_seq OWNED BY diskjumble.verify_pass.verify_pass_id;
-
-
---
--- Name: verify_piece; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_piece (
-    verify_id integer NOT NULL,
-    at timestamp with time zone,
-    entity_id bytea NOT NULL,
-    piece integer NOT NULL
-);
-
-
-ALTER TABLE diskjumble.verify_piece OWNER TO eros;
-
---
--- Name: verify_piece_content; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_piece_content (
-    verify_id integer NOT NULL,
-    seq integer NOT NULL,
-    disk_id integer NOT NULL,
-    disk_sectors int8range NOT NULL
-);
-
-
-ALTER TABLE diskjumble.verify_piece_content OWNER TO eros;
-
---
--- Name: verify_piece_fail; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_piece_fail (
-    verify_id integer NOT NULL
-);
-
-
-ALTER TABLE diskjumble.verify_piece_fail OWNER TO eros;
-
---
--- Name: verify_piece_incomplete; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_piece_incomplete (
-    verify_id integer NOT NULL,
-    hasher_state json
-);
-
-
-ALTER TABLE diskjumble.verify_piece_incomplete OWNER TO eros;
-
---
--- Name: verify_piece_verify_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
---
-
-CREATE SEQUENCE diskjumble.verify_piece_verify_id_seq
-    AS integer
-    START WITH 1
-    INCREMENT BY 1
-    NO MINVALUE
-    NO MAXVALUE
-    CACHE 1;
-
-
-ALTER TABLE diskjumble.verify_piece_verify_id_seq OWNER TO eros;
-
---
--- Name: verify_piece_verify_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
---
-
-ALTER SEQUENCE diskjumble.verify_piece_verify_id_seq OWNED BY diskjumble.verify_piece.verify_id;
-
-
---
--- Name: disk disk_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.disk ALTER COLUMN disk_id SET DEFAULT nextval('diskjumble.disk_id_seq'::regclass);
-
-
---
--- Name: slab slab_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.slab ALTER COLUMN slab_id SET DEFAULT nextval('diskjumble.slab_id_seq'::regclass);
-
-
---
--- Name: verify_pass verify_pass_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_pass ALTER COLUMN verify_pass_id SET DEFAULT nextval('diskjumble.verify_pass_verify_pass_id_seq'::regclass);
-
-
---
--- Name: verify_piece verify_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece ALTER COLUMN verify_id SET DEFAULT nextval('diskjumble.verify_piece_verify_id_seq'::regclass);
-
-
---
--- Name: disk disk_dev_uuid_key; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.disk
-    ADD CONSTRAINT disk_dev_uuid_key UNIQUE (dev_uuid);
-
-
---
--- Name: disk disk_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.disk
-    ADD CONSTRAINT disk_pkey PRIMARY KEY (disk_id);
-
-
---
--- Name: slab slab_disk_id_disk_sectors_excl; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.slab
-    ADD CONSTRAINT slab_disk_id_disk_sectors_excl EXCLUDE USING gist (disk_id WITH =, disk_sectors WITH &&);
-
-
---
--- Name: slab slab_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.slab
-    ADD CONSTRAINT slab_pkey PRIMARY KEY (slab_id);
-
-
---
--- Name: verify_pass verify_pass_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_pass
-    ADD CONSTRAINT verify_pass_pkey PRIMARY KEY (verify_pass_id);
-
-
---
--- Name: verify_piece_content verify_piece_content_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_content
-    ADD CONSTRAINT verify_piece_content_pkey PRIMARY KEY (verify_id, seq);
-
-
---
--- Name: verify_piece_fail verify_piece_fail_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_fail
-    ADD CONSTRAINT verify_piece_fail_pkey PRIMARY KEY (verify_id);
-
-
---
--- Name: verify_piece_incomplete verify_piece_incomplete_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_incomplete
-    ADD CONSTRAINT verify_piece_incomplete_pkey PRIMARY KEY (verify_id);
-
-
---
--- Name: verify_piece verify_piece_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece
-    ADD CONSTRAINT verify_piece_pkey PRIMARY KEY (verify_id);
-
-
---
--- Name: torrent_info_digest_idx; Type: INDEX; Schema: bittorrent; Owner: eros
---
-
-CREATE UNIQUE INDEX torrent_info_digest_idx ON bittorrent.torrent_info USING btree (public.digest(info, 'sha1'::text));
-
-
---
--- Name: slab slab_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.slab
-    ADD CONSTRAINT slab_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
-
-
---
--- Name: verify_pass verify_pass_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_pass
-    ADD CONSTRAINT verify_pass_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
-
-
---
--- Name: verify_piece_content verify_piece_content_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_content
-    ADD CONSTRAINT verify_piece_content_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
-
-
---
--- Name: verify_piece_content verify_piece_content_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_content
-    ADD CONSTRAINT verify_piece_content_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
-
-
---
--- Name: verify_piece_fail verify_piece_fail_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_fail
-    ADD CONSTRAINT verify_piece_fail_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
-
-
---
--- Name: verify_piece_incomplete verify_piece_incomplete_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_incomplete
-    ADD CONSTRAINT verify_piece_incomplete_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
-
-
---
--- PostgreSQL database dump complete
---
-
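
To make the relationships in this schema concrete: verify_piece rows describe piece-level hash checks, verify_piece_content records which disk sectors fed each check, and verify_piece_incomplete and verify_piece_fail mark checks that are still pending or known bad. A rough sketch of querying that structure with psycopg2, assuming a database already populated with these tables and connection settings supplied via the usual libpq environment variables:

	import psycopg2

	with psycopg2.connect("") as conn, conn.cursor() as cursor:
		# List pending piece verifications along with the sectors hashed so far.
		cursor.execute("""
			select vp.entity_id, vp.piece, vpc.disk_id, vpc.disk_sectors
			from diskjumble.verify_piece vp
			join diskjumble.verify_piece_incomplete vpi using (verify_id)
			join diskjumble.verify_piece_content vpc using (verify_id)
			order by vp.verify_id, vpc.seq;
		""")
		for (entity_id, piece, disk_id, sectors) in cursor.fetchall():
			print(bytes(entity_id).hex(), piece, disk_id, sectors)
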
diff --git a/src/disk_jumble/trackers.py b/src/disk_jumble/trackers.py
deleted file mode 100644 (file)
index ba65344..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-from typing import NamedTuple
-
-
-class Tracker(NamedTuple):
-       gazelle_id: int
-       scrape_url_format: str
-
-
-TRACKERS = {
-       "PTP": Tracker(1, "http://please.passthepopcorn.me:2710/{passkey}/scrape"),
-       "BTN": Tracker(3, "http://landof.tv/{passkey}/scrape"),
-       "Redacted": Tracker(4, "https://flacsfor.me/{passkey}/scrape"),
-}
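
The scrape_url_format values above are ordinary str.format templates with a single passkey field, so building a scrape URL is one format call. A small sketch; the passkey is a made-up placeholder, not a real credential:

	from disk_jumble.trackers import TRACKERS

	passkey = "0123456789abcdef"  # placeholder value
	url = TRACKERS["Redacted"].scrape_url_format.format(passkey = passkey)
	# https://flacsfor.me/0123456789abcdef/scrape
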
diff --git a/src/disk_jumble/verify.py b/src/disk_jumble/verify.py
deleted file mode 100644 (file)
index 0adbb03..0000000
+++ /dev/null
@@ -1,237 +0,0 @@
-from __future__ import annotations
-from abc import ABCMeta, abstractmethod
-from dataclasses import dataclass
-from typing import Optional
-import argparse
-import contextlib
-import datetime as dt
-import io
-import itertools
-import math
-
-import psycopg2
-
-from disk_jumble import bencode
-from disk_jumble.db import HasherRef, Slab, Wrapper as DbWrapper
-from disk_jumble.nettle import Sha1Hasher
-
-
-_READ_BUFFER_SIZE = 16 * 1024 ** 2  # in bytes
-
-
-@dataclass
-class _SlabChunk:
-	"""A slice of a slab, comprising all or part of a piece to be hashed."""
-       slab: Slab
-       slice: slice
-
-
-@dataclass
-class _PieceTask:
-	"""The chunks needed to hash an entity piece as fully as possible."""
-       entity_id: bytes
-       piece_num: int
-       hasher_ref: Optional[HasherRef]
-       chunks: list[_SlabChunk]
-       complete: bool  # do these chunks complete the piece?
-
-
-class _BadSector(Exception):
-       pass
-
-
-def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, read_tries: int) -> None:
-       db = DbWrapper(conn)
-       disk = db.get_disk(disk_id)
-
-       info_dicts = {
-               info_hash: bencode.decode(info)
-               for (info_hash, info) in db.get_torrent_info(disk_id)
-       }
-
-       tasks = []
-       slabs_and_hashers = db.get_slabs_and_hashers(disk_id)
-       for (entity_id, group) in itertools.groupby(slabs_and_hashers, lambda t: t[0].entity_id):
-               info = info_dicts[entity_id]
-               piece_len = info[b"piece length"]
-               assert piece_len % disk.sector_size == 0
-               if b"length" in info:
-                       torrent_len = info[b"length"]
-               else:
-                       torrent_len = sum(d[b"length"] for d in info[b"files"])
-
-               offset = None
-               use_hasher = None
-               chunks = []
-               for (slab, hasher_ref) in group:
-                       slab_end = min(slab.entity_offset + len(slab.sectors) * disk.sector_size, torrent_len)
-
-                       while offset is None or offset < slab_end:
-                               if offset is not None and slab.entity_offset > offset:
-                                       if chunks:
-                                               tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False))
-                                       offset = None
-                                       use_hasher = None
-                                       chunks = []
-
-                               if offset is None:
-                                       aligned = math.ceil(slab.entity_offset / piece_len) * piece_len
-                                       if hasher_ref and hasher_ref.entity_offset < aligned:
-                                               assert hasher_ref.entity_offset < torrent_len
-                                               use_hasher = hasher_ref
-                                               offset = hasher_ref.entity_offset
-                                       elif aligned < slab_end:
-                                               offset = aligned
-                                       else:
-                                               break  # no usable data in this slab
-
-                               if offset is not None:
-                                       piece_end = min(offset + piece_len - offset % piece_len, torrent_len)
-                                       chunk_end = min(piece_end, slab_end)
-                                       chunks.append(_SlabChunk(slab, slice(offset - slab.entity_offset, chunk_end - slab.entity_offset)))
-                                       if chunk_end == piece_end:
-                                               tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, True))
-                                               use_hasher = None
-                                               chunks = []
-                                       offset = chunk_end
-
-               if chunks:
-                       tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False))
-
-       @dataclass
-       class NewVerifyPiece:
-               entity_id: bytes
-               piece_num: int
-               sector_ranges: list[range]
-               hasher_state: Optional[dict]
-               failed: bool
-
-       @dataclass
-       class VerifyUpdate:
-               seq_start: int
-               new_sector_ranges: list[range]
-               hasher_state: Optional[dict]
-
-       passed_verifies = set()
-       failed_verifies = set()
-       new_pass_ranges = []
-       vp_updates = {}
-       new_vps = []
-
-       run_ts = dt.datetime.now(dt.timezone.utc)
-       for task in tasks:
-               hasher = Sha1Hasher(task.hasher_ref.state if task.hasher_ref else None)
-               sector_ranges = [
-                       range(
-                               chunk.slab.sectors.start + chunk.slice.start // disk.sector_size,
-                               chunk.slab.sectors.start + math.ceil(chunk.slice.stop / disk.sector_size)
-                       )
-                       for chunk in task.chunks
-               ]
-
-               try:
-                       for chunk in task.chunks:
-                               slab_off = chunk.slab.sectors.start * disk.sector_size
-                               disk_file.seek(slab_off + chunk.slice.start)
-                               end_pos = slab_off + chunk.slice.stop
-                               while disk_file.tell() < end_pos:
-                                       pos = disk_file.tell()
-                                       for _ in range(read_tries):
-                                               try:
-                                                       data = disk_file.read(min(end_pos - pos, read_size))
-						except OSError:
-                                                       disk_file.seek(pos)
-                                               else:
-                                                       break
-                                       else:
-                                               raise _BadSector()
-
-                                       assert data
-                                       hasher.update(data)
-               except _BadSector:
-                       if task.hasher_ref:
-                               failed_verifies.add(task.hasher_ref.id)
-                               vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, None)
-                       else:
-                               new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, None, True))
-               else:
-                       hasher_state = hasher.ctx.serialize()
-                       if task.complete:
-                               s = slice(task.piece_num * 20, task.piece_num * 20 + 20)
-                               expected_hash = info_dicts[task.entity_id][b"pieces"][s]
-                               if hasher.digest() == expected_hash:
-                                       write_piece_data = False
-                                       new_pass_ranges.extend(sector_ranges)
-                                       if task.hasher_ref:
-                                               passed_verifies.add(task.hasher_ref.id)
-                               else:
-                                       failed = True
-                                       write_piece_data = True
-                                       if task.hasher_ref:
-                                               failed_verifies.add(task.hasher_ref.id)
-                       else:
-                               failed = False
-                               write_piece_data = True
-
-                       if write_piece_data:
-                               if task.hasher_ref:
-                                       assert task.hasher_ref.id not in vp_updates
-                                       vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, hasher_state)
-                               else:
-                                       new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, hasher_state, failed))
-
-       new_pass_ranges.sort(key = lambda r: r.start)
-       merged_ranges = []
-       for r in new_pass_ranges:
-               if merged_ranges and r.start == merged_ranges[-1].stop:
-                       merged_ranges[-1] = range(merged_ranges[-1].start, r.stop)
-               else:
-                       merged_ranges.append(r)
-
-       for vp in new_vps:
-               verify_id = db.insert_verify_piece(run_ts, vp.entity_id, vp.piece_num)
-               db.insert_verify_piece_content(verify_id, 0, disk_id, vp.sector_ranges)
-               if vp.failed:
-                       db.mark_verify_piece_failed(verify_id)
-               else:
-                       db.upsert_hasher_state(verify_id, vp.hasher_state)
-
-       for (verify_id, update) in vp_updates.items():
-               db.insert_verify_piece_content(verify_id, update.seq_start, disk_id, update.new_sector_ranges)
-               if update.hasher_state:
-                       db.upsert_hasher_state(verify_id, update.hasher_state)
-
-       for verify_id in passed_verifies:
-               db.move_piece_content_for_pass(verify_id)
-               db.delete_verify_piece(verify_id)
-
-       for r in merged_ranges:
-               db.insert_pass_data(run_ts, disk_id, r)
-
-       for verify_id in failed_verifies:
-               db.clear_incomplete(verify_id)
-               db.mark_verify_piece_failed(verify_id)
-
-
-if __name__ == "__main__":
-       def read_tries(raw_arg):
-               val = int(raw_arg)
-               if val > 0:
-                       return val
-               else:
-                       raise ValueError()
-
-       parser = argparse.ArgumentParser()
-       parser.add_argument("disk_id", type = int)
-       parser.add_argument(
-               "read_tries",
-               type = read_tries,
-               help = "number of times to attempt a particular disk read before giving up on the sector",
-       )
-       args = parser.parse_args()
-
-       with contextlib.closing(psycopg2.connect("")) as conn:
-               conn.autocommit = True
-               path = f"/dev/mapper/diskjumble-{args.disk_id}"
-               with open(path, "rb", buffering = _READ_BUFFER_SIZE) as disk_file:
-                       do_verify(conn, args.disk_id, disk_file, _READ_BUFFER_SIZE, args.read_tries)
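
As the __main__ block shows, the script opens the disk's device-mapper node and passes the 16 MiB buffer size as read_size; the tests drive the same entry point with in-memory files instead. A sketch of the programmatic call, with the disk id and read parameters as illustrative values only:

	import contextlib

	import psycopg2

	from disk_jumble.verify import do_verify

	disk_id = 1  # illustrative
	with contextlib.closing(psycopg2.connect("")) as conn:
		conn.autocommit = True
		with open(f"/dev/mapper/diskjumble-{disk_id}", "rb") as disk_file:
			do_verify(conn, disk_id, disk_file, read_size = 16 * 1024 ** 2, read_tries = 4)
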
diff --git a/test_util/dump_db.py b/test_util/dump_db.py
deleted file mode 100644 (file)
index 9551bf9..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-"""
-Using the live database, dump creation code for extensions, tables, and functions needed for local testing
-
-For testing the verification script, write the output of this script to:
-
-       src/disk_jumble/tests/verify_setup.sql
-"""
-
-import contextlib
-import itertools
-import os
-import subprocess
-
-import psycopg2
-
-
-procedures = [
-       "public.digest(bytea, text)",
-]
-
-extensions = [
-       "btree_gist",
-]
-
-with contextlib.closing(psycopg2.connect("")) as conn:
-       conn.autocommit = True
-       with conn.cursor() as cursor:
-               for ext in extensions:
-                       print(f"create extension {ext};", flush = True)
-               for sig in procedures:
-                       cursor.execute("select pg_get_functiondef(to_regprocedure(%s));", (sig,))
-                       [(sql,)] = cursor.fetchall()
-                       print(sql + ";", flush = True)
-
-tables = [
-       "diskjumble.disk",
-       "diskjumble.slab",
-       "diskjumble.verify_pass",
-       "diskjumble.verify_piece",
-       "diskjumble.verify_piece_content",
-       "diskjumble.verify_piece_fail",
-       "diskjumble.verify_piece_incomplete",
-       "bittorrent.torrent_info",
-]
-argv = ["pg_dump", *itertools.chain.from_iterable(["-t", table] for table in tables), "--schema-only", os.environ["PGDATABASE"]]
-subprocess.run(argv, check = True)
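
The script prints the extension, function, and table DDL to stdout, so refreshing the test fixture amounts to running it against the live database and redirecting the output to the path named in its docstring. A sketch of doing that from Python, assuming the libpq environment variables (PGDATABASE and friends) point at the production schema and that the working directory is the repository root; the paths shown are the pre-move ones used by the deleted files in this diff:

	import pathlib
	import subprocess
	import sys

	# Run the dump script and capture everything it (and the pg_dump it spawns) prints.
	result = subprocess.run(
		[sys.executable, "test_util/dump_db.py"],
		check = True, capture_output = True, text = True,
	)
	pathlib.Path("src/disk_jumble/tests/verify_setup.sql").write_text(result.stdout)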