From: Jakob Cornell
Date: Thu, 20 Jan 2022 01:50:00 +0000 (-0600)
Subject: Simplify repository layout
X-Git-Url: https://jcornell.net/gitweb/gitweb.cgi?a=commitdiff_plain;h=e715ae82910ef6307168ae17d7bc584abe6e8bc1;p=eros.git

Simplify repository layout
---

diff --git a/python/scripts/gdrive_access_token.py b/python/scripts/gdrive_access_token.py
deleted file mode 100644
index d52543e..0000000
--- a/python/scripts/gdrive_access_token.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import psycopg2
-import urllib.request
-import urllib.parse
-import io
-import json
-import re
-
-#
-# refresh tokens
-#
-
-cn = psycopg2.connect('')
-cr = cn.cursor()
-
-cr.execute("BEGIN")
-cr.execute("SELECT id, client_id, client_secret, refresh_token FROM gdrive.access_token WHERE id IN (1) AND refresh_token IS NOT NULL AND expires IS NULL OR expires < CURRENT_TIMESTAMP + '30min' FOR NO KEY UPDATE")
-for (id_, client_id, client_secret, refresh_token) in cr.fetchall():
-	token_req = urllib.parse.urlencode((
-		('grant_type', 'refresh_token'),
-		('client_id', client_id),
-		('client_secret', client_secret),
-		('refresh_token', refresh_token))).encode('utf-8')
-	resp = urllib.request.urlopen(urllib.request.Request("https://www.googleapis.com/oauth2/v4/token", data=token_req, method='POST'))
-	assert resp.status == 200
-	new_token = json.load(io.TextIOWrapper(io.BufferedReader(resp)))
-	cr.execute("UPDATE gdrive.access_token SET access_token = %s, scope = %s, expires = CURRENT_TIMESTAMP + make_interval(secs => %s) WHERE id = %s", (new_token['access_token'], new_token['scope'], new_token['expires_in'], id_))
-
-cn.commit()
-
-cn.close()
diff --git a/python/scripts/gdrive_hash_worker b/python/scripts/gdrive_hash_worker
deleted file mode 100755
index 6f6fa25..0000000
--- a/python/scripts/gdrive_hash_worker
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/python3
-
-import hashlib
-import psycopg2
-import subprocess
-from time import sleep
-
-cn = psycopg2.connect('')
-cn.set_isolation_level(0) # autocommit
-cr = cn.cursor()
-
-while True:
-	cr.execute('WITH thw AS (INSERT INTO gdrive.gdrive_hash_worker (backend_pid, job_id) SELECT pg_backend_pid(), job_id FROM gdrive.gdrive_hash_job tj JOIN (SELECT job_id FROM gdrive.gdrive_hash_job WHERE NOT done EXCEPT SELECT job_id FROM gdrive.gdrive_hash_worker LIMIT 1) ti USING (job_id) ON CONFLICT (backend_pid) DO UPDATE SET job_id=EXCLUDED.job_id RETURNING job_id, bw_limit) SELECT job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, lower(upload_range), upper(upload_range) FROM thw JOIN gdrive.gdrive_hash_job USING (job_id) JOIN gdrive.upload USING (upload_id) JOIN gdrive.file USING (upload_id), gdrive.access_token WHERE access_token.id=1 AND access_token.expires>CURRENT_TIMESTAMP')
-	try:
-		[(job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, upload_start, upload_end)] = cr.fetchall()
-	except ValueError: # no jobs
-		sleep(60)
-		continue
-
-	upload_len = upload_end - upload_start
-	upload_waste_start = upload_start % 16
-	upload_blk_start = upload_start - upload_waste_start
-	upload_waste_end = (16 - upload_end % 16 ) % 16
-	upload_blk_end = upload_end + upload_waste_end + 15
-
-	buf = bytearray(16384)
-
-	num_blocks = upload_len >> 14
-	if upload_len > (num_blocks << 14):
-		num_blocks += 1
-
-	if hash_blocks:
-		block_hashes = bytearray(num_blocks * 32)
-	else:
-		block_hashes = None
-
-	if hash_pieces:
-		num_pieces = upload_len >> piece_size
-		if upload_len > (num_pieces << piece_size):
-			num_pieces += 1
-		log2_blocks_per_piece = piece_size - 14
-		piece_hashes = bytearray(num_pieces * 20)
-	else:
-		num_pieces = None
-		piece_hashes = None
-
-	if bw_limit is not None:
-		bw_limit_str = " '--limit-rate' '{}k'".format(bw_limit)
-	else:
-		bw_limit_str = ''
-
-	coproc = subprocess.Popen(["curl '--fail'{} '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none'".format(bw_limit_str, access_token, upload_blk_start, upload_blk_end, gdrive_file_id, encryption_key.hex(), upload_waste_start, upload_len)], stdout=subprocess.PIPE, shell=True)
-
-	block = 0
-	for piece in range(num_pieces or 1):
-		if hash_pieces:
-			piece_end = min((piece+1) << log2_blocks_per_piece, num_blocks)
-			piece_hasher = hashlib.sha1()
-		else:
-			piece_end = num_blocks
-		while block < piece_end:
-			bytes_to_end = upload_len - (block << 14)
-			if bytes_to_end < 16384:
-				buf = buf[:bytes_to_end]
-			assert coproc.stdout.readinto(buf) == len(buf)
-			if hash_blocks:
-				hashbuf_offset = block << 5
-				block_hashes[hashbuf_offset:hashbuf_offset+32] = hashlib.sha256(buf).digest()
-			if hash_pieces:
-				piece_hasher.update(buf)
-			block += 1
-		if hash_pieces:
-			hashbuf_offset = piece * 20
-			piece_hashes[hashbuf_offset:hashbuf_offset+20] = piece_hasher.digest()
-	assert coproc.stdout.read(1) == b'' # EOF
-	assert coproc.wait() == 0 # subprocess success
-
-	cr.execute("""
-		WITH ti AS (SELECT
-			%s AS job_id,
-			%s::integer AS piece_size,
-			%s AS hash_pieces,
-			%s AS hash_blocks,
-			upload_id,
-			int8range(%s, %s) AS upload_range,
-			%s::bytea AS piece_hashes,
-			%s::bytea AS block_hashes
-		FROM gdrive.file
-		WHERE gdrive_file_id=%s),
-		th AS (INSERT INTO gdrive.gdrive_hashes (upload_id, upload_range, piece_size, algo, hashes)
-			SELECT upload_id, upload_range, piece_size, 'sha1', piece_hashes FROM ti WHERE hash_pieces
-			UNION SELECT upload_id, upload_range, 14, 'sha256', block_hashes FROM ti WHERE hash_blocks),
-		thw AS (UPDATE gdrive.gdrive_hash_worker ts SET job_id=NULL
-			FROM ti WHERE backend_pid=pg_backend_pid() AND ts.job_id=ti.job_id)
-		UPDATE gdrive.gdrive_hash_job ts
-		SET upload_id=ti.upload_id, upload_range=ti.upload_range, piece_size=ti.piece_size,
-			hash_pieces=ti.hash_pieces, hash_blocks=ti.hash_blocks, done=true
-		FROM ti WHERE ts.job_id=ti.job_id
-		""", (job_id, piece_size, hash_pieces, hash_blocks, upload_start, upload_end, piece_hashes, block_hashes, gdrive_file_id))
diff --git a/python/scripts/ingest_ptp_json.py b/python/scripts/ingest_ptp_json.py
deleted file mode 100644
index 1104530..0000000
--- a/python/scripts/ingest_ptp_json.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import argparse
-import bz2
-import contextlib
-import json
-import pathlib
-import sys
-
-import psycopg2
-
-
-arg_parser = argparse.ArgumentParser()
-arg_parser.add_argument("paths", metavar = "path", nargs = "+")
-args = arg_parser.parse_args()
-paths = list({pathlib.Path(a): None for a in args.paths}.keys()) # deduplicate
-
-
-# Fail fast if a path is wrong
-for p in paths:
-	p.open("rb").close()
-
-
-def ingest(doc, db_cursor):
-	for group in doc["Movies"]:
-		group_id = group["GroupId"]
-		stmt = (
-			"insert into ptp.torrent_group (id, title, year, imdb_id)"
-			+ " values (%s, %s, %s, %s)"
-			+ " on conflict (id) do update set (id, title, year, imdb_id) = row(excluded.*)"
-			+ ";"
-		)
-		db_cursor.execute(
-			stmt,
-			(group_id, group.get("Title"), group.get("Year"), group.get("ImdbId"))
-		)
-
-		group_cols = "(group_id, torrentid, quality, source, container, codec, resolution, seeders, leechers, snatched)"
-		stmt = (
-			f"insert into ptp.torrent {group_cols}"
-			+ " values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
-			+ f" on conflict (torrentid) do update set {group_cols} = row(excluded.*)"
-			+ ";"
-		)
-		for torrent in group["Torrents"]:
-			id_ = torrent["Id"]
-			quality = torrent.get("Quality")
-			source = torrent.get("Source")
-			container = torrent.get("Container")
-			codec = torrent.get("Codec")
-			resolution = torrent.get("Resolution")
-			seeders = int(torrent["Seeders"]) if "Seeders" in torrent else None
-			leechers = int(torrent["Leechers"]) if "Leechers" in torrent else None
-			snatched = int(torrent["Snatched"]) if "Snatched" in torrent else None
-			db_cursor.execute(
-				stmt,
-				(group_id, id_, quality, source, container, codec, resolution, seeders, leechers, snatched),
-			)
-
-
-with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor:
-	conn.autocommit = True
-
-	for p in paths:
-		with p.open("rb") as f:
-			json_data = bz2.decompress(f.read()).decode("utf-8")
-
-		doc = json.loads(json_data)
-		group_count = len(doc["Movies"])
-		torrent_count = sum(len(g["Torrents"]) for g in doc["Movies"])
-
-		ingest(doc, cursor)
-		print(p, file = sys.stderr)
-		print(" processed {} torrents in {} groups".format(torrent_count, group_count), file = sys.stderr)
diff --git a/scripts/gdrive_access_token.py b/scripts/gdrive_access_token.py
new file mode 100644
index 0000000..d52543e
--- /dev/null
+++ b/scripts/gdrive_access_token.py
@@ -0,0 +1,30 @@
+import psycopg2
+import urllib.request
+import urllib.parse
+import io
+import json
+import re
+
+#
+# refresh tokens
+#
+
+cn = psycopg2.connect('')
+cr = cn.cursor()
+
+cr.execute("BEGIN")
+cr.execute("SELECT id, client_id, client_secret, refresh_token FROM gdrive.access_token WHERE id IN (1) AND refresh_token IS NOT NULL AND expires IS NULL OR expires < CURRENT_TIMESTAMP + '30min' FOR NO KEY UPDATE")
+for (id_, client_id, client_secret, refresh_token) in cr.fetchall():
+	token_req = urllib.parse.urlencode((
+		('grant_type', 'refresh_token'),
+		('client_id', client_id),
+		('client_secret', client_secret),
+		('refresh_token', refresh_token))).encode('utf-8')
+	resp = urllib.request.urlopen(urllib.request.Request("https://www.googleapis.com/oauth2/v4/token", data=token_req, method='POST'))
+	assert resp.status == 200
+	new_token = json.load(io.TextIOWrapper(io.BufferedReader(resp)))
+	cr.execute("UPDATE gdrive.access_token SET access_token = %s, scope = %s, expires = CURRENT_TIMESTAMP + make_interval(secs => %s) WHERE id = %s", (new_token['access_token'], new_token['scope'], new_token['expires_in'], id_))
+
+cn.commit()
+
+cn.close()
diff --git a/scripts/gdrive_hash_worker b/scripts/gdrive_hash_worker
new file mode 100755
index 0000000..6f6fa25
--- /dev/null
+++ b/scripts/gdrive_hash_worker
@@ -0,0 +1,99 @@
+#!/usr/bin/python3
+
+import hashlib
+import psycopg2
+import subprocess
+from time import sleep
+
+cn = psycopg2.connect('')
+cn.set_isolation_level(0) # autocommit
+cr = cn.cursor()
+
+while True:
+	cr.execute('WITH thw AS (INSERT INTO gdrive.gdrive_hash_worker (backend_pid, job_id) SELECT pg_backend_pid(), job_id FROM gdrive.gdrive_hash_job tj JOIN (SELECT job_id FROM gdrive.gdrive_hash_job WHERE NOT done EXCEPT SELECT job_id FROM gdrive.gdrive_hash_worker LIMIT 1) ti USING (job_id) ON CONFLICT (backend_pid) DO UPDATE SET job_id=EXCLUDED.job_id RETURNING job_id, bw_limit) SELECT job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, lower(upload_range), upper(upload_range) FROM thw JOIN gdrive.gdrive_hash_job USING (job_id) JOIN gdrive.upload USING (upload_id) JOIN gdrive.file USING (upload_id), gdrive.access_token WHERE access_token.id=1 AND access_token.expires>CURRENT_TIMESTAMP')
+	try:
+		[(job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, upload_start, upload_end)] = cr.fetchall()
+	except ValueError: # no jobs
+		sleep(60)
+		continue
+
+	upload_len = upload_end - upload_start
+	upload_waste_start = upload_start % 16
+	upload_blk_start = upload_start - upload_waste_start
+	upload_waste_end = (16 - upload_end % 16 ) % 16
+	upload_blk_end = upload_end + upload_waste_end + 15
+
+	buf = bytearray(16384)
+
+	num_blocks = upload_len >> 14
+	if upload_len > (num_blocks << 14):
+		num_blocks += 1
+
+	if hash_blocks:
+		block_hashes = bytearray(num_blocks * 32)
+	else:
+		block_hashes = None
+
+	if hash_pieces:
+		num_pieces = upload_len >> piece_size
+		if upload_len > (num_pieces << piece_size):
+			num_pieces += 1
+		log2_blocks_per_piece = piece_size - 14
+		piece_hashes = bytearray(num_pieces * 20)
+	else:
+		num_pieces = None
+		piece_hashes = None
+
+	if bw_limit is not None:
+		bw_limit_str = " '--limit-rate' '{}k'".format(bw_limit)
+	else:
+		bw_limit_str = ''
+
+	coproc = subprocess.Popen(["curl '--fail'{} '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none'".format(bw_limit_str, access_token, upload_blk_start, upload_blk_end, gdrive_file_id, encryption_key.hex(), upload_waste_start, upload_len)], stdout=subprocess.PIPE, shell=True)
+
+	block = 0
+	for piece in range(num_pieces or 1):
+		if hash_pieces:
+			piece_end = min((piece+1) << log2_blocks_per_piece, num_blocks)
+			piece_hasher = hashlib.sha1()
+		else:
+			piece_end = num_blocks
+		while block < piece_end:
+			bytes_to_end = upload_len - (block << 14)
+			if bytes_to_end < 16384:
+				buf = buf[:bytes_to_end]
+			assert coproc.stdout.readinto(buf) == len(buf)
+			if hash_blocks:
+				hashbuf_offset = block << 5
+				block_hashes[hashbuf_offset:hashbuf_offset+32] = hashlib.sha256(buf).digest()
+			if hash_pieces:
+				piece_hasher.update(buf)
+			block += 1
+		if hash_pieces:
+			hashbuf_offset = piece * 20
+			piece_hashes[hashbuf_offset:hashbuf_offset+20] = piece_hasher.digest()
+	assert coproc.stdout.read(1) == b'' # EOF
+	assert coproc.wait() == 0 # subprocess success
+
+	cr.execute("""
+		WITH ti AS (SELECT
+			%s AS job_id,
+			%s::integer AS piece_size,
+			%s AS hash_pieces,
+			%s AS hash_blocks,
+			upload_id,
+			int8range(%s, %s) AS upload_range,
+			%s::bytea AS piece_hashes,
+			%s::bytea AS block_hashes
+		FROM gdrive.file
+		WHERE gdrive_file_id=%s),
+		th AS (INSERT INTO gdrive.gdrive_hashes (upload_id, upload_range, piece_size, algo, hashes)
+			SELECT upload_id, upload_range, piece_size, 'sha1', piece_hashes FROM ti WHERE hash_pieces
+			UNION SELECT upload_id, upload_range, 14, 'sha256', block_hashes FROM ti WHERE hash_blocks),
+		thw AS (UPDATE gdrive.gdrive_hash_worker ts SET job_id=NULL
+			FROM ti WHERE backend_pid=pg_backend_pid() AND ts.job_id=ti.job_id)
+		UPDATE gdrive.gdrive_hash_job ts
+		SET upload_id=ti.upload_id, upload_range=ti.upload_range, piece_size=ti.piece_size,
+			hash_pieces=ti.hash_pieces, hash_blocks=ti.hash_blocks, done=true
+		FROM ti WHERE ts.job_id=ti.job_id
+		""", (job_id, piece_size, hash_pieces, hash_blocks, upload_start, upload_end, piece_hashes, block_hashes, gdrive_file_id))
diff --git a/scripts/ingest_ptp_json.py b/scripts/ingest_ptp_json.py
new file mode 100644
index 0000000..1104530
--- /dev/null
+++ b/scripts/ingest_ptp_json.py
@@ -0,0 +1,72 @@
+import argparse
+import bz2
+import contextlib
+import json
+import pathlib
+import sys
+
+import psycopg2
+
+
+arg_parser = argparse.ArgumentParser()
+arg_parser.add_argument("paths", metavar = "path", nargs = "+")
+args = arg_parser.parse_args()
+paths = list({pathlib.Path(a): None for a in args.paths}.keys()) # deduplicate
+
+
+# Fail fast if a path is wrong
+for p in paths:
+	p.open("rb").close()
+
+
+def ingest(doc, db_cursor):
+	for group in doc["Movies"]:
+		group_id = group["GroupId"]
+		stmt = (
+			"insert into ptp.torrent_group (id, title, year, imdb_id)"
+			+ " values (%s, %s, %s, %s)"
+			+ " on conflict (id) do update set (id, title, year, imdb_id) = row(excluded.*)"
+			+ ";"
+		)
+		db_cursor.execute(
+			stmt,
+			(group_id, group.get("Title"), group.get("Year"), group.get("ImdbId"))
+		)
+
+		group_cols = "(group_id, torrentid, quality, source, container, codec, resolution, seeders, leechers, snatched)"
+		stmt = (
+			f"insert into ptp.torrent {group_cols}"
+			+ " values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+			+ f" on conflict (torrentid) do update set {group_cols} = row(excluded.*)"
+			+ ";"
+		)
+		for torrent in group["Torrents"]:
+			id_ = torrent["Id"]
+			quality = torrent.get("Quality")
+			source = torrent.get("Source")
+			container = torrent.get("Container")
+			codec = torrent.get("Codec")
+			resolution = torrent.get("Resolution")
+			seeders = int(torrent["Seeders"]) if "Seeders" in torrent else None
+			leechers = int(torrent["Leechers"]) if "Leechers" in torrent else None
+			snatched = int(torrent["Snatched"]) if "Snatched" in torrent else None
+			db_cursor.execute(
+				stmt,
+				(group_id, id_, quality, source, container, codec, resolution, seeders, leechers, snatched),
+			)
+
+
+with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor:
+	conn.autocommit = True
+
+	for p in paths:
+		with p.open("rb") as f:
+			json_data = bz2.decompress(f.read()).decode("utf-8")
+
+		doc = json.loads(json_data)
+		group_count = len(doc["Movies"])
+		torrent_count = sum(len(g["Torrents"]) for g in doc["Movies"])
+
+		ingest(doc, cursor)
+		print(p, file = sys.stderr)
+		print(" processed {} torrents in {} groups".format(torrent_count, group_count), file = sys.stderr)