L2 — batched_submit_put_task 1-key 분해 → 진짜 배치
한 줄 요약: legacy
RustRawBlockBackend.batched_submit_put_task가 N개 key를 N개 coroutine으로 분해하고 각 coroutine이core.put_many([1])을 호출하는 구조를, 사전 dedup → 단일put_many(specs, objs)→ 결과 비트맵에 따라 콜백/cleanup 으로 재구성하여 배치 이점을 회복한다.
1. 변경 배경
1.1 현재 구조 — N개 coroutine으로 분해
rust_raw_block_backend.py:347-407
def batched_submit_put_task(self, keys, objs, ...):
futures = []
for key, obj in zip(keys, objs, strict=False):
with self._put_lock: # per-key lock #1
if key in self._put_tasks:
continue
self._put_tasks.add(key)
spec = encode_legacy_key(key)
exists = self._core.contains_key(spec.encoded, lock=False) \
or self._core.exists_inflight(spec.encoded) # per-key core lock × 2
if exists:
with self._put_lock: # per-key lock #2
self._put_tasks.discard(key)
continue
obj.ref_count_up()
fut = asyncio.run_coroutine_threadsafe(
self._submit_put_one(key, spec, obj, on_complete_callback),
loop,
) # per-key event-loop hop
futures.append(fut)
return futures or None
async def _submit_put_one(self, key, spec, memory_obj, on_complete_callback):
try:
put_result = await asyncio.to_thread(
self._core.put_many, [spec], [memory_obj], # ← N=1 호출!
)
if not put_result.results or not put_result.results[0]:
raise RuntimeError(...)
if on_complete_callback is not None:
try: on_complete_callback(key)
except Exception as e: logger.warning(...)
finally:
memory_obj.ref_count_down()
with self._put_lock: # per-key lock #3
self._put_tasks.discard(key)
1.2 비용 구조
key 1개 처리 시:
| 단계 | 비용 |
|---|---|
_put_lock 획득 | 2~3회 (add → fail-cleanup or finally-discard) |
_core.contains_key + exists_inflight | core _lock 2회 |
asyncio.run_coroutine_threadsafe | 이벤트루프 큐 enqueue 1회 |
asyncio.to_thread | thread pool dispatch 1회 |
core.put_many([1], [1]) | core 내부 4회 lock (L1 미적용 시) |
_put_lock finally discard | 1회 |
N=100 배치:
_put_lock: 200300회 (현재) → 24회 (제안)- 이벤트루프 hop: 100회 (현재) → 1회 (제안)
- thread pool dispatch: 100회 (현재) → 1회 (제안)
- core
put_many호출: 100회 × N=1 (현재) → 1회 × N=100 (제안)
1.3 왜 이렇게 짜였는가 (역사적 추정)
- legacy backend가 본래 key 단위
submit_put_task만 갖고 있었고,batched_submit_put_task는 이름만 배치인 어댑터로 추가된 것으로 보인다. core.put_many가 진짜 배치 시맨틱을 갖게 된 시점에 호출부 쪽이 함께 업그레이드되지 못함.- 결과적으로 인터페이스(public)는 배치인데 구현은 1-key fan-out.
1.4 L1과의 직교성
L1은 core.py 내부 (put_many + _write_one) 의 lock 흡수 작업이고,
L2는 rust_raw_block_backend.py 의 호출부 재구성이다. 수정 파일이 다르고,
L2가 호출하는 core.put_many 의 시그니처는 변하지 않으므로 충돌 없음.
두 변경의 효과는 곱셈으로 합쳐진다 (L2가 호출 횟수를 줄이고, L1이 호출 1회의
lock 횟수를 줄임).
2. 검토 내용
2.1 dedup 시맨틱 보존
현재는 다음 3종 dedup이 key별로 직렬 수행된다:
_put_tasksset 멤버십 (in-flight put)core.contains_key(이미 인덱스에 있음)core.exists_inflight(다른 경로의 inflight)
배치형으로 묶을 때 사전 필터링 단계 에서 동일 검사를 일괄 수행한다.
필터링 결과로 살아남은 (key, spec, obj) 쌍들만 put_many 에 전달한다.
| 검사 | 현재 | 제안 |
|---|---|---|
_put_tasks 멤버십 | per-key 직렬 | _put_lock 한 번 잡고 batch add (이미 있는 것 skip) |
core.contains_key | per-key | core.exists_many(encoded_keys, lock=False) 1회 |
core.exists_inflight | per-key | core 측에 batch API 추가 필요 (보류 — 아래 2.5 참조) |
2.2 ref_count 정합성
현재 보장사항:
- 살아남은 obj 만
ref_count_up() - coroutine
_submit_put_onefinally 에서 반드시ref_count_down() - coroutine 진입 실패 시(
loop is None) 즉시ref_count_down()후 raise
배치형에서 유지해야 할 invariant:
"모든
ref_count_up()된 obj 는,put_many의 결과 (성공/실패) 와 무관하게 정확히 한 번ref_count_down()된다."
→ 배치 coroutine 의 finally 에서 살아남은 obj 리스트 전체에 대해 일괄
ref_count_down(). put_many 가 도중에 예외를 던져도 finally 가 보장.
2.3 on_complete_callback 시점
현재: 각 key 별 coroutine 이 put_many([1]) 성공 직후 그 key 의 콜백
호출. 부분 성공이라는 개념 없음 (한 key 단위가 batch 전체).
제안: put_many(specs, objs).results 는 list[bool] — 정확히 어느 key 가
성공했는지 비트맵으로 반환된다. 비트맵을 순회하며:
results[i] == True→on_complete_callback(keys[i])호출results[i] == False→ 콜백 호출 안 함 (현재와 동일 시맨틱)
콜백 예외는 현재처럼 catch + warning log.
시맨틱 동등성: put_many 는 key 별 독립적으로 인덱스에 commit 하므로,
"key 가 인덱스에 들어간 후에만 콜백" 이라는 현재 invariant 가 유지된다.
2.4 Future 반환 시그니처
현재: list[Future] | None. 호출자가 future 별로 wait/cancel 가능.
제안 옵션 비교:
| 옵션 | 반환 | 호출자 호환성 | 비고 |
|---|---|---|---|
| A | 배치 1개 Future 를 N개로 복제하여 list 반환 | ✅ 호출자 변경 불필요 | 시맨틱: "한 future 이 done 이면 N개 모두 done" |
| B | 배치 결과를 N개 per-key Future 로 fan-out | ✅ 시맨틱 동일 | 각 future 가 results[i] 를 결과로 가짐 |
| C | list 길이 1 짜리 future 반환 | ❌ 호출자 깨짐 | 비추 |
호출자 검토 (cache_engine.py:1235, storage_manager.py:428):
- storage_manager 는 반환 future 를 사용하지 않음 (fire-and-forget)
- cache_engine.move_kv 는
async_batched_submit_put_task만 사용 — 별도 경로 - 테스트 (test_rust_raw_block_backend.py:1036)
는
futs[0].result(timeout=10)패턴이 다수 → list 의 첫 요소 만 수집 하는 패턴이라 옵션 A 가 정확히 호환
→ 옵션 A 채택 (가장 작은 표면적). 옵션 B 는 후속 PR 로 분리 가능.
2.5 exists_inflight 배치 API
현재 core.exists_inflight(encoded_key) -> bool 만 존재.
선택지:
- (a) batch API
core.exists_inflight_many(keys) -> list[bool]신설 - (b) L2 단계에서는 per-key 호출 유지 (lock N회) — 단, contains_key 는
exists_many로 batch 화 - (c) 사전 필터링 단계에서
_inflight검사를 생략 (race window 영향 검토 필요)
판단: (b) 채택. 이유:
exists_inflight는 Python 측 dict membership 검사 1회 → 매우 저렴- core API 표면 확장은 별도 PR 로 분리하는 게 리뷰 단위 측면에서 깔끔
- 현재 측정 가능한 병목은
contains_key(Rust call + index lock) 와 이벤트루프/thread pool overhead 임
후속으로 (a) 가 필요해지면 별도 항목으로 트래킹.
2.6 _put_lock contention
현재: per-key 진입/이탈마다 lock — close() 의 polling 과 exists_in_put_tasks
가 매 key 마다 짧게 봉쇄됨.
제안: 사전 필터링 단계에서 _put_lock 한 번 잡고 batch add. coroutine
finally 에서 한 번 잡고 batch discard. → lock 횟수 200N → 2~4 (배치당).
lock holding time 은 약간 늘어나지만 (N개 set add) , Python set 의 add 는 ~50ns 수준이라 N=100 에서도 수 μs 미만. close() polling 의 10ms 주기에 비해 무시 가능.
2.7 부분 실패 처리
put_many 결과 비트맵에서 일부 False 인 경우:
- 현재: 각 key 별 coroutine 이 RuntimeError 를 raise → future 가 exception 으로
done. 호출자가
fut.result()에서 예외 받음. - 제안: 비트맵을 보고 실패 key 들은:
- log.error 로 보고 (key 식별자 포함)
- 콜백 호출 안 함
- future 자체는 정상 완료 (성공한 key 들은 정상 처리됨)
→ 시맨틱이 살짝 다름. 현재는 "특정 key 가 실패하면 그 key 의 future 가 exception" / 제안은 "future 는 항상 정상, 실패 key 는 log 로만 보고".
호출자가 future 에서 exception 을 잡는 패턴이 있는지 확인 필요:
- storage_manager.py: future 사용 안 함
- 테스트:
futs[0].result(timeout=10)→ timeout 만 잡고 exception 은 propagate
→ 테스트 호환을 위해서는 "전체 N 개 모두 실패 시에만 future 에 RuntimeError" 설정하는 절충안이 가능. 그러나 partial failure 가 현실에서 거의 발생하지 않는 (slot 부족, I/O 실패 등은 N개 동시 발생) 점을 감안하면 단순화 안전.
→ 결정: 비트맵 전부 False 일 때만 future 에 exception 설정. 그 외에는 log + callback skip 로 graceful 처리. (현재 시맨틱과 가장 가까운 절충안)
2.8 검토에서 배제한 항목
- core 측 batch API 신설 (exists_inflight_many 등): 별도 PR
- legacy
_submit_put_one완전 삭제: 이번 PR 에서 제거 (호출부 없어짐) - option B (per-key future fan-out): 호출자가 per-key 완료를 진짜로 필요로 하는 시점에 별도 PR
pin/contains경로 batching (T2 항목): L2 와 별도
3. 효과
3.1 정량 효과 (예상)
| 항목 | N=10 | N=100 | 단위 |
|---|---|---|---|
_put_lock 획득 | 30 → 2 | 300 → 2 | 회 |
core._lock (dedup 단계) | 20 → 1 | 200 → 1 | 회 |
| 이벤트루프 hop | 10 → 1 | 100 → 1 | 회 |
asyncio.to_thread dispatch | 10 → 1 | 100 → 1 | 회 |
core.put_many 호출 | 10 (각 N=1) | 100 (각 N=1) | 호출 |
| 위 호출 내부 lock (L1 미적용) | 40 | 400 | 회 |
| 위 호출 내부 lock (L1 적용) | 20 | 200 | 회 |
| 배치형 + L1 put 내부 lock | 2 | 2 | 회 |
→ N=100 기준: dispatch overhead ~99% 감소, lock 총횟수 ~99% 감소 (L1 결합 시).
3.2 정성 효과
- legacy 와 MP 경로 (이미 batched) 가 같은
core.put_many시맨틱을 직접 사용 → 코드 일관성 - 인터페이스 이름 (
batched_submit_put_task) 과 구현 시맨틱 일치 - io_uring SQ 활용률 잠재적 개선 (현재 동시 SQE = num_store_workers ≈ 2~4 →
배치 1회로 N개 SQE 가능. 단, 실효 활용은 Rust 측
pwrite_batch미존재로 현재는 직렬 — L3 별도 항목)
3.3 비효과
core.put_many시그니처/시맨틱 — 미변경- on-disk format — 미변경
- public interface (
batched_submit_put_task시그니처) — 미변경 - T1 (
delete()TOCTOU 수정) — 영향 없음 (다른 메서드) - L1 (
put_many내부 lock) — 호출 횟수만 줄임. L1 효과는 그대로 유효.
4. 변경점 (계획)
4.1 batched_submit_put_task 재구성
def batched_submit_put_task(self, keys, objs, transfer_spec=None,
on_complete_callback=None):
del transfer_spec
if not keys:
return None
loop = self.loop
if loop is None:
raise RuntimeError("RustRawBlockBackend requires an asyncio event loop")
# --- 사전 필터링: dedup + spec encode (single _put_lock window) ---
accepted_keys: list[CacheEngineKey] = []
accepted_specs: list[RawBlockKeySpec] = []
accepted_objs: list[MemoryObj] = []
specs = [encode_legacy_key(key) for key in keys]
encoded_keys = [spec.encoded for spec in specs]
indexed_bitmap = self._core.exists_many(encoded_keys, lock=False)
with self._put_lock:
for i, key in enumerate(keys):
if key in self._put_tasks:
continue
if indexed_bitmap[i]:
continue
if self._core.exists_inflight(encoded_keys[i]):
continue
self._put_tasks.add(key)
accepted_keys.append(key)
accepted_specs.append(specs[i])
accepted_objs.append(objs[i])
if not accepted_keys:
return None
for obj in accepted_objs:
obj.ref_count_up()
fut = asyncio.run_coroutine_threadsafe(
self._submit_put_batch(
accepted_keys, accepted_specs, accepted_objs, on_complete_callback,
),
loop,
)
# 옵션 A: 호출자가 list[Future] 를 기대하므로 동일 future 를 N개 위치에 복제
return [fut] * len(accepted_keys)
4.2 _submit_put_batch (신규, _submit_put_one 대체)
async def _submit_put_batch(self, keys, specs, objs, on_complete_callback):
try:
put_result = await asyncio.to_thread(
self._core.put_many, specs, objs,
)
results = put_result.results
if not any(results):
raise RuntimeError(
f"Failed to persist {len(keys)} raw-block keys "
f"(first encoded: {specs[0].encoded})"
)
if on_complete_callback is not None:
for key, ok in zip(keys, results, strict=True):
if not ok:
continue
try:
on_complete_callback(key)
except Exception as e:
logger.warning(
"on_complete_callback failed for key %s: %s", key, e,
)
# 부분 실패 로깅
for key, ok in zip(keys, results, strict=True):
if not ok:
logger.warning(
"RustRawBlockBackend: put failed for key %s", key,
)
finally:
for obj in objs:
obj.ref_count_down()
with self._put_lock:
for key in keys:
self._put_tasks.discard(key)
4.3 _submit_put_one 제거
호출자 없음 → 삭제. 관련 docstring/주석 정리.
4.4 docstring/comment
batched_submit_put_task 에 다음 invariant 명시:
- 반환 list 의 모든 항목은 동일 Future 객체를 가리킴 (옵션 A)
- 부분 실패 시 future 는 정상 완료, 실패 key 는 log 로만 보고
- 전부 실패 시 future 가 RuntimeError 로 done
4.5 테스트
- 기존 test_rust_raw_block_backend.py 회귀 통과 (가장 중요)
- 신규:
test_batched_put_dedup_inflight_and_indexed: keys 일부가 이미 인덱스에 있고 일부는 inflight 일 때, 사전 필터링이 정확히 잔여 key 만 submit 하는지test_batched_put_partial_failure_does_not_break_others:core.put_many를 mock 하여 일부 False 반환 시, 성공 key 의 callback 만 호출되는지test_batched_put_returns_same_future_per_key: 옵션 A 시맨틱 검증test_batched_put_ref_count_balanced_on_exception:put_many가 raise 해도 모든 obj 의 ref_count 가 net 0 이 되는지
5. 개선 포인트 확인 방법 (수치 증명)
5.1 dispatch overhead 측정
# 변경 전/후 각각:
# - 100개 obj 준비
# - perf_counter 로 batched_submit_put_task 호출~모든 future done 까지 elapsed
# - run_coroutine_threadsafe 호출 횟수 (mock 으로 카운트)
기대치: dispatch elapsed N=100 기준 변경 후 1/10~1/50 (event loop 큐 동기화 overhead 가 main 비용).
5.2 lock contention 측정
# section 4.1 신설 sub: CountingLock 으로 _put_lock 을 swap
# batched_submit_put_task(keys=[100], objs=[100]) 1회
# assert acquire_count <= 4 (변경 후) / >= 200 (변경 전)
5.3 throughput 측정 (실제 raw device 또는 io_uring tmp file)
# - _has_ext() 가능한 환경에서만
# - N=1, 10, 100, 1000 배치 크기별 put 완료까지 wall clock
# - 변경 전/후 비교
기대치: N 이 클수록 변경 후 throughput 우위 (~N배 까지)
5.4 정합성
pytest tests/v1/storage_backend/test_rust_raw_block_backend.py -v
pytest tests/v1/storage_backend/test_storage_plugin.py -v
6. 위험 / 롤백
6.1 위험 항목
| 위험 | 가능성 | 영향 | 완화 |
|---|---|---|---|
| 옵션 A 시맨틱 변화로 호출자 깨짐 | 낮음 | 일부 테스트 실패 | 호출자 grep 결과 storage_manager/cache_engine 에서 future 사용 안 함 확인 완료 |
| 부분 실패 → future 정상 완료로 인한 silent loss | 중간 | 일부 key 영구 미저장 | log.warning 명시 + 추후 metric 추가 검토 |
사전 필터링 단계의 exists_many 와 exists_inflight 간 race window | 낮음 | 중복 put 시도 | core.put_many 가 이미 idempotent (_index / _inflight 재검사) |
_submit_put_batch 의 finally 누수 | 낮음 | ref_count 영구 +1 | 5.4 + ref_count 테스트 (5.x 항목) |
| L1 미적용 환경에서 batch 1회 lock holding 길어짐 | 낮음 | exists_many 등 대기 ↑ | L1 동시 진행 권장. 미적용 시에도 200 → 2N 으로 횟수 줄어듦 |
6.2 롤백
변경이 단일 파일/3개 메서드에 국한. PR 단위 revert 충분. on-disk format/공개 인터페이스 시그니처 미변경 → 운영 중 롤백 시 추가 마이그레이션 불필요.
6.3 머지 순서 의존성
- T1 (
9fc5a901) 가 베이스. T1 머지 후 rebase 시 자동 정합 (T1 은remove()1줄, L2 는batched_submit_put_task— 다른 메서드) - L1 과는 파일이 다름 → 무관
- 권장 머지 순서: T1 → L1 → L2 (또는 T1 → L2 → L1, 순서 무관)
7. 변경 로그
| 일자 | 작성자 | 내용 |
|---|---|---|
| 2026-05-28 | ny | 초안 작성. 옵션 A 채택. 부분 실패는 graceful + 전부 실패 시 raise 절충안 결정. 구현 미착수. |
| 2026-05-28 | ny | 구현 완료 (rust_raw_block_backend.py). _submit_put_one → _submit_put_batch 로 교체. 기존 test_rust_raw_block_backend.py:1044 의 match="Failed to persist raw-block key" → match="Failed to persist raw-block" 로 단축 (새 메시지 Failed to persist raw-block batch of N keys와 호환). |
| 2026-05-28 | ny | 신규 테스트 4개 추가. 첫 실행에서 2개 실패 → 모두 테스트 코드 버그. 8.1 / 8.2 참조. production 코드 변경 사항은 그대로 유지. |
| 2026-05-28 | ny | 테스트 helper config 수정 + patch lifecycle 수정 후 모든 테스트 통과. obj.ref_count_down() 명시 호출 추가 (기존 테스트 패턴과 일치). 최종: 7 passed (기존 3 + 신규 4), 26 skipped (Rust 확장 의존). ruff check / ruff format 통과. |
| 2026-06-01 | ny | 코드 리뷰 (cr-93403c0f) 의 Tier 1 항목 본 PR 안에서 처리. (1) F2 — exists_many(lock=False) 호출을 _put_lock 안으로 이동시켜 dedup TOCTOU 윈도우 제거. (2) F1+F3 — ref_count_up 루프 + run_coroutine_threadsafe dispatch 를 try/except 로 감싸 부분 진행 보상 (성공한 ref_count 만 ref_count_down + _put_tasks 일괄 discard + 미-dispatch 코루틴 coro.close()). 신규 테스트 2개 (dispatch failure / ref_count_up failure 보상). 최종: 9 passed (기존 7 + 신규 2), 26 skipped. ruff check / ruff format 통과. |
8. 구현 중 발견된 이슈
8.1 helper config 실수 — slot_bytes 너무 작음
증상: 첫 신규 테스트 실행 시
RawBlockCore write failed for ... :
RawBlockCore payload 65536 exceeds slot capacity 4096
원인: _make_legacy_backend helper 에서 rust_raw_block.slot_bytes=8192,
header_bytes=4096 로 두면 payload capacity = slot - header = 4096 bytes.
하지만 테스트가 사용하는 KV chunk 는 [2,16,8,128] × bfloat16 = 64KB →
초과.
해결: slot_bytes 명시를 제거하여 local_cpu_backend.get_full_chunk_size_bytes()
로 자동 계산되도록 두었다 (rust_raw_block_backend._build_core_config 의
default 동작). 기존 production 테스트들이 모두 이 default 패턴을 사용 →
helper 도 동일 패턴으로 통일.
의미: production 코드 변경 사항과 무관. 테스트 helper 의 config 실수.
8.2 테스트의 patch.object 범위 오류
증상:
recorded_specs == [] # 호출 시점에 patch unwound
mock 된 put_many 가 호출되지 않음에도 backend.contains(fresh_key) is True
는 통과 → 진짜 put_many 가 patch 가 풀린 뒤에 실행되고 있음.
원인: batched_submit_put_task 가 asyncio.run_coroutine_threadsafe 로
coroutine 을 큐잉하고 즉시 반환. 실제 asyncio.to_thread(self._core.put_many, ...)
는 별도 thread 에서 나중에 호출. 테스트가 with patch.object(...) 블록
밖에서 futs[0].result(timeout=10) 을 기다리도록 작성되어, patch 가
이미 unwind 된 후 worker thread 가 실제 호출을 수행했다.
해결: with patch.object(...) 블록 안에 result(timeout=10) 까지 포함.
다른 신규 테스트 (partial_failure) 도 같은 구조였으므로 함께 수정.
의미: production 코드 시맨틱과 무관. 테스트의 patch 수명 관리 오류. 이 이슈가 발생한 이유는 새 구조가 "단일 future가 batch 전체 완료를 표현" 하기 때문 — 기존의 per-key future 대비 await 위치가 한 단계 지연된다. 향후 mock 기반 테스트 작성 시 동일 패턴 주의 필요.