RAGパイプライン:ドキュメントのインデックス化、ベクトルストレージ、ナレッジ検索
前提知識
- ›第1〜2回:アーキテクチャ概要とリクエストフロー
- ›テキスト埋め込みとベクトル類似検索の基本的な理解
- ›RAG(Retrieval-Augmented Generation)の概念に関する知識
RAGパイプライン:ドキュメントのインデックス化、ベクトルストレージ、ナレッジ検索
Retrieval-Augmented Generation(RAG)は、知識に基づいたLLMアプリケーションの根幹をなす仕組みです。Difyのパイプラインは、PDFのアップロードからチャット会話中のドキュメントチャンク返却まで、すべてを一貫して担います。本記事では、パイプラインの両面を追っていきます。ドキュメントを検索可能なベクトルへと変換するインデックス化の流れと、クエリ時に関連ナレッジを探し出す検索の流れです。途中では、30以上のベクトルデータベースアダプター、複数データセットのルーティング戦略、そして細粒度な検索を可能にする親子チャンキングモデルについても見ていきましょう。
ドキュメントのインデックス化パイプライン
ドキュメントのインデックス化は、IndexingRunner クラスが中心となり、複数ステージのパイプラインを調整します。
flowchart TD
Upload[Document Upload] --> Task[Celery Task<br/>document_indexing_task]
Task --> Runner[IndexingRunner.run]
Runner --> Extract[Extract Text<br/>PDF, DOCX, HTML, etc.]
Extract --> Clean[Clean Text<br/>remove extras, normalize]
Clean --> Split[Split Text<br/>FixedRecursiveCharacterTextSplitter]
Split --> Embed[Generate Embeddings<br/>via ModelInstance]
Embed --> Store[Store in Vector DB<br/>+ PostgreSQL metadata]
Store --> Done[Update Document Status]
Runner -->|Error| ErrorHandler[Mark Document as ERROR<br/>with error message]
run() メソッドはドキュメントを順に処理し、それぞれを以下の4つのステージに通します。
- Extract — データソースの種別に応じた専用エクストラクターに処理を委譲します
- Transform — クリーニングルールを適用し、データセットの処理ルールに従ってテキストをチャンクに分割します
- Load — 埋め込みを生成し、メタデータとともにベクトルを保存します
各ドキュメントのステータスは IndexingStatus enum(pending → indexing → completed/error/paused)で管理され、UIから進捗を確認できます。
インデックス化タスクは tasks/document_indexing_task.py で定義され、Celeryの dataset キューに投入されます。これにより、インデックス化の処理はリアルタイムのワークフロー実行とワーカーリソースを奪い合うことがありません。タスクはテナント間の公平なスケジューリングのために TenantIsolatedTaskQueue を使用します。
Tip: インデックス化が不明なエラーで失敗した場合は、データベースの
Document.errorフィールドを確認しましょう。_handle_indexing_error()メソッドがエラーメッセージをそこに記録しており、UIでユーザーに表示されます。
テキスト抽出と分割の戦略
Difyは core/rag/extractor/ 以下のエクストラクタークラス群を通じて、幅広いフォーマットのテキスト抽出をサポートしています。
| フォーマット | エクストラクター | 備考 |
|---|---|---|
PdfExtractor |
pypdf または Unstructured API を使用 | |
| DOCX | WordExtractor |
python-docx ベース |
| HTML | HtmlExtractor |
BeautifulSoup によるパース |
| Markdown | MarkdownExtractor |
カスタムパーサー |
| CSV/Excel | CsvExtractor, ExcelExtractor |
表形式データに対応 |
| Notion | NotionExtractor |
API連携 |
| Website | WebsiteExtractor |
Jina Reader / Firecrawl |
抽出後のチャンキングは FixedRecursiveCharacterTextSplitter が担います。このスプリッターはデータセット単位で設定可能で、以下のパラメーターを持ちます。
- チャンクサイズ — チャンクあたりの最大文字数
- チャンクオーバーラップ — 隣接チャンク間で共有される文字数
- セパレーター — 分割の区切り文字の優先順位(段落 → 文 → 単語)
コードブロックや構造化コンテンツをより適切に扱える EnhanceRecursiveCharacterTextSplitter も用意されています。データセットの DatasetProcessRule モデルに有効な分割設定が保存されるため、データセットごとに異なる戦略を使い分けることができます。
ベクトルデータベースのアダプターパターン
Difyの優れたエンジニアリングのひとつが、共通インターフェースによる32種類のベクトルデータベースサポートです。VectorType enumにすべてが列挙されています。
classDiagram
class BaseVector {
<<abstract>>
+create(texts, embeddings)
+delete()
+search_by_vector(query_vector)
+search_by_full_text(query)
}
class QdrantVector
class WeaviateVector
class MilvusVector
class PgVectorVector
class ElasticsearchVector
class ChromaVector
class OceanBaseVector
class TencentVector
BaseVector <|-- QdrantVector
BaseVector <|-- WeaviateVector
BaseVector <|-- MilvusVector
BaseVector <|-- PgVectorVector
BaseVector <|-- ElasticsearchVector
BaseVector <|-- ChromaVector
BaseVector <|-- OceanBaseVector
BaseVector <|-- TencentVector
note for BaseVector "32 implementations total"
各アダプターは core/rag/datasource/vdb/ 以下に配置され、ベクトルの作成・削除・検索メソッドを持つ BaseVector インターフェースを実装しています。vector_factory.py モジュールがデータセットの設定に基づいて適切なアダプターを選択します。
api/configs/middleware/vdb/ 以下の設定クラスは、各データベースに対して型付きの設定を提供します。第1回で見たように、これらはPydanticの多重継承を通じて MiddlewareConfig に組み込まれています。
一部のアダプターはハイブリッド検索(ベクトル類似性とキーワード/全文検索の組み合わせ)をサポートしています。
- Milvus —
MILVUS_ENABLE_HYBRID_SEARCH - Tencent Vector DB —
TENCENT_VECTOR_DB_ENABLE_HYBRID_SEARCH - Elasticsearch — BM25 + ベクトルによるネイティブなハイブリッド検索
- PgVector —
pg_bigmと組み合わせた全文検索
Tip: ベクトルデータベースを選ぶ際は、ハイブリッド検索が必要かどうかを検討しましょう。純粋なセマンティック検索では、エラーコードや製品SKUのようなキーワード完全一致を取りこぼすことがあります。ハイブリッド検索をネイティブサポートするデータベースを選べば、別途キーワードインデックスを構築する手間が省けます。
複数データセットの検索とクエリルーティング
ワークフロー内でナレッジ検索ノードが実行されると、複数のデータセットを横断して検索する場面があります。DatasetRetrieval クラスが2つのルーティング戦略でこれを調整します。
flowchart TD
Query[User Query] --> Strategy{Routing Strategy?}
Strategy -->|single_dataset| Direct[Search single dataset]
Strategy -->|function_call| FCRouter[FunctionCallMultiDatasetRouter<br/>LLM selects relevant datasets]
Strategy -->|react| ReactRouter[ReactMultiDatasetRouter<br/>ReAct reasoning loop]
FCRouter --> Selected[Selected datasets]
ReactRouter --> Selected
Selected --> Parallel[Parallel retrieval<br/>per-dataset strategy]
Parallel --> Semantic[Semantic Search<br/>vector similarity]
Parallel --> Keyword[Keyword Search<br/>BM25/full-text]
Parallel --> Hybrid[Hybrid Search<br/>combined score]
Semantic --> Merge[Merge results]
Keyword --> Merge
Hybrid --> Merge
Merge --> Rerank[Reranking Pipeline]
Rerank --> Return[Return top-K results]
FunctionCallMultiDatasetRouter のアプローチはシンプルながら巧みです。各データセットを「ツール」としてLLMに提示し、クエリに関連するデータセットを選ばせます。データセットが1つだけの場合はLLM呼び出しをスキップします。ReactMultiDatasetRouter は、より複雑なルーティング判断にReActの推論ループを活用します。
選択された各データセットは、セマンティック・キーワード・ハイブリッドといった独自に設定された検索方法と、top-K・スコア閾値・リランク設定などのパラメーターを使って個別に検索されます。
デフォルトの検索モデルには、合理的な初期値が設定されています。
default_retrieval_model = {
"search_method": RetrievalMethod.SEMANTIC_SEARCH,
"reranking_enable": False,
"top_k": 4,
"score_threshold_enabled": False,
}
リランキング、後処理、親子チャンキング
初期検索の後、結果は data_post_processor.py の後処理パイプラインを通過します。DataPostProcessor は以下を適用します。
- リランキング — Cohere RerankやBGE Rerankerなどのリランキングモデルを使い、関連度でスコアを再計算します。モデルベースのリランキング、重み付きスコアリング、クロスエンコーダーリランキングなど複数のモードをサポートしています。
- スコア閾値フィルタリング — 設定した類似度閾値を下回る結果を除外します。
- 並べ替え —
reorder.pyモジュールが多様性ベースの並べ替えを適用し、意味的に重複するチャンクが続けて返されることを防ぎます。
親子チャンキングモデルは、DifyのRAGにおける高度な機能のひとつです。ChildChunk モデルにより、2段階のチャンキング戦略が実現されます。
erDiagram
Dataset ||--o{ Document : contains
Document ||--o{ DocumentSegment : "split into"
DocumentSegment ||--o{ ChildChunk : "further split into"
DocumentSegment {
string id PK
string content
string index_node_id
int word_count
int position
}
ChildChunk {
string id PK
string segment_id FK
string content
int position
string index_node_id
}
親セグメントがコンテキストウィンドウを提供し、子チャンクが細粒度なマッチングを可能にします。検索時には子チャンクに対してマッチングを行い、コンテキストとして親セグメントを返す仕組みです。これはRAGにおけるよくあるトレードオフへの解決策です。小さいチャンクは検索精度を高め、大きいチャンクはLLM生成に必要な豊富なコンテキストを提供します。親子チャンキングはその両方を両立させます。
非同期インデックス化と進捗トラッキング
インデックス化パイプラインはCeleryを通じて非同期で実行され、きめ細かな進捗トラッキングが行われます。エントリーポイントは document_indexing_task です。
@shared_task(queue="dataset")
def document_indexing_task(dataset_id: str, document_ids: list):
"""Async process document"""
_document_indexing(dataset_id, document_ids)
dataset キューを分離することで、インデックス化ジョブとリアルタイムのワークフロータスクが競合しません。_document_indexing() 関数はセッションを作成してドキュメントを取得し、IndexingRunner.run() に処理を委譲します。
進捗は複数の粒度でトラッキングされます。
- ドキュメントレベル —
IndexingStatusenum(pending/indexing/completed/error/paused) - セグメントレベル —
SegmentStatusが個々のチャンク処理を追跡 - 時刻トラッキング —
indexing_latency、completed_at、stopped_atタイムスタンプ
また、増分再インデックス化もサポートしています。ドキュメントが更新された際、変更されたセグメントだけを再埋め込み・再保存すればよい仕組みです。
Tip: 大量のドキュメントを扱う場合は、Celeryの
datasetキューの深さを監視しましょう。インデックス化が遅い場合は、CELERY_WORKER_AMOUNTを増やすかオートスケール(CELERY_AUTO_SCALE=true)の利用を検討してください。テナント分離タスクキューにより、あるテナントの大量アップロードが別テナントの処理をブロックすることはありません。
次回予告
ドキュメントがアップロードからベクトルストアを経て検索結果として返るまでの流れを追ってきました。第5回では、100以上のLLMプロバイダーを統一インターフェースで抽象化する仕組みと、エージェントやワークフローで利用できる5種類のツールを紹介します。加えて、信頼されていないコードを安全に実行するプラグインデーモンアーキテクチャも探ります。