Read OSS

Pool 系统:Vitest 如何在 Worker 之间分配和执行测试

高级

前置知识

  • 第 1 篇:架构与项目结构
  • 第 2 篇:CLI 启动与配置解析
  • Node.js worker_threads 和 child_process API
  • 理解 RPC(远程过程调用)模式

Pool 系统:Vitest 如何在 Worker 之间分配和执行测试

Vitest 完成配置解析和项目初始化后,面临的核心挑战是:如何在隔离环境中并行执行可能多达数千个测试文件,同时将结果实时推送给报告器。驱动这一切运转的,正是 Pool 系统。

Vitest 的 Pool 架构分为三层:Pool 负责维护任务队列并将工作调度到多个 PoolRunner 实例上;每个 PoolRunner 封装一个 PoolWorker,后者持有实际的线程或进程。这种分层抽象使得同一套调度逻辑可以驱动 worker_threadschild_process.fork()、VM 隔离变体、浏览器会话以及 TypeScript 类型检查——统一通过一个接口对外暴露。

Pool 类型与创建

Vitest 内置六种 Pool 类型,定义于 packages/vitest/src/node/pool.ts#L38-L45

export const builtinPools: BuiltinPool[] = [
  'forks',
  'threads',
  'browser',
  'vmThreads',
  'vmForks',
  'typescript',
]
Pool 机制 适用场景
threads worker_threads 默认选项。启动快,支持共享内存。
forks child_process.fork() 隔离性更强,部分原生模块必须使用此模式。
vmThreads worker_threads + node:vm 在线程内为每个文件创建独立的 VM 上下文。
vmForks child_process.fork() + node:vm 在子进程中启用 VM 隔离。
browser 浏览器编排 通过 Playwright/WebdriverIO 在真实浏览器中运行测试。
typescript 类型检查器 运行 tsc/vue-tsc 进行类型层面的测试。

第 54 行createPool() 函数负责创建 Pool 实例并应用解析后的配置。executeTests() 函数则按 Pool 类型对 TestSpecification 对象进行分组,再将各组分发给对应的 Pool 实现。

flowchart TD
    ET["executeTests(specs)"] --> Seq["sequencer.sort(specs)"]
    Seq --> Group["Group by pool type + project + environment"]
    Group --> G1["threads group"]
    Group --> G2["forks group"]
    Group --> G3["browser group"]
    G1 --> Pool["Pool.run(task)"]
    G2 --> Pool
    G3 --> BrowserPool["Browser Pool (separate)"]

三层 Pool 架构

Pool 系统由三个职责明确的层次组成:

classDiagram
    class Pool {
        -queue: QueuedTask[]
        -activeTasks: ActiveTask[]
        -maxWorkers: number
        +run(task, method): Promise
        +cancel(): Promise
        -schedule(): Promise
        -getPoolRunner(task): PoolRunner
    }

    class PoolRunner {
        +poolId: number
        +project: TestProject
        +environment: ContextTestEnvironment
        -_state: RunnerState
        -_rpc: BirpcReturn
        +start(): Promise
        +stop(options?): Promise
        +waitForTerminated(): Promise
    }

    class PoolWorker {
        <<interface>>
        +name: string
        +on(event, callback): void
        +off(event, callback): void
        +send(message: WorkerRequest): void
        +start(): Promise
        +stop(): Promise
        +canReuse?(task): boolean
        +deserialize(data): unknown
    }

    Pool --> "0..*" PoolRunner
    PoolRunner --> "1" PoolWorker

Poolpackages/vitest/src/node/pools/pool.ts#L31-L50)维护任务队列和活跃任务集合。调用 run() 时,任务入队,schedule() 随即检查是否有空闲 Worker。若活跃任务数未达到 maxWorkers 上限,则取出队首任务,查找或创建一个 PoolRunner 并分配工作。系统还会持续监控内存用量——一旦某个 Worker 超出限制,对应的 Runner 会被停止并重新创建。

PoolRunnerpackages/vitest/src/node/pools/poolRunner.ts#L42-L77)负责管理单个 Worker 的生命周期。它追踪状态迁移(IDLE → STARTING → STARTED → STOPPING → STOPPED),持有 birpc 通道,并处理启停超时。若关闭了隔离模式且 Worker 的 canReuse() 返回 true,同一个 Runner 可以复用于多个测试文件。

PoolWorkerpackages/vitest/src/node/pools/types.ts#L22-L41)是实际线程或进程实现必须满足的接口,契约设计得相当简洁:on/off 用于事件订阅,send 用于消息传递,start/stop 用于生命周期管理,可选的 canReuse 用于 Worker 复用决策。

Worker 实现

packages/vitest/src/node/pools/workers/threadsWorker.ts 中的 ThreadsPoolWorkerPoolWorker 接口的一个典型实现:

export class ThreadsPoolWorker implements PoolWorker {
  public readonly name: string = 'threads'
  
  constructor(options: PoolOptions) {
    this.entrypoint = resolve(options.distPath, 'workers/threads.js')
  }

  async start(): Promise<void> {
    this._thread ||= new Worker(this.entrypoint, {
      env: this.env,
      execArgv: this.execArgv,
      stdout: true,
      stderr: true,
    })
    this._thread.stdout.pipe(this.stdout)
    this._thread.stderr.pipe(this.stderr)
  }

  send(message: WorkerRequest): void {
    this.thread.postMessage(message)
  }

  async stop(): Promise<void> {
    await this.thread.terminate()
  }
}

入口点指向 rollup 构建产物中的 workers/threads.js,它是 Node 端编排逻辑与 Worker 端测试执行之间的桥梁。

提示: 调试 Worker 问题时,重点检查 execArgv 数组——--inspect--max-old-space-size 等 Node 标志都是在这里注入的。env 对象则控制 Worker 能看到哪些环境变量。

birpc 通信桥接

Node 与 Worker 之间的通信通过 birpc 建立双向 RPC 通道。Node 侧的 RPC 方法定义于 packages/vitest/src/node/pools/rpc.ts#L17-L29

sequenceDiagram
    participant Worker as Worker (Runtime)
    participant RPC as birpc Channel
    participant Node as Node (Orchestration)

    Worker->>RPC: rpc.fetch(url, importer, env)
    RPC->>Node: createMethodsRPC().fetch()
    Node->>Node: project._fetcher(url, ...)
    Node-->>RPC: TransformResult
    RPC-->>Worker: Module source code

    Worker->>RPC: rpc.resolve(id, importer, env)
    RPC->>Node: pluginContainer.resolveId()
    Node-->>Worker: Resolved path

    Worker->>RPC: rpc.onTaskUpdate(packs, events)
    RPC->>Node: StateManager + Reporters
    
    Worker->>RPC: rpc.snapshotSaved(snapshot)
    RPC->>Node: SnapshotManager

createMethodsRPC() 函数对外暴露以下核心操作:

  • fetch() — 通过 Vite 的处理管道获取并转换模块,测试文件及其依赖项正是通过这一机制完成编译的。
  • resolve() — 通过 Vite 的 plugin container 解析模块说明符。
  • onTaskUpdate() — 将测试进度和结果上报给 StateManager。
  • 快照相关操作snapshotSavedresolveSnapshotPath 等。

Worker 侧通过 packages/vitest/src/runtime/rpc.ts 中的 rpc() 调用访问这些方法。Worker 内部每一次模块导入,最终都会通过 fetch() 回调到 Node 进程,由 Vite 的完整 plugin 管道对源码进行转换。

Worker 启动流程

threads 模式的 Worker 入口文件出乎意料地简洁,packages/vitest/src/runtime/workers/threads.ts 中只有短短 4 行:

import { runBaseTests, setupBaseEnvironment } from './base'
import workerInit from './init-threads'

workerInit({ runTests: runBaseTests, setup: setupBaseEnvironment })

init-threads 中的 workerInit 函数负责搭建消息处理循环:监听 WorkerRequest 消息(startruncollectstopcancel)并做相应分发。收到 start 消息时,调用 setupBaseEnvironment

packages/vitest/src/runtime/workers/base.ts#L72 中的 setupBaseEnvironment 承担了最繁重的初始化工作:

sequenceDiagram
    participant PM as Pool Manager
    participant W as Worker Thread
    participant Base as base.ts
    participant MR as Module Runner
    participant Env as Environment

    PM->>W: { type: 'start', config, environment }
    W->>Base: setupBaseEnvironment(context)
    Base->>MR: startModuleRunner(options)
    MR->>MR: Override process.exit
    MR->>MR: Set up error listeners
    MR->>MR: Create VitestModuleRunner or NativeModuleRunner
    Base->>Env: loadEnvironment(name)
    Env->>Env: Setup (jsdom/happy-dom/node/edge-runtime)
    Base-->>W: Return teardown function
    W-->>PM: { type: 'started' }

Module runner 在每个 Worker 中只创建一次,并在多个文件间复用。若实验性标志 viteModuleRunnerfalse,则使用基于 Node 原生模块加载的 NativeModuleRunner,而非 Vite 的 ModuleRunner

Worker 中的测试执行:runBaseTests

Pool 发送 { type: 'run' } 消息后,Worker 会调用 packages/vitest/src/runtime/runBaseTests.ts#L21-L92 中的 run()

flowchart TD
    Run["run(method, files, config, moduleRunner, environment)"] --> Setup["Parallel setup:<br>1. Resolve test runner<br>2. Setup global env<br>3. Start coverage<br>4. Resolve snapshot env"]
    Setup --> Loop["For each file"]
    Loop --> Isolated{config.isolate?}
    Isolated -- "yes" --> Reset["Reset mocker + modules"]
    Isolated -- "no" --> Skip["Skip reset"]
    Reset & Skip --> SetPath["workerState.filepath = file"]
    SetPath --> Method{method?}
    Method -- "run" --> StartTests["startTests([file], testRunner)"]
    Method -- "collect" --> CollectTests["collectTests([file], testRunner)"]
    StartTests & CollectTests --> PostFile["vi.resetConfig()<br>vi.restoreAllMocks()"]
    PostFile --> Loop
    Loop -- "all files done" --> Coverage["stopCoverageInsideWorker()"]

四个初始化操作通过 Promise.all 并行执行,将测试 runner 解析、全局环境设置、覆盖率初始化和快照环境解析同步推进。之后进入逐文件的顺序循环:若开启了隔离模式,则在每个文件处理完后重置模块状态;接着调用测试 runner;最后恢复所有 mock。

startTests()collectTests() 来自 @vitest/runner——这个与框架无关的测试运行引擎将在下一篇文章中详细介绍。

提示: 如果测试启动缓慢,首先排查并行初始化阶段。覆盖率初始化和环境设置的开销可能相当可观——可以借助 trace(experimental.openTelemetry)定位瓶颈所在。

结果收集:StateManager 与 TestRun

测试结果从 Worker 经由 RPC 桥接传递到 Node 侧的 StateManagerTestRun

packages/vitest/src/node/state.ts#L19-L26StateManager 维护完整的测试状态:

export class StateManager {
  filesMap: Map<string, File[]> = new Map()
  pathsSet: Set<string> = new Set()
  idMap: Map<string, Task> = new Map()
  taskFileMap: WeakMap<Task, File> = new WeakMap()
  errorsSet: Set<unknown> = new Set()
  leakSet: Set<AsyncLeak> = new Set()
  reportedTasksMap: WeakMap<Task, TestModule | TestCase | TestSuite> = new WeakMap()
}

packages/vitest/src/node/test-run.ts#L27-L56TestRun 协调报告器事件的完整生命周期:

sequenceDiagram
    participant TR as TestRun
    participant SM as StateManager
    participant R as Reporters

    TR->>SM: collectPaths(filepaths)
    TR->>R: onTestRunStart(specifications)

    loop For each file
        TR->>SM: collectFiles(project, [file])
        TR->>R: onTestModuleQueued(testModule)
        
        Note over TR: Tests collected
        TR->>SM: collectFiles(project, files)
        TR->>R: onTestModuleCollected(testModule)
        
        Note over TR: Tests executing...
        TR->>R: onTestCaseReady(testCase)
        TR->>R: onTestCaseResult(testCase)
    end

    TR->>R: onTestRunEnd(modules, errors, reason)

TestRun.start() 将文件路径收集到 StateManager 并触发 onTestRunStart。随着 Worker 通过 RPC 持续上报结果,enqueued()collected() 会更新状态并通知报告器。各测试用例的结果通过任务更新事件逐条流入。所有 Worker 执行完毕后,TestRun 触发 onTestRunEnd,并附上最终结论:'passed''failed''interrupted'

下一篇

至此,我们已经完整追踪了从 Pool 调度器到 Worker 启动,再到结果收集的全链路。下一篇将深入 @vitest/runner——驱动测试 DSL 的框架无关引擎。我们将了解 describe/test/it 如何构建可链式调用的 API、文件如何被收集成任务树、hooks 如何按正确顺序执行,以及类 Playwright 的 fixture 机制如何实现依赖注入。