본문으로 건너뛰기

Plugin Pipeline — adapter ↔ plugin 연결 메커니즘

변경 검증 가이드 (다음 fetch 후):

git log eaa2bfee..HEAD -- lmcache/v1/distributed/l2_adapters/plugin_l2_adapter.py lmcache/v1/distributed/l2_adapters/native_connector_l2_adapter.py lmcache/v1/distributed/l2_adapters/factory.py lmcache/v1/distributed/l2_adapters/raw_block_l2_adapter.py
  • plugin_l2_adapter.py 의 config 필드 또는 _resolve_config_class 우선순위 변경 시 L1 contract 표 + 자동 탐색 순서 다시 작성.
  • native_connector_l2_adapter.py 의 connector 6메서드 시그니처 변경 시 native_plugin 비교 표 갱신.
  • factory.py 의 lazy import / discover_subclasses 메커니즘이 바뀌면 다이어그램 A (생성/등록 시점) 통째로 재작성.
  • raw_block_l2_adapter.py:320-345 의 ThreadPoolExecutor 3개 패턴이 사라지면 "skeleton 레퍼런스" 문장 무효.

원문: docs/design/v1/distributed/l2_adapters/plugin.md, plugin_l2_adapter.py


계층 관계

L2AdapterInterface (추상)

├─ 빌트인 어댑터 (raw_block, dax, nixl, fs, resp ...)
│ 직접 L2AdapterInterface 구현

└─ plugin / native_plugin ← 외부 코드를 연결하는 브리지

├─ "plugin" 외부에서 L2AdapterInterface 구현체를 통째로 로드
└─ "native_plugin" 외부에서 connector (6메서드) 로드 → NativeConnectorL2Adapter 가 감쌈

L1 — Contract

plugin 타입 (plugin_l2_adapter.py:33)

JSON 에 "type": "plugin" 지정 시 동작:

필드필수설명
module_path필수어댑터 클래스가 있는 Python 모듈 경로 (dotted)
class_name필수모듈 안에서 L2AdapterInterface 를 구현한 클래스 이름
adapter_params선택어댑터 생성자에 전달할 kwargs dict
config_class_name선택어댑터 config 클래스 이름 (없으면 자동 탐색)

플러그인이 반드시 해야 하는 것:

  • L2AdapterInterface 서브클래스
  • 추상 메서드 전부 구현 (eventfd 3개 포함)
  • thread-safe 구현 (StoreController + PrefetchController 동시 호출)
  • **kwargs 수용 (forward-compatibility)

플러그인이 직접 해야 하는 것 (framework 미제공):

  • 자체 asyncio event loop + background thread 생성
  • create_event_notifier() (lmcache.v1.platform) 로 eventfd 생성
  • close() 에서 모든 리소스 정리

native_plugin 타입

구분pluginnative_plugin
외부가 구현하는 것L2AdapterInterface 전체connector 6메서드
브리지 로직플러그인이 직접NativeConnectorL2Adapter 재사용
적합한 경우Python 플러그인C++/Rust pybind11 connector

native_plugin 의 필수 connector 메서드:

def event_fd(self) -> int: ...
def submit_batch_get(self, keys, memoryviews) -> int: ...
def submit_batch_set(self, keys, memoryviews) -> int: ...
def submit_batch_exists(self, keys) -> int: ...
def drain_completions(self) -> list[tuple[int, bool, str, list[bool] | None]]: ...
def close(self) -> None: ...

L2 — I/O 경로 (call graph)

plugin 생성 흐름 (plugin_l2_adapter.py:126)

_create_plugin_adapter(config, l1_memory_desc)

├─ 1. importlib.import_module(config.module_path)
│ 실패 → ImportError

├─ 2. getattr(module, config.class_name)
│ 실패 → AttributeError

├─ 3. issubclass(adapter_cls, L2AdapterInterface) 확인
│ 실패 → TypeError

├─ 4. _resolve_config_class(module, config, adapter_cls)
│ config class 탐색 (우선순위 4단계, 아래 참조)

├─ 5a. [config class 발견] adapter_cls(cfg_cls.from_dict(adapter_params))
└─ 5b. [발견 안 됨] adapter_cls(adapter_params) ← raw dict 전달

config class 자동 탐색 순서 (plugin_l2_adapter.py:179)

1순위: JSON 의 config_class_name 필드 ("config_class_name": "MyConfig")
2순위: class_name + "Config" 컨벤션 MyL2Adapter → MyL2AdapterConfig
3순위: adapter_cls.config_class_name 클래스 속성으로 명시
4순위: None raw adapter_params dict 그대로 전달

컨벤션(2순위)을 따르면 JSON 에 config_class_name 없어도 자동으로 typed config 인스턴스 전달됨.

런타임 threading 모델 (plugin 내부)

Plugin.__init__()
├─ self._store_efd = create_event_notifier()
├─ self._lookup_efd = create_event_notifier()
├─ self._load_efd = create_event_notifier()
├─ self._loop = asyncio.new_event_loop()
└─ self._thread = Thread(target=run_loop, daemon=True).start()

StoreController 스레드 → submit_store_task()
└─ run_coroutine_threadsafe(coro, self._loop)

PrefetchController 스레드 → submit_load_task()
└─ run_coroutine_threadsafe(coro, self._loop)

Plugin background thread (event loop)
├─ store/load coroutine 실행
├─ 완료 → eventfd write (컨트롤러 poll 에서 깨어남)
└─ 공유 상태 접근 시 lock 필요

Framework 가 event loop 를 제공하지 않음 — plugin 이 직접 만들어야 함.


L3 — FDP / HC-SSD 삽입 관점

plugin 타입이 FDP/HC-SSD 백엔드의 자연스러운 진입점.

  • LMCache 코드 수정 없이 L2AdapterInterface 구현체로 등록 가능
  • adapter_params 로 디바이스 경로, FDP stream 수, RUH 매핑 등 파라미터 전달
  • 예시 config:
{
"type": "plugin",
"module_path": "lmc_fdp_backend.adapter",
"class_name": "FDPBackendL2Adapter",
"adapter_params": {
"device": "/dev/ng0n1",
"fdp_nruh": 8,
"placement_strategy": "by_cache_salt"
}
}

HC-SSD offload 의 경우:

  • serde_configSerdeL2AdapterWrapper 위치가 후보
  • 또는 plugin 내 submit_store_task 에서 직접 offload 호출

L4 — io_uring (해당 없음)

plugin 추상화 레벨에서는 io_uring 직접 노출 없음. raw_block plugin (plugins/rust_raw_block_backend.py) 이 내부적으로 io_uring 사용 → TODO 4 에서 분석.


다이어그램

A. 생성/등록 시점 (한 번만 일어남)

flowchart TD
JSON["JSON config<br/>type: 'plugin'<br/>module_path, class_name,<br/>adapter_params"]

subgraph Factory["L2 Adapter Factory"]
Reg["_L2_ADAPTER_FACTORY_REGISTRY<br/>name → factory"]
Pend["_PENDING_MODULES<br/>(lazy import 대상)"]
EAL["ensure_adapter_loaded(name)<br/>name 보일 때까지<br/>importlib.import_module 반복"]
end

PluginMod["plugin_l2_adapter.py<br/>(self-register)<br/>register_l2_adapter_type('plugin', ...)<br/>register_l2_adapter_factory('plugin', _create_plugin_adapter)"]

Create["create_l2_adapter_from_registry(config)"]
Make["_create_plugin_adapter(config)"]

UserMod["사용자 모듈<br/>(module_path 가 가리키는 곳)"]
Resolve["_resolve_config_class<br/>1. config_class_name<br/>2. class_name + 'Config'<br/>3. adapter_cls.config_class_name<br/>4. None"]
SubChk["issubclass(adapter_cls,<br/>L2AdapterInterface) ?"]

PluginInst["plugin adapter 인스턴스<br/>__init__ 안에서:<br/>• 3 × eventfd 생성<br/>• ThreadPoolExecutor / asyncio loop<br/>• 결과 큐, lock"]

JSON --> Create
PluginMod -. import 시 self-register .-> Reg
Create --> EAL --> Pend
EAL --> Reg
Reg -- factory 호출 --> Make
Make -- importlib.import_module(module_path) --> UserMod
Make -- getattr(class_name) --> SubChk
SubChk -- yes --> Resolve
Resolve -- cfg_cls.from_dict(adapter_params) --> PluginInst
Resolve -- None --> PluginInst

B. 런타임 호출 경로 (요청마다)

flowchart TD
SC["StoreController<br/>(스레드 A)"]
PC["PrefetchController<br/>(스레드 B)"]

subgraph Wrap["SerdeL2AdapterWrapper (선택적, 투명 래퍼)"]
Ser["직렬화 / 역직렬화"]
end

subgraph Plugin["plugin adapter (사용자 코드)"]
SubStore["submit_store_task<br/>(non-blocking)"]
SubLook["submit_lookup_and_lock_task"]
SubLoad["submit_load_task"]
Pop["pop_completed_store_tasks<br/>query_lookup_result<br/>query_load_result"]

EFDS[("store_efd")]
EFDK[("lookup_efd")]
EFDL[("load_efd")]

Exec["Executor<br/>ThreadPoolExecutor /<br/>asyncio loop"]
Q["완료 결과 큐<br/>(task_id → result)"]

IO["실제 I/O<br/>(write / read / lookup)"]
end

Dev[("스토리지 디바이스")]

SC -- "1\. submit_store_task(task_id, kv_chunks)" --> Ser --> SubStore
PC -- "1\. submit_lookup..." --> SubLook
PC -- "1\. submit_load_task" --> SubLoad

SubStore -- "2\. 큐 enqueue + 즉시 return" --> Exec
SubLook --> Exec
SubLoad --> Exec

Exec -- "3\. 워커가 dispatch" --> IO
IO -- "4\. syscall / passthru" --> Dev
Dev -- "완료" --> IO

IO -- "5a. 결과 큐 push" --> Q
IO -- "5b. eventfd write(1)" --> EFDS
IO --> EFDK
IO --> EFDL

EFDS -. "6\. poll 깨어남" .-> SC
EFDK -. "6\. poll 깨어남" .-> PC
EFDL -. "6\. poll 깨어남" .-> PC

SC -- "7\. pop/query 로 결과 회수" --> Pop
PC --> Pop
Pop -- "큐에서 꺼냄" --> Q

다이어그램에서 봐야 하는 포인트

번호의미코드 위치
A. self-registerplugin 모듈이 import 되는 순간 factory 등록plugin_l2_adapter.py:213-214
A. lazy importtype 이름이 registry 에 없으면 _PENDING_MODULES 에서 import 시도factory.py:101-135
A. config 자동 탐색class_name + "Config" 컨벤션이면 JSON 에 config_class_name 안 적어도 됨plugin_l2_adapter.py:179-209
B. 1→2 비동기 경계컨트롤러는 submit 후 즉시 다른 일 함 — adapter 가 blocking 하면 안 됨invariant
B. eventfd 3개store / lookup / load 각각 분리 — 컨트롤러가 다른 스레드라서base.py 계약
B. 5a + 5b 순서결과 큐 push → eventfd write 순서 지켜야 race 없음plugin 구현 책임
B. 6→7 두 단계poll 만으로 결과 안 옴; pop/query 로 따로 가져와야 함"two-phase" 알림

새 plugin 만들 때 위 그림 (B) 의 Plugin 박스 안쪽 전체가 짜야 할 코드. 바깥쪽 (StoreController, PrefetchController, SerdeL2AdapterWrapper, eventfd poll 인프라) 은 LMCache 가 다 깔아둠. raw_block_l2_adapter.py:320-345ThreadPoolExecutor 3개 패턴이 그 박스 안 skeleton 의 레퍼런스.


Open Questions

  • native_plugindrain_completions() 반환 타입 list[tuple[int, bool, str, list[bool] | None]] — 각 원소가 정확히 무엇을 의미하는지 NativeConnectorL2Adapter 코드 확인 필요.
  • FDP backend 를 plugin 으로 만들 때, _notify_keys_stored(keys, sizes) 를 base class 처럼 호출해야 eviction 회계가 작동함 — base class __init__ 도 제대로 호출해야 하는지 확인 필요 (super().init(max_capacity_bytes=...)).