Feature #
-
Distributed Inference
-
Why distributed inference?
-
Infra-side
-
Communication device:
- NVLink: direct communication between GPUs
- InfiniBand: high-speed interconnect between nodes
- RDMA: Remote direct memory access
- RDMA NIC
- Software solution
- Key advantage: bypasses the operating system / zero-copy transfers
-
Communication library:
vllm/distributed/device_communicators - PyNCCL: NCCL-based communication for NVIDIA GPUs
- shared memory: provided by the OS, for processes on the same machine
- custom allreduce: a kernel dedicated to the all-reduce operation
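- Before (each machine holds only its own input):
- Machine 0: [0]
- Machine 1: [1]
- Machine 2: [2]
- Machine 3: [3]
- After (every machine holds the result combined from all four inputs; for a sum all-reduce that is 0+1+2+3 = 6):
- Machine 0: [0,1,2,3]
- Machine 1: [0,1,2,3]
- Machine 2: [0,1,2,3]
- Machine 3: [0,1,2,3]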
- Before:
- torch.distributed: provides broad support for a range of communication libraries (backends such as NCCL, Gloo, MPI)
-
GroupCoordinator
-
-
Algorithm-side
- [TP]
vllm/model_executor/models/llama.py
-
-
-
Pipeline parallel
- Much lower requirements on device-to-device interconnect hardware
- Cost: does not improve latency
- Tensor parallel, by contrast, directly improves latency
- Algorithm-side:
- Worker in charge of a subset of layers
vllm/model_executor/models/llama.py - self.start_layer → self.end_layer
- between workers: communicate IntermediateTensor
- get_pp_group()
vllm/worker/model_runner.py: search for get_pp_group()
- Worker in charge of a subset of layers
-
Expert parallel & data parallel (advanced)
- Why expert parallel:
- Mistral / Mixtral / Deepseek model: Mixture of Experts (MoE)
- Applies only to the linear layers (not to attention)
- Normal (dense) model: all weights participate in the computation
- MoE: experts are the unit of granularity; only a small subset of experts participates in the computation, and this subset may differ between requests
- Place different experts onto different GPUs –> expert parallel
- Algorithm:
- Expert parallel:
- Shuffle tokens to the GPUs holding their experts (DeepEP communication kernel)
- Forward
- Shuffle back
- Expert parallel:
- TP is for attention, EP is for linear layers.
- The shared expert receives a high load → replicate (duplicate) the shared expert.
- Mistral / Mixtral / Deepseek model: Mixture of Experts (MoE)
- Why expert parallel:
-
DP (data parallel)
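- The maximum usable TP degree ≪ the EP degree needed
- The TP degree must divide (and so cannot exceed) the number of attention heads
- The "degree of parallelism" available to linear (expert) layers ≫ the TP "degree of parallelism" of attention layers; processing different requests in parallel (DP) raises the attention layers' "degree of parallelism"
- Difficult to implement in practice:
- requests must be padded (idle DP ranks run dummy batches) so that every rank joins each collective call, avoiding deadlock.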
-
Types of distributed inference: TP / PP / EP / DP
-
PD (prefill/decode) disaggregation
代码 #
TP #
https://github.com/vllm-project/vllm/blob/main/vllm/distributed/parallel_state.py
# Process-wide handle for the tensor-parallel (TP) group. It starts out as
# None; presumably the distributed-initialization code (not shown here)
# assigns it.
_TP: Optional[GroupCoordinator] = None  # TP


def get_tp_group() -> GroupCoordinator:
    """Return the tensor-parallel GroupCoordinator.

    Raises:
        AssertionError: if the TP group is still ``None`` (not initialized).
    """
    tp_group = _TP
    assert tp_group is not None, ("tensor model parallel group is not initialized")
    return tp_group
class GroupCoordinator:
    """
    PyTorch ProcessGroup wrapper for a group of processes.
    PyTorch ProcessGroup is bound to one specific communication backend,
    e.g. NCCL, Gloo, MPI, etc.
    GroupCoordinator takes charge of all the communication operations among
    the processes in the group. It manages both CPU and device
    communication.

    NOTE(review): this excerpt shows only the attribute declarations; the
    rest of the class is presumably in upstream
    vllm/distributed/parallel_state.py.
    """
    # available attributes:
    rank: int  # global rank
    ranks: list[int]  # global ranks in the group
    world_size: int  # size of the group
    # difference between `local_rank` and `rank_in_group`:
    # if we have a group of size 4 across two nodes:
    # Process | Node | Rank | Local Rank | Rank in Group
    # 0       | 0    | 0    | 0          | 0
    # 1       | 0    | 1    | 1          | 1
    # 2       | 1    | 2    | 0          | 2
    # 3       | 1    | 3    | 1          | 3
    # In the table, `local_rank` restarts at 0 on every node, while
    # `rank_in_group` counts across the whole group (here it equals the
    # global rank because the group holds ranks 0-3).
    local_rank: int  # local rank used to assign devices
    rank_in_group: int  # rank inside the group
    cpu_group: ProcessGroup  # group for CPU communication
    device_group: ProcessGroup  # group for device communication
    use_device_communicator: bool  # whether to use device communicator
    device_communicator: DeviceCommunicatorBase  # device communicator
    mq_broadcaster: Optional[Any]  # shared memory broadcaster (Optional: may be None)
https://github.com/vllm-project/vllm/blob/main/vllm/distributed/device_communicators/pynccl.py
def all_reduce(self,
               in_tensor: torch.Tensor,
               op: ReduceOp = ReduceOp.SUM,
               stream=None) -> Optional[torch.Tensor]:
    """All-reduce ``in_tensor`` across the communicator's ranks via NCCL.

    The operation is out-of-place: the result is written into a new tensor
    allocated with ``torch.empty_like(in_tensor)``.

    Args:
        in_tensor: This rank's contribution; must be on ``self.device``.
        op: Reduction operator, ``ReduceOp.SUM`` by default.
        stream: Stream whose ``cuda_stream`` handle is passed to NCCL;
            ``current_stream()`` is used when it is ``None``.

    Returns:
        A new tensor with the same shape, dtype and device as ``in_tensor``
        holding the reduced values, or ``None`` when the communicator is
        disabled (hence the ``Optional`` return annotation).

    Raises:
        AssertionError: if ``in_tensor`` is not on ``self.device``.
    """
    if self.disabled:
        # Nothing is launched; callers have to handle the None result.
        return None
    # nccl communicator created on a specific device
    # will only work on tensors on the same device
    # otherwise it will cause "illegal memory access"
    assert in_tensor.device == self.device, (
        f"this nccl communicator is created to work on {self.device}, "
        f"but the input tensor is on {in_tensor.device}")
    out_tensor = torch.empty_like(in_tensor)
    if stream is None:
        stream = current_stream()
    # Raw device pointers are handed to NCCL; element count and dtype are
    # taken from the input tensor.
    self.nccl.ncclAllReduce(buffer_type(in_tensor.data_ptr()),
                            buffer_type(out_tensor.data_ptr()),
                            in_tensor.numel(),
                            ncclDataTypeEnum.from_torch(in_tensor.dtype),
                            ncclRedOpTypeEnum.from_torch(op), self.comm,
                            cudaStream_t(stream.cuda_stream))
    return out_tensor
def all_gather(self,
               output_tensor: torch.Tensor,
               input_tensor: torch.Tensor,
               stream=None):
    """All-gather ``input_tensor`` from every rank into ``output_tensor``.

    Args:
        output_tensor: Pre-allocated destination buffer; must be on
            ``self.device``. NOTE(review): it presumably has to hold
            world_size * input_tensor.numel() elements; this is not checked.
        input_tensor: This rank's chunk; must be on ``self.device``. Its
            ``numel()`` is the per-rank element count passed to NCCL.
        stream: Stream whose ``cuda_stream`` handle is passed to NCCL;
            ``current_stream()`` is used when it is ``None``.

    Returns:
        None. The result is written into ``output_tensor``; nothing is
        launched when the communicator is disabled.

    Raises:
        AssertionError: if either tensor is not on ``self.device``.
    """
    if self.disabled:
        return
    # nccl communicator created on a specific device
    # will only work on tensors on the same device
    # otherwise it will cause "illegal memory access"
    assert input_tensor.device == self.device, (
        f"this nccl communicator is created to work on {self.device}, "
        f"but the input tensor is on {input_tensor.device}")
    # The output buffer's raw pointer goes to NCCL as well, so it has the
    # same same-device requirement as the input.
    assert output_tensor.device == self.device, (
        f"this nccl communicator is created to work on {self.device}, "
        f"but the output tensor is on {output_tensor.device}")
    if stream is None:
        stream = current_stream()
    self.nccl.ncclAllGather(
        buffer_type(input_tensor.data_ptr()),
        buffer_type(output_tensor.data_ptr()), input_tensor.numel(),
        ncclDataTypeEnum.from_torch(input_tensor.dtype), self.comm,
        cudaStream_t(stream.cuda_stream))
TP in llama #
https://github.com/vllm-project/vllm/blob/main/vllm/model_executor/models/llama.py
class LlamaAttention(nn.Module):
    """Llama attention layer with its heads split across tensor-parallel ranks.

    Excerpt: only the part of ``__init__`` that sizes the per-rank heads is
    shown. Each TP rank keeps ``num_heads // tp_size`` query heads. KV heads
    are split evenly when there are at least ``tp_size`` of them; otherwise
    they are replicated and every rank keeps one.
    """

    def __init__(
        self,
        config: LlamaConfig,
        hidden_size: int,
        num_heads: int,
        num_kv_heads: int,
        rope_theta: float = 10000,
        rope_scaling: Optional[dict[str, Any]] = None,
        max_position_embeddings: int = 8192,
        quant_config: Optional[QuantizationConfig] = None,
        bias: bool = False,
        bias_o_proj: bool = False,
        cache_config: Optional[CacheConfig] = None,
        prefix: str = "",
        attn_type: str = AttentionType.DECODER,
    ) -> None:
        """Compute this rank's head counts and projection widths.

        Raises:
            AssertionError: if the query/KV head counts are not compatible
                with the tensor-parallel world size.
        """
        super().__init__()
        # NOTE(review): unused in this excerpt; presumably consumed further
        # down in the full upstream __init__.
        layer_idx = extract_layer_index(prefix)
        self.hidden_size = hidden_size

        # --- tensor-parallel partitioning of the query heads ---
        tp_size = get_tensor_model_parallel_world_size()
        self.total_num_heads = num_heads
        # Every rank must receive the same number of query heads.
        assert self.total_num_heads % tp_size == 0
        self.num_heads = self.total_num_heads // tp_size

        # --- KV heads: partitioned or replicated ---
        self.total_num_kv_heads = num_kv_heads
        if self.total_num_kv_heads < tp_size:
            # Fewer KV heads than ranks: each KV head is replicated, so the
            # rank count must be a whole multiple of the KV head count.
            assert tp_size % self.total_num_kv_heads == 0
        else:
            # At least one KV head per rank: split them evenly.
            assert self.total_num_kv_heads % tp_size == 0
        # Integer division gives 0 in the replicated case; clamp to one head.
        self.num_kv_heads = max(1, self.total_num_kv_heads // tp_size)

        # MistralConfig (Mistral-Nemo) may carry an explicit head_dim;
        # otherwise derive it from the hidden size.
        configured_head_dim = getattr(config, "head_dim", None)
        if configured_head_dim is not None:
            self.head_dim = configured_head_dim
        else:
            self.head_dim = self.hidden_size // self.total_num_heads

        # Phi models may define partial_rotary_factor; the fallback is 1.
        self.partial_rotary_factor = getattr(config, "partial_rotary_factor", 1)

        # Per-rank projection widths and the 1/sqrt(head_dim) scaling factor.
        self.q_size = self.num_heads * self.head_dim
        self.kv_size = self.num_kv_heads * self.head_dim
        self.scaling = self.head_dim**-0.5
        self.rope_theta = rope_theta
        self.max_position_embeddings = max_position_embeddings
PP #
@support_torch_compile
class LlamaModel(nn.Module):
    """Llama decoder stack, laid out for pipeline parallelism (PP).

    Each PP rank builds only what it owns:
    - the token embedding, on the first rank (and also on the last rank when
      ``tie_word_embeddings`` is set);
    - the decoder layers between ``start_layer`` and ``end_layer`` returned
      by ``make_layers``;
    - the final RMSNorm, on the last rank.
    Parts a rank does not own are replaced by ``PPMissingLayer`` placeholders.
    """

    def __init__(self,
                 *,
                 vllm_config: VllmConfig,
                 prefix: str = "",
                 layer_type: type[nn.Module] = LlamaDecoderLayer):
        """Build the parts of the model owned by this pipeline-parallel rank.

        Args:
            vllm_config: Engine config; its ``model_config.hf_config``,
                ``cache_config``, ``quant_config`` and ``lora_config`` are read.
            prefix: Module-name prefix; decoder layers are created under
                ``f"{prefix}.layers"``.
            layer_type: Decoder-layer class instantiated for every layer
                (``LlamaDecoderLayer`` by default).
        """
        super().__init__()
        config = vllm_config.model_config.hf_config
        cache_config = vllm_config.cache_config
        quant_config = vllm_config.quant_config
        lora_config = vllm_config.lora_config
        self.config = config
        self.quant_config = quant_config
        # With LoRA enabled, the vocabulary is enlarged by
        # lora_extra_vocab_size * max_loras entries (max_loras falls back to 1
        # when falsy); without LoRA nothing is added.
        lora_vocab = (lora_config.lora_extra_vocab_size *
                      (lora_config.max_loras or 1)) if lora_config else 0
        self.vocab_size = config.vocab_size + lora_vocab
        self.org_vocab_size = config.vocab_size
        # PP: only the first rank, or the last rank when word embeddings are
        # tied, materializes the embedding (presumably because a tied LM head
        # on the last rank reuses these weights -- confirm). Other ranks get a
        # placeholder.
        if get_pp_group().is_first_rank or (config.tie_word_embeddings
                                            and get_pp_group().is_last_rank):
            self.embed_tokens = VocabParallelEmbedding(
                self.vocab_size,
                config.hidden_size,
                org_num_embeddings=config.vocab_size,
                quant_config=quant_config,
            )
        else:
            self.embed_tokens = PPMissingLayer()
        # PP: make_layers returns this rank's layer bounds together with the
        # layer container; presumably only layers inside those bounds are
        # real modules on this rank -- confirm in make_layers.
        # The lambda's `prefix` parameter shadows the outer `prefix`; it
        # presumably receives the per-layer name from make_layers.
        self.start_layer, self.end_layer, self.layers = make_layers(  ## start_layer end_layer
            config.num_hidden_layers,
            lambda prefix: layer_type(config=config,
                                      cache_config=cache_config,
                                      quant_config=quant_config,
                                      prefix=prefix),
            prefix=f"{prefix}.layers",
        )
        # The final norm exists only on the last PP rank.
        if get_pp_group().is_last_rank:
            self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        else:
            self.norm = PPMissingLayer()
        # Presumably the indices of layers whose hidden states are collected
        # as auxiliary outputs; empty here. Annotated as a variable-length
        # tuple of ints.
        self.aux_hidden_state_layers: tuple[int, ...] = tuple()
        # Factory for empty intermediate tensors named "hidden_states" and
        # "residual" (width hidden_size); presumably used for the
        # IntermediateTensors exchanged between PP stages -- confirm.
        self.make_empty_intermediate_tensors = (
            make_empty_intermediate_tensors_factory(
                ["hidden_states", "residual"], config.hidden_size))