Plugin Pipeline — adapter ↔ plugin 연결 메커니즘
변경 검증 가이드 (다음 fetch 후):
git log eaa2bfee..HEAD -- lmcache/v1/distributed/l2_adapters/plugin_l2_adapter.py lmcache/v1/distributed/l2_adapters/native_connector_l2_adapter.py lmcache/v1/distributed/l2_adapters/factory.py lmcache/v1/distributed/l2_adapters/raw_block_l2_adapter.py
plugin_l2_adapter.py의 config 필드 또는_resolve_config_class우선순위 변경 시 L1 contract 표 + 자동 탐색 순서 다시 작성.native_connector_l2_adapter.py의 connector 6메서드 시그니처 변경 시 native_plugin 비교 표 갱신.factory.py의 lazy import / discover_subclasses 메커니즘이 바뀌면 다이어그램 A (생성/등록 시점) 통째로 재작성.raw_block_l2_adapter.py:320-345의 ThreadPoolExecutor 3개 패턴이 사라지면 "skeleton 레퍼런스" 문장 무효.
원문: docs/design/v1/distributed/l2_adapters/plugin.md, plugin_l2_adapter.py
계층 관계
L2AdapterInterface (추상)
│
├─ 빌트인 어댑터 (raw_block, dax, nixl, fs, resp ...)
│ 직접 L2AdapterInterface 구현
│
└─ plugin / native_plugin ← 외부 코드를 연결하는 브리지
│
├─ "plugin" 외부에서 L2AdapterInterface 구현체를 통째로 로드
└─ "native_plugin" 외부에서 connector (6메서드) 로드 → NativeConnectorL2Adapter 가 감쌈
L1 — Contract
plugin 타입 (plugin_l2_adapter.py:33)
JSON 에 "type": "plugin" 지정 시 동작:
| 필드 | 필수 | 설명 |
|---|---|---|
module_path | 필수 | 어댑터 클래스가 있는 Python 모듈 경로 (dotted) |
class_name | 필수 | 모듈 안에서 L2AdapterInterface 를 구현한 클래스 이름 |
adapter_params | 선택 | 어댑터 생성자에 전달할 kwargs dict |
config_class_name | 선택 | 어댑터 config 클래스 이름 (없으면 자동 탐색) |
플러그인이 반드시 해야 하는 것:
L2AdapterInterface서브클래스- 추상 메서드 전부 구현 (eventfd 3개 포함)
- thread-safe 구현 (StoreController + PrefetchController 동시 호출)
**kwargs수용 (forward-compatibility)
플러그인이 직접 해야 하는 것 (framework 미제공):
- 자체 asyncio event loop + background thread 생성
create_event_notifier()(lmcache.v1.platform) 로 eventfd 생성close()에서 모든 리소스 정리
native_plugin 타입
| 구분 | plugin | native_plugin |
|---|---|---|
| 외부가 구현하는 것 | L2AdapterInterface 전체 | connector 6메서드 |
| 브리지 로직 | 플러그인이 직접 | NativeConnectorL2Adapter 재사용 |
| 적합한 경우 | Python 플러그인 | C++/Rust pybind11 connector |
native_plugin 의 필수 connector 메서드:
def event_fd(self) -> int: ...
def submit_batch_get(self, keys, memoryviews) -> int: ...
def submit_batch_set(self, keys, memoryviews) -> int: ...
def submit_batch_exists(self, keys) -> int: ...
def drain_completions(self) -> list[tuple[int, bool, str, list[bool] | None]]: ...
def close(self) -> None: ...
L2 — I/O 경로 (call graph)
plugin 생성 흐름 (plugin_l2_adapter.py:126)
_create_plugin_adapter(config, l1_memory_desc)
│
├─ 1. importlib.import_module(config.module_path)
│ 실패 → ImportError
│
├─ 2. getattr(module, config.class_name)
│ 실패 → AttributeError
│
├─ 3. issubclass(adapter_cls, L2AdapterInterface) 확인
│ 실패 → TypeError
│
├─ 4. _resolve_config_class(module, config, adapter_cls)
│ config class 탐색 (우선순위 4단계, 아래 참조)
│
├─ 5a. [config class 발견] adapter_cls(cfg_cls.from_dict(adapter_params))
└─ 5b. [발견 안 됨] adapter_cls(adapter_params) ← raw dict 전달
config class 자동 탐색 순서 (plugin_l2_adapter.py:179)
1순위: JSON 의 config_class_name 필드 ("config_class_name": "MyConfig")
2순위: class_name + "Config" 컨벤션 MyL2Adapter → MyL2AdapterConfig
3순위: adapter_cls.config_class_name 클래스 속성으로 명시
4순위: None raw adapter_params dict 그대로 전달
컨벤션(2순위)을 따르면 JSON 에 config_class_name 없어도 자동으로 typed config 인스턴스 전달됨.
런타임 threading 모델 (plugin 내부)
Plugin.__init__()
├─ self._store_efd = create_event_notifier()
├─ self._lookup_efd = create_event_notifier()
├─ self._load_efd = create_event_notifier()
├─ self._loop = asyncio.new_event_loop()
└─ self._thread = Thread(target=run_loop, daemon=True).start()
StoreController 스레드 → submit_store_task()
└─ run_coroutine_threadsafe(coro, self._loop)
PrefetchController 스레드 → submit_load_task()
└─ run_coroutine_threadsafe(coro, self._loop)
Plugin background thread (event loop)
├─ store/load coroutine 실행
├─ 완료 → eventfd write (컨트롤러 poll 에서 깨어남)
└─ 공유 상태 접근 시 lock 필요
Framework 가 event loop 를 제공하지 않음 — plugin 이 직접 만들어야 함.
L3 — FDP / HC-SSD 삽입 관점
plugin 타입이 FDP/HC-SSD 백엔드의 자연스러운 진입점.
- LMCache 코드 수정 없이
L2AdapterInterface구현체로 등록 가능 adapter_params로 디바이스 경로, FDP stream 수, RUH 매핑 등 파라미터 전달- 예시 config:
{
"type": "plugin",
"module_path": "lmc_fdp_backend.adapter",
"class_name": "FDPBackendL2Adapter",
"adapter_params": {
"device": "/dev/ng0n1",
"fdp_nruh": 8,
"placement_strategy": "by_cache_salt"
}
}
HC-SSD offload 의 경우:
serde_config의SerdeL2AdapterWrapper위치가 후보- 또는 plugin 내
submit_store_task에서 직접 offload 호출
L4 — io_uring (해당 없음)
plugin 추상화 레벨에서는 io_uring 직접 노출 없음.
raw_block plugin (plugins/rust_raw_block_backend.py) 이 내부적으로 io_uring 사용 → TODO 4 에서 분석.
다이어그램
A. 생성/등록 시점 (한 번만 일어남)
flowchart TD
JSON["JSON config<br/>type: 'plugin'<br/>module_path, class_name,<br/>adapter_params"]
subgraph Factory["L2 Adapter Factory"]
Reg["_L2_ADAPTER_FACTORY_REGISTRY<br/>name → factory"]
Pend["_PENDING_MODULES<br/>(lazy import 대상)"]
EAL["ensure_adapter_loaded(name)<br/>name 보일 때까지<br/>importlib.import_module 반복"]
end
PluginMod["plugin_l2_adapter.py<br/>(self-register)<br/>register_l2_adapter_type('plugin', ...)<br/>register_l2_adapter_factory('plugin', _create_plugin_adapter)"]
Create["create_l2_adapter_from_registry(config)"]
Make["_create_plugin_adapter(config)"]
UserMod["사용자 모듈<br/>(module_path 가 가리키는 곳)"]
Resolve["_resolve_config_class<br/>1. config_class_name<br/>2. class_name + 'Config'<br/>3. adapter_cls.config_class_name<br/>4. None"]
SubChk["issubclass(adapter_cls,<br/>L2AdapterInterface) ?"]
PluginInst["plugin adapter 인스턴스<br/>__init__ 안에서:<br/>• 3 × eventfd 생성<br/>• ThreadPoolExecutor / asyncio loop<br/>• 결과 큐, lock"]
JSON --> Create
PluginMod -. import 시 self-register .-> Reg
Create --> EAL --> Pend
EAL --> Reg
Reg -- factory 호출 --> Make
Make -- importlib.import_module(module_path) --> UserMod
Make -- getattr(class_name) --> SubChk
SubChk -- yes --> Resolve
Resolve -- cfg_cls.from_dict(adapter_params) --> PluginInst
Resolve -- None --> PluginInst
B. 런타임 호출 경로 (요청마다)
flowchart TD
SC["StoreController<br/>(스레드 A)"]
PC["PrefetchController<br/>(스레드 B)"]
subgraph Wrap["SerdeL2AdapterWrapper (선택적, 투명 래퍼)"]
Ser["직렬화 / 역직렬화"]
end
subgraph Plugin["plugin adapter (사용자 코드)"]
SubStore["submit_store_task<br/>(non-blocking)"]
SubLook["submit_lookup_and_lock_task"]
SubLoad["submit_load_task"]
Pop["pop_completed_store_tasks<br/>query_lookup_result<br/>query_load_result"]
EFDS[("store_efd")]
EFDK[("lookup_efd")]
EFDL[("load_efd")]
Exec["Executor<br/>ThreadPoolExecutor /<br/>asyncio loop"]
Q["완료 결과 큐<br/>(task_id → result)"]
IO["실제 I/O<br/>(write / read / lookup)"]
end
Dev[("스토리지 디바이스")]
SC -- "1\. submit_store_task(task_id, kv_chunks)" --> Ser --> SubStore
PC -- "1\. submit_lookup..." --> SubLook
PC -- "1\. submit_load_task" --> SubLoad
SubStore -- "2\. 큐 enqueue + 즉시 return" --> Exec
SubLook --> Exec
SubLoad --> Exec
Exec -- "3\. 워커가 dispatch" --> IO
IO -- "4\. syscall / passthru" --> Dev
Dev -- "완료" --> IO
IO -- "5a. 결과 큐 push" --> Q
IO -- "5b. eventfd write(1)" --> EFDS
IO --> EFDK
IO --> EFDL
EFDS -. "6\. poll 깨어남" .-> SC
EFDK -. "6\. poll 깨어남" .-> PC
EFDL -. "6\. poll 깨어남" .-> PC
SC -- "7\. pop/query 로 결과 회수" --> Pop
PC --> Pop
Pop -- "큐에서 꺼냄" --> Q
다이어그램에서 봐야 하는 포인트
| 번호 | 의미 | 코드 위치 |
|---|---|---|
| A. self-register | plugin 모듈이 import 되는 순간 factory 등록 | plugin_l2_adapter.py:213-214 |
| A. lazy import | type 이름이 registry 에 없으면 _PENDING_MODULES 에서 import 시도 | factory.py:101-135 |
| A. config 자동 탐색 | class_name + "Config" 컨벤션이면 JSON 에 config_class_name 안 적어도 됨 | plugin_l2_adapter.py:179-209 |
| B. 1→2 비동기 경계 | 컨트롤러는 submit 후 즉시 다른 일 함 — adapter 가 blocking 하면 안 됨 | invariant |
| B. eventfd 3개 | store / lookup / load 각각 분리 — 컨트롤러가 다른 스레드라서 | base.py 계약 |
| B. 5a + 5b 순서 | 결과 큐 push → eventfd write 순서 지켜야 race 없음 | plugin 구현 책임 |
| B. 6→7 두 단계 | poll 만으로 결과 안 옴; pop/query 로 따로 가져와야 함 | "two-phase" 알림 |
새 plugin 만들 때 위 그림 (B) 의
Plugin박스 안쪽 전체가 짜야 할 코드. 바깥쪽 (StoreController,PrefetchController,SerdeL2AdapterWrapper, eventfd poll 인프라) 은 LMCache 가 다 깔아둠.raw_block_l2_adapter.py:320-345의ThreadPoolExecutor3개 패턴이 그 박스 안 skeleton 의 레퍼런스.
Open Questions
native_plugin의drain_completions()반환 타입list[tuple[int, bool, str, list[bool] | None]]— 각 원소가 정확히 무엇을 의미하는지NativeConnectorL2Adapter코드 확인 필요.- FDP backend 를
plugin으로 만들 때,_notify_keys_stored(keys, sizes)를 base class 처럼 호출해야 eviction 회계가 작동함 — base class__init__도 제대로 호출해야 하는지 확인 필요 (super().init(max_capacity_bytes=...)).