pass
-def _get_v1_worklist(conn, disk_id: int, block_ranges: List[NumericRange]) -> List[_V1Run]:
+def _get_v1_worklist(conn, disk_id: int, target_ranges: List[NumericRange]) -> List[_V1Run]:
"""
How this works: First, we fetch some info about each torrent on the disk that has data within the requested blocks.
Then, we make one query per torrent (because each one may have a different piece size) to determine which disk
)
where disk_id = %s and disk_blocks && any (%s::int8range[])
""",
- (disk_id, block_ranges)
+ (disk_id, target_ranges)
)
infos = [
_TorrentInfo.build(bytes(id_), bencode.decode(bytes(info)))
select relevant_slab.*, target_range * disk_blocks as target_span
from
relevant_slab
- join unnest(%(block_ranges)s::int8range[]) as target_range
+ join unnest(%(target_ranges)s::int8range[]) as target_range
on target_range && disk_blocks
),
targeted_piece as (
{
"disk_id": disk_id,
"entity_id": info.id,
- "block_ranges": block_ranges,
+ "target_ranges": target_ranges,
"block_size": BLOCK_SIZE,
"piece_length": info.piece_length,
}
if sum(map(len, run.block_ranges)) == math.ceil(run_length / BLOCK_SIZE):
runs.append(run)
- runs.sort(lambda r: r.block_ranges)
+ runs.sort(key = lambda r: r.block_ranges)
return runs
-def _get_v2_worklist(conn, disk_id: int, block_ranges: List[NumericRange]) -> List[_V2Run]:
+def _get_v2_worklist(conn, disk_id: int, target_ranges: List[NumericRange]) -> List[_V2Run]:
cursor = conn.cursor()
cursor.execute(
"""
slab_plus.disk_blocks,
entity_blocks * elh.block_range as check_erange
from
- entityv2_leaf_hashes elh
+ public.entityv2_leaf_hashes elh
join slab_plus using (entity_id)
left outer join public.entity using (entity_id)
),
from filtered
)
select * from exploded
- where block <@ any (%(block_ranges)s::int8range[])
+ where block <@ any (%(target_ranges)s::int8range[])
order by block
""",
{
"block_size": BLOCK_SIZE,
"disk_id": disk_id,
- "block_ranges": block_ranges,
+ "target_ranges": target_ranges,
}
)
rows = cursor.fetchall()
]
-def _do_verify(conn, disk_id: int, block_ranges: Optional[List[range]], disk_file: io.BufferedIOBase, read_size: int, read_tries: int):
+def _do_verify(conn, disk_id: int, target_ranges: Optional[List[range]], disk_file: io.BufferedIOBase, read_size: int, read_tries: int):
@dataclass
class Pass:
blocks: range
if curr_pass:
yield Pass(curr_pass)
- if block_ranges is None:
- pg_block_ranges = [NumericRange()]
+ if target_ranges is None:
+ pg_target_ranges = [NumericRange()]
else:
- pg_block_ranges = [NumericRange(r.start, r.stop) for r in block_ranges]
+ pg_target_ranges = [NumericRange(r.start, r.stop) for r in target_ranges]
worklist = list(heapq.merge(
- _get_v1_worklist(conn, disk_id, pg_block_ranges),
- _get_v2_worklist(conn, disk_id, pg_block_ranges),
+ _get_v1_worklist(conn, disk_id, pg_target_ranges),
+ _get_v2_worklist(conn, disk_id, pg_target_ranges),
key = lambda run: run.block_ranges,
))
- if block_ranges is not None:
+ if target_ranges is not None:
requested_blocks = {
block
- for r in block_ranges
+ for r in target_ranges
for block in r
}
covered_blocks = {
args = parser.parse_args()
if args.block_ranges is None:
- block_ranges = None
+ target_ranges = None
else:
- block_ranges = []
+ target_ranges = []
for r in sorted(args.block_ranges, key = lambda r: r.start):
- if block_ranges and r.start <= block_ranges[-1].stop:
- prev = block_ranges.pop()
- block_ranges.append(range(prev.start, max(prev.stop, r.stop)))
+ if target_ranges and r.start <= target_ranges[-1].stop:
+ prev = target_ranges.pop()
+ target_ranges.append(range(prev.start, max(prev.stop, r.stop)))
else:
- block_ranges.append(r)
+ target_ranges.append(r)
with contextlib.closing(psycopg2.connect("")) as conn:
path = f"/dev/mapper/diskjumble-{args.disk_id}"
with open(path, "rb", buffering = _READ_BUFFER_SIZE) as disk_file:
with conn:
- _do_verify(conn, args.disk_id, block_ranges, disk_file, _READ_BUFFER_SIZE, args.read_tries)
+ _do_verify(conn, args.disk_id, target_ranges, disk_file, _READ_BUFFER_SIZE, args.read_tries)