From: Anders Cornell
Date: Tue, 18 Jan 2022 00:32:03 +0000 (-0500)
Subject: hash worker stuff
X-Git-Url: https://jcornell.net/gitweb/gitweb.cgi?a=commitdiff_plain;h=5dd8fb3cad8d58ac869a6c775ffd027d04b0cefb;p=eros.git

hash worker stuff
---
diff --git a/doc/gdrive_hash_worker.sql b/doc/gdrive_hash_worker.sql
new file mode 100644
index 0000000..a1bb98e
--- /dev/null
+++ b/doc/gdrive_hash_worker.sql
@@ -0,0 +1,61 @@
+CREATE TABLE gdrive.gdrive_hash_job (
+    job_id serial PRIMARY KEY,
+    upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
+    upload_range int8range NOT NULL,
+    piece_size integer, -- log2
+    hash_pieces boolean NOT NULL DEFAULT true,
+    hash_blocks boolean NOT NULL DEFAULT true,
+    done boolean NOT NULL DEFAULT false
+);
+
+CREATE TABLE gdrive.gdrive_hash_worker (
+    backend_pid integer PRIMARY KEY,
+    job_id integer REFERENCES gdrive.gdrive_hash_job (job_id),
+    bw_limit integer DEFAULT 8192 -- in kB/s
+);
+
+CREATE TABLE gdrive.gdrive_hashes (
+    upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
+    upload_range int8range NOT NULL,
+    piece_size integer NOT NULL, -- log2
+    algo text,
+    hashes bytea,
+    CHECK (
+        ( /* expected number of digests */
+            ( /* number of whole pieces */
+                (upper(upload_range) - lower(upload_range)) /* job size */
+                / (1::bigint<<piece_size)
+[…]
+-- […] | ssh -J acornell@cs.oberlin.edu acornell@[…] source .local/bin/local_env \&\& PGDATABASE=eros PGUSER=occs PGPORT=5433 PGSSLCERT=.config/ssl/certs/acornell@cs.oberlin.edu/cert.pem PGSSLKEY=.config/ssl/private/acornell@cs.oberlin.edu/privkey.pem PGHOST=hatnd.internet6.net. PGAPPNAME=gdrive_hash_worker gdrive_hash_worker
+
+-- deployment (from repo root)
+-- $ sftp bingus
+-- > put python/scripts/gdrive_access_token.py eros_gdrive_access_token.py
+--
+-- $ sftp acornell@cs.oberlin.edu
+-- > put python/scripts/gdrive_hash_worker .local/bin/gdrive_hash_worker
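+
+-- enqueue example (a sketch; the upload_id and range below are hypothetical.
+-- piece_size is log2 and must be >= 14, since the worker streams 16 KiB
+-- (2^14-byte) blocks; 24 gives 16 MiB pieces):
+-- INSERT INTO gdrive.gdrive_hash_job (upload_id, upload_range, piece_size)
+--     VALUES ('00000000-0000-0000-0000-000000000000', int8range(0, 1073741824), 24);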
diff --git a/python/scripts/gdrive_access_token.py b/python/scripts/gdrive_access_token.py
new file mode 100644
index 0000000..d52543e
--- /dev/null
+++ b/python/scripts/gdrive_access_token.py
@@ -0,0 +1,30 @@
+import psycopg2
+import urllib.request
+import urllib.parse
+import io
+import json
+import re
+
+#
+# refresh tokens
+#
+
+cn = psycopg2.connect('')
+cr = cn.cursor()
+
+cr.execute("BEGIN")
+cr.execute("SELECT id, client_id, client_secret, refresh_token FROM gdrive.access_token WHERE id IN (1) AND refresh_token IS NOT NULL AND (expires IS NULL OR expires < CURRENT_TIMESTAMP + '30min') FOR NO KEY UPDATE")
+for (id_, client_id, client_secret, refresh_token) in cr.fetchall():
+    token_req = urllib.parse.urlencode((
+        ('grant_type', 'refresh_token'),
+        ('client_id', client_id),
+        ('client_secret', client_secret),
+        ('refresh_token', refresh_token))).encode('utf-8')
+    resp = urllib.request.urlopen(urllib.request.Request("https://www.googleapis.com/oauth2/v4/token", data=token_req, method='POST'))
+    assert resp.status == 200
+    new_token = json.load(io.TextIOWrapper(io.BufferedReader(resp)))
+    cr.execute("UPDATE gdrive.access_token SET access_token = %s, scope = %s, expires = CURRENT_TIMESTAMP + make_interval(secs => %s) WHERE id = %s", (new_token['access_token'], new_token['scope'], new_token['expires_in'], id_))
+
+cn.commit()
+
+cn.close()
diff --git a/python/scripts/gdrive_hash_worker b/python/scripts/gdrive_hash_worker
new file mode 100755
index 0000000..6f6fa25
--- /dev/null
+++ b/python/scripts/gdrive_hash_worker
@@ -0,0 +1,99 @@
+#!/usr/bin/python3
+
+import hashlib
+import psycopg2
+import subprocess
+from time import sleep
+
+cn = psycopg2.connect('')
+cn.set_isolation_level(0) # autocommit
+cr = cn.cursor()
+
+while True:
+    # claim a job: register this backend in gdrive_hash_worker against an
+    # unclaimed, unfinished job, then fetch the job, upload, file and
+    # (unexpired) access token details needed to run it
+    cr.execute('WITH thw AS (INSERT INTO gdrive.gdrive_hash_worker (backend_pid, job_id) SELECT pg_backend_pid(), job_id FROM gdrive.gdrive_hash_job tj JOIN (SELECT job_id FROM gdrive.gdrive_hash_job WHERE NOT done EXCEPT SELECT job_id FROM gdrive.gdrive_hash_worker LIMIT 1) ti USING (job_id) ON CONFLICT (backend_pid) DO UPDATE SET job_id=EXCLUDED.job_id RETURNING job_id, bw_limit) SELECT job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, lower(upload_range), upper(upload_range) FROM thw JOIN gdrive.gdrive_hash_job USING (job_id) JOIN gdrive.upload USING (upload_id) JOIN gdrive.file USING (upload_id), gdrive.access_token WHERE access_token.id=1 AND access_token.expires>CURRENT_TIMESTAMP')
+    try:
+        [(job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, upload_start, upload_end)] = cr.fetchall()
+    except ValueError: # no jobs
+        sleep(60)
+        continue
+
+    # AES-CBC operates on 16-byte blocks: widen the requested byte range to
+    # block boundaries and remember how much "waste" to drop at each end
+    upload_len = upload_end - upload_start
+    upload_waste_start = upload_start % 16
+    upload_blk_start = upload_start - upload_waste_start
+    upload_waste_end = (16 - upload_end % 16) % 16
+    # inclusive HTTP Range end; the fetch is one block longer than the
+    # aligned range, and `head` below peels those 16 bytes off as the CBC IV
+    upload_blk_end = upload_end + upload_waste_end + 15
+
+    buf = bytearray(16384)
+
+    # hashing granularity: sha256 per 16 KiB (2^14) block, sha1 per 2^piece_size piece
+    num_blocks = upload_len >> 14
+    if upload_len > (num_blocks << 14):
+        num_blocks += 1
+
+    if hash_blocks:
+        block_hashes = bytearray(num_blocks * 32) # 32-byte sha256 digests
+    else:
+        block_hashes = None
+
+    if hash_pieces:
+        num_pieces = upload_len >> piece_size
+        if upload_len > (num_pieces << piece_size):
+            num_pieces += 1
+        log2_blocks_per_piece = piece_size - 14
+        piece_hashes = bytearray(num_pieces * 20) # 20-byte sha1 digests
+    else:
+        num_pieces = None
+        piece_hashes = None
+
+    if bw_limit is not None:
+        bw_limit_str = " '--limit-rate' '{}k'".format(bw_limit)
+    else:
+        bw_limit_str = ''
+
+    # curl fetches the block-aligned ciphertext range, openssl decrypts it
+    # using the first 16 fetched bytes as the IV, and dd trims the alignment
+    # waste down to exactly upload_len plaintext bytes
+    coproc = subprocess.Popen(["curl '--fail'{} '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none'".format(bw_limit_str, access_token, upload_blk_start, upload_blk_end, gdrive_file_id, encryption_key.hex(), upload_waste_start, upload_len)], stdout=subprocess.PIPE, shell=True)
+
+    block = 0
+    for piece in range(num_pieces or 1):
+        if hash_pieces:
+            piece_end = min((piece+1) << log2_blocks_per_piece, num_blocks)
+            piece_hasher = hashlib.sha1()
+        else:
+            piece_end = num_blocks
+        while block < piece_end:
+            bytes_to_end = upload_len - (block << 14)
+            if bytes_to_end < 16384: # final short block
+                buf = buf[:bytes_to_end]
+            assert coproc.stdout.readinto(buf) == len(buf)
+            if hash_blocks:
+                hashbuf_offset = block << 5
+                block_hashes[hashbuf_offset:hashbuf_offset+32] = hashlib.sha256(buf).digest()
+            if hash_pieces:
+                piece_hasher.update(buf)
+            block += 1
+        if hash_pieces:
+            hashbuf_offset = piece * 20
+            piece_hashes[hashbuf_offset:hashbuf_offset+20] = piece_hasher.digest()
+    assert coproc.stdout.read(1) == b'' # EOF
+    assert coproc.wait() == 0 # subprocess success
+
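+    # write the results back in a single statement: record the digests in
+    # gdrive_hashes (sha1 per piece and/or sha256 per 16 KiB block), release
+    # this worker's claim, and mark the job done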
+    cr.execute("""
+        WITH ti AS (SELECT
+                %s AS job_id,
+                %s::integer AS piece_size,
+                %s AS hash_pieces,
+                %s AS hash_blocks,
+                upload_id,
+                int8range(%s, %s) AS upload_range,
+                %s::bytea AS piece_hashes,
+                %s::bytea AS block_hashes
+            FROM gdrive.file
+            WHERE gdrive_file_id=%s),
+        th AS (INSERT INTO gdrive.gdrive_hashes (upload_id, upload_range, piece_size, algo, hashes)
+            SELECT upload_id, upload_range, piece_size, 'sha1', piece_hashes FROM ti WHERE hash_pieces
+            UNION SELECT upload_id, upload_range, 14, 'sha256', block_hashes FROM ti WHERE hash_blocks),
+        thw AS (UPDATE gdrive.gdrive_hash_worker ts SET job_id=NULL
+            FROM ti WHERE backend_pid=pg_backend_pid() AND ts.job_id=ti.job_id)
+        UPDATE gdrive.gdrive_hash_job ts
+        SET upload_id=ti.upload_id, upload_range=ti.upload_range, piece_size=ti.piece_size,
+            hash_pieces=ti.hash_pieces, hash_blocks=ti.hash_blocks, done=true
+        FROM ti WHERE ts.job_id=ti.job_id
+    """, (job_id, piece_size, hash_pieces, hash_blocks, upload_start, upload_end, piece_hashes, block_hashes, gdrive_file_id))
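+
+    # note: the INSERT and both UPDATEs above are data-modifying CTEs of a
+    # single statement, so under autocommit they commit together -- a job is
+    # never marked done without its hashes having been stored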