author Anders Cornell <anders@acorn.pw>
Tue, 29 Mar 2022 00:44:03 +0000 (20:44 -0400)
committer Anders Cornell <anders@acorn.pw>
Tue, 29 Mar 2022 00:44:03 +0000 (20:44 -0400)
small changes to verify and scrape
add sql files for useful info
add classic entity_download script
add disk_jumble.write program
add disk_jumble.disk program

13 files changed:
disk_jumble/src/disk_jumble/disk.py
disk_jumble/src/disk_jumble/scrape.py
disk_jumble/src/disk_jumble/verify.py
disk_jumble/src/disk_jumble/write.py [new file with mode: 0644]
doc/gdrive_hash_worker.sql [deleted file]
misc/agg_cte.sql [new file with mode: 0644]
scripts/entity_download [new file with mode: 0755]
sql/diskjumble.sql [new file with mode: 0644]
sql/diskjumble_scheduling.sql [new file with mode: 0644]
sql/diskjumble_writing.sql [new file with mode: 0644]
sql/gdrive_download.sql [new file with mode: 0644]
sql/gdrive_hash_worker.sql [new file with mode: 0644]
sql/merkle_entities.sql [new file with mode: 0644]

diff --git a/disk_jumble/src/disk_jumble/disk.py b/disk_jumble/src/disk_jumble/disk.py
index 8ddb41cc68cb504a41f00d14448b83b642199aad..dd050521dbf10ba7b0d6bb95aaf5b670a1aaafe2 100644 (file)
@@ -24,7 +24,7 @@ class Device:
                size_in_512b = int(open('/sys/dev/block/{}/size'.format(self.rdev_str)).read())
                if self.sector_size == 4096:
                        assert size_in_512b % 8 == 0
-                       self.num_sectors = size_in_512b / 8
+                       self.num_sectors = size_in_512b // 8
                        self.header_sectors = 256
                elif self.sector_size == 512:
                        self.num_sectors = size_in_512b
@@ -57,15 +57,13 @@ class Device:
                                        if header_data is None or header_data[0] != dev_uuid:
                                                os.close(dev_fd)
                                                continue
-                                       assert header_data[3] == self.num_blocks
                                except:
                                        os.close(dev_fd)
                                        raise
                                else:
-                                       self.device = device
-                                       break
+                                       return device
                        else:
-                               raise RuntimeError("Didn't find disk {}".format(disk_id))
+                               raise RuntimeError("no_matching_disk")
 
        def read_header(self):
                assert self.fd is not None
@@ -80,9 +78,9 @@ class Device:
                        dev_uuid = uuid.UUID(bytes=dev_uuid)
                        assert sector_size == self.sector_size
                        assert header_sectors == self.header_sectors
-                       sectors_per_block = 16384 / self.sector_size
+                       sectors_per_block = 16384 // self.sector_size
                        assert data_sectors % sectors_per_block == 0
-                       num_blocks = data_sectors / sectors_per_block
+                       num_blocks = data_sectors // sectors_per_block
                        assert self.header_sectors + data_sectors <= self.num_sectors
                        return (dev_uuid, self.sector_size, self.header_sectors, num_blocks)
                else:
@@ -92,14 +90,14 @@ class Device:
                if (not force) and self.read_header():
                        raise RuntimeError('Refusing to overwrite existing diskjumble header')
                data_sectors_avail = self.num_sectors - self.header_sectors
-               sectors_per_block = 16384 / self.sector_size
+               sectors_per_block = 16384 // self.sector_size
                if num_blocks is None:
                        num_blocks = data_sectors_avail // sectors_per_block
                data_sectors = num_blocks * sectors_per_block
                assert self.header_sectors + data_sectors <= self.num_sectors
-               header_data = struct.pack('>16s16sQQQ', Device.magic, dev_uuid.bytes, self.sector_size, self.header_sectors, data_sectors).ljust(sector_size, b'\0')
+               header_data = struct.pack('>16s16sQQQ', Device.magic, dev_uuid.bytes, self.sector_size, self.header_sectors, data_sectors).ljust(self.sector_size, b'\0')
                for i in range(0, self.header_sectors):
-                       os.pwrite(self.fd, header_data, i*sector_size)
+                       os.pwrite(self.fd, header_data, i*self.sector_size)
        
        def close(self):
                if self.fd:
@@ -129,7 +127,7 @@ class Disk:
                                        dev_uuid = header_data[0]
                        cursor.execute('''SELECT disk_id, dev_uuid, dev_serial, num_blocks
                                        FROM diskjumble.disk
-                                       WHERE coalesce(disk_id=%s, true) AND coalesce(dev_uuid=%s, true)''', (disk_id, dev_uuid))
+                                       WHERE coalesce(disk_id=%s, true) AND coalesce(dev_uuid=%s, true)''', (disk_id, str(dev_uuid) if dev_uuid else None))
                        [(row)] = cursor.fetchall()
                        disk_id = row[0]
                        dev_uuid = uuid.UUID(row[1])
@@ -152,7 +150,7 @@ class Disk:
                device = Device(os.open(devname, os.O_RDWR))
 
                data_sectors_avail = device.num_sectors - device.header_sectors
-               sectors_per_block = 16384 / device.sector_size
+               sectors_per_block = 16384 // device.sector_size
 
                if num_blocks is None:
                        num_blocks = data_sectors_avail // sectors_per_block
@@ -165,7 +163,7 @@ class Disk:
                        self.device.close()
 
        def write_header(self, force=False):
-               self.device.write_header(self.dev_uuid, force)
+               self.device.write_header(self.dev_uuid, force=force)
 
        def mapping_name(self):
                assert self.disk_id is not None
@@ -206,5 +204,5 @@ if __name__ == '__main__':
                if args.up:
                        inst.create_mapping()
                if args.new:
-                       inst.save(conn)
+                       inst.save()
                        conn.commit()
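
For orientation, the header that write_header() lays down (per the struct.pack call in the hunk above) is 16 bytes of magic, the 16-byte device UUID, and three big-endian uint64 fields (sector_size, header_sectors, data_sectors), zero-padded to one sector and repeated across every header sector; read_header() parses the same layout back. A throwaway way to eyeball it from Python, with a made-up device path and 4K sectors assumed:

	import struct, uuid
	raw = open('/dev/disk/by-id/example-disk', 'rb').read(4096)  # hypothetical path
	magic, dev_uuid, sector_size, header_sectors, data_sectors = struct.unpack_from('>16s16sQQQ', raw)
	print(uuid.UUID(bytes=dev_uuid), sector_size, header_sectors, data_sectors)
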
diff --git a/disk_jumble/src/disk_jumble/scrape.py b/disk_jumble/src/disk_jumble/scrape.py
index 4143b8a6a74fa1f8852923dcfa4a8c4f1df30c93..6130ed388d21531a8264dafe8dc57134bf81c320 100644 (file)
@@ -68,7 +68,7 @@ def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) -
 
 
 if __name__ == "__main__":
-       PSQL_PARAMS = ["dbname", "user", "password", "host", "port"]
+       PSQL_PARAMS = ["dbname", "user", "password", "host", "port", "service"]
 
        def tracker_name(val):
                matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()]
diff --git a/disk_jumble/src/disk_jumble/verify.py b/disk_jumble/src/disk_jumble/verify.py
index 63f9ffb0d227d5ad59d92574831e705c6439bccc..2f61e3c5b6d1a769aab708b87d3a08ac4436f770 100644 (file)
@@ -242,12 +242,12 @@ def _gen_verify_results(conn, disk_id: int, disk_file: io.BufferedIOBase, read_s
                                                        *,
                                                        int8range(
                                                                slab.entity_offset / %(block_size)s,
-                                                               salb.entity_offset / %(block_size)s + upper(slab.disk_blocks) - lower(slab.disk_blocks)
+                                                               slab.entity_offset / %(block_size)s + upper(slab.disk_blocks) - lower(slab.disk_blocks)
                                                        ) as entity_blocks
                                                from diskjumble.slab
                                        )
                                select
-                                       entity_id,
+                                       elh.entity_id,
                                        generate_series(
                                                lower(slab_plus.entity_blocks * elh.block_range),
                                                upper(slab_plus.entity_blocks * elh.block_range) - 1
@@ -274,7 +274,7 @@ def _gen_verify_results(conn, disk_id: int, disk_file: io.BufferedIOBase, read_s
                                                slab_plus.entity_id = elh.entity_id
                                                and slab_plus.entity_blocks && elh.block_range
                                        )
-                                       left outer join public.entity using (entity_id)
+                                       left outer join public.entity on elh.entity_id=entity.entity_id
                                )
                                where slab_plus.disk_id = %(disk_id)s
                                order by sector
diff --git a/disk_jumble/src/disk_jumble/write.py b/disk_jumble/src/disk_jumble/write.py
new file mode 100644 (file)
index 0000000..9aea69e
--- /dev/null
@@ -0,0 +1,25 @@
+import subprocess
+import psycopg2
+import contextlib
+
+if __name__ == '__main__':
+       import argparse
+       arg_parser = argparse.ArgumentParser()
+       arg_parser.add_argument('disk_id', help='integer disk_id', type=int)
+       arg_parser.add_argument('--repo', help='path to find entities', required=True)
+       args = arg_parser.parse_args()
+
+       with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor:
+               cursor.execute("SELECT disk_id, lower(disk_blocks), upper(disk_blocks)-lower(disk_blocks), entity_id, entity_offset FROM diskjumble.slab WHERE disk_id=%s AND NOT realized ORDER BY lower(disk_blocks)", (args.disk_id,))
+               for (disk_id, disk_offset, disk_blocks, entity_id, entity_offset) in cursor.fetchall():
+                       subprocess.run(['dd',
+                                       'if={}/{}'.format(args.repo, entity_id.hex()),
+                                       'skip={}'.format(entity_offset),
+                                       'count={}'.format(16384*disk_blocks),
+                                       'iflag=skip_bytes,count_bytes,fullblock',
+                                       'conv=nocreat',
+                                       'of=/dev/mapper/diskjumble-{}'.format(disk_id),
+                                       'oflag=seek_bytes',
+                                       'seek={}'.format(16384*disk_offset),
+                                       'bs=1M', 'status=progress'
+                               ], check=True)
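
The new writer takes disk_id positionally plus a --repo directory of entity files named by their hex entity_id, and connects with psycopg2.connect(""), so the database presumably comes from the usual libpq environment variables. A hypothetical invocation (disk id and paths made up):

	$ PGDATABASE=eros python3 -m disk_jumble.write --repo /srv/entities 7

Each not-yet-realized slab is then copied with dd: 16384 bytes per block, read starting at the slab's entity_offset in the entity file and written at the corresponding block offset on /dev/mapper/diskjumble-<disk_id>.
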
diff --git a/doc/gdrive_hash_worker.sql b/doc/gdrive_hash_worker.sql
deleted file mode 100644 (file)
index a1bb98e..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-CREATE TABLE gdrive.gdrive_hash_job (
-       job_id serial PRIMARY KEY,
-       upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
-       upload_range int8range NOT NULL,
-       piece_size integer, -- log2,
-       hash_pieces boolean NOT NULL DEFAULT true,
-       hash_blocks boolean NOT NULL DEFAULT true,
-       done boolean NOT NULL default false
-);
-
-CREATE TABLE gdrive.gdrive_hash_worker (
-       backend_pid integer PRIMARY KEY,
-       job_id integer REFERENCES gdrive.gdrive_hash_job (job_id),
-       bw_limit integer DEFAULT 8192 -- in kB/s
-);
-
-CREATE TABLE gdrive.gdrive_hashes (
-       upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
-       upload_range int8range NOT NULL,
-       piece_size integer NOT NULL, -- log2
-       algo text,
-       hashes bytea,
-       CHECK (
-               ( /* expected number of digests */
-                       ( /* number of whole pieces */
-                               (upper(upload_range) - lower(upload_range)) /* job size */
-                               / (1::bigint<<piece_size) -- /* intdiv by piece size */
-                       )
-                       + ( /* is there a partial piece? */
-                               ( /* bytes left after last full piece */
-                                       (upper(upload_range) - lower(upload_range)) /* job size */
-                                       % (1::bigint<<piece_size) -- /* mod piece size */
-                               ) != 0
-                       )::integer::bigint
-               )
-               * octet_length(digest(''::bytea, algo)) /* bytes per digest */
-               = octet_length(hashes)
-       )
-);
-
--- example of creating a series of 1gb jobs covering a single entity
-INSERT INTO gdrive.gdrive_hash_job (upload_id, upload_range, hash_pieces, hash_blocks) SELECT upload_id, int8range(lower(upload_range)+blknum*(1::bigint<<30), lower(upload_range)+LEAST((blknum+1)*(1::bigint<<30), length)), false, true FROM entity JOIN gdrive.slab ON (entity.entity_id=slab.entity_id AND slab.entity_range=int8range(0,entity.length)), LATERAL generate_series(0, (length/(1::bigint<<30)+(length%(1::bigint<<30)!=0)::int)-1) t1 (blknum) WHERE entity.entity_id IN ('91a02a99-c664-4470-a531-a76d8ba13907');
-
--- example of creating 1gb hash jobs covering all (named non-torrent entities with slabs)
-INSERT INTO gdrive.gdrive_hash_job (upload_id, upload_range, hash_pieces, hash_blocks) SELECT upload_id, int8range(lower(upload_range)+blknum*(1::bigint<<30), lower(upload_range)+LEAST((blknum+1)*(1::bigint<<30), length)), false, true FROM entity JOIN gdrive.slab ON (entity.entity_id=slab.entity_id AND slab.entity_range=int8range(0,entity.length)) JOIN (SELECT entity_id FROM entity_name JOIN gdrive.slab USING (entity_id) EXCEPT SELECT entity_id FROM bittorrent.torrent_data_entity) t ON t.entity_id=entity.entity_id, LATERAL generate_series(0, (length/(1::bigint<<30)+(length%(1::bigint<<30)!=0)::int)-1) t1 (blknum);
-
--- clean up crashed workers
-DELETE FROM gdrive.gdrive_hash_worker WHERE NOT EXISTS (SELECT * FROM pg_stat_activity WHERE application_name='gdrive_hash_worker' AND pid=backend_pid);
-
--- keep access token renewed (from bingus)
--- $ while true; do PGDATABASE=eros PGUSER=eros PGSSLCERT=.config/tls/anders\@bingus/cert.pem PGSSLKEY=.config/tls/anders\@bingus/privkey.pem PGHOST=hatnd.internet6.net. PGPORT=5433 python3 eros_gdrive_access_token.py && sleep 1800 || echo Hit Ctrl+C to stop && sleep 5 || break; done
-
--- launch a worker over ssh (from bingus)
--- $ echo <ssl key passphrase> | ssh -J acornell@cs.oberlin.edu acornell@<machine> source .local/bin/local_env \&\& PGDATABASE=eros PGUSER=occs PGPORT=5433 PGSSLCERT=.config/ssl/certs/acornell@cs.oberlin.edu/cert.pem PGSSLKEY=.config/ssl/private/acornell@cs.oberlin.edu/privkey.pem PGHOST=hatnd.internet6.net. PGAPPNAME=gdrive_hash_worker gdrive_hash_worker
-
--- deployment (from repo root)
--- $ sftp bingus
--- > put python/scripts/gdrive_access_token.py eros_gdrive_access_token.py
---
--- $ sftp acornell@cs.oberlin.edu
--- > put python/scripts/gdrive_hash_worker .local/bin/gdrive_hash_worker
diff --git a/misc/agg_cte.sql b/misc/agg_cte.sql
new file mode 100644 (file)
index 0000000..5839ad5
--- /dev/null
@@ -0,0 +1,20 @@
+-- Conclusion: You wouldn't want to do tree traversal with a recursive CTE; PostgreSQL rejects GROUP BY and aggregates (like the commented-out string_agg) in the recursive term, so the children can never actually be folded into their parent here.
+
+WITH RECURSIVE
+       tree (node, parent_node, value) AS (VALUES
+               ('root', NULL, '*'),
+               ('interior', 'root', '+'),
+               ('leaf1', 'interior', '1'),
+               ('leaf2', 'interior', '2'),
+               ('leaf3', 'root', '3')
+       ),
+       cte AS (
+               SELECT node, parent_node, value AS serialized
+                       FROM tree
+                       WHERE node LIKE 'leaf%'
+               UNION ALL
+               SELECT child.parent_node, parent.parent_node, '(' || parent.value/*string_agg(child.serialized, parent.value)*/ || ')'
+               FROM cte AS child JOIN tree AS parent ON child.parent_node = parent.node
+               GROUP BY child.parent_node, parent.parent_node, parent.value
+       )
+SELECT node, parent_node, serialized FROM cte;
diff --git a/scripts/entity_download b/scripts/entity_download
new file mode 100755 (executable)
index 0000000..95b16bf
--- /dev/null
@@ -0,0 +1,29 @@
+#!/usr/bin/python3
+
+import psycopg2
+from sys import argv
+import subprocess
+
+dbconn = psycopg2.connect('')
+
+for entity_id in argv[1:]:
+       db = dbconn.cursor()
+
+       db.execute("SELECT gdrive_file_id, access_token, encryption_key, lower(upload_range), upper(upload_range) FROM gdrive.entity_instance JOIN gdrive.upload USING (upload_id), gdrive.access_token WHERE entity_id=%s AND access_token.id=1 AND access_token.expires > CURRENT_TIMESTAMP", [bytes.fromhex(entity_id)])
+
+       [(gdrive_file,access_token,encrypt_key,upload_start,upload_end)] = db.fetchall()
+
+       upload_waste_start = upload_start % 16
+       upload_blk_start = upload_start - upload_waste_start
+       upload_waste_end = (16 - upload_end % 16 ) % 16
+       upload_blk_end = upload_end + upload_waste_end + 15
+
+       fifo = subprocess.run(["/bin/mktemp", "-p", "./"], check=True, stdout=subprocess.PIPE, universal_newlines=True).stdout[0:-1]
+
+       subprocess.run(["/usr/bin/fallocate", "--zero-range", "--keep-size", "--offset", "0", "--length", str(upload_end - upload_start), fifo], check=True)
+
+       subprocess.run(["curl '--fail' '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none' 'of={}'".format(access_token, upload_blk_start, upload_blk_end, gdrive_file, encrypt_key.hex(), upload_waste_start, upload_end - upload_start, fifo)], check=True, shell=True)
+
+       subprocess.run(["/bin/mv", "--no-clobber", "-v", fifo, "./{}".format(entity_id)], check=True)
+
+       dbconn.commit()
diff --git a/sql/diskjumble.sql b/sql/diskjumble.sql
new file mode 100644 (file)
index 0000000..9133db0
--- /dev/null
@@ -0,0 +1,17 @@
+CREATE TABLE diskjumble.disk (
+       disk_id serial PRIMARY KEY,
+       dev_uuid uuid NOT NULL UNIQUE,
+       dev_serial text,
+       num_blocks bigint NOT NULL,
+       failed boolean NOT NULL DEFAULT false
+);
+
+CREATE TABLE diskjumble.slab (
+       slab_id serial PRIMARY KEY,
+       disk_id integer NOT NULL REFERENCES diskjumble.disk (disk_id),
+       disk_blocks int8range NOT NULL,
+       entity_id bytea NOT NULL,
+       entity_offset int8 NOT NULL,
+       crypt_key bytea,
+       realized boolean DEFAULT false
+);
diff --git a/sql/diskjumble_scheduling.sql b/sql/diskjumble_scheduling.sql
new file mode 100644 (file)
index 0000000..78ae5ac
--- /dev/null
@@ -0,0 +1,21 @@
+SELECT entity_id, copies_wanted, copies_scheduled, copies_real FROM (SELECT entity_id, count(*) AS copies_scheduled, count(*) FILTER (WHERE realized) AS copies_real FROM (SELECT entity_id, disk_id, coalesce(bool_or(realized), false) AS realized FROM diskjumble.slab JOIN entity USING (entity_id) WHERE entity_offset=0 AND upper(disk_blocks)-lower(disk_blocks)=(length+16383)/16384 GROUP BY entity_id, disk_id) t0 GROUP BY entity_id) t1 JOIN diskjumble.entity_policy USING (entity_id) WHERE copies_scheduled>0;
+
+SELECT entity_id, pg_size_pretty(length) AS "size", copies_wanted, coalesce(copies_scheduled, 0) AS copies_scheduled, coalesce(copies_real, 0) AS copies_real FROM entity JOIN diskjumble.entity_policy USING (entity_id) LEFT JOIN (SELECT entity_id, count(*) AS copies_scheduled, count(*) FILTER (WHERE realized) AS copies_real FROM (SELECT entity_id, disk_id, coalesce(bool_or(realized), false) AS realized FROM diskjumble.slab JOIN entity USING (entity_id) WHERE entity_offset=0 AND upper(disk_blocks)-lower(disk_blocks)=(length+16383)/16384 GROUP BY entity_id, disk_id) t0 GROUP BY entity_id) t1 USING (entity_id) WHERE priority > 1000
+
+CREATE OR REPLACE FUNCTION diskjumble.schedule_by_policy(p_disk_id integer) RETURNS void LANGUAGE plpgsql AS $$
+DECLARE
+       p_block_start int8;
+       p_block_end int8;
+       p_entity_id bytea;
+       p_blocks int8;
+BEGIN
+       SELECT coalesce(max(upper(disk_blocks)), 0) INTO STRICT p_block_start FROM diskjumble.slab WHERE disk_id=p_disk_id;
+       SELECT num_blocks INTO STRICT p_block_end FROM diskjumble.disk WHERE disk_id=p_disk_id;
+       FOR p_entity_id, p_blocks IN (SELECT entity_id, (length+16383)/16384 AS blocks FROM entity JOIN diskjumble.entity_policy USING (entity_id) LEFT JOIN (SELECT entity_id, count(*) AS copies_scheduled FROM (SELECT DISTINCT entity_id, disk_id FROM diskjumble.slab JOIN entity USING (entity_id) WHERE entity_offset=0 AND upper(disk_blocks)-lower(disk_blocks)=(length+16383)/16384) t0 GROUP BY entity_id HAVING count(*) FILTER (WHERE disk_id=p_disk_id)=0) t1 USING (entity_id) WHERE coalesce(copies_scheduled, 0) < copies_wanted ORDER BY priority DESC, blocks DESC, copies_wanted - coalesce(copies_scheduled, 0) DESC) LOOP
+               IF p_block_start + p_blocks <= p_block_end THEN
+                       INSERT INTO diskjumble.slab (disk_id, disk_blocks, entity_id, entity_offset) SELECT p_disk_id, int8range(p_block_start, p_block_start + p_blocks), p_entity_id, 0;
+                       SELECT p_block_start + p_blocks INTO STRICT p_block_start;
+               END IF;
+       END LOOP;
+END
+$$;
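
Taken with the pieces above, the intended workflow appears to be: register the disk (the --new path in disk_jumble.disk), fill its free blocks with SELECT diskjumble.schedule_by_policy(<disk_id>); which appends whole-entity slabs for under-replicated entities in priority order for as long as they fit, write the slabs out with disk_jumble.write, and flip the realized flags after a verify pass (see diskjumble_writing.sql just below). The two standalone queries at the top read as reporting: scheduled versus realized copy counts per entity against diskjumble.entity_policy.
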
diff --git a/sql/diskjumble_writing.sql b/sql/diskjumble_writing.sql
new file mode 100644 (file)
index 0000000..d065807
--- /dev/null
@@ -0,0 +1,3 @@
+-- set slab `realized` flags after a verification run
+
+UPDATE diskjumble.slab SET realized=true FROM diskjumble.verify_pass WHERE verify_pass.disk_id=slab.disk_id AND slab.disk_blocks <@ verify_pass.disk_sectors AND NOT realized;
diff --git a/sql/gdrive_download.sql b/sql/gdrive_download.sql
new file mode 100644 (file)
index 0000000..e7c260a
--- /dev/null
@@ -0,0 +1,49 @@
+CREATE TABLE entity_download_control (
+       repository integer PRIMARY KEY,
+       bw_limit int8 -- in KiB/s; no limit if null
+);
+
+-- rows in this table are entities we want to be downloaded
+-- download higher-priority entities first
+-- if an entity is downloaded but not in this table, it should be deleted
+CREATE TABLE entity_download_policy (
+       repository integer not null,
+       entity_id bytea not null references entity (entity_id),
+       priority integer not null default 1000,
+       PRIMARY KEY (repository, entity_id)
+);
+
+-- rows in this table are entities we have
+-- this table is advisory: the source of truth is the actual files present in the repository (directory.)
+-- it's your responsibility to keep the table up to date.
+-- if I rm a file, download program should eventually notice and delete any corresponding row in this table.
+CREATE TABLE entity_instance_local (
+       repository integer not null,
+       entity_id bytea not null references entity (entity_id),
+       PRIMARY KEY (repository, entity_id)
+);
+
+/*
+
+Watch out for capacity-based I/O errors when doing filesystem operations: "disk quota exceeded" or "no more space on device"
+Treat this as a temporary error: retry/continue until the problem is resolved.
+Gotcha: downloading a large gdrive file (or large byte range of a large file) counts against my gdrive daily download quota even if you cancel the request (e.g. break the TCP connection) early on in the response. So don't do it over and over while you wait for me to free up disk space.
+For other kinds of I/O error a crash is ok.
+
+If Google responds with 401 or 403, treat this as a temporary error. 401 is probably an expired access token; 403 is probably a reached download quota.
+
+Retrieve access token: SELECT access_token FROM gdrive.access_token WHERE id=1 AND expires > CURRENT_TIMESTAMP;
+
+Example code for downloading an entity is in scripts/entity_download in the eros repository. It's even python kinda!
+
+Create files in the given temp path; when download is finished, rename/mv into place (aToMiC hullalulla)
+
+python3 -m diskjumble.downloadsvc --repository <int> --repository-path <path> --temp-path <path> [--poll-interval <int>] [--chunk-size <int>]
+
+When there's no work to do (or there was just a temporary error) wait poll-interval seconds for the situation to improve and try again (resuming download from where it left off, if applicable.)
+
+If --chunk-size is specified then don't ask for more than that many bytes in a single HTTP request to Google. (No need to be exact.)
+
+Filenames in a repository are the hex representation of the entity_id.
+
+*/
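
The block comment above reads like a spec for a small polling service, so here is a rough Python sketch of that loop; an assumption-laden outline, not the real diskjumble.downloadsvc. It hands the actual transfer off to scripts/entity_download (assumed to be installed on PATH and run with the temp directory as its cwd), so chunked/resumable requests, --chunk-size, and the 401-versus-403 distinction from the notes are glossed over. Table and option names come from this file and the usage line above; the query shapes and error classification are guesses.

	import argparse
	import contextlib
	import errno
	import os
	import subprocess
	import time

	import psycopg2


	def pick_entity(cur, repository):
		# Highest-priority wanted entity that the advisory table says we don't have yet.
		cur.execute("""
			SELECT p.entity_id
			FROM entity_download_policy p
			LEFT JOIN entity_instance_local l USING (repository, entity_id)
			WHERE p.repository = %s AND l.entity_id IS NULL
			ORDER BY p.priority DESC
			LIMIT 1""", (repository,))
		row = cur.fetchone()
		return bytes(row[0]) if row else None


	def main():
		ap = argparse.ArgumentParser()
		ap.add_argument("--repository", type=int, required=True)
		ap.add_argument("--repository-path", required=True)
		ap.add_argument("--temp-path", required=True)
		ap.add_argument("--poll-interval", type=int, default=60)
		args = ap.parse_args()

		with contextlib.closing(psycopg2.connect("")) as conn:
			while True:
				with conn.cursor() as cur:
					entity_id = pick_entity(cur, args.repository)
				conn.commit()
				if entity_id is None:
					time.sleep(args.poll_interval)
					continue
				name = entity_id.hex()  # repository filenames are the hex entity_id
				try:
					# scripts/entity_download writes ./<hex entity_id> in its cwd
					subprocess.run(["entity_download", name], cwd=args.temp_path, check=True)
					# rename/mv into place; atomic when temp and repository share a filesystem
					os.rename(os.path.join(args.temp_path, name),
							os.path.join(args.repository_path, name))
					with conn.cursor() as cur:
						cur.execute("INSERT INTO entity_instance_local (repository, entity_id)"
								" VALUES (%s, %s)", (args.repository, entity_id))
					conn.commit()
				except OSError as e:
					if e.errno not in (errno.ENOSPC, errno.EDQUOT):
						raise  # only capacity problems are treated as temporary
					conn.rollback()
					time.sleep(args.poll_interval)
				except subprocess.CalledProcessError:
					# covers curl --fail on 401/403 and similar; per the notes, a real
					# service should resume rather than blindly re-request large ranges
					conn.rollback()
					time.sleep(args.poll_interval)


	if __name__ == "__main__":
		main()
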
diff --git a/sql/gdrive_hash_worker.sql b/sql/gdrive_hash_worker.sql
new file mode 100644 (file)
index 0000000..a1bb98e
--- /dev/null
@@ -0,0 +1,61 @@
+CREATE TABLE gdrive.gdrive_hash_job (
+       job_id serial PRIMARY KEY,
+       upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
+       upload_range int8range NOT NULL,
+       piece_size integer, -- log2,
+       hash_pieces boolean NOT NULL DEFAULT true,
+       hash_blocks boolean NOT NULL DEFAULT true,
+       done boolean NOT NULL default false
+);
+
+CREATE TABLE gdrive.gdrive_hash_worker (
+       backend_pid integer PRIMARY KEY,
+       job_id integer REFERENCES gdrive.gdrive_hash_job (job_id),
+       bw_limit integer DEFAULT 8192 -- in kB/s
+);
+
+CREATE TABLE gdrive.gdrive_hashes (
+       upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id),
+       upload_range int8range NOT NULL,
+       piece_size integer NOT NULL, -- log2
+       algo text,
+       hashes bytea,
+       CHECK (
+               ( /* expected number of digests */
+                       ( /* number of whole pieces */
+                               (upper(upload_range) - lower(upload_range)) /* job size */
+                               / (1::bigint<<piece_size) -- /* intdiv by piece size */
+                       )
+                       + ( /* is there a partial piece? */
+                               ( /* bytes left after last full piece */
+                                       (upper(upload_range) - lower(upload_range)) /* job size */
+                                       % (1::bigint<<piece_size) -- /* mod piece size */
+                               ) != 0
+                       )::integer::bigint
+               )
+               * octet_length(digest(''::bytea, algo)) /* bytes per digest */
+               = octet_length(hashes)
+       )
+);
+
+-- example of creating a series of 1gb jobs covering a single entity
+INSERT INTO gdrive.gdrive_hash_job (upload_id, upload_range, hash_pieces, hash_blocks) SELECT upload_id, int8range(lower(upload_range)+blknum*(1::bigint<<30), lower(upload_range)+LEAST((blknum+1)*(1::bigint<<30), length)), false, true FROM entity JOIN gdrive.slab ON (entity.entity_id=slab.entity_id AND slab.entity_range=int8range(0,entity.length)), LATERAL generate_series(0, (length/(1::bigint<<30)+(length%(1::bigint<<30)!=0)::int)-1) t1 (blknum) WHERE entity.entity_id IN ('91a02a99-c664-4470-a531-a76d8ba13907');
+
+-- example of creating 1gb hash jobs covering all (named non-torrent entities with slabs)
+INSERT INTO gdrive.gdrive_hash_job (upload_id, upload_range, hash_pieces, hash_blocks) SELECT upload_id, int8range(lower(upload_range)+blknum*(1::bigint<<30), lower(upload_range)+LEAST((blknum+1)*(1::bigint<<30), length)), false, true FROM entity JOIN gdrive.slab ON (entity.entity_id=slab.entity_id AND slab.entity_range=int8range(0,entity.length)) JOIN (SELECT entity_id FROM entity_name JOIN gdrive.slab USING (entity_id) EXCEPT SELECT entity_id FROM bittorrent.torrent_data_entity) t ON t.entity_id=entity.entity_id, LATERAL generate_series(0, (length/(1::bigint<<30)+(length%(1::bigint<<30)!=0)::int)-1) t1 (blknum);
+
+-- clean up crashed workers
+DELETE FROM gdrive.gdrive_hash_worker WHERE NOT EXISTS (SELECT * FROM pg_stat_activity WHERE application_name='gdrive_hash_worker' AND pid=backend_pid);
+
+-- keep access token renewed (from bingus)
+-- $ while true; do PGDATABASE=eros PGUSER=eros PGSSLCERT=.config/tls/anders\@bingus/cert.pem PGSSLKEY=.config/tls/anders\@bingus/privkey.pem PGHOST=hatnd.internet6.net. PGPORT=5433 python3 eros_gdrive_access_token.py && sleep 1800 || echo Hit Ctrl+C to stop && sleep 5 || break; done
+
+-- launch a worker over ssh (from bingus)
+-- $ echo <ssl key passphrase> | ssh -J acornell@cs.oberlin.edu acornell@<machine> source .local/bin/local_env \&\& PGDATABASE=eros PGUSER=occs PGPORT=5433 PGSSLCERT=.config/ssl/certs/acornell@cs.oberlin.edu/cert.pem PGSSLKEY=.config/ssl/private/acornell@cs.oberlin.edu/privkey.pem PGHOST=hatnd.internet6.net. PGAPPNAME=gdrive_hash_worker gdrive_hash_worker
+
+-- deployment (from repo root)
+-- $ sftp bingus
+-- > put python/scripts/gdrive_access_token.py eros_gdrive_access_token.py
+--
+-- $ sftp acornell@cs.oberlin.edu
+-- > put python/scripts/gdrive_hash_worker .local/bin/gdrive_hash_worker
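
As a concrete reading of the CHECK constraint on gdrive.gdrive_hashes above: a job whose upload_range spans 40 MiB with piece_size 24 (16 MiB pieces) and algo 'sha256' has 2 whole pieces plus a nonzero 8 MiB remainder, so 3 digests of 32 bytes each are expected and octet_length(hashes) must be 96.
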
diff --git a/sql/merkle_entities.sql b/sql/merkle_entities.sql
new file mode 100644 (file)
index 0000000..d6ea6c6
--- /dev/null
@@ -0,0 +1,76 @@
+CREATE VIEW entityv2_leaf_hashes AS
+SELECT
+	entity_id AS entity_uuid,
+       int8range((lower(th.upload_range) - lower(tei.upload_range))/16384,
+                       (upper(th.upload_range) - lower(tei.upload_range)+16383)/16384) AS block_range,
+       hashes
+FROM gdrive.entity_instance tei
+JOIN gdrive.gdrive_hashes th USING (upload_id)
+WHERE
+       th.piece_size=14 AND
+       th.algo='sha256' AND
+       -- alignment: range starts on the start of a block of the entity
+       (lower(th.upload_range) - lower(tei.upload_range)) % 16384 = 0 AND
+       -- range contained within slab
+       th.upload_range <@ tei.upload_range AND
+       -- short block only ok if ends at end of slab/entity
+       ((upper(th.upload_range) - lower(th.upload_range)) % 16384 = 0
+               OR upper(th.upload_range) = upper(tei.upload_range));
+
+CREATE OR REPLACE FUNCTION merkle_iterate_zero_hash(levels int)
+RETURNS bytea LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE AS $$
+DECLARE
+       hash bytea := '\x0000000000000000000000000000000000000000000000000000000000000000';
+BEGIN
+       IF levels < 0 OR levels IS NULL THEN
+               RAISE EXCEPTION 'levels must be nonnegative';
+       END IF;
+       FOR _ IN 1..levels LOOP
+               SELECT sha256(hash || hash) INTO STRICT hash;
+       END LOOP;
+       RETURN hash;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION merkle_root_from_level(hashes bytea, level int4, force_iterations int4 DEFAULT 0)
+RETURNS bytea LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE AS $$
+DECLARE
+       len_hashes int4;
+       iteration int4 := 0;
+BEGIN
+       WHILE octet_length(hashes) > 32 OR iteration < force_iterations LOOP
+               SELECT octet_length(hashes) INTO STRICT len_hashes;
+               SELECT string_agg(sha256(
+                       CASE WHEN loc + 63 > len_hashes
+                               THEN substring(hashes, loc, 32) || merkle_iterate_zero_hash(level + iteration)
+                               ELSE substring(hashes, loc, 64)
+                       END), NULL ORDER BY loc)
+               INTO STRICT hashes FROM generate_series(1, len_hashes, 64) ts(loc);
+               iteration := iteration + 1;
+       END LOOP;
+       RETURN hashes;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION entityv2_merkle_root(uuid)
+RETURNS bytea LANGUAGE plpgsql STABLE STRICT PARALLEL SAFE AS $$
+DECLARE
+       blocks int8;
+       chunks int4;
+       chunk_hashes bytea := '\x';
+BEGIN
+       SELECT (length+16383)/16384, (length+1073741823)/1073741824 INTO STRICT blocks, chunks FROM entity WHERE entity_id=$1;
+       FOR chunk IN 0..chunks-1 LOOP
+               SELECT chunk_hashes || merkle_root_from_level(hashes, 0, CASE WHEN chunks>0 THEN 16 ELSE 0 END)
+               INTO STRICT chunk_hashes
+               FROM entityv2_leaf_hashes
+               WHERE entity_uuid = $1 AND
+               block_range = int8range(chunk*65536, LEAST(chunk*65536+65536, blocks))
+               LIMIT 1;
+       END LOOP;
+       RETURN merkle_root_from_level(chunk_hashes, 16);
+END;
+$$;
+
+/* this takes a fricken long time (like an hour) */
+CREATE TABLE entityv2 AS SELECT entityv2_merkle_root(entity_uuid) AS entity_id, entity_uuid FROM entityv2_leaf_hashes GROUP BY entity_uuid;
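
For cross-checking, the same hashing rule restated as a short Python sketch (my reading of merkle_iterate_zero_hash and merkle_root_from_level above: 32-byte sha256 nodes, odd nodes paired with the iterated zero hash of the current level). merkle_root_from_level(leaf_hashes, 0, 16) mirrors the per-chunk call in entityv2_merkle_root.

	import hashlib

	def iterate_zero_hash(levels):
		# sha256-fold 32 zero bytes with themselves `levels` times (merkle_iterate_zero_hash)
		h = bytes(32)
		for _ in range(levels):
			h = hashlib.sha256(h + h).digest()
		return h

	def merkle_root_from_level(hashes, level, force_iterations=0):
		# `hashes` is the concatenation of 32-byte digests making up one tree level;
		# keep pairing (padding a lone trailing node with the zero hash for that level)
		# until a single digest remains and at least force_iterations levels were climbed.
		iteration = 0
		while len(hashes) > 32 or iteration < force_iterations:
			next_level = []
			for loc in range(0, len(hashes), 64):
				pair = hashes[loc:loc + 64]
				if len(pair) < 64:
					pair = pair + iterate_zero_hash(level + iteration)
				next_level.append(hashlib.sha256(pair).digest())
			hashes = b"".join(next_level)
			iteration += 1
		return hashes
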