DJ verify: control SQL size when updating disk maps
authorJakob Cornell <jakob+gpg@jcornell.net>
Sun, 21 Aug 2022 02:29:13 +0000 (21:29 -0500)
committerJakob Cornell <jakob+gpg@jcornell.net>
Sun, 21 Aug 2022 02:29:13 +0000 (21:29 -0500)
disk_jumble/src/disk_jumble/verify.py

index 6b309bc903caed250dad092afcc5d763b679d77e..19cf1a75a771b837d6161e3dd40823235164fc57 100644 (file)
@@ -1,5 +1,6 @@
 from __future__ import annotations
 from dataclasses import dataclass
+from itertools import islice, zip_longest
 from typing import Iterator, List, Optional
 from warnings import warn
 import argparse
@@ -367,31 +368,37 @@ def _do_verify(
        clean_passes = clean_up(passes)
        clean_fails = clean_up(fails)
 
+       batch_size = 100  # how many ranges of each type to process in each statement
+       pass_iter = iter(clean_passes)
+       pass_chunks = iter(lambda: list(islice(pass_iter, stop = batch_size)), [])
+       fail_iter = iter(clean_fails)
+       fail_chunks = iter(lambda: list(islice(fail_iter, stop = batch_size)), [])
        with conn:
-               conn.cursor().execute(
-                       """
-                               with
-                                       new_passes as (
-                                               select coalesce(range_agg(range), int8multirange()) as new_passes
-                                               from unnest(%(pass_ranges)s::int8range[]) as range
-                                       ),
-                                       new_fails as (
-                                               select coalesce(range_agg(range), int8multirange()) as new_fails
-                                               from unnest(%(fail_ranges)s::int8range[]) as range
-                                       )
-                               update diskjumble.disk_maps
-                               set
-                                       verified_map = coalesce(verified_map, int8multirange()) + new_passes - new_fails,
-                                       written_map = written_map - new_fails
-                               from new_passes, new_fails
-                               where disk_id = %(disk_id)s
-                       """,
-                       {
-                               "pass_ranges": [NumericRange(r.start, r.stop) for r in clean_passes],
-                               "fail_ranges": [NumericRange(r.start, r.stop) for r in clean_fails],
-                               "disk_id": disk_id,
-                       },
-               )
+               for (pass_chunk, fail_chunk) in zip_longest(pass_chunks, fail_chunks, fillvalue = []):
+                       conn.cursor().execute(
+                               """
+                                       with
+                                               new_passes as (
+                                                       select coalesce(range_agg(range), int8multirange()) as new_passes
+                                                       from unnest(%(pass_ranges)s::int8range[]) as range
+                                               ),
+                                               new_fails as (
+                                                       select coalesce(range_agg(range), int8multirange()) as new_fails
+                                                       from unnest(%(fail_ranges)s::int8range[]) as range
+                                               )
+                                       update diskjumble.disk_maps
+                                       set
+                                               verified_map = coalesce(verified_map, int8multirange()) + new_passes - new_fails,
+                                               written_map = written_map - new_fails
+                                       from new_passes, new_fails
+                                       where disk_id = %(disk_id)s
+                               """,
+                               {
+                                       "pass_ranges": [NumericRange(r.start, r.stop) for r in pass_chunk],
+                                       "fail_ranges": [NumericRange(r.start, r.stop) for r in fail_chunk],
+                                       "disk_id": disk_id,
+                               },
+                       )
 
 
 if __name__ == "__main__":