Simplify repository layout
author     Jakob Cornell <jakob+gpg@jcornell.net>
           Thu, 20 Jan 2022 01:50:00 +0000 (19:50 -0600)
committer  Jakob Cornell <jakob+gpg@jcornell.net>
           Thu, 20 Jan 2022 01:50:00 +0000 (19:50 -0600)
python/scripts/gdrive_access_token.py [deleted file]
python/scripts/gdrive_hash_worker [deleted file]
python/scripts/ingest_ptp_json.py [deleted file]
scripts/gdrive_access_token.py [new file with mode: 0644]
scripts/gdrive_hash_worker [new file with mode: 0755]
scripts/ingest_ptp_json.py [new file with mode: 0644]
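The three scripts move from python/scripts/ to a top-level scripts/ directory with their contents untouched: each file's blob hash is the same on both sides of the move (d52543e, 6f6fa25, 1104530). A minimal sketch to confirm that from a checkout, assuming it is run at the repository root with this commit checked out as HEAD:

# Sketch: verify the relocation kept every script byte-identical by comparing
# git's blob ids before and after this commit (HEAD~1 vs. HEAD).
import subprocess

def blob_id(rev, path):
    # "git rev-parse <rev>:<path>" prints the blob object id for that file.
    return subprocess.run(
        ["git", "rev-parse", f"{rev}:{path}"],
        capture_output=True, text=True, check=True,
    ).stdout.strip()

for name in ["gdrive_access_token.py", "gdrive_hash_worker", "ingest_ptp_json.py"]:
    old = blob_id("HEAD~1", f"python/scripts/{name}")
    new = blob_id("HEAD", f"scripts/{name}")
    assert old == new, f"{name} changed content during the move"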

diff --git a/python/scripts/gdrive_access_token.py b/python/scripts/gdrive_access_token.py
deleted file mode 100644 (file)
index d52543e..0000000
--- a/python/scripts/gdrive_access_token.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import psycopg2
-import urllib.request
-import urllib.parse
-import io
-import json
-import re
-
-#
-# refresh tokens
-#
-
-cn = psycopg2.connect('')
-cr = cn.cursor()
-
-cr.execute("BEGIN")
-cr.execute("SELECT id, client_id, client_secret, refresh_token FROM gdrive.access_token WHERE id IN (1) AND refresh_token IS NOT NULL AND expires IS NULL OR expires < CURRENT_TIMESTAMP + '30min' FOR NO KEY UPDATE")
-for (id_, client_id, client_secret, refresh_token) in cr.fetchall():
-       token_req = urllib.parse.urlencode((
-                       ('grant_type', 'refresh_token'),
-                       ('client_id', client_id),
-                       ('client_secret', client_secret),
-                       ('refresh_token', refresh_token))).encode('utf-8')
-       resp = urllib.request.urlopen(urllib.request.Request("https://www.googleapis.com/oauth2/v4/token", data=token_req, method='POST'))
-       assert resp.status == 200
-       new_token = json.load(io.TextIOWrapper(io.BufferedReader(resp)))
-       cr.execute("UPDATE gdrive.access_token SET access_token = %s, scope = %s, expires = CURRENT_TIMESTAMP + make_interval(secs => %s) WHERE id = %s", (new_token['access_token'], new_token['scope'], new_token['expires_in'], id_))
-
-cn.commit()
-
-cn.close()
diff --git a/python/scripts/gdrive_hash_worker b/python/scripts/gdrive_hash_worker
deleted file mode 100755 (executable)
index 6f6fa25..0000000
--- a/python/scripts/gdrive_hash_worker
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/python3
-
-import hashlib
-import psycopg2
-import subprocess
-from time import sleep
-
-cn = psycopg2.connect('')
-cn.set_isolation_level(0) # autocommit
-cr = cn.cursor()
-
-while True:
-       cr.execute('WITH thw AS (INSERT INTO gdrive.gdrive_hash_worker (backend_pid, job_id) SELECT pg_backend_pid(), job_id FROM gdrive.gdrive_hash_job tj JOIN (SELECT job_id FROM gdrive.gdrive_hash_job WHERE NOT done EXCEPT SELECT job_id FROM gdrive.gdrive_hash_worker LIMIT 1) ti USING (job_id) ON CONFLICT (backend_pid) DO UPDATE SET job_id=EXCLUDED.job_id RETURNING job_id, bw_limit) SELECT job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, lower(upload_range), upper(upload_range) FROM thw JOIN gdrive.gdrive_hash_job USING (job_id) JOIN gdrive.upload USING (upload_id) JOIN gdrive.file USING (upload_id), gdrive.access_token WHERE access_token.id=1 AND access_token.expires>CURRENT_TIMESTAMP')
-       try:
-               [(job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, upload_start, upload_end)] = cr.fetchall()
-       except ValueError: # no jobs
-               sleep(60)
-               continue
-
-       upload_len = upload_end - upload_start
-       upload_waste_start = upload_start % 16
-       upload_blk_start = upload_start - upload_waste_start
-       upload_waste_end = (16 - upload_end % 16 ) % 16
-       upload_blk_end = upload_end + upload_waste_end + 15
-
-       buf = bytearray(16384)
-
-       num_blocks = upload_len >> 14
-       if upload_len > (num_blocks << 14):
-               num_blocks += 1
-
-       if hash_blocks:
-               block_hashes = bytearray(num_blocks * 32)
-       else:
-               block_hashes = None
-
-       if hash_pieces:
-               num_pieces = upload_len >> piece_size
-               if upload_len > (num_pieces << piece_size):
-                       num_pieces += 1
-               log2_blocks_per_piece = piece_size - 14
-               piece_hashes = bytearray(num_pieces * 20)
-       else:
-               num_pieces = None
-               piece_hashes = None
-       
-       if bw_limit is not None:
-               bw_limit_str = " '--limit-rate' '{}k'".format(bw_limit)
-       else:
-               bw_limit_str = ''
-
-       coproc = subprocess.Popen(["curl '--fail'{} '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none'".format(bw_limit_str, access_token, upload_blk_start, upload_blk_end, gdrive_file_id, encryption_key.hex(), upload_waste_start, upload_len)], stdout=subprocess.PIPE, shell=True)
-
-       block = 0
-       for piece in range(num_pieces or 1):
-               if hash_pieces:
-                       piece_end = min((piece+1) << log2_blocks_per_piece, num_blocks)
-                       piece_hasher = hashlib.sha1()
-               else:
-                       piece_end = num_blocks
-               while block < piece_end:
-                       bytes_to_end = upload_len - (block << 14)
-                       if bytes_to_end < 16384:
-                               buf = buf[:bytes_to_end]
-                       assert coproc.stdout.readinto(buf) == len(buf)
-                       if hash_blocks:
-                               hashbuf_offset = block << 5
-                               block_hashes[hashbuf_offset:hashbuf_offset+32] = hashlib.sha256(buf).digest()
-                       if hash_pieces:
-                               piece_hasher.update(buf)
-                       block += 1
-               if hash_pieces:
-                       hashbuf_offset = piece * 20
-                       piece_hashes[hashbuf_offset:hashbuf_offset+20] = piece_hasher.digest()
-       assert coproc.stdout.read(1) == b'' # EOF
-       assert coproc.wait() == 0 # subprocess success
-
-       cr.execute("""
-               WITH ti AS (SELECT
-                               %s AS job_id,
-                               %s::integer AS piece_size,
-                               %s AS hash_pieces,
-                               %s AS hash_blocks,
-                               upload_id,
-                               int8range(%s, %s) AS upload_range,
-                               %s::bytea AS piece_hashes,
-                               %s::bytea AS block_hashes
-                       FROM gdrive.file
-                       WHERE gdrive_file_id=%s),
-               th AS (INSERT INTO gdrive.gdrive_hashes (upload_id, upload_range, piece_size, algo, hashes)
-                       SELECT upload_id, upload_range, piece_size, 'sha1', piece_hashes FROM ti WHERE hash_pieces
-                       UNION SELECT upload_id, upload_range, 14, 'sha256', block_hashes FROM ti WHERE hash_blocks),
-               thw AS (UPDATE gdrive.gdrive_hash_worker ts SET job_id=NULL
-                       FROM ti WHERE backend_pid=pg_backend_pid() AND ts.job_id=ti.job_id)
-               UPDATE gdrive.gdrive_hash_job ts
-               SET upload_id=ti.upload_id, upload_range=ti.upload_range, piece_size=ti.piece_size,
-                       hash_pieces=ti.hash_pieces, hash_blocks=ti.hash_blocks, done=true
-               FROM ti WHERE ts.job_id=ti.job_id
-       """, (job_id, piece_size, hash_pieces, hash_blocks, upload_start, upload_end, piece_hashes, block_hashes, gdrive_file_id))
diff --git a/python/scripts/ingest_ptp_json.py b/python/scripts/ingest_ptp_json.py
deleted file mode 100644 (file)
index 1104530..0000000
--- a/python/scripts/ingest_ptp_json.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import argparse
-import bz2
-import contextlib
-import json
-import pathlib
-import sys
-
-import psycopg2
-
-
-arg_parser = argparse.ArgumentParser()
-arg_parser.add_argument("paths", metavar = "path", nargs = "+")
-args = arg_parser.parse_args()
-paths = list({pathlib.Path(a): None for a in args.paths}.keys())  # deduplicate
-
-
-# Fail fast if a path is wrong
-for p in paths:
-       p.open("rb").close()
-
-
-def ingest(doc, db_cursor):
-       for group in doc["Movies"]:
-               group_id = group["GroupId"]
-               stmt = (
-                       "insert into ptp.torrent_group (id, title, year, imdb_id)"
-                       + " values (%s, %s, %s, %s)"
-                       + " on conflict (id) do update set (id, title, year, imdb_id) = row(excluded.*)"
-                       + ";"
-               )
-               db_cursor.execute(
-                       stmt,
-                       (group_id, group.get("Title"), group.get("Year"), group.get("ImdbId"))
-               )
-
-               group_cols = "(group_id, torrentid, quality, source, container, codec, resolution, seeders, leechers, snatched)"
-               stmt = (
-                       f"insert into ptp.torrent {group_cols}"
-                       + " values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
-                       + f" on conflict (torrentid) do update set {group_cols} = row(excluded.*)"
-                       + ";"
-               )
-               for torrent in group["Torrents"]:
-                       id_ = torrent["Id"]
-                       quality = torrent.get("Quality")
-                       source = torrent.get("Source")
-                       container = torrent.get("Container")
-                       codec = torrent.get("Codec")
-                       resolution = torrent.get("Resolution")
-                       seeders = int(torrent["Seeders"]) if "Seeders" in torrent else None
-                       leechers = int(torrent["Leechers"]) if "Leechers" in torrent else None
-                       snatched = int(torrent["Snatched"]) if "Snatched" in torrent else None
-                       db_cursor.execute(
-                               stmt,
-                               (group_id, id_, quality, source, container, codec, resolution, seeders, leechers, snatched),
-                       )
-
-
-with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor:
-       conn.autocommit = True
-
-       for p in paths:
-               with p.open("rb") as f:
-                       json_data = bz2.decompress(f.read()).decode("utf-8")
-
-               doc = json.loads(json_data)
-               group_count = len(doc["Movies"])
-               torrent_count = sum(len(g["Torrents"]) for g in doc["Movies"])
-
-               ingest(doc, cursor)
-               print(p, file = sys.stderr)
-               print("  processed {} torrents in {} groups".format(torrent_count, group_count), file = sys.stderr)
diff --git a/scripts/gdrive_access_token.py b/scripts/gdrive_access_token.py
new file mode 100644 (file)
index 0000000..d52543e
--- /dev/null
+++ b/scripts/gdrive_access_token.py
@@ -0,0 +1,30 @@
+import psycopg2
+import urllib.request
+import urllib.parse
+import io
+import json
+import re
+
+#
+# refresh tokens
+#
+
+cn = psycopg2.connect('')
+cr = cn.cursor()
+
+cr.execute("BEGIN")
+cr.execute("SELECT id, client_id, client_secret, refresh_token FROM gdrive.access_token WHERE id IN (1) AND refresh_token IS NOT NULL AND expires IS NULL OR expires < CURRENT_TIMESTAMP + '30min' FOR NO KEY UPDATE")
+for (id_, client_id, client_secret, refresh_token) in cr.fetchall():
+       token_req = urllib.parse.urlencode((
+                       ('grant_type', 'refresh_token'),
+                       ('client_id', client_id),
+                       ('client_secret', client_secret),
+                       ('refresh_token', refresh_token))).encode('utf-8')
+       resp = urllib.request.urlopen(urllib.request.Request("https://www.googleapis.com/oauth2/v4/token", data=token_req, method='POST'))
+       assert resp.status == 200
+       new_token = json.load(io.TextIOWrapper(io.BufferedReader(resp)))
+       cr.execute("UPDATE gdrive.access_token SET access_token = %s, scope = %s, expires = CURRENT_TIMESTAMP + make_interval(secs => %s) WHERE id = %s", (new_token['access_token'], new_token['scope'], new_token['expires_in'], id_))
+
+cn.commit()
+
+cn.close()
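For context on the UPDATE above: the refresh response is parsed as JSON and only access_token, scope, and expires_in are read back into gdrive.access_token. A rough illustration of the expected shape, with made-up values rather than anything taken from a real response:

# Illustrative token-refresh response as consumed by gdrive_access_token.py;
# only the three fields the script reads are shown, and all values are invented.
example_response = {
    "access_token": "ya29.example-access-token",
    "scope": "https://www.googleapis.com/auth/drive",
    "expires_in": 3599,  # seconds; fed to make_interval(secs => ...) in the UPDATE
}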
diff --git a/scripts/gdrive_hash_worker b/scripts/gdrive_hash_worker
new file mode 100755 (executable)
index 0000000..6f6fa25
--- /dev/null
+++ b/scripts/gdrive_hash_worker
@@ -0,0 +1,99 @@
+#!/usr/bin/python3
+
+import hashlib
+import psycopg2
+import subprocess
+from time import sleep
+
+cn = psycopg2.connect('')
+cn.set_isolation_level(0) # autocommit
+cr = cn.cursor()
+
+while True:
+       cr.execute('WITH thw AS (INSERT INTO gdrive.gdrive_hash_worker (backend_pid, job_id) SELECT pg_backend_pid(), job_id FROM gdrive.gdrive_hash_job tj JOIN (SELECT job_id FROM gdrive.gdrive_hash_job WHERE NOT done EXCEPT SELECT job_id FROM gdrive.gdrive_hash_worker LIMIT 1) ti USING (job_id) ON CONFLICT (backend_pid) DO UPDATE SET job_id=EXCLUDED.job_id RETURNING job_id, bw_limit) SELECT job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, lower(upload_range), upper(upload_range) FROM thw JOIN gdrive.gdrive_hash_job USING (job_id) JOIN gdrive.upload USING (upload_id) JOIN gdrive.file USING (upload_id), gdrive.access_token WHERE access_token.id=1 AND access_token.expires>CURRENT_TIMESTAMP')
+       try:
+               [(job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, upload_start, upload_end)] = cr.fetchall()
+       except ValueError: # no jobs
+               sleep(60)
+               continue
+
+       upload_len = upload_end - upload_start
+       upload_waste_start = upload_start % 16
+       upload_blk_start = upload_start - upload_waste_start
+       upload_waste_end = (16 - upload_end % 16 ) % 16
+       upload_blk_end = upload_end + upload_waste_end + 15
+
+       buf = bytearray(16384)
+
+       num_blocks = upload_len >> 14
+       if upload_len > (num_blocks << 14):
+               num_blocks += 1
+
+       if hash_blocks:
+               block_hashes = bytearray(num_blocks * 32)
+       else:
+               block_hashes = None
+
+       if hash_pieces:
+               num_pieces = upload_len >> piece_size
+               if upload_len > (num_pieces << piece_size):
+                       num_pieces += 1
+               log2_blocks_per_piece = piece_size - 14
+               piece_hashes = bytearray(num_pieces * 20)
+       else:
+               num_pieces = None
+               piece_hashes = None
+       
+       if bw_limit is not None:
+               bw_limit_str = " '--limit-rate' '{}k'".format(bw_limit)
+       else:
+               bw_limit_str = ''
+
+       coproc = subprocess.Popen(["curl '--fail'{} '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none'".format(bw_limit_str, access_token, upload_blk_start, upload_blk_end, gdrive_file_id, encryption_key.hex(), upload_waste_start, upload_len)], stdout=subprocess.PIPE, shell=True)
+
+       block = 0
+       for piece in range(num_pieces or 1):
+               if hash_pieces:
+                       piece_end = min((piece+1) << log2_blocks_per_piece, num_blocks)
+                       piece_hasher = hashlib.sha1()
+               else:
+                       piece_end = num_blocks
+               while block < piece_end:
+                       bytes_to_end = upload_len - (block << 14)
+                       if bytes_to_end < 16384:
+                               buf = buf[:bytes_to_end]
+                       assert coproc.stdout.readinto(buf) == len(buf)
+                       if hash_blocks:
+                               hashbuf_offset = block << 5
+                               block_hashes[hashbuf_offset:hashbuf_offset+32] = hashlib.sha256(buf).digest()
+                       if hash_pieces:
+                               piece_hasher.update(buf)
+                       block += 1
+               if hash_pieces:
+                       hashbuf_offset = piece * 20
+                       piece_hashes[hashbuf_offset:hashbuf_offset+20] = piece_hasher.digest()
+       assert coproc.stdout.read(1) == b'' # EOF
+       assert coproc.wait() == 0 # subprocess success
+
+       cr.execute("""
+               WITH ti AS (SELECT
+                               %s AS job_id,
+                               %s::integer AS piece_size,
+                               %s AS hash_pieces,
+                               %s AS hash_blocks,
+                               upload_id,
+                               int8range(%s, %s) AS upload_range,
+                               %s::bytea AS piece_hashes,
+                               %s::bytea AS block_hashes
+                       FROM gdrive.file
+                       WHERE gdrive_file_id=%s),
+               th AS (INSERT INTO gdrive.gdrive_hashes (upload_id, upload_range, piece_size, algo, hashes)
+                       SELECT upload_id, upload_range, piece_size, 'sha1', piece_hashes FROM ti WHERE hash_pieces
+                       UNION SELECT upload_id, upload_range, 14, 'sha256', block_hashes FROM ti WHERE hash_blocks),
+               thw AS (UPDATE gdrive.gdrive_hash_worker ts SET job_id=NULL
+                       FROM ti WHERE backend_pid=pg_backend_pid() AND ts.job_id=ti.job_id)
+               UPDATE gdrive.gdrive_hash_job ts
+               SET upload_id=ti.upload_id, upload_range=ti.upload_range, piece_size=ti.piece_size,
+                       hash_pieces=ti.hash_pieces, hash_blocks=ti.hash_blocks, done=true
+               FROM ti WHERE ts.job_id=ti.job_id
+       """, (job_id, piece_size, hash_pieces, hash_blocks, upload_start, upload_end, piece_hashes, block_hashes, gdrive_file_id))
diff --git a/scripts/ingest_ptp_json.py b/scripts/ingest_ptp_json.py
new file mode 100644 (file)
index 0000000..1104530
--- /dev/null
+++ b/scripts/ingest_ptp_json.py
@@ -0,0 +1,72 @@
+import argparse
+import bz2
+import contextlib
+import json
+import pathlib
+import sys
+
+import psycopg2
+
+
+arg_parser = argparse.ArgumentParser()
+arg_parser.add_argument("paths", metavar = "path", nargs = "+")
+args = arg_parser.parse_args()
+paths = list({pathlib.Path(a): None for a in args.paths}.keys())  # deduplicate
+
+
+# Fail fast if a path is wrong
+for p in paths:
+       p.open("rb").close()
+
+
+def ingest(doc, db_cursor):
+       for group in doc["Movies"]:
+               group_id = group["GroupId"]
+               stmt = (
+                       "insert into ptp.torrent_group (id, title, year, imdb_id)"
+                       + " values (%s, %s, %s, %s)"
+                       + " on conflict (id) do update set (id, title, year, imdb_id) = row(excluded.*)"
+                       + ";"
+               )
+               db_cursor.execute(
+                       stmt,
+                       (group_id, group.get("Title"), group.get("Year"), group.get("ImdbId"))
+               )
+
+               group_cols = "(group_id, torrentid, quality, source, container, codec, resolution, seeders, leechers, snatched)"
+               stmt = (
+                       f"insert into ptp.torrent {group_cols}"
+                       + " values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+                       + f" on conflict (torrentid) do update set {group_cols} = row(excluded.*)"
+                       + ";"
+               )
+               for torrent in group["Torrents"]:
+                       id_ = torrent["Id"]
+                       quality = torrent.get("Quality")
+                       source = torrent.get("Source")
+                       container = torrent.get("Container")
+                       codec = torrent.get("Codec")
+                       resolution = torrent.get("Resolution")
+                       seeders = int(torrent["Seeders"]) if "Seeders" in torrent else None
+                       leechers = int(torrent["Leechers"]) if "Leechers" in torrent else None
+                       snatched = int(torrent["Snatched"]) if "Snatched" in torrent else None
+                       db_cursor.execute(
+                               stmt,
+                               (group_id, id_, quality, source, container, codec, resolution, seeders, leechers, snatched),
+                       )
+
+
+with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor:
+       conn.autocommit = True
+
+       for p in paths:
+               with p.open("rb") as f:
+                       json_data = bz2.decompress(f.read()).decode("utf-8")
+
+               doc = json.loads(json_data)
+               group_count = len(doc["Movies"])
+               torrent_count = sum(len(g["Torrents"]) for g in doc["Movies"])
+
+               ingest(doc, cursor)
+               print(p, file = sys.stderr)
+               print("  processed {} torrents in {} groups".format(torrent_count, group_count), file = sys.stderr)