본문으로 건너뛰기

Recovery Header Validation — 통합 인터페이스 합의안 (NY ↔ DG)

대규님과 합의용. 두 브랜치가 같은 함수(_validate_loaded_entries)를 건드려 충돌하므로, "읽기 단계만 엔진별 분기, 판정·적용은 단일" 구조로 합치는 제안. 상세 분석 근거: raw_block_recovery_validation_parallel.md

1. 문제의식 — "엔진별로 검증 함수를 통째 가르는 건 과하다"

검증 로직(identity / payload_len 비교 → drop)은 엔진과 무관하게 완전히 동일하다. 엔진마다 다른 건 오직 "header N개를 어떻게 읽느냐"뿐:

  • POSIX: 동기 blocking pread밖에 없음 → 병렬성은 스레드 여러 개(threadpool)로만.
  • io_uring: 비동기 SQ → 단일 스레드에서 N개 batch 제출 + 한 번 wait로 device queue depth 활용. 여기에 threadpool까지 씌우면 안티패턴.

→ "읽는 방법이 엔진마다 다른 것"은 정상. 하지만 현재 DG 구조처럼 _validate_loaded_entries 를 엔진별 메서드 3개로 가르면 검증 흐름이 복제될 위험. 분기를 "읽기" 한 곳으로 좁히고, 검증 루프는 단 하나로.

2. 목표 구조 — 3개 층으로 경계 분리

읽기(엔진별 분기) → 판정(엔진 무관 단일) → 적용(drop, 단일)
_read_slot_headers _is_stale_header _validate_loaded_entries 본문

① 검증 루프는 하나 (NY 철학)

def _validate_loaded_entries(self) -> None:
"""Drop recovered entries whose slot headers do not match metadata."""
with self._lock:
items = list(self._index.items())
if not items:
return

offsets = [int(entry.offset) for _, entry in items]
headers = self._read_slot_headers(offsets) # ← 유일한 엔진 분기점

to_drop = [
key
for (key, entry), hdr in zip(items, headers, strict=True)
if self._is_stale_header(key, entry, hdr) # ← 판정은 엔진 무관
]
# ... drop 적용 (기존과 동일)

② 읽기 dispatch — 여기 한 곳에서만 엔진 분기

def _read_slot_headers(
self, offsets: list[int]
) -> list[Optional[tuple[int, int]]]:
n = len(offsets)
if self.io_engine == "io_uring" and not self.use_uring_cmd and n > 1:
return self._read_slot_headers_batched(offsets) # NY: io_uring
if self.io_engine == "posix" and self._recovery_read_threads > 1 and n > 1:
return self._read_slot_headers_posix_parallel(offsets) # DG: threadpool
return [self._read_slot_header(off) for off in offsets] # serial fallback

③ 판정 — DG _validate_loaded_entry를 "header 받는" 형태로 변형

DG 원본은 내부에서 직접 _read_slot_header를 호출. 읽기가 분리됐으니 header를 인자로 받아 판정만:

def _is_stale_header(
self, encoded_key: str, entry: _Entry, slot_hdr: Optional[tuple[int, int]]
) -> bool:
if slot_hdr is None:
return True
try:
expected = slot_identity_from_encoded_key(encoded_key, self.key_namespace)
except Exception:
return True
identity, payload_len = slot_hdr
return int(identity) != int(expected) or int(payload_len) != int(entry.size)

④ POSIX 병렬 읽기 — DG range 분할/threadpool 그대로, "검증" 대신 "읽기"만

def _read_slot_headers_posix_parallel(
self, offsets: list[int]
) -> list[Optional[tuple[int, int]]]:
n = len(offsets)
max_workers = min(self._recovery_read_threads, n)
ranges = self._build_recovery_item_ranges(n, max_workers) # DG 그대로 재사용
work = [(offsets, s, e) for s, e in ranges]
with ThreadPoolExecutor(max_workers=max_workers,
thread_name_prefix="rawblk-recover") as pool:
return [hdr for chunk in pool.map(self._read_slot_header_range, work)
for hdr in chunk] # pool.map 순서 보존 → flatten

def _read_slot_header_range(self, work_item) -> list[Optional[tuple[int, int]]]:
offsets, start, end = work_item
return [self._read_slot_header(offsets[i]) for i in range(start, end)]

⑤ io_uring batched — NY _read_slot_headers_batched 그대로 (수정 불필요)

대규님이 io_uring 자리에 남긴 TODO(_validate_loaded_entries_iouring_serial)가 자연스럽게 ⑤로 채워짐.

3. 두 코드에서 살리는 것 / 버리는 것

살림버림
DG (02874b7)DEFAULT_RECOVERY_READ_THREADS/_recovery_read_threads, _build_recovery_item_ranges, threadpool 패턴, _validate_loaded_entry(→_is_stale_header로 read 분리), 벤치(fdaec7f)_validate_loaded_entries_{serial,posix_parallel,iouring_serial} 3분기, _validate_loaded_entry_range_posix(읽기 range로 대체)
NY (dc3f5d36)_read_slot_headers_batched, 단일 검증 루프 철학진입부 inline 분기(→_read_slot_headers로 이동)

→ 검증 흐름이 한 군데, 엔진은 "읽기 전략"만 갈아끼움.

4. PR / 머지 순서 (각자 크레딧 유지)

  1. DG PR — 통합 인터페이스 골격: _read_slot_headers dispatch + _is_stale_header
    • serial + POSIX threadpool + 벤치마크. (io_uring 분기는 serial fallback 또는 빈 자리) → ✅ 구현 완료 (origin/priv/dg/raw-block-multithreads-recovery, top c85cb720). io_uring 자리는 _read_slot_headers_batched TODO placeholder로 비워둠.
  2. NY PR — DG 위에 rebase, io_uring 분기를 _read_slot_headers_batched로 채움. → ✅ 구현 완료 (2026-06-19, perf/iouring-recovery-batched-read 커밋 87638cb). placeholder 본문을 dc3f5d36 구현으로 교체 + batched/dispatch 테스트 6개. 부수: base의 test_..._iouring_recovery_does_not_use_posix_threads 기존 실패 (use_uring_cmd 미설정) 동반 수정. batch I/O 에러 격리 fallback(TDD): wait_iouring이 슬롯 1개 에러로 batch 전체 Err를 던져 유효 엔트리까지 전부 drop되던 회귀를, except에서 per-slot _read_slot_header fallback으로 교체해 dev/POSIX 격리 동작 복원. 벤치 io_engine 파라미터화도 포함(--io-engine posix io_uring, label-keyed variant + baseline 대비 speedup) — io_uring batched가 같은 fixture로 측정됨(tmpfs 768·20000 entries 실측). 검증 raw_block_core 13 passed + pre-commit 통과.

→ ①~④ 구조 소유 = DG, ⑤ = NY. 분담이 코드 경계와 일치, 충돌 없음. → 남은 것: DG 골격 PR upstream 제출 → 이 브랜치를 그 위에 올림 → dev rebase (base가 6e992ec4 기반, upstream/dev 7b0314fc보다 뒤) → 총괄 이슈 등록 + PR 링크.

5. 결정된 scope

  • POSIX threadpool = DG가 02874b7에 이미 구현 완료. 통합 인터페이스의 POSIX 구현(_read_slot_headers_posix_parallel)으로 그대로 살림. 재검토 대상 아님.
  • io_uring batched_read = NY가 dc3f5d36에 구현 완료. io_uring 구현으로 살림.
  • background/lazy 검증(6/5 3번)은 안 하기로 결정 — scope 제외.

6. 합의 한 줄 (대규님께)

"검증 로직은 엔진 무관하게 똑같으니, 분기를 _validate_loaded_entries 전체가 아니라 _read_slot_headers 읽기 한 곳으로 좁히고, 대규님 threadpool/range는 그 인터페이스의 POSIX 구현으로, 제 batched_read는 io_uring 구현으로 넣는 게 어떨까요?"