본문으로 건너뛰기

L2 — batched_submit_put_task 1-key 분해 → 진짜 배치

한 줄 요약: legacy RustRawBlockBackend.batched_submit_put_task 가 N개 key를 N개 coroutine으로 분해하고 각 coroutine이 core.put_many([1]) 을 호출하는 구조를, 사전 dedup → 단일 put_many(specs, objs) → 결과 비트맵에 따라 콜백/cleanup 으로 재구성하여 배치 이점을 회복한다.


1. 변경 배경

1.1 현재 구조 — N개 coroutine으로 분해

rust_raw_block_backend.py:347-407

def batched_submit_put_task(self, keys, objs, ...):
futures = []
for key, obj in zip(keys, objs, strict=False):
with self._put_lock: # per-key lock #1
if key in self._put_tasks:
continue
self._put_tasks.add(key)

spec = encode_legacy_key(key)
exists = self._core.contains_key(spec.encoded, lock=False) \
or self._core.exists_inflight(spec.encoded) # per-key core lock × 2
if exists:
with self._put_lock: # per-key lock #2
self._put_tasks.discard(key)
continue

obj.ref_count_up()
fut = asyncio.run_coroutine_threadsafe(
self._submit_put_one(key, spec, obj, on_complete_callback),
loop,
) # per-key event-loop hop
futures.append(fut)
return futures or None

async def _submit_put_one(self, key, spec, memory_obj, on_complete_callback):
try:
put_result = await asyncio.to_thread(
self._core.put_many, [spec], [memory_obj], # ← N=1 호출!
)
if not put_result.results or not put_result.results[0]:
raise RuntimeError(...)
if on_complete_callback is not None:
try: on_complete_callback(key)
except Exception as e: logger.warning(...)
finally:
memory_obj.ref_count_down()
with self._put_lock: # per-key lock #3
self._put_tasks.discard(key)

1.2 비용 구조

key 1개 처리 시:

단계비용
_put_lock 획득2~3회 (add → fail-cleanup or finally-discard)
_core.contains_key + exists_inflightcore _lock 2회
asyncio.run_coroutine_threadsafe이벤트루프 큐 enqueue 1회
asyncio.to_threadthread pool dispatch 1회
core.put_many([1], [1])core 내부 4회 lock (L1 미적용 시)
_put_lock finally discard1회

N=100 배치:

  • _put_lock: 200300회 (현재) → 24회 (제안)
  • 이벤트루프 hop: 100회 (현재) → 1회 (제안)
  • thread pool dispatch: 100회 (현재) → 1회 (제안)
  • core put_many 호출: 100회 × N=1 (현재) → 1회 × N=100 (제안)

1.3 왜 이렇게 짜였는가 (역사적 추정)

  • legacy backend가 본래 key 단위 submit_put_task 만 갖고 있었고, batched_submit_put_task 는 이름만 배치인 어댑터로 추가된 것으로 보인다.
  • core.put_many 가 진짜 배치 시맨틱을 갖게 된 시점에 호출부 쪽이 함께 업그레이드되지 못함.
  • 결과적으로 인터페이스(public)는 배치인데 구현은 1-key fan-out.

1.4 L1과의 직교성

L1은 core.py 내부 (put_many + _write_one) 의 lock 흡수 작업이고, L2는 rust_raw_block_backend.py 의 호출부 재구성이다. 수정 파일이 다르고, L2가 호출하는 core.put_many 의 시그니처는 변하지 않으므로 충돌 없음. 두 변경의 효과는 곱셈으로 합쳐진다 (L2가 호출 횟수를 줄이고, L1이 호출 1회의 lock 횟수를 줄임).


2. 검토 내용

2.1 dedup 시맨틱 보존

현재는 다음 3종 dedup이 key별로 직렬 수행된다:

  1. _put_tasks set 멤버십 (in-flight put)
  2. core.contains_key (이미 인덱스에 있음)
  3. core.exists_inflight (다른 경로의 inflight)

배치형으로 묶을 때 사전 필터링 단계 에서 동일 검사를 일괄 수행한다. 필터링 결과로 살아남은 (key, spec, obj) 쌍들만 put_many 에 전달한다.

검사현재제안
_put_tasks 멤버십per-key 직렬_put_lock 한 번 잡고 batch add (이미 있는 것 skip)
core.contains_keyper-keycore.exists_many(encoded_keys, lock=False) 1회
core.exists_inflightper-keycore 측에 batch API 추가 필요 (보류 — 아래 2.5 참조)

2.2 ref_count 정합성

현재 보장사항:

  • 살아남은 obj 만 ref_count_up()
  • coroutine _submit_put_one finally 에서 반드시 ref_count_down()
  • coroutine 진입 실패 시(loop is None) 즉시 ref_count_down() 후 raise

배치형에서 유지해야 할 invariant:

"모든 ref_count_up() 된 obj 는, put_many 의 결과 (성공/실패) 와 무관하게 정확히 한 번 ref_count_down() 된다."

→ 배치 coroutine 의 finally 에서 살아남은 obj 리스트 전체에 대해 일괄 ref_count_down(). put_many 가 도중에 예외를 던져도 finally 가 보장.

2.3 on_complete_callback 시점

현재: 각 key 별 coroutine 이 put_many([1]) 성공 직후 그 key 의 콜백 호출. 부분 성공이라는 개념 없음 (한 key 단위가 batch 전체).

제안: put_many(specs, objs).results 는 list[bool] — 정확히 어느 key 가 성공했는지 비트맵으로 반환된다. 비트맵을 순회하며:

  • results[i] == Trueon_complete_callback(keys[i]) 호출
  • results[i] == False → 콜백 호출 안 함 (현재와 동일 시맨틱)

콜백 예외는 현재처럼 catch + warning log.

시맨틱 동등성: put_many 는 key 별 독립적으로 인덱스에 commit 하므로, "key 가 인덱스에 들어간 후에만 콜백" 이라는 현재 invariant 가 유지된다.

2.4 Future 반환 시그니처

현재: list[Future] | None. 호출자가 future 별로 wait/cancel 가능.

제안 옵션 비교:

옵션반환호출자 호환성비고
A배치 1개 Future 를 N개로 복제하여 list 반환✅ 호출자 변경 불필요시맨틱: "한 future 이 done 이면 N개 모두 done"
B배치 결과를 N개 per-key Future 로 fan-out✅ 시맨틱 동일각 future 가 results[i] 를 결과로 가짐
Clist 길이 1 짜리 future 반환❌ 호출자 깨짐비추

호출자 검토 (cache_engine.py:1235, storage_manager.py:428):

  • storage_manager 는 반환 future 를 사용하지 않음 (fire-and-forget)
  • cache_engine.move_kv 는 async_batched_submit_put_task 만 사용 — 별도 경로
  • 테스트 (test_rust_raw_block_backend.py:1036) 는 futs[0].result(timeout=10) 패턴이 다수 → list 의 첫 요소 만 수집 하는 패턴이라 옵션 A 가 정확히 호환

옵션 A 채택 (가장 작은 표면적). 옵션 B 는 후속 PR 로 분리 가능.

2.5 exists_inflight 배치 API

현재 core.exists_inflight(encoded_key) -> bool 만 존재.

선택지:

  • (a) batch API core.exists_inflight_many(keys) -> list[bool] 신설
  • (b) L2 단계에서는 per-key 호출 유지 (lock N회) — 단, contains_key 는 exists_many 로 batch 화
  • (c) 사전 필터링 단계에서 _inflight 검사를 생략 (race window 영향 검토 필요)

판단: (b) 채택. 이유:

  • exists_inflight 는 Python 측 dict membership 검사 1회 → 매우 저렴
  • core API 표면 확장은 별도 PR 로 분리하는 게 리뷰 단위 측면에서 깔끔
  • 현재 측정 가능한 병목은 contains_key (Rust call + index lock) 와 이벤트루프/thread pool overhead 임

후속으로 (a) 가 필요해지면 별도 항목으로 트래킹.

2.6 _put_lock contention

현재: per-key 진입/이탈마다 lock — close() 의 polling 과 exists_in_put_tasks 가 매 key 마다 짧게 봉쇄됨.

제안: 사전 필터링 단계에서 _put_lock 한 번 잡고 batch add. coroutine finally 에서 한 번 잡고 batch discard. → lock 횟수 200N → 2~4 (배치당).

lock holding time 은 약간 늘어나지만 (N개 set add) , Python set 의 add 는 ~50ns 수준이라 N=100 에서도 수 μs 미만. close() polling 의 10ms 주기에 비해 무시 가능.

2.7 부분 실패 처리

put_many 결과 비트맵에서 일부 False 인 경우:

  • 현재: 각 key 별 coroutine 이 RuntimeError 를 raise → future 가 exception 으로 done. 호출자가 fut.result() 에서 예외 받음.
  • 제안: 비트맵을 보고 실패 key 들은:
    • log.error 로 보고 (key 식별자 포함)
    • 콜백 호출 안 함
    • future 자체는 정상 완료 (성공한 key 들은 정상 처리됨)

시맨틱이 살짝 다름. 현재는 "특정 key 가 실패하면 그 key 의 future 가 exception" / 제안은 "future 는 항상 정상, 실패 key 는 log 로만 보고".

호출자가 future 에서 exception 을 잡는 패턴이 있는지 확인 필요:

  • storage_manager.py: future 사용 안 함
  • 테스트: futs[0].result(timeout=10)timeout 만 잡고 exception 은 propagate

→ 테스트 호환을 위해서는 "전체 N 개 모두 실패 시에만 future 에 RuntimeError" 설정하는 절충안이 가능. 그러나 partial failure 가 현실에서 거의 발생하지 않는 (slot 부족, I/O 실패 등은 N개 동시 발생) 점을 감안하면 단순화 안전.

결정: 비트맵 전부 False 일 때만 future 에 exception 설정. 그 외에는 log + callback skip 로 graceful 처리. (현재 시맨틱과 가장 가까운 절충안)

2.8 검토에서 배제한 항목

  • core 측 batch API 신설 (exists_inflight_many 등): 별도 PR
  • legacy _submit_put_one 완전 삭제: 이번 PR 에서 제거 (호출부 없어짐)
  • option B (per-key future fan-out): 호출자가 per-key 완료를 진짜로 필요로 하는 시점에 별도 PR
  • pin/contains 경로 batching (T2 항목): L2 와 별도

3. 효과

3.1 정량 효과 (예상)

항목N=10N=100단위
_put_lock 획득30 → 2300 → 2
core._lock (dedup 단계)20 → 1200 → 1
이벤트루프 hop10 → 1100 → 1
asyncio.to_thread dispatch10 → 1100 → 1
core.put_many 호출10 (각 N=1)100 (각 N=1)호출
위 호출 내부 lock (L1 미적용)40400
위 호출 내부 lock (L1 적용)20200
배치형 + L1 put 내부 lock22

→ N=100 기준: dispatch overhead ~99% 감소, lock 총횟수 ~99% 감소 (L1 결합 시).

3.2 정성 효과

  • legacy 와 MP 경로 (이미 batched) 가 같은 core.put_many 시맨틱을 직접 사용 → 코드 일관성
  • 인터페이스 이름 (batched_submit_put_task) 과 구현 시맨틱 일치
  • io_uring SQ 활용률 잠재적 개선 (현재 동시 SQE = num_store_workers ≈ 2~4 → 배치 1회로 N개 SQE 가능. 단, 실효 활용은 Rust 측 pwrite_batch 미존재로 현재는 직렬 — L3 별도 항목)

3.3 비효과

  • core.put_many 시그니처/시맨틱 — 미변경
  • on-disk format — 미변경
  • public interface (batched_submit_put_task 시그니처) — 미변경
  • T1 (delete() TOCTOU 수정) — 영향 없음 (다른 메서드)
  • L1 (put_many 내부 lock) — 호출 횟수만 줄임. L1 효과는 그대로 유효.

4. 변경점 (계획)

4.1 batched_submit_put_task 재구성

def batched_submit_put_task(self, keys, objs, transfer_spec=None,
on_complete_callback=None):
del transfer_spec
if not keys:
return None
loop = self.loop
if loop is None:
raise RuntimeError("RustRawBlockBackend requires an asyncio event loop")

# --- 사전 필터링: dedup + spec encode (single _put_lock window) ---
accepted_keys: list[CacheEngineKey] = []
accepted_specs: list[RawBlockKeySpec] = []
accepted_objs: list[MemoryObj] = []

specs = [encode_legacy_key(key) for key in keys]
encoded_keys = [spec.encoded for spec in specs]
indexed_bitmap = self._core.exists_many(encoded_keys, lock=False)

with self._put_lock:
for i, key in enumerate(keys):
if key in self._put_tasks:
continue
if indexed_bitmap[i]:
continue
if self._core.exists_inflight(encoded_keys[i]):
continue
self._put_tasks.add(key)
accepted_keys.append(key)
accepted_specs.append(specs[i])
accepted_objs.append(objs[i])

if not accepted_keys:
return None

for obj in accepted_objs:
obj.ref_count_up()

fut = asyncio.run_coroutine_threadsafe(
self._submit_put_batch(
accepted_keys, accepted_specs, accepted_objs, on_complete_callback,
),
loop,
)
# 옵션 A: 호출자가 list[Future] 를 기대하므로 동일 future 를 N개 위치에 복제
return [fut] * len(accepted_keys)

4.2 _submit_put_batch (신규, _submit_put_one 대체)

async def _submit_put_batch(self, keys, specs, objs, on_complete_callback):
try:
put_result = await asyncio.to_thread(
self._core.put_many, specs, objs,
)
results = put_result.results
if not any(results):
raise RuntimeError(
f"Failed to persist {len(keys)} raw-block keys "
f"(first encoded: {specs[0].encoded})"
)
if on_complete_callback is not None:
for key, ok in zip(keys, results, strict=True):
if not ok:
continue
try:
on_complete_callback(key)
except Exception as e:
logger.warning(
"on_complete_callback failed for key %s: %s", key, e,
)
# 부분 실패 로깅
for key, ok in zip(keys, results, strict=True):
if not ok:
logger.warning(
"RustRawBlockBackend: put failed for key %s", key,
)
finally:
for obj in objs:
obj.ref_count_down()
with self._put_lock:
for key in keys:
self._put_tasks.discard(key)

4.3 _submit_put_one 제거

호출자 없음 → 삭제. 관련 docstring/주석 정리.

4.4 docstring/comment

batched_submit_put_task 에 다음 invariant 명시:

  • 반환 list 의 모든 항목은 동일 Future 객체를 가리킴 (옵션 A)
  • 부분 실패 시 future 는 정상 완료, 실패 key 는 log 로만 보고
  • 전부 실패 시 future 가 RuntimeError 로 done

4.5 테스트

  • 기존 test_rust_raw_block_backend.py 회귀 통과 (가장 중요)
  • 신규:
    1. test_batched_put_dedup_inflight_and_indexed: keys 일부가 이미 인덱스에 있고 일부는 inflight 일 때, 사전 필터링이 정확히 잔여 key 만 submit 하는지
    2. test_batched_put_partial_failure_does_not_break_others: core.put_many 를 mock 하여 일부 False 반환 시, 성공 key 의 callback 만 호출되는지
    3. test_batched_put_returns_same_future_per_key: 옵션 A 시맨틱 검증
    4. test_batched_put_ref_count_balanced_on_exception: put_many 가 raise 해도 모든 obj 의 ref_count 가 net 0 이 되는지

5. 개선 포인트 확인 방법 (수치 증명)

5.1 dispatch overhead 측정

# 변경 전/후 각각:
# - 100개 obj 준비
# - perf_counter 로 batched_submit_put_task 호출~모든 future done 까지 elapsed
# - run_coroutine_threadsafe 호출 횟수 (mock 으로 카운트)

기대치: dispatch elapsed N=100 기준 변경 후 1/10~1/50 (event loop 큐 동기화 overhead 가 main 비용).

5.2 lock contention 측정

# section 4.1 신설 sub: CountingLock 으로 _put_lock 을 swap
# batched_submit_put_task(keys=[100], objs=[100]) 1회
# assert acquire_count <= 4 (변경 후) / >= 200 (변경 전)

5.3 throughput 측정 (실제 raw device 또는 io_uring tmp file)

# - _has_ext() 가능한 환경에서만
# - N=1, 10, 100, 1000 배치 크기별 put 완료까지 wall clock
# - 변경 전/후 비교

기대치: N 이 클수록 변경 후 throughput 우위 (~N배 까지)

5.4 정합성

pytest tests/v1/storage_backend/test_rust_raw_block_backend.py -v
pytest tests/v1/storage_backend/test_storage_plugin.py -v

6. 위험 / 롤백

6.1 위험 항목

위험가능성영향완화
옵션 A 시맨틱 변화로 호출자 깨짐낮음일부 테스트 실패호출자 grep 결과 storage_manager/cache_engine 에서 future 사용 안 함 확인 완료
부분 실패 → future 정상 완료로 인한 silent loss중간일부 key 영구 미저장log.warning 명시 + 추후 metric 추가 검토
사전 필터링 단계의 exists_manyexists_inflight 간 race window낮음중복 put 시도core.put_many 가 이미 idempotent (_index / _inflight 재검사)
_submit_put_batch 의 finally 누수낮음ref_count 영구 +15.4 + ref_count 테스트 (5.x 항목)
L1 미적용 환경에서 batch 1회 lock holding 길어짐낮음exists_many 등 대기 ↑L1 동시 진행 권장. 미적용 시에도 200 → 2N 으로 횟수 줄어듦

6.2 롤백

변경이 단일 파일/3개 메서드에 국한. PR 단위 revert 충분. on-disk format/공개 인터페이스 시그니처 미변경 → 운영 중 롤백 시 추가 마이그레이션 불필요.

6.3 머지 순서 의존성

  • T1 (9fc5a901) 가 베이스. T1 머지 후 rebase 시 자동 정합 (T1 은 remove() 1줄, L2 는 batched_submit_put_task — 다른 메서드)
  • L1 과는 파일이 다름 → 무관
  • 권장 머지 순서: T1 → L1 → L2 (또는 T1 → L2 → L1, 순서 무관)

7. 변경 로그

일자작성자내용
2026-05-28ny초안 작성. 옵션 A 채택. 부분 실패는 graceful + 전부 실패 시 raise 절충안 결정. 구현 미착수.
2026-05-28ny구현 완료 (rust_raw_block_backend.py). _submit_put_one_submit_put_batch 로 교체. 기존 test_rust_raw_block_backend.py:1044 의 match="Failed to persist raw-block key"match="Failed to persist raw-block" 로 단축 (새 메시지 Failed to persist raw-block batch of N keys와 호환).
2026-05-28ny신규 테스트 4개 추가. 첫 실행에서 2개 실패 → 모두 테스트 코드 버그. 8.1 / 8.2 참조. production 코드 변경 사항은 그대로 유지.
2026-05-28ny테스트 helper config 수정 + patch lifecycle 수정 후 모든 테스트 통과. obj.ref_count_down() 명시 호출 추가 (기존 테스트 패턴과 일치). 최종: 7 passed (기존 3 + 신규 4), 26 skipped (Rust 확장 의존). ruff check / ruff format 통과.
2026-06-01ny코드 리뷰 (cr-93403c0f) 의 Tier 1 항목 본 PR 안에서 처리. (1) F2 — exists_many(lock=False) 호출을 _put_lock 안으로 이동시켜 dedup TOCTOU 윈도우 제거. (2) F1+F3 — ref_count_up 루프 + run_coroutine_threadsafe dispatch 를 try/except 로 감싸 부분 진행 보상 (성공한 ref_count 만 ref_count_down + _put_tasks 일괄 discard + 미-dispatch 코루틴 coro.close()). 신규 테스트 2개 (dispatch failure / ref_count_up failure 보상). 최종: 9 passed (기존 7 + 신규 2), 26 skipped. ruff check / ruff format 통과.

8. 구현 중 발견된 이슈

8.1 helper config 실수 — slot_bytes 너무 작음

증상: 첫 신규 테스트 실행 시

RawBlockCore write failed for ... :
RawBlockCore payload 65536 exceeds slot capacity 4096

원인: _make_legacy_backend helper 에서 rust_raw_block.slot_bytes=8192, header_bytes=4096 로 두면 payload capacity = slot - header = 4096 bytes. 하지만 테스트가 사용하는 KV chunk 는 [2,16,8,128] × bfloat16 = 64KB → 초과.

해결: slot_bytes 명시를 제거하여 local_cpu_backend.get_full_chunk_size_bytes() 로 자동 계산되도록 두었다 (rust_raw_block_backend._build_core_config 의 default 동작). 기존 production 테스트들이 모두 이 default 패턴을 사용 → helper 도 동일 패턴으로 통일.

의미: production 코드 변경 사항과 무관. 테스트 helper 의 config 실수.

8.2 테스트의 patch.object 범위 오류

증상:

recorded_specs == [] # 호출 시점에 patch unwound

mock 된 put_many 가 호출되지 않음에도 backend.contains(fresh_key) is True 는 통과 → 진짜 put_many 가 patch 가 풀린 뒤에 실행되고 있음.

원인: batched_submit_put_taskasyncio.run_coroutine_threadsafe 로 coroutine 을 큐잉하고 즉시 반환. 실제 asyncio.to_thread(self._core.put_many, ...) 는 별도 thread 에서 나중에 호출. 테스트가 with patch.object(...) 블록 밖에서 futs[0].result(timeout=10) 을 기다리도록 작성되어, patch 가 이미 unwind 된 후 worker thread 가 실제 호출을 수행했다.

해결: with patch.object(...) 블록 안에 result(timeout=10) 까지 포함. 다른 신규 테스트 (partial_failure) 도 같은 구조였으므로 함께 수정.

의미: production 코드 시맨틱과 무관. 테스트의 patch 수명 관리 오류. 이 이슈가 발생한 이유는 새 구조가 "단일 future가 batch 전체 완료를 표현" 하기 때문 — 기존의 per-key future 대비 await 위치가 한 단계 지연된다. 향후 mock 기반 테스트 작성 시 동일 패턴 주의 필요.