L2 โ batched_submit_put_task correctness fix (TOCTOU + resource leak)
๐ซ 2026-06-10 ํ๊ธฐ ๊ฒฐ์ โ P0๋ก ํก์, ๋จ๋ PR ์ ํจ. ๋ธ๋์น
fix/rawblock-batched-put-toctou(8860f132) ์ญ์ ๋จ.ํก์ ์ด์ (๋ฆฌ๋ทฐ ๊ฒฐ๋ก ):
- "dedup TOCTOU"๋ ์ง์ง correctness ๋ฒ๊ทธ๊ฐ ์๋๋ค. dev์
core.put_many๊ฐ ์ด๋ฏธ ๋ฉฑ๋ฑ โ_lock์์์if key in _index: ์ฑ๊ณต ๋ณด๊ณ / if key in _inflight: skip๋ก ์ฌ๊ฒ์ฌํ๋ฏ๋ก, plugin ์ฌ์ dedup(contains_key/exists_inflight)์ด staleํด๋ ์ค๋ณต write๊ฐ ์ ๋ ๋ฐ์ํ์ง ์๋๋ค. plugin dedup์ "์ด๋ฏธ ์๋ ํค์ ์ฝ๋ฃจํด dispatch ๋ญ๋น๋ฅผ ์ค์ด๋ best-effort ์ต์ ํ"์ผ ๋ฟ. ๊ฒ๋ค๊ฐ_put_lock์_put_tasks๋ง ๋ณดํธํ์ง core์_index/_inflight๋ฅผ ๋ณดํธํ์ง ์์,exists_many๋ฅผ ๋ฝ ์์ผ๋ก ์ฎ๊ฒจ๋ core ์ํ race๋ฅผ ๋ง์ ์ ์๋ค(๋ง์ ๊ฒ ์๋ค). ๋์ผ ์ธ์คํด์ค ์ค๋ณต submit์_put_tasks(๋ฝ ์ add)๊ฐ ์ด๋ฏธ ๋ง๋๋ค. โ ์ด ๋ณ๊ฒฝ์ ์ค์ฒด๋ **batchedexists_many
- ๋ฝ ํ๋ 2~3Nโ2ํ ๋ง์ดํฌ๋ก ์ต์ ํ**.
- ref ๋์ ๋กค๋ฐฑ(โก)์ ์ง์ง์ง๋ง ๋งค์ฐ ์ข๋ค โ
run_coroutine_threadsafe๊ฐ raiseํ๋ ์ ง๋ค์ด ๋ ์ด์ค ํ์ . ์ ์ ์ด์ ์ค ๋ฏธ๋ฐ์. info/minor ์์ค.- โ โก๋ ๋ ๋ค "๋ฐฐ์น dispatch" ๋งฅ๋ฝ์์๋ง ๊ฐ์ด ์ด์๋๋ค(N-object ๋กค๋ฐฑ ๋ฑ). ๊ทธ ๋งฅ๋ฝ์ด ๋ฐ๋ก P0๋ค. dev ์ ๋จ๋ correctness PR๋ก๋ ๊ฐํ๊ฐ์ ๋ชป ํ๊ณ reviewer ๋ฐ๋ฐ๋ง ๋ถ๋ฅธ๋ค.
โ P0(
perf/rawblock-put-many-batch-io)๊ฐ โ (batched dedup)ยทโก(ref ๋กค๋ฐฑ)๋ฅผ ์ด๋ฏธ ํฌํจํ๋ค (P0์batched_submit_put_task= dev + โ โก + dispatch batching). L2๋ P0์ ๋ถ๋ถ์งํฉ์ด๋ผ ๋ฐ๋ก ์ฎ๊ธธ ์ฝ๋ ์์. ์๋ ยง1~ยง9๋ ๋ถ๋ฆฌ ์๋ ๋น์์ ๋ถ์ ๊ธฐ๋ก์ผ๋ก ๋ณด์กด.
1. ๋ณ๊ฒฝ ๋ฐฐ๊ฒฝโ
1.1 ํ์ฌ ๊ตฌ์กฐ โ N๊ฐ coroutine์ผ๋ก ๋ถํดโ
rust_raw_block_backend.py:347-407
def batched_submit_put_task(self, keys, objs, ...):
futures = []
for key, obj in zip(keys, objs, strict=False):
with self._put_lock: # per-key lock #1
if key in self._put_tasks:
continue
self._put_tasks.add(key)
spec = encode_legacy_key(key)
exists = self._core.contains_key(spec.encoded, lock=False) \
or self._core.exists_inflight(spec.encoded) # per-key core lock ร 2
if exists:
with self._put_lock: # per-key lock #2
self._put_tasks.discard(key)
continue
obj.ref_count_up()
fut = asyncio.run_coroutine_threadsafe(
self._submit_put_one(key, spec, obj, on_complete_callback),
loop,
) # per-key event-loop hop
futures.append(fut)
return futures or None
async def _submit_put_one(self, key, spec, memory_obj, on_complete_callback):
try:
put_result = await asyncio.to_thread(
self._core.put_many, [spec], [memory_obj], # โ N=1 ํธ์ถ!
)
if not put_result.results or not put_result.results[0]:
raise RuntimeError(...)
if on_complete_callback is not None:
try: on_complete_callback(key)
except Exception as e: logger.warning(...)
finally:
memory_obj.ref_count_down()
with self._put_lock: # per-key lock #3
self._put_tasks.discard(key)
1.2 ๋น์ฉ ๊ตฌ์กฐโ
key 1๊ฐ ์ฒ๋ฆฌ ์:
| ๋จ๊ณ | ๋น์ฉ |
|---|---|
_put_lock ํ๋ | 2~3ํ (add โ fail-cleanup or finally-discard) |
_core.contains_key + exists_inflight | core _lock 2ํ |
asyncio.run_coroutine_threadsafe | ์ด๋ฒคํธ๋ฃจํ ํ enqueue 1ํ |
asyncio.to_thread | thread pool dispatch 1ํ |
core.put_many([1], [1]) | core ๋ด๋ถ 4ํ lock (L1 ๋ฏธ์ ์ฉ ์) |
_put_lock finally discard | 1ํ |
N=100 ๋ฐฐ์น:
_put_lock: 200300ํ (ํ์ฌ) โ 24ํ (์ ์)- ์ด๋ฒคํธ๋ฃจํ hop: 100ํ (ํ์ฌ) โ 1ํ (์ ์)
- thread pool dispatch: 100ํ (ํ์ฌ) โ 1ํ (์ ์)
- core
put_manyํธ์ถ: 100ํ ร N=1 (ํ์ฌ) โ 1ํ ร N=100 (์ ์)
1.3 ์ ์ด๋ ๊ฒ ์ง์๋๊ฐ (์ญ์ฌ์ ์ถ์ )โ
- legacy backend๊ฐ ๋ณธ๋ key ๋จ์
submit_put_task๋ง ๊ฐ๊ณ ์์๊ณ ,batched_submit_put_task๋ ์ด๋ฆ๋ง ๋ฐฐ์น์ธ ์ด๋ํฐ๋ก ์ถ๊ฐ๋ ๊ฒ์ผ๋ก ๋ณด์ธ๋ค. core.put_many๊ฐ ์ง์ง ๋ฐฐ์น ์๋งจํฑ์ ๊ฐ๊ฒ ๋ ์์ ์ ํธ์ถ๋ถ ์ชฝ์ด ํจ๊ป ์ ๊ทธ๋ ์ด๋๋์ง ๋ชปํจ.- ๊ฒฐ๊ณผ์ ์ผ๋ก ์ธํฐํ์ด์ค(public)๋ ๋ฐฐ์น์ธ๋ฐ ๊ตฌํ์ 1-key fan-out.
1.4 L1๊ณผ์ ์ง๊ต์ฑโ
L1์ core.py ๋ด๋ถ (put_many + _write_one) ์ lock ํก์ ์์
์ด๊ณ ,
L2๋ rust_raw_block_backend.py ์ ํธ์ถ๋ถ ์ฌ๊ตฌ์ฑ์ด๋ค. ์์ ํ์ผ์ด ๋ค๋ฅด๊ณ ,
L2๊ฐ ํธ์ถํ๋ core.put_many ์ ์๊ทธ๋์ฒ๋ ๋ณํ์ง ์์ผ๋ฏ๋ก ์ถฉ๋ ์์.
๋ ๋ณ๊ฒฝ์ ํจ๊ณผ๋ ๊ณฑ์
์ผ๋ก ํฉ์ณ์ง๋ค (L2๊ฐ ํธ์ถ ํ์๋ฅผ ์ค์ด๊ณ , L1์ด ํธ์ถ 1ํ์
lock ํ์๋ฅผ ์ค์).
2. ๊ฒํ ๋ด์ฉโ
2.1 dedup ์๋งจํฑ ๋ณด์กดโ
ํ์ฌ๋ ๋ค์ 3์ข dedup์ด key๋ณ๋ก ์ง๋ ฌ ์ํ๋๋ค:
_put_tasksset ๋ฉค๋ฒ์ญ (in-flight put)core.contains_key(์ด๋ฏธ ์ธ๋ฑ์ค์ ์์)core.exists_inflight(๋ค๋ฅธ ๊ฒฝ๋ก์ inflight)
๋ฐฐ์นํ์ผ๋ก ๋ฌถ์ ๋ ์ฌ์ ํํฐ๋ง ๋จ๊ณ ์์ ๋์ผ ๊ฒ์ฌ๋ฅผ ์ผ๊ด ์ํํ๋ค.
ํํฐ๋ง ๊ฒฐ๊ณผ๋ก ์ด์๋จ์ (key, spec, obj) ์๋ค๋ง put_many ์ ์ ๋ฌํ๋ค.
| ๊ฒ์ฌ | ํ์ฌ | ์ ์ |
|---|---|---|
_put_tasks ๋ฉค๋ฒ์ญ | per-key ์ง๋ ฌ | _put_lock ํ ๋ฒ ์ก๊ณ batch add (์ด๋ฏธ ์๋ ๊ฒ skip) |
core.contains_key | per-key | core.exists_many(encoded_keys, lock=False) 1ํ |
core.exists_inflight | per-key | core ์ธก์ batch API ์ถ๊ฐ ํ์ (๋ณด๋ฅ โ ์๋ 2.5 ์ฐธ์กฐ) |
2.2 ref_count ์ ํฉ์ฑโ
ํ์ฌ ๋ณด์ฅ์ฌํญ:
- ์ด์๋จ์ obj ๋ง
ref_count_up() - coroutine
_submit_put_onefinally ์์ ๋ฐ๋์ref_count_down() - coroutine ์ง์
์คํจ ์(
loop is None) ์ฆ์ref_count_down()ํ raise
๋ฐฐ์นํ์์ ์ ์งํด์ผ ํ invariant:
"๋ชจ๋
ref_count_up()๋ obj ๋,put_many์ ๊ฒฐ๊ณผ (์ฑ๊ณต/์คํจ) ์ ๋ฌด๊ดํ๊ฒ ์ ํํ ํ ๋ฒref_count_down()๋๋ค."
โ ๋ฐฐ์น coroutine ์ finally ์์ ์ด์๋จ์ obj ๋ฆฌ์คํธ ์ ์ฒด์ ๋ํด ์ผ๊ด
ref_count_down(). put_many ๊ฐ ๋์ค์ ์์ธ๋ฅผ ๋์ ธ๋ finally ๊ฐ ๋ณด์ฅ.
2.3 on_complete_callback ์์ โ
ํ์ฌ: ๊ฐ key ๋ณ coroutine ์ด put_many([1]) ์ฑ๊ณต ์งํ ๊ทธ key ์ ์ฝ๋ฐฑ
ํธ์ถ. ๋ถ๋ถ ์ฑ๊ณต์ด๋ผ๋ ๊ฐ๋
์์ (ํ key ๋จ์๊ฐ batch ์ ์ฒด).
์ ์: put_many(specs, objs).results ๋ list[bool] โ ์ ํํ ์ด๋ key ๊ฐ
์ฑ๊ณตํ๋์ง ๋นํธ๋งต์ผ๋ก ๋ฐํ๋๋ค. ๋นํธ๋งต์ ์ํํ๋ฉฐ:
results[i] == Trueโon_complete_callback(keys[i])ํธ์ถresults[i] == Falseโ ์ฝ๋ฐฑ ํธ์ถ ์ ํจ (ํ์ฌ์ ๋์ผ ์๋งจํฑ)
์ฝ๋ฐฑ ์์ธ๋ ํ์ฌ์ฒ๋ผ catch + warning log.
์๋งจํฑ ๋๋ฑ์ฑ: put_many ๋ key ๋ณ ๋
๋ฆฝ์ ์ผ๋ก ์ธ๋ฑ์ค์ commit ํ๋ฏ๋ก,
"key ๊ฐ ์ธ๋ฑ์ค์ ๋ค์ด๊ฐ ํ์๋ง ์ฝ๋ฐฑ" ์ด๋ผ๋ ํ์ฌ invariant ๊ฐ ์ ์ง๋๋ค.
2.4 Future ๋ฐํ ์๊ทธ๋์ฒโ
ํ์ฌ: list[Future] | None. ํธ์ถ์๊ฐ future ๋ณ๋ก wait/cancel ๊ฐ๋ฅ.
์ ์ ์ต์ ๋น๊ต:
| ์ต์ | ๋ฐํ | ํธ์ถ์ ํธํ์ฑ | ๋น๊ณ |
|---|---|---|---|
| A | ๋ฐฐ์น 1๊ฐ Future ๋ฅผ N๊ฐ๋ก ๋ณต์ ํ์ฌ list ๋ฐํ | โ ํธ์ถ์ ๋ณ๊ฒฝ ๋ถํ์ | ์๋งจํฑ: "ํ future ์ด done ์ด๋ฉด N๊ฐ ๋ชจ๋ done" |
| B | ๋ฐฐ์น ๊ฒฐ๊ณผ๋ฅผ N๊ฐ per-key Future ๋ก fan-out | โ ์๋งจํฑ ๋์ผ | ๊ฐ future ๊ฐ results[i] ๋ฅผ ๊ฒฐ๊ณผ๋ก ๊ฐ์ง |
| C | list ๊ธธ์ด 1 ์ง๋ฆฌ future ๋ฐํ | โ ํธ์ถ์ ๊นจ์ง | ๋น์ถ |
ํธ์ถ์ ๊ฒํ (cache_engine.py:1235, storage_manager.py:428):
- storage_manager ๋ ๋ฐํ future ๋ฅผ ์ฌ์ฉํ์ง ์์ (fire-and-forget)
- cache_engine.move_kv ๋
async_batched_submit_put_task๋ง ์ฌ์ฉ โ ๋ณ๋ ๊ฒฝ๋ก - ํ
์คํธ (test_rust_raw_block_backend.py:1036)
๋
futs[0].result(timeout=10)ํจํด์ด ๋ค์ โ list ์ ์ฒซ ์์ ๋ง ์์ง ํ๋ ํจํด์ด๋ผ ์ต์ A ๊ฐ ์ ํํ ํธํ
โ ์ต์ A ์ฑํ (๊ฐ์ฅ ์์ ํ๋ฉด์ ). ์ต์ B ๋ ํ์ PR ๋ก ๋ถ๋ฆฌ ๊ฐ๋ฅ.
2.5 exists_inflight ๋ฐฐ์น APIโ
ํ์ฌ core.exists_inflight(encoded_key) -> bool ๋ง ์กด์ฌ.
์ ํ์ง:
- (a) batch API
core.exists_inflight_many(keys) -> list[bool]์ ์ค - (b) L2 ๋จ๊ณ์์๋ per-key ํธ์ถ ์ ์ง (lock Nํ) โ ๋จ, contains_key ๋
exists_many๋ก batch ํ - (c) ์ฌ์ ํํฐ๋ง ๋จ๊ณ์์
_inflight๊ฒ์ฌ๋ฅผ ์๋ต (race window ์ํฅ ๊ฒํ ํ์)
ํ๋จ: (b) ์ฑํ. ์ด์ :
exists_inflight๋ Python ์ธก dict membership ๊ฒ์ฌ 1ํ โ ๋งค์ฐ ์ ๋ ด- core API ํ๋ฉด ํ์ฅ์ ๋ณ๋ PR ๋ก ๋ถ๋ฆฌํ๋ ๊ฒ ๋ฆฌ๋ทฐ ๋จ์ ์ธก๋ฉด์์ ๊น๋
- ํ์ฌ ์ธก์ ๊ฐ๋ฅํ ๋ณ๋ชฉ์
contains_key(Rust call + index lock) ์ ์ด๋ฒคํธ๋ฃจํ/thread pool overhead ์
ํ์์ผ๋ก (a) ๊ฐ ํ์ํด์ง๋ฉด ๋ณ๋ ํญ๋ชฉ์ผ๋ก ํธ๋ํน.
2.6 _put_lock contentionโ
ํ์ฌ: per-key ์ง์
/์ดํ๋ง๋ค lock โ close() ์ polling ๊ณผ exists_in_put_tasks
๊ฐ ๋งค key ๋ง๋ค ์งง๊ฒ ๋ด์๋จ.
์ ์: ์ฌ์ ํํฐ๋ง ๋จ๊ณ์์ _put_lock ํ ๋ฒ ์ก๊ณ batch add. coroutine
finally ์์ ํ ๋ฒ ์ก๊ณ batch discard. โ lock ํ์ 200N โ 2~4 (๋ฐฐ์น๋น).
lock holding time ์ ์ฝ๊ฐ ๋์ด๋์ง๋ง (N๊ฐ set add) , Python set ์ add ๋ ~50ns ์์ค์ด๋ผ N=100 ์์๋ ์ ฮผs ๋ฏธ๋ง. close() polling ์ 10ms ์ฃผ๊ธฐ์ ๋นํด ๋ฌด์ ๊ฐ๋ฅ.
2.7 ๋ถ๋ถ ์คํจ ์ฒ๋ฆฌโ
put_many ๊ฒฐ๊ณผ ๋นํธ๋งต์์ ์ผ๋ถ False ์ธ ๊ฒฝ์ฐ:
- ํ์ฌ: ๊ฐ key ๋ณ coroutine ์ด RuntimeError ๋ฅผ raise โ future ๊ฐ exception ์ผ๋ก
done. ํธ์ถ์๊ฐ
fut.result()์์ ์์ธ ๋ฐ์. - ์ ์: ๋นํธ๋งต์ ๋ณด๊ณ ์คํจ key ๋ค์:
- log.error ๋ก ๋ณด๊ณ (key ์๋ณ์ ํฌํจ)
- ์ฝ๋ฐฑ ํธ์ถ ์ ํจ
- future ์์ฒด๋ ์ ์ ์๋ฃ (์ฑ๊ณตํ key ๋ค์ ์ ์ ์ฒ๋ฆฌ๋จ)
โ ์๋งจํฑ์ด ์ด์ง ๋ค๋ฆ. ํ์ฌ๋ "ํน์ key ๊ฐ ์คํจํ๋ฉด ๊ทธ key ์ future ๊ฐ exception" / ์ ์์ "future ๋ ํญ์ ์ ์, ์คํจ key ๋ log ๋ก๋ง ๋ณด๊ณ ".
ํธ์ถ์๊ฐ future ์์ exception ์ ์ก๋ ํจํด์ด ์๋์ง ํ์ธ ํ์:
- storage_manager.py: future ์ฌ์ฉ ์ ํจ
- ํ
์คํธ:
futs[0].result(timeout=10)โ timeout ๋ง ์ก๊ณ exception ์ propagate
โ ํ ์คํธ ํธํ์ ์ํด์๋ "์ ์ฒด N ๊ฐ ๋ชจ๋ ์คํจ ์์๋ง future ์ RuntimeError" ์ค์ ํ๋ ์ ์ถฉ์์ด ๊ฐ๋ฅ. ๊ทธ๋ฌ๋ partial failure ๊ฐ ํ์ค์์ ๊ฑฐ์ ๋ฐ์ํ์ง ์๋ (slot ๋ถ์กฑ, I/O ์คํจ ๋ฑ์ N๊ฐ ๋์ ๋ฐ์) ์ ์ ๊ฐ์ํ๋ฉด ๋จ์ํ ์์ .
โ ๊ฒฐ์ : ๋นํธ๋งต ์ ๋ถ False ์ผ ๋๋ง future ์ exception ์ค์ . ๊ทธ ์ธ์๋ log + callback skip ๋ก graceful ์ฒ๋ฆฌ. (ํ์ฌ ์๋งจํฑ๊ณผ ๊ฐ์ฅ ๊ฐ๊น์ด ์ ์ถฉ์)
2.8 ๊ฒํ ์์ ๋ฐฐ์ ํ ํญ๋ชฉโ
- core ์ธก batch API ์ ์ค (exists_inflight_many ๋ฑ): ๋ณ๋ PR
- legacy
_submit_put_one์์ ์ญ์ : ์ด๋ฒ PR ์์ ์ ๊ฑฐ (ํธ์ถ๋ถ ์์ด์ง) - option B (per-key future fan-out): ํธ์ถ์๊ฐ per-key ์๋ฃ๋ฅผ ์ง์ง๋ก ํ์๋ก ํ๋ ์์ ์ ๋ณ๋ PR
pin/contains๊ฒฝ๋ก batching (T2 ํญ๋ชฉ): L2 ์ ๋ณ๋
3. ํจ๊ณผโ
3.1 ์ ๋ ํจ๊ณผ (์์)โ
| ํญ๋ชฉ | N=10 | N=100 | ๋จ์ |
|---|---|---|---|
_put_lock ํ๋ | 30 โ 2 | 300 โ 2 | ํ |
core._lock (dedup ๋จ๊ณ) | 20 โ 1 | 200 โ 1 | ํ |
| ์ด๋ฒคํธ๋ฃจํ hop | 10 โ 1 | 100 โ 1 | ํ |
asyncio.to_thread dispatch | 10 โ 1 | 100 โ 1 | ํ |
core.put_many ํธ์ถ | 10 (๊ฐ N=1) | 100 (๊ฐ N=1) | ํธ์ถ |
| ์ ํธ์ถ ๋ด๋ถ lock (L1 ๋ฏธ์ ์ฉ) | 40 | 400 | ํ |
| ์ ํธ์ถ ๋ด๋ถ lock (L1 ์ ์ฉ) | 20 | 200 | ํ |
| ๋ฐฐ์นํ + L1 put ๋ด๋ถ lock | 2 | 2 | ํ |
โ N=100 ๊ธฐ์ค: dispatch overhead ~99% ๊ฐ์, lock ์ดํ์ ~99% ๊ฐ์ (L1 ๊ฒฐํฉ ์).
3.2 ์ ์ฑ ํจ๊ณผโ
- legacy ์ MP ๊ฒฝ๋ก (์ด๋ฏธ batched) ๊ฐ ๊ฐ์
core.put_many์๋งจํฑ์ ์ง์ ์ฌ์ฉ โ ์ฝ๋ ์ผ๊ด์ฑ - ์ธํฐํ์ด์ค ์ด๋ฆ (
batched_submit_put_task) ๊ณผ ๊ตฌํ ์๋งจํฑ ์ผ์น - io_uring SQ ํ์ฉ๋ฅ ์ ์ฌ์ ๊ฐ์ (ํ์ฌ ๋์ SQE = num_store_workers โ 2~4 โ
๋ฐฐ์น 1ํ๋ก N๊ฐ SQE ๊ฐ๋ฅ. ๋จ, ์คํจ ํ์ฉ์
put_many๋ด๋ถ ์ง๋ ฌ ๋ฃจํ๋ก ํ์ฌ๋ ๋นํ์ฑ โ P0 ๋ณ๋ ํญ๋ชฉ P0-put_many-parallel-io.md)
3.3 ๋นํจ๊ณผโ
core.put_many์๊ทธ๋์ฒ/์๋งจํฑ โ ๋ฏธ๋ณ๊ฒฝ- on-disk format โ ๋ฏธ๋ณ๊ฒฝ
- public interface (
batched_submit_put_task์๊ทธ๋์ฒ) โ ๋ฏธ๋ณ๊ฒฝ - T1 (
delete()TOCTOU ์์ ) โ ์ํฅ ์์ (๋ค๋ฅธ ๋ฉ์๋) - L1 (
put_many๋ด๋ถ lock) โ ํธ์ถ ํ์๋ง ์ค์. L1 ํจ๊ณผ๋ ๊ทธ๋๋ก ์ ํจ.
4. ๋ณ๊ฒฝ์ (๊ณํ)โ
4.1 batched_submit_put_task ์ฌ๊ตฌ์ฑโ
def batched_submit_put_task(self, keys, objs, transfer_spec=None,
on_complete_callback=None):
del transfer_spec
if not keys:
return None
loop = self.loop
if loop is None:
raise RuntimeError("RustRawBlockBackend requires an asyncio event loop")
# --- ์ฌ์ ํํฐ๋ง: dedup + spec encode (single _put_lock window) ---
accepted_keys: list[CacheEngineKey] = []
accepted_specs: list[RawBlockKeySpec] = []
accepted_objs: list[MemoryObj] = []
specs = [encode_legacy_key(key) for key in keys]
encoded_keys = [spec.encoded for spec in specs]
indexed_bitmap = self._core.exists_many(encoded_keys, lock=False)
with self._put_lock:
for i, key in enumerate(keys):
if key in self._put_tasks:
continue
if indexed_bitmap[i]:
continue
if self._core.exists_inflight(encoded_keys[i]):
continue
self._put_tasks.add(key)
accepted_keys.append(key)
accepted_specs.append(specs[i])
accepted_objs.append(objs[i])
if not accepted_keys:
return None
for obj in accepted_objs:
obj.ref_count_up()
fut = asyncio.run_coroutine_threadsafe(
self._submit_put_batch(
accepted_keys, accepted_specs, accepted_objs, on_complete_callback,
),
loop,
)
# ์ต์
A: ํธ์ถ์๊ฐ list[Future] ๋ฅผ ๊ธฐ๋ํ๋ฏ๋ก ๋์ผ future ๋ฅผ N๊ฐ ์์น์ ๋ณต์
return [fut] * len(accepted_keys)
4.2 _submit_put_batch (์ ๊ท, _submit_put_one ๋์ฒด)โ
async def _submit_put_batch(self, keys, specs, objs, on_complete_callback):
try:
put_result = await asyncio.to_thread(
self._core.put_many, specs, objs,
)
results = put_result.results
if not any(results):
raise RuntimeError(
f"Failed to persist {len(keys)} raw-block keys "
f"(first encoded: {specs[0].encoded})"
)
if on_complete_callback is not None:
for key, ok in zip(keys, results, strict=True):
if not ok:
continue
try:
on_complete_callback(key)
except Exception as e:
logger.warning(
"on_complete_callback failed for key %s: %s", key, e,
)
# ๋ถ๋ถ ์คํจ ๋ก๊น
for key, ok in zip(keys, results, strict=True):
if not ok:
logger.warning(
"RustRawBlockBackend: put failed for key %s", key,
)
finally:
for obj in objs:
obj.ref_count_down()
with self._put_lock:
for key in keys:
self._put_tasks.discard(key)
4.3 _submit_put_one ์ ๊ฑฐโ
ํธ์ถ์ ์์ โ ์ญ์ . ๊ด๋ จ docstring/์ฃผ์ ์ ๋ฆฌ.
4.4 docstring/commentโ
batched_submit_put_task ์ ๋ค์ invariant ๋ช
์:
- ๋ฐํ list ์ ๋ชจ๋ ํญ๋ชฉ์ ๋์ผ Future ๊ฐ์ฒด๋ฅผ ๊ฐ๋ฆฌํด (์ต์ A)
- ๋ถ๋ถ ์คํจ ์ future ๋ ์ ์ ์๋ฃ, ์คํจ key ๋ log ๋ก๋ง ๋ณด๊ณ
- ์ ๋ถ ์คํจ ์ future ๊ฐ RuntimeError ๋ก done
4.5 ํ ์คํธโ
- ๊ธฐ์กด test_rust_raw_block_backend.py ํ๊ท ํต๊ณผ (๊ฐ์ฅ ์ค์)
- ์ ๊ท:
test_batched_put_dedup_inflight_and_indexed: keys ์ผ๋ถ๊ฐ ์ด๋ฏธ ์ธ๋ฑ์ค์ ์๊ณ ์ผ๋ถ๋ inflight ์ผ ๋, ์ฌ์ ํํฐ๋ง์ด ์ ํํ ์์ฌ key ๋ง submit ํ๋์งtest_batched_put_partial_failure_does_not_break_others:core.put_many๋ฅผ mock ํ์ฌ ์ผ๋ถ False ๋ฐํ ์, ์ฑ๊ณต key ์ callback ๋ง ํธ์ถ๋๋์งtest_batched_put_returns_same_future_per_key: ์ต์ A ์๋งจํฑ ๊ฒ์ฆtest_batched_put_ref_count_balanced_on_exception:put_many๊ฐ raise ํด๋ ๋ชจ๋ obj ์ ref_count ๊ฐ net 0 ์ด ๋๋์ง
5. ๊ฐ์ ํฌ์ธํธ ํ์ธ ๋ฐฉ๋ฒ (์์น ์ฆ๋ช )โ
5.1 dispatch overhead ์ธก์ โ
# ๋ณ๊ฒฝ ์ /ํ ๊ฐ๊ฐ:
# - 100๊ฐ obj ์ค๋น
# - perf_counter ๋ก batched_submit_put_task ํธ์ถ~๋ชจ๋ future done ๊น์ง elapsed
# - run_coroutine_threadsafe ํธ์ถ ํ์ (mock ์ผ๋ก ์นด์ดํธ)
๊ธฐ๋์น: dispatch elapsed N=100 ๊ธฐ์ค ๋ณ๊ฒฝ ํ 1/10~1/50 (event loop ํ ๋๊ธฐํ overhead ๊ฐ main ๋น์ฉ).
5.2 lock contention ์ธก์ โ
# section 4.1 ์ ์ค sub: CountingLock ์ผ๋ก _put_lock ์ swap
# batched_submit_put_task(keys=[100], objs=[100]) 1ํ
# assert acquire_count <= 4 (๋ณ๊ฒฝ ํ) / >= 200 (๋ณ๊ฒฝ ์ )
5.3 throughput ์ธก์ (์ค์ raw device ๋๋ io_uring tmp file)โ
# - _has_ext() ๊ฐ๋ฅํ ํ๊ฒฝ์์๋ง
# - N=1, 10, 100, 1000 ๋ฐฐ์น ํฌ๊ธฐ๋ณ put ์๋ฃ๊น์ง wall clock
# - ๋ณ๊ฒฝ ์ /ํ ๋น๊ต
๊ธฐ๋์น: N ์ด ํด์๋ก ๋ณ๊ฒฝ ํ throughput ์ฐ์ (~N๋ฐฐ ๊น์ง)
5.4 ์ ํฉ์ฑโ
pytest tests/v1/storage_backend/test_rust_raw_block_backend.py -v
pytest tests/v1/storage_backend/test_storage_plugin.py -v
6. ์ํ / ๋กค๋ฐฑโ
6.1 ์ํ ํญ๋ชฉโ
| ์ํ | ๊ฐ๋ฅ์ฑ | ์ํฅ | ์ํ |
|---|---|---|---|
| ์ต์ A ์๋งจํฑ ๋ณํ๋ก ํธ์ถ์ ๊นจ์ง | ๋ฎ์ | ์ผ๋ถ ํ ์คํธ ์คํจ | ํธ์ถ์ grep ๊ฒฐ๊ณผ storage_manager/cache_engine ์์ future ์ฌ์ฉ ์ ํจ ํ์ธ ์๋ฃ |
| ๋ถ๋ถ ์คํจ โ future ์ ์ ์๋ฃ๋ก ์ธํ silent loss | ์ค๊ฐ | ์ผ๋ถ key ์๊ตฌ ๋ฏธ์ ์ฅ | log.warning ๋ช ์ + ์ถํ metric ์ถ๊ฐ ๊ฒํ |
์ฌ์ ํํฐ๋ง ๋จ๊ณ์ exists_many ์ exists_inflight ๊ฐ race window | ๋ฎ์ | ์ค๋ณต put ์๋ | core.put_many ๊ฐ ์ด๋ฏธ idempotent (_index / _inflight ์ฌ๊ฒ์ฌ) |
_submit_put_batch ์ finally ๋์ | ๋ฎ์ | ref_count ์๊ตฌ +1 | 5.4 + ref_count ํ ์คํธ (5.x ํญ๋ชฉ) |
| L1 ๋ฏธ์ ์ฉ ํ๊ฒฝ์์ batch 1ํ lock holding ๊ธธ์ด์ง | ๋ฎ์ | exists_many ๋ฑ ๋๊ธฐ โ | L1 ๋์ ์งํ ๊ถ์ฅ. ๋ฏธ์ ์ฉ ์์๋ 200 โ 2N ์ผ๋ก ํ์ ์ค์ด๋ฆ |
6.2 ๋กค๋ฐฑโ
๋ณ๊ฒฝ์ด ๋จ์ผ ํ์ผ/3๊ฐ ๋ฉ์๋์ ๊ตญํ. PR ๋จ์ revert ์ถฉ๋ถ. on-disk format/๊ณต๊ฐ ์ธํฐํ์ด์ค ์๊ทธ๋์ฒ ๋ฏธ๋ณ๊ฒฝ โ ์ด์ ์ค ๋กค๋ฐฑ ์ ์ถ๊ฐ ๋ง์ด๊ทธ๋ ์ด์ ๋ถํ์.
6.3 ๋จธ์ง ์์ ์์กด์ฑโ
- T1 (
9fc5a901) ๊ฐ ๋ฒ ์ด์ค. T1 ๋จธ์ง ํ rebase ์ ์๋ ์ ํฉ (T1 ์remove()1์ค, L2 ๋batched_submit_put_taskโ ๋ค๋ฅธ ๋ฉ์๋) - L1 ๊ณผ๋ ํ์ผ์ด ๋ค๋ฆ โ ๋ฌด๊ด
- ๊ถ์ฅ ๋จธ์ง ์์: T1 โ L1 โ L2 (๋๋ T1 โ L2 โ L1, ์์ ๋ฌด๊ด)
7. ๋ณ๊ฒฝ ๋ก๊ทธโ
| ์ผ์ | ์์ฑ์ | ๋ด์ฉ |
|---|---|---|
| 2026-05-28 | ny | ์ด์ ์์ฑ. ์ต์ A ์ฑํ. ๋ถ๋ถ ์คํจ๋ graceful + ์ ๋ถ ์คํจ ์ raise ์ ์ถฉ์ ๊ฒฐ์ . ๊ตฌํ ๋ฏธ์ฐฉ์. |
| 2026-05-28 | ny | ๊ตฌํ ์๋ฃ (rust_raw_block_backend.py). _submit_put_one โ _submit_put_batch ๋ก ๊ต์ฒด. ๊ธฐ์กด test_rust_raw_block_backend.py:1044 ์ match="Failed to persist raw-block key" โ match="Failed to persist raw-block" ๋ก ๋จ์ถ (์ ๋ฉ์์ง Failed to persist raw-block batch of N keys์ ํธํ). |
| 2026-05-28 | ny | ์ ๊ท ํ ์คํธ 4๊ฐ ์ถ๊ฐ. ์ฒซ ์คํ์์ 2๊ฐ ์คํจ โ ๋ชจ๋ ํ ์คํธ ์ฝ๋ ๋ฒ๊ทธ. 8.1 / 8.2 ์ฐธ์กฐ. production ์ฝ๋ ๋ณ๊ฒฝ ์ฌํญ์ ๊ทธ๋๋ก ์ ์ง. |
| 2026-05-28 | ny | ํ
์คํธ helper config ์์ + patch lifecycle ์์ ํ ๋ชจ๋ ํ
์คํธ ํต๊ณผ. obj.ref_count_down() ๋ช
์ ํธ์ถ ์ถ๊ฐ (๊ธฐ์กด ํ
์คํธ ํจํด๊ณผ ์ผ์น). ์ต์ข
: 7 passed (๊ธฐ์กด 3 + ์ ๊ท 4), 26 skipped (Rust ํ์ฅ ์์กด). ruff check / ruff format ํต๊ณผ. |
| 2026-06-01 | ny | ์ฝ๋ ๋ฆฌ๋ทฐ (cr-93403c0f) ์ Tier 1 ํญ๋ชฉ ๋ณธ PR ์์์ ์ฒ๋ฆฌ. (1) F2 โ exists_many(lock=False) ํธ์ถ์ _put_lock ์์ผ๋ก ์ด๋์์ผ dedup TOCTOU ์๋์ฐ ์ ๊ฑฐ. (2) F1+F3 โ ref_count_up ๋ฃจํ + run_coroutine_threadsafe dispatch ๋ฅผ try/except ๋ก ๊ฐ์ธ ๋ถ๋ถ ์งํ ๋ณด์ (์ฑ๊ณตํ ref_count ๋ง ref_count_down + _put_tasks ์ผ๊ด discard + ๋ฏธ-dispatch ์ฝ๋ฃจํด coro.close()). ์ ๊ท ํ
์คํธ 2๊ฐ (dispatch failure / ref_count_up failure ๋ณด์). ์ต์ข
: 9 passed (๊ธฐ์กด 7 + ์ ๊ท 2), 26 skipped. ruff check / ruff format ํต๊ณผ. |
8. ๊ตฌํ ์ค ๋ฐ๊ฒฌ๋ ์ด์โ
8.1 helper config ์ค์ โ slot_bytes ๋๋ฌด ์์โ
์ฆ์: ์ฒซ ์ ๊ท ํ ์คํธ ์คํ ์
RawBlockCore write failed for ... :
RawBlockCore payload 65536 exceeds slot capacity 4096
์์ธ: _make_legacy_backend helper ์์ rust_raw_block.slot_bytes=8192,
header_bytes=4096 ๋ก ๋๋ฉด payload capacity = slot - header = 4096 bytes.
ํ์ง๋ง ํ
์คํธ๊ฐ ์ฌ์ฉํ๋ KV chunk ๋ [2,16,8,128] ร bfloat16 = 64KB โ
์ด๊ณผ.
ํด๊ฒฐ: slot_bytes ๋ช
์๋ฅผ ์ ๊ฑฐํ์ฌ local_cpu_backend.get_full_chunk_size_bytes()
๋ก ์๋ ๊ณ์ฐ๋๋๋ก ๋์๋ค (rust_raw_block_backend._build_core_config ์
default ๋์). ๊ธฐ์กด production ํ
์คํธ๋ค์ด ๋ชจ๋ ์ด default ํจํด์ ์ฌ์ฉ โ
helper ๋ ๋์ผ ํจํด์ผ๋ก ํต์ผ.
์๋ฏธ: production ์ฝ๋ ๋ณ๊ฒฝ ์ฌํญ๊ณผ ๋ฌด๊ด. ํ ์คํธ helper ์ config ์ค์.
8.2 ํ
์คํธ์ patch.object ๋ฒ์ ์ค๋ฅโ
์ฆ์:
recorded_specs == [] # ํธ์ถ ์์ ์ patch unwound
mock ๋ put_many ๊ฐ ํธ์ถ๋์ง ์์์๋ backend.contains(fresh_key) is True
๋ ํต๊ณผ โ ์ง์ง put_many ๊ฐ patch ๊ฐ ํ๋ฆฐ ๋ค์ ์คํ๋๊ณ ์์.
์์ธ: batched_submit_put_task ๊ฐ asyncio.run_coroutine_threadsafe ๋ก
coroutine ์ ํ์ํ๊ณ ์ฆ์ ๋ฐํ. ์ค์ asyncio.to_thread(self._core.put_many, ...)
๋ ๋ณ๋ thread ์์ ๋์ค์ ํธ์ถ. ํ
์คํธ๊ฐ with patch.object(...) ๋ธ๋ก
๋ฐ์์ futs[0].result(timeout=10) ์ ๊ธฐ๋ค๋ฆฌ๋๋ก ์์ฑ๋์ด, patch ๊ฐ
์ด๋ฏธ unwind ๋ ํ worker thread ๊ฐ ์ค์ ํธ์ถ์ ์ํํ๋ค.
ํด๊ฒฐ: with patch.object(...) ๋ธ๋ก ์์ result(timeout=10) ๊น์ง ํฌํจ.
๋ค๋ฅธ ์ ๊ท ํ
์คํธ (partial_failure) ๋ ๊ฐ์ ๊ตฌ์กฐ์์ผ๋ฏ๋ก ํจ๊ป ์์ .
์๋ฏธ: production ์ฝ๋ ์๋งจํฑ๊ณผ ๋ฌด๊ด. ํ ์คํธ์ patch ์๋ช ๊ด๋ฆฌ ์ค๋ฅ. ์ด ์ด์๊ฐ ๋ฐ์ํ ์ด์ ๋ ์ ๊ตฌ์กฐ๊ฐ "๋จ์ผ future๊ฐ batch ์ ์ฒด ์๋ฃ๋ฅผ ํํ" ํ๊ธฐ ๋๋ฌธ โ ๊ธฐ์กด์ per-key future ๋๋น await ์์น๊ฐ ํ ๋จ๊ณ ์ง์ฐ๋๋ค. ํฅํ mock ๊ธฐ๋ฐ ํ ์คํธ ์์ฑ ์ ๋์ผ ํจํด ์ฃผ์ ํ์.
9. ์ธก์ ๊ฒฐ๊ณผ (2026-06-08)โ
๋ฒค์น๋งํฌ ์คํฌ๋ฆฝํธ: benchmarks/storage_backend_io/bench_dispatch_patterns.py
ํ๊ฒฝ: /home/ny/LMCache, fake in-memory device, Python 3.12, warmup=3, iters=10
๊ฒฐ๊ณผโ
dispatch benchmark obj=64B warmup=3 iters=10
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
N lat ยตs batch med fanout med (b-f)/f verdict
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
10 0 0.117 ms 0.763 ms -84.7% batch faster โ
10 50 2.466 ms 1.175 ms +109.9% fanout faster !
10 200 5.389 ms 1.088 ms +395.1% fanout faster !
100 0 0.773 ms 5.059 ms -84.7% batch faster โ
100 50 29.140 ms 18.208 ms +60.0% fanout faster !
100 200 56.825 ms 39.412 ms +44.2% fanout faster !
1000 0 7.103 ms 97.951 ms -92.7% batch faster โ
1000 50 271.195 ms 267.597 ms +1.3% ~equal
1000 200 579.637 ms 256.461 ms +126.0% fanout faster !
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
negative diff โ batch is faster
ํด์โ
| ๊ตฌ๊ฐ | ๊ฒฐ๊ณผ | ์์ธ |
|---|---|---|
| lat=0ยตs (์์ CPU) | batch 85~93% ๋น ๋ฆ | dispatch overhead ์ ์ฝ ํจ๊ณผ |
| lat=50ยตs (NVMe ํ์ค) | fanout 60~110% ๋น ๋ฆ | thread pool ๋ณ๋ ฌ ์ฐ๊ธฐ๊ฐ dispatch ์ ์ฝ์ ์๋ |
| lat=200ยตs (๋๋ฆฐ NVMe) | fanout 44~395% ๋น ๋ฆ | ๋ณ๋ ฌ์ฑ ํจ๊ณผ ๊ทน๋ํ |
| lat=50ยตs, N=1000 | ~equal (+1.3%) | workers ํฌํ(~32) + dispatch ์ ์ฝ ์์ |
๊ทผ๋ณธ ์์ธ: batch๋ ํ๋์ asyncio.to_thread์์ N๊ฐ ์ฐ๊ธฐ๋ฅผ ์ง๋ ฌ ์คํ.
fanout์ N๊ฐ to_thread ํ์คํฌ๊ฐ thread pool์์ ๋์ ์คํ.
put_many ๋ด๋ถ์์ _write_one ์ _lock์ ํด์ ํ๋ฏ๋ก ์ค์ I/O๊ฐ overlap๋จ
โ ์ค I/O๊ฐ ์์ผ๋ฉด parallelism ์ด๋์ด dispatch ์ ์ฝ์ ์๋.
ํ์ โ
์ด ๋ณ๊ฒฝ์ lat=0 (์์ CPU) ๊ตฌ๊ฐ์์๋ง ์ด๋์ด ์์ผ๋ฉฐ, ์ค ์คํ ๋ฆฌ์ง๊ฐ ๋ถ๋ NVMe ํ์ค ๊ตฌ๊ฐ์์๋ ์คํ๋ ค regression.
5a27732f(4Nโ2N ๋ฝ)๊ฐ <1%๋ผ ๋๋กญ๋ ๊ฒ๊ณผ ๋ฌ๋ฆฌ, ์ด๋ฒ์ ๋ฐฉํฅ ์์ฒด๊ฐ ์ญ์ ๋จ.
batch ํจํด์ด "์ฑ๋ฅ ์ต์ ํ"๊ฐ ๋๋ ค๋ฉด put_many ๋ด๋ถ๊ฐ parallel I/O๋ฅผ ์ง์ํด์ผ ํจ
(ํ์ฌ๋ ์ง๋ ฌ ๋ฃจํ โ PR #3274 ์ดํ P0 ๊ณผ์ ).
๊ฒฐ์ (2026-06-10 ์ฌ๊ฒํ ): dispatch batching์ L2์์ ์ ๊ฑฐํ๊ณ P0๋ก ์ด์ .
์ด์ : L2๊ฐ correctness fix์ dispatch batching์ ๋์์ ํฌํจํ๋ฉด, dispatch batching alone์ด NVMe ๊ตฌ๊ฐ์์ regression์ ์ผ์ผ์ผ ๋ฆฌ๋ทฐ์ด๊ฐ "correctness PR์ธ๋ฐ ์ ์ฑ๋ฅ์ด ๋๋น ์ง๋"๊ณ ์์ฌํ ์ ์์. PR ์ ๋ขฐ๋ ์ ํ.
โ L2 = pure correctness fix (TOCTOU + resource leak + F6 bug). #3274 ์์ด dev์ ๋จ๋ ์ ์ถ ๊ฐ๋ฅ. dispatch batching ์ ๊ฑฐ ํ 29๊ฐ ํ ์คํธ ํต๊ณผ.
dispatch batching (_submit_put_many + io_uring ๋ถ๊ธฐ) ์ P0 PR๋ก ์ด์ .
์์ธ: P0-put_many-parallel-io.md