Read OSS

交易的流转:交易池架构与 P2P 传播机制

高级

前置知识

  • 第 1–3 篇文章
  • 以太坊交易类型(Legacy、EIP-1559、EIP-4844 blob)
  • 基本的 P2P 网络概念

交易的流转:交易池架构与 P2P 传播机制

我们已经追踪了区块从执行、状态更新到落盘的完整流程。现在让我们换一个方向,来看看交易是如何到达节点、经过验证后进入交易池、在网络中传播,并最终被打包进区块的。这一生命周期涉及两个核心子系统:交易池(及其 SubPool 聚合器模式)和 devp2p 网络栈。handler 结构体作为中央协调器,将二者紧密连接在一起。

交易类型与多交易池的必要性

以太坊目前共有五种交易类型,均定义于 core/types/transaction.go

const (
    LegacyTxType     = 0x00
    AccessListTxType = 0x01
    DynamicFeeTxType = 0x02
    BlobTxType       = 0x03
    SetCodeTxType    = 0x04
)

类型 0x00 到 0x02 以及 0x04 属于"常规"交易——它们携带 calldata,大小特征相近,生命周期也基本一致。类型 0x03(由 EIP-4844 引入的 blob 交易)则截然不同:每笔 blob 交易可携带数百 KB 的 blob 数据。这种体积上的差异,对交易池管理、网络传播和淘汰策略都有深远影响。

一个单一的交易池无法同时高效地处理 200 字节的常规交易和 200 KB 的 blob 交易——二者所需的淘汰策略、内存预算和持久化方案都截然不同。这正是 Geth 引入 SubPool 聚合器模式的根本原因。

SubPool 聚合器模式

TxPool 结构体本身并不是一个交易池,而是一个聚合器,负责统一管理多个各有侧重的交易池实现:

type TxPool struct {
    subpools []SubPool
    chain    BlockChain
    stateLock sync.RWMutex
    state     *state.StateDB
    subs      event.SubscriptionScope
    quit      chan chan error
    term      chan struct{}
    sync      chan chan error
}

聚合器对外提供统一的 API,各个 SubPool 则针对自身负责的交易类型采用最优策略处理:

flowchart TD
    INCOMING["Incoming Transaction"] --> FILTER["TxPool.Add()"]
    FILTER --> DISPATCH["Dispatch to matching SubPool<br/>(based on Filter())"]
    DISPATCH --> LP["legacypool<br/>Types: 0x00, 0x01, 0x02, 0x04<br/>In-memory with journal<br/>Price-based eviction"]
    DISPATCH --> BP["blobpool<br/>Type: 0x03<br/>Disk-backed (billy)<br/>Size-aware eviction"]
    LP --> PENDING["Pending() — merged view"]
    BP --> PENDING
    PENDING --> MINER["Miner / Block Builder"]

SubPool 接口定义非常全面,实现方需支持过滤、初始化、重置(在链头变化时)、gas 小费更新、交易添加、待处理交易获取以及状态查询等功能。其中 Filter(tx) 方法用于让每个 subpool 声明自己所处理的交易类型。

eth.New() 中,两个交易池被分别创建并组合在一起:

legacyPool := legacypool.New(config.TxPool, eth.blockchain)
eth.blobTxPool = blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth)
eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, eth.blobTxPool})

传入 blobpool.NewlegacyPool.HasPendingAuth 回调是一个跨池协调点——它允许 blob pool 查询 legacy pool 中某个账户是否存在待处理的 SetCode 授权。

LazyTransaction:延迟加载优化

当矿工请求待处理交易以构建区块时,如果对每一笔候选交易都加载完整数据,代价会相当高昂——尤其是那些最终可能根本不会被打包的多 MB 级 blob 交易。LazyTransaction 模式正是为此而生:

type LazyTransaction struct {
    Pool      LazyResolver
    Hash      common.Hash
    Tx        *types.Transaction  // nil until resolved
    Time      time.Time
    GasFeeCap *uint256.Int
    GasTipCap *uint256.Int
    Gas       uint64
    BlobGas   uint64
}

LazyTransaction 仅携带矿工进行排序和筛选所需的最少元数据(gas 上限、gas 限制、哈希值),完整的交易数据只在真正需要时才通过 Resolve() 加载:

func (ltx *LazyTransaction) Resolve() *types.Transaction {
    if ltx.Tx != nil {
        return ltx.Tx
    }
    return ltx.Pool.Get(ltx.Hash)
}

LazyResolver 接口极为精简,仅包含一个 Get(hash) 方法。每个 SubPool 都实现了该接口,resolver 被注入到 lazy transaction 中,使其能够从对应的交易池中按需拉取完整数据。

提示: Resolve() 上的注释揭示了一个重要的设计决策——该方法有意缓存已解析的交易(如果原始交易池本身也未缓存的话)。对于 blob 交易而言,盲目缓存可能导致内存膨胀。

devp2p 网络栈

交易通过 devp2p 网络栈进出节点。p2p.Server 负责管理对等节点连接、协议多路复用和节点发现:

flowchart TD
    subgraph "Discovery"
        DNS["DNS Discovery"]
        V4["discv4 (UDP)"]
        V5["discv5 (UDP)"]
        FAIR["FairMix<br/>Balanced source selection"]
    end
    subgraph "Transport"
        RLPX["RLPx Encrypted TCP"]
    end
    subgraph "Protocols"
        ETH_P["eth/68 Protocol"]
        SNAP_P["snap/1 Protocol"]
    end
    DNS --> FAIR
    V4 --> FAIR
    V5 --> FAIR
    FAIR --> RLPX
    RLPX --> ETH_P
    RLPX --> SNAP_P

Protocol 结构体定义了一个 devp2p 子协议:

type Protocol struct {
    Name    string
    Version uint
    Length  uint64
    Run     func(peer *Peer, rw MsgReadWriter) error
    DialCandidates enode.Iterator
    Attributes     []enr.Entry
}

每当有新的对等节点连接时,Run 函数都会在一个新的 goroutine 中被调用,通过 MsgReadWriter 收发协议消息。DialCandidates 字段提供了一个候选节点的迭代器,数据来源于 FairMix 发现混合器。

FairMix 的设计值得关注——它将多种节点发现来源(DNS、discv4、discv5)整合在一起,并通过公平性保证确保没有任何单一来源独占连接机会。每轮超时设为 100ms,让各来源都有均等的出场机会。

handler:P2P 与区块链的连接枢纽

handler 结构体是网络子系统与区块链子系统之间的中央协调器:

type handler struct {
    nodeID    enode.ID
    networkID uint64
    synced    atomic.Bool
    database  ethdb.Database
    txpool    txPool
    chain     *core.BlockChain
    maxPeers  int
    downloader     *downloader.Downloader
    txFetcher      *fetcher.TxFetcher
    peers          *peerSet
    txBroadcastKey [16]byte
    // ...
}
sequenceDiagram
    participant Peer as Remote Peer
    participant Handler as handler
    participant TxFetcher as TxFetcher
    participant TxPool as TxPool
    participant Miner as Miner

    Peer->>Handler: NewPooledTransactionHashes (announcement)
    Handler->>TxFetcher: Notify(peer, hashes)
    TxFetcher->>Peer: GetPooledTransactions (fetch)
    Peer-->>TxFetcher: PooledTransactions (response)
    TxFetcher->>TxPool: Add(txs)
    TxPool-->>Handler: NewTxsEvent
    Handler->>Peer: Broadcast or Announce to other peers
    Miner->>TxPool: Pending() — pull txs for block

handler 协调三个核心流程:

  1. 交易传播:当新交易事件触发时,handler 会决定是广播完整交易数据,还是仅通告其哈希值。判断阈值为 txMaxBroadcastSize = 4096 字节——超过此大小的交易只会被通告,对等节点需主动请求才能获取完整数据。这一机制对 blob 交易尤为关键。

  2. 链同步downloader 负责处理初始同步以及离线后的追块逻辑,支持全量同步和 snap 同步两种模式。

  3. 节点管理peerSet 跟踪已连接的对等节点,txBroadcastKey 则为交易广播提供确定性路由——每个节点基于交易哈希的 SipHash 值选择一个子集的对等节点进行广播,在保证良好网络覆盖的同时避免泛洪。

newHandler() 构造函数负责初始化 downloader 和交易获取器,并将相关回调函数与交易池连接起来:

fetchTx := func(peer string, hashes []common.Hash) error {
    p := h.peers.peer(peer)
    if p == nil { return errors.New("unknown peer") }
    return p.RequestTxs(hashes)
}
addTxs := func(txs []*types.Transaction) []error {
    return h.txpool.Add(txs, false)
}
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer)

提示: synced 原子布尔值是一个关键的协调标志。在节点认为自己与网络完成同步之前,交易处理功能处于禁用状态。enableSyncedFeatures() 方法负责翻转该标志,从而解锁交易广播和交易池接收功能。如果你的节点无法接受交易,不妨先确认同步是否已经完成。

交易在网络中流转并进入交易池,区块在状态上不断执行——这一切就绪之后,还剩下最后一个问题:外部世界如何与这一切交互?答案是 RPC 层,这也将是第 6 篇的主题。