raw_block io_uring_cmd ๊ตฌํ (PR #3274)
[!tldr] ์ ๋ฌด ๊ด์ takeaway LMCache raw_block ๋ฐฑ์๋์ NVMe io_uring_cmd passthrough ๊ฒฝ๋ก๊ฐ ์ถ๊ฐ๋๋ค. Block Layer๋ฅผ ์์ ํ ์ฐํํด NVMe ๋๋ผ์ด๋ฒ์ ์ง์ ๋ช ๋ น์ ๋ณด๋ธ๋ค. ํต์ฌ์
NvmeUringCmd.cdw13์dspecํ๋ โ ์ฌ๊ธฐ์ FDP placement_id๋ฅผ ๋ฃ์ผ๋ฉด NVMe ๋ ๋ฒจ์์ ๋ฐ์ดํฐ ๋ฐฐ์น ์คํธ๋ฆผ์ด ์ง์ ๋๋ค. ์ฆ ์ด PR์ด FDP Backend์ ๋ฐฐ๊ด์ ์์ฑํ๋ค. ๊ทธ๋ฆฌ๊ณbuilder()ํจํด ๋์ ์ผ๋ก M3(io_uring setup flag ํ๋)์ ์ฐฉ์์ ๋ ๋๋ค.
PR ๊ฐ์โ
| ํญ๋ชฉ | ๋ด์ฉ |
|---|---|
| PR | #3274 |
| ์์ฑ์ | Ankit Kumar (@ankit-sam) |
| ์ํ | Open (๋ฆฌ๋ทฐ ์ค) |
| ์์ฝ | MP ๋ชจ๋ ํตํฉ rebase ์ค ๋๋ฝ๋ io_uring ์ฝ๋ ๋ณต๊ตฌ + NVMe io_uring_cmd passthrough ์ ๊ท ์ถ๊ฐ |
I/O ๊ฒฝ๋ก ๋น๊ตโ
๊ธฐ์กด io_uring:
App โ io_uring (SQE 64B) โ Block Layer โ NVMe Driver โ SSD
io_uring_cmd (์ด PR):
App โ io_uring_cmd (big SQE 128B) โ NVMe Driver โ SSD
โ
Block Layer ์์ ์๋ต
Rust ๋ณ๊ฒฝ์ฌํญ (lib.rs)โ
IoUringWrapper โ ์ปค๋ ๋ฒ์ ํธํ์ฑ ์ถ์ํโ
#[derive(Clone)]
enum IoUringWrapper {
Standard(Arc<Mutex<IoUring<SqueueEntry, Entry>>>), // ์ปค๋ 5.4~5.18
Big(Arc<Mutex<IoUring<Entry128, Entry32>>>), // ์ปค๋ 5.19+
}
| ํ์ | SQE ํฌ๊ธฐ | CQE ํฌ๊ธฐ | ํ์ ์ปค๋ | ์ฉ๋ |
|---|---|---|---|---|
Standard | 64 bytes | 16 bytes | 5.4+ | ์ผ๋ฐ pread/pwrite |
Big | 128 bytes | 32 bytes | 5.19+ | io_uring_cmd (NVMe passthrough) |
NVMe ๋ช ๋ น ๊ตฌ์กฐ์ฒด(80 bytes)๋ฅผ SQE์ inline์ผ๋ก ๋ด์ผ๋ ค๋ฉด 128-byte SQE๊ฐ ํ์.
์ด๊ธฐํ โ Big ๋จผ์ ์๋, ์คํจ ์ Standard fallback:
let ring = match IoUring::<Entry128, Entry32>::builder()
.build(iouring_queue_depth as u32)
{
Ok(big_ring) => IoUringWrapper::Big(Arc::new(Mutex::new(big_ring))),
Err(_) => {
if use_uring_cmd {
return Err(PyRuntimeError::new_err(
"io_uring_cmd requires kernel 5.19 or later",
));
}
// fallback: Standard
IoUringWrapper::Standard(Arc::new(Mutex::new(std_ring)))
}
};
M3 ์ฐฉ์์ :
.build()์์.setup_single_issuer().setup_defer_taskrun()๋ฑ์ ์ถ๊ฐํ๋ ๊ฒ M3์ ํต์ฌ. Big/Standard ๋ ๊ฒฝ๋ก ๋ชจ๋์ ์ ์ฉํด์ผ ํ๋ค.
NvmeUringCmd โ NVMe ๋ช ๋ น ๊ตฌ์กฐ์ฒดโ
#[repr(C)]
#[derive(Debug, Clone, Copy)]
struct NvmeUringCmd {
opcode: u8, // NVME_IO_READ(0x02) / NVME_IO_WRITE(0x01)
flags: u8,
rsvd1: u16,
nsid: u32, // NVMe Namespace ID
cdw2: u32,
cdw3: u32,
metadata: u64,
addr: u64, // ๋ฐ์ดํฐ ๋ฒํผ ์ฃผ์
metadata_len: u32,
data_len: u32, // ์ ์ก ํฌ๊ธฐ (bytes)
cdw10: u32, // SLBA[31:0] โ ์์ LBA ํ์ 32๋นํธ
cdw11: u32, // SLBA[63:32] โ ์์ LBA ์์ 32๋นํธ
cdw12: u32, // NLB (Number of Logical Blocks - 1) | dtype
cdw13: u32, // dspec โ FDP placement handle ID โ ํต์ฌ
cdw14: u32,
cdw15: u32,
rsvd2: [u32; 4],
}
cdw13์ dspec ํ๋๊ฐ FDP ์ฐ๊ฒฐ ์ง์ ์ด๋ค. FDP placement_id๋ฅผ ์ฌ๊ธฐ ๋ฃ์ผ๋ฉด NVMe ๋ ๋ฒจ์์ ๋ฐ์ดํฐ ๋ฐฐ์น ์คํธ๋ฆผ์ ์ง์ ํ ์ ์๋ค.
nvme_uring_cmd_prep โ NVMe ๋ช ๋ น ๋น๋โ
fn nvme_uring_cmd_prep(
cmd: &mut NvmeUringCmd,
is_write: bool,
nsid: u32,
offset: u64, // ๋ฐ์ดํธ ์คํ์
len: usize,
lba_shift: u32, // LBA ํฌ๊ธฐ = 1 << lba_shift (9=512B, 12=4KB)
ptr: *const u8,
dtype: u8, // Directive Type (FDP = 2)
dspec: u16, // FDP placement handle ID
) {
let slba = offset >> lba_shift; // ๋ฐ์ดํธ ์คํ์
โ LBA ๋ฒํธ
let nlb = (len >> lba_shift) - 1; // ์ ์ก ํฌ๊ธฐ โ ๋ธ๋ก ์ - 1
cmd.cdw10 = (slba & 0xFFFFFFFF) as u32;
cmd.cdw11 = (slba >> 32) as u32;
cmd.cdw12 = nlb as u32 | ((dtype as u32) << 20);
cmd.cdw13 = (dspec as u32) << 16; // FDP placement_id ์ฌ๊ธฐ
cmd.addr = ptr as u64;
cmd.data_len = len as u32;
}
LBA ๋ณํ:
๋ฐ์ดํธ ์คํ์
โ SLBA = offset >> lba_shift
์ ์ก ํฌ๊ธฐ โ NLB = (len >> lba_shift) - 1
register_fixed_buffers โ zero-copy I/O ๋ฑ๋กโ
fn register_fixed_buffers(&self, buffer_ptrs: Vec<usize>, buffer_sizes: Vec<usize>) -> PyResult<()> {
let mut map = self.fixed_buffer_map.lock().unwrap();
for (idx, (ptr, size)) in buffer_ptrs.iter().zip(buffer_sizes.iter()).enumerate() {
map.insert(*ptr, (idx as u16, *size));
}
let iovecs: Vec<libc::iovec> = ...;
ring.submitter().register_buffers(&iovecs) // syscall 1๋ฒ์ผ๋ก N๊ฐ ๋ฒํผ ๋ฑ๋ก
}
๋ฒํผ๋ฅผ ์ปค๋์ ๋ฏธ๋ฆฌ ๋ฑ๋กํ๋ฉด I/O ์ buf_index๋ก ์ฐธ์กฐ ๊ฐ๋ฅ โ ๋งค๋ฒ ์ฃผ์ ๋ฒ์ญ ๋ถํ์. CPU/GPU ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํ ๋ฒ ํํ๊ณ ์ฌ์ฌ์ฉํ๋ ๊ตฌ์กฐ.
Worker thread ๋ฐฐ์น submit ํจํดโ
let batch: Vec<IoSubmission> = std::mem::take(&mut *q);
for sub in batch.iter().take(to_submit_count) {
build_and_submit_sqe(&ring_clone, sub, user_data); // SQ์ ์ถ๊ฐ (syscall ์๋)
}
ring.submitter().submit() // syscall 1๋ฒ์ผ๋ก N๊ฐ ํ๊บผ๋ฒ์ ์ปค๋๋ก
์์ฒญ์ SQ์ ์์ ๋ค ํ ๋ฒ์ syscall๋ก ๋ฐฐ์น ์ ์ถ โ N๊ฐ I/O์ syscall 1๋ฒ.
Python ๋ณ๊ฒฝ์ฌํญ (core.py)โ
_write_buffers / _read_buffers โ I/O ๊ฒฝ๋ก ๋ผ์ฐํ
โ
io_engine = "posix" โ pwrite_from_buffer (๊ธฐ์กด ๋๊ธฐ I/O)
io_engine = "io_uring"
โโโ use_uring_cmd=True โ _write_uring_cmd_buffers (NVMe passthrough)
โโโ use_uring_cmd=False
โโโ payload==total (์ ๋ ฌ๋จ) โ batched_write + wait_iouring (๋ฐฐ์น ๋น๋๊ธฐ)
โโโ ๊ทธ ์ธ โ write_uring (๊ฐ๋ณ ๋น๋๊ธฐ)
๊ธฐ์กด _write_one์ด header/payload๋ฅผ 2๋ฒ pwriteํ๋ ๊ฒ๊ณผ ๋ฌ๋ฆฌ, io_uring ๊ฒฝ๋ก๋ batched_write๋ก ์ฌ๋ฌ I/O๋ฅผ ํ ๋ฒ์ submit.
register_fixed_buffers_from_allocatorโ
def register_fixed_buffers_from_allocator(self, memory_allocator) -> None:
buffers = memory_allocator.get_paged_buffers()
buffer_ptrs = [buf.data_ptr() for buf in buffers]
buffer_sizes = [buf.numel() * buf.element_size() for buf in buffers]
self._rawdev().register_fixed_buffers(buffer_ptrs, buffer_sizes)
CPU allocator์ ํ์ด์ง ๋ฒํผ๋ฅผ io_uring์ ๋ฑ๋ก โ ์ดํ ํด๋น ๋ฒํผ I/O๋ zero-copy.
max_hw_sectors_kb โ ์๋ ์ ์ก ํฌ๊ธฐ ๋ถํ โ
max_hw_sectors_kb = _read_sysfs_int(f"{queue_dir}/max_hw_sectors_kb")
resolved_bytes = max_hw_sectors_kb * 1024
aligned_bytes = (resolved_bytes // self.block_align) * self.block_align
NVMe ๋๋ฐ์ด์ค๊ฐ ํ ๋ฒ์ ์ฒ๋ฆฌํ ์ ์๋ ์ต๋ ํฌ๊ธฐ๋ฅผ sysfs์์ ์ฝ์ด์, KV ์ฒญํฌ๊ฐ ์ด๊ณผํ๋ฉด ์ฌ๋ฌ NVMe ๋ช ๋ น์ผ๋ก ๋ถํ ๋ฐํ.
์ ์ฒด I/O ํ๋ฆ (io_uring_cmd ๊ฒฝ๋ก)โ
Python: put_many(keys, objs)
โโ _write_buffers(offsets, bufs, ...)
โโ use_uring_cmd=True
โโ _write_uring_cmd_buffers()
โโ nvme_uring_cmd_prep(cmd, offset, len, dspec=placement_id)
โโ IoUringWrapper::Big โ UringCmd80 โ SQ push
โโ submitter().submit() [syscall 1๋ฒ]
โโ NVMe HW: SLBA, NLB, FDP dspec ์ฒ๋ฆฌ
โโ wait_iouring(batch_id) โ CQ ์๊ฑฐ
M3์์ ๊ด๊ณโ
์ด PR์ด ์์ฑ๋๋ฉด builder() ํจํด์ด ๋์
๋์ง๋ง setup flag๋ ์๋ค:
// #3274 ์ดํ ์ํ (M3 ์ฐฉ์์ )
IoUring::<Entry128, Entry32>::builder()
.build(iouring_queue_depth as u32) // โ ์ฌ๊ธฐ์ ํ๋๊ทธ ์ถ๊ฐ๊ฐ M3
IoUring::<SqueueEntry, Entry>::builder()
.build(iouring_queue_depth as u32) // โ fallback ๊ฒฝ๋ก์๋ ๋์ผ ์ ์ฉ
Worker thread๊ฐ single issuer ๊ตฌ์กฐ์์๋ ์ปค๋์ด ๊ทธ ์ฌ์ค์ ๋ชจ๋ฅด๋ ์ํ.
M3์์ setup_single_issuer(), setup_defer_taskrun() ์ถ๊ฐ โ SQ submission ์ค๋ฒํค๋ ๊ฐ์.
FDP ์ฐ๋ ๊ฐ๋ฅ์ฑโ
nvme_uring_cmd_prep์ dspec ํ๋ผ๋ฏธํฐ๊ฐ FDP placement handle์ ์ ๋ฌํ๋ ํ๋:
cdw13 = (dspec as u32) << 16 โ FDP placement_id ์ฌ๊ธฐ
FDP placement_id ๊ฒฐ์ ๋ก์ง์ด ์์ฑ๋๋ฉด, ์ด dspec์ ๋๊ธฐ๋ ๊ฒ์ผ๋ก FDP ํ์ฑํ.
#3274๊ฐ ๊ทธ ๋ฐฐ๊ด์ ์์ฑํ๋ PR์ด๋ค.
ํ์ฌ PR:
nvme_uring_cmd_prep(... dspec=0) โ placement ๋ฏธ์ง์
FDP ๋ค์ ๋จ๊ณ:
nvme_uring_cmd_prep(... dspec=placement_id)
โ
RUH ๋ฒํธ ์ง์ ์ง์ โ WAF โ
ํ์ฌ ์ ์ฝ ๋ฐ ๋ฆฌ๋ทฐ ํผ๋๋ฐฑโ
| ํญ๋ชฉ | ๋ด์ฉ |
|---|---|
| fixed buffer + uring_cmd ์กฐํฉ | ์์ง ๋ฏธ๊ตฌํ (PR ๋ณธ๋ฌธ ๋ช ์) |
| ์ ๋ ฌ ๊ฒ์ฆ ๋ก์ง | ์ค์ ๋ ฌ ๋ฐ์ดํธ ๋ฒ์ ๊ฒ์ฆ ๊ฐ์ ์์ฒญ (DongDongJu ์ฝ๋ฉํธ) |
| ๋น์ ๋ ฌ I/O | ์ง์ ์ ํจ (๋ธ๋ก ์ ๋ ฌ ์ ์ก๋ง) |
--use-uring-cmd UX | --use-uring ์์ด ๋จ๋
์ฌ์ฉ ์ ์คํด ์์ง ์๋ ์๋ฌ ๋ฉ์์ง |
๊ด๋ จ ํ์ด์งโ
- [[io_uring]] โ io_uring_cmd ๊ฐ๋ , big SQE/CQE ๊ตฌ์กฐ ์์ธ
- [[raw_block-์ข ๋จ-๋ถ์]] โ raw_block ์ ๊ณ์ธต ๋ถ์, FDP ์ฝ์ ํฌ์ธํธ H1-H8
- [[NVMe-FDP]] โ Placement Handle, RUH โ dspec์ ๋ฃ์ ๋์
- [[๊ธฐ์ฌ-ํฌ์ธํธ-๋งต]] โ [2][8] ๊ธฐ์ฌ ํฌ์ธํธ
- [[PR-3274-IoUring-NVMe]] โ ์ด PR ์ถ์ ํ์ด์ง (ํ์ฌ ์ํ, M1/M2 ์ฐฉ์ ์กฐ๊ฑด)