From f11447cc7eb63a68d4df16f2ef17b4f27fb1d296 Mon Sep 17 00:00:00 2001
From: Anders Cornell
Date: Mon, 28 Mar 2022 20:44:03 -0400
Subject: [PATCH] insert cod

small changes to verify and scrape
add sql files for useful info
add classic entity_download script
add disk_jumble.write program
add disk_jumble.disk program
---
 disk_jumble/src/disk_jumble/disk.py   | 26 +++++----
 disk_jumble/src/disk_jumble/scrape.py |  2 +-
 disk_jumble/src/disk_jumble/verify.py |  6 +--
 disk_jumble/src/disk_jumble/write.py  | 25 +++++++++
 misc/agg_cte.sql                      | 20 +++++++
 scripts/entity_download               | 29 ++++++++++
 sql/diskjumble.sql                    | 17 ++++++
 sql/diskjumble_scheduling.sql         | 21 ++++++++
 sql/diskjumble_writing.sql            |  3 ++
 sql/gdrive_download.sql               | 49 +++++++++++++++++
 {doc => sql}/gdrive_hash_worker.sql   |  0
 sql/merkle_entities.sql               | 76 +++++++++++++++++++++++++++
 12 files changed, 256 insertions(+), 18 deletions(-)
 create mode 100644 disk_jumble/src/disk_jumble/write.py
 create mode 100644 misc/agg_cte.sql
 create mode 100755 scripts/entity_download
 create mode 100644 sql/diskjumble.sql
 create mode 100644 sql/diskjumble_scheduling.sql
 create mode 100644 sql/diskjumble_writing.sql
 create mode 100644 sql/gdrive_download.sql
 rename {doc => sql}/gdrive_hash_worker.sql (100%)
 create mode 100644 sql/merkle_entities.sql

diff --git a/disk_jumble/src/disk_jumble/disk.py b/disk_jumble/src/disk_jumble/disk.py
index 8ddb41c..dd05052 100644
--- a/disk_jumble/src/disk_jumble/disk.py
+++ b/disk_jumble/src/disk_jumble/disk.py
@@ -24,7 +24,7 @@ class Device:
         size_in_512b = int(open('/sys/dev/block/{}/size'.format(self.rdev_str)).read())
         if self.sector_size == 4096:
             assert size_in_512b % 8 == 0
-            self.num_sectors = size_in_512b / 8
+            self.num_sectors = size_in_512b // 8
             self.header_sectors = 256
         elif self.sector_size == 512:
             self.num_sectors = size_in_512b
@@ -57,15 +57,13 @@ class Device:
                 if header_data is None or header_data[0] != dev_uuid:
                     os.close(dev_fd)
                     continue
-                assert header_data[3] == self.num_blocks
             except:
                 os.close(dev_fd)
                 raise
             else:
-                self.device = device
-                break
+                return device
         else:
-            raise RuntimeError("Didn't find disk {}".format(disk_id))
+            raise RuntimeError("no_matching_disk")
 
     def read_header(self):
         assert self.fd is not None
@@ -80,9 +78,9 @@ class Device:
             dev_uuid = uuid.UUID(bytes=dev_uuid)
             assert sector_size == self.sector_size
             assert header_sectors == self.header_sectors
-            sectors_per_block = 16384 / self.sector_size
+            sectors_per_block = 16384 // self.sector_size
             assert data_sectors % sectors_per_block == 0
-            num_blocks = data_sectors / sectors_per_block
+            num_blocks = data_sectors // sectors_per_block
             assert self.header_sectors + data_sectors <= self.num_sectors
             return (dev_uuid, self.sector_size, self.header_sectors, num_blocks)
         else:
@@ -92,14 +90,14 @@
         if (not force) and self.read_header():
             raise RuntimeError('Refusing to overwrite existing diskjumble header')
         data_sectors_avail = self.num_sectors - self.header_sectors
-        sectors_per_block = 16384 / self.sector_size
+        sectors_per_block = 16384 // self.sector_size
         if num_blocks is None:
             num_blocks = data_sectors_avail // sectors_per_block
         data_sectors = num_blocks * sectors_per_block
         assert self.header_sectors + data_sectors <= self.num_sectors
-        header_data = struct.pack('>16s16sQQQ', Device.magic, dev_uuid.bytes, self.sector_size, self.header_sectors, data_sectors).ljust(sector_size, b'\0')
+        header_data = struct.pack('>16s16sQQQ', Device.magic, dev_uuid.bytes, self.sector_size, self.header_sectors, data_sectors).ljust(self.sector_size, b'\0')
         for i in range(0, self.header_sectors):
-            os.pwrite(self.fd, header_data, i*sector_size)
+            os.pwrite(self.fd, header_data, i*self.sector_size)
 
     def close(self):
         if self.fd:
@@ -129,7 +127,7 @@ class Disk:
             dev_uuid = header_data[0]
         cursor.execute('''SELECT disk_id, dev_uuid, dev_serial, num_blocks
             FROM diskjumble.disk
-            WHERE coalesce(disk_id=%s, true) AND coalesce(dev_uuid=%s, true)''', (disk_id, dev_uuid))
+            WHERE coalesce(disk_id=%s, true) AND coalesce(dev_uuid=%s, true)''', (disk_id, str(dev_uuid) if dev_uuid else None))
         [(row)] = cursor.fetchall()
         disk_id = row[0]
         dev_uuid = uuid.UUID(row[1])
@@ -152,7 +150,7 @@ class Disk:
         device = Device(os.open(devname, os.O_RDWR))
 
         data_sectors_avail = device.num_sectors - device.header_sectors
-        sectors_per_block = 16384 / device.sector_size
+        sectors_per_block = 16384 // device.sector_size
         if num_blocks is None:
             num_blocks = data_sectors_avail // sectors_per_block
 
@@ -165,7 +163,7 @@ class Disk:
         self.device.close()
 
     def write_header(self, force=False):
-        self.device.write_header(self.dev_uuid, force)
+        self.device.write_header(self.dev_uuid, force=force)
 
     def mapping_name(self):
         assert self.disk_id is not None
@@ -206,5 +204,5 @@ if __name__ == '__main__':
     if args.up:
         inst.create_mapping()
     if args.new:
-        inst.save(conn)
+        inst.save()
     conn.commit()
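For reference while reading read_header() and write_header() above: every header sector carries the same fixed struct (magic, device UUID, sector size, header sector count, data sector count) in the '>16s16sQQQ' layout visible in the diff. A standalone parser for one such sector might look like the sketch below; it is illustrative only and not part of this patch.

import struct
import uuid

HEADER_FORMAT = '>16s16sQQQ'  # magic, dev_uuid, sector_size, header_sectors, data_sectors

def parse_header_sector(sector: bytes) -> dict:
    # Unpack one diskjumble header sector as laid out by Device.write_header().
    magic, dev_uuid, sector_size, header_sectors, data_sectors = struct.unpack_from(HEADER_FORMAT, sector)
    return {
        'magic': magic,
        'dev_uuid': uuid.UUID(bytes=dev_uuid),
        'sector_size': sector_size,
        'header_sectors': header_sectors,
        'data_sectors': data_sectors,
    }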
diff --git a/disk_jumble/src/disk_jumble/scrape.py b/disk_jumble/src/disk_jumble/scrape.py
index 4143b8a..6130ed3 100644
--- a/disk_jumble/src/disk_jumble/scrape.py
+++ b/disk_jumble/src/disk_jumble/scrape.py
@@ -68,7 +68,7 @@ def scrape_batch(tracker: Tracker, info_hashes: Iterable[bytes], passkey: str) -
 
 
 if __name__ == "__main__":
-    PSQL_PARAMS = ["dbname", "user", "password", "host", "port"]
+    PSQL_PARAMS = ["dbname", "user", "password", "host", "port", "service"]
 
     def tracker_name(val):
         matches = [n for n in TRACKERS.keys() if n.lower() == val.lower()]
diff --git a/disk_jumble/src/disk_jumble/verify.py b/disk_jumble/src/disk_jumble/verify.py
index 63f9ffb..2f61e3c 100644
--- a/disk_jumble/src/disk_jumble/verify.py
+++ b/disk_jumble/src/disk_jumble/verify.py
@@ -242,12 +242,12 @@ def _gen_verify_results(conn, disk_id: int, disk_file: io.BufferedIOBase, read_s
                     *,
                     int8range(
                         slab.entity_offset / %(block_size)s,
-                        salb.entity_offset / %(block_size)s + upper(slab.disk_blocks) - lower(slab.disk_blocks)
+                        slab.entity_offset / %(block_size)s + upper(slab.disk_blocks) - lower(slab.disk_blocks)
                     ) as entity_blocks
                 from diskjumble.slab
             )
             select
-                entity_id,
+                elh.entity_id,
                 generate_series(
                     lower(slab_plus.entity_blocks * elh.block_range),
                     upper(slab_plus.entity_blocks * elh.block_range) - 1
@@ -274,7 +274,7 @@ def _gen_verify_results(conn, disk_id: int, disk_file: io.BufferedIOBase, read_s
                     slab_plus.entity_id = elh.entity_id
                     and slab_plus.entity_blocks && elh.block_range
             )
-            left outer join public.entity using (entity_id)
+            left outer join public.entity on elh.entity_id=entity.entity_id
         )
         where slab_plus.disk_id = %(disk_id)s
         order by sector
diff --git a/disk_jumble/src/disk_jumble/write.py b/disk_jumble/src/disk_jumble/write.py
new file mode 100644
index 0000000..9aea69e
--- /dev/null
+++ b/disk_jumble/src/disk_jumble/write.py
@@ -0,0 +1,25 @@
+import subprocess
+import psycopg2
+import contextlib
+
+if __name__ == '__main__':
+    import argparse
+    arg_parser = argparse.ArgumentParser()
+    arg_parser.add_argument('disk_id', help='integer disk_id', type=int)
+    arg_parser.add_argument('--repo', help='path to find entities', required=True)
+    args = arg_parser.parse_args()
+
+    with contextlib.closing(psycopg2.connect("")) as conn, conn.cursor() as cursor:
+        cursor.execute("SELECT disk_id, lower(disk_blocks), upper(disk_blocks)-lower(disk_blocks), entity_id, entity_offset FROM diskjumble.slab WHERE disk_id=%s AND NOT realized ORDER BY lower(disk_blocks)", (args.disk_id,))
+        for (disk_id, disk_offset, disk_blocks, entity_id, entity_offset) in cursor.fetchall():
+            subprocess.run(['dd',
+                'if={}/{}'.format(args.repo, entity_id.hex()),
+                'skip={}'.format(entity_offset),
+                'count={}'.format(16384*disk_blocks),
+                'iflag=skip_bytes,count_bytes,fullblock',
+                'conv=nocreat',
+                'of=/dev/mapper/diskjumble-{}'.format(disk_id),
+                'oflag=seek_bytes',
+                'seek={}'.format(16384*disk_offset),
+                'bs=1M', 'status=progress'
+            ], check=True)
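write.py above drives dd with iflag=skip_bytes,count_bytes and oflag=seek_bytes, so each invocation copies 16384*disk_blocks bytes from byte offset entity_offset of the repository file to byte offset 16384*disk_offset of /dev/mapper/diskjumble-<disk_id>. A rough pure-Python equivalent of one such copy, a sketch under those assumptions and not part of the patch:

import os

def copy_slab(src_path: str, dst_path: str, entity_offset: int, disk_offset: int, disk_blocks: int, chunk: int = 1 << 20) -> None:
    # Copy 16384*disk_blocks bytes from src_path at entity_offset to dst_path at 16384*disk_offset.
    remaining = 16384 * disk_blocks
    src = os.open(src_path, os.O_RDONLY)
    dst = os.open(dst_path, os.O_WRONLY)
    try:
        src_pos = entity_offset
        dst_pos = 16384 * disk_offset
        while remaining > 0:
            buf = os.pread(src, min(chunk, remaining), src_pos)
            if not buf:
                break  # source file ended before the scheduled slab was filled
            os.pwrite(dst, buf, dst_pos)
            src_pos += len(buf)
            dst_pos += len(buf)
            remaining -= len(buf)
    finally:
        os.close(src)
        os.close(dst)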
diff --git a/misc/agg_cte.sql b/misc/agg_cte.sql
new file mode 100644
index 0000000..5839ad5
--- /dev/null
+++ b/misc/agg_cte.sql
@@ -0,0 +1,20 @@
+-- Conclusion: You wouldn't want to do tree traversal with a CTE.
+
+WITH RECURSIVE
+    tree (node, parent_node, value) AS (VALUES
+        ('root', NULL, '*'),
+        ('interior', 'root', '+'),
+        ('leaf1', 'interior', '1'),
+        ('leaf2', 'interior', '2'),
+        ('leaf3', 'root', '3')
+    ),
+    cte AS (
+        SELECT node, parent_node, value AS serialized
+        FROM tree
+        WHERE node LIKE 'leaf%'
+        UNION ALL
+        SELECT child.parent_node, parent.parent_node, '(' || parent.value/*string_agg(child.serialized, parent.value)*/ || ')'
+        FROM cte AS child JOIN tree AS parent ON child.parent_node = parent.node
+        GROUP BY child.parent_node, parent.parent_node, parent.value
+    )
+SELECT node, parent_node, serialized FROM cte;
diff --git a/scripts/entity_download b/scripts/entity_download
new file mode 100755
index 0000000..95b16bf
--- /dev/null
+++ b/scripts/entity_download
@@ -0,0 +1,29 @@
+#!/usr/bin/python3
+
+import psycopg2
+from sys import argv
+import subprocess
+
+dbconn = psycopg2.connect('')
+
+for entity_id in argv[1:]:
+    db = dbconn.cursor()
+
+    db.execute("SELECT gdrive_file_id, access_token, encryption_key, lower(upload_range), upper(upload_range) FROM gdrive.entity_instance JOIN gdrive.upload USING (upload_id), gdrive.access_token WHERE entity_id=%s AND access_token.id=1 AND access_token.expires > CURRENT_TIMESTAMP", [bytes.fromhex(entity_id)])
+
+    [(gdrive_file,access_token,encrypt_key,upload_start,upload_end)] = db.fetchall()
+
+    upload_waste_start = upload_start % 16
+    upload_blk_start = upload_start - upload_waste_start
+    upload_waste_end = (16 - upload_end % 16 ) % 16
+    upload_blk_end = upload_end + upload_waste_end + 15
+
+    fifo = subprocess.run(["/bin/mktemp", "-p", "./"], check=True, stdout=subprocess.PIPE, universal_newlines=True).stdout[0:-1]
+
+    subprocess.run(["/usr/bin/fallocate", "--zero-range", "--keep-size", "--offset", "0", "--length", str(upload_end - upload_start), fifo], check=True)
+
+    subprocess.run(["curl '--fail' '-H' 'Authorization: Bearer {}' '-H' 'Range: bytes={}-{}' 'https://www.googleapis.com/drive/v3/files/{}?alt=media' | openssl 'enc' '-aes-128-cbc' '-d' '-nopad' '-K' '{}' '-iv' \"`head --bytes 16 | xxd -p`\" | dd 'bs=16M' 'skip={}' 'count={}' 'iflag=skip_bytes,count_bytes,fullblock' 'status=none' 'of={}'".format(access_token, upload_blk_start, upload_blk_end, gdrive_file, encrypt_key.hex(), upload_waste_start, upload_end - upload_start, fifo)], check=True, shell=True)
+
+    subprocess.run(["/bin/mv", "--no-clobber", "-v", fifo, "./{}".format(entity_id)], check=True)
+
+    dbconn.commit()
diff --git a/sql/diskjumble.sql b/sql/diskjumble.sql
new file mode 100644
index 0000000..9133db0
--- /dev/null
+++ b/sql/diskjumble.sql
@@ -0,0 +1,17 @@
+CREATE TABLE diskjumble.disk (
+    disk_id serial PRIMARY KEY,
+    dev_uuid uuid NOT NULL UNIQUE,
+    dev_serial text,
+    num_blocks bigint NOT NULL,
+    failed boolean NOT NULL DEFAULT false
+);
+
+CREATE TABLE diskjumble.slab (
+    slab_id serial PRIMARY KEY,
+    disk_id integer NOT NULL REFERENCES diskjumble.disk (disk_id),
+    disk_blocks int8range NOT NULL,
+    entity_id bytea NOT NULL,
+    entity_offset int8 NOT NULL,
+    crypt_key bytea,
+    realized boolean DEFAULT false
+);
diff --git a/sql/diskjumble_scheduling.sql b/sql/diskjumble_scheduling.sql
new file mode 100644
index 0000000..78ae5ac
--- /dev/null
+++ b/sql/diskjumble_scheduling.sql
@@ -0,0 +1,21 @@
+SELECT entity_id, copies_wanted, copies_scheduled, copies_real FROM (SELECT entity_id, count(*) AS copies_scheduled, count(*) FILTER (WHERE realized) AS copies_real FROM (SELECT entity_id, disk_id, coalesce(bool_or(realized), false) AS realized FROM diskjumble.slab JOIN entity USING (entity_id) WHERE entity_offset=0 AND upper(disk_blocks)-lower(disk_blocks)=(length+16383)/16384 GROUP BY entity_id, disk_id) t0 GROUP BY entity_id) t1 JOIN diskjumble.entity_policy USING (entity_id) WHERE copies_scheduled>0;
+
+SELECT entity_id, pg_size_pretty(length) AS "size", copies_wanted, coalesce(copies_scheduled, 0) AS copies_scheduled, coalesce(copies_real, 0) AS copies_real FROM entity JOIN diskjumble.entity_policy USING (entity_id) LEFT JOIN (SELECT entity_id, count(*) AS copies_scheduled, count(*) FILTER (WHERE realized) AS copies_real FROM (SELECT entity_id, disk_id, coalesce(bool_or(realized), false) AS realized FROM diskjumble.slab JOIN entity USING (entity_id) WHERE entity_offset=0 AND upper(disk_blocks)-lower(disk_blocks)=(length+16383)/16384 GROUP BY entity_id, disk_id) t0 GROUP BY entity_id) t1 USING (entity_id) WHERE priority > 1000
+
+CREATE OR REPLACE FUNCTION diskjumble.schedule_by_policy(p_disk_id integer) RETURNS void LANGUAGE plpgsql AS $$
+DECLARE
+    p_block_start int8;
+    p_block_end int8;
+    p_entity_id bytea;
+    p_blocks int8;
+BEGIN
+    SELECT coalesce(max(upper(disk_blocks)), 0) INTO STRICT p_block_start FROM diskjumble.slab WHERE disk_id=p_disk_id;
+    SELECT num_blocks INTO STRICT p_block_end FROM diskjumble.disk WHERE disk_id=p_disk_id;
+    FOR p_entity_id, p_blocks IN (SELECT entity_id, (length+16383)/16384 AS blocks FROM entity JOIN diskjumble.entity_policy USING (entity_id) LEFT JOIN (SELECT entity_id, count(*) AS copies_scheduled FROM (SELECT DISTINCT entity_id, disk_id FROM diskjumble.slab JOIN entity USING (entity_id) WHERE entity_offset=0 AND upper(disk_blocks)-lower(disk_blocks)=(length+16383)/16384) t0 GROUP BY entity_id HAVING count(*) FILTER (WHERE disk_id=p_disk_id)=0) t1 USING (entity_id) WHERE coalesce(copies_scheduled, 0) < copies_wanted ORDER BY priority DESC, blocks DESC, copies_wanted - coalesce(copies_scheduled, 0) DESC) LOOP
+        IF p_block_start + p_blocks <= p_block_end THEN
+            INSERT INTO diskjumble.slab (disk_id, disk_blocks, entity_id, entity_offset) SELECT p_disk_id, int8range(p_block_start, p_block_start + p_blocks), p_entity_id, 0;
+            SELECT p_block_start + p_blocks INTO STRICT p_block_start;
+        END IF;
+    END LOOP;
+END
+$$;
diff --git a/sql/diskjumble_writing.sql b/sql/diskjumble_writing.sql
new file mode 100644
index 0000000..d065807
--- /dev/null
+++ b/sql/diskjumble_writing.sql
@@ -0,0 +1,3 @@
+-- set slab `realized` flags after a verification run
+
+UPDATE diskjumble.slab SET realized=true FROM diskjumble.verify_pass WHERE verify_pass.disk_id=slab.disk_id AND slab.disk_blocks <@ verify_pass.disk_sectors AND NOT realized;
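schedule_by_policy above packs whole entities onto a disk greedily: an entity of length bytes needs (length+16383)/16384 blocks of 16 KiB, and each accepted slab is appended at the first free block. The placement loop, restated as a small Python sketch (hypothetical helper, not part of the patch; candidate ordering is assumed to be handled by the SQL query):

def schedule(num_blocks: int, first_free_block: int, candidates):
    # candidates: iterable of (entity_id, length_bytes) in priority order.
    # Returns (entity_id, block_start, block_end) slabs that fit on the disk.
    slabs = []
    cursor = first_free_block
    for entity_id, length in candidates:
        blocks = (length + 16383) // 16384  # ceiling division into 16 KiB blocks
        if cursor + blocks <= num_blocks:
            slabs.append((entity_id, cursor, cursor + blocks))
            cursor += blocks
    return slabs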
diff --git a/sql/gdrive_download.sql b/sql/gdrive_download.sql
new file mode 100644
index 0000000..e7c260a
--- /dev/null
+++ b/sql/gdrive_download.sql
@@ -0,0 +1,49 @@
+CREATE TABLE entity_download_control (
+    repository integer PRIMARY KEY,
+    bw_limit int8 -- in KiB/s; no limit if null
+);
+
+-- rows in this table are entities we want to be downloaded
+-- download higher-priority entities first
+-- if an entity is downloaded but not in this table, it should be deleted
+CREATE TABLE entity_download_policy (
+    repository integer not null,
+    entity_id bytea not null references entity (entity_id),
+    priority integer not null default 1000,
+    PRIMARY KEY (repository, entity_id)
+);
+
+-- rows in this table are entities we have
+-- this table is advisory: the source of truth is the actual files present in the repository (directory.)
+-- it's your responsibility to keep the table up to date.
+-- if I rm a file, download program should eventually notice and delete any corresponding row in this table.
+CREATE TABLE entity_instance_local (
+    repository integer not null,
+    entity_id bytea not null references entity (entity_id),
+    PRIMARY KEY (repository, entity_id)
+);
+
+/*
+
+Watch out for capacity-based I/O errors when doing filesystem operations: "disk quota exceeded" or "no more space on device"
+Treat this as a temporary error: retry/continue until the problem is resolved.
+Gotcha: downloading a large gdrive file (or large byte range of a large file) counts against my gdrive daily download quota even if you cancel the request (e.g. break the TCP connection) early on in the response. So don't do it over and over while you wait for me to free up disk space.
+For other kinds of I/O error a crash is ok.
+
+If Google responds with 401 or 403, treat this as a temporary error. 401 is probably an expired access token; 403 is probably a reached download quota.
+
+Retrieve access token: SELECT access_token FROM gdrive.access_token WHERE id=1 AND expires > CURRENT_TIMESTAMP;
+
+Example code for downloading an entity is in scripts/entity_download in the eros repository. It's even python kinda!
+
+Create files in the given temp path; when download is finished, rename/mv into place (aToMiC hullalulla)
+
+python3 -m diskjumble.downloadsvc --repository --repository-path --temp-path [--poll-interval ] [--chunk-size ]
+
+When there's no work to do (or there was just a temporary error) wait poll-interval seconds for the situation to improve and try again (resuming download from where it left off, if applicable.)
+
+If --chunk-size is specified then don't ask for more than that many bytes in a single HTTP request to Google. (No need to be exact.)
+
+Filenames in a repository are the hex representation of the entity_id.
+
+*/
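The comment block above is effectively the specification for diskjumble.downloadsvc. A minimal sketch of the polling loop it describes is shown below; pick_next_entity and download_one are hypothetical callables (one consulting entity_download_policy/entity_instance_local, the other doing the ranged, resumable Drive download), so this is illustrative rather than the actual implementation.

import os
import time

class TemporaryError(Exception):
    # out of space, 401/403 from Google, download quota reached: wait and retry
    pass

def run(repo_path, temp_path, poll_interval, pick_next_entity, download_one):
    while True:
        entity_id = pick_next_entity()  # hypothetical: highest-priority wanted entity not yet present
        if entity_id is None:
            time.sleep(poll_interval)
            continue
        name = entity_id.hex()  # repository filenames are the hex entity_id
        tmp = os.path.join(temp_path, name)
        try:
            download_one(entity_id, tmp)  # hypothetical: honours --chunk-size, resumes a partial file
        except TemporaryError:
            time.sleep(poll_interval)  # wait for the situation to improve, then try again
            continue
        os.rename(tmp, os.path.join(repo_path, name))  # finished: atomically move into place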
diff --git a/doc/gdrive_hash_worker.sql b/sql/gdrive_hash_worker.sql
similarity index 100%
rename from doc/gdrive_hash_worker.sql
rename to sql/gdrive_hash_worker.sql
diff --git a/sql/merkle_entities.sql b/sql/merkle_entities.sql
new file mode 100644
index 0000000..d6ea6c6
--- /dev/null
+++ b/sql/merkle_entities.sql
@@ -0,0 +1,76 @@
+CREATE VIEW entityv2_leaf_hashes AS
+SELECT
+    entity_id,
+    int8range((lower(th.upload_range) - lower(tei.upload_range))/16384,
+        (upper(th.upload_range) - lower(tei.upload_range)+16383)/16384) AS block_range,
+    hashes
+FROM gdrive.entity_instance tei
+JOIN gdrive.gdrive_hashes th USING (upload_id)
+WHERE
+    th.piece_size=14 AND
+    th.algo='sha256' AND
+    -- alignment: range starts on the start of a block of the entity
+    (lower(th.upload_range) - lower(tei.upload_range)) % 16384 = 0 AND
+    -- range contained within slab
+    th.upload_range <@ tei.upload_range AND
+    -- short block only ok if ends at end of slab/entity
+    ((upper(th.upload_range) - lower(th.upload_range)) % 16384 = 0
+        OR upper(th.upload_range) = upper(tei.upload_range));
+
+CREATE OR REPLACE FUNCTION merkle_iterate_zero_hash(levels int)
+RETURNS bytea LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE AS $$
+DECLARE
+    hash bytea := '\x0000000000000000000000000000000000000000000000000000000000000000';
+BEGIN
+    IF levels < 0 OR levels IS NULL THEN
+        RAISE EXCEPTION 'levels must be nonnegative';
+    END IF;
+    FOR _ IN 1..levels LOOP
+        SELECT sha256(hash || hash) INTO STRICT hash;
+    END LOOP;
+    RETURN hash;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION merkle_root_from_level(hashes bytea, level int4, force_iterations int4 DEFAULT 0)
+RETURNS bytea LANGUAGE plpgsql IMMUTABLE STRICT PARALLEL SAFE AS $$
+DECLARE
+    len_hashes int4;
+    iteration int4 := 0;
+BEGIN
+    WHILE octet_length(hashes) > 32 OR iteration < force_iterations LOOP
+        SELECT octet_length(hashes) INTO STRICT len_hashes;
+        SELECT string_agg(sha256(
+            CASE WHEN loc + 63 > len_hashes
+                THEN substring(hashes, loc, 32) || merkle_iterate_zero_hash(level + iteration)
+                ELSE substring(hashes, loc, 64)
+            END), NULL ORDER BY loc)
+        INTO STRICT hashes FROM generate_series(1, len_hashes, 64) ts(loc);
+        iteration := iteration + 1;
+    END LOOP;
+    RETURN hashes;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION entityv2_merkle_root(uuid)
+RETURNS bytea LANGUAGE plpgsql STABLE STRICT PARALLEL SAFE AS $$
+DECLARE
+    blocks int8;
+    chunks int4;
+    chunk_hashes bytea := '\x';
+BEGIN
+    SELECT (length+16383)/16384, (length+1073741823)/1073741824 INTO STRICT blocks, chunks FROM entity WHERE entity_id=$1;
+    FOR chunk IN 0..chunks-1 LOOP
+        SELECT chunk_hashes || merkle_root_from_level(hashes, 0, CASE WHEN chunks>0 THEN 16 ELSE 0 END)
+        INTO STRICT chunk_hashes
+        FROM entityv2_leaf_hashes
+        WHERE entity_uuid = $1 AND
+            block_range = int8range(chunk*65536, LEAST(chunk*65536+65536, blocks))
+        LIMIT 1;
+    END LOOP;
+    RETURN merkle_root_from_level(chunk_hashes, 16);
+END;
+$$;
+
+/* this takes a fricken long time (like an hour) */
+CREATE TABLE entityv2 AS SELECT entityv2_merkle_root(entity_uuid) AS entity_id, entity_uuid FROM entityv2_leaf_hashes GROUP BY entity_uuid;
-- 
2.30.2
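For readers who find the plpgsql above hard to scan, the two merkle helpers boil down to the following Python restatement (a sketch for illustration, not part of the patch): node hashes at one level are concatenated, split into 64-byte pairs, and SHA-256'd to form the next level, with a lone trailing node paired against the iterated zero-hash for that level.

import hashlib

def iterate_zero_hash(levels: int) -> bytes:
    # sha256(h || h) applied `levels` times to 32 zero bytes, as in merkle_iterate_zero_hash()
    h = bytes(32)
    for _ in range(levels):
        h = hashlib.sha256(h + h).digest()
    return h

def merkle_root_from_level(hashes: bytes, level: int, force_iterations: int = 0) -> bytes:
    # Reduce a concatenation of 32-byte node hashes at `level` to a single root,
    # mirroring the SQL merkle_root_from_level().
    iteration = 0
    while len(hashes) > 32 or iteration < force_iterations:
        out = []
        for loc in range(0, len(hashes), 64):
            pair = hashes[loc:loc + 64]
            if len(pair) < 64:  # odd node at the end of a level
                pair += iterate_zero_hash(level + iteration)
            out.append(hashlib.sha256(pair).digest())
        hashes = b"".join(out)
        iteration += 1
    return hashes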