Pool 系统:Vitest 如何在 Worker 之间分配和执行测试
前置知识
- ›第 1 篇:架构与项目结构
- ›第 2 篇:CLI 启动与配置解析
- ›Node.js worker_threads 和 child_process API
- ›理解 RPC(远程过程调用)模式
Pool 系统:Vitest 如何在 Worker 之间分配和执行测试
Vitest 完成配置解析和项目初始化后,面临的核心挑战是:如何在隔离环境中并行执行可能多达数千个测试文件,同时将结果实时推送给报告器。驱动这一切运转的,正是 Pool 系统。
Vitest 的 Pool 架构分为三层:Pool 负责维护任务队列并将工作调度到多个 PoolRunner 实例上;每个 PoolRunner 封装一个 PoolWorker,后者持有实际的线程或进程。这种分层抽象使得同一套调度逻辑可以驱动 worker_threads、child_process.fork()、VM 隔离变体、浏览器会话以及 TypeScript 类型检查——统一通过一个接口对外暴露。
Pool 类型与创建
Vitest 内置六种 Pool 类型,定义于 packages/vitest/src/node/pool.ts#L38-L45:
export const builtinPools: BuiltinPool[] = [
'forks',
'threads',
'browser',
'vmThreads',
'vmForks',
'typescript',
]
| Pool | 机制 | 适用场景 |
|---|---|---|
threads |
worker_threads |
默认选项。启动快,支持共享内存。 |
forks |
child_process.fork() |
隔离性更强,部分原生模块必须使用此模式。 |
vmThreads |
worker_threads + node:vm |
在线程内为每个文件创建独立的 VM 上下文。 |
vmForks |
child_process.fork() + node:vm |
在子进程中启用 VM 隔离。 |
browser |
浏览器编排 | 通过 Playwright/WebdriverIO 在真实浏览器中运行测试。 |
typescript |
类型检查器 | 运行 tsc/vue-tsc 进行类型层面的测试。 |
第 54 行 的 createPool() 函数负责创建 Pool 实例并应用解析后的配置。executeTests() 函数则按 Pool 类型对 TestSpecification 对象进行分组,再将各组分发给对应的 Pool 实现。
flowchart TD
ET["executeTests(specs)"] --> Seq["sequencer.sort(specs)"]
Seq --> Group["Group by pool type + project + environment"]
Group --> G1["threads group"]
Group --> G2["forks group"]
Group --> G3["browser group"]
G1 --> Pool["Pool.run(task)"]
G2 --> Pool
G3 --> BrowserPool["Browser Pool (separate)"]
三层 Pool 架构
Pool 系统由三个职责明确的层次组成:
classDiagram
class Pool {
-queue: QueuedTask[]
-activeTasks: ActiveTask[]
-maxWorkers: number
+run(task, method): Promise
+cancel(): Promise
-schedule(): Promise
-getPoolRunner(task): PoolRunner
}
class PoolRunner {
+poolId: number
+project: TestProject
+environment: ContextTestEnvironment
-_state: RunnerState
-_rpc: BirpcReturn
+start(): Promise
+stop(options?): Promise
+waitForTerminated(): Promise
}
class PoolWorker {
<<interface>>
+name: string
+on(event, callback): void
+off(event, callback): void
+send(message: WorkerRequest): void
+start(): Promise
+stop(): Promise
+canReuse?(task): boolean
+deserialize(data): unknown
}
Pool --> "0..*" PoolRunner
PoolRunner --> "1" PoolWorker
Pool(packages/vitest/src/node/pools/pool.ts#L31-L50)维护任务队列和活跃任务集合。调用 run() 时,任务入队,schedule() 随即检查是否有空闲 Worker。若活跃任务数未达到 maxWorkers 上限,则取出队首任务,查找或创建一个 PoolRunner 并分配工作。系统还会持续监控内存用量——一旦某个 Worker 超出限制,对应的 Runner 会被停止并重新创建。
PoolRunner(packages/vitest/src/node/pools/poolRunner.ts#L42-L77)负责管理单个 Worker 的生命周期。它追踪状态迁移(IDLE → STARTING → STARTED → STOPPING → STOPPED),持有 birpc 通道,并处理启停超时。若关闭了隔离模式且 Worker 的 canReuse() 返回 true,同一个 Runner 可以复用于多个测试文件。
PoolWorker(packages/vitest/src/node/pools/types.ts#L22-L41)是实际线程或进程实现必须满足的接口,契约设计得相当简洁:on/off 用于事件订阅,send 用于消息传递,start/stop 用于生命周期管理,可选的 canReuse 用于 Worker 复用决策。
Worker 实现
packages/vitest/src/node/pools/workers/threadsWorker.ts 中的 ThreadsPoolWorker 是 PoolWorker 接口的一个典型实现:
export class ThreadsPoolWorker implements PoolWorker {
public readonly name: string = 'threads'
constructor(options: PoolOptions) {
this.entrypoint = resolve(options.distPath, 'workers/threads.js')
}
async start(): Promise<void> {
this._thread ||= new Worker(this.entrypoint, {
env: this.env,
execArgv: this.execArgv,
stdout: true,
stderr: true,
})
this._thread.stdout.pipe(this.stdout)
this._thread.stderr.pipe(this.stderr)
}
send(message: WorkerRequest): void {
this.thread.postMessage(message)
}
async stop(): Promise<void> {
await this.thread.terminate()
}
}
入口点指向 rollup 构建产物中的 workers/threads.js,它是 Node 端编排逻辑与 Worker 端测试执行之间的桥梁。
提示: 调试 Worker 问题时,重点检查
execArgv数组——--inspect、--max-old-space-size等 Node 标志都是在这里注入的。env对象则控制 Worker 能看到哪些环境变量。
birpc 通信桥接
Node 与 Worker 之间的通信通过 birpc 建立双向 RPC 通道。Node 侧的 RPC 方法定义于 packages/vitest/src/node/pools/rpc.ts#L17-L29:
sequenceDiagram
participant Worker as Worker (Runtime)
participant RPC as birpc Channel
participant Node as Node (Orchestration)
Worker->>RPC: rpc.fetch(url, importer, env)
RPC->>Node: createMethodsRPC().fetch()
Node->>Node: project._fetcher(url, ...)
Node-->>RPC: TransformResult
RPC-->>Worker: Module source code
Worker->>RPC: rpc.resolve(id, importer, env)
RPC->>Node: pluginContainer.resolveId()
Node-->>Worker: Resolved path
Worker->>RPC: rpc.onTaskUpdate(packs, events)
RPC->>Node: StateManager + Reporters
Worker->>RPC: rpc.snapshotSaved(snapshot)
RPC->>Node: SnapshotManager
createMethodsRPC() 函数对外暴露以下核心操作:
fetch()— 通过 Vite 的处理管道获取并转换模块,测试文件及其依赖项正是通过这一机制完成编译的。resolve()— 通过 Vite 的 plugin container 解析模块说明符。onTaskUpdate()— 将测试进度和结果上报给 StateManager。- 快照相关操作 —
snapshotSaved、resolveSnapshotPath等。
Worker 侧通过 packages/vitest/src/runtime/rpc.ts 中的 rpc() 调用访问这些方法。Worker 内部每一次模块导入,最终都会通过 fetch() 回调到 Node 进程,由 Vite 的完整 plugin 管道对源码进行转换。
Worker 启动流程
threads 模式的 Worker 入口文件出乎意料地简洁,packages/vitest/src/runtime/workers/threads.ts 中只有短短 4 行:
import { runBaseTests, setupBaseEnvironment } from './base'
import workerInit from './init-threads'
workerInit({ runTests: runBaseTests, setup: setupBaseEnvironment })
init-threads 中的 workerInit 函数负责搭建消息处理循环:监听 WorkerRequest 消息(start、run、collect、stop、cancel)并做相应分发。收到 start 消息时,调用 setupBaseEnvironment。
packages/vitest/src/runtime/workers/base.ts#L72 中的 setupBaseEnvironment 承担了最繁重的初始化工作:
sequenceDiagram
participant PM as Pool Manager
participant W as Worker Thread
participant Base as base.ts
participant MR as Module Runner
participant Env as Environment
PM->>W: { type: 'start', config, environment }
W->>Base: setupBaseEnvironment(context)
Base->>MR: startModuleRunner(options)
MR->>MR: Override process.exit
MR->>MR: Set up error listeners
MR->>MR: Create VitestModuleRunner or NativeModuleRunner
Base->>Env: loadEnvironment(name)
Env->>Env: Setup (jsdom/happy-dom/node/edge-runtime)
Base-->>W: Return teardown function
W-->>PM: { type: 'started' }
Module runner 在每个 Worker 中只创建一次,并在多个文件间复用。若实验性标志 viteModuleRunner 为 false,则使用基于 Node 原生模块加载的 NativeModuleRunner,而非 Vite 的 ModuleRunner。
Worker 中的测试执行:runBaseTests
Pool 发送 { type: 'run' } 消息后,Worker 会调用 packages/vitest/src/runtime/runBaseTests.ts#L21-L92 中的 run():
flowchart TD
Run["run(method, files, config, moduleRunner, environment)"] --> Setup["Parallel setup:<br>1. Resolve test runner<br>2. Setup global env<br>3. Start coverage<br>4. Resolve snapshot env"]
Setup --> Loop["For each file"]
Loop --> Isolated{config.isolate?}
Isolated -- "yes" --> Reset["Reset mocker + modules"]
Isolated -- "no" --> Skip["Skip reset"]
Reset & Skip --> SetPath["workerState.filepath = file"]
SetPath --> Method{method?}
Method -- "run" --> StartTests["startTests([file], testRunner)"]
Method -- "collect" --> CollectTests["collectTests([file], testRunner)"]
StartTests & CollectTests --> PostFile["vi.resetConfig()<br>vi.restoreAllMocks()"]
PostFile --> Loop
Loop -- "all files done" --> Coverage["stopCoverageInsideWorker()"]
四个初始化操作通过 Promise.all 并行执行,将测试 runner 解析、全局环境设置、覆盖率初始化和快照环境解析同步推进。之后进入逐文件的顺序循环:若开启了隔离模式,则在每个文件处理完后重置模块状态;接着调用测试 runner;最后恢复所有 mock。
startTests() 和 collectTests() 来自 @vitest/runner——这个与框架无关的测试运行引擎将在下一篇文章中详细介绍。
提示: 如果测试启动缓慢,首先排查并行初始化阶段。覆盖率初始化和环境设置的开销可能相当可观——可以借助 trace(
experimental.openTelemetry)定位瓶颈所在。
结果收集:StateManager 与 TestRun
测试结果从 Worker 经由 RPC 桥接传递到 Node 侧的 StateManager 和 TestRun。
packages/vitest/src/node/state.ts#L19-L26 — StateManager 维护完整的测试状态:
export class StateManager {
filesMap: Map<string, File[]> = new Map()
pathsSet: Set<string> = new Set()
idMap: Map<string, Task> = new Map()
taskFileMap: WeakMap<Task, File> = new WeakMap()
errorsSet: Set<unknown> = new Set()
leakSet: Set<AsyncLeak> = new Set()
reportedTasksMap: WeakMap<Task, TestModule | TestCase | TestSuite> = new WeakMap()
}
packages/vitest/src/node/test-run.ts#L27-L56 — TestRun 协调报告器事件的完整生命周期:
sequenceDiagram
participant TR as TestRun
participant SM as StateManager
participant R as Reporters
TR->>SM: collectPaths(filepaths)
TR->>R: onTestRunStart(specifications)
loop For each file
TR->>SM: collectFiles(project, [file])
TR->>R: onTestModuleQueued(testModule)
Note over TR: Tests collected
TR->>SM: collectFiles(project, files)
TR->>R: onTestModuleCollected(testModule)
Note over TR: Tests executing...
TR->>R: onTestCaseReady(testCase)
TR->>R: onTestCaseResult(testCase)
end
TR->>R: onTestRunEnd(modules, errors, reason)
TestRun.start() 将文件路径收集到 StateManager 并触发 onTestRunStart。随着 Worker 通过 RPC 持续上报结果,enqueued() 和 collected() 会更新状态并通知报告器。各测试用例的结果通过任务更新事件逐条流入。所有 Worker 执行完毕后,TestRun 触发 onTestRunEnd,并附上最终结论:'passed'、'failed' 或 'interrupted'。
下一篇
至此,我们已经完整追踪了从 Pool 调度器到 Worker 启动,再到结果收集的全链路。下一篇将深入 @vitest/runner——驱动测试 DSL 的框架无关引擎。我们将了解 describe/test/it 如何构建可链式调用的 API、文件如何被收集成任务树、hooks 如何按正确顺序执行,以及类 Playwright 的 fixture 机制如何实现依赖注入。