--- /dev/null
+CREATE TABLE gdrive.gdrive_hash_job (
+ job_id serial PRIMARY KEY,
+ upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
+ upload_range int8range NOT NULL,
+ piece_size integer, -- log2
+ hash_pieces boolean NOT NULL DEFAULT true,
+ hash_blocks boolean NOT NULL DEFAULT true,
+ done boolean NOT NULL DEFAULT false
+);
+
+CREATE TABLE gdrive.gdrive_hash_worker (
+ backend_pid integer PRIMARY KEY,
+ job_id integer REFERENCES gdrive.gdrive_hash_job (job_id),
+ bw_limit integer DEFAULT 8192 -- in kB/s
+);
+
+CREATE TABLE gdrive.gdrive_hashes (
+ upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
+ upload_range int8range NOT NULL,
+ piece_size integer NOT NULL, -- log2
+ algo text,
+ hashes bytea,
+ CHECK (
+ ( /* expected number of digests */
+ ( /* number of whole pieces */
+ (upper(upload_range) - lower(upload_range)) /* job size */
+ / (1::bigint<<piece_size) /* intdiv by piece size */
+ )
+ + ( /* is there a partial piece? */
+ ( /* bytes left after last full piece */
+ (upper(upload_range) - lower(upload_range)) /* job size */
+ % (1::bigint<<piece_size) /* mod piece size */
+ ) != 0
+ )::integer::bigint
+ )
+ * octet_length(digest(''::bytea, algo)) /* bytes per digest */
+ = octet_length(hashes)
+ )
+);
+
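+-- illustrative example: how many digests each stored row contains
+-- (same arithmetic as the CHECK constraint above; uses pgcrypto's digest(), as the CHECK already does)
+SELECT upload_id, upload_range, piece_size, algo,
+       octet_length(hashes) / octet_length(digest(''::bytea, algo)) AS num_digests
+FROM gdrive.gdrive_hashes;
+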
+-- example of creating a series of 1 GiB jobs covering a single entity
+INSERT INTO gdrive.gdrive_hash_job (upload_id, upload_range, hash_pieces, hash_blocks)
+SELECT upload_id,
+       int8range(lower(upload_range)+blknum*(1::bigint<<30),
+                 lower(upload_range)+LEAST((blknum+1)*(1::bigint<<30), length)),
+       false,
+       true
+FROM entity
+JOIN gdrive.slab ON (entity.entity_id=slab.entity_id AND slab.entity_range=int8range(0,entity.length)),
+LATERAL generate_series(0, (length/(1::bigint<<30)+(length%(1::bigint<<30)!=0)::int)-1) t1 (blknum)
+WHERE entity.entity_id IN ('91a02a99-c664-4470-a531-a76d8ba13907');
+
+-- example of creating 1 GiB hash jobs covering all named non-torrent entities that have slabs
+INSERT INTO gdrive.gdrive_hash_job (upload_id, upload_range, hash_pieces, hash_blocks)
+SELECT upload_id,
+       int8range(lower(upload_range)+blknum*(1::bigint<<30),
+                 lower(upload_range)+LEAST((blknum+1)*(1::bigint<<30), length)),
+       false,
+       true
+FROM entity
+JOIN gdrive.slab ON (entity.entity_id=slab.entity_id AND slab.entity_range=int8range(0,entity.length))
+JOIN (SELECT entity_id FROM entity_name JOIN gdrive.slab USING (entity_id)
+      EXCEPT SELECT entity_id FROM bittorrent.torrent_data_entity) t ON t.entity_id=entity.entity_id,
+LATERAL generate_series(0, (length/(1::bigint<<30)+(length%(1::bigint<<30)!=0)::int)-1) t1 (blknum);
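+
+-- illustrative example: how far along the job queue is
+SELECT count(*) FILTER (WHERE done) AS jobs_done, count(*) AS jobs_total
+FROM gdrive.gdrive_hash_job;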
+
+-- clean up crashed workers
+DELETE FROM gdrive.gdrive_hash_worker WHERE NOT EXISTS (SELECT * FROM pg_stat_activity WHERE application_name='gdrive_hash_worker' AND pid=backend_pid);
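+
+-- illustrative example: list live workers and the job each one currently holds
+SELECT backend_pid, job_id, bw_limit
+FROM gdrive.gdrive_hash_worker w
+WHERE EXISTS (SELECT * FROM pg_stat_activity
+              WHERE application_name='gdrive_hash_worker' AND pid=w.backend_pid);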
+
+-- keep access token renewed (from bingus)
+-- $ while true; do PGDATABASE=eros PGUSER=eros PGSSLCERT=.config/tls/anders\@bingus/cert.pem PGSSLKEY=.config/tls/anders\@bingus/privkey.pem PGHOST=hatnd.internet6.net. PGPORT=5433 python3 eros_gdrive_access_token.py && sleep 1800 || echo Hit Ctrl+C to stop && sleep 5 || break; done
+
+-- launch a worker over ssh (from bingus)
+-- $ echo <ssl key passphrase> | ssh -J acornell@cs.oberlin.edu acornell@<machine> source .local/bin/local_env \&\& PGDATABASE=eros PGUSER=occs PGPORT=5433 PGSSLCERT=.config/ssl/certs/acornell@cs.oberlin.edu/cert.pem PGSSLKEY=.config/ssl/private/acornell@cs.oberlin.edu/privkey.pem PGHOST=hatnd.internet6.net. PGAPPNAME=gdrive_hash_worker gdrive_hash_worker
+
+-- deployment (from repo root)
+-- $ sftp bingus
+-- > put python/scripts/gdrive_access_token.py eros_gdrive_access_token.py
+--
+-- $ sftp acornell@cs.oberlin.edu
+-- > put python/scripts/gdrive_hash_worker .local/bin/gdrive_hash_worker
--- /dev/null
+++ b/python/scripts/gdrive_access_token.py
+import psycopg2
+import urllib.request
+import urllib.parse
+import io
+import json
+import re
+
+#
+# refresh tokens
+#
+
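+# the empty DSN makes psycopg2/libpq take its connection settings from the
+# PG* environment variables (as set by the renewal one-liner in the notes)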
+cn = psycopg2.connect('')
+cr = cn.cursor()
+
+cr.execute("BEGIN")
+cr.execute("SELECT id, client_id, client_secret, refresh_token FROM gdrive.access_token WHERE id IN (1) AND refresh_token IS NOT NULL AND expires IS NULL OR expires < CURRENT_TIMESTAMP + '30min' FOR NO KEY UPDATE")
+for (id_, client_id, client_secret, refresh_token) in cr.fetchall():
+ token_req = urllib.parse.urlencode((
+ ('grant_type', 'refresh_token'),
+ ('client_id', client_id),
+ ('client_secret', client_secret),
+ ('refresh_token', refresh_token))).encode('utf-8')
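+    # exchange the refresh token for a new access token at Google's OAuth2 token endpoint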
+ resp = urllib.request.urlopen(urllib.request.Request("https://www.googleapis.com/oauth2/v4/token", data=token_req, method='POST'))
+ assert resp.status == 200
+ new_token = json.load(io.TextIOWrapper(io.BufferedReader(resp)))
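+    # record the new token; 'expires_in' is a lifetime in seconds, so convert it
+    # into an absolute expiry timestamp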
+ cr.execute("UPDATE gdrive.access_token SET access_token = %s, scope = %s, expires = CURRENT_TIMESTAMP + make_interval(secs => %s) WHERE id = %s", (new_token['access_token'], new_token['scope'], new_token['expires_in'], id_))
+
+cn.commit()
+
+cn.close()
--- /dev/null
+++ b/python/scripts/gdrive_hash_worker
+#!/usr/bin/python3
+
+import hashlib
+import psycopg2
+import subprocess
+from time import sleep
+
+cn = psycopg2.connect('')
+cn.set_isolation_level(0) # autocommit
+cr = cn.cursor()
+
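+# Worker main loop: claim a job, hash its byte range, record the results, repeat.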
+while True:
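+    # Atomically pick one not-yet-done job that no worker holds, register it against
+    # pg_backend_pid() in gdrive_hash_worker, and fetch everything needed to run it
+    # (job parameters, gdrive file id, encryption key, and a non-expired access token).
+    # An empty result (no free job, or no valid token) makes the worker sleep and retry.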
+ cr.execute('WITH thw AS (INSERT INTO gdrive.gdrive_hash_worker (backend_pid, job_id) SELECT pg_backend_pid(), job_id FROM gdrive.gdrive_hash_job tj JOIN (SELECT job_id FROM gdrive.gdrive_hash_job WHERE NOT done EXCEPT SELECT job_id FROM gdrive.gdrive_hash_worker LIMIT 1) ti USING (job_id) ON CONFLICT (backend_pid) DO UPDATE SET job_id=EXCLUDED.job_id RETURNING job_id, bw_limit) SELECT job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, lower(upload_range), upper(upload_range) FROM thw JOIN gdrive.gdrive_hash_job USING (job_id) JOIN gdrive.upload USING (upload_id) JOIN gdrive.file USING (upload_id), gdrive.access_token WHERE access_token.id=1 AND access_token.expires>CURRENT_TIMESTAMP')
+ try:
+ [(job_id, bw_limit, piece_size, hash_pieces, hash_blocks, access_token, gdrive_file_id, encryption_key, upload_start, upload_end)] = cr.fetchall()
+ except ValueError: # no jobs
+ sleep(60)
+ continue
+
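+    # Align the job's range to 16-byte AES blocks.  The HTTP Range end is inclusive
+    # and one extra 16-byte block is requested: the pipeline below peels off the first
+    # 16 bytes of the response to use as the CBC IV, and dd later discards the leading
+    # alignment waste and stops after exactly upload_len bytes.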
+ upload_len = upload_end - upload_start
+ upload_waste_start = upload_start % 16
+ upload_blk_start = upload_start - upload_waste_start
+ upload_waste_end = (16 - upload_end % 16 ) % 16
+ upload_blk_end = upload_end + upload_waste_end + 15
+
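+    # Hash in 16 KiB (2**14-byte) blocks; the final block of the range may be short.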
+ buf = bytearray(16384)
+
+ num_blocks = upload_len >> 14
+ if upload_len > (num_blocks << 14):
+ num_blocks += 1
+
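+    # Per-block digests are SHA-256 (32 bytes each); per-piece digests are SHA-1
+    # (20 bytes each) over pieces of 2**piece_size bytes.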
+ if hash_blocks:
+ block_hashes = bytearray(num_blocks * 32)
+ else:
+ block_hashes = None
+
+ if hash_pieces:
+ num_pieces = upload_len >> piece_size
+ if upload_len > (num_pieces << piece_size):
+ num_pieces += 1
+ log2_blocks_per_piece = piece_size - 14
+ piece_hashes = bytearray(num_pieces * 20)
+ else:
+ num_pieces = None
+ piece_hashes = None
+
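+    # optional curl bandwidth cap, in kB/s (gdrive_hash_worker.bw_limit)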
+ if bw_limit is not None:
+ bw_limit_str = " '--limit-rate' '{}k'".format(bw_limit)
+ else:
+ bw_limit_str = ''
+
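+    # Stream the ciphertext range with curl, decrypt it with openssl AES-128-CBC
+    # (no padding) using the first 16 bytes of the stream as the IV, then have dd
+    # drop the leading alignment waste and stop after exactly upload_len bytes.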
+ coproc = subprocess.Popen(["curl '--fail'{} '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none'".format(bw_limit_str, access_token, upload_blk_start, upload_blk_end, gdrive_file_id, encryption_key.hex(), upload_waste_start, upload_len)], stdout=subprocess.PIPE, shell=True)
+
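+    # Consume the plaintext block by block, folding each block into the current
+    # piece hash (if pieces are wanted) and recording its own digest (if blocks are).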
+ block = 0
+ for piece in range(num_pieces or 1):
+ if hash_pieces:
+ piece_end = min((piece+1) << log2_blocks_per_piece, num_blocks)
+ piece_hasher = hashlib.sha1()
+ else:
+ piece_end = num_blocks
+ while block < piece_end:
+ bytes_to_end = upload_len - (block << 14)
+ if bytes_to_end < 16384:
+ buf = buf[:bytes_to_end]
+ assert coproc.stdout.readinto(buf) == len(buf)
+ if hash_blocks:
+ hashbuf_offset = block << 5
+ block_hashes[hashbuf_offset:hashbuf_offset+32] = hashlib.sha256(buf).digest()
+ if hash_pieces:
+ piece_hasher.update(buf)
+ block += 1
+ if hash_pieces:
+ hashbuf_offset = piece * 20
+ piece_hashes[hashbuf_offset:hashbuf_offset+20] = piece_hasher.digest()
+ assert coproc.stdout.read(1) == b'' # EOF
+ assert coproc.wait() == 0 # subprocess success
+
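+    # Record the results in a single statement: insert the piece/block digests,
+    # detach the job from this worker, and mark the job done.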
+ cr.execute("""
+ WITH ti AS (SELECT
+ %s AS job_id,
+ %s::integer AS piece_size,
+ %s AS hash_pieces,
+ %s AS hash_blocks,
+ upload_id,
+ int8range(%s, %s) AS upload_range,
+ %s::bytea AS piece_hashes,
+ %s::bytea AS block_hashes
+ FROM gdrive.file
+ WHERE gdrive_file_id=%s),
+ th AS (INSERT INTO gdrive.gdrive_hashes (upload_id, upload_range, piece_size, algo, hashes)
+ SELECT upload_id, upload_range, piece_size, 'sha1', piece_hashes FROM ti WHERE hash_pieces
+ UNION SELECT upload_id, upload_range, 14, 'sha256', block_hashes FROM ti WHERE hash_blocks),
+ thw AS (UPDATE gdrive.gdrive_hash_worker ts SET job_id=NULL
+ FROM ti WHERE backend_pid=pg_backend_pid() AND ts.job_id=ti.job_id)
+ UPDATE gdrive.gdrive_hash_job ts
+ SET upload_id=ti.upload_id, upload_range=ti.upload_range, piece_size=ti.piece_size,
+ hash_pieces=ti.hash_pieces, hash_blocks=ti.hash_blocks, done=true
+ FROM ti WHERE ts.job_id=ti.job_id
+ """, (job_id, piece_size, hash_pieces, hash_blocks, upload_start, upload_end, piece_hashes, block_hashes, gdrive_file_id))