--- /dev/null
+++ b/disk_jumble/bencode.py
+from typing import Dict, List, Union
+
+
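+# A decoded bencode value is a byte string, an integer, a list of values, or a
+# dict with byte-string keys; ``Bdict`` and ``Type`` express that recursion.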
+Bdict = Dict[bytes, 'Type']
+Type = Union[bytes, int, List['Type'], Bdict]
+
+
+class DecodeError(Exception):
+    pass
+
+
+def _pop_bytes(vals: List[bytes]) -> bytes:
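+    """Consume one length-prefixed byte string (e.g. b"3:foo" -> b"foo") from the front of vals."""
+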
+    len_parts = []
+    while vals and vals[0].isdigit():
+        len_parts.append(vals.pop(0))
+
+    try:
+        length = int(b"".join(len_parts).decode("ascii"))
+    except ValueError:
+        raise DecodeError()
+
+    try:
+        if vals.pop(0) != b":":
+            raise DecodeError()
+    except IndexError:
+        raise DecodeError()
+
+    if length > len(vals):
+        raise DecodeError()
+
+    out = b"".join(vals[:length])
+    del vals[:length]
+    return out
+
+
+def _pop_int(vals: List[bytes]) -> int:
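+    """Consume one integer token (e.g. b"i-42e" -> -42) from the front of vals."""
+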
+    prefix = vals.pop(0)  # pop outside the assert so -O doesn't skip it
+    assert prefix == b"i"
+
+    try:
+        end = vals.index(b"e")
+    except ValueError:
+        raise DecodeError()
+
+    try:
+        out = int(b"".join(vals[:end]).decode("ascii"))
+    except ValueError:
+        raise DecodeError()
+
+    del vals[:end + 1]
+    return out
+
+
+def _pop_list(vals: List[bytes]) -> List[Type]:
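+    """Consume one list (b"l...e") from the front of vals, decoding each element recursively."""
+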
+    prefix = vals.pop(0)  # pop outside the assert so -O doesn't skip it
+    assert prefix == b"l"
+
+    out = []
+    while vals and vals[0] != b"e":
+        out.append(_pop_value(vals))
+
+    if vals:
+        del vals[0]  # consume the trailing b"e"
+        return out
+    else:
+        raise DecodeError()
+
+
+def _pop_dict(vals: List[bytes]) -> Bdict:
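+    """Consume one dict (b"d...e") from the front of vals; keys must be byte strings."""
+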
+    prefix = vals.pop(0)  # pop outside the assert so -O doesn't skip it
+    assert prefix == b"d"
+
+    out = {}
+    while vals and vals[0] != b"e":
+        key = _pop_bytes(vals)
+        out[key] = _pop_value(vals)
+
+    if vals:
+        del vals[0]  # consume the trailing b"e"
+        return out
+    else:
+        raise DecodeError()
+
+
+def _pop_value(vals: List[bytes]) -> Type:
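+    """Decode the next value from vals, dispatching on its leading byte."""
+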
+    if vals:
+        if vals[0].isdigit():
+            return _pop_bytes(vals)
+        elif vals[0] == b"i":
+            return _pop_int(vals)
+        elif vals[0] == b"l":
+            return _pop_list(vals)
+        elif vals[0] == b"d":
+            return _pop_dict(vals)
+        else:
+            raise DecodeError()
+    else:
+        raise DecodeError()
+
+
+def decode(data: bytes) -> Type:
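+    """Decode a complete bencoded value, e.g. b"d3:fooi42ee" -> {b"foo": 42}.
+
+    Raises DecodeError on malformed input or if data remains after the value.
+    """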
+    vals = [bytes([v]) for v in data]
+    out = _pop_value(vals)
+    if vals:
+        raise DecodeError()
+    else:
+        return out
--- /dev/null
+++ b/disk_jumble/db.py
+from typing import Iterable, Optional
+
+from psycopg2.extras import execute_batch
+
+
+class Wrapper:
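+    """Wrapper around a psycopg2 connection exposing the gazelle-schema queries used by the scraper."""
+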
+    def __init__(self, conn) -> None:
+        conn.autocommit = True  # commit each statement as it executes
+        self.conn = conn
+
+    def get_passkey(self, tracker_id: int) -> str:
+        with self.conn.cursor() as cursor:
+            cursor.execute("select passkey from gazelle.passkey where gazelle_tracker_id = %s;", (tracker_id,))
+            [(passkey,)] = cursor.fetchall()  # exactly one row expected
+            return passkey
+
+    def get_torrents(self, tracker_id: int, batch_size: Optional[int] = None) -> Iterable[bytes]:
+        """Iterate the info hashes for the specified tracker which haven't been marked deleted."""
+
+        stmt = (
+            "select infohash from gazelle.torrent"
+            + " where gazelle_tracker_id = %s and not is_deleted"
+            + " order by infohash asc"
+            + ";"
+        )
+        # ``itersize`` only affects server-side (named) cursors, so use one when a batch
+        # size is requested; ``withhold`` lets the cursor live on an autocommit connection.
+        cursor = self.conn.cursor("get_torrents", withhold = True) if batch_size is not None else self.conn.cursor()
+        with cursor:
+            if batch_size is not None:
+                cursor.itersize = batch_size
+
+            cursor.execute(stmt, (tracker_id,))
+            for row in cursor:
+                (info_hash,) = row
+                assert len(info_hash) == 20  # infohashes are 20-byte SHA-1 digests
+                yield bytes(info_hash)
+
+    def insert_swarm_info(self, tracker_id: int, infos: Iterable["disk_jumble.scrape.ScrapeInfo"]) -> None:
+        stmt = (
+            "insert into gazelle.tracker_stat (gazelle_tracker_id, infohash, ts, complete, incomplete, downloaded)"
+            + " values (%s, %s, %s, %s, %s, %s)"
+            + ";"
+        )
+        with self.conn.cursor() as cursor:
+            param_sets = [
+                (tracker_id, i.info_hash, i.timestamp, i.complete, i.incomplete, i.downloaded)
+                for i in infos
+            ]
+            # execute_batch groups the inserts to cut down on server round trips
+            execute_batch(cursor, stmt, param_sets)
--- /dev/null
+++ b/disk_jumble/scrape.py
+from dataclasses import dataclass
+from typing import Iterable, Union
+from urllib.error import URLError
+from urllib.parse import urlencode, urlparse, urlunparse
+from urllib.request import HTTPHandler, HTTPRedirectHandler, HTTPSHandler, OpenerDirector, UnknownHandler
+import argparse
+import contextlib
+import datetime as dt
+import itertools
+import re
+import sys
+import time
+
+import psycopg2
+
+from disk_jumble import bencode
+from disk_jumble.db import Wrapper as DbWrapper
+from disk_jumble.trackers import Tracker, TRACKERS
+
+
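+# A scrape attempt produces either decoded response data (OkResult) or the
+# HTTP status and raw body for diagnostics (ErrorResult).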
+class Result:
+    pass
+
+
+@dataclass
+class OkResult(Result):
+    data: bencode.Type
+
+
+@dataclass
+class ErrorResult(Result):
+    status: int
+    body: Union[bencode.Type, bytes]
+
+
+@dataclass
+class ScrapeInfo:
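+    """Swarm stats for one torrent at one moment; complete, incomplete, and downloaded
+    are the seeder, leecher, and snatch counts from the tracker's scrape response."""
+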
+    info_hash: bytes
+    timestamp: dt.datetime
+    complete: int
+    incomplete: int
+    downloaded: int
+
+
+def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) -> Result:
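+    """Scrape the tracker for a batch of info hashes in one request, repeating the
+    info_hash query parameter; returns OkResult on success, ErrorResult otherwise.
+    """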
+    qs = urlencode({"info_hash": list(info_hashes)}, doseq = True)
+    url_parts = urlparse(tracker.scrape_url_format.format(passkey = passkey))
+    assert not url_parts.query
+    url = urlunparse(url_parts._replace(query = qs))
+
+    # build an opener without the default error processor; we handle HTTP errors ourselves
+    opener = OpenerDirector()
+    for handler in [UnknownHandler(), HTTPHandler(), HTTPSHandler(), HTTPRedirectHandler()]:
+        opener.add_handler(handler)
+
+    with opener.open(url) as resp:
+        body = resp.read()
+
+    try:
+        data = bencode.decode(body)
+    except bencode.DecodeError:
+        data = body
+    else:
+        if resp.getcode() == 200 and isinstance(data, dict) and b"files" in data:
+            return OkResult(data)
+
+    return ErrorResult(resp.getcode(), data)
+
+
+if __name__ == "__main__":
+    PSQL_PARAMS = ["dbname", "user", "password", "host", "port"]
+
+    def tracker_name(val):
+        # case-insensitive match against the known tracker names
+        matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()]
+        if matches:
+            [name] = matches
+            return name
+        else:
+            raise ValueError()
+
+    def batch_size(val):
+        out = int(val)
+        if out > 0:
+            return out
+        else:
+            raise ValueError()
+
+    def delay(val):
+        out = int(val)
+        if out >= 0:
+            return out
+        else:
+            raise ValueError()
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "tracker",
+        choices = sorted(TRACKERS.keys()),
+        type = tracker_name,
+        help = "name of tracker to scrape",
+    )
+    parser.add_argument(
+        "batch_size",
+        type = batch_size,
+        help = "number of torrents per batch",
+    )
+    parser.add_argument(
+        "delay",
+        type = delay,
+        help = "delay between batches, in seconds",
+    )
+
+    for arg_name in PSQL_PARAMS:
+        parser.add_argument("--" + arg_name, nargs = "?")
+
+    args = parser.parse_args()
+    tracker = TRACKERS[args.tracker]
+    delay = dt.timedelta(seconds = args.delay)
+
+    params = {n: getattr(args, n) for n in PSQL_PARAMS if getattr(args, n)}
+    with contextlib.closing(psycopg2.connect(**params)) as conn:
+        db_wrapper = DbWrapper(conn)
+        passkey = db_wrapper.get_passkey(tracker.gazelle_id)
+        info_hashes = iter(db_wrapper.get_torrents(tracker.gazelle_id, args.batch_size))
+        # two-argument iter() keeps calling the lambda, yielding fixed-size batches until it returns []
+        batches = iter(lambda: list(itertools.islice(info_hashes, args.batch_size)), [])
+        for (i, batch) in enumerate(batches):
+            if i != 0:
+                time.sleep(delay.total_seconds())
+            timestamp = dt.datetime.now(dt.timezone.utc)
+            try:
+                result = scrape_batch(tracker, batch, passkey)
+            except URLError as e:
+                print("[ERR] couldn't connect: {}".format(e), file = sys.stderr, flush = True)
+            else:
+                if isinstance(result, OkResult):
+                    assert set(result.data[b"files"].keys()) <= set(batch), "unexpected torrent in response"
+                    infos = [
+                        ScrapeInfo(
+                            info_hash,
+                            timestamp,
+                            complete = meta_dict[b"complete"],
+                            incomplete = meta_dict[b"incomplete"],
+                            downloaded = meta_dict[b"downloaded"],
+                        )
+                        for (info_hash, meta_dict) in result.data[b"files"].items()
+                    ]
+                    db_wrapper.insert_swarm_info(tracker.gazelle_id, infos)
+                    print("[OK] finished batch ({} of {} torrents)".format(len(infos), len(batch)), file = sys.stderr, flush = True)
+                elif isinstance(result, ErrorResult):
+                    full_disp = result.body.decode("ascii", "ignore") if isinstance(result.body, bytes) else str(result.body)
+                    clean_disp = re.sub(r"\s", " ", full_disp)
+                    display_size = 100
+                    disp = clean_disp if len(clean_disp) <= display_size else clean_disp[:display_size] + "…"
+                    print("[ERR] tracker responded {}: {}".format(result.status, disp), file = sys.stderr, flush = True)
+                else:
+                    raise AssertionError("unhandled result type: {!r}".format(type(result)))