vllm.model_executor.layers.fused_moe.runner.moe_runner_base

MoERunnerBase

Bases: MoERunner

Abstract base class providing common functionality for MoE runner implementations.

This class serves as the foundation for concrete MoE runner implementations by providing shared state management and common utilities. It handles:

- Common initialization and configuration management
- Shared expert output reduction logic for tensor-parallel scenarios
- Base methods for tensor model parallel reductions
- Common properties and utility functions used across different runner types

Concrete subclasses must implement the abstract methods to define their specific execution strategies, such as standard execution, chunked processing, or other specialized approaches. The base class provides the infrastructure while allowing flexibility in the actual MoE computation implementation.

Key abstract methods that subclasses must implement:

- reduce_results: determines whether results should be reduced across ranks
- _forward_impl: the core MoE computation logic specific to each runner type

Source code in vllm/model_executor/layers/fused_moe/runner/moe_runner_base.py
class MoERunnerBase(MoERunner):
    """
    Abstract base class providing common functionality for MoE runner implementations.

    This class serves as the foundation for concrete MoE runner implementations by
    providing shared state management and common utilities. It handles:
    - Common initialization and configuration management
    - Shared expert output reduction logic for tensor parallel scenarios
    - Base methods for tensor model parallel reductions
    - Common properties and utility functions used across different runner types

    Concrete subclasses must implement the abstract methods to define their specific
    execution strategies, such as standard execution, chunked processing, or other
    specialized approaches. The base class provides the infrastructure while
    allowing flexibility in the actual MoE computation implementation.

    Key abstract methods that subclasses must implement:
    - reduce_results: Determines whether results should be reduced across ranks
    - _forward_impl: The core MoE computation logic specific to each runner type
    """

    def __init__(
        self,
        layer_name: str,
        moe_config: FusedMoEConfig,
        router: FusedMoERouter,
        routed_input_transform: torch.nn.Module | None,
        gate: torch.nn.Module | None,
        shared_experts: torch.nn.Module | None,
        quant_method: FusedMoEMethodBase,
        reduce_results: bool,
        enable_dbo: bool,
    ):
        super().__init__()
        self.moe_config = moe_config
        self.router = router
        self.routed_input_transform = routed_input_transform
        self.gate = gate
        self.quant_method = quant_method
        self._reduce_results = reduce_results
        self.enable_dbo = enable_dbo

        self._shared_experts: SharedExperts | None = None
        if shared_experts is not None:
            self._shared_experts = SharedExperts(
                shared_experts,
                moe_config=moe_config,
                # Note: For now we must pass quant_method along to SharedExperts so it
                # can properly determine where the shared experts are supposed to be
                # called, i.e. by an MK or by the MoERunner.
                # Once the MK can be created upfront, we can just pass in the proper
                # flags derived from the quant_method's MK.
                reduce_results=reduce_results,
                quant_method=quant_method,
                enable_dbo=enable_dbo,
            )

        # Needed for string -> FusedMoE layer lookup in custom ops.
        self.layer_name = layer_name

        self.forward_entry = self._select_forward()

    def _select_forward(self) -> Callable:
        if current_platform.is_tpu() or current_platform.is_cpu():
            # TODO: Once the OOM issue for the TPU backend is resolved, we
            # will switch to using the moe_forward custom op.
            # Note: CPU doesn't require wrapped _forward_impl.
            return _moe_forward if self._shared_experts is None else _moe_forward_shared

        return (
            torch.ops.vllm.moe_forward
            if self._shared_experts is None
            else torch.ops.vllm.moe_forward_shared
        )

    @property
    def shared_experts(self) -> SharedExperts | None:
        return self._shared_experts

    # TODO(bnell): temporary hack, do not call this method.
    def _replace_quant_method(self, quant_method: FusedMoEMethodBase):
        if self._shared_experts is not None:
            self._shared_experts._quant_method = quant_method
        self.quant_method = quant_method

    def is_internal_router(self) -> bool:
        return self.gate is not None

    @property
    @abstractmethod
    def reduce_results(self) -> bool:
        raise NotImplementedError

    def must_reduce_shared_expert_outputs(self) -> bool:
        """
        The shared_experts are typically computed using the RowParallelLinear
        layer. The result of this function is typically used as
        the reduce_results argument to the module.
        When just tensor-parallel is used, it is not required to reduce
        the shared_experts results immediately. Instead we reduce at the
        once at the end of the MoE op. (Refer to DeepSeekV2MoE module)
        With EP and all2all kernels - this is no longer viable as all
        GPU ranks in DP, produce the complete set of hidden_states.
        Therefore it is required that we reduce the shared_experts output
        early.
        """
        return (
            self.quant_method.moe_kernel is not None
            and self.quant_method.moe_kernel.output_is_reduced()
        )

    def maybe_all_reduce_tensor_model_parallel(self, final_hidden_states: torch.Tensor):
        """
        Some combine kernels reduce across GPU ranks by default.
        """
        if self.must_reduce_shared_expert_outputs():
            return final_hidden_states
        else:
            return tensor_model_parallel_all_reduce(final_hidden_states)

    def apply_routed_input_transform(
        self, hidden_states: torch.Tensor
    ) -> tuple[torch.Tensor, torch.Tensor | None]:
        """Apply transform for routed experts (e.g., latent projection).

        This is called by FusedMoE.forward_native. The original hidden_states
        is saved separately so shared experts get [S, hidden_size] while
        routed experts get the transformed [S, moe_latent_size].

        TODO: For latent MoE bandwidth optimization, fc2_latent_proj could be
        moved inside SharedFusedMoE to all-reduce on the smaller latent
        dimension.

        Returns (possibly transformed) hidden states and the input for shared
        experts (or None if there are no shared experts).
        """
        if self.routed_input_transform is not None:
            result = self.routed_input_transform(hidden_states)
            # ReplicatedLinear returns (output, extra_bias) tuple.
            # We only need the output tensor; extra_bias is not used here.
            if isinstance(result, tuple):
                return result[0], hidden_states
            return result, hidden_states

        return (
            hidden_states,
            hidden_states if self._shared_experts is not None else None,
        )

    def _maybe_reduce_output(
        self,
        states: torch.Tensor | tuple[torch.Tensor, torch.Tensor],
        trunc_sizes: list[int],
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
        def trunc(x: torch.Tensor, trunc_size: int) -> torch.Tensor:
            return x[..., :trunc_size]

        def reduce_and_trunc(x: torch.Tensor, trunc_size: int) -> torch.Tensor:
            return trunc(self.maybe_all_reduce_tensor_model_parallel(x), trunc_size)

        if (
            not self.moe_config.is_sequence_parallel
            and self.reduce_results
            and (self.moe_config.tp_size > 1 or self.moe_config.ep_size > 1)
        ):
            func = reduce_and_trunc
        else:
            func = trunc

        if isinstance(states, tuple):
            return tuple(
                [func(s, trunc_size) for s, trunc_size in zip(states, trunc_sizes)]
            )
        else:
            assert len(trunc_sizes) == 1
            return func(states, trunc_sizes[0])

    def _encode_layer_name(self) -> str | ModuleName:
        if HAS_OPAQUE_TYPE:
            return ModuleName(self.layer_name)
        # Can be unavailable or None in unittests
        if (
            is_forward_context_available()
            and get_forward_context().all_moe_layers is not None
        ):
            return "from_forward_context"
        return self.layer_name

    def _maybe_pad_hidden_states(
        self,
        shared_experts_input: torch.Tensor | None,
        hidden_states: torch.Tensor,
    ) -> tuple[torch.Tensor, list[int]]:
        shared_experts_hidden_dim = (
            shared_experts_input.shape[-1] if shared_experts_input is not None else 0
        )
        transformed_hidden_dim = hidden_states.shape[-1]
        if (
            not self.quant_method.skip_forward_padding
            and self.moe_config.hidden_dim != transformed_hidden_dim
        ):
            hidden_states = F.pad(
                hidden_states,
                (0, self.moe_config.hidden_dim - transformed_hidden_dim),
                mode="constant",
                value=0.0,
            )

        if self._shared_experts is not None:
            orig_hidden_dims = [shared_experts_hidden_dim, transformed_hidden_dim]
        else:
            orig_hidden_dims = [transformed_hidden_dim]

        return hidden_states, orig_hidden_dims

    def _maybe_apply_shared_experts(
        self,
        shared_experts_input: torch.Tensor | None,
        order: SharedExpertsOrder,
    ):
        if self._shared_experts is not None:
            assert shared_experts_input is not None
            self._shared_experts.apply(shared_experts_input, order)

    def _apply_quant_method(
        self,
        layer: torch.nn.Module,
        hidden_states: torch.Tensor,
        router_logits: torch.Tensor,
        shared_experts_input: torch.Tensor | None,
    ) -> tuple[torch.Tensor | None, torch.Tensor]:
        # Run this before quant_method to avoid inplace issues.
        # TODO(bnell): probably not needed anymore since inplace is
        # disabled when shared experts are present.
        self._maybe_apply_shared_experts(
            shared_experts_input, SharedExpertsOrder.NO_OVERLAP
        )

        if self.quant_method.is_monolithic:
            fused_out = self.quant_method.apply_monolithic(
                layer=layer,
                x=hidden_states,
                router_logits=router_logits,
            )
        else:
            topk_weights, topk_ids = self.router.select_experts(
                hidden_states=hidden_states,
                router_logits=router_logits,
            )

            # Passing shared_experts_input in case SharedExpertsOrder is
            # NO_OVERLAP or MK_INTERNAL_OVERLAPPED.
            fused_out = self.quant_method.apply(
                layer=layer,
                x=hidden_states,
                topk_weights=topk_weights,
                topk_ids=topk_ids,
                shared_experts_input=shared_experts_input,
            )

        self._maybe_apply_shared_experts(
            shared_experts_input,
            SharedExpertsOrder.MULTI_STREAM_OVERLAPPED,
        )

        return (
            self._shared_experts.output if self._shared_experts is not None else None,
            fused_out,
        )

    def _sequence_parallel_context(self):
        ctx = get_forward_context()
        return (
            ctx.dp_metadata.sp_local_sizes(self.moe_config.sp_size)
            if ctx.dp_metadata
            else nullcontext()
        )

    def _maybe_sync_shared_experts_stream(
        self,
        shared_experts_input: torch.Tensor | None,
    ):
        # If router/gate provided, then apply it here.
        # (Note: This code runs only when "overlapped mode" is on to allow
        #        parallel execution of shared experts with the FusedMoE via
        #        separate cuda stream)
        if self._shared_experts is not None:
            self._shared_experts.maybe_sync_shared_experts_stream(shared_experts_input)

    def forward(
        self,
        hidden_states: torch.Tensor,
        router_logits: torch.Tensor,
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
        """Invoke the fused moe layer.

        Input:
        - hidden_states
        - router_logits

        Output:
        - The new hidden_states.
        or
        - A tuple of (shared experts output, new hidden_states).

        Calling sequence
        - forward
          - self.forward_entry (_moe_forward or _moe_forward_shared custom op)
            - forward_dispatch
              - _forward_impl

        Note: The existence of the _moe_forward and _moe_forward_shared custom ops
        is due to the following reasons:
        1. The chunking loop in ChunkingMoERunner._forward_impl cannot be
           compiled by torch.compile.
        2. PyTorch cannot handle union types in custom op signatures, so
           _moe_forward and _moe_forward_shared must be split.

        If ChunkingMoERunner._forward_impl can be implemented via torch.scan we can
        potentially get rid of _moe_forward and _moe_forward_shared and collapse the
        whole sequence into the 'forward' method.
        """

        # Apply transform for routed experts (e.g., latent projection for latent MoE)
        hidden_states, shared_experts_input = self.apply_routed_input_transform(
            hidden_states
        )

        hidden_states, og_hidden_dims = self._maybe_pad_hidden_states(
            shared_experts_input,
            hidden_states,
        )

        fused_output = self.forward_entry(
            hidden_states,
            router_logits,
            shared_experts_input,
            self._encode_layer_name(),
        )

        return self._maybe_reduce_output(fused_output, og_hidden_dims)

    def forward_dispatch(
        self,
        layer: torch.nn.Module,
        hidden_states: torch.Tensor,
        router_logits: torch.Tensor,
        shared_experts_input: torch.Tensor | None,
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
        # TODO(bnell): this can be removed after MK migration is complete.
        layer.ensure_moe_quant_config_init()

        # Sync aux and main stream for shared expert multi-stream overlap.
        self._maybe_sync_shared_experts_stream(shared_experts_input)

        # If the Runner holds the gate, apply it after the stream sync,
        # so it can run overlapped with the shared experts on the aux stream.
        # NOTE: in a future PR, the MoE runner will always hold the gate.
        if self.gate is not None:
            router_logits, _ = self.gate(hidden_states)

        with self._sequence_parallel_context():
            return self._forward_impl(
                layer,
                hidden_states,
                router_logits,
                shared_experts_input,
            )

    @abstractmethod
    def _forward_impl(
        self,
        layer: torch.nn.Module,
        hidden_states: torch.Tensor,
        router_logits: torch.Tensor,
        shared_experts_input: torch.Tensor | None,
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
        raise NotImplementedError

apply_routed_input_transform

apply_routed_input_transform(
    hidden_states: Tensor,
) -> tuple[Tensor, Tensor | None]

Apply transform for routed experts (e.g., latent projection).

This is called by FusedMoE.forward_native. The original hidden_states is saved separately so shared experts get [S, hidden_size] while routed experts get the transformed [S, moe_latent_size].

TODO: For latent MoE bandwidth optimization, fc2_latent_proj could be moved inside SharedFusedMoE to all-reduce on the smaller latent dimension.

Returns (possibly transformed) hidden states and the input for shared experts (or None if there are no shared experts).

Source code in vllm/model_executor/layers/fused_moe/runner/moe_runner_base.py
def apply_routed_input_transform(
    self, hidden_states: torch.Tensor
) -> tuple[torch.Tensor, torch.Tensor | None]:
    """Apply transform for routed experts (e.g., latent projection).

    This is called by FusedMoE.forward_native. The original hidden_states
    is saved separately so shared experts get [S, hidden_size] while
    routed experts get the transformed [S, moe_latent_size].

    TODO: For latent MoE bandwidth optimization, fc2_latent_proj could be
    moved inside SharedFusedMoE to all-reduce on the smaller latent
    dimension.

    Returns (possibly transformed) hidden states and the input for shared
    experts (or None if there are no shared experts).
    """
    if self.routed_input_transform is not None:
        result = self.routed_input_transform(hidden_states)
        # ReplicatedLinear returns (output, extra_bias) tuple.
        # We only need the output tensor; extra_bias is not used here.
        if isinstance(result, tuple):
            return result[0], hidden_states
        return result, hidden_states

    return (
        hidden_states,
        hidden_states if self._shared_experts is not None else None,
    )
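The tuple-unwrapping behavior documented above can be sketched with plain-Python stand-ins (hypothetical, not the real vLLM modules): a transform that returns an `(output, extra_bias)` tuple, as `ReplicatedLinear` does, has only its first element taken as the routed input, while the untouched `hidden_states` is handed to the shared experts.

```python
def apply_routed_input_transform(hidden_states, transform, has_shared_experts):
    """Stand-in mirroring the contract described above."""
    if transform is not None:
        result = transform(hidden_states)
        # Unwrap (output, extra_bias)-style tuples; keep only the output.
        routed = result[0] if isinstance(result, tuple) else result
        return routed, hidden_states
    # No transform: shared experts (if any) see the same input.
    return hidden_states, (hidden_states if has_shared_experts else None)


# A transform mimicking ReplicatedLinear's (output, extra_bias) return shape.
def linear_like(x):
    return [v * 0.5 for v in x], None


routed, shared = apply_routed_input_transform([1.0, 2.0], linear_like, True)
# routed is the transformed tensor; shared is the original hidden_states.
```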

forward

forward(
    hidden_states: Tensor, router_logits: Tensor
) -> Tensor | tuple[Tensor, Tensor]

Invoke the fused moe layer.

Input:

- hidden_states
- router_logits

Output:

- The new hidden_states, or
- A tuple of (shared experts output, new hidden_states).

Calling sequence:

- forward
  - self.forward_entry (_moe_forward or _moe_forward_shared custom op)
    - forward_dispatch
      - _forward_impl

Note: The existence of the _moe_forward and _moe_forward_shared custom ops is due to the following reasons:

1. The chunking loop in ChunkingMoERunner._forward_impl cannot be compiled by torch.compile.
2. PyTorch cannot handle union types in custom op signatures, so _moe_forward and _moe_forward_shared must be split.

If ChunkingMoERunner._forward_impl can be implemented via torch.scan we can potentially get rid of _moe_forward and _moe_forward_shared and collapse the whole sequence into the 'forward' method.

Source code in vllm/model_executor/layers/fused_moe/runner/moe_runner_base.py
def forward(
    self,
    hidden_states: torch.Tensor,
    router_logits: torch.Tensor,
) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor]:
    """Invoke the fused moe layer.

    Input:
    - hidden_states
    - router_logits

    Output:
    - The new hidden_states.
    or
    - A tuple of (shared experts output, new hidden_states).

    Calling sequence
    - forward
      - self.forward_entry (_moe_forward or _moe_forward_shared custom op)
        - forward_dispatch
          - _forward_impl

    Note: The existence of the _moe_forward and _moe_forward_shared custom ops
    is due to the following reasons:
    1. The chunking loop in ChunkingMoERunner._forward_impl cannot be
       compiled by torch.compile.
    2. PyTorch cannot handle union types in custom op signatures, so
       _moe_forward and _moe_forward_shared must be split.

    If ChunkingMoERunner._forward_impl can be implemented via torch.scan we can
    potentially get rid of _moe_forward and _moe_forward_shared and collapse the
    whole sequence into the 'forward' method.
    """

    # Apply transform for routed experts (e.g., latent projection for latent MoE)
    hidden_states, shared_experts_input = self.apply_routed_input_transform(
        hidden_states
    )

    hidden_states, og_hidden_dims = self._maybe_pad_hidden_states(
        shared_experts_input,
        hidden_states,
    )

    fused_output = self.forward_entry(
        hidden_states,
        router_logits,
        shared_experts_input,
        self._encode_layer_name(),
    )

    return self._maybe_reduce_output(fused_output, og_hidden_dims)
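The pad-then-truncate round trip in forward (`_maybe_pad_hidden_states` before the fused op, `_maybe_reduce_output` after it) can be sketched with a plain-Python stand-in. The dimensions here are hypothetical: the input is zero-padded up to the configured hidden_dim, and each output is truncated back to its original width.

```python
def pad_to(row, target_dim):
    # Zero-pad the trailing (hidden) dimension up to target_dim.
    return row + [0.0] * (target_dim - len(row))


def trunc_to(row, orig_dim):
    # Truncate back to the original hidden dimension.
    return row[:orig_dim]


row = [1.0, 2.0, 3.0]      # transformed hidden dim = 3 (hypothetical)
padded = pad_to(row, 8)    # moe_config.hidden_dim = 8 (hypothetical)
restored = trunc_to(padded, len(row))
```

The truncation sizes are carried through forward as `og_hidden_dims`, one entry per output tensor (two when shared experts are present).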

maybe_all_reduce_tensor_model_parallel

maybe_all_reduce_tensor_model_parallel(
    final_hidden_states: Tensor,
)

Some combine kernels reduce across GPU ranks by default.

Source code in vllm/model_executor/layers/fused_moe/runner/moe_runner_base.py
def maybe_all_reduce_tensor_model_parallel(self, final_hidden_states: torch.Tensor):
    """
    Some combine kernels reduce across GPU ranks by default.
    """
    if self.must_reduce_shared_expert_outputs():
        return final_hidden_states
    else:
        return tensor_model_parallel_all_reduce(final_hidden_states)
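The decision above can be sketched with stand-in callables (not the vLLM APIs): when the combine kernel already reduced across ranks, the tensor passes through; otherwise a tensor-parallel all-reduce runs.

```python
def maybe_all_reduce(x, output_already_reduced, all_reduce):
    """Pass through if the combine kernel already reduced, else all-reduce."""
    return x if output_already_reduced else all_reduce(x)


def fake_all_reduce(x):
    # Pretend two ranks hold identical partial sums, so the reduce doubles x.
    return [v * 2 for v in x]


passthrough = maybe_all_reduce([1.0, 2.0], True, fake_all_reduce)
reduced = maybe_all_reduce([1.0, 2.0], False, fake_all_reduce)
```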

must_reduce_shared_expert_outputs

must_reduce_shared_expert_outputs() -> bool

The shared_experts are typically computed using the RowParallelLinear layer. The result of this function is typically used as the reduce_results argument to that module. When only tensor parallelism is used, it is not required to reduce the shared_experts results immediately. Instead, we reduce once at the end of the MoE op (refer to the DeepSeekV2MoE module). With EP and all2all kernels this is no longer viable, as all GPU ranks in the DP group produce the complete set of hidden_states. Therefore the shared_experts output must be reduced early.

Source code in vllm/model_executor/layers/fused_moe/runner/moe_runner_base.py
def must_reduce_shared_expert_outputs(self) -> bool:
    """
    The shared_experts are typically computed using the RowParallelLinear
    layer. The result of this function is typically used as
    the reduce_results argument to the module.
    When just tensor-parallel is used, it is not required to reduce
    the shared_experts results immediately. Instead we reduce at the
    once at the end of the MoE op. (Refer to DeepSeekV2MoE module)
    With EP and all2all kernels - this is no longer viable as all
    GPU ranks in DP, produce the complete set of hidden_states.
    Therefore it is required that we reduce the shared_experts output
    early.
    """
    return (
        self.quant_method.moe_kernel is not None
        and self.quant_method.moe_kernel.output_is_reduced()
    )