+++ /dev/null
-import psycopg2
-import urllib.request
-import urllib.parse
-import io
-import json
-import re
-
-#
-# refresh tokens
-#
-
-cn = psycopg2.connect('')
-cr = cn.cursor()
-
-cr.execute("BEGIN")
-cr.execute("SELECT id, client_id, client_secret, refresh_token FROM gdrive.access_token WHERE id IN (1) AND refresh_token IS NOT NULL AND expires IS NULL OR expires < CURRENT_TIMESTAMP + '30min' FOR NO KEY UPDATE")
-for (id_, client_id, client_secret, refresh_token) in cr.fetchall():
- token_req = urllib.parse.urlencode((
- ('grant_type', 'refresh_token'),
- ('client_id', client_id),
- ('client_secret', client_secret),
- ('refresh_token', refresh_token))).encode('utf-8')
- resp = urllib.request.urlopen(urllib.request.Request("https://www.googleapis.com/oauth2/v4/token", data=token_req, method='POST'))
- assert resp.status == 200
- new_token = json.load(io.TextIOWrapper(io.BufferedReader(resp)))
- cr.execute("UPDATE gdrive.access_token SET access_token = %s, scope = %s, expires = CURRENT_TIMESTAMP + make_interval(secs => %s) WHERE id = %s", (new_token['access_token'], new_token['scope'], new_token['expires_in'], id_))
-
-cn.commit()
-
-cn.close()
+++ /dev/null
-#!/usr/bin/python3
-
-import hashlib
-import psycopg2
-import subprocess
-from time import sleep
-
-cn = psycopg2.connect('')
-cn.set_isolation_level(0) # autocommit
-cr = cn.cursor()
-
-while True:
- cr.execute('WITH thw AS (INSERT INTO gdrive.gdrive_hash_worker (backend_pid, job_id) SELECT pg_backend_pid(), job_id FROM gdrive.gdrive_hash_job tj JOIN (SELECT job_id FROM gdrive.gdrive_hash_job WHERE NOT done EXCEPT SELECT job_id FROM gdrive.gdrive_hash_worker LIMIT 1) ti USING (job_id) ON CONFLICT (backend_pid) DO UPDATE SET job_id=EXCLUDED.job_id RETURNING job_id, bw_limit) SELECT job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, lower(upload_range), upper(upload_range) FROM thw JOIN gdrive.gdrive_hash_job USING (job_id) JOIN gdrive.upload USING (upload_id) JOIN gdrive.file USING (upload_id), gdrive.access_token WHERE access_token.id=1 AND access_token.expires>CURRENT_TIMESTAMP')
- try:
- [(job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, upload_start, upload_end)] = cr.fetchall()
- except ValueError: # no jobs
- sleep(60)
- continue
-
- upload_len = upload_end - upload_start
- upload_waste_start = upload_start % 16
- upload_blk_start = upload_start - upload_waste_start
- upload_waste_end = (16 - upload_end % 16 ) % 16
- upload_blk_end = upload_end + upload_waste_end + 15
-
- buf = bytearray(16384)
-
- num_blocks = upload_len >> 14
- if upload_len > (num_blocks << 14):
- num_blocks += 1
-
- if hash_blocks:
- block_hashes = bytearray(num_blocks * 32)
- else:
- block_hashes = None
-
- if hash_pieces:
- num_pieces = upload_len >> piece_size
- if upload_len > (num_pieces << piece_size):
- num_pieces += 1
- log2_blocks_per_piece = piece_size - 14
- piece_hashes = bytearray(num_pieces * 20)
- else:
- num_pieces = None
- piece_hashes = None
-
- if bw_limit is not None:
- bw_limit_str = " '--limit-rate' '{}k'".format(bw_limit)
- else:
- bw_limit_str = ''
-
- coproc = subprocess.Popen(["curl '--fail'{} '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none'".format(bw_limit_str, access_token, upload_blk_start, upload_blk_end, gdrive_file_id, encryption_key.hex(), upload_waste_start, upload_len)], stdout=subprocess.PIPE, shell=True)
-
- block = 0
- for piece in range(num_pieces or 1):
- if hash_pieces:
- piece_end = min((piece+1) << log2_blocks_per_piece, num_blocks)
- piece_hasher = hashlib.sha1()
- else:
- piece_end = num_blocks
- while block < piece_end:
- bytes_to_end = upload_len - (block << 14)
- if bytes_to_end < 16384:
- buf = buf[:bytes_to_end]
- assert coproc.stdout.readinto(buf) == len(buf)
- if hash_blocks:
- hashbuf_offset = block << 5
- block_hashes[hashbuf_offset:hashbuf_offset+32] = hashlib.sha256(buf).digest()
- if hash_pieces:
- piece_hasher.update(buf)
- block += 1
- if hash_pieces:
- hashbuf_offset = piece * 20
- piece_hashes[hashbuf_offset:hashbuf_offset+20] = piece_hasher.digest()
- assert coproc.stdout.read(1) == b'' # EOF
- assert coproc.wait() == 0 # subprocess success
-
- cr.execute("""
- WITH ti AS (SELECT
- %s AS job_id,
- %s::integer AS piece_size,
- %s AS hash_pieces,
- %s AS hash_blocks,
- upload_id,
- int8range(%s, %s) AS upload_range,
- %s::bytea AS piece_hashes,
- %s::bytea AS block_hashes
- FROM gdrive.file
- WHERE gdrive_file_id=%s),
- th AS (INSERT INTO gdrive.gdrive_hashes (upload_id, upload_range, piece_size, algo, hashes)
- SELECT upload_id, upload_range, piece_size, 'sha1', piece_hashes FROM ti WHERE hash_pieces
- UNION SELECT upload_id, upload_range, 14, 'sha256', block_hashes FROM ti WHERE hash_blocks),
- thw AS (UPDATE gdrive.gdrive_hash_worker ts SET job_id=NULL
- FROM ti WHERE backend_pid=pg_backend_pid() AND ts.job_id=ti.job_id)
- UPDATE gdrive.gdrive_hash_job ts
- SET upload_id=ti.upload_id, upload_range=ti.upload_range, piece_size=ti.piece_size,
- hash_pieces=ti.hash_pieces, hash_blocks=ti.hash_blocks, done=true
- FROM ti WHERE ts.job_id=ti.job_id
- """, (job_id, piece_size, hash_pieces, hash_blocks, upload_start, upload_end, piece_hashes, block_hashes, gdrive_file_id))
+++ /dev/null
-import argparse
-import bz2
-import contextlib
-import json
-import pathlib
-import sys
-
-import psycopg2
-
-
-arg_parser = argparse.ArgumentParser()
-arg_parser.add_argument("paths", metavar = "path", nargs = "+")
-args = arg_parser.parse_args()
-paths = list({pathlib.Path(a): None for a in args.paths}.keys()) # deduplicate
-
-
-# Fail fast if a path is wrong
-for p in paths:
- p.open("rb").close()
-
-
-def ingest(doc, db_cursor):
- for group in doc["Movies"]:
- group_id = group["GroupId"]
- stmt = (
- "insert into ptp.torrent_group (id, title, year, imdb_id)"
- + " values (%s, %s, %s, %s)"
- + " on conflict (id) do update set (id, title, year, imdb_id) = row(excluded.*)"
- + ";"
- )
- db_cursor.execute(
- stmt,
- (group_id, group.get("Title"), group.get("Year"), group.get("ImdbId"))
- )
-
- group_cols = "(group_id, torrentid, quality, source, container, codec, resolution, seeders, leechers, snatched)"
- stmt = (
- f"insert into ptp.torrent {group_cols}"
- + " values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
- + f" on conflict (torrentid) do update set {group_cols} = row(excluded.*)"
- + ";"
- )
- for torrent in group["Torrents"]:
- id_ = torrent["Id"]
- quality = torrent.get("Quality")
- source = torrent.get("Source")
- container = torrent.get("Container")
- codec = torrent.get("Codec")
- resolution = torrent.get("Resolution")
- seeders = int(torrent["Seeders"]) if "Seeders" in torrent else None
- leechers = int(torrent["Leechers"]) if "Leechers" in torrent else None
- snatched = int(torrent["Snatched"]) if "Snatched" in torrent else None
- db_cursor.execute(
- stmt,
- (group_id, id_, quality, source, container, codec, resolution, seeders, leechers, snatched),
- )
-
-
-with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor:
- conn.autocommit = True
-
- for p in paths:
- with p.open("rb") as f:
- json_data = bz2.decompress(f.read()).decode("utf-8")
-
- doc = json.loads(json_data)
- group_count = len(doc["Movies"])
- torrent_count = sum(len(g["Torrents"]) for g in doc["Movies"])
-
- ingest(doc, cursor)
- print(p, file = sys.stderr)
- print(" processed {} torrents in {} groups".format(torrent_count, group_count), file = sys.stderr)
--- /dev/null
+import psycopg2
+import urllib.request
+import urllib.parse
+import json
+
+#
+# refresh tokens
+#
+
+cn = psycopg2.connect('')
+cr = cn.cursor()
+
+cr.execute("BEGIN")
+cr.execute("SELECT id, client_id, client_secret, refresh_token FROM gdrive.access_token WHERE id IN (1) AND refresh_token IS NOT NULL AND expires IS NULL OR expires < CURRENT_TIMESTAMP + '30min' FOR NO KEY UPDATE")
+for (id_, client_id, client_secret, refresh_token) in cr.fetchall():
+ token_req = urllib.parse.urlencode((
+ ('grant_type', 'refresh_token'),
+ ('client_id', client_id),
+ ('client_secret', client_secret),
+ ('refresh_token', refresh_token))).encode('utf-8')
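+    # POST the refresh_token grant to Google's token endpoint; urlopen raises on
+    # HTTP errors, and the JSON reply carries the new access_token, scope and expires_in.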
+ resp = urllib.request.urlopen(urllib.request.Request("https://www.googleapis.com/oauth2/v4/token", data=token_req, method='POST'))
+ assert resp.status == 200
+    new_token = json.loads(resp.read())
+ cr.execute("UPDATE gdrive.access_token SET access_token = %s, scope = %s, expires = CURRENT_TIMESTAMP + make_interval(secs => %s) WHERE id = %s", (new_token['access_token'], new_token['scope'], new_token['expires_in'], id_))
+
+cn.commit()
+
+cn.close()
--- /dev/null
+#!/usr/bin/python3
+
+import hashlib
+import psycopg2
+import subprocess
+from time import sleep
+
+cn = psycopg2.connect('')
+cn.autocommit = True  # each claim/update statement commits on its own
+cr = cn.cursor()
+
+while True:
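+    # Claim a pending job: register this backend's pid in gdrive_hash_worker and pull
+    # the job parameters, file metadata and a currently-valid access token. The
+    # single-row unpack below raises ValueError when there is nothing to claim.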
+    cr.execute("""
+        WITH thw AS (INSERT INTO gdrive.gdrive_hash_worker (backend_pid, job_id)
+            SELECT pg_backend_pid(), job_id FROM gdrive.gdrive_hash_job tj
+            JOIN (SELECT job_id FROM gdrive.gdrive_hash_job WHERE NOT done
+                  EXCEPT SELECT job_id FROM gdrive.gdrive_hash_worker LIMIT 1) ti USING (job_id)
+            ON CONFLICT (backend_pid) DO UPDATE SET job_id=EXCLUDED.job_id
+            RETURNING job_id, bw_limit)
+        SELECT job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token,
+               gdrive_file_id, encryption_key, lower(upload_range), upper(upload_range)
+        FROM thw
+        JOIN gdrive.gdrive_hash_job USING (job_id)
+        JOIN gdrive.upload USING (upload_id)
+        JOIN gdrive.file USING (upload_id), gdrive.access_token
+        WHERE access_token.id=1 AND access_token.expires>CURRENT_TIMESTAMP
+    """)
+ try:
+ [(job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, upload_start, upload_end)] = cr.fetchall()
+    except ValueError:  # no claimable job, or no unexpired access token yet
+ sleep(60)
+ continue
+
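+    # Widen the requested plaintext range to 16-byte AES block boundaries; the range
+    # also covers one extra cipher block, which the pipeline below consumes as the
+    # CBC IV (the Drive object appears to store an IV block ahead of the ciphertext).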
+ upload_len = upload_end - upload_start
+ upload_waste_start = upload_start % 16
+ upload_blk_start = upload_start - upload_waste_start
+    upload_waste_end = (16 - upload_end % 16) % 16
+ upload_blk_end = upload_end + upload_waste_end + 15
+
+ buf = bytearray(16384)
+
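+    # Hashing granularity: 16 KiB (2^14) blocks; piece_size is the log2 of the piece
+    # length in bytes. Round both counts up to cover a partial final block/piece.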
+ num_blocks = upload_len >> 14
+ if upload_len > (num_blocks << 14):
+ num_blocks += 1
+
+ if hash_blocks:
+ block_hashes = bytearray(num_blocks * 32)
+ else:
+ block_hashes = None
+
+ if hash_pieces:
+ num_pieces = upload_len >> piece_size
+ if upload_len > (num_pieces << piece_size):
+ num_pieces += 1
+ log2_blocks_per_piece = piece_size - 14
+ piece_hashes = bytearray(num_pieces * 20)
+ else:
+ num_pieces = None
+ piece_hashes = None
+
+ if bw_limit is not None:
+ bw_limit_str = " '--limit-rate' '{}k'".format(bw_limit)
+ else:
+ bw_limit_str = ''
+
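+    # Fetch the encrypted range, decrypt it, and trim it back to the exact plaintext
+    # range. The backquoted `head --bytes 16 | xxd -p` runs inside the openssl stage,
+    # so it consumes the first 16 bytes of the pipe and hands them to openssl as the
+    # CBC IV; dd then drops the leading alignment waste and stops after upload_len bytes.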
+ coproc = subprocess.Popen(["curl '--fail'{} '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none'".format(bw_limit_str, access_token, upload_blk_start, upload_blk_end, gdrive_file_id, encryption_key.hex(), upload_waste_start, upload_len)], stdout=subprocess.PIPE, shell=True)
+
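+    # Single pass over the decrypted stream: sha256 per 16 KiB block when hash_blocks
+    # is set, sha1 per 2^piece_size-byte piece (presumably torrent pieces) when
+    # hash_pieces is set.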
+ block = 0
+ for piece in range(num_pieces or 1):
+ if hash_pieces:
+ piece_end = min((piece+1) << log2_blocks_per_piece, num_blocks)
+ piece_hasher = hashlib.sha1()
+ else:
+ piece_end = num_blocks
+ while block < piece_end:
+ bytes_to_end = upload_len - (block << 14)
+ if bytes_to_end < 16384:
+ buf = buf[:bytes_to_end]
+ assert coproc.stdout.readinto(buf) == len(buf)
+ if hash_blocks:
+ hashbuf_offset = block << 5
+ block_hashes[hashbuf_offset:hashbuf_offset+32] = hashlib.sha256(buf).digest()
+ if hash_pieces:
+ piece_hasher.update(buf)
+ block += 1
+ if hash_pieces:
+ hashbuf_offset = piece * 20
+ piece_hashes[hashbuf_offset:hashbuf_offset+20] = piece_hasher.digest()
+ assert coproc.stdout.read(1) == b'' # EOF
+ assert coproc.wait() == 0 # subprocess success
+
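+    # Persist the results in one statement: insert the hash rows, clear this worker's
+    # claim, and mark the job done with the resolved upload_id and range.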
+ cr.execute("""
+ WITH ti AS (SELECT
+ %s AS job_id,
+ %s::integer AS piece_size,
+ %s AS hash_pieces,
+ %s AS hash_blocks,
+ upload_id,
+ int8range(%s, %s) AS upload_range,
+ %s::bytea AS piece_hashes,
+ %s::bytea AS block_hashes
+ FROM gdrive.file
+ WHERE gdrive_file_id=%s),
+ th AS (INSERT INTO gdrive.gdrive_hashes (upload_id, upload_range, piece_size, algo, hashes)
+ SELECT upload_id, upload_range, piece_size, 'sha1', piece_hashes FROM ti WHERE hash_pieces
+ UNION SELECT upload_id, upload_range, 14, 'sha256', block_hashes FROM ti WHERE hash_blocks),
+ thw AS (UPDATE gdrive.gdrive_hash_worker ts SET job_id=NULL
+ FROM ti WHERE backend_pid=pg_backend_pid() AND ts.job_id=ti.job_id)
+ UPDATE gdrive.gdrive_hash_job ts
+ SET upload_id=ti.upload_id, upload_range=ti.upload_range, piece_size=ti.piece_size,
+ hash_pieces=ti.hash_pieces, hash_blocks=ti.hash_blocks, done=true
+ FROM ti WHERE ts.job_id=ti.job_id
+ """, (job_id, piece_size, hash_pieces, hash_blocks, upload_start, upload_end, piece_hashes, block_hashes, gdrive_file_id))
--- /dev/null
+import argparse
+import bz2
+import contextlib
+import json
+import pathlib
+import sys
+
+import psycopg2
+
+
+arg_parser = argparse.ArgumentParser()
+arg_parser.add_argument("paths", metavar = "path", nargs = "+")
+args = arg_parser.parse_args()
+paths = list({pathlib.Path(a): None for a in args.paths}.keys()) # deduplicate
+
+
+# Fail fast if a path is wrong
+for p in paths:
+ p.open("rb").close()
+
+
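+# Upsert one decoded dump into ptp.torrent_group and ptp.torrent, one row per movie
+# group and one per torrent; re-ingesting a dump overwrites rows with the same ids.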
+def ingest(doc, db_cursor):
+ for group in doc["Movies"]:
+ group_id = group["GroupId"]
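+        # The multi-column SET with row(excluded.*) relies on the table's declared
+        # column order being exactly (id, title, year, imdb_id).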
+ stmt = (
+ "insert into ptp.torrent_group (id, title, year, imdb_id)"
+ + " values (%s, %s, %s, %s)"
+ + " on conflict (id) do update set (id, title, year, imdb_id) = row(excluded.*)"
+ + ";"
+ )
+ db_cursor.execute(
+ stmt,
+ (group_id, group.get("Title"), group.get("Year"), group.get("ImdbId"))
+ )
+
+ group_cols = "(group_id, torrentid, quality, source, container, codec, resolution, seeders, leechers, snatched)"
+ stmt = (
+ f"insert into ptp.torrent {group_cols}"
+ + " values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
+ + f" on conflict (torrentid) do update set {group_cols} = row(excluded.*)"
+ + ";"
+ )
+ for torrent in group["Torrents"]:
+ id_ = torrent["Id"]
+ quality = torrent.get("Quality")
+ source = torrent.get("Source")
+ container = torrent.get("Container")
+ codec = torrent.get("Codec")
+ resolution = torrent.get("Resolution")
+ seeders = int(torrent["Seeders"]) if "Seeders" in torrent else None
+ leechers = int(torrent["Leechers"]) if "Leechers" in torrent else None
+ snatched = int(torrent["Snatched"]) if "Snatched" in torrent else None
+ db_cursor.execute(
+ stmt,
+ (group_id, id_, quality, source, container, codec, resolution, seeders, leechers, snatched),
+ )
+
+
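+# One autocommit connection for all dumps; each file is decompressed entirely in
+# memory before parsing.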
+with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor:
+ conn.autocommit = True
+
+ for p in paths:
+ with p.open("rb") as f:
+ json_data = bz2.decompress(f.read()).decode("utf-8")
+
+ doc = json.loads(json_data)
+ group_count = len(doc["Movies"])
+ torrent_count = sum(len(g["Torrents"]) for g in doc["Movies"])
+
+ ingest(doc, cursor)
+ print(p, file = sys.stderr)
+ print(" processed {} torrents in {} groups".format(torrent_count, group_count), file = sys.stderr)