
L2: batched_submit_put_task correctness fix (TOCTOU + resource leak)

🚫 2026-06-10: decision to abandon. Absorbed into P0; no standalone PR. Branch fix/rawblock-batched-put-toctou (8860f132) has been deleted.

Reasons for absorption (review conclusion):

  1. The "dedup TOCTOU" is not a real correctness bug. core.put_many on dev is already idempotent: inside _lock it rechecks "if key in _index:" (report success) and "if key in _inflight:" (skip), so even when the plugin's pre-dedup (contains_key/exists_inflight) is stale, a duplicate write never happens. The plugin dedup is only a best-effort optimization that avoids wasting a coroutine dispatch on keys that already exist. Moreover, _put_lock protects only _put_tasks, not the core's _index/_inflight, so moving exists_many inside the lock cannot prevent a race on core state (and there is nothing to prevent). Duplicate submits on the same instance are already blocked by _put_tasks (add under the lock). → What this change really amounts to is a micro-optimization: batched exists_many plus lock acquisitions reduced from 2N to 3N down to 2.
  2. The ref-leak rollback (②) is real but very narrow: it applies only to the shutdown race in which run_coroutine_threadsafe raises. It does not occur in normal operation. Severity: info/minor.
  3. Both ① and ② only pay off in the "batched dispatch" context (N-object rollback and so on). That context is exactly P0. As a standalone correctness PR on top of dev, the change does not live up to its label and would only invite reviewer pushback.

→ P0 (perf/rawblock-put-many-batch-io) already includes ① (batched dedup) and ② (ref rollback): P0's batched_submit_put_task = dev + ①② + dispatch batching. L2 is a subset of P0, so there is no code to port separately. Sections 1 to 9 below are kept as a record of the analysis made during the split attempt.


1. Background

1.1 Current structure: decomposed into N coroutines

rust_raw_block_backend.py:347-407

def batched_submit_put_task(self, keys, objs, ...):
    futures = []
    for key, obj in zip(keys, objs, strict=False):
        with self._put_lock:  # per-key lock #1
            if key in self._put_tasks:
                continue
            self._put_tasks.add(key)

        spec = encode_legacy_key(key)
        exists = self._core.contains_key(spec.encoded, lock=False) \
            or self._core.exists_inflight(spec.encoded)  # per-key core lock × 2
        if exists:
            with self._put_lock:  # per-key lock #2
                self._put_tasks.discard(key)
            continue

        obj.ref_count_up()
        fut = asyncio.run_coroutine_threadsafe(
            self._submit_put_one(key, spec, obj, on_complete_callback),
            loop,
        )  # per-key event-loop hop
        futures.append(fut)
    return futures or None

async def _submit_put_one(self, key, spec, memory_obj, on_complete_callback):
    try:
        put_result = await asyncio.to_thread(
            self._core.put_many, [spec], [memory_obj],  # ← called with N=1!
        )
        if not put_result.results or not put_result.results[0]:
            raise RuntimeError(...)
        if on_complete_callback is not None:
            try:
                on_complete_callback(key)
            except Exception as e:
                logger.warning(...)
    finally:
        memory_obj.ref_count_down()
        with self._put_lock:  # per-key lock #3
            self._put_tasks.discard(key)

1.2 Cost structure

Per key:

| Step | Cost |
| --- | --- |
| _put_lock acquisition | 2 to 3 times (add → fail-cleanup or finally-discard) |
| _core.contains_key + exists_inflight | core _lock, 2 times |
| asyncio.run_coroutine_threadsafe | 1 event-loop queue enqueue |
| asyncio.to_thread | 1 thread pool dispatch |
| core.put_many([1], [1]) | 4 lock acquisitions inside core (without L1) |
| _put_lock finally discard | 1 time |

For a batch of N=100:

  • _put_lock: 200 to 300 acquisitions (current) → 2 to 4 (proposed)
  • event-loop hops: 100 (current) → 1 (proposed)
  • thread pool dispatches: 100 (current) → 1 (proposed)
  • core put_many calls: 100 calls × N=1 (current) → 1 call × N=100 (proposed)
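
The N=100 figures above are the per-key costs multiplied by N. A minimal sketch of that arithmetic, assuming the per-key bounds from the table (2 to 3 _put_lock acquisitions, one hop, one dispatch, one put_many call). DispatchCost, per_key_cost, and batched_cost are hypothetical names used only for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DispatchCost:
    put_lock_min: int       # lower bound on _put_lock acquisitions
    put_lock_max: int       # upper bound on _put_lock acquisitions
    loop_hops: int          # run_coroutine_threadsafe enqueues
    thread_dispatches: int  # asyncio.to_thread dispatches
    put_many_calls: int     # calls into core.put_many


def per_key_cost(n: int) -> DispatchCost:
    """Current shape: every key pays its own locks, hop, and dispatch."""
    return DispatchCost(2 * n, 3 * n, n, n, n)


def batched_cost(n: int) -> DispatchCost:
    """Proposed shape: per-batch costs no longer depend on n."""
    return DispatchCost(2, 4, 1, 1, 1)


print(per_key_cost(100))
print(batched_cost(100))
```

The sketch only counts operations; it says nothing about how long each one takes, which is what the measurements in section 9 address.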

1.3 Why it was written this way (historical guess)

  • The legacy backend originally had only the per-key submit_put_task, and batched_submit_put_task appears to have been added as an adapter that is batched in name only.
  • When core.put_many gained real batch semantics, the call site was not upgraded along with it.
  • As a result, the (public) interface is batched while the implementation is a 1-key fan-out.

1.4 Orthogonality with L1

L1 consolidates lock acquisitions inside core.py (put_many + _write_one), while L2 restructures the call site in rust_raw_block_backend.py. The two touch different files, and the signature of core.put_many that L2 calls does not change, so there is no conflict. The effects of the two changes multiply: L2 reduces the number of calls, and L1 reduces the number of lock acquisitions per call.


2. Review notes

2.1 Preserving dedup semantics

Currently, three kinds of dedup run serially for each key:

  1. _put_tasks set membership (in-flight put)
  2. core.contains_key (already in the index)
  3. core.exists_inflight (in flight via another path)

When batching, the same checks are performed in bulk in a pre-filter step. Only the (key, spec, obj) tuples that survive filtering are passed to put_many.

| Check | Current | Proposed |
| --- | --- | --- |
| _put_tasks membership | per-key, serial | take _put_lock once and batch add (skip keys already present) |
| core.contains_key | per-key | one core.exists_many(encoded_keys, lock=False) call |
| core.exists_inflight | per-key | needs a batch API on the core side (deferred, see 2.5) |

2.2 ref_count consistency

Current guarantees:

  • Only surviving objs get ref_count_up()
  • The finally block of the _submit_put_one coroutine always calls ref_count_down()
  • If entering the coroutine fails (loop is None), ref_count_down() is called immediately and the error is raised

Invariant the batched version must maintain:

"Every obj that had ref_count_up() called is ref_count_down()'d exactly once, regardless of the put_many outcome (success/failure)."

→ The finally block of the batch coroutine calls ref_count_down() on the whole list of surviving objs. Even if put_many throws midway, finally guarantees it.
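
The invariant can be checked in isolation. The following is a minimal sketch, not the backend code: FakeObj, failing_put_many, ok_put_many, submit_batch, and refs_after_run are hypothetical stand-ins, and only the try/finally shape is taken from the text above.

```python
import asyncio


class FakeObj:
    """Hypothetical stand-in for a memory object with a reference count."""

    def __init__(self):
        self.refs = 0

    def ref_count_up(self):
        self.refs += 1

    def ref_count_down(self):
        self.refs -= 1


def failing_put_many(specs, objs):
    raise OSError("simulated I/O failure")


def ok_put_many(specs, objs):
    return [True] * len(specs)


async def submit_batch(put_many, specs, objs):
    try:
        return await asyncio.to_thread(put_many, specs, objs)
    finally:
        # Runs on success and on exception alike: one down per earlier up.
        for obj in objs:
            obj.ref_count_down()


def refs_after_run(put_many):
    objs = [FakeObj() for _ in range(3)]
    for obj in objs:
        obj.ref_count_up()
    try:
        asyncio.run(submit_batch(put_many, ["a", "b", "c"], objs))
    except OSError:
        pass  # the failure itself is expected in this sketch
    return [obj.refs for obj in objs]


print(refs_after_run(ok_put_many), refs_after_run(failing_put_many))
```

Both runs end with every count back at zero, which is the "net 0" property the planned ref_count test asserts.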

2.3 on_complete_callback timing

Current: each per-key coroutine invokes that key's callback right after put_many([1]) succeeds. There is no notion of partial success (a single key is the whole batch).

Proposed: put_many(specs, objs).results is a list[bool], a bitmap that says exactly which keys succeeded. Iterate over the bitmap:

  • results[i] == True → call on_complete_callback(keys[i])
  • results[i] == False → do not call the callback (same semantics as today)

Callback exceptions are caught and logged as warnings, as today.

Semantic equivalence: put_many commits each key to the index independently, so the current invariant "the callback fires only after the key is in the index" is preserved.

2.4 Future return signature

Current: list[Future] | None. Callers can wait on or cancel each future individually.

Options compared:

| Option | Return value | Caller compatibility | Notes |
| --- | --- | --- | --- |
| A | Replicate the single batch Future N times and return the list | ✅ No caller changes needed | Semantics: "when one future is done, all N are done" |
| B | Fan the batch result out into N per-key Futures | ✅ Same semantics | Each future carries results[i] as its result |
| C | Return a list of length 1 containing one future | ❌ Breaks callers | Not recommended |

Caller review (cache_engine.py:1235, storage_manager.py:428):

  • storage_manager does not use the returned futures (fire-and-forget)
  • cache_engine.move_kv uses only async_batched_submit_put_task, which is a separate path
  • Tests (test_rust_raw_block_backend.py:1036) mostly use the futs[0].result(timeout=10) pattern, which only collects the first element of the list, so option A is exactly compatible

→ Option A adopted (smallest surface area). Option B can be split into a follow-up PR.
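
The difference between options A and B is easy to see with plain concurrent.futures.Future objects, which is the type asyncio.run_coroutine_threadsafe returns. This is a sketch under that assumption; replicate and fan_out are hypothetical helper names, and cancellation of the batch future is deliberately not handled.

```python
from concurrent.futures import Future


def replicate(batch_future: Future, n: int) -> list:
    """Option A: the same Future object in all n positions."""
    return [batch_future] * n


def fan_out(batch_future: Future, n: int) -> list:
    """Option B: n per-key Futures resolved from the batch result."""
    per_key = [Future() for _ in range(n)]

    def _propagate(done: Future) -> None:
        exc = done.exception()  # cancellation is not handled in this sketch
        for i, fut in enumerate(per_key):
            if exc is not None:
                fut.set_exception(exc)
            else:
                fut.set_result(done.result()[i])

    batch_future.add_done_callback(_propagate)
    return per_key


batch_a = Future()
futs_a = replicate(batch_a, 3)
batch_a.set_result([True, True, False])

batch_b = Future()
futs_b = fan_out(batch_b, 3)
batch_b.set_result([True, True, False])

print(futs_a[0] is futs_a[2], [f.result(timeout=1) for f in futs_b])
```

With option A, futs[0].result() returns whatever the batch coroutine returned, and cancelling any element cancels the whole batch; with option B each element resolves to its own results[i].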

2.5 exists_inflight batch API

Currently only core.exists_inflight(encoded_key) -> bool exists.

Options:

  • (a) Add a batch API core.exists_inflight_many(keys) -> list[bool]
  • (b) Keep per-key calls at the L2 stage (N lock acquisitions), but batch contains_key via exists_many
  • (c) Skip the _inflight check in the pre-filter step (the effect on the race window needs review)

Decision: (b). Reasons:

  • exists_inflight is a single dict membership check on the Python side → very cheap
  • Expanding the core API surface is cleaner to review as a separate PR
  • The bottlenecks that can currently be measured are contains_key (Rust call + index lock) and the event-loop/thread pool overhead

If (a) becomes necessary later, track it as a separate item.

2.6 _put_lock contention

Current: the lock is taken on every per-key entry/exit, so close() polling and exists_in_put_tasks are briefly blocked for every key.

Proposed: take _put_lock once in the pre-filter step and batch add; take it once in the coroutine's finally and batch discard. → Lock acquisitions drop from 2N to 3N (200 to 300 at N=100) down to 2 to 4 per batch.

Lock holding time grows slightly (N set adds), but a Python set add costs on the order of ~50ns, so even at N=100 it stays within a few μs. That is negligible compared with the 10ms period of close() polling.

2.7 Partial failure handling

When some entries of the put_many result bitmap are False:

  • Current: each per-key coroutine raises RuntimeError → the future completes with an exception. The caller receives the exception from fut.result().
  • Proposed: based on the bitmap, for failed keys:
    • report via log.error (including the key identifier)
    • do not call the callback
    • the future itself completes normally (successful keys are processed normally)

→ The semantics differ slightly. Today, "if a particular key fails, that key's future carries an exception"; in the proposal, "the future always completes normally and failed keys are reported only in the log".

Need to check whether any caller catches exceptions from the future:

  • storage_manager.py: does not use the future
  • Tests: futs[0].result(timeout=10) → only the timeout is handled; exceptions propagate

→ For test compatibility, a compromise is possible: set RuntimeError on the future only when all N keys fail. Partial failure almost never happens in practice (slot exhaustion, I/O failure, and the like hit all N keys at once), so this simplification is safe.

→ Decision: set an exception on the future only when the whole bitmap is False. Otherwise handle gracefully with log + callback skip. (The compromise closest to the current semantics.)
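
The decision reduces to a single any() test over the bitmap. A minimal sketch, assuming the bitmap is non-empty (the backend returns early for empty batches); failed_keys_or_raise is a hypothetical helper name and the message text is illustrative.

```python
def failed_keys_or_raise(keys, results):
    """Raise only when every key failed; otherwise return the failed keys."""
    if len(keys) != len(results):
        raise ValueError("bitmap length must match the number of keys")
    if not any(results):
        # Total failure: the batch future should complete with an exception.
        raise RuntimeError(f"Failed to persist {len(keys)} raw-block keys")
    # Partial failure: the future completes normally; these keys are only logged.
    return [key for key, ok in zip(keys, results) if not ok]


partial = failed_keys_or_raise(["a", "b", "c"], [True, False, True])

try:
    failed_keys_or_raise(["a", "b"], [False, False])
    total_failure_raised = False
except RuntimeError:
    total_failure_raised = True

print(partial, total_failure_raised)
```

The trade-off is visible here: a caller that only inspects the future cannot tell that "b" was never stored, which is the silent-loss risk listed in section 6.1.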

2.8 Items excluded from this review

  • New core-side batch APIs (exists_inflight_many etc.): separate PR
  • Full removal of the legacy _submit_put_one: removed in this PR (no call sites remain)
  • Option B (per-key future fan-out): separate PR once a caller actually needs per-key completion
  • Batching of the pin/contains paths (T2 item): separate from L2

3. Effects

3.1 Quantitative effects (expected)

| Item | N=10 | N=100 | Unit |
| --- | --- | --- | --- |
| _put_lock acquisitions | 30 → 2 | 300 → 2 | count |
| core._lock (dedup step) | 20 → 1 | 200 → 1 | count |
| event-loop hops | 10 → 1 | 100 → 1 | count |
| asyncio.to_thread dispatches | 10 → 1 | 100 → 1 | count |
| core.put_many calls | 10 (each N=1) | 100 (each N=1) | calls |
| locks inside those calls (without L1) | 40 | 400 | count |
| locks inside those calls (with L1) | 20 | 200 | count |
| batched + L1, locks inside put | 2 | 2 | count |

→ At N=100: dispatch overhead down ~99%, total lock count down ~99% (when combined with L1).

3.2 Qualitative effects

  • The legacy and MP paths (the latter already batched) use the same core.put_many semantics directly → code consistency
  • The interface name (batched_submit_put_task) matches the implementation semantics
  • Potential improvement in io_uring SQ utilization (currently concurrent SQEs = num_store_workers ≈ 2 to 4 → one batch could issue N SQEs. However, this is not effective today because of the serial loop inside put_many; tracked as a separate P0 item, P0-put_many-parallel-io.md)

3.3 Non-effects

  • core.put_many signature/semantics: unchanged
  • on-disk format: unchanged
  • public interface (batched_submit_put_task signature): unchanged
  • T1 (delete() TOCTOU fix): not affected (different method)
  • L1 (locks inside put_many): only the number of calls goes down. The L1 effect remains valid.

4. Changes (planned)

4.1 Restructuring batched_submit_put_task

def batched_submit_put_task(self, keys, objs, transfer_spec=None,
                            on_complete_callback=None):
    del transfer_spec
    if not keys:
        return None
    loop = self.loop
    if loop is None:
        raise RuntimeError("RustRawBlockBackend requires an asyncio event loop")

    # --- pre-filter: dedup + spec encode (single _put_lock window) ---
    accepted_keys: list[CacheEngineKey] = []
    accepted_specs: list[RawBlockKeySpec] = []
    accepted_objs: list[MemoryObj] = []

    specs = [encode_legacy_key(key) for key in keys]
    encoded_keys = [spec.encoded for spec in specs]
    indexed_bitmap = self._core.exists_many(encoded_keys, lock=False)

    with self._put_lock:
        for i, key in enumerate(keys):
            if key in self._put_tasks:
                continue
            if indexed_bitmap[i]:
                continue
            if self._core.exists_inflight(encoded_keys[i]):
                continue
            self._put_tasks.add(key)
            accepted_keys.append(key)
            accepted_specs.append(specs[i])
            accepted_objs.append(objs[i])

    if not accepted_keys:
        return None

    for obj in accepted_objs:
        obj.ref_count_up()

    fut = asyncio.run_coroutine_threadsafe(
        self._submit_put_batch(
            accepted_keys, accepted_specs, accepted_objs, on_complete_callback,
        ),
        loop,
    )
    # Option A: callers expect list[Future], so the same future is
    # replicated into N positions
    return [fut] * len(accepted_keys)

4.2 _submit_put_batch (new, replaces _submit_put_one)

async def _submit_put_batch(self, keys, specs, objs, on_complete_callback):
    try:
        put_result = await asyncio.to_thread(
            self._core.put_many, specs, objs,
        )
        results = put_result.results
        if not any(results):
            raise RuntimeError(
                f"Failed to persist {len(keys)} raw-block keys "
                f"(first encoded: {specs[0].encoded})"
            )
        if on_complete_callback is not None:
            for key, ok in zip(keys, results, strict=True):
                if not ok:
                    continue
                try:
                    on_complete_callback(key)
                except Exception as e:
                    logger.warning(
                        "on_complete_callback failed for key %s: %s", key, e,
                    )
        # log partial failures
        for key, ok in zip(keys, results, strict=True):
            if not ok:
                logger.warning(
                    "RustRawBlockBackend: put failed for key %s", key,
                )
    finally:
        for obj in objs:
            obj.ref_count_down()
        with self._put_lock:
            for key in keys:
                self._put_tasks.discard(key)

4.3 Removing _submit_put_one

No callers remain → delete it. Clean up the related docstrings/comments.

4.4 docstring/comment

State the following invariants on batched_submit_put_task:

  • Every item of the returned list refers to the same Future object (option A)
  • On partial failure the future completes normally and failed keys are reported only in the log
  • On total failure the future completes with RuntimeError

4.5 Tests

  • The existing test_rust_raw_block_backend.py regression suite passes (most important)
  • New:
    1. test_batched_put_dedup_inflight_and_indexed: when some keys are already in the index and some are in flight, the pre-filter submits exactly the remaining keys
    2. test_batched_put_partial_failure_does_not_break_others: with core.put_many mocked to return False for some keys, only the callbacks of successful keys are called
    3. test_batched_put_returns_same_future_per_key: verifies option A semantics
    4. test_batched_put_ref_count_balanced_on_exception: even if put_many raises, every obj's ref_count nets to 0

5. How to verify the improvement (numerical evidence)

5.1 Measuring dispatch overhead

# Before and after the change, respectively:
# - prepare 100 objs
# - use perf_counter to time from the batched_submit_put_task call until every future is done
# - count run_coroutine_threadsafe calls (with a mock)

Expected: at N=100, dispatch elapsed time after the change is 1/10 to 1/50 of the time before (event-loop queue synchronization overhead is the main cost).
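
The measurement steps above can be sketched without the real backend. This is an illustrative harness only: noop_put_many stands in for core.put_many with zero I/O, and start_loop, put_async, and measure are hypothetical names. Timings depend on the machine, so only the dispatch counts are meaningful to assert on.

```python
import asyncio
import threading
import time


def start_loop():
    """Run an event loop on a background thread, as the backend's caller would."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def noop_put_many(specs):
    return [True] * len(specs)


async def put_async(specs):
    return await asyncio.to_thread(noop_put_many, specs)


def measure(loop, n, batched):
    """Return (number of dispatches, elapsed seconds until all futures are done)."""
    keys = list(range(n))
    start = time.perf_counter()
    if batched:
        futs = [asyncio.run_coroutine_threadsafe(put_async(keys), loop)]
    else:
        futs = [asyncio.run_coroutine_threadsafe(put_async([k]), loop) for k in keys]
    for fut in futs:
        fut.result(timeout=10)
    return len(futs), time.perf_counter() - start


loop, thread = start_loop()
fanout_dispatches, fanout_s = measure(loop, 100, batched=False)
batch_dispatches, batch_s = measure(loop, 100, batched=True)
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()

print(f"fanout: {fanout_dispatches} dispatches, {fanout_s * 1e3:.3f} ms")
print(f"batch:  {batch_dispatches} dispatches, {batch_s * 1e3:.3f} ms")
```

Because the stand-in does no I/O, this harness only exercises the pure-CPU regime; it cannot show the effect of storage latency.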

5.2 Measuring lock contention

# new sub-item of section 4.1: swap _put_lock for a CountingLock
# call batched_submit_put_task(keys=[100], objs=[100]) once
# assert acquire_count <= 4 (after the change) / >= 200 (before the change)
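
One possible shape for the CountingLock mentioned above, shown as a hedged sketch: the class and the two add_* helpers are hypothetical and only model the `with self._put_lock:` usage pattern, not the backend itself.

```python
import threading


class CountingLock:
    """Hypothetical wrapper that counts acquisitions made through `with`."""

    def __init__(self):
        self._lock = threading.Lock()
        self.acquire_count = 0

    def __enter__(self):
        self._lock.acquire()
        self.acquire_count += 1  # updated while holding the lock
        return self

    def __exit__(self, exc_type, exc, tb):
        self._lock.release()
        return False  # never swallow exceptions


def add_per_key(lock, tasks, keys):
    """Current shape: one lock window per key."""
    for key in keys:
        with lock:
            tasks.add(key)


def add_batched(lock, tasks, keys):
    """Proposed shape: one lock window for the whole batch."""
    with lock:
        tasks.update(keys)


per_key_lock, batched_lock = CountingLock(), CountingLock()
add_per_key(per_key_lock, set(), range(100))
add_batched(batched_lock, set(), range(100))
print(per_key_lock.acquire_count, batched_lock.acquire_count)
```

In a real test the instance's _put_lock attribute would be replaced with such an object before calling batched_submit_put_task, provided the backend only uses the lock as a context manager.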

5.3 Measuring throughput (real raw device or io_uring tmp file)

# - only in environments where _has_ext() is available
# - wall clock until put completion for batch sizes N=1, 10, 100, 1000
# - compare before/after the change

Expected: the larger N is, the larger the throughput advantage after the change (up to ~N×)

5.4 Correctness

pytest tests/v1/storage_backend/test_rust_raw_block_backend.py -v
pytest tests/v1/storage_backend/test_storage_plugin.py -v

6. Risks / rollback

6.1 Risk items

| Risk | Likelihood | Impact | Mitigation |
| --- | --- | --- | --- |
| Option A semantic change breaks callers | Low | Some tests fail | Caller grep confirmed that storage_manager/cache_engine do not use the future |
| Partial failure → silent loss because the future completes normally | Medium | Some keys are never stored | Explicit log.warning + consider adding a metric later |
| Race window between exists_many and exists_inflight in the pre-filter step | Low | Duplicate put attempt | core.put_many is already idempotent (rechecks _index / _inflight) |
| Leak in the finally block of _submit_put_batch | Low | ref_count permanently +1 | 5.4 + ref_count test (5.x item) |
| Longer lock holding per batch in environments without L1 | Low | More waiting in exists_many etc. | Recommend landing L1 at the same time. Even without it, the count drops from 200 → 2N |

6.2 Rollback

The change is confined to a single file and 3 methods. Reverting the PR is sufficient. The on-disk format and the public interface signature are unchanged → no extra migration is needed when rolling back in production.

6.3 Merge-order dependencies

  • T1 (9fc5a901) is the base. After T1 merges, a rebase reconciles automatically (T1 is a one-line change in remove(); L2 touches batched_submit_put_task, a different method)
  • L1 touches a different file → unrelated
  • Recommended merge order: T1 → L1 → L2 (or T1 → L2 → L1; the order does not matter)

7. Change log

| Date | Author | Notes |
| --- | --- | --- |
| 2026-05-28 | ny | Draft written. Option A adopted. Decided on the compromise: graceful on partial failure, raise on total failure. Implementation not started. |
| 2026-05-28 | ny | Implementation done (rust_raw_block_backend.py). _submit_put_one replaced by _submit_put_batch. In the existing test_rust_raw_block_backend.py:1044, match="Failed to persist raw-block key" shortened to match="Failed to persist raw-block" (compatible with the new message Failed to persist raw-block batch of N keys). |
| 2026-05-28 | ny | Added 4 new tests. 2 failed on the first run → both were bugs in the test code. See 8.1 / 8.2. Production code changes kept as they were. |
| 2026-05-28 | ny | All tests pass after fixing the test helper config and the patch lifecycle. Added an explicit obj.ref_count_down() call (matches the existing test pattern). Final: 7 passed (3 existing + 4 new), 26 skipped (depend on the Rust extension). ruff check / ruff format pass. |
| 2026-06-01 | ny | Handled the Tier 1 items of code review (cr-93403c0f) inside this PR. (1) F2: moved the exists_many(lock=False) call inside _put_lock to remove the dedup TOCTOU window. (2) F1+F3: wrapped the ref_count_up loop + run_coroutine_threadsafe dispatch in try/except to compensate for partial progress (ref_count_down only for the objs whose ref_count_up succeeded + batch discard from _put_tasks + coro.close() on the undispatched coroutine). 2 new tests (dispatch failure / ref_count_up failure compensation). Final: 9 passed (7 existing + 2 new), 26 skipped. ruff check / ruff format pass. |
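
The F1+F3 compensation in the last entry can be sketched as follows. This is an assumption-laden illustration, not the merged code: dispatch_with_rollback, FakeObj, fake_batch, and closed_loop_dispatch are hypothetical, and the real backend would touch _put_tasks under _put_lock.

```python
class FakeObj:
    """Hypothetical stand-in for a memory object with a reference count."""

    def __init__(self):
        self.refs = 0

    def ref_count_up(self):
        self.refs += 1

    def ref_count_down(self):
        self.refs -= 1


def dispatch_with_rollback(keys, objs, put_tasks, make_coro, dispatch):
    bumped = []  # only objs whose ref_count_up() actually succeeded
    coro = None
    try:
        for obj in objs:
            obj.ref_count_up()
            bumped.append(obj)
        coro = make_coro()
        return dispatch(coro)
    except Exception:
        for obj in bumped:
            obj.ref_count_down()
        put_tasks.difference_update(keys)  # batch discard of the reserved keys
        if coro is not None:
            coro.close()  # avoids a "coroutine was never awaited" warning
        raise


async def fake_batch():
    return None


def closed_loop_dispatch(coro):
    raise RuntimeError("event loop is closed")


objs = [FakeObj(), FakeObj()]
tasks = {"k1", "k2", "other"}
try:
    dispatch_with_rollback(["k1", "k2"], objs, tasks, fake_batch, closed_loop_dispatch)
    dispatch_raised = False
except RuntimeError:
    dispatch_raised = True

print(dispatch_raised, [o.refs for o in objs], tasks)
```

Tracking the bumped list separately is what keeps the rollback correct when ref_count_up itself fails partway through the batch.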

8. Issues found during implementation

8.1 Helper config mistake: slot_bytes too small

Symptom: on the first run of the new tests

RawBlockCore write failed for ... :
RawBlockCore payload 65536 exceeds slot capacity 4096

Cause: with rust_raw_block.slot_bytes=8192 and header_bytes=4096 in the _make_legacy_backend helper, the payload capacity is slot - header = 4096 bytes. The KV chunk the tests use, however, is [2,16,8,128] × bfloat16 = 64KB → over capacity.

Fix: removed the explicit slot_bytes so that it is computed automatically by local_cpu_backend.get_full_chunk_size_bytes() (the default behavior of rust_raw_block_backend._build_core_config). All existing production tests use this default pattern → the helper now follows the same pattern.

Takeaway: unrelated to the production code changes. It was a config mistake in the test helper.
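
The two numbers in the error message follow directly from the shapes and sizes quoted above. A short worked check, with payload_bytes and payload_capacity as hypothetical helper names and bfloat16 taken as 2 bytes per element:

```python
import math


def payload_bytes(shape, itemsize):
    """Bytes needed for a dense tensor of the given shape."""
    return math.prod(shape) * itemsize


def payload_capacity(slot_bytes, header_bytes):
    """Bytes left for the payload once the header is reserved."""
    return slot_bytes - header_bytes


chunk = payload_bytes((2, 16, 8, 128), itemsize=2)  # bfloat16 is 2 bytes
capacity = payload_capacity(slot_bytes=8192, header_bytes=4096)

print(chunk, capacity, chunk > capacity)
```

The chunk is 16 times larger than the payload capacity, so every write in the helper's configuration was bound to fail.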

8.2 Wrong patch.object scope in the tests

Symptom:

recorded_specs == [] # patch already unwound at call time

Even though the mocked put_many was never called, the assertion backend.contains(fresh_key) is True still passed → the real put_many was running after the patch had been undone.

Cause: batched_submit_put_task queues the coroutine with asyncio.run_coroutine_threadsafe and returns immediately. The actual asyncio.to_thread(self._core.put_many, ...) call happens later on a separate thread. The test was written to wait on futs[0].result(timeout=10) outside the with patch.object(...) block, so the worker thread made the real call after the patch had already been unwound.

Fix: include result(timeout=10) inside the with patch.object(...) block. The other new test (partial_failure) had the same structure and was fixed as well.

Takeaway: unrelated to production code semantics. It was a patch-lifetime bug in the test. The issue arose because in the new structure "a single future represents completion of the whole batch", so the await point is delayed by one step compared with the old per-key futures. Watch for the same pattern when writing mock-based tests in the future.
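
The patch-lifetime pitfall can be reproduced deterministically with a thread pool and an Event that delays the worker. This is a self-contained sketch; Core, submit, and run are hypothetical and stand in for the backend, its core, and the test body.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import patch


class Core:
    def put_many(self, specs):
        return "real"


def submit(core, pool, gate):
    def work():
        gate.wait(timeout=5)            # the worker starts late, like to_thread
        return core.put_many(["spec"])  # attribute is looked up at call time
    return pool.submit(work)


def run(wait_inside_patch):
    core = Core()
    gate = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as pool:
        with patch.object(core, "put_many", return_value="mock"):
            fut = submit(core, pool, gate)
            if wait_inside_patch:
                gate.set()
                return fut.result(timeout=5)  # patch still active: mock is hit
        # Patch already unwound here, so the worker reaches the real method.
        gate.set()
        return fut.result(timeout=5)


print(run(wait_inside_patch=False), run(wait_inside_patch=True))
```

Waiting on the future inside the with block is the same fix that was applied to the two failing tests.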


9. Measurement results (2026-06-08)

Benchmark script: benchmarks/storage_backend_io/bench_dispatch_patterns.py
Environment: /home/ny/LMCache, fake in-memory device, Python 3.12, warmup=3, iters=10

Results

dispatch benchmark obj=64B warmup=3 iters=10
──────────────────────────────────────────────────────────────────────
     N   lat µs    batch med    fanout med    (b-f)/f   verdict
──────────────────────────────────────────────────────────────────────
    10        0     0.117 ms      0.763 ms     -84.7%   batch faster ✓
    10       50     2.466 ms      1.175 ms    +109.9%   fanout faster !
    10      200     5.389 ms      1.088 ms    +395.1%   fanout faster !
   100        0     0.773 ms      5.059 ms     -84.7%   batch faster ✓
   100       50    29.140 ms     18.208 ms     +60.0%   fanout faster !
   100      200    56.825 ms     39.412 ms     +44.2%   fanout faster !
  1000        0     7.103 ms     97.951 ms     -92.7%   batch faster ✓
  1000       50   271.195 ms    267.597 ms      +1.3%   ~equal
  1000      200   579.637 ms    256.461 ms    +126.0%   fanout faster !
──────────────────────────────────────────────────────────────────────
negative diff → batch is faster

Interpretation

| Regime | Result | Cause |
| --- | --- | --- |
| lat=0µs (pure CPU) | batch takes 85 to 93% less time | savings in dispatch overhead |
| lat=50µs (realistic NVMe) | batch takes 60 to 110% more time (fanout faster) | parallel writes on the thread pool outweigh the dispatch savings |
| lat=200µs (slow NVMe) | batch takes 44 to 395% more time (fanout faster) | parallelism effect at its largest |
| lat=50µs, N=1000 | ~equal (+1.3%) | worker saturation (~32) cancels out against the dispatch savings |

Root cause: batch runs N writes serially inside a single asyncio.to_thread. fanout runs N to_thread tasks concurrently on the thread pool. put_many releases _lock before _write_one, so the actual I/O overlaps → when there is real I/O, the parallelism gain outweighs the dispatch savings.
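
The root cause can be captured in a toy latency model. This is an illustration only: the two formulas, dispatch_us=50, and workers=32 are assumptions chosen for the sketch, not values fitted to the table, and the absolute numbers will not match the measurements.

```python
def batch_time_us(n, lat_us, dispatch_us=50):
    """One dispatch, then n writes back to back on a single worker thread."""
    return dispatch_us + n * lat_us


def fanout_time_us(n, lat_us, dispatch_us=50, workers=32):
    """n dispatches, with writes overlapping across `workers` threads."""
    waves = -(-n // workers)  # ceiling division: rounds of parallel writes
    return n * dispatch_us + waves * lat_us


for n, lat in [(100, 0), (100, 200), (1000, 50)]:
    print(n, lat, batch_time_us(n, lat), fanout_time_us(n, lat))
```

Under these assumptions the model shows the same qualitative pattern as the table: batch wins when latency is zero, fanout wins once each write has real latency, and the two roughly meet when the dispatch cost of N fan-out tasks approaches the serialized write time.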

Verdict

This change pays off only in the lat=0 (pure CPU) regime; in the realistic NVMe regime with real storage attached, it is actually a regression.

Unlike 5a27732f (4N→2N locks), which was dropped because the gain was <1%, this time the direction itself is reversed. For the batch pattern to be a "performance optimization", put_many must support parallel I/O internally (today it is a serial loop; this is a P0 task after PR #3274).

Decision (2026-06-10 re-review): remove dispatch batching from L2 and move it to P0.

Reason: if L2 contains both the correctness fix and dispatch batching, dispatch batching alone causes a regression in the NVMe regime, and reviewers may ask why a correctness PR makes performance worse. That lowers trust in the PR.

→ L2 = pure correctness fix (TOCTOU + resource leak + F6 bug). It can be submitted to dev on its own, without #3274. 29 tests pass after removing dispatch batching.

Dispatch batching (_submit_put_many + io_uring branch) moves to the P0 PR. Details: P0-put_many-parallel-io.md