index-tts/indextts/accel/kv_manager.py
storyicon c1ef4148af feat: achieve inference acceleration for the gpt2 stage
Signed-off-by: storyicon <storyicon@foxmail.com>
2025-10-24 08:15:00 +00:00

209 lines
6.8 KiB
Python

import hashlib
import pickle
from collections import deque
from copy import copy
from typing import Dict, List, Optional, Set
import torch
class KVCacheBlock:
def __init__(self, block_id: int):
self.block_id = block_id
self.ref_cnt = 0
self._block_hash = None
self.token_ids = []
@property
def block_hash(self) -> Optional[bytes]:
return self._block_hash
def update(self, block_hash: bytes, token_ids: List[int]):
self._block_hash = block_hash
self.token_ids = token_ids
def reset(self):
self.ref_cnt = 1
self._block_hash = None
self.token_ids = []
class Seq:
def __init__(self, token_ids: List[int], block_size: int = 256):
self.token_ids = copy(token_ids)
self.last_token = token_ids[-1] if token_ids else 0
self.num_tokens = len(self.token_ids)
self.num_prompt_tokens = len(token_ids)
self.num_cached_tokens = 0
self.block_table: List[int] = []
self.block_size = block_size
def __len__(self):
return self.num_tokens
def __getitem__(self, key):
return self.token_ids[key]
@property
def num_blocks(self):
return (self.num_tokens + self.block_size - 1) // self.block_size
@property
def num_cached_blocks(self):
return self.num_cached_tokens // self.block_size
@property
def last_block_num_tokens(self):
return self.num_tokens - (self.num_blocks - 1) * self.block_size
def get_block_tokens(self, block_idx: int) -> List[int]:
assert 0 <= block_idx < self.num_blocks
start = block_idx * self.block_size
end = start + self.block_size
return self.token_ids[start:end]
def append_token(self, token_id: int):
self.token_ids.append(token_id)
self.last_token = token_id
self.num_tokens += 1
class KVCacheManager:
def __init__(
self,
num_layers: int,
num_heads: int,
head_dim: int,
block_size: int,
num_blocks: int,
dtype: torch.dtype,
):
self.num_layers = num_layers
self.num_heads = num_heads
self.head_dim = head_dim
self.block_size = block_size
self.num_blocks = num_blocks
self.dtype = dtype
self.blocks: List[KVCacheBlock] = [KVCacheBlock(i) for i in range(num_blocks)]
self.block_hash_to_id: Dict[bytes, int] = {}
self.free_block_ids: deque = deque(range(num_blocks))
self.used_block_ids: Set[int] = set()
device = "cuda" if torch.cuda.is_available() else "cpu"
cache_dtype = torch.float16 if device == "cuda" else dtype
self.kv_cache = torch.empty(
2,
num_layers,
num_blocks,
block_size,
num_heads,
head_dim,
dtype=cache_dtype,
device=device,
)
@classmethod
def compute_block_hash(
cls, token_ids: List[int], parent_hash: Optional[bytes] = None
) -> bytes:
hash_input = []
if parent_hash is not None:
hash_input.append(parent_hash)
hash_input.extend(token_ids)
input_bytes = pickle.dumps(tuple(hash_input), protocol=pickle.HIGHEST_PROTOCOL)
return hashlib.sha256(input_bytes).digest()
def _allocate_block(self, block_id: int) -> KVCacheBlock:
block = self.blocks[block_id]
assert block.ref_cnt == 0
block.reset()
self.free_block_ids.remove(block_id)
self.used_block_ids.add(block_id)
return block
def _deallocate_block(self, block_id: int):
assert self.blocks[block_id].ref_cnt == 0
self.used_block_ids.remove(block_id)
self.free_block_ids.append(block_id)
def allocate(self, sequence: Seq):
assert not sequence.block_table, "Sequence already has allocated blocks"
parent_hash = None
cache_miss = False
for i in range(sequence.num_blocks):
token_ids = sequence.get_block_tokens(i)
block_hash = (
self.compute_block_hash(token_ids, parent_hash)
if len(token_ids) == self.block_size
else None
)
block_id = self.block_hash_to_id.get(block_hash) if block_hash else None
if block_id is None or self.blocks[block_id].token_ids != token_ids:
cache_miss = True
if cache_miss:
block_id = self.free_block_ids[0]
block = self._allocate_block(block_id)
else:
sequence.num_cached_tokens += self.block_size
if block_id is not None and block_id in self.used_block_ids:
block = self.blocks[block_id]
block.ref_cnt += 1
else:
block_id = self.free_block_ids[0]
block = self._allocate_block(block_id)
if block_hash is not None:
block.update(block_hash, token_ids)
self.block_hash_to_id[block_hash] = block_id
parent_hash = block_hash
sequence.block_table.append(block_id)
def deallocate(self, sequence: Seq):
for block_id in reversed(sequence.block_table):
block = self.blocks[block_id]
block.ref_cnt -= 1
if block.ref_cnt == 0:
self._deallocate_block(block_id)
sequence.num_cached_tokens = 0
sequence.block_table.clear()
def append_to_seq(self, sequence: Seq):
block_table = sequence.block_table
last_block = self.blocks[block_table[-1]]
if len(sequence) % self.block_size == 1:
assert last_block.block_hash is not None
block_id = self.free_block_ids[0]
self._allocate_block(block_id)
block_table.append(block_id)
elif len(sequence) % self.block_size == 0:
assert last_block.block_hash is None
token_ids = sequence.get_block_tokens(sequence.num_blocks - 1)
parent_hash = (
self.blocks[block_table[-2]].block_hash
if len(block_table) > 1
else None
)
block_hash = self.compute_block_hash(token_ids, parent_hash)
last_block.update(block_hash, token_ids)
self.block_hash_to_id[block_hash] = last_block.block_id
else:
assert last_block.block_hash is None
def remove_seq(self, sequence: Seq):
self.deallocate(sequence)
def wire_kv_cache_to_model(self, model):
layer_id = 0
for module in model.modules():
if hasattr(module, "k_cache") and hasattr(module, "v_cache"):
module.k_cache = self.kv_cache[0, layer_id]
module.v_cache = self.kv_cache[1, layer_id]
layer_id += 1