gdrive hash worker: add job/worker schema, access token refresher, and worker script
author	Anders Cornell <anders@acorn.pw>
Tue, 18 Jan 2022 00:32:03 +0000 (19:32 -0500)
committer	Anders Cornell <anders@acorn.pw>
Tue, 18 Jan 2022 00:32:03 +0000 (19:32 -0500)
doc/gdrive_hash_worker.sql [new file with mode: 0644]
python/scripts/gdrive_access_token.py [new file with mode: 0644]
python/scripts/gdrive_hash_worker [new file with mode: 0755]

diff --git a/doc/gdrive_hash_worker.sql b/doc/gdrive_hash_worker.sql
new file mode 100644 (file)
index 0000000..a1bb98e
--- /dev/null
@@ -0,0 +1,61 @@
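+-- One row per hashing job: a byte range of one upload to hash, either as
+-- fixed-size pieces (piece_size is the log2 of the piece length in bytes), as
+-- 16 KiB blocks, or both. The worker sets done once the digests are stored.
+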
+CREATE TABLE gdrive.gdrive_hash_job (
+       job_id serial PRIMARY KEY,
+       upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
+       upload_range int8range NOT NULL,
+	piece_size integer, -- log2
+       hash_pieces boolean NOT NULL DEFAULT true,
+       hash_blocks boolean NOT NULL DEFAULT true,
+       done boolean NOT NULL default false
+);
+
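+-- One row per live worker session, keyed by its Postgres backend PID. job_id is
+-- the job the session currently holds; bw_limit (kB/s) is passed to curl
+-- --limit-rate by the worker script.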
+CREATE TABLE gdrive.gdrive_hash_worker (
+       backend_pid integer PRIMARY KEY,
+       job_id integer REFERENCES gdrive.gdrive_hash_job (job_id),
+       bw_limit integer DEFAULT 8192 -- in kB/s
+);
+
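+-- Digests produced by the workers: hashes is the concatenation of one digest per
+-- piece of upload_range, in order, using pgcrypto digest() algorithm names
+-- (the worker writes 'sha1' piece hashes and 'sha256' 16 KiB block hashes).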
+CREATE TABLE gdrive.gdrive_hashes (
+       upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
+       upload_range int8range NOT NULL,
+       piece_size integer NOT NULL, -- log2
+       algo text,
+       hashes bytea,
+       CHECK (
+               ( /* expected number of digests */
+                       ( /* number of whole pieces */
+                               (upper(upload_range) - lower(upload_range)) /* job size */
+				/ (1::bigint<<piece_size) /* intdiv by piece size */
+                       )
+                       + ( /* is there a partial piece? */
+                               ( /* bytes left after last full piece */
+                                       (upper(upload_range) - lower(upload_range)) /* job size */
+					% (1::bigint<<piece_size) /* mod piece size */
+                               ) != 0
+                       )::integer::bigint
+               )
+               * octet_length(digest(''::bytea, algo)) /* bytes per digest */
+               = octet_length(hashes)
+       )
+);
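+
+-- worked example of the CHECK above (illustrative numbers, not real data):
+-- a 2500000-byte range at piece_size=20 (1 MiB pieces) is 2 whole pieces plus a
+-- partial piece, i.e. 3 digests; a sha1 digest is 20 bytes, so hashes must be
+-- 3 * 20 = 60 bytes long
+SELECT (2500000 / (1::bigint<<20))                   -- whole pieces
+       + ((2500000 % (1::bigint<<20)) != 0)::integer -- partial piece?
+       AS expected_digests;                          -- => 3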
+
+-- example: create a series of 1 GiB jobs covering a single entity
+INSERT INTO gdrive.gdrive_hash_job (upload_id, upload_range, hash_pieces, hash_blocks)
+SELECT upload_id,
+	int8range(lower(upload_range)+blknum*(1::bigint<<30),
+		lower(upload_range)+LEAST((blknum+1)*(1::bigint<<30), length)),
+	false, true
+FROM entity
+JOIN gdrive.slab ON (entity.entity_id=slab.entity_id AND slab.entity_range=int8range(0,entity.length)),
+LATERAL generate_series(0, (length/(1::bigint<<30)+(length%(1::bigint<<30)!=0)::int)-1) t1 (blknum)
+WHERE entity.entity_id IN ('91a02a99-c664-4470-a531-a76d8ba13907');
+
+-- example: create 1 GiB hash jobs covering every named, non-torrent entity that has a slab
+INSERT INTO gdrive.gdrive_hash_job (upload_id, upload_range, hash_pieces, hash_blocks)
+SELECT upload_id,
+	int8range(lower(upload_range)+blknum*(1::bigint<<30),
+		lower(upload_range)+LEAST((blknum+1)*(1::bigint<<30), length)),
+	false, true
+FROM entity
+JOIN gdrive.slab ON (entity.entity_id=slab.entity_id AND slab.entity_range=int8range(0,entity.length))
+JOIN (SELECT entity_id FROM entity_name JOIN gdrive.slab USING (entity_id)
+	EXCEPT SELECT entity_id FROM bittorrent.torrent_data_entity) t ON t.entity_id=entity.entity_id,
+LATERAL generate_series(0, (length/(1::bigint<<30)+(length%(1::bigint<<30)!=0)::int)-1) t1 (blknum);
+
+-- clean up crashed workers
+DELETE FROM gdrive.gdrive_hash_worker WHERE NOT EXISTS (SELECT * FROM pg_stat_activity WHERE application_name='gdrive_hash_worker' AND pid=backend_pid);
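+-- (a dead session's row would otherwise keep its job claimed forever, since the
+-- worker's job-claim query skips any job_id already present in gdrive_hash_worker)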
+
+-- keep access token renewed (from bingus)
+-- $ while true; do PGDATABASE=eros PGUSER=eros PGSSLCERT=.config/tls/anders\@bingus/cert.pem PGSSLKEY=.config/tls/anders\@bingus/privkey.pem PGHOST=hatnd.internet6.net. PGPORT=5433 python3 eros_gdrive_access_token.py && sleep 1800 || echo Hit Ctrl+C to stop && sleep 5 || break; done
+
+-- launch a worker over ssh (from bingus)
+-- $ echo <ssl key passphrase> | ssh -J acornell@cs.oberlin.edu acornell@<machine> source .local/bin/local_env \&\& PGDATABASE=eros PGUSER=occs PGPORT=5433 PGSSLCERT=.config/ssl/certs/acornell@cs.oberlin.edu/cert.pem PGSSLKEY=.config/ssl/private/acornell@cs.oberlin.edu/privkey.pem PGHOST=hatnd.internet6.net. PGAPPNAME=gdrive_hash_worker gdrive_hash_worker
+
+-- deployment (from repo root)
+-- $ sftp bingus
+-- > put python/scripts/gdrive_access_token.py eros_gdrive_access_token.py
+--
+-- $ sftp acornell@cs.oberlin.edu
+-- > put python/scripts/gdrive_hash_worker .local/bin/gdrive_hash_worker
diff --git a/python/scripts/gdrive_access_token.py b/python/scripts/gdrive_access_token.py
new file mode 100644 (file)
index 0000000..d52543e
--- /dev/null
@@ -0,0 +1,30 @@
+import psycopg2
+import urllib.request
+import urllib.parse
+import io
+import json
+import re
+
+#
+# Refresh Google OAuth access tokens stored in gdrive.access_token: any selected
+# row whose token has no recorded expiry, or expires within 30 minutes, is
+# renewed against Google's token endpoint using its refresh_token, and the new
+# access token, scope and expiry are written back.
+#
+
+cn = psycopg2.connect('')
+cr = cn.cursor()
+
+cr.execute("BEGIN")
+# lock and fetch the token rows that need refreshing
+cr.execute("SELECT id, client_id, client_secret, refresh_token FROM gdrive.access_token WHERE id IN (1) AND refresh_token IS NOT NULL AND (expires IS NULL OR expires < CURRENT_TIMESTAMP + '30min') FOR NO KEY UPDATE")
+for (id_, client_id, client_secret, refresh_token) in cr.fetchall():
+       token_req = urllib.parse.urlencode((
+                       ('grant_type', 'refresh_token'),
+                       ('client_id', client_id),
+                       ('client_secret', client_secret),
+                       ('refresh_token', refresh_token))).encode('utf-8')
+       resp = urllib.request.urlopen(urllib.request.Request("https://www.googleapis.com/oauth2/v4/token", data=token_req, method='POST'))
+       assert resp.status == 200
+       new_token = json.load(io.TextIOWrapper(io.BufferedReader(resp)))
+       cr.execute("UPDATE gdrive.access_token SET access_token = %s, scope = %s, expires = CURRENT_TIMESTAMP + make_interval(secs => %s) WHERE id = %s", (new_token['access_token'], new_token['scope'], new_token['expires_in'], id_))
+
+cn.commit()
+
+cn.close()
diff --git a/python/scripts/gdrive_hash_worker b/python/scripts/gdrive_hash_worker
new file mode 100755 (executable)
index 0000000..6f6fa25
--- /dev/null
@@ -0,0 +1,99 @@
+#!/usr/bin/python3
+
+import hashlib
+import psycopg2
+import subprocess
+from time import sleep
+
+cn = psycopg2.connect('')
+cn.set_isolation_level(0) # autocommit
+cr = cn.cursor()
+
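+# Worker loop: claim one unfinished, unclaimed job (registering this session's
+# backend PID in gdrive.gdrive_hash_worker), stream the job's byte range from
+# Google Drive, hash it as 16 KiB blocks (sha256) and/or pieces (sha1), then
+# store the digests and mark the job done.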
+while True:
+	cr.execute("""
+		WITH thw AS (INSERT INTO gdrive.gdrive_hash_worker (backend_pid, job_id)
+			SELECT pg_backend_pid(), job_id
+			FROM gdrive.gdrive_hash_job tj
+			JOIN (SELECT job_id FROM gdrive.gdrive_hash_job WHERE NOT done
+				EXCEPT SELECT job_id FROM gdrive.gdrive_hash_worker LIMIT 1) ti USING (job_id)
+			ON CONFLICT (backend_pid) DO UPDATE SET job_id=EXCLUDED.job_id
+			RETURNING job_id, bw_limit)
+		SELECT job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token,
+			gdrive_file_id, encryption_key, lower(upload_range), upper(upload_range)
+		FROM thw
+		JOIN gdrive.gdrive_hash_job USING (job_id)
+		JOIN gdrive.upload USING (upload_id)
+		JOIN gdrive.file USING (upload_id), gdrive.access_token
+		WHERE access_token.id=1 AND access_token.expires>CURRENT_TIMESTAMP
+	""")
+       try:
+               [(job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, upload_start, upload_end)] = cr.fetchall()
+	except ValueError: # no rows: nothing to claim, or the access token has expired
+               sleep(60)
+               continue
+
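+	# Compute the job's length and align its range to 16-byte AES-CBC blocks;
+	# upload_waste_start and upload_waste_end are the padding bytes that alignment
+	# adds before and after the range. The inclusive HTTP Range below is one extra
+	# 16-byte block long (hence +15 rather than -1 on the end): the pipeline
+	# consumes the first 16 fetched bytes as the CBC IV, presumably because the
+	# stored object carries the IV inline ahead of the ciphertext.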
+       upload_len = upload_end - upload_start
+       upload_waste_start = upload_start % 16
+       upload_blk_start = upload_start - upload_waste_start
+       upload_waste_end = (16 - upload_end % 16 ) % 16
+       upload_blk_end = upload_end + upload_waste_end + 15
+
+       buf = bytearray(16384)
+
+       num_blocks = upload_len >> 14
+       if upload_len > (num_blocks << 14):
+               num_blocks += 1
+
+       if hash_blocks:
+               block_hashes = bytearray(num_blocks * 32)
+       else:
+               block_hashes = None
+
+       if hash_pieces:
+               num_pieces = upload_len >> piece_size
+               if upload_len > (num_pieces << piece_size):
+                       num_pieces += 1
+               log2_blocks_per_piece = piece_size - 14
+               piece_hashes = bytearray(num_pieces * 20)
+       else:
+               num_pieces = None
+               piece_hashes = None
+       
+       if bw_limit is not None:
+               bw_limit_str = " '--limit-rate' '{}k'".format(bw_limit)
+       else:
+               bw_limit_str = ''
+
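+	# Fetch the encrypted range with curl, decrypt with openssl AES-128-CBC
+	# (-nopad; the backticked `head --bytes 16` consumes the first 16 bytes of
+	# curl's output and passes them to -iv), then use dd to skip the leading
+	# alignment padding and truncate to exactly upload_len plaintext bytes.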
+	coproc = subprocess.Popen([(
+		"curl '--fail'{} '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}'"
+		" 'https://www.googleapis.com/drive/v3/files/{}?alt=media'"
+		" | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\""
+		" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none'"
+		).format(bw_limit_str, access_token, upload_blk_start, upload_blk_end,
+			gdrive_file_id, encryption_key.hex(), upload_waste_start, upload_len)],
+		stdout=subprocess.PIPE, shell=True)
+
+       block = 0
+       for piece in range(num_pieces or 1):
+               if hash_pieces:
+                       piece_end = min((piece+1) << log2_blocks_per_piece, num_blocks)
+                       piece_hasher = hashlib.sha1()
+               else:
+                       piece_end = num_blocks
+               while block < piece_end:
+                       bytes_to_end = upload_len - (block << 14)
+                       if bytes_to_end < 16384:
+                               buf = buf[:bytes_to_end]
+                       assert coproc.stdout.readinto(buf) == len(buf)
+                       if hash_blocks:
+                               hashbuf_offset = block << 5
+                               block_hashes[hashbuf_offset:hashbuf_offset+32] = hashlib.sha256(buf).digest()
+                       if hash_pieces:
+                               piece_hasher.update(buf)
+                       block += 1
+               if hash_pieces:
+                       hashbuf_offset = piece * 20
+                       piece_hashes[hashbuf_offset:hashbuf_offset+20] = piece_hasher.digest()
+       assert coproc.stdout.read(1) == b'' # EOF
+       assert coproc.wait() == 0 # subprocess success
+
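+	# Record the results: insert the digests into gdrive.gdrive_hashes (piece
+	# hashes as 'sha1' at piece_size, block hashes as 'sha256' at piece_size 14),
+	# clear this worker's claim, and mark the job done.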
+       cr.execute("""
+               WITH ti AS (SELECT
+                               %s AS job_id,
+                               %s::integer AS piece_size,
+                               %s AS hash_pieces,
+                               %s AS hash_blocks,
+                               upload_id,
+                               int8range(%s, %s) AS upload_range,
+                               %s::bytea AS piece_hashes,
+                               %s::bytea AS block_hashes
+                       FROM gdrive.file
+                       WHERE gdrive_file_id=%s),
+               th AS (INSERT INTO gdrive.gdrive_hashes (upload_id, upload_range, piece_size, algo, hashes)
+                       SELECT upload_id, upload_range, piece_size, 'sha1', piece_hashes FROM ti WHERE hash_pieces
+                       UNION SELECT upload_id, upload_range, 14, 'sha256', block_hashes FROM ti WHERE hash_blocks),
+               thw AS (UPDATE gdrive.gdrive_hash_worker ts SET job_id=NULL
+                       FROM ti WHERE backend_pid=pg_backend_pid() AND ts.job_id=ti.job_id)
+               UPDATE gdrive.gdrive_hash_job ts
+               SET upload_id=ti.upload_id, upload_range=ti.upload_range, piece_size=ti.piece_size,
+                       hash_pieces=ti.hash_pieces, hash_blocks=ti.hash_blocks, done=true
+               FROM ti WHERE ts.job_id=ti.job_id
+       """, (job_id, piece_size, hash_pieces, hash_blocks, upload_start, upload_end, piece_hashes, block_hashes, gdrive_file_id))