--- /dev/null
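+# Ingest PTP metadata dumps (zstd-compressed JSON documents) into Postgres,
+# upserting into the ptp.torrent_group and ptp.torrent tables.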
+import argparse
+import contextlib
+import json
+import pathlib
+import sys
+
+import psycopg2
+import zstd
+
+
+arg_parser = argparse.ArgumentParser()
+arg_parser.add_argument("paths", metavar = "path", nargs = "+")
+args = arg_parser.parse_args()
+paths = set(map(pathlib.Path, args.paths))
+
+
+# Fail fast if any path is missing or unreadable, before touching the database
+for p in paths:
+    p.open("rb").close()
+
+
+def ingest(doc, db_cursor):
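+    """Upsert one decoded dump into Postgres: one ptp.torrent_group row per
+    entry in doc["Movies"], plus a ptp.torrent row for each of its torrents."""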
+ for group in doc["Movies"]:
+        # Upsert the movie group: on conflict, overwrite the existing row with
+        # the incoming values ("excluded" is the row that failed to insert).
+        # group.get("ImdbId") tolerates groups without an IMDb link (NULL).
+        stmt = (
+            "insert into ptp.torrent_group (id, title, year, imdb_id)"
+            + " values (%s, %s, %s, %s)"
+            + " on conflict (id) do update set (id, title, year, imdb_id) = row(excluded.*)"
+            + ";"
+        )
+        db_cursor.execute(
+            stmt,
+            (group["GroupId"], group["Title"], group["Year"], group.get("ImdbId"))
+        )
+
+        torrent_cols = "(group_id, torrentid, quality, source, container, codec, resolution, seeders, leechers, snatched)"
+        stmt = (
+            f"insert into ptp.torrent {torrent_cols}"
+            + " values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+            + f" on conflict (torrentid) do update set {torrent_cols} = row(excluded.*)"
+            + ";"
+        )
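+        # Seeders/Leechers/Snatched come through as strings, hence the int()
+        # casts below; one execute() per torrent keeps this simple, though
+        # psycopg2.extras.execute_values could batch these if the dumps grow large.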
+ for torrent in group["Torrents"]:
+ db_cursor.execute(
+ stmt,
+ (
+ group["GroupId"], torrent["Id"], torrent["Quality"], torrent["Source"], torrent["Container"],
+ torrent["Codec"], torrent["Resolution"], int(torrent["Seeders"]), int(torrent["Leechers"]),
+ int(torrent["Snatched"]),
+ )
+ )
+
+
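+# An empty dsn defers to libpq defaults, so the target database comes from the
+# PG* environment variables (PGHOST, PGDATABASE, PGUSER, ...).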
+with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor:
+ conn.autocommit = True
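+    # Autocommit makes each upsert durable immediately, so a crash partway
+    # through a file does not roll back the groups already ingested.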
+
+ for p in paths:
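+        # Each dump is one zstd frame wrapping one JSON document; decompress
+        # it in one shot (assumes the whole dump fits in memory).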
+ with p.open("rb") as f:
+ json_data = zstd.decompress(f.read()).decode("utf-8")
+
+ doc = json.loads(json_data)
+ group_count = len(doc["Movies"])
+ torrent_count = sum(len(g["Torrents"]) for g in doc["Movies"])
+
+ ingest(doc, cursor)
+ print(p, file = sys.stderr)
+ print(" processed {} torrents in {} groups".format(torrent_count, group_count), file = sys.stderr)