From: Anders Cornell Date: Tue, 29 Mar 2022 00:44:03 +0000 (-0400) Subject: insert cod X-Git-Url: https://jcornell.net/gitweb/gitweb.cgi?a=commitdiff_plain;h=f11447cc7eb63a68d4df16f2ef17b4f27fb1d296;p=eros.git insert cod small changes to verify and scrape add sql files for useful info add classic entity_download script add disk_jumble.write program add disk_jumble.disk program --- diff --git a/disk_jumble/src/disk_jumble/disk.py b/disk_jumble/src/disk_jumble/disk.py index 8ddb41c..dd05052 100644 --- a/disk_jumble/src/disk_jumble/disk.py +++ b/disk_jumble/src/disk_jumble/disk.py @@ -24,7 +24,7 @@ class Device: size_in_512b = int(open('/sys/dev/block/{}/size'.format(self.rdev_str)).read()) if self.sector_size == 4096: assert size_in_512b % 8 == 0 - self.num_sectors = size_in_512b / 8 + self.num_sectors = size_in_512b // 8 self.header_sectors = 256 elif self.sector_size == 512: self.num_sectors = size_in_512b @@ -57,15 +57,13 @@ class Device: if header_data is None or header_data[0] != dev_uuid: os.close(dev_fd) continue - assert header_data[3] == self.num_blocks except: os.close(dev_fd) raise else: - self.device = device - break + return device else: - raise RuntimeError("Didn't find disk {}".format(disk_id)) + raise RuntimeError("no_matching_disk") def read_header(self): assert self.fd is not None @@ -80,9 +78,9 @@ class Device: dev_uuid = uuid.UUID(bytes=dev_uuid) assert sector_size == self.sector_size assert header_sectors == self.header_sectors - sectors_per_block = 16384 / self.sector_size + sectors_per_block = 16384 // self.sector_size assert data_sectors % sectors_per_block == 0 - num_blocks = data_sectors / sectors_per_block + num_blocks = data_sectors // sectors_per_block assert self.header_sectors + data_sectors <= self.num_sectors return (dev_uuid, self.sector_size, self.header_sectors, num_blocks) else: @@ -92,14 +90,14 @@ class Device: if (not force) and self.read_header(): raise RuntimeError('Refusing to overwrite existing diskjumble header') data_sectors_avail = self.num_sectors - self.header_sectors - sectors_per_block = 16384 / self.sector_size + sectors_per_block = 16384 // self.sector_size if num_blocks is None: num_blocks = data_sectors_avail // sectors_per_block data_sectors = num_blocks * sectors_per_block assert self.header_sectors + data_sectors <= self.num_sectors - header_data = struct.pack('>16s16sQQQ', Device.magic, dev_uuid.bytes, self.sector_size, self.header_sectors, data_sectors).ljust(sector_size, b'\0') + header_data = struct.pack('>16s16sQQQ', Device.magic, dev_uuid.bytes, self.sector_size, self.header_sectors, data_sectors).ljust(self.sector_size, b'\0') for i in range(0, self.header_sectors): - os.pwrite(self.fd, header_data, i*sector_size) + os.pwrite(self.fd, header_data, i*self.sector_size) def close(self): if self.fd: @@ -129,7 +127,7 @@ class Disk: dev_uuid = header_data[0] cursor.execute('''SELECT disk_id, dev_uuid, dev_serial, num_blocks FROM diskjumble.disk - WHERE coalesce(disk_id=%s, true) AND coalesce(dev_uuid=%s, true)''', (disk_id, dev_uuid)) + WHERE coalesce(disk_id=%s, true) AND coalesce(dev_uuid=%s, true)''', (disk_id, str(dev_uuid) if dev_uuid else None)) [(row)] = cursor.fetchall() disk_id = row[0] dev_uuid = uuid.UUID(row[1]) @@ -152,7 +150,7 @@ class Disk: device = Device(os.open(devname, os.O_RDWR)) data_sectors_avail = device.num_sectors - device.header_sectors - sectors_per_block = 16384 / device.sector_size + sectors_per_block = 16384 // device.sector_size if num_blocks is None: num_blocks = data_sectors_avail // sectors_per_block @@ -165,7 +163,7 @@ class Disk: self.device.close() def write_header(self, force=False): - self.device.write_header(self.dev_uuid, force) + self.device.write_header(self.dev_uuid, force=force) def mapping_name(self): assert self.disk_id is not None @@ -206,5 +204,5 @@ if __name__ == '__main__': if args.up: inst.create_mapping() if args.new: - inst.save(conn) + inst.save() conn.commit() diff --git a/disk_jumble/src/disk_jumble/scrape.py b/disk_jumble/src/disk_jumble/scrape.py index 4143b8a..6130ed3 100644 --- a/disk_jumble/src/disk_jumble/scrape.py +++ b/disk_jumble/src/disk_jumble/scrape.py @@ -68,7 +68,7 @@ def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) - if __name__ == "__main__": - PSQL_PARAMS = ["dbname", "user", "password", "host", "port"] + PSQL_PARAMS = ["dbname", "user", "password", "host", "port", "service"] def tracker_name(val): matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()] diff --git a/disk_jumble/src/disk_jumble/verify.py b/disk_jumble/src/disk_jumble/verify.py index 63f9ffb..2f61e3c 100644 --- a/disk_jumble/src/disk_jumble/verify.py +++ b/disk_jumble/src/disk_jumble/verify.py @@ -242,12 +242,12 @@ def _gen_verify_results(conn, disk_id: int, disk_file: io.BufferedIOBase, read_s *, int8range( slab.entity_offset / %(block_size)s, - salb.entity_offset / %(block_size)s + upper(slab.disk_blocks) - lower(slab.disk_blocks) + slab.entity_offset / %(block_size)s + upper(slab.disk_blocks) - lower(slab.disk_blocks) ) as entity_blocks from diskjumble.slab ) select - entity_id, + elh.entity_id, generate_series( lower(slab_plus.entity_blocks * elh.block_range), upper(slab_plus.entity_blocks * elh.block_range) - 1 @@ -274,7 +274,7 @@ def _gen_verify_results(conn, disk_id: int, disk_file: io.BufferedIOBase, read_s slab_plus.entity_id = elh.entity_id and slab_plus.entity_blocks && elh.block_range ) - left outer join public.entity using (entity_id) + left outer join public.entity on elh.entity_id=entity.entity_id ) where slab_plus.disk_id = %(disk_id)s order by sector diff --git a/disk_jumble/src/disk_jumble/write.py b/disk_jumble/src/disk_jumble/write.py new file mode 100644 index 0000000..9aea69e --- /dev/null +++ b/disk_jumble/src/disk_jumble/write.py @@ -0,0 +1,25 @@ +import subprocess +import psycopg2 +import contextlib + +if __name__ == '__main__': + import argparse + arg_parser = argparse.ArgumentParser() + arg_parser.add_argument('disk_id', help='integer disk_id', type=int) + arg_parser.add_argument('--repo', help='path to find entities', required=True) + args = arg_parser.parse_args() + + with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor: + cursor.execute("SELECT disk_id, lower(disk_blocks), upper(disk_blocks)-lower(disk_blocks), entity_id, entity_offset FROM diskjumble.slab WHERE disk_id=%s AND NOT realized ORDER BY lower(disk_blocks)", (args.disk_id,)) + for (disk_id, disk_offset, disk_blocks, entity_id, entity_offset) in cursor.fetchall(): + subprocess.run(['dd', + 'if={}/{}'.format(args.repo, entity_id.hex()), + 'skip={}'.format(entity_offset), + 'count={}'.format(16384*disk_blocks), + 'iflag=skip_bytes,count_bytes,fullblock', + 'conv=nocreat', + 'of=/dev/mapper/diskjumble-{}'.format(disk_id), + 'oflag=seek_bytes', + 'seek={}'.format(16384*disk_offset), + 'bs=1M', 'status=progress' + ], check=True) diff --git a/doc/gdrive_hash_worker.sql b/doc/gdrive_hash_worker.sql deleted file mode 100644 index a1bb98e..0000000 --- a/doc/gdrive_hash_worker.sql +++ /dev/null @@ -1,61 +0,0 @@ -CREATE TABLE gdrive.gdrive_hash_job ( - job_id serial PRIMARY KEY, - upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id), - upload_range int8range NOT NULL, - piece_size integer, -- log2, - hash_pieces boolean NOT NULL DEFAULT true, - hash_blocks boolean NOT NULL DEFAULT true, - done boolean NOT NULL default false -); - -CREATE TABLE gdrive.gdrive_hash_worker ( - backend_pid integer PRIMARY KEY, - job_id integer REFERENCES gdrive.gdrive_hash_job (job_id), - bw_limit integer DEFAULT 8192 -- in kB/s -); - -CREATE TABLE gdrive.gdrive_hashes ( - upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id), - upload_range int8range NOT NULL, - piece_size integer NOT NULL, -- log2 - algo text, - hashes bytea, - CHECK ( - ( /* expected number of digests */ - ( /* number of whole pieces */ - (upper(upload_range) - lower(upload_range)) /* job size */ - / (1::bigint< | ssh -J acornell@cs.oberlin.edu acornell@ source .local/bin/local_env \&\& PGDATABASE=eros PGUSER=occs PGPORT=5433 PGSSLCERT=.config/ssl/certs/acornell@cs.oberlin.edu/cert.pem PGSSLKEY=.config/ssl/private/acornell@cs.oberlin.edu/privkey.pem PGHOST=hatnd.internet6.net. PGAPPNAME=gdrive_hash_worker gdrive_hash_worker - --- deployment (from repo root) --- $ sftp bingus --- > put python/scripts/gdrive_access_token.py eros_gdrive_access_token.py --- --- $ sftp acornell@cs.oberlin.edu --- > put python/scripts/gdrive_hash_worker .local/bin/gdrive_hash_worker diff --git a/misc/agg_cte.sql b/misc/agg_cte.sql new file mode 100644 index 0000000..5839ad5 --- /dev/null +++ b/misc/agg_cte.sql @@ -0,0 +1,20 @@ +-- Conclusion: You wouldn't want to do tree traversal with a CTE. + +WITH RECURSIVE + tree (node, parent_node, value) AS (VALUES + ('root', NULL, '*'), + ('interior', 'root', '+'), + ('leaf1', 'interior', '1'), + ('leaf2', 'interior', '2'), + ('leaf3', 'root', '3') + ), + cte AS ( + SELECT node, parent_node, value AS serialized + FROM tree + WHERE node LIKE 'leaf%' + UNION ALL + SELECT child.parent_node, parent.parent_node, '(' || parent.value/*string_agg(child.serialized, parent.value)*/ || ')' + FROM cte AS child JOIN tree AS parent ON child.parent_node = parent.node + GROUP BY child.parent_node, parent.parent_node, parent.value + ) +SELECT node, parent_node, serialized FROM cte; diff --git a/scripts/entity_download b/scripts/entity_download new file mode 100755 index 0000000..95b16bf --- /dev/null +++ b/scripts/entity_download @@ -0,0 +1,29 @@ +#!/usr/bin/python3 + +import psycopg2 +from sys import argv +import subprocess + +dbconn = psycopg2.connect('') + +for entity_id in argv[1:]: + db = dbconn.cursor() + + db.execute("SELECT gdrive_file_id, access_token, encryption_key, lower(upload_range), upper(upload_range) FROM gdrive.entity_instance JOIN gdrive.upload USING (upload_id), gdrive.access_token WHERE entity_id=%s AND access_token.id=1 AND access_token.expires > CURRENT_TIMESTAMP", [bytes.fromhex(entity_id)]) + + [(gdrive_file,access_token,encrypt_key,upload_start,upload_end)] = db.fetchall() + + upload_waste_start = upload_start % 16 + upload_blk_start = upload_start - upload_waste_start + upload_waste_end = (16 - upload_end % 16 ) % 16 + upload_blk_end = upload_end + upload_waste_end + 15 + + fifo = subprocess.run(["/bin/mktemp", "-p", "./"], check=True, stdout=subprocess.PIPE, universal_newlines=True).stdout[0:-1] + + subprocess.run(["/usr/bin/fallocate", "--zero-range", "--keep-size", "--offset", "0", "--length", str(upload_end - upload_start), fifo], check=True) + + subprocess.run(["curl '--fail' '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none' 'of={}'".format(access_token, upload_blk_start, upload_blk_end, gdrive_file, encrypt_key.hex(), upload_waste_start, upload_end - upload_start, fifo)], check=True, shell=True) + + subprocess.run(["/bin/mv", "--no-clobber", "-v", fifo, "./{}".format(entity_id)], check=True) + + dbconn.commit() diff --git a/sql/diskjumble.sql b/sql/diskjumble.sql new file mode 100644 index 0000000..9133db0 --- /dev/null +++ b/sql/diskjumble.sql @@ -0,0 +1,17 @@ +CREATE TABLE diskjumble.disk ( + disk_id serial PRIMARY KEY, + dev_uuid uuid NOT NULL UNIQUE, + dev_serial text, + num_blocks bigint NOT NULL, + failed boolean NOT NULL DEFAULT false +); + +CREATE TABLE diskjumble.slab ( + slab_id serial PRIMARY KEY, + disk_id integer NOT NULL REFERENCES diskjumble.disk (disk_id), + disk_blocks int8range NOT NULL, + entity_id bytea NOT NULL, + entity_offset int8 NOT NULL, + crypt_key bytea, + realized boolean DEFAULT false +); diff --git a/sql/diskjumble_scheduling.sql b/sql/diskjumble_scheduling.sql new file mode 100644 index 0000000..78ae5ac --- /dev/null +++ b/sql/diskjumble_scheduling.sql @@ -0,0 +1,21 @@ +SELECT entity_id, copies_wanted, copies_scheduled, copies_real FROM (SELECT entity_id, count(*) AS copies_scheduled, count(*) FILTER (WHERE realized) AS copies_real FROM (SELECT entity_id, disk_id, coalesce(bool_or(realized), false) AS realized FROM diskjumble.slab JOIN entity USING (entity_id) WHERE entity_offset=0 AND upper(disk_blocks)-lower(disk_blocks)=(length+16383)/16384 GROUP BY entity_id, disk_id) t0 GROUP BY entity_id) t1 JOIN diskjumble.entity_policy USING (entity_id) WHERE copies_scheduled>0; + +SELECT entity_id, pg_size_pretty(length) AS "size", copies_wanted, coalesce(copies_scheduled, 0) AS copies_scheduled, coalesce(copies_real, 0) AS copies_real FROM entity JOIN diskjumble.entity_policy USING (entity_id) LEFT JOIN (SELECT entity_id, count(*) AS copies_scheduled, count(*) FILTER (WHERE realized) AS copies_real FROM (SELECT entity_id, disk_id, coalesce(bool_or(realized), false) AS realized FROM diskjumble.slab JOIN entity USING (entity_id) WHERE entity_offset=0 AND upper(disk_blocks)-lower(disk_blocks)=(length+16383)/16384 GROUP BY entity_id, disk_id) t0 GROUP BY entity_id) t1 USING (entity_id) WHERE priority > 1000 + +CREATE OR REPLACE FUNCTION diskjumble.schedule_by_policy(p_disk_id integer) RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + p_block_start int8; + p_block_end int8; + p_entity_id bytea; + p_blocks int8; +BEGIN + SELECT coalesce(max(upper(disk_blocks)), 0) INTO STRICT p_block_start FROM diskjumble.slab WHERE disk_id=p_disk_id; + SELECT num_blocks INTO STRICT p_block_end FROM diskjumble.disk WHERE disk_id=p_disk_id; + FOR p_entity_id, p_blocks IN (SELECT entity_id, (length+16383)/16384 AS blocks FROM entity JOIN diskjumble.entity_policy USING (entity_id) LEFT JOIN (SELECT entity_id, count(*) AS copies_scheduled FROM (SELECT DISTINCT entity_id, disk_id FROM diskjumble.slab JOIN entity USING (entity_id) WHERE entity_offset=0 AND upper(disk_blocks)-lower(disk_blocks)=(length+16383)/16384) t0 GROUP BY entity_id HAVING count(*) FILTER (WHERE disk_id=p_disk_id)=0) t1 USING (entity_id) WHERE coalesce(copies_scheduled, 0) < copies_wanted ORDER BY priority DESC, blocks DESC, copies_wanted - coalesce(copies_scheduled, 0) DESC) LOOP + IF p_block_start + p_blocks <= p_block_end THEN + INSERT INTO diskjumble.slab (disk_id, disk_blocks, entity_id, entity_offset) SELECT p_disk_id, int8range(p_block_start, p_block_start + p_blocks), p_entity_id, 0; + SELECT p_block_start + p_blocks INTO STRICT p_block_start; + END IF; + END LOOP; +END +$$; diff --git a/sql/diskjumble_writing.sql b/sql/diskjumble_writing.sql new file mode 100644 index 0000000..d065807 --- /dev/null +++ b/sql/diskjumble_writing.sql @@ -0,0 +1,3 @@ +-- set slab `realized` flags after a verification run + +UPDATE diskjumble.slab SET realized=true FROM diskjumble.verify_pass WHERE verify_pass.disk_id=slab.disk_id AND slab.disk_blocks <@ verify_pass.disk_sectors AND NOT realized; diff --git a/sql/gdrive_download.sql b/sql/gdrive_download.sql new file mode 100644 index 0000000..e7c260a --- /dev/null +++ b/sql/gdrive_download.sql @@ -0,0 +1,49 @@ +CREATE TABLE entity_download_control ( + repository integer PRIMARY KEY, + bw_limit int8 -- in KiB/s; no limit if null +); + +-- rows in this table are entities we want to be downloaded +-- download higher-priority entities first +-- if an entity is downloaded but not in this table, it should be deleted +CREATE TABLE entity_download_policy ( + repository integer not null, + entity_id bytea not null references entity (entity_id), + priority integer not null default 1000, + PRIMARY KEY (repository, entity_id) +); + +-- rows in this table are entities we have +-- this table is advisory: the source of truth is the actual files present in the repository (directory.) +-- it's your responsibility to keep the table up to date. +-- if I rm a file, download program should eventually notice and delete any corresponding row in this table. +CREATE TABLE entity_instance_local ( + repository integer not null, + entity_id bytea not null references entity (entity_id), + PRIMARY KEY (repository, entity_id) +); + +/* + +Watch out for capacity-based I/O errors when doing filesystem operations: "disk quota exceeded" or "no more space on device" +Treat this as a temporary error: retry/continue until the problem is resolved. +Gotcha: downloading a large gdrive file (or large byte range of a large file) counts against my gdrive daily download quota even if you cancel the request (e.g. break the TCP connection) early on in the response. So don't do it over and over while you wait for me to free up disk space. +For other kinds of I/O error a crash is ok. + +If Google responds with 401 or 403, treat this as a temporary error. 401 is probably an expired access token; 403 is probably a reached download quota. + +Retrieve access token: SELECT access_token FROM gdrive.access_token WHERE id=1 AND expires > CURRENT_TIMESTAMP; + +Example code for downloading an entity is in scripts/entity_download in the eros repository. It's even python kinda! + +Create files in the given temp path; when download is finished, rename/mv into place (aToMiC hullalulla) + +python3 -m diskjumble.downloadsvc --repository --repository-path --temp-path [--poll-interval ] [--chunk-size ] + +When there's no work to do (or there was just a temporary error) wait poll-interval seconds for the situation to improve and try again (resuming download from where it left off, if applicable.) + +If --chunk-size is specified then don't ask for more than that many bytes in a single HTTP request to Google. (No need to be exact.) + +Filenames in a repository are the hex representation of the entity_id. + +*/ diff --git a/sql/gdrive_hash_worker.sql b/sql/gdrive_hash_worker.sql new file mode 100644 index 0000000..a1bb98e --- /dev/null +++ b/sql/gdrive_hash_worker.sql @@ -0,0 +1,61 @@ +CREATE TABLE gdrive.gdrive_hash_job ( + job_id serial PRIMARY KEY, + upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id), + upload_range int8range NOT NULL, + piece_size integer, -- log2, + hash_pieces boolean NOT NULL DEFAULT true, + hash_blocks boolean NOT NULL DEFAULT true, + done boolean NOT NULL default false +); + +CREATE TABLE gdrive.gdrive_hash_worker ( + backend_pid integer PRIMARY KEY, + job_id integer REFERENCES gdrive.gdrive_hash_job (job_id), + bw_limit integer DEFAULT 8192 -- in kB/s +); + +CREATE TABLE gdrive.gdrive_hashes ( + upload_id uuid NOT NULL REFERENCES gdrive.upload (upload_id), + upload_range int8range NOT NULL, + piece_size integer NOT NULL, -- log2 + algo text, + hashes bytea, + CHECK ( + ( /* expected number of digests */ + ( /* number of whole pieces */ + (upper(upload_range) - lower(upload_range)) /* job size */ + / (1::bigint< | ssh -J acornell@cs.oberlin.edu acornell@ source .local/bin/local_env \&\& PGDATABASE=eros PGUSER=occs PGPORT=5433 PGSSLCERT=.config/ssl/certs/acornell@cs.oberlin.edu/cert.pem PGSSLKEY=.config/ssl/private/acornell@cs.oberlin.edu/privkey.pem PGHOST=hatnd.internet6.net. PGAPPNAME=gdrive_hash_worker gdrive_hash_worker + +-- deployment (from repo root) +-- $ sftp bingus +-- > put python/scripts/gdrive_access_token.py eros_gdrive_access_token.py +-- +-- $ sftp acornell@cs.oberlin.edu +-- > put python/scripts/gdrive_hash_worker .local/bin/gdrive_hash_worker diff --git a/sql/merkle_entities.sql b/sql/merkle_entities.sql new file mode 100644 index 0000000..d6ea6c6 --- /dev/null +++ b/sql/merkle_entities.sql @@ -0,0 +1,76 @@ +CREATE VIEW entityv2_leaf_hashes AS +SELECT + entity_id, + int8range((lower(th.upload_range) - lower(tei.upload_range))/16384, + (upper(th.upload_range) - lower(tei.upload_range)+16383)/16384) AS block_range, + hashes +FROM gdrive.entity_instance tei +JOIN gdrive.gdrive_hashes th USING (upload_id) +WHERE + th.piece_size=14 AND + th.algo='sha256' AND + -- alignment: range starts on the start of a block of the entity + (lower(th.upload_range) - lower(tei.upload_range)) % 16384 = 0 AND + -- range contained within slab + th.upload_range <@ tei.upload_range AND + -- short block only ok if ends at end of slab/entity + ((upper(th.upload_range) - lower(th.upload_range)) % 16384 = 0 + OR upper(th.upload_range) = upper(tei.upload_range)); + +CREATE OR REPLACE FUNCTION merkle_iterate_zero_hash(levels int) +RETURNS bytea LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE AS $$ +DECLARE + hash bytea := '\x0000000000000000000000000000000000000000000000000000000000000000'; +BEGIN + IF levels < 0 OR levels IS NULL THEN + RAISE EXCEPTION 'levels must be nonnegative'; + END IF; + FOR _ IN 1..levels LOOP + SELECT sha256(hash || hash) INTO STRICT hash; + END LOOP; + RETURN hash; +END; +$$; + +CREATE OR REPLACE FUNCTION merkle_root_from_level(hashes bytea, level int4, force_iterations int4 DEFAULT 0) +RETURNS bytea LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE AS $$ +DECLARE + len_hashes int4; + iteration int4 := 0; +BEGIN + WHILE octet_length(hashes) > 32 OR iteration < force_iterations LOOP + SELECT octet_length(hashes) INTO STRICT len_hashes; + SELECT string_agg(sha256( + CASE WHEN loc + 63 > len_hashes + THEN substring(hashes, loc, 32) || merkle_iterate_zero_hash(level + iteration) + ELSE substring(hashes, loc, 64) + END), NULL ORDER BY loc) + INTO STRICT hashes FROM generate_series(1, len_hashes, 64) ts(loc); + iteration := iteration + 1; + END LOOP; + RETURN hashes; +END; +$$; + +CREATE OR REPLACE FUNCTION entityv2_merkle_root(uuid) +RETURNS bytea LANGUAGE plpgsql STABLE STRICT PARALLEL SAFE AS $$ +DECLARE + blocks int8; + chunks int4; + chunk_hashes bytea := '\x'; +BEGIN + SELECT (length+16383)/16384, (length+1073741823)/1073741824 INTO STRICT blocks, chunks FROM entity WHERE entity_id=$1; + FOR chunk IN 0..chunks-1 LOOP + SELECT chunk_hashes || merkle_root_from_level(hashes, 0, CASE WHEN chunks>0 THEN 16 ELSE 0 END) + INTO STRICT chunk_hashes + FROM entityv2_leaf_hashes + WHERE entity_uuid = $1 AND + block_range = int8range(chunk*65536, LEAST(chunk*65536+65536, blocks)) + LIMIT 1; + END LOOP; + RETURN merkle_root_from_level(chunk_hashes, 16); +END; +$$; + +/* this takes a fricken long time (like an hour) */ +CREATE TABLE entityv2 AS SELECT entityv2_merkle_root(entity_uuid) AS entity_id, entity_uuid FROM entityv2_leaf_hashes GROUP BY entity_uuid;