From 910f1f24936d8854e0740a585ecf252bdb09c84f Mon Sep 17 00:00:00 2001
From: Jakob Cornell
Date: Sat, 9 Oct 2021 01:28:38 -0500
Subject: [PATCH 1/1] Add gitignore and PTP JSON ingest script

---
 .gitignore                        |  3 ++
 python/scripts/ingest_ptp_json.py | 67 +++++++++++++++++++++++++++++++
 2 files changed, 70 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 python/scripts/ingest_ptp_json.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..fbf1bc5
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.egg-info
+__pycache__
+.mypy_cache
diff --git a/python/scripts/ingest_ptp_json.py b/python/scripts/ingest_ptp_json.py
new file mode 100644
index 0000000..9de0380
--- /dev/null
+++ b/python/scripts/ingest_ptp_json.py
@@ -0,0 +1,67 @@
+import argparse
+import contextlib
+import json
+import pathlib
+import sys
+
+import psycopg2
+import zstd
+
+
+arg_parser = argparse.ArgumentParser()
+arg_parser.add_argument("paths", metavar = "path", nargs = "+")
+args = arg_parser.parse_args()
+paths = set(map(pathlib.Path, args.paths))
+
+
+# Fail fast if a path is wrong
+for p in paths:
+	p.open("rb").close()
+
+
+def ingest(doc, db_cursor):
+	"""Upsert each torrent group in `doc["Movies"]`, and its torrents, into Postgres."""
+	for group in doc["Movies"]:
+		# Four target columns, so exactly four placeholders must appear here.
+		stmt = (
+			"insert into ptp.torrent_group (id, title, year, imdb_id)"
+			+ " values (%s, %s, %s, %s)"
+			+ " on conflict (id) do update set (id, title, year, imdb_id) = row(excluded.*)"
+			+ ";"
+		)
+		db_cursor.execute(
+			stmt,
+			(group["GroupId"], group["Title"], group["Year"], group.get("ImdbId"))
+		)
+
+		group_cols = "(group_id, torrentid, quality, source, container, codec, resolution, seeders, leechers, snatched)"
+		stmt = (
+			f"insert into ptp.torrent {group_cols}"
+			+ " values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+			+ f" on conflict (torrentid) do update set {group_cols} = row(excluded.*)"
+			+ ";"
+		)
+		for torrent in group["Torrents"]:
+			db_cursor.execute(
+				stmt,
+				(
+					group["GroupId"], torrent["Id"], torrent["Quality"], torrent["Source"], torrent["Container"],
+					torrent["Codec"], torrent["Resolution"], int(torrent["Seeders"]), int(torrent["Leechers"]),
+					int(torrent["Snatched"]),
+				)
+			)
+
+
+with contextlib.closing(psycopg2.connect()) as conn, conn.cursor() as cursor:
+	conn.autocommit = True
+
+	for p in paths:
+		with p.open("rb") as f:
+			json_data = zstd.decompress(f.read()).decode("utf-8")
+
+		doc = json.loads(json_data)
+		group_count = len(doc["Movies"])
+		torrent_count = sum(len(g["Torrents"]) for g in doc["Movies"])
+
+		ingest(doc, cursor)
+		print(p, file = sys.stderr)
+		print("\tprocessed {} torrents in {} groups".format(torrent_count, group_count), file = sys.stderr)
-- 
2.30.2