--- /dev/null
+[build-system]
+requires = ["setuptools>=42", "wheel"]
+build-backend = "setuptools.build_meta"
--- /dev/null
+[metadata]
+name = disk_jumble
+version = 0.0.1
+
+[options]
+package_dir =
+ = src
+packages = disk_jumble
+python_requires = ~=3.9
+install_requires =
+ psycopg2-binary ~= 2.8
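+
+# Illustrative development install from the repository root: pip install -e .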
--- /dev/null
+from setuptools import setup
+
+
+setup()
--- /dev/null
+from __future__ import annotations
+from typing import Union
+import itertools
+
+
+Bdict = dict[bytes, 'Type']
+Type = Union[bytes, int, list['Type'], Bdict]
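+
+# Illustrative round-trip with standard bencode:
+#     encode({b"spam": [b"a", 1]}) == b"d4:spaml1:ai1eee"
+#     decode(b"d4:spaml1:ai1eee") == {b"spam": [b"a", 1]}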
+
+
+class CodecError(Exception):
+ pass
+
+
+def _pop_bytes(vals: list[bytes]) -> bytes:
+ len_parts = []
+ while vals and vals[0].isdigit():
+ len_parts.append(vals.pop(0))
+
+ try:
+ length = int(b"".join(len_parts).decode("ascii"))
+ except ValueError:
+ raise CodecError()
+
+ try:
+ if vals.pop(0) != b":":
+ raise CodecError()
+ except IndexError:
+ raise CodecError()
+
+ if length > len(vals):
+ raise CodecError()
+
+ out = b"".join(vals[:length])
+ del vals[:length]
+ return out
+
+
+def _pop_int(vals: list[bytes]) -> int:
+ assert vals.pop(0) == b"i"
+
+ try:
+ end = vals.index(b"e")
+ except ValueError:
+ raise CodecError()
+
+ try:
+ out = int(b"".join(vals[:end]).decode("ascii"))
+ except ValueError:
+ raise CodecError()
+
+ del vals[:end + 1]
+ return out
+
+
+def _pop_list(vals: list[bytes]) -> list[Type]:
+ assert vals.pop(0) == b"l"
+
+ out = []
+ while vals and vals[0] != b"e":
+ out.append(_pop_value(vals))
+
+ if vals:
+ del vals[0]
+ return out
+ else:
+ raise CodecError()
+
+
+def _pop_dict(vals: list[bytes]) -> Bdict:
+ assert vals.pop(0) == b"d"
+
+ out = {}
+ while vals and vals[0] != b"e":
+ key = _pop_bytes(vals)
+ out[key] = _pop_value(vals)
+
+ if vals:
+ del vals[0]
+ return out
+ else:
+ raise CodecError()
+
+
+def _pop_value(vals: list[bytes]) -> Type:
+ if vals:
+ if vals[0].isdigit():
+ return _pop_bytes(vals)
+ elif vals[0] == b"i":
+ return _pop_int(vals)
+ elif vals[0] == b"l":
+ return _pop_list(vals)
+ elif vals[0] == b"d":
+ return _pop_dict(vals)
+ else:
+ raise CodecError()
+ else:
+ raise CodecError()
+
+
+def decode(data: bytes) -> Type:
+ vals = [bytes([v]) for v in data]
+ out = _pop_value(vals)
+ if vals:
+ raise CodecError()
+ else:
+ return out
+
+
+def _encode_helper(data: Type) -> list[bytes]:
+ if isinstance(data, bytes):
+ return [str(len(data)).encode("ascii"), b":", data]
+ elif isinstance(data, int):
+ return [b"i", str(data).encode("ascii"), b"e"]
+ elif isinstance(data, list):
+ return [b"l", *itertools.chain.from_iterable(map(_encode_helper, data)), b"e"]
+ elif isinstance(data, dict):
+ contents = itertools.chain.from_iterable(data.items())
+ return [b"d", *itertools.chain.from_iterable(map(_encode_helper, contents)), b"e"]
+ else:
+ raise CodecError()
+
+
+def encode(data: Type) -> bytes:
+ return b"".join(_encode_helper(data))
--- /dev/null
+from __future__ import annotations
+from dataclasses import dataclass
+from typing import Any, Iterable, Optional
+import datetime as dt
+import itertools
+
+from psycopg2.extras import execute_batch, Json, NumericRange
+
+
+@dataclass
+class Disk:
+ sector_size: int
+
+
+@dataclass
+class Slab:
+ id: int
+ disk_id: int
+ sectors: range
+ entity_id: bytes
+ entity_offset: int
+ crypt_key: bytes
+
+
+@dataclass
+class HasherRef:
+ id: int
+ seq: int
+ entity_offset: int
+ state: dict
+
+
+@dataclass
+class Wrapper:
+ conn: Any
+
+ def get_passkey(self, tracker_id: int) -> str:
+ with self.conn.cursor() as cursor:
+ cursor.execute("select passkey from gazelle.passkey where gazelle_tracker_id = %s;", (tracker_id,))
+ [(passkey,)] = cursor.fetchall()
+
+ return passkey
+
+ def get_torrents(self, tracker_id: int, batch_size: Optional[int] = None) -> Iterable[bytes]:
+ """Iterate the info hashes for the specified tracker which haven't been marked deleted."""
+
+ stmt = """
+ select infohash from gazelle.torrent
+ where gazelle_tracker_id = %s and not is_deleted
+ order by infohash asc
+ ;
+ """
+ with self.conn.cursor() as cursor:
+ if batch_size is not None:
+ cursor.itersize = batch_size
+
+ cursor.execute(stmt, (tracker_id,))
+ for row in cursor:
+ (info_hash_mem,) = row
+ info_hash = bytes(info_hash_mem)
+ assert len(info_hash) == 20
+ yield info_hash
+
+ def insert_swarm_info(self, tracker_id: int, infos: Iterable["disk_jumble.scrape.ScrapeInfo"]) -> None:
+ stmt = """
+ insert into gazelle.tracker_stat (gazelle_tracker_id, infohash, ts, complete, incomplete, downloaded)
+ values (%s, %s, %s, %s, %s, %s)
+ ;
+ """
+ with self.conn.cursor() as cursor:
+ param_sets = [
+ (tracker_id, i.info_hash, i.timestamp, i.complete, i.incomplete, i.downloaded)
+ for i in infos
+ ]
+ execute_batch(cursor, stmt, param_sets)
+
+ def get_disk(self, id_: int) -> Disk:
+ with self.conn.cursor() as cursor:
+ cursor.execute("select sector_size from diskjumble.disk where disk_id = %s;", (id_,))
+ [(sector_size,)] = cursor.fetchall()
+
+ return Disk(sector_size)
+
+ def get_slabs_and_hashers(self, disk_id: int) -> Iterable[tuple[Slab, Optional[HasherRef]]]:
+ """
+ Find slabs on the specified disk, and for each also return the lowest-offset saved hasher, if any, that left
+ off directly before or within the slab's entity data.
+ """
+
+ stmt = """
+ with
+ incomplete_edge as (
+ -- join up incomplete piece info and precompute where the hasher left off within the entity
+ select
+ verify_id, seq, slab.entity_id, hasher_state,
+ entity_offset + (upper(c.disk_sectors) - lower(slab.disk_sectors)) * sector_size as end_off
+ from
+ diskjumble.verify_piece_incomplete
+ natural left join diskjumble.verify_piece p
+ natural join diskjumble.verify_piece_content c
+ natural left join diskjumble.disk
+ left join diskjumble.slab on (
+ c.disk_id = slab.disk_id
+ and upper(c.disk_sectors) <@ int8range(lower(slab.disk_sectors), upper(slab.disk_sectors), '[]')
+ )
+ where seq >= all (select seq from diskjumble.verify_piece_content where verify_id = p.verify_id)
+ )
+ select
+ slab_id, disk_id, disk_sectors, slab.entity_id, entity_offset, crypt_key, verify_id, seq, end_off,
+ hasher_state
+ from
+ diskjumble.slab
+ natural left join diskjumble.disk
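+ -- attach any saved hasher that stopped at a sector-aligned offset within this slab's entity data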
+ left join incomplete_edge on
+ incomplete_edge.entity_id = slab.entity_id
+ and incomplete_edge.end_off <@ int8range(
+ slab.entity_offset,
+ slab.entity_offset + (upper(disk_sectors) - lower(disk_sectors)) * sector_size
+ )
+ and (incomplete_edge.end_off - slab.entity_offset) %% sector_size = 0
+ where disk_id = %s
+ order by slab.entity_id, entity_offset, slab_id
+ ;
+ """
+ with self.conn.cursor() as cursor:
+ cursor.execute(stmt, (disk_id,))
+ for (_, rows_iter) in itertools.groupby(cursor, lambda r: r[0]):
+ rows = list(rows_iter)
+ [(slab_id, disk_id, sectors_pg, entity_id, entity_off, key)] = {r[:6] for r in rows}
+ sectors = range(sectors_pg.lower, sectors_pg.upper)
+ slab = Slab(slab_id, disk_id, sectors, bytes(entity_id), entity_off, key)
+
+ # `None' if no hasher match in outer join, otherwise earliest match
+ (*_, id_, seq, end_off, state) = min(rows, key = lambda r: r[-2])
+ hasher_ref = None if id_ is None else HasherRef(id_, seq, end_off, state)
+
+ yield (slab, hasher_ref)
+
+ def get_torrent_info(self, disk_id: int) -> Iterable[tuple[bytes, bytes]]:
+ stmt = """
+ with hashed as (
+ select digest(info, 'sha1') as info_hash, info
+ from bittorrent.torrent_info
+ )
+ select
+ distinct on (info_hash)
+ info_hash, info
+ from diskjumble.slab left outer join hashed on entity_id = info_hash
+ where disk_id = %s
+ ;
+ """
+ with self.conn.cursor() as cursor:
+ cursor.execute(stmt, (disk_id,))
+ for (info_hash, info) in cursor:
+ yield (bytes(info_hash), bytes(info))
+
+ def insert_verify_piece(self, ts: dt.datetime, entity_id: bytes, piece_num: int) -> int:
+ """Insert new verify piece, returning the ID of the inserted row."""
+
+ with self.conn.cursor() as cursor:
+ stmt = "insert into diskjumble.verify_piece values (default, %s, %s, %s) returning verify_id;"
+ cursor.execute(stmt, (ts, entity_id, piece_num))
+ [(row_id,)] = cursor.fetchall()
+ return row_id
+
+ def insert_verify_piece_content(self, verify_id: int, seq_start: int, disk_id: int, ranges: Iterable[range]) -> None:
+ with self.conn.cursor() as cursor:
+ execute_batch(
+ cursor,
+ "insert into diskjumble.verify_piece_content values (%s, %s, %s, %s);",
+ [
+ (verify_id, seq, disk_id, NumericRange(r.start, r.stop))
+ for (seq, r) in enumerate(ranges, start = seq_start)
+ ]
+ )
+
+ def mark_verify_piece_failed(self, verify_id: int) -> None:
+ with self.conn.cursor() as cursor:
+ cursor.execute("insert into diskjumble.verify_piece_fail values (%s);", (verify_id,))
+
+ def upsert_hasher_state(self, verify_id: int, state: dict) -> None:
+ stmt = """
+ insert into diskjumble.verify_piece_incomplete values (%s, %s)
+ on conflict (verify_id) do update set hasher_state = excluded.hasher_state
+ ;
+ """
+ with self.conn.cursor() as cursor:
+ cursor.execute(stmt, (verify_id, Json(state)))
+
+ def delete_verify_piece(self, verify_id: int) -> None:
+ with self.conn.cursor() as cursor:
+ cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,))
+ cursor.execute("delete from diskjumble.verify_piece_content where verify_id = %s;", (verify_id,))
+ cursor.execute("delete from diskjumble.verify_piece where verify_id = %s", (verify_id,))
+
+ def move_piece_content_for_pass(self, verify_id: int) -> None:
+ stmt = """
+ with content_out as (
+ delete from diskjumble.verify_piece_content c
+ using diskjumble.verify_piece p
+ where (
+ c.verify_id = p.verify_id
+ and p.verify_id = %s
+ )
+ returning at, disk_id, disk_sectors
+ )
+ insert into diskjumble.verify_pass (at, disk_id, disk_sectors)
+ select at, disk_id, disk_sectors from content_out
+ ;
+ """
+ with self.conn.cursor() as cursor:
+ cursor.execute(stmt, (verify_id,))
+
+ def insert_pass_data(self, ts: dt.datetime, disk_id: int, sectors: range) -> None:
+ with self.conn.cursor() as cursor:
+ cursor.execute(
+ "insert into diskjumble.verify_pass values (default, %s, %s, %s);",
+ (ts, disk_id, NumericRange(sectors.start, sectors.stop))
+ )
+
+ def clear_incomplete(self, verify_id: int) -> None:
+ with self.conn.cursor() as cursor:
+ cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,))
--- /dev/null
+"""Python wrappers for some of GnuTLS Nettle."""
+
+from ctypes.util import find_library
+from typing import Optional
+import ctypes
+
+
+_LIB = ctypes.CDLL(find_library("nettle"))
+
+
+class _Sha1Defs:
+ _DIGEST_SIZE = 20 # in bytes
+ _BLOCK_SIZE = 64 # in bytes
+ _DIGEST_LENGTH = 5 # in 32-bit words
+
+ _StateArr = ctypes.c_uint32 * _DIGEST_LENGTH
+
+ _BlockArr = ctypes.c_uint8 * _BLOCK_SIZE
+
+
+class Sha1Hasher(_Sha1Defs):
+ class Context(ctypes.Structure):
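+ # Intended to mirror the in-memory layout of nettle's struct sha1_ctx; the field order must
+ # match the installed library's definition for the ctypes calls below to be valid.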
+ _fields_ = [
+ ("state", _Sha1Defs._StateArr),
+ ("count", ctypes.c_uint64),
+ ("index", ctypes.c_uint),
+ ("block", _Sha1Defs._BlockArr),
+ ]
+
+ @classmethod
+ def deserialize(cls, data):
+ return cls(
+ _Sha1Defs._StateArr(*data["state"]),
+ data["count"],
+ data["index"],
+ _Sha1Defs._BlockArr(*data["block"]),
+ )
+
+ def serialize(self):
+ return {
+ "state": list(self.state),
+ "count": self.count,
+ "index": self.index,
+ "block": list(self.block),
+ }
+
+ @classmethod
+ def _new_context(cls):
+ ctx = cls.Context()
+ _LIB.nettle_sha1_init(ctypes.byref(ctx))
+ return ctx
+
+ def __init__(self, ctx_dict: Optional[dict]):
+ if ctx_dict:
+ self.ctx = self.Context.deserialize(ctx_dict)
+ else:
+ self.ctx = self._new_context()
+
+ def update(self, data):
+ _LIB.nettle_sha1_update(ctypes.byref(self.ctx), len(data), data)
+
+ def digest(self):
+ """Return the current digest and reset the hasher state."""
+ out = (ctypes.c_uint8 * self._DIGEST_SIZE)()
+ _LIB.nettle_sha1_digest(ctypes.byref(self.ctx), self._DIGEST_SIZE, out)
+ return bytes(out)
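+
+
+# Illustrative usage, assuming libnettle is installed and resolvable via find_library:
+#
+#     hasher = Sha1Hasher(None)
+#     hasher.update(b"abc")
+#     state = hasher.ctx.serialize() # JSON-friendly snapshot; resume later with Sha1Hasher(state)
+#     digest = hasher.digest() # 20-byte SHA-1 of b"abc"; also resets the context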
--- /dev/null
+from dataclasses import dataclass
+from typing import Iterable, Union
+from urllib.error import URLError
+from urllib.parse import urlencode, urlparse, urlunparse
+from urllib.request import HTTPHandler, HTTPRedirectHandler, HTTPSHandler, OpenerDirector, UnknownHandler
+import argparse
+import contextlib
+import datetime as dt
+import itertools
+import re
+import sys
+import time
+
+import psycopg2
+
+from disk_jumble import bencode
+from disk_jumble.db import Wrapper as DbWrapper
+from disk_jumble.trackers import Tracker, TRACKERS
+
+
+class Result:
+ pass
+
+
+@dataclass
+class OkResult(Result):
+ data: bencode.Type
+
+
+@dataclass
+class ErrorResult(Result):
+ status: int
+ body: Union[bencode.Type, bytes]
+
+
+@dataclass
+class ScrapeInfo:
+ info_hash: bytes
+ timestamp: dt.datetime
+ complete: int
+ incomplete: int
+ downloaded: int
+
+
+def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) -> Result:
+ qs = urlencode({"info_hash": list(info_hashes)}, doseq = True)
+ url_parts = urlparse(tracker.scrape_url_format.format(passkey = passkey))
+ assert not url_parts.query
+ url = urlunparse(url_parts._replace(query = qs))
+
+ # we handle HTTP errors ourselves
+ opener = OpenerDirector()
+ for handler in [UnknownHandler(), HTTPHandler(), HTTPSHandler(), HTTPRedirectHandler()]:
+ opener.add_handler(handler)
+
+ with opener.open(url) as resp:
+ body = resp.read()
+
+ try:
+ data = bencode.decode(body)
+ except bencode.CodecError:
+ data = body
+ else:
+ if resp.getcode() == 200 and isinstance(data, dict) and b"files" in data:
+ return OkResult(data)
+
+ return ErrorResult(resp.getcode(), data)
+
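+# Illustrative call, using names defined in this package: scrape_batch(TRACKERS["PTP"], info_hashes, passkey)
+# returns OkResult(data) when the tracker answers HTTP 200 with a bencoded "files" dict, and
+# ErrorResult(status, body) otherwise.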
+
+if __name__ == "__main__":
+ PSQL_PARAMS = ["dbname", "user", "password", "host", "port"]
+
+ def tracker_name(val):
+ matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()]
+ if matches:
+ [name] = matches
+ return name
+ else:
+ raise ValueError()
+
+ def batch_size(val):
+ out = int(val)
+ if out > 0:
+ return out
+ else:
+ raise ValueError()
+
+ def delay(val):
+ out = int(val)
+ if out >= 0:
+ return out
+ else:
+ raise ValueError()
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "tracker",
+ choices = sorted(TRACKERS.keys()),
+ type = tracker_name,
+ help = "name of tracker to scrape",
+ )
+ parser.add_argument(
+ "batch_size",
+ type = batch_size,
+ help = "number of torrents per batch",
+ )
+ parser.add_argument(
+ "delay",
+ type = delay,
+ help = "delay between batches, in seconds",
+ )
+
+ for arg_name in PSQL_PARAMS:
+ parser.add_argument("--" + arg_name, nargs = "?")
+
+ args = parser.parse_args()
+ tracker = TRACKERS[args.tracker]
+ delay = dt.timedelta(seconds = args.delay)
+
+ params = {n: getattr(args, n) for n in PSQL_PARAMS if getattr(args, n)}
+ with contextlib.closing(psycopg2.connect(**params)) as conn:
+ conn.autocommit = True
+ db_wrapper = DbWrapper(conn)
+ passkey = db_wrapper.get_passkey(tracker.gazelle_id)
+ info_hashes = iter(db_wrapper.get_torrents(tracker.gazelle_id, args.batch_size))
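+ # two-argument iter(): call the lambda repeatedly, yielding fixed-size batches until
+ # islice returns the sentinel (an empty list)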
+ batches = iter(lambda: list(itertools.islice(info_hashes, args.batch_size)), [])
+ for (i, batch) in enumerate(batches):
+ if i != 0:
+ time.sleep(delay.total_seconds())
+ timestamp = dt.datetime.now(dt.timezone.utc)
+ try:
+ result = scrape_batch(tracker, batch, passkey)
+ except URLError as e:
+ print("[ERR] couldn't connect: {}".format(e), file = sys.stderr, flush = True)
+ else:
+ if isinstance(result, OkResult):
+ assert set(result.data[b"files"].keys()) <= set(batch), "unexpected torrent in response"
+ infos = [
+ ScrapeInfo(
+ info_hash,
+ timestamp,
+ complete = meta_dict[b"complete"],
+ incomplete = meta_dict[b"incomplete"],
+ downloaded = meta_dict[b"downloaded"],
+ )
+ for (info_hash, meta_dict) in result.data[b"files"].items()
+ ]
+ db_wrapper.insert_swarm_info(tracker.gazelle_id, infos)
+ print("[OK] finished batch ({} of {} torrents)".format(len(infos), len(batch)), file = sys.stderr, flush = True)
+ elif isinstance(result, ErrorResult):
+ full_disp = result.body.decode("ascii", "ignore") if isinstance(result.body, bytes) else str(result.body)
+ clean_disp = re.sub(r"\s", " ", full_disp)
+ display_size = 100
+ disp = clean_disp if len(clean_disp) <= display_size else clean_disp[:display_size] + "…"
+ print("[ERR] tracker responded {}: {}".format(result.status, disp), file = sys.stderr, flush = True)
+ else:
+ raise Exception()
--- /dev/null
+"""
+Tests for the verification program `disk_jumble.verify'
+
+Like the verification program itself, these tests take database connection information from the environment. The
+necessary schemas and tables are set up from scratch by the test code, so environment variables should point to a
+database that's not hosting a live instance of Disk Jumble. Ideally, this is a completely empty local database created
+for the purposes of testing, but other options are possible.
+
+The tests need access to an SQL source file containing the definitions for the required tables and other Postgres
+objects; see `test_util/dump_db.py'.
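+
+For example (illustrative values), the standard libpq environment variables can point the tests at a
+scratch database:
+
+ PGHOST=localhost PGDATABASE=disk_jumble_test PGUSER=postgres python -m unittest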
+"""
+
+from dataclasses import dataclass
+from importlib import resources
+from random import Random
+from typing import Optional
+import hashlib
+import io
+import tempfile
+import unittest
+import uuid
+
+from psycopg2.extras import NumericRange
+import psycopg2
+import psycopg2.extras
+
+from disk_jumble import bencode
+from disk_jumble.nettle import Sha1Hasher
+from disk_jumble.verify import do_verify
+
+
+_BUF_SIZE = 16 * 1024 ** 2 # in bytes
+
+
+class Tests(unittest.TestCase):
+ _SCHEMAS = {"public", "diskjumble", "bittorrent"}
+
+ def _basic_fresh_verify_helper(self, read_size):
+ sector_size = 32
+ piece_size = 64
+
+ torrent_len = 3 * piece_size
+ disk = self._write_disk(sector_size, torrent_len // sector_size)
+ with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file:
+ torrent = _Torrent(torrent_file, piece_size)
+ self._write_torrent(torrent)
+ with self._conn.cursor() as cursor:
+ cursor.execute(
+ "insert into diskjumble.slab values (default, %s, %s, %s, 0, null);",
+ (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash)
+ )
+
+ do_verify(self._conn, disk.id, torrent_file, read_size, read_tries = 1)
+
+ cursor.execute("select * from diskjumble.verify_pass;")
+ self.assertEqual(cursor.rowcount, 1)
+ (_, _, disk_id, sectors) = cursor.fetchone()
+ self.assertEqual(disk_id, disk.id)
+ self.assertEqual(sectors, NumericRange(0, torrent_len // sector_size))
+
+ def test_basic_fresh_verify_small_read_size(self):
+ self._basic_fresh_verify_helper(16)
+
+ def test_basic_fresh_verify_large_read_size(self):
+ self._basic_fresh_verify_helper(128)
+
+ def test_resume_fragmentation_unaligned_end(self):
+ """
+ Test a run where a cached hash state is used, a piece is split on disk, and the end of the torrent isn't
+ sector-aligned.
+ """
+ read_size = 8
+ piece_size = 64
+
+ other_disk = self._write_disk(16, 1)
+ disk = self._write_disk(32, 3)
+ with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+ torrent = _Torrent(torrent_file, piece_size)
+ self._write_torrent(torrent)
+ with self._conn.cursor() as cursor:
+ cursor.executemany(
+ "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+ [
+ (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0),
+ (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size),
+ (disk.id, NumericRange(2, 3), torrent.info_hash, other_disk.sector_size + disk.sector_size),
+ ]
+ )
+
+ # Prepare the saved hasher state by running a verify
+ do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1)
+ torrent_file.seek(0)
+
+ cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 1)
+
+ disk_file = io.BytesIO()
+ torrent_file.seek(other_disk.sector_size)
+ disk_file.write(torrent_file.read(disk.sector_size))
+ disk_file.seek(disk_file.tell() + disk.sector_size)
+ disk_file.write(torrent_file.read())
+ disk_file.seek(0)
+ do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1)
+
+ # Check that there are no verify pieces in the database. Because of integrity constraints, this also
+ # guarantees there aren't any stray saved hasher states, failed verifies, or piece contents.
+ cursor.execute("select count(*) from diskjumble.verify_piece;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 0)
+
+ cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
+ self.assertEqual(
+ cursor.fetchall(),
+ [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1)), (disk.id, NumericRange(2, 3))]
+ )
+
+ def test_resume_no_completion(self):
+ """
+ Test a run where a saved hasher state is used and the target disk has subsequent entity data but not the full
+ remainder of the piece.
+ """
+ read_size = 7
+ piece_size = 64
+
+ other_disk = self._write_disk(16, 1)
+ disk = self._write_disk(32, 1)
+ with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+ torrent = _Torrent(torrent_file, piece_size)
+ self._write_torrent(torrent)
+ with self._conn.cursor() as cursor:
+ cursor.executemany(
+ "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+ [
+ (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0),
+ (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size),
+ ]
+ )
+
+ do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1)
+
+ cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 1)
+
+ disk_file = io.BytesIO()
+ torrent_file.seek(other_disk.sector_size)
+ disk_file.write(torrent_file.read(disk.sector_size))
+ disk_file.seek(0)
+ do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1)
+
+ cursor.execute("select count(*) from diskjumble.verify_pass;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 0)
+
+ cursor.execute("select entity_id, piece from diskjumble.verify_piece;")
+ [(entity_id, piece_num)] = cursor.fetchall()
+ self.assertEqual(bytes(entity_id), torrent.info_hash)
+ self.assertEqual(piece_num, 0)
+
+ cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;")
+ self.assertCountEqual(
+ cursor.fetchall(),
+ [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1))]
+ )
+
+ cursor.execute("select count(*) from diskjumble.verify_piece_fail;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 0)
+
+ hasher = Sha1Hasher(None)
+ torrent_file.seek(0)
+ hasher.update(torrent_file.read(other_disk.sector_size + disk.sector_size))
+ cursor.execute("select hasher_state from diskjumble.verify_piece_incomplete;")
+ self.assertEqual(cursor.fetchall(), [(hasher.ctx.serialize(),)])
+
+ def test_ignore_hasher_beginning_on_disk(self):
+ """
+ Test a run where a saved hasher state is available for use but isn't used due to the beginning of the piece
+ being on disk.
+ """
+ piece_size = 64
+
+ other_disk = self._write_disk(16, 1)
+ disk = self._write_disk(16, 4)
+ with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file:
+ torrent = _Torrent(torrent_file, piece_size)
+ self._write_torrent(torrent)
+ with self._conn.cursor() as cursor:
+ cursor.executemany(
+ "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+ [
+ (other_disk.id, NumericRange(0, other_disk.sector_count), torrent.info_hash, piece_size),
+ (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash, piece_size),
+ ]
+ )
+
+ do_verify(self._conn, other_disk.id, torrent_file, read_size = 128, read_tries = 1)
+
+ cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 1)
+
+ torrent_file.seek(piece_size)
+ disk_file = io.BytesIO(torrent_file.read())
+ do_verify(self._conn, disk.id, disk_file, read_size = 128, read_tries = 1)
+
+ cursor.execute(
+ "select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;"
+ )
+ self.assertEqual(cursor.fetchall(), [(other_disk.id,)])
+
+ cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
+ self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, disk.sector_count))])
+
+ def test_ignore_hasher_unaligned(self):
+ """
+ Test a run where a saved hasher isn't used because its entity data offset isn't sector-aligned on the target
+ disk.
+
+ 0 16 32 48 64 80 96 112 128
+ pieces: [-------------- 0 -------------]
+ other disk: [--][--][--][--][--]
+ disk: [------][------]
+ """
+ piece_size = 128
+
+ other_disk = self._write_disk(16, 5)
+ disk = self._write_disk(32, 2)
+ with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+ torrent = _Torrent(torrent_file, piece_size)
+ self._write_torrent(torrent)
+ with self._conn.cursor() as cursor:
+ cursor.executemany(
+ "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+ [
+ (other_disk.id, NumericRange(0, 5), torrent.info_hash, 0),
+ (disk.id, NumericRange(0, 2), torrent.info_hash, 64),
+ ]
+ )
+
+ do_verify(self._conn, other_disk.id, torrent_file, read_size = 16, read_tries = 1)
+ cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 1)
+
+ disk_file = io.BytesIO(torrent_file.getvalue()[64:])
+ do_verify(self._conn, disk.id, disk_file, read_size = 16, read_tries = 1)
+
+ cursor.execute("""
+ select disk_id, disk_sectors
+ from diskjumble.verify_piece_incomplete natural join diskjumble.verify_piece_content;
+ """)
+ self.assertEqual(
+ cursor.fetchall(),
+ [(other_disk.id, NumericRange(0, 5))]
+ )
+
+ cursor.execute("select count(*) from diskjumble.verify_pass;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 0)
+
+ cursor.execute("select count(*) from diskjumble.verify_piece_fail;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 0)
+
+ def test_transient_read_errors(self):
+ """
+ Test a run where a read to the disk fails but fewer times than needed to mark the sector bad.
+ """
+ piece_size = 32
+
+ disk = self._write_disk(32, 1)
+ with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+ torrent = _Torrent(torrent_file, piece_size)
+ self._write_torrent(torrent)
+ with self._conn.cursor() as cursor:
+ cursor.execute(
+ "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+ (disk.id, NumericRange(0, 1), torrent.info_hash, 0)
+ )
+
+ disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2)
+ do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3)
+
+ self.assertEqual(disk_file.triggered, 2)
+
+ cursor.execute("select count(*) from diskjumble.verify_piece;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 0)
+
+ cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
+ self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, 1))])
+
+ def test_persistent_read_errors(self):
+ """
+ Test a run where a disk read fails enough times to trigger the bad sector logic.
+ """
+ piece_size = 64
+
+ other_a = self._write_disk(16, 1)
+ other_b = self._write_disk(16, 2)
+ disk = self._write_disk(16, 1)
+ with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
+ torrent = _Torrent(torrent_file, piece_size)
+ self._write_torrent(torrent)
+ with self._conn.cursor() as cursor:
+ cursor.executemany(
+ "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
+ [
+ (other_a.id, NumericRange(0, 1), torrent.info_hash, 0),
+ (other_b.id, NumericRange(0, 2), torrent.info_hash, 16),
+ (disk.id, NumericRange(0, 1), torrent.info_hash, 48),
+ ]
+ )
+
+ do_verify(self._conn, other_a.id, torrent_file, read_size = 16, read_tries = 1)
+ other_b_file = io.BytesIO(torrent_file.getvalue()[16:48])
+ do_verify(self._conn, other_b.id, other_b_file, read_size = 16, read_tries = 1)
+
+ cursor.execute("select verify_id from diskjumble.verify_piece;")
+ [(verify_id,)] = cursor.fetchall()
+
+ data = torrent_file.getvalue()[48:]
+ disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None)
+ do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3)
+
+ cursor.execute("select count(*) from diskjumble.verify_pass;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 0)
+
+ cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;")
+ self.assertCountEqual(
+ cursor.fetchall(),
+ [(other_a.id, NumericRange(0, 1)), (other_b.id, NumericRange(0, 2)), (disk.id, NumericRange(0, 1))]
+ )
+
+ cursor.execute("select verify_id from diskjumble.verify_piece_fail;")
+ self.assertEqual(cursor.fetchall(), [(verify_id,)])
+
+ cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
+ [(row_count,)] = cursor.fetchall()
+ self.assertEqual(row_count, 0)
+
+ def _write_torrent(self, torrent: "_Torrent") -> None:
+ with self._conn.cursor() as cursor:
+ cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,))
+
+ def _write_disk(self, sector_size: int, sector_count: int) -> "_Disk":
+ with self._conn.cursor() as cursor:
+ cursor.execute(
+ "insert into diskjumble.disk values (default, %s, null, %s, %s, false) returning disk_id;",
+ (uuid.uuid4(), sector_size, sector_count)
+ )
+ [(id_,)] = cursor.fetchall()
+ return _Disk(id_, sector_size, sector_count)
+
+ @classmethod
+ def setUpClass(cls):
+ psycopg2.extras.register_uuid()
+ prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text()
+ schema_sql = "\n".join(
+ l for l in prod_schema_sql.splitlines()
+ if (
+ not l.startswith("SET")
+ and not l.startswith("SELECT") # ignore server settings
+ and "OWNER TO" not in l # and ownership changes
+ )
+ )
+ cls._conn = psycopg2.connect("")
+ with cls._conn, cls._conn.cursor() as cursor:
+ for name in cls._SCHEMAS:
+ cursor.execute(f"create schema {name};")
+ cursor.execute(schema_sql)
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ with cls._conn, cls._conn.cursor() as cursor:
+ for name in cls._SCHEMAS:
+ cursor.execute(f"drop schema {name} cascade;")
+ finally:
+ cls._conn.close()
+
+ def tearDown(self):
+ self._conn.rollback()
+
+
+@dataclass
+class _Disk:
+ id: int
+ sector_size: int
+ sector_count: int
+
+
+class _Torrent:
+ def __init__(self, data: io.BufferedIOBase, piece_size: int) -> None:
+ data.seek(0)
+ hashes = []
+ while True:
+ piece = data.read(piece_size)
+ if piece:
+ hashes.append(hashlib.sha1(piece).digest())
+ else:
+ break
+
+ info_dict = {
+ b"piece length": piece_size,
+ b"length": data.tell(),
+ b"pieces": b"".join(hashes),
+ }
+ self.data = data
+ self.info = bencode.encode(info_dict)
+ self.info_hash = hashlib.sha1(self.info).digest()
+
+
+def _random_file(size: int, rand_src: Random, on_disk: bool) -> tempfile.NamedTemporaryFile:
+ f = tempfile.NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else io.BytesIO()
+ try:
+ while f.tell() < size:
+ write_size = min(size - f.tell(), _BUF_SIZE)
+ f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size)))
+ f.seek(0)
+ return f
+ except Exception:
+ f.close()
+ raise
+
+
+@dataclass
+class _ReadErrorProxy(io.BufferedIOBase):
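+ """Read proxy simulating a bad region: reads starting at or shortly after error_pos raise OSError, at most error_count times (None for unlimited)."""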
+ wrapped: io.BufferedIOBase
+ error_pos: int
+ error_count: Optional[int]
+
+ def __post_init__(self):
+ self.triggered = 0
+
+ def read(self, size: int = -1) -> bytes:
+ pre_pos = self.wrapped.tell()
+ data = self.wrapped.read(size)
+ erroring = self.error_count is None or self.triggered < self.error_count
+ in_range = 0 <= pre_pos - self.error_pos < len(data)
+ if erroring and in_range:
+ self.triggered += 1
+ raise OSError("simulated")
+ else:
+ return data
+
+ def seek(self, *args, **kwargs) -> int:
+ return self.wrapped.seek(*args, **kwargs)
--- /dev/null
+create extension btree_gist;
+CREATE OR REPLACE FUNCTION public.digest(bytea, text)
+ RETURNS bytea
+ LANGUAGE c
+ IMMUTABLE PARALLEL SAFE STRICT
+AS '$libdir/pgcrypto', $function$pg_digest$function$
+;
+--
+-- PostgreSQL database dump
+--
+
+-- Dumped from database version 13.2 (Debian 13.2-1.pgdg100+1)
+-- Dumped by pg_dump version 13.4 (Debian 13.4-0+deb11u1)
+
+SET statement_timeout = 0;
+SET lock_timeout = 0;
+SET idle_in_transaction_session_timeout = 0;
+SET client_encoding = 'UTF8';
+SET standard_conforming_strings = on;
+SELECT pg_catalog.set_config('search_path', '', false);
+SET check_function_bodies = false;
+SET xmloption = content;
+SET client_min_messages = warning;
+SET row_security = off;
+
+SET default_tablespace = '';
+
+SET default_table_access_method = heap;
+
+--
+-- Name: torrent_info; Type: TABLE; Schema: bittorrent; Owner: eros
+--
+
+CREATE TABLE bittorrent.torrent_info (
+ info bytea NOT NULL
+);
+
+
+ALTER TABLE bittorrent.torrent_info OWNER TO eros;
+
+--
+-- Name: disk; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.disk (
+ disk_id integer NOT NULL,
+ dev_uuid uuid NOT NULL,
+ dev_serial text,
+ sector_size integer NOT NULL,
+ num_sectors bigint NOT NULL,
+ failed boolean DEFAULT false NOT NULL
+);
+
+
+ALTER TABLE diskjumble.disk OWNER TO eros;
+
+--
+-- Name: disk_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
+--
+
+CREATE SEQUENCE diskjumble.disk_id_seq
+ AS integer
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+ALTER TABLE diskjumble.disk_id_seq OWNER TO eros;
+
+--
+-- Name: disk_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
+--
+
+ALTER SEQUENCE diskjumble.disk_id_seq OWNED BY diskjumble.disk.disk_id;
+
+
+--
+-- Name: slab; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.slab (
+ slab_id integer NOT NULL,
+ disk_id integer NOT NULL,
+ disk_sectors int8range NOT NULL,
+ entity_id bytea NOT NULL,
+ entity_offset bigint NOT NULL,
+ crypt_key bytea
+);
+
+
+ALTER TABLE diskjumble.slab OWNER TO eros;
+
+--
+-- Name: slab_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
+--
+
+CREATE SEQUENCE diskjumble.slab_id_seq
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+ALTER TABLE diskjumble.slab_id_seq OWNER TO eros;
+
+--
+-- Name: slab_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
+--
+
+ALTER SEQUENCE diskjumble.slab_id_seq OWNED BY diskjumble.slab.slab_id;
+
+
+--
+-- Name: verify_pass; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_pass (
+ verify_pass_id integer NOT NULL,
+ at timestamp with time zone,
+ disk_id integer NOT NULL,
+ disk_sectors int8range NOT NULL
+);
+
+
+ALTER TABLE diskjumble.verify_pass OWNER TO eros;
+
+--
+-- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
+--
+
+CREATE SEQUENCE diskjumble.verify_pass_verify_pass_id_seq
+ AS integer
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+ALTER TABLE diskjumble.verify_pass_verify_pass_id_seq OWNER TO eros;
+
+--
+-- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
+--
+
+ALTER SEQUENCE diskjumble.verify_pass_verify_pass_id_seq OWNED BY diskjumble.verify_pass.verify_pass_id;
+
+
+--
+-- Name: verify_piece; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_piece (
+ verify_id integer NOT NULL,
+ at timestamp with time zone,
+ entity_id bytea NOT NULL,
+ piece integer NOT NULL
+);
+
+
+ALTER TABLE diskjumble.verify_piece OWNER TO eros;
+
+--
+-- Name: verify_piece_content; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_piece_content (
+ verify_id integer NOT NULL,
+ seq integer NOT NULL,
+ disk_id integer NOT NULL,
+ disk_sectors int8range NOT NULL
+);
+
+
+ALTER TABLE diskjumble.verify_piece_content OWNER TO eros;
+
+--
+-- Name: verify_piece_fail; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_piece_fail (
+ verify_id integer NOT NULL
+);
+
+
+ALTER TABLE diskjumble.verify_piece_fail OWNER TO eros;
+
+--
+-- Name: verify_piece_incomplete; Type: TABLE; Schema: diskjumble; Owner: eros
+--
+
+CREATE TABLE diskjumble.verify_piece_incomplete (
+ verify_id integer NOT NULL,
+ hasher_state json
+);
+
+
+ALTER TABLE diskjumble.verify_piece_incomplete OWNER TO eros;
+
+--
+-- Name: verify_piece_verify_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
+--
+
+CREATE SEQUENCE diskjumble.verify_piece_verify_id_seq
+ AS integer
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+ALTER TABLE diskjumble.verify_piece_verify_id_seq OWNER TO eros;
+
+--
+-- Name: verify_piece_verify_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
+--
+
+ALTER SEQUENCE diskjumble.verify_piece_verify_id_seq OWNED BY diskjumble.verify_piece.verify_id;
+
+
+--
+-- Name: disk disk_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.disk ALTER COLUMN disk_id SET DEFAULT nextval('diskjumble.disk_id_seq'::regclass);
+
+
+--
+-- Name: slab slab_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.slab ALTER COLUMN slab_id SET DEFAULT nextval('diskjumble.slab_id_seq'::regclass);
+
+
+--
+-- Name: verify_pass verify_pass_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_pass ALTER COLUMN verify_pass_id SET DEFAULT nextval('diskjumble.verify_pass_verify_pass_id_seq'::regclass);
+
+
+--
+-- Name: verify_piece verify_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece ALTER COLUMN verify_id SET DEFAULT nextval('diskjumble.verify_piece_verify_id_seq'::regclass);
+
+
+--
+-- Name: disk disk_dev_uuid_key; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.disk
+ ADD CONSTRAINT disk_dev_uuid_key UNIQUE (dev_uuid);
+
+
+--
+-- Name: disk disk_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.disk
+ ADD CONSTRAINT disk_pkey PRIMARY KEY (disk_id);
+
+
+--
+-- Name: slab slab_disk_id_disk_sectors_excl; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.slab
+ ADD CONSTRAINT slab_disk_id_disk_sectors_excl EXCLUDE USING gist (disk_id WITH =, disk_sectors WITH &&);
+
+
+--
+-- Name: slab slab_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.slab
+ ADD CONSTRAINT slab_pkey PRIMARY KEY (slab_id);
+
+
+--
+-- Name: verify_pass verify_pass_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_pass
+ ADD CONSTRAINT verify_pass_pkey PRIMARY KEY (verify_pass_id);
+
+
+--
+-- Name: verify_piece_content verify_piece_content_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_content
+ ADD CONSTRAINT verify_piece_content_pkey PRIMARY KEY (verify_id, seq);
+
+
+--
+-- Name: verify_piece_fail verify_piece_fail_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_fail
+ ADD CONSTRAINT verify_piece_fail_pkey PRIMARY KEY (verify_id);
+
+
+--
+-- Name: verify_piece_incomplete verify_piece_incomplete_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_incomplete
+ ADD CONSTRAINT verify_piece_incomplete_pkey PRIMARY KEY (verify_id);
+
+
+--
+-- Name: verify_piece verify_piece_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece
+ ADD CONSTRAINT verify_piece_pkey PRIMARY KEY (verify_id);
+
+
+--
+-- Name: torrent_info_digest_idx; Type: INDEX; Schema: bittorrent; Owner: eros
+--
+
+CREATE UNIQUE INDEX torrent_info_digest_idx ON bittorrent.torrent_info USING btree (public.digest(info, 'sha1'::text));
+
+
+--
+-- Name: slab slab_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.slab
+ ADD CONSTRAINT slab_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
+
+
+--
+-- Name: verify_pass verify_pass_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_pass
+ ADD CONSTRAINT verify_pass_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
+
+
+--
+-- Name: verify_piece_content verify_piece_content_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_content
+ ADD CONSTRAINT verify_piece_content_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
+
+
+--
+-- Name: verify_piece_content verify_piece_content_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_content
+ ADD CONSTRAINT verify_piece_content_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
+
+
+--
+-- Name: verify_piece_fail verify_piece_fail_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_fail
+ ADD CONSTRAINT verify_piece_fail_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
+
+
+--
+-- Name: verify_piece_incomplete verify_piece_incomplete_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
+--
+
+ALTER TABLE ONLY diskjumble.verify_piece_incomplete
+ ADD CONSTRAINT verify_piece_incomplete_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
+
+
+--
+-- PostgreSQL database dump complete
+--
+
--- /dev/null
+from typing import NamedTuple
+
+
+class Tracker(NamedTuple):
+ gazelle_id: int
+ scrape_url_format: str
+
+
+TRACKERS = {
+ "PTP": Tracker(1, "http://please.passthepopcorn.me:2710/{passkey}/scrape"),
+ "BTN": Tracker(3, "http://landof.tv/{passkey}/scrape"),
+ "Redacted": Tracker(4, "https://flacsfor.me/{passkey}/scrape"),
+}
--- /dev/null
+from __future__ import annotations
+from dataclasses import dataclass
+from typing import Optional
+import argparse
+import contextlib
+import datetime as dt
+import io
+import itertools
+import math
+
+import psycopg2
+
+from disk_jumble import bencode
+from disk_jumble.db import HasherRef, Slab, Wrapper as DbWrapper
+from disk_jumble.nettle import Sha1Hasher
+
+
+_READ_BUFFER_SIZE = 16 * 1024 ** 2 # in bytes
+
+
+@dataclass
+class _SlabChunk:
+ """A slice of a slab; comprising all or part of a piece to be hashed."""
+ slab: Slab
+ slice: slice
+
+
+@dataclass
+class _PieceTask:
+ """The chunks needed to hash an entity piece as fully as possible."""
+ entity_id: bytes
+ piece_num: int
+ hasher_ref: Optional[HasherRef]
+ chunks: list[_SlabChunk]
+ complete: bool # do these chunks complete the piece?
+
+
+class _BadSector(Exception):
+ pass
+
+
+def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, read_tries: int) -> None:
+ db = DbWrapper(conn)
+ disk = db.get_disk(disk_id)
+
+ info_dicts = {
+ info_hash: bencode.decode(info)
+ for (info_hash, info) in db.get_torrent_info(disk_id)
+ }
+
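+ # Walk each entity's slabs in offset order, slicing them into piece-aligned chunks; resume a
+ # saved hasher when one left off mid-piece within the slab, and emit a _PieceTask per complete
+ # piece (or per partial piece cut short by a gap) for the hashing pass below.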
+ tasks = []
+ slabs_and_hashers = db.get_slabs_and_hashers(disk_id)
+ for (entity_id, group) in itertools.groupby(slabs_and_hashers, lambda t: t[0].entity_id):
+ info = info_dicts[entity_id]
+ piece_len = info[b"piece length"]
+ assert piece_len % disk.sector_size == 0
+ if b"length" in info:
+ torrent_len = info[b"length"]
+ else:
+ torrent_len = sum(d[b"length"] for d in info[b"files"])
+
+ offset = None
+ use_hasher = None
+ chunks = []
+ for (slab, hasher_ref) in group:
+ slab_end = min(slab.entity_offset + len(slab.sectors) * disk.sector_size, torrent_len)
+
+ while offset is None or offset < slab_end:
+ if offset is not None and slab.entity_offset > offset:
+ if chunks:
+ tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False))
+ offset = None
+ use_hasher = None
+ chunks = []
+
+ if offset is None:
+ aligned = math.ceil(slab.entity_offset / piece_len) * piece_len
+ if hasher_ref and hasher_ref.entity_offset < aligned:
+ assert hasher_ref.entity_offset < torrent_len
+ use_hasher = hasher_ref
+ offset = hasher_ref.entity_offset
+ elif aligned < slab_end:
+ offset = aligned
+ else:
+ break # no usable data in this slab
+
+ if offset is not None:
+ piece_end = min(offset + piece_len - offset % piece_len, torrent_len)
+ chunk_end = min(piece_end, slab_end)
+ chunks.append(_SlabChunk(slab, slice(offset - slab.entity_offset, chunk_end - slab.entity_offset)))
+ if chunk_end == piece_end:
+ tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, True))
+ use_hasher = None
+ chunks = []
+ offset = chunk_end
+
+ if chunks:
+ tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False))
+
+ @dataclass
+ class NewVerifyPiece:
+ entity_id: bytes
+ piece_num: int
+ sector_ranges: list[range]
+ hasher_state: Optional[dict]
+ failed: bool
+
+ @dataclass
+ class VerifyUpdate:
+ seq_start: int
+ new_sector_ranges: list[range]
+ hasher_state: Optional[dict]
+
+ passed_verifies = set()
+ failed_verifies = set()
+ new_pass_ranges = []
+ vp_updates = {}
+ new_vps = []
+
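+ # Hash each task's chunks, retrying each read up to read_tries times; a position that never reads
+ # successfully raises _BadSector, which fails that piece without aborting the rest of the run.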
+ run_ts = dt.datetime.now(dt.timezone.utc)
+ for task in tasks:
+ hasher = Sha1Hasher(task.hasher_ref.state if task.hasher_ref else None)
+ sector_ranges = [
+ range(
+ chunk.slab.sectors.start + chunk.slice.start // disk.sector_size,
+ chunk.slab.sectors.start + math.ceil(chunk.slice.stop / disk.sector_size)
+ )
+ for chunk in task.chunks
+ ]
+
+ try:
+ for chunk in task.chunks:
+ slab_off = chunk.slab.sectors.start * disk.sector_size
+ disk_file.seek(slab_off + chunk.slice.start)
+ end_pos = slab_off + chunk.slice.stop
+ while disk_file.tell() < end_pos:
+ pos = disk_file.tell()
+ for _ in range(read_tries):
+ try:
+ data = disk_file.read(min(end_pos - pos, read_size))
+ except OSError as e:
+ disk_file.seek(pos)
+ else:
+ break
+ else:
+ raise _BadSector()
+
+ assert data
+ hasher.update(data)
+ except _BadSector:
+ if task.hasher_ref:
+ failed_verifies.add(task.hasher_ref.id)
+ vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, None)
+ else:
+ new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, None, True))
+ else:
+ hasher_state = hasher.ctx.serialize()
+ if task.complete:
+ s = slice(task.piece_num * 20, task.piece_num * 20 + 20)
+ expected_hash = info_dicts[task.entity_id][b"pieces"][s]
+ if hasher.digest() == expected_hash:
+ write_piece_data = False
+ new_pass_ranges.extend(sector_ranges)
+ if task.hasher_ref:
+ passed_verifies.add(task.hasher_ref.id)
+ else:
+ failed = True
+ write_piece_data = True
+ if task.hasher_ref:
+ failed_verifies.add(task.hasher_ref.id)
+ else:
+ failed = False
+ write_piece_data = True
+
+ if write_piece_data:
+ if task.hasher_ref:
+ assert task.hasher_ref.id not in vp_updates
+ vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, hasher_state)
+ else:
+ new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, hasher_state, failed))
+
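+ # Coalesce adjacent sector ranges from passed pieces before recording verify_pass rows.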
+ new_pass_ranges.sort(key = lambda r: r.start)
+ merged_ranges = []
+ for r in new_pass_ranges:
+ if merged_ranges and r.start == merged_ranges[-1].stop:
+ merged_ranges[-1] = range(merged_ranges[-1].start, r.stop)
+ else:
+ merged_ranges.append(r)
+
+ for vp in new_vps:
+ verify_id = db.insert_verify_piece(run_ts, vp.entity_id, vp.piece_num)
+ db.insert_verify_piece_content(verify_id, 0, disk_id, vp.sector_ranges)
+ if vp.failed:
+ db.mark_verify_piece_failed(verify_id)
+ else:
+ db.upsert_hasher_state(verify_id, vp.hasher_state)
+
+ for (verify_id, update) in vp_updates.items():
+ db.insert_verify_piece_content(verify_id, update.seq_start, disk_id, update.new_sector_ranges)
+ if update.hasher_state:
+ db.upsert_hasher_state(verify_id, update.hasher_state)
+
+ for verify_id in passed_verifies:
+ db.move_piece_content_for_pass(verify_id)
+ db.delete_verify_piece(verify_id)
+
+ for r in merged_ranges:
+ db.insert_pass_data(run_ts, disk_id, r)
+
+ for verify_id in failed_verifies:
+ db.clear_incomplete(verify_id)
+ db.mark_verify_piece_failed(verify_id)
+
+
+if __name__ == "__main__":
+ def read_tries(raw_arg):
+ val = int(raw_arg)
+ if val > 0:
+ return val
+ else:
+ raise ValueError()
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument("disk_id", type = int)
+ parser.add_argument(
+ "read_tries",
+ type = read_tries,
+ help = "number of times to attempt a particular disk read before giving up on the sector",
+ )
+ args = parser.parse_args()
+
+ with contextlib.closing(psycopg2.connect("")) as conn:
+ conn.autocommit = True
+ path = f"/dev/mapper/diskjumble-{args.disk_id}"
+ with open(path, "rb", buffering = _READ_BUFFER_SIZE) as disk_file:
+ do_verify(conn, args.disk_id, disk_file, _READ_BUFFER_SIZE, args.read_tries)
--- /dev/null
+"""
+Using the live database, dump creation code for extensions, tables, and functions needed for local testing
+
+For testing the verification script, write the output of this script to:
+
+ src/disk_jumble/tests/verify_setup.sql
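+
+Illustrative invocation (the PG* environment variables select the live database):
+
+ python test_util/dump_db.py > src/disk_jumble/tests/verify_setup.sql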
+"""
+
+import contextlib
+import itertools
+import os
+import subprocess
+
+import psycopg2
+
+
+procedures = [
+ "public.digest(bytea, text)",
+]
+
+extensions = [
+ "btree_gist",
+]
+
+with contextlib.closing(psycopg2.connect("")) as conn:
+ conn.autocommit = True
+ with conn.cursor() as cursor:
+ for ext in extensions:
+ print(f"create extension {ext};", flush = True)
+ for sig in procedures:
+ cursor.execute("select pg_get_functiondef(to_regprocedure(%s));", (sig,))
+ [(sql,)] = cursor.fetchall()
+ print(sql + ";", flush = True)
+
+tables = [
+ "diskjumble.disk",
+ "diskjumble.slab",
+ "diskjumble.verify_pass",
+ "diskjumble.verify_piece",
+ "diskjumble.verify_piece_content",
+ "diskjumble.verify_piece_fail",
+ "diskjumble.verify_piece_incomplete",
+ "bittorrent.torrent_info",
+]
+argv = ["pg_dump", *itertools.chain.from_iterable(["-t", table] for table in tables), "--schema-only", os.environ["PGDATABASE"]]
+subprocess.run(argv, check = True)
+++ /dev/null
-[build-system]
-requires = ["setuptools>=42", "wheel"]
-build-backend = "setuptools.build_meta"
+++ /dev/null
-[metadata]
-name = disk_jumble
-version = 0.0.1
-
-[options]
-package_dir =
- = src
-packages = disk_jumble
-python_requires ~= "3.7"
-install_requires =
- psycopg2-binary ~= 2.8
+++ /dev/null
-from setuptools import setup
-
-
-setup()
+++ /dev/null
-from __future__ import annotations
-from typing import Union
-import itertools
-
-
-Bdict = dict[bytes, 'Type']
-Type = Union[bytes, int, list['Type'], Bdict]
-
-
-class CodecError(Exception):
- pass
-
-
-def _pop_bytes(vals: list[bytes]) -> bytes:
- len_parts = []
- while vals and vals[0].isdigit():
- len_parts.append(vals.pop(0))
-
- try:
- length = int(b"".join(len_parts).decode("ascii"))
- except ValueError:
- raise CodecError()
-
- try:
- if vals.pop(0) != b":":
- raise CodecError()
- except IndexError:
- raise CodecError()
-
- if length > len(vals):
- raise CodecError()
-
- out = b"".join(vals[:length])
- del vals[:length]
- return out
-
-
-def _pop_int(vals: list[bytes]) -> int:
- assert vals.pop(0) == b"i"
-
- try:
- end = vals.index(b"e")
- except ValueError:
- raise CodecError()
-
- try:
- out = int(b"".join(vals[:end]).decode("ascii"))
- except ValueError:
- raise CodecError()
-
- del vals[slice(end + 1)]
- return out
-
-
-def _pop_list(vals: list[bytes]) -> list[Type]:
- assert vals.pop(0) == b"l"
-
- out = []
- while vals and vals[0] != b"e":
- out.append(_pop_value(vals))
-
- if vals:
- del vals[0]
- return out
- else:
- raise CodecError()
-
-
-def _pop_dict(vals: list[bytes]) -> Bdict:
- assert vals.pop(0) == b"d"
-
- out = {}
- while vals and vals[0] != b"e":
- key = _pop_bytes(vals)
- out[key] = _pop_value(vals)
-
- if vals:
- del vals[0]
- return out
- else:
- raise CodecError()
-
-
-def _pop_value(vals: list[bytes]) -> Type:
- if vals:
- if vals[0].isdigit():
- return _pop_bytes(vals)
- elif vals[0] == b"i":
- return _pop_int(vals)
- elif vals[0] == b"l":
- return _pop_list(vals)
- elif vals[0] == b"d":
- return _pop_dict(vals)
- else:
- raise CodecError()
- else:
- raise CodecError()
-
-
-def decode(data: bytes) -> Type:
- vals = [bytes([v]) for v in data]
- out = _pop_value(vals)
- if vals:
- raise CodecError()
- else:
- return out
-
-
-def _encode_helper(data: Type) -> list[bytes]:
- if isinstance(data, bytes):
- return [str(len(data)).encode("ascii"), b":", data]
- elif isinstance(data, int):
- return [b"i", str(data).encode("ascii"), b"e"]
- elif isinstance(data, list):
- return [b"l", *itertools.chain.from_iterable(map(_encode_helper, data)), b"e"]
- elif isinstance(data, dict):
- contents = itertools.chain.from_iterable(data.items())
- return [b"d", *itertools.chain.from_iterable(map(_encode_helper, contents)), b"e"]
- else:
- raise CodecError()
-
-
-def encode(data: Type) -> bytes:
- return b"".join(_encode_helper(data))
+++ /dev/null
-from __future__ import annotations
-from dataclasses import dataclass
-from typing import Any, Iterable, Optional
-import datetime as dt
-import itertools
-
-from psycopg2.extras import execute_batch, Json, NumericRange
-
-
-@dataclass
-class Disk:
- sector_size: int
-
-
-@dataclass
-class Slab:
- id: int
- disk_id: int
- sectors: range
- entity_id: bytes
- entity_offset: int
- crypt_key: bytes
-
-
-@dataclass
-class HasherRef:
- id: int
- seq: int
- entity_offset: int
- state: dict
-
-
-@dataclass
-class Wrapper:
- conn: Any
-
- def get_passkey(self, tracker_id: int) -> str:
- with self.conn.cursor() as cursor:
- cursor.execute("select passkey from gazelle.passkey where gazelle_tracker_id = %s;", (tracker_id,))
- [(passkey,)] = cursor.fetchall()
-
- return passkey
-
- def get_torrents(self, tracker_id: int, batch_size: Optional[int] = None) -> Iterable[bytes]:
- """Iterate the info hashes for the specified tracker which haven't been marked deleted."""
-
- stmt = """
- select infohash from gazelle.torrent
- where gazelle_tracker_id = %s and not is_deleted
- order by infohash asc
- ;
- """
- with self.conn.cursor() as cursor:
- if batch_size is not None:
- cursor.itersize = batch_size
-
- cursor.execute(stmt, (tracker_id,))
- for row in cursor:
- (info_hash_mem,) = row
- info_hash = bytes(info_hash_mem)
- assert len(info_hash) == 20
- yield info_hash
-
- def insert_swarm_info(self, tracker_id: int, infos: Iterable["disk_jumble.scrape.ScrapeInfo"]) -> None:
- stmt = """
- insert into gazelle.tracker_stat (gazelle_tracker_id, infohash, ts, complete, incomplete, downloaded)
- values (%s, %s, %s, %s, %s, %s)
- ;
- """
- with self.conn.cursor() as cursor:
- param_sets = [
- (tracker_id, i.info_hash, i.timestamp, i.complete, i.incomplete, i.downloaded)
- for i in infos
- ]
- execute_batch(cursor, stmt, param_sets)
-
- def get_disk(self, id_: int) -> Disk:
- with self.conn.cursor() as cursor:
- cursor.execute("select sector_size from diskjumble.disk where disk_id = %s;", (id_,))
- [(sector_size,)] = cursor.fetchall()
-
- return Disk(sector_size)
-
- def get_slabs_and_hashers(self, disk_id: int) -> Iterable[tuple[Slab, Optional[HasherRef]]]:
- """
- Find slabs on the specified disk, and for each also return any lowest-offset saved hasher that left off directly
- before or within the slab's entity data.
- """
-
- stmt = """
- with
- incomplete_edge as (
- -- join up incomplete piece info and precompute where the hasher left off within the entity
- select
- verify_id, seq, slab.entity_id, hasher_state,
- entity_offset + (upper(c.disk_sectors) - lower(slab.disk_sectors)) * sector_size as end_off
- from
- diskjumble.verify_piece_incomplete
- natural left join diskjumble.verify_piece p
- natural join diskjumble.verify_piece_content c
- natural left join diskjumble.disk
- left join diskjumble.slab on (
- c.disk_id = slab.disk_id
- and upper(c.disk_sectors) <@ int8range(lower(slab.disk_sectors), upper(slab.disk_sectors), '[]')
- )
- where seq >= all (select seq from diskjumble.verify_piece_content where verify_id = p.verify_id)
- )
- select
- slab_id, disk_id, disk_sectors, slab.entity_id, entity_offset, crypt_key, verify_id, seq, end_off,
- hasher_state
- from
- diskjumble.slab
- natural left join diskjumble.disk
- left join incomplete_edge on
- incomplete_edge.entity_id = slab.entity_id
- and incomplete_edge.end_off <@ int8range(
- slab.entity_offset,
- slab.entity_offset + (upper(disk_sectors) - lower(disk_sectors)) * sector_size
- )
- and (incomplete_edge.end_off - slab.entity_offset) %% sector_size = 0
- where disk_id = %s
- order by slab.entity_id, entity_offset, slab_id
- ;
- """
- with self.conn.cursor() as cursor:
- cursor.execute(stmt, (disk_id,))
- for (_, rows_iter) in itertools.groupby(cursor, lambda r: r[0]):
- rows = list(rows_iter)
- [(slab_id, disk_id, sectors_pg, entity_id, entity_off, key)] = {r[:6] for r in rows}
- sectors = range(sectors_pg.lower, sectors_pg.upper)
- slab = Slab(slab_id, disk_id, sectors, bytes(entity_id), entity_off, key)
-
- # `None' if no hasher match in outer join, otherwise earliest match
- (*_, id_, seq, end_off, state) = min(rows, key = lambda r: r[-2])
- hasher_ref = None if id_ is None else HasherRef(id_, seq, end_off, state)
-
- yield (slab, hasher_ref)
-
- def get_torrent_info(self, disk_id: int) -> Iterable[tuple[bytes, bytes]]:
- stmt = """
- with hashed as (
- select digest(info, 'sha1') as info_hash, info
- from bittorrent.torrent_info
- )
- select
- distinct on (info_hash)
- info_hash, info
- from diskjumble.slab left outer join hashed on entity_id = info_hash
- where disk_id = %s
- ;
- """
- with self.conn.cursor() as cursor:
- cursor.execute(stmt, (disk_id,))
- for (info_hash, info) in cursor:
- yield (bytes(info_hash), bytes(info))
-
- def insert_verify_piece(self, ts: dt.datetime, entity_id: bytes, piece_num: int) -> int:
- """Insert new verify piece, returning the ID of the inserted row."""
-
- with self.conn.cursor() as cursor:
- stmt = "insert into diskjumble.verify_piece values (default, %s, %s, %s) returning verify_id;"
- cursor.execute(stmt, (ts, entity_id, piece_num))
- [(row_id,)] = cursor.fetchall()
- return row_id
-
- def insert_verify_piece_content(self, verify_id: int, seq_start: int, disk_id: int, ranges: Iterable[range]) -> None:
- with self.conn.cursor() as cursor:
- execute_batch(
- cursor,
- "insert into diskjumble.verify_piece_content values (%s, %s, %s, %s);",
- [
- (verify_id, seq, disk_id, NumericRange(r.start, r.stop))
- for (seq, r) in enumerate(ranges, start = seq_start)
- ]
- )
-
- def mark_verify_piece_failed(self, verify_id: int) -> None:
- with self.conn.cursor() as cursor:
- cursor.execute("insert into diskjumble.verify_piece_fail values (%s);", (verify_id,))
-
- def upsert_hasher_state(self, verify_id: int, state: dict) -> None:
- stmt = """
- insert into diskjumble.verify_piece_incomplete values (%s, %s)
- on conflict (verify_id) do update set hasher_state = excluded.hasher_state
- ;
- """
- with self.conn.cursor() as cursor:
- cursor.execute(stmt, (verify_id, Json(state)))
-
- def delete_verify_piece(self, verify_id: int) -> None:
- with self.conn.cursor() as cursor:
- cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,))
- cursor.execute("delete from diskjumble.verify_piece_content where verify_id = %s;", (verify_id,))
- cursor.execute("delete from diskjumble.verify_piece where verify_id = %s", (verify_id,))
-
- def move_piece_content_for_pass(self, verify_id: int) -> None:
- stmt = """
- with content_out as (
- delete from diskjumble.verify_piece_content c
- using diskjumble.verify_piece p
- where (
- c.verify_id = p.verify_id
- and p.verify_id = %s
- )
- returning at, disk_id, disk_sectors
- )
- insert into diskjumble.verify_pass (at, disk_id, disk_sectors)
- select at, disk_id, disk_sectors from content_out
- ;
- """
- with self.conn.cursor() as cursor:
- cursor.execute(stmt, (verify_id,))
-
- def insert_pass_data(self, ts: dt.datetime, disk_id: int, sectors: range) -> None:
- with self.conn.cursor() as cursor:
- cursor.execute(
- "insert into diskjumble.verify_pass values (default, %s, %s, %s);",
- (ts, disk_id, NumericRange(sectors.start, sectors.stop))
- )
-
- def clear_incomplete(self, verify_id: int) -> None:
- with self.conn.cursor() as cursor:
- cursor.execute("delete from diskjumble.verify_piece_incomplete where verify_id = %s;", (verify_id,))
+++ /dev/null
-"""Python wrappers for some of GnuTLS Nettle."""
-
-from ctypes.util import find_library
-from typing import Optional
-import ctypes
-
-
-_LIB = ctypes.CDLL(find_library("nettle"))
-
-
-class _Sha1Defs:
- _DIGEST_SIZE = 20 # in bytes
- _BLOCK_SIZE = 64 # in bytes
- _DIGEST_LENGTH = 5
-
- _StateArr = ctypes.c_uint32 * _DIGEST_LENGTH
-
- _BlockArr = ctypes.c_uint8 * _BLOCK_SIZE
-
-
-class Sha1Hasher(_Sha1Defs):
- class Context(ctypes.Structure):
- _fields_ = [
- ("state", _Sha1Defs._StateArr),
- ("count", ctypes.c_uint64),
- ("index", ctypes.c_uint),
- ("block", _Sha1Defs._BlockArr),
- ]
-
- @classmethod
- def deserialize(cls, data):
- return cls(
- _Sha1Defs._StateArr(*data["state"]),
- data["count"],
- data["index"],
- _Sha1Defs._BlockArr(*data["block"]),
- )
-
- def serialize(self):
- return {
- "state": list(self.state),
- "count": self.count,
- "index": self.index,
- "block": list(self.block),
- }
-
- @classmethod
- def _new_context(cls):
- ctx = cls.Context()
- _LIB.nettle_sha1_init(ctypes.byref(ctx))
- return ctx
-
- def __init__(self, ctx_dict: Optional[dict]):
- if ctx_dict:
- self.ctx = self.Context.deserialize(ctx_dict)
- else:
- self.ctx = self._new_context()
-
- def update(self, data):
- _LIB.nettle_sha1_update(ctypes.byref(self.ctx), len(data), data)
-
- def digest(self):
- """Return the current digest and reset the hasher state."""
- out = (ctypes.c_uint8 * self._DIGEST_SIZE)()
- _LIB.nettle_sha1_digest(ctypes.byref(self.ctx), self._DIGEST_SIZE, out)
- return bytes(out)
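A minimal sketch of how the hasher above is used elsewhere in Disk Jumble: hash some data, snapshot the context (the same dict that is stored as hasher_state in the database), and resume from the snapshot. It assumes libnettle is installed; since the output is ordinary SHA-1, hashlib can cross-check it:

    import hashlib

    from disk_jumble.nettle import Sha1Hasher

    hasher = Sha1Hasher(None)
    hasher.update(b"hello ")
    state = hasher.ctx.serialize()  # JSON-serializable snapshot of the SHA-1 context

    resumed = Sha1Hasher(state)  # continue from the snapshot
    resumed.update(b"world")
    assert resumed.digest() == hashlib.sha1(b"hello world").digest()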
+++ /dev/null
-from dataclasses import dataclass
-from typing import Iterable, Union
-from urllib.error import URLError
-from urllib.parse import urlencode, urlparse, urlunparse
-from urllib.request import HTTPHandler, HTTPRedirectHandler, HTTPSHandler, OpenerDirector, UnknownHandler
-import argparse
-import contextlib
-import datetime as dt
-import itertools
-import re
-import sys
-import time
-
-import psycopg2
-
-from disk_jumble import bencode
-from disk_jumble.db import Wrapper as DbWrapper
-from disk_jumble.trackers import Tracker, TRACKERS
-
-
-class Result:
- pass
-
-
-@dataclass
-class OkResult(Result):
- data: bencode.Type
-
-
-@dataclass
-class ErrorResult(Result):
- status: int
- body: Union[bencode.Type, bytes]
-
-
-@dataclass
-class ScrapeInfo:
- info_hash: bytes
- timestamp: dt.datetime
- complete: int
- incomplete: int
- downloaded: int
-
-
-def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) -> Result:
- qs = urlencode({"info_hash": list(info_hashes)}, doseq = True)
- url_parts = urlparse(tracker.scrape_url_format.format(passkey = passkey))
- assert not url_parts.query
- url = urlunparse(url_parts._replace(query = qs))
-
-	# we handle HTTP errors ourselves
- opener = OpenerDirector()
- for handler in [UnknownHandler(), HTTPHandler(), HTTPSHandler(), HTTPRedirectHandler()]:
- opener.add_handler(handler)
-
- with opener.open(url) as resp:
- body = resp.read()
-
- try:
- data = bencode.decode(body)
-	except bencode.CodecError:
- data = body
- else:
- if resp.getcode() == 200 and b"files" in data:
- return OkResult(data)
-
- return ErrorResult(resp.getcode(), data)
-
-
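For ad-hoc testing, scrape_batch can also be called on its own; in this sketch the info hash and passkey are placeholders, not real values:

    from disk_jumble.scrape import ErrorResult, OkResult, scrape_batch
    from disk_jumble.trackers import TRACKERS

    info_hash = bytes.fromhex("da39a3ee5e6b4b0d3255bfef95601890afd80709")  # placeholder 20-byte hash
    result = scrape_batch(TRACKERS["Redacted"], [info_hash], "0123456789abcdef")  # placeholder passkey
    if isinstance(result, OkResult):
        print(result.data[b"files"])
    elif isinstance(result, ErrorResult):
        print("tracker responded", result.status)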
-if __name__ == "__main__":
- PSQL_PARAMS = ["dbname", "user", "password", "host", "port"]
-
- def tracker_name(val):
- matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()]
- if matches:
- [name] = matches
- return name
- else:
- raise ValueError()
-
- def batch_size(val):
- out = int(val)
- if out > 0:
- return out
- else:
- raise ValueError()
-
- def delay(val):
- out = int(val)
- if out >= 0:
- return out
- else:
- raise ValueError()
-
- parser = argparse.ArgumentParser()
- parser.add_argument(
- "tracker",
- choices = sorted(TRACKERS.keys()),
- type = tracker_name,
- help = "name of tracker to scrape",
- )
- parser.add_argument(
- "batch_size",
- type = batch_size,
- help = "number of torrents per batch",
- )
- parser.add_argument(
- "delay",
- type = delay,
- help = "delay between batches, in seconds",
- )
-
- for arg_name in PSQL_PARAMS:
- parser.add_argument("--" + arg_name, nargs = "?")
-
- args = parser.parse_args()
- tracker = TRACKERS[args.tracker]
- delay = dt.timedelta(seconds = args.delay)
-
- params = {n: getattr(args, n) for n in PSQL_PARAMS if getattr(args, n)}
- with contextlib.closing(psycopg2.connect(**params)) as conn:
- conn.autocommit = True
- db_wrapper = DbWrapper(conn)
- passkey = db_wrapper.get_passkey(tracker.gazelle_id)
- info_hashes = iter(db_wrapper.get_torrents(tracker.gazelle_id, args.batch_size))
- batches = iter(lambda: list(itertools.islice(info_hashes, args.batch_size)), [])
- for (i, batch) in enumerate(batches):
- if i != 0:
- time.sleep(delay.total_seconds())
- timestamp = dt.datetime.now(dt.timezone.utc)
- try:
- result = scrape_batch(tracker, batch, passkey)
- except URLError as e:
- print("[ERR] couldn't connect: {}".format(e), file = sys.stderr, flush = True)
- else:
- if isinstance(result, OkResult):
- assert set(result.data[b"files"].keys()) <= set(batch), "unexpected torrent in respose"
- infos = [
- ScrapeInfo(
- info_hash,
- timestamp,
- complete = meta_dict[b"complete"],
- incomplete = meta_dict[b"incomplete"],
- downloaded = meta_dict[b"downloaded"],
- )
- for (info_hash, meta_dict) in result.data[b"files"].items()
- ]
- db_wrapper.insert_swarm_info(tracker.gazelle_id, infos)
- print("[OK] finished batch ({} of {} torrents)".format(len(infos), len(batch)), file = sys.stderr, flush = True)
- elif isinstance(result, ErrorResult):
- full_disp = result.body.decode("ascii", "ignore") if isinstance(result.body, bytes) else str(result.body)
- clean_disp = re.sub(r"\s", " ", full_disp)
- display_size = 100
- disp = clean_disp if len(clean_disp) <= display_size else clean_disp[:display_size] + "…"
- print("[ERR] tracker responded {}: {}".format(result.status, disp), file = sys.stderr, flush = True)
- else:
- raise Exception()
+++ /dev/null
-"""
-Tests for the verification program `disk_jumble.verify'
-
-Like the verification program itself, these tests take database connection information from the environment. The
-necessary schemas and tables are set up from scratch by the test code, so environment variables should point to a
-database that's not hosting a live instance of Disk Jumble. Ideally, this is a completely empty local database created
-for the purposes of testing, but other options are possible.
-
-The tests need access to an SQL source file containing the definitions for the required tables and other Postgres
-objects; see `test_util/dump_db.py'.
-"""
-
-from dataclasses import dataclass
-from importlib import resources
-from random import Random
-from typing import Optional
-import hashlib
-import io
-import tempfile
-import unittest
-import uuid
-
-from psycopg2.extras import NumericRange
-import psycopg2
-import psycopg2.extras
-
-from disk_jumble import bencode
-from disk_jumble.nettle import Sha1Hasher
-from disk_jumble.verify import do_verify
-
-
-_BUF_SIZE = 16 * 1024 ** 2 # in bytes
-
-
-class Tests(unittest.TestCase):
- _SCHEMAS = {"public", "diskjumble", "bittorrent"}
-
- def _basic_fresh_verify_helper(self, read_size):
- sector_size = 32
- piece_size = 64
-
- torrent_len = 3 * piece_size
- disk = self._write_disk(sector_size, torrent_len // sector_size)
- with _random_file(torrent_len, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
- self._write_torrent(torrent)
- with self._conn.cursor() as cursor:
- cursor.execute(
- "insert into diskjumble.slab values (default, %s, %s, %s, 0, null);",
- (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash)
- )
-
- do_verify(self._conn, disk.id, torrent_file, read_size, read_tries = 1)
-
- cursor.execute("select * from diskjumble.verify_pass;")
- self.assertEqual(cursor.rowcount, 1)
- (_, _, disk_id, sectors) = cursor.fetchone()
- self.assertEqual(disk_id, disk.id)
- self.assertEqual(sectors, NumericRange(0, torrent_len // sector_size))
-
- def test_basic_fresh_verify_small_read_size(self):
- self._basic_fresh_verify_helper(16)
-
- def test_basic_fresh_verify_large_read_size(self):
- self._basic_fresh_verify_helper(128)
-
- def test_resume_fragmentation_unaligned_end(self):
- """
- Test a run where a cached hash state is used, a piece is split on disk, and the end of the torrent isn't
- sector-aligned.
- """
- read_size = 8
- piece_size = 64
-
- other_disk = self._write_disk(16, 1)
- disk = self._write_disk(32, 3)
- with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
- self._write_torrent(torrent)
- with self._conn.cursor() as cursor:
- cursor.executemany(
- "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
- [
- (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0),
- (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size),
- (disk.id, NumericRange(2, 3), torrent.info_hash, other_disk.sector_size + disk.sector_size),
- ]
- )
-
- # Prepare the saved hasher state by running a verify
- do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1)
- torrent_file.seek(0)
-
- cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 1)
-
- disk_file = io.BytesIO()
- torrent_file.seek(other_disk.sector_size)
- disk_file.write(torrent_file.read(disk.sector_size))
- disk_file.seek(disk_file.tell() + disk.sector_size)
- disk_file.write(torrent_file.read())
- disk_file.seek(0)
- do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1)
-
- # Check that there are no verify pieces in the database. Because of integrity constraints, this also
- # guarantees there aren't any stray saved hasher states, failed verifies, or piece contents.
- cursor.execute("select count(*) from diskjumble.verify_piece;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 0)
-
- cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
- self.assertEqual(
- cursor.fetchall(),
- [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1)), (disk.id, NumericRange(2, 3))]
- )
-
- def test_resume_no_completion(self):
- """
- Test a run where a saved hasher state is used and the target disk has subsequent entity data but not the full
- remainder of the piece.
- """
- read_size = 7
- piece_size = 64
-
- other_disk = self._write_disk(16, 1)
- disk = self._write_disk(32, 1)
- with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
- self._write_torrent(torrent)
- with self._conn.cursor() as cursor:
- cursor.executemany(
- "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
- [
- (other_disk.id, NumericRange(0, 1), torrent.info_hash, 0),
- (disk.id, NumericRange(0, 1), torrent.info_hash, other_disk.sector_size),
- ]
- )
-
- do_verify(self._conn, other_disk.id, torrent_file, read_size, read_tries = 1)
-
- cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 1)
-
- disk_file = io.BytesIO()
- torrent_file.seek(other_disk.sector_size)
- disk_file.write(torrent_file.read(disk.sector_size))
- disk_file.seek(0)
- do_verify(self._conn, disk.id, disk_file, read_size, read_tries = 1)
-
- cursor.execute("select count(*) from diskjumble.verify_pass;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 0)
-
- cursor.execute("select entity_id, piece from diskjumble.verify_piece;")
- [(entity_id, piece_num)] = cursor.fetchall()
- self.assertEqual(bytes(entity_id), torrent.info_hash)
- self.assertEqual(piece_num, 0)
-
- cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;")
- self.assertCountEqual(
- cursor.fetchall(),
- [(other_disk.id, NumericRange(0, 1)), (disk.id, NumericRange(0, 1))]
- )
-
- cursor.execute("select count(*) from diskjumble.verify_piece_fail;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 0)
-
- hasher = Sha1Hasher(None)
- torrent_file.seek(0)
- hasher.update(torrent_file.read(other_disk.sector_size + disk.sector_size))
- cursor.execute("select hasher_state from diskjumble.verify_piece_incomplete;")
- self.assertEqual(cursor.fetchall(), [(hasher.ctx.serialize(),)])
-
- def test_ignore_hasher_beginning_on_disk(self):
- """
-		Test a run where a saved hasher state is available but isn't used because the beginning of the piece is on the
-		target disk.
- """
- piece_size = 64
-
- other_disk = self._write_disk(16, 1)
- disk = self._write_disk(16, 4)
- with _random_file(piece_size * 2, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
- self._write_torrent(torrent)
- with self._conn.cursor() as cursor:
- cursor.executemany(
- "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
- [
- (other_disk.id, NumericRange(0, other_disk.sector_count), torrent.info_hash, piece_size),
- (disk.id, NumericRange(0, disk.sector_count), torrent.info_hash, piece_size),
- ]
- )
-
- do_verify(self._conn, other_disk.id, torrent_file, read_size = 128, read_tries = 1)
-
- cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 1)
-
- torrent_file.seek(piece_size)
- disk_file = io.BytesIO(torrent_file.read())
- do_verify(self._conn, disk.id, disk_file, read_size = 128, read_tries = 1)
-
- cursor.execute(
- "select disk_id from diskjumble.verify_piece_content natural join diskjumble.verify_piece_incomplete;"
- )
- self.assertEqual(cursor.fetchall(), [(other_disk.id,)])
-
- cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
- self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, disk.sector_count))])
-
- def test_ignore_hasher_unaligned(self):
- """
- Test a run where a saved hasher isn't used because its entity data offset isn't sector-aligned on the target
- disk.
-
- 0 16 32 48 64 80 96 112 128
- pieces: [-------------- 0 -------------]
- other disk: [--][--][--][--][--]
- disk: [------][------]
- """
- piece_size = 128
-
- other_disk = self._write_disk(16, 5)
- disk = self._write_disk(32, 2)
- with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
- self._write_torrent(torrent)
- with self._conn.cursor() as cursor:
- cursor.executemany(
- "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
- [
- (other_disk.id, NumericRange(0, 5), torrent.info_hash, 0),
- (disk.id, NumericRange(0, 2), torrent.info_hash, 64),
- ]
- )
-
- do_verify(self._conn, other_disk.id, torrent_file, read_size = 16, read_tries = 1)
- cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 1)
-
- disk_file = io.BytesIO(torrent_file.getvalue()[64:])
- do_verify(self._conn, disk.id, disk_file, read_size = 16, read_tries = 1)
-
- cursor.execute("""
- select disk_id, disk_sectors
- from diskjumble.verify_piece_incomplete natural join diskjumble.verify_piece_content;
- """)
- self.assertEqual(
- cursor.fetchall(),
- [(other_disk.id, NumericRange(0, 5))]
- )
-
- cursor.execute("select count(*) from diskjumble.verify_pass;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 0)
-
- cursor.execute("select count(*) from diskjumble.verify_piece_fail;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 0)
-
- def test_transient_read_errors(self):
- """
-		Test a run where a disk read fails, but fewer times than needed to mark the sector bad.
- """
- piece_size = 32
-
- disk = self._write_disk(32, 1)
- with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
- self._write_torrent(torrent)
- with self._conn.cursor() as cursor:
- cursor.execute(
- "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
- (disk.id, NumericRange(0, 1), torrent.info_hash, 0)
- )
-
- disk_file = _ReadErrorProxy(torrent_file, error_pos = 12, error_count = 2)
- do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3)
-
- self.assertEqual(disk_file.triggered, 2)
-
- cursor.execute("select count(*) from diskjumble.verify_piece;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 0)
-
- cursor.execute("select disk_id, disk_sectors from diskjumble.verify_pass;")
- self.assertEqual(cursor.fetchall(), [(disk.id, NumericRange(0, 1))])
-
- def test_persistent_read_errors(self):
- """
- Test a run where a disk read fails enough times to trigger the bad sector logic.
- """
- piece_size = 64
-
- other_a = self._write_disk(16, 1)
- other_b = self._write_disk(16, 2)
- disk = self._write_disk(16, 1)
- with _random_file(piece_size, Random(0), on_disk = False) as torrent_file:
- torrent = _Torrent(torrent_file, piece_size)
- self._write_torrent(torrent)
- with self._conn.cursor() as cursor:
- cursor.executemany(
- "insert into diskjumble.slab values (default, %s, %s, %s, %s, null);",
- [
- (other_a.id, NumericRange(0, 1), torrent.info_hash, 0),
- (other_b.id, NumericRange(0, 2), torrent.info_hash, 16),
- (disk.id, NumericRange(0, 1), torrent.info_hash, 48),
- ]
- )
-
- do_verify(self._conn, other_a.id, torrent_file, read_size = 16, read_tries = 1)
- other_b_file = io.BytesIO(torrent_file.getvalue()[16:48])
- do_verify(self._conn, other_b.id, other_b_file, read_size = 16, read_tries = 1)
-
- cursor.execute("select verify_id from diskjumble.verify_piece;")
- [(verify_id,)] = cursor.fetchall()
-
- data = torrent_file.getvalue()[48:]
- disk_file = _ReadErrorProxy(io.BytesIO(data), error_pos = 5, error_count = None)
- do_verify(self._conn, disk.id, disk_file, read_size = 4, read_tries = 3)
-
- cursor.execute("select count(*) from diskjumble.verify_pass;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 0)
-
- cursor.execute("select disk_id, disk_sectors from diskjumble.verify_piece_content;")
- self.assertCountEqual(
- cursor.fetchall(),
- [(other_a.id, NumericRange(0, 1)), (other_b.id, NumericRange(0, 2)), (disk.id, NumericRange(0, 1))]
- )
-
- cursor.execute("select verify_id from diskjumble.verify_piece_fail;")
- self.assertEqual(cursor.fetchall(), [(verify_id,)])
-
- cursor.execute("select count(*) from diskjumble.verify_piece_incomplete;")
- [(row_count,)] = cursor.fetchall()
- self.assertEqual(row_count, 0)
-
- def _write_torrent(self, torrent: "_Torrent") -> None:
- with self._conn.cursor() as cursor:
- cursor.execute("insert into bittorrent.torrent_info values (%s);", (torrent.info,))
-
- def _write_disk(self, sector_size: int, sector_count: int) -> "_Disk":
- with self._conn.cursor() as cursor:
- cursor.execute(
- "insert into diskjumble.disk values (default, %s, null, %s, %s, false) returning disk_id;",
- (uuid.uuid4(), sector_size, sector_count)
- )
- [(id_,)] = cursor.fetchall()
- return _Disk(id_, sector_size, sector_count)
-
- @classmethod
- def setUpClass(cls):
- psycopg2.extras.register_uuid()
- prod_schema_sql = resources.files(__package__).joinpath("verify_setup.sql").read_text()
- schema_sql = "\n".join(
- l for l in prod_schema_sql.splitlines()
- if (
- not l.startswith("SET")
- and not l.startswith("SELECT") # ignore server settings
- and "OWNER TO" not in l # and ownership changes
- )
- )
- cls._conn = psycopg2.connect("")
- with cls._conn, cls._conn.cursor() as cursor:
- for name in cls._SCHEMAS:
- cursor.execute(f"create schema {name};")
- cursor.execute(schema_sql)
-
- @classmethod
-	def tearDownClass(cls):
-		try:
-			with cls._conn, cls._conn.cursor() as cursor:
-				for name in cls._SCHEMAS:
-					cursor.execute(f"drop schema {name} cascade;")
-		finally:
-			cls._conn.close()
-
- def tearDown(self):
- self._conn.rollback()
-
-
-@dataclass
-class _Disk:
- id: int
- sector_size: int
- sector_count: int
-
-
-class _Torrent:
- def __init__(self, data: io.BufferedIOBase, piece_size: int) -> None:
- data.seek(0)
- hashes = []
- while True:
- piece = data.read(piece_size)
- if piece:
- hashes.append(hashlib.sha1(piece).digest())
- else:
- break
-
- info_dict = {
- b"piece length": piece_size,
- b"length": data.tell(),
- b"pieces": b"".join(hashes),
- }
- self.data = data
- self.info = bencode.encode(info_dict)
- self.info_hash = hashlib.sha1(self.info).digest()
-
-
-def _random_file(size: int, rand_src: Random, on_disk: bool) -> tempfile.NamedTemporaryFile:
- f = tempfile.NamedTemporaryFile(buffering = _BUF_SIZE) if on_disk else io.BytesIO()
- try:
- while f.tell() < size:
- write_size = min(size - f.tell(), _BUF_SIZE)
- f.write(bytes(rand_src.getrandbits(8) for _ in range(write_size)))
- f.seek(0)
- return f
- except Exception:
- f.close()
- raise
-
-
-@dataclass
-class _ReadErrorProxy(io.BufferedIOBase):
- wrapped: io.BufferedIOBase
- error_pos: int
- error_count: Optional[int]
-
- def __post_init__(self):
- self.triggered = 0
-
- def read(self, size: int = -1) -> bytes:
- pre_pos = self.wrapped.tell()
- data = self.wrapped.read(size)
- erroring = self.error_count is None or self.triggered < self.error_count
-		in_range = 0 <= self.error_pos - pre_pos < len(data)
- if erroring and in_range:
- self.triggered += 1
- raise OSError("simulated")
- else:
- return data
-
- def seek(self, *args, **kwargs) -> int:
- return self.wrapped.seek(*args, **kwargs)
+++ /dev/null
-create extension btree_gist;
-CREATE OR REPLACE FUNCTION public.digest(bytea, text)
- RETURNS bytea
- LANGUAGE c
- IMMUTABLE PARALLEL SAFE STRICT
-AS '$libdir/pgcrypto', $function$pg_digest$function$
-;
---
--- PostgreSQL database dump
---
-
--- Dumped from database version 13.2 (Debian 13.2-1.pgdg100+1)
--- Dumped by pg_dump version 13.4 (Debian 13.4-0+deb11u1)
-
-SET statement_timeout = 0;
-SET lock_timeout = 0;
-SET idle_in_transaction_session_timeout = 0;
-SET client_encoding = 'UTF8';
-SET standard_conforming_strings = on;
-SELECT pg_catalog.set_config('search_path', '', false);
-SET check_function_bodies = false;
-SET xmloption = content;
-SET client_min_messages = warning;
-SET row_security = off;
-
-SET default_tablespace = '';
-
-SET default_table_access_method = heap;
-
---
--- Name: torrent_info; Type: TABLE; Schema: bittorrent; Owner: eros
---
-
-CREATE TABLE bittorrent.torrent_info (
- info bytea NOT NULL
-);
-
-
-ALTER TABLE bittorrent.torrent_info OWNER TO eros;
-
---
--- Name: disk; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.disk (
- disk_id integer NOT NULL,
- dev_uuid uuid NOT NULL,
- dev_serial text,
- sector_size integer NOT NULL,
- num_sectors bigint NOT NULL,
- failed boolean DEFAULT false NOT NULL
-);
-
-
-ALTER TABLE diskjumble.disk OWNER TO eros;
-
---
--- Name: disk_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
---
-
-CREATE SEQUENCE diskjumble.disk_id_seq
- AS integer
- START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
-
-
-ALTER TABLE diskjumble.disk_id_seq OWNER TO eros;
-
---
--- Name: disk_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
---
-
-ALTER SEQUENCE diskjumble.disk_id_seq OWNED BY diskjumble.disk.disk_id;
-
-
---
--- Name: slab; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.slab (
- slab_id integer NOT NULL,
- disk_id integer NOT NULL,
- disk_sectors int8range NOT NULL,
- entity_id bytea NOT NULL,
- entity_offset bigint NOT NULL,
- crypt_key bytea
-);
-
-
-ALTER TABLE diskjumble.slab OWNER TO eros;
-
---
--- Name: slab_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
---
-
-CREATE SEQUENCE diskjumble.slab_id_seq
- START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
-
-
-ALTER TABLE diskjumble.slab_id_seq OWNER TO eros;
-
---
--- Name: slab_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
---
-
-ALTER SEQUENCE diskjumble.slab_id_seq OWNED BY diskjumble.slab.slab_id;
-
-
---
--- Name: verify_pass; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_pass (
- verify_pass_id integer NOT NULL,
- at timestamp with time zone,
- disk_id integer NOT NULL,
- disk_sectors int8range NOT NULL
-);
-
-
-ALTER TABLE diskjumble.verify_pass OWNER TO eros;
-
---
--- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
---
-
-CREATE SEQUENCE diskjumble.verify_pass_verify_pass_id_seq
- AS integer
- START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
-
-
-ALTER TABLE diskjumble.verify_pass_verify_pass_id_seq OWNER TO eros;
-
---
--- Name: verify_pass_verify_pass_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
---
-
-ALTER SEQUENCE diskjumble.verify_pass_verify_pass_id_seq OWNED BY diskjumble.verify_pass.verify_pass_id;
-
-
---
--- Name: verify_piece; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_piece (
- verify_id integer NOT NULL,
- at timestamp with time zone,
- entity_id bytea NOT NULL,
- piece integer NOT NULL
-);
-
-
-ALTER TABLE diskjumble.verify_piece OWNER TO eros;
-
---
--- Name: verify_piece_content; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_piece_content (
- verify_id integer NOT NULL,
- seq integer NOT NULL,
- disk_id integer NOT NULL,
- disk_sectors int8range NOT NULL
-);
-
-
-ALTER TABLE diskjumble.verify_piece_content OWNER TO eros;
-
---
--- Name: verify_piece_fail; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_piece_fail (
- verify_id integer NOT NULL
-);
-
-
-ALTER TABLE diskjumble.verify_piece_fail OWNER TO eros;
-
---
--- Name: verify_piece_incomplete; Type: TABLE; Schema: diskjumble; Owner: eros
---
-
-CREATE TABLE diskjumble.verify_piece_incomplete (
- verify_id integer NOT NULL,
- hasher_state json
-);
-
-
-ALTER TABLE diskjumble.verify_piece_incomplete OWNER TO eros;
-
---
--- Name: verify_piece_verify_id_seq; Type: SEQUENCE; Schema: diskjumble; Owner: eros
---
-
-CREATE SEQUENCE diskjumble.verify_piece_verify_id_seq
- AS integer
- START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
-
-
-ALTER TABLE diskjumble.verify_piece_verify_id_seq OWNER TO eros;
-
---
--- Name: verify_piece_verify_id_seq; Type: SEQUENCE OWNED BY; Schema: diskjumble; Owner: eros
---
-
-ALTER SEQUENCE diskjumble.verify_piece_verify_id_seq OWNED BY diskjumble.verify_piece.verify_id;
-
-
---
--- Name: disk disk_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.disk ALTER COLUMN disk_id SET DEFAULT nextval('diskjumble.disk_id_seq'::regclass);
-
-
---
--- Name: slab slab_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.slab ALTER COLUMN slab_id SET DEFAULT nextval('diskjumble.slab_id_seq'::regclass);
-
-
---
--- Name: verify_pass verify_pass_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_pass ALTER COLUMN verify_pass_id SET DEFAULT nextval('diskjumble.verify_pass_verify_pass_id_seq'::regclass);
-
-
---
--- Name: verify_piece verify_id; Type: DEFAULT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece ALTER COLUMN verify_id SET DEFAULT nextval('diskjumble.verify_piece_verify_id_seq'::regclass);
-
-
---
--- Name: disk disk_dev_uuid_key; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.disk
- ADD CONSTRAINT disk_dev_uuid_key UNIQUE (dev_uuid);
-
-
---
--- Name: disk disk_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.disk
- ADD CONSTRAINT disk_pkey PRIMARY KEY (disk_id);
-
-
---
--- Name: slab slab_disk_id_disk_sectors_excl; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.slab
- ADD CONSTRAINT slab_disk_id_disk_sectors_excl EXCLUDE USING gist (disk_id WITH =, disk_sectors WITH &&);
-
-
---
--- Name: slab slab_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.slab
- ADD CONSTRAINT slab_pkey PRIMARY KEY (slab_id);
-
-
---
--- Name: verify_pass verify_pass_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_pass
- ADD CONSTRAINT verify_pass_pkey PRIMARY KEY (verify_pass_id);
-
-
---
--- Name: verify_piece_content verify_piece_content_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_content
- ADD CONSTRAINT verify_piece_content_pkey PRIMARY KEY (verify_id, seq);
-
-
---
--- Name: verify_piece_fail verify_piece_fail_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_fail
- ADD CONSTRAINT verify_piece_fail_pkey PRIMARY KEY (verify_id);
-
-
---
--- Name: verify_piece_incomplete verify_piece_incomplete_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_incomplete
- ADD CONSTRAINT verify_piece_incomplete_pkey PRIMARY KEY (verify_id);
-
-
---
--- Name: verify_piece verify_piece_pkey; Type: CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece
- ADD CONSTRAINT verify_piece_pkey PRIMARY KEY (verify_id);
-
-
---
--- Name: torrent_info_digest_idx; Type: INDEX; Schema: bittorrent; Owner: eros
---
-
-CREATE UNIQUE INDEX torrent_info_digest_idx ON bittorrent.torrent_info USING btree (public.digest(info, 'sha1'::text));
-
-
---
--- Name: slab slab_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.slab
- ADD CONSTRAINT slab_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
-
-
---
--- Name: verify_pass verify_pass_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_pass
- ADD CONSTRAINT verify_pass_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
-
-
---
--- Name: verify_piece_content verify_piece_content_disk_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_content
- ADD CONSTRAINT verify_piece_content_disk_id_fkey FOREIGN KEY (disk_id) REFERENCES diskjumble.disk(disk_id);
-
-
---
--- Name: verify_piece_content verify_piece_content_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_content
- ADD CONSTRAINT verify_piece_content_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
-
-
---
--- Name: verify_piece_fail verify_piece_fail_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_fail
- ADD CONSTRAINT verify_piece_fail_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
-
-
---
--- Name: verify_piece_incomplete verify_piece_incomplete_verify_id_fkey; Type: FK CONSTRAINT; Schema: diskjumble; Owner: eros
---
-
-ALTER TABLE ONLY diskjumble.verify_piece_incomplete
- ADD CONSTRAINT verify_piece_incomplete_verify_id_fkey FOREIGN KEY (verify_id) REFERENCES diskjumble.verify_piece(verify_id);
-
-
---
--- PostgreSQL database dump complete
---
-
+++ /dev/null
-from typing import NamedTuple
-
-
-class Tracker(NamedTuple):
- gazelle_id: int
- scrape_url_format: str
-
-
-TRACKERS = {
- "PTP": Tracker(1, "http://please.passthepopcorn.me:2710/{passkey}/scrape"),
- "BTN": Tracker(3, "http://landof.tv/{passkey}/scrape"),
- "Redacted": Tracker(4, "https://flacsfor.me/{passkey}/scrape"),
-}
+++ /dev/null
-from __future__ import annotations
-from abc import ABCMeta, abstractmethod
-from dataclasses import dataclass
-from typing import Optional
-import argparse
-import contextlib
-import datetime as dt
-import io
-import itertools
-import math
-
-import psycopg2
-
-from disk_jumble import bencode
-from disk_jumble.db import HasherRef, Slab, Wrapper as DbWrapper
-from disk_jumble.nettle import Sha1Hasher
-
-
-_READ_BUFFER_SIZE = 16 * 1024 ** 2 # in bytes
-
-
-@dataclass
-class _SlabChunk:
- """A slice of a slab; comprising all or part of a piece to be hashed."""
- slab: Slab
- slice: slice
-
-
-@dataclass
-class _PieceTask:
- """The chunks needed to hash as fully as possible an entity piece."""
- entity_id: bytes
- piece_num: int
- hasher_ref: Optional[HasherRef]
- chunks: list[_SlabChunk]
- complete: bool # do these chunks complete the piece?
-
-
-class _BadSector(Exception):
- pass
-
-
-def do_verify(conn, disk_id: int, disk_file: io.BufferedIOBase, read_size: int, read_tries: int) -> None:
- db = DbWrapper(conn)
- disk = db.get_disk(disk_id)
-
- info_dicts = {
- info_hash: bencode.decode(info)
- for (info_hash, info) in db.get_torrent_info(disk_id)
- }
-
- tasks = []
- slabs_and_hashers = db.get_slabs_and_hashers(disk_id)
- for (entity_id, group) in itertools.groupby(slabs_and_hashers, lambda t: t[0].entity_id):
- info = info_dicts[entity_id]
- piece_len = info[b"piece length"]
- assert piece_len % disk.sector_size == 0
- if b"length" in info:
- torrent_len = info[b"length"]
- else:
- torrent_len = sum(d[b"length"] for d in info[b"files"])
-
- offset = None
- use_hasher = None
- chunks = []
- for (slab, hasher_ref) in group:
- slab_end = min(slab.entity_offset + len(slab.sectors) * disk.sector_size, torrent_len)
-
- while offset is None or offset < slab_end:
- if offset is not None and slab.entity_offset > offset:
- if chunks:
- tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False))
- offset = None
- use_hasher = None
- chunks = []
-
- if offset is None:
- aligned = math.ceil(slab.entity_offset / piece_len) * piece_len
- if hasher_ref and hasher_ref.entity_offset < aligned:
- assert hasher_ref.entity_offset < torrent_len
- use_hasher = hasher_ref
- offset = hasher_ref.entity_offset
- elif aligned < slab_end:
- offset = aligned
- else:
- break # no usable data in this slab
-
- if offset is not None:
- piece_end = min(offset + piece_len - offset % piece_len, torrent_len)
- chunk_end = min(piece_end, slab_end)
- chunks.append(_SlabChunk(slab, slice(offset - slab.entity_offset, chunk_end - slab.entity_offset)))
- if chunk_end == piece_end:
- tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, True))
- use_hasher = None
- chunks = []
- offset = chunk_end
-
- if chunks:
- tasks.append(_PieceTask(entity_id, offset // piece_len, use_hasher, chunks, False))
-
- @dataclass
- class NewVerifyPiece:
- entity_id: bytes
- piece_num: int
- sector_ranges: list[range]
- hasher_state: Optional[dict]
- failed: bool
-
- @dataclass
- class VerifyUpdate:
- seq_start: int
- new_sector_ranges: list[range]
- hasher_state: Optional[dict]
-
- passed_verifies = set()
- failed_verifies = set()
- new_pass_ranges = []
- vp_updates = {}
- new_vps = []
-
- run_ts = dt.datetime.now(dt.timezone.utc)
- for task in tasks:
- hasher = Sha1Hasher(task.hasher_ref.state if task.hasher_ref else None)
- sector_ranges = [
- range(
- chunk.slab.sectors.start + chunk.slice.start // disk.sector_size,
- chunk.slab.sectors.start + math.ceil(chunk.slice.stop / disk.sector_size)
- )
- for chunk in task.chunks
- ]
-
- try:
- for chunk in task.chunks:
- slab_off = chunk.slab.sectors.start * disk.sector_size
- disk_file.seek(slab_off + chunk.slice.start)
- end_pos = slab_off + chunk.slice.stop
- while disk_file.tell() < end_pos:
- pos = disk_file.tell()
- for _ in range(read_tries):
- try:
- data = disk_file.read(min(end_pos - pos, read_size))
- except OSError as e:
- disk_file.seek(pos)
- else:
- break
- else:
- raise _BadSector()
-
- assert data
- hasher.update(data)
- except _BadSector:
- if task.hasher_ref:
- failed_verifies.add(task.hasher_ref.id)
- vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, None)
- else:
- new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, None, True))
- else:
- hasher_state = hasher.ctx.serialize()
- if task.complete:
- s = slice(task.piece_num * 20, task.piece_num * 20 + 20)
- expected_hash = info_dicts[task.entity_id][b"pieces"][s]
- if hasher.digest() == expected_hash:
- write_piece_data = False
- new_pass_ranges.extend(sector_ranges)
- if task.hasher_ref:
- passed_verifies.add(task.hasher_ref.id)
- else:
- failed = True
- write_piece_data = True
- if task.hasher_ref:
- failed_verifies.add(task.hasher_ref.id)
- else:
- failed = False
- write_piece_data = True
-
- if write_piece_data:
- if task.hasher_ref:
- assert task.hasher_ref.id not in vp_updates
- vp_updates[task.hasher_ref.id] = VerifyUpdate(task.hasher_ref.seq + 1, sector_ranges, hasher_state)
- else:
- new_vps.append(NewVerifyPiece(task.entity_id, task.piece_num, sector_ranges, hasher_state, failed))
-
- new_pass_ranges.sort(key = lambda r: r.start)
- merged_ranges = []
- for r in new_pass_ranges:
- if merged_ranges and r.start == merged_ranges[-1].stop:
- merged_ranges[-1] = range(merged_ranges[-1].start, r.stop)
- else:
- merged_ranges.append(r)
-
- for vp in new_vps:
- verify_id = db.insert_verify_piece(run_ts, vp.entity_id, vp.piece_num)
- db.insert_verify_piece_content(verify_id, 0, disk_id, vp.sector_ranges)
- if vp.failed:
- db.mark_verify_piece_failed(verify_id)
- else:
- db.upsert_hasher_state(verify_id, vp.hasher_state)
-
- for (verify_id, update) in vp_updates.items():
- db.insert_verify_piece_content(verify_id, update.seq_start, disk_id, update.new_sector_ranges)
- if update.hasher_state:
- db.upsert_hasher_state(verify_id, update.hasher_state)
-
- for verify_id in passed_verifies:
- db.move_piece_content_for_pass(verify_id)
- db.delete_verify_piece(verify_id)
-
- for r in merged_ranges:
- db.insert_pass_data(run_ts, disk_id, r)
-
- for verify_id in failed_verifies:
- db.clear_incomplete(verify_id)
- db.mark_verify_piece_failed(verify_id)
-
-
-if __name__ == "__main__":
- def read_tries(raw_arg):
- val = int(raw_arg)
- if val > 0:
- return val
- else:
- raise ValueError()
-
- parser = argparse.ArgumentParser()
- parser.add_argument("disk_id", type = int)
- parser.add_argument(
- "read_tries",
- type = read_tries,
- help = "number of times to attempt a particular disk read before giving up on the sector",
- )
- args = parser.parse_args()
-
- with contextlib.closing(psycopg2.connect("")) as conn:
- conn.autocommit = True
- path = f"/dev/mapper/diskjumble-{args.disk_id}"
- with open(path, "rb", buffering = _READ_BUFFER_SIZE) as disk_file:
- do_verify(conn, args.disk_id, disk_file, _READ_BUFFER_SIZE, args.read_tries)
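The tests drive do_verify with in-memory file objects rather than a device node; a stripped-down sketch of that pattern, where the disk id and image contents are placeholders:

    import contextlib
    import io

    import psycopg2

    from disk_jumble.verify import do_verify

    with contextlib.closing(psycopg2.connect("")) as conn:
        conn.autocommit = True
        disk_image = io.BytesIO(bytes(4 * 32))  # placeholder image: four 32-byte sectors
        do_verify(conn, 1, disk_image, read_size = 16 * 1024, read_tries = 3)  # disk_id 1 is a placeholder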
+++ /dev/null
-"""
-Dump, from the live database, the creation SQL for the extensions, tables, and functions needed for local testing
-
-For testing the verification script, write the output of this script to:
-
- src/disk_jumble/tests/verify_setup.sql
-"""
-
-import contextlib
-import itertools
-import os
-import subprocess
-
-import psycopg2
-
-
-procedures = [
- "public.digest(bytea, text)",
-]
-
-extensions = [
- "btree_gist",
-]
-
-with contextlib.closing(psycopg2.connect("")) as conn:
- conn.autocommit = True
- with conn.cursor() as cursor:
- for ext in extensions:
- print(f"create extension {ext};", flush = True)
- for sig in procedures:
- cursor.execute("select pg_get_functiondef(to_regprocedure(%s));", (sig,))
- [(sql,)] = cursor.fetchall()
- print(sql + ";", flush = True)
-
-tables = [
- "diskjumble.disk",
- "diskjumble.slab",
- "diskjumble.verify_pass",
- "diskjumble.verify_piece",
- "diskjumble.verify_piece_content",
- "diskjumble.verify_piece_fail",
- "diskjumble.verify_piece_incomplete",
- "bittorrent.torrent_info",
-]
-argv = ["pg_dump", *itertools.chain.from_iterable(["-t", table] for table in tables), "--schema-only", os.environ["PGDATABASE"]]
-subprocess.run(argv, check = True)