Recovery Header Validation — 통합 인터페이스 합의안 (NY ↔ DG)
대규님과 합의용. 두 브랜치가 같은 함수(
_validate_loaded_entries)를 건드려 충돌하므로, "읽기 단계만 엔진별 분기, 판정·적용은 단일" 구조로 합치는 제안. 상세 분석 근거: raw_block_recovery_validation_parallel.md
1. 문제의식 — "엔진별로 검증 함수를 통째 가르는 건 과하다"
검증 로직(identity / payload_len 비교 → drop)은 엔진과 무관하게 완전히 동일하다. 엔진마다 다른 건 오직 "header N개를 어떻게 읽느냐"뿐:
- POSIX: 동기 blocking
pread밖에 없음 → 병렬성은 스레드 여러 개(threadpool)로만. - io_uring: 비동기 SQ → 단일 스레드에서 N개 batch 제출 + 한 번 wait로 device queue depth 활용. 여기에 threadpool까지 씌우면 안티패턴.
→ "읽는 방법이 엔진마다 다른 것"은 정상. 하지만 현재 DG 구조처럼
_validate_loaded_entries 를 엔진별 메서드 3개로 가르면 검증 흐름이 복제될 위험.
분기를 "읽기" 한 곳으로 좁히고, 검증 루프는 단 하나로.
2. 목표 구조 — 3개 층으로 경계 분리
읽기(엔진별 분기) → 판정(엔진 무관 단일) → 적용(drop, 단일)
_read_slot_headers _is_stale_header _validate_loaded_entries 본문
① 검증 루프는 하나 (NY 철학)
def _validate_loaded_entries(self) -> None:
"""Drop recovered entries whose slot headers do not match metadata."""
with self._lock:
items = list(self._index.items())
if not items:
return
offsets = [int(entry.offset) for _, entry in items]
headers = self._read_slot_headers(offsets) # ← 유일한 엔진 분기점
to_drop = [
key
for (key, entry), hdr in zip(items, headers, strict=True)
if self._is_stale_header(key, entry, hdr) # ← 판정은 엔진 무관
]
# ... drop 적용 (기존과 동일)
② 읽기 dispatch — 여기 한 곳에서만 엔진 분기
def _read_slot_headers(
self, offsets: list[int]
) -> list[Optional[tuple[int, int]]]:
n = len(offsets)
if self.io_engine == "io_uring" and not self.use_uring_cmd and n > 1:
return self._read_slot_headers_batched(offsets) # NY: io_uring
if self.io_engine == "posix" and self._recovery_read_threads > 1 and n > 1:
return self._read_slot_headers_posix_parallel(offsets) # DG: threadpool
return [self._read_slot_header(off) for off in offsets] # serial fallback
③ 판정 — DG _validate_loaded_entry를 "header 받는" 형태로 변형
DG 원본은 내부에서 직접 _read_slot_header를 호출. 읽기가 분리됐으니 header를
인자로 받아 판정만:
def _is_stale_header(
self, encoded_key: str, entry: _Entry, slot_hdr: Optional[tuple[int, int]]
) -> bool:
if slot_hdr is None:
return True
try:
expected = slot_identity_from_encoded_key(encoded_key, self.key_namespace)
except Exception:
return True
identity, payload_len = slot_hdr
return int(identity) != int(expected) or int(payload_len) != int(entry.size)
④ POSIX 병렬 읽기 — DG range 분할/threadpool 그대로, "검증" 대신 "읽기"만
def _read_slot_headers_posix_parallel(
self, offsets: list[int]
) -> list[Optional[tuple[int, int]]]:
n = len(offsets)
max_workers = min(self._recovery_read_threads, n)
ranges = self._build_recovery_item_ranges(n, max_workers) # DG 그대로 재사용
work = [(offsets, s, e) for s, e in ranges]
with ThreadPoolExecutor(max_workers=max_workers,
thread_name_prefix="rawblk-recover") as pool:
return [hdr for chunk in pool.map(self._read_slot_header_range, work)
for hdr in chunk] # pool.map 순서 보존 → flatten
def _read_slot_header_range(self, work_item) -> list[Optional[tuple[int, int]]]:
offsets, start, end = work_item
return [self._read_slot_header(offsets[i]) for i in range(start, end)]
⑤ io_uring batched — NY _read_slot_headers_batched 그대로 (수정 불필요)
대규님이 io_uring 자리에 남긴 TODO(_validate_loaded_entries_iouring_serial)가
자연스럽게 ⑤로 채워짐.
3. 두 코드에서 살리는 것 / 버리는 것
| 살림 | 버림 | |
|---|---|---|
| DG (02874b7) | DEFAULT_RECOVERY_READ_THREADS/_recovery_read_threads, _build_recovery_item_ranges, threadpool 패턴, _validate_loaded_entry(→_is_stale_header로 read 분리), 벤치(fdaec7f) | _validate_loaded_entries_{serial,posix_parallel,iouring_serial} 3분기, _validate_loaded_entry_range_posix(읽기 range로 대체) |
| NY (dc3f5d36) | _read_slot_headers_batched, 단일 검증 루프 철학 | 진입부 inline 분기(→_read_slot_headers로 이동) |
→ 검증 흐름이 한 군데, 엔진은 "읽기 전략"만 갈아끼움.
4. PR / 머지 순서 (각자 크레딧 유지)
- DG PR — 통합 인터페이스 골격:
_read_slot_headersdispatch +_is_stale_header- serial + POSIX threadpool + 벤치마크. (io_uring 분기는 serial fallback 또는 빈 자리)
→ ✅ 구현 완료 (
origin/priv/dg/raw-block-multithreads-recovery, topc85cb720). io_uring 자리는_read_slot_headers_batchedTODO placeholder로 비워둠.
- serial + POSIX threadpool + 벤치마크. (io_uring 분기는 serial fallback 또는 빈 자리)
→ ✅ 구현 완료 (
- NY PR — DG 위에 rebase, io_uring 분기를
_read_slot_headers_batched로 채움. → ✅ 구현 완료 (2026-06-19,perf/iouring-recovery-batched-read커밋87638cb). placeholder 본문을dc3f5d36구현으로 교체 + batched/dispatch 테스트 6개. 부수: base의test_..._iouring_recovery_does_not_use_posix_threads기존 실패 (use_uring_cmd미설정) 동반 수정. batch I/O 에러 격리 fallback(TDD):wait_iouring이 슬롯 1개 에러로 batch 전체Err를 던져 유효 엔트리까지 전부 drop되던 회귀를,except에서 per-slot_read_slot_headerfallback으로 교체해 dev/POSIX 격리 동작 복원. 벤치io_engine파라미터화도 포함(--io-engine posix io_uring, label-keyed variant + baseline 대비 speedup) — io_uring batched가 같은 fixture로 측정됨(tmpfs 768·20000 entries 실측). 검증 raw_block_core 13 passed + pre-commit 통과.
→ ①~④ 구조 소유 = DG, ⑤ = NY. 분담이 코드 경계와 일치, 충돌 없음.
→ 남은 것: DG 골격 PR upstream 제출 → 이 브랜치를 그 위에 올림 → dev rebase
(base가 6e992ec4 기반, upstream/dev 7b0314fc보다 뒤) → 총괄 이슈 등록 + PR 링크.
5. 결정된 scope
- POSIX threadpool = DG가 02874b7에 이미 구현 완료. 통합 인터페이스의
POSIX 구현(
_read_slot_headers_posix_parallel)으로 그대로 살림. 재검토 대상 아님. - io_uring batched_read = NY가 dc3f5d36에 구현 완료. io_uring 구현으로 살림.
- background/lazy 검증(6/5 3번)은 안 하기로 결정 — scope 제외.
6. 합의 한 줄 (대규님께)
"검증 로직은 엔진 무관하게 똑같으니, 분기를
_validate_loaded_entries전체가 아니라_read_slot_headers읽기 한 곳으로 좁히고, 대규님 threadpool/range는 그 인터페이스의 POSIX 구현으로, 제 batched_read는 io_uring 구현으로 넣는 게 어떨까요?"