GraphRAG Data Construction Pipeline
The GraphRAG framework implements a multi-stage pipeline for transforming unstructured documents into an interconnected knowledge graph. The construction process follows a carefully orchestrated sequence of transformations, each leveraging large language models (LLMs) for information extraction and summarization.
Pipeline Stages
The data construction workflow consists of six primary stages, each building upon the output of the previous stage (the final stage, answer synthesis, runs at query time):
Document Chunking: Raw source documents are initially segmented into manageable text chunks. This chunking strategy balances the need for coherent context windows with the practical limitations of LLM processing capacity.
Element Instance Generation: Each text chunk undergoes intensive analysis using LLMs to identify and extract graph elements—specifically nodes representing entities and edges representing relationships. The model processes each chunk independently, producing structured extractions that capture the semantic content in graph-compatible format.
Element Summarization: Generated graph elements (entities and relationships) are processed through LLM-based summarization to create concise, informative descriptions. This abstraction step reduces noise while preserving essential semantic information, enabling more efficient downstream processing.
Community Detection: The resulting graph structure undergoes community detection using specialized algorithms. This partitioning organizes related entities and relationships into coherent clusters, enabling hierarchical understanding of document content.
Community Report Generation: Each identified community receives a comprehensive summary generated by LLMs. These reports capture the collective characteristics, significance, and interconnections within each community, serving as high-level abstractions of document content.
Answer Synthesis: For complex queries requiring holistic understanding, the system generates localized answers from individual community reports, then synthesizes these partial answers into comprehensive global responses.
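The chunking stage above can be sketched as a sliding window over the token stream. The window and overlap sizes below are illustrative, not GraphRAG's defaults:

```python
def chunk_text(tokens: list[str], chunk_size: int = 300, overlap: int = 50) -> list[list[str]]:
    """Split a token sequence into overlapping fixed-size windows."""
    chunks = []
    step = chunk_size - overlap  # how far the window advances each iteration
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # last window already covers the tail
    return chunks

# 400 toy tokens split into 100-token chunks with 20 tokens of overlap
words = ("lorem ipsum " * 200).split()
chunks = chunk_text(words, chunk_size=100, overlap=20)
```

The overlap ensures that an entity mention straddling a chunk boundary appears whole in at least one chunk, at the cost of extracting some regions twice.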
The final output utilizes GraphML-formatted strings serialized within Parquet files, providing efficient storage with schema enforcement. Given the extensive LLM usage throughout this pipeline, processing large document collections incurs significant token consumption—a practical consideration for production deployments.
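To make the storage format concrete, here is a minimal round trip of the GraphML serialization using networkx; the Parquet read/write around it is omitted, and the attribute names are illustrative:

```python
import networkx as nx

# Build a toy graph like the entity/relationship graphs the pipeline produces
graph = nx.Graph()
graph.add_node("ENTITY_A", type="ORGANIZATION")
graph.add_edge("ENTITY_A", "ENTITY_B", description="partnership", weight=7.0)

# Serialize to a GraphML string -- the kind of value stored in a Parquet
# column, one serialized graph per snapshot
graphml_string = "\n".join(nx.generate_graphml(graph))

# Re-hydrate the graph from the stored string
restored = nx.parse_graphml(graphml_string)
```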
Engineering Implementation
The implementation architecture centers on the graphrag.index.cli.index_cli() entry point, which orchestrates the entire pipeline based on configuration parameters. When no explicit configuration is provided, the system invokes create_pipeline_config() to generate workflow definitions programmatically.
Workflow specifications reside in the graphrag/index/workflows/v1 directory, enabling modular customization of pipeline stages. This modular architecture supports custom entity extraction schemas, relationship definitions, and community detection parameters tailored to specific domain requirements.
Entity Extraction Prompt
The following prompt template guides LLM-based entity and relationship extraction from text chunks:
-Goal-
Given a text document potentially relevant to this activity and a list of entity types, identify all entities of those types from the text and all relationships among the identified entities.
-Steps-
1. Identify all entities. For each entity, extract:
   - entity_name: Name of the entity, capitalized
   - entity_type: One of the predefined type categories
   - entity_description: Comprehensive description of entity attributes and activities
2. Identify relationships between entities:
   - source_entity and target_entity
   - relationship_description explaining the connection
   - relationship_strength: numeric score from 1-10
3. Return output in English using {record_delimiter} as list separator
-Entity_types: {entity_types}
-Text: {input_text}
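A sketch of how the delimited output of this prompt can be parsed back into structured records. The `<|>` and `##` values stand in for `{tuple_delimiter}` and `{record_delimiter}`, and the record layout ("entity"/"relationship" tags and field order) is illustrative, not a guaranteed contract:

```python
TUPLE_DELIMITER = "<|>"
RECORD_DELIMITER = "##"

def parse_extraction(output: str) -> tuple[list[dict], list[dict]]:
    """Parse delimited LLM extraction output into entity and relationship dicts."""
    entities, relationships = [], []
    for record in output.split(RECORD_DELIMITER):
        record = record.strip().strip("()")
        if not record:
            continue
        fields = [f.strip().strip('"') for f in record.split(TUPLE_DELIMITER)]
        if fields[0] == "entity" and len(fields) == 4:
            entities.append(
                {"name": fields[1], "type": fields[2], "description": fields[3]}
            )
        elif fields[0] == "relationship" and len(fields) == 5:
            relationships.append({
                "source": fields[1], "target": fields[2],
                "description": fields[3], "strength": float(fields[4]),
            })
    return entities, relationships

sample = (
    '("entity"<|>"ACME CORP"<|>"ORGANIZATION"<|>"A manufacturing company")##'
    '("relationship"<|>"ACME CORP"<|>"JANE DOE"<|>"Jane Doe founded Acme Corp"<|>9)'
)
entities, relationships = parse_extraction(sample)
```

Malformed records (wrong field count, unknown tag) are silently skipped here; a production parser would log them for prompt debugging.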
Community Report Generation
Community-level reports utilize a sophisticated prompting strategy that produces structured JSON output:
# Goal
Write a comprehensive report for a community containing entities, relationships, and claims.
# Report Structure
{
    "title": "community_name",
    "summary": "executive_summary",
    "rating": float (0-10),
    "rating_explanation": "single sentence justification",
    "findings": [
        {
            "summary": "insight_summary",
            "explanation": "detailed_explanation_with_evidence"
        }
    ]
}
# Grounding Rules
Include data references in format: [Data: Dataset (record_ids)]
Maximum 5 record IDs per reference
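A minimal validator for this report structure, useful for catching malformed LLM output before it enters the index. This is a sketch, not part of GraphRAG itself:

```python
import json

REQUIRED_KEYS = {"title", "summary", "rating", "rating_explanation", "findings"}

def validate_report(raw: str) -> dict:
    """Parse a community report and check it against the expected schema."""
    report = json.loads(raw)
    missing = REQUIRED_KEYS - report.keys()
    if missing:
        raise ValueError(f"report missing keys: {sorted(missing)}")
    if not 0 <= report["rating"] <= 10:
        raise ValueError("rating must be in [0, 10]")
    for finding in report["findings"]:
        if not {"summary", "explanation"} <= finding.keys():
            raise ValueError("each finding needs a summary and an explanation")
    return report

raw = json.dumps({
    "title": "Acme Corp supplier network",
    "summary": "Acme Corp anchors a cluster of regional suppliers.",
    "rating": 6.5,
    "rating_explanation": "Moderate impact due to regional scope.",
    "findings": [
        {"summary": "Acme dominates the cluster",
         "explanation": "Most relationships involve Acme [Data: Reports (2)]"},
    ],
})
report = validate_report(raw)
```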
Claim Extraction Prompt
For detailed claim analysis against specific entities:
-Goal-
Extract entities matching specifications and all claims against those entities.
For each claim, extract:
- Subject (capitalized entity name)
- Object (capitalized entity name or **NONE**)
- Claim Type (categorized)
- Claim Status: TRUE, FALSE, or SUSPECTED
- Claim Description with reasoning and evidence
- Claim Date period in ISO-8601 format
- Claim Source Text: original quotes supporting the claim
Format: ({subject}{tuple_delimiter}{object}{tuple_delimiter}{type}...
Query Processing Architecture
GraphRAG implements two distinct query modes—global and local—each optimized for different types of information needs. Both modes leverage the pre-computed knowledge graph but employ fundamentally different retrieval and reasoning strategies.
Global Search Mode
Global search targets questions requiring holistic understanding across the entire document collection. The implementation retrieves data from three primary sources:
- Nodes Table: Contains graph node information representing extracted entities and concepts
- Entities Table: Stores entity metadata including descriptions, types, and community associations
- Community Reports Table: Contains aggregated community summaries and rankings
The global search implementation follows a map-reduce pattern:
Global Search Implementation
def execute_global_search(
    data_directory: str | None,
    root_directory: str | None,
    community_level: int,
    response_format: str,
    query: str,
) -> str:
    """Execute global search across community reports."""
    data_path, _, config = initialize_paths(data_directory, root_directory)

    # Load required data tables
    nodes = pd.read_parquet(data_path / "create_final_nodes.parquet")
    entities = pd.read_parquet(data_path / "create_final_entities.parquet")
    reports = pd.read_parquet(data_path / "create_final_community_reports.parquet")

    # Process reports and entities with community filtering
    report_data = process_reports(reports, nodes, community_level)
    entity_data = process_entities(nodes, entities, community_level)

    # Initialize search engine with processed data
    search_engine = GlobalSearchEngine(
        config=config,
        reports=report_data,
        entities=entity_data,
        response_type=response_format,
    )
    return search_engine.execute(query=query)
Report Selection Algorithm
Community reports undergo weighted ranking based on entity relevance:
def select_relevant_reports(
    community_reports: list[Report],
    entities: list[Entity],
    minimum_rank: int = 0,
) -> list[Report]:
    """Select community reports meeting the rank threshold."""
    # Weight reports by entity coverage
    weighted_reports = compute_community_weights(
        community_reports=community_reports,
        entities=entities,
        weight_attribute="rank",
        normalize=True,
    )
    # Filter by minimum rank threshold
    selected = [
        report for report in weighted_reports
        if report.rank >= minimum_rank
    ]
    return selected
Map Phase Prompt
The map phase generates individual insights from each community report:
MAP_PROMPT = """
Role: Assistant analyzing provided data tables
Goal: Generate key points responding to user query with importance scores

Output Format:
{
    "points": [
        {"description": "description [Data: Reports(id)]", "score": 0-100}
    ]
}

Rules:
- Use data tables as primary context
- Do not fabricate information
- Maximum 5 record IDs per reference
- Preserve modal verbs (shall, may, will)

Context: {context_data}
Query: {user_query}
"""
Reduce Phase Prompt
The reduce phase synthesizes insights from multiple reports:
REDUCE_PROMPT = """
Role: Assistant synthesizing perspectives from multiple analysts
Goal: Generate comprehensive response from ranked analyst reports
Process:
1. Reports are ranked by importance (descending)
2. Remove irrelevant information
3. Merge into coherent answer with proper markdown formatting
4. Preserve original meaning and modal verbs
5. Maintain all data references
Rules:
- Do not mention analyst roles
- Maximum 5 record IDs per reference
- Use markdown sections appropriately
Response Length: {response_format}
Analyst Reports: {report_data}
"""
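Putting the two phases together, the map-reduce orchestration can be sketched as below. `llm` is any callable from prompt to text; the stub stands in for a real model, and the simplified prompt strings are placeholders for the templates above:

```python
import json

def global_search(query: str, reports: list[str], llm) -> str:
    """Map-reduce sketch: map each community report to scored points, then reduce."""
    points = []
    for report in reports:
        # Map phase: one LLM call per community report, returning JSON points
        raw = llm(f"MAP\nContext: {report}\nQuery: {query}")
        points.extend(json.loads(raw)["points"])

    # Rank insights by importance score, highest first, dropping zero scores
    points.sort(key=lambda p: p["score"], reverse=True)
    ranked = "\n".join(p["description"] for p in points if p["score"] > 0)

    # Reduce phase: a single LLM call synthesizing the ranked insights
    return llm(f"REDUCE\nAnalyst Reports: {ranked}\nQuery: {query}")

# Stub LLM so the sketch runs without an API
def stub_llm(prompt: str) -> str:
    if prompt.startswith("MAP"):
        return json.dumps({"points": [
            {"description": "Acme leads the market [Data: Reports (1)]", "score": 80},
            {"description": "Unrelated detail", "score": 0},
        ]})
    return "Acme leads the market."

answer = global_search("Who leads the market?", ["report text"], stub_llm)
```

Note that the map calls are independent, so a real implementation would issue them concurrently.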
Local Search Mode
Local search targets specific entities and their immediate contexts, providing detailed information about targeted topics. This mode incorporates additional data sources for granular retrieval:
- Text Units: Original document chunks for direct citation
- Relationships: Entity-to-entity connections with descriptions
- Claims: Extracted assertions with verification status
Local Search Implementation
def execute_local_search(
    data_directory: str | None,
    root_directory: str | None,
    community_level: int,
    response_format: str,
    query: str,
) -> str:
    """Execute local search with entity-focused retrieval."""
    data_path, _, config = initialize_paths(data_directory, root_directory)

    # Load all data sources
    nodes = pd.read_parquet(data_path / "create_final_nodes.parquet")
    reports = pd.read_parquet(data_path / "create_final_community_reports.parquet")
    text_units = pd.read_parquet(data_path / "create_final_text_units.parquet")
    relationships = pd.read_parquet(data_path / "create_final_relationships.parquet")
    entities = pd.read_parquet(data_path / "create_final_entities.parquet")

    # Load claims if available
    claims_path = data_path / "create_final_covariates.parquet"
    claims = load_claims(claims_path) if claims_path.exists() else []

    # Build embedding store for entity similarity search
    vector_store = create_embedding_store(config.embeddings)

    # Process entities with semantic embeddings
    entity_data = process_entities(nodes, entities, community_level)
    store_entity_embeddings(entity_data, vector_store)

    # Initialize local search engine
    search_engine = LocalSearchEngine(
        config=config,
        reports=process_reports(reports, nodes, community_level),
        text_units=process_text_units(text_units),
        entities=entity_data,
        relationships=process_relationships(relationships),
        claims={"claims": claims},
        embedding_store=vector_store,
        response_type=response_format,
    )
    return search_engine.execute(query=query)
Entity Retrieval via Semantic Similarity
Local search begins by identifying entities semantically related to the query:
def find_matching_entities(
    query: str,
    embedding_store: BaseVectorStore,
    embedder: BaseTextEmbedding,
    all_entities: list[Entity],
    include_list: list[str] | None = None,
    exclude_list: list[str] | None = None,
    similarity_k: int = 10,
    oversample_factor: int = 2,
) -> list[Entity]:
    """Find entities matching the query using embedding similarity."""
    if query:
        # Search by semantic similarity, oversampling so that entries removed
        # by the exclusion filter below still leave enough candidates
        search_results = embedding_store.similarity_search_by_text(
            text=query,
            embedder=lambda text: embedder.embed(text),
            k=similarity_k * oversample_factor,
        )
        # Match results to entity objects
        matched = []
        for result in search_results:
            entity = locate_entity(all_entities, "id", result.document.id)
            if entity:
                matched.append(entity)
    else:
        # Fallback to rank-based selection when no query is given
        sorted_entities = sorted(
            all_entities,
            key=lambda e: e.rank or 0,
            reverse=True,
        )
        matched = sorted_entities[:similarity_k]

    # Apply the exclusion filter
    if exclude_list:
        matched = [e for e in matched if e.title not in exclude_list]

    # Trim the oversampled candidate list back down to k results
    matched = matched[:similarity_k]

    # Add explicitly included entities
    included = [
        entity for name in (include_list or [])
        for entity in locate_entities_by_name(all_entities, name)
    ]
    return included + matched
Community Context Assembly
Selected entities guide community report retrieval:
def build_community_context(
    selected_entities: list[Entity],
    community_reports: dict[int, CommunityReport],
    token_encoder,
    maximum_tokens: int = 4000,
    use_summaries: bool = False,
) -> tuple[str, dict[str, pd.DataFrame]]:
    """Assemble community context from selected entities."""
    if not selected_entities or not community_reports:
        return ("", {})

    # Count entity-community associations
    community_hits: dict[int, int] = {}
    for entity in selected_entities:
        for community_id in entity.community_ids:
            community_hits[community_id] = community_hits.get(community_id, 0) + 1

    # Sort communities by entity coverage, then by rank
    relevant_communities = [
        community_reports[cid]
        for cid in community_hits
        if cid in community_reports
    ]
    for community in relevant_communities:
        community.attributes = community.attributes or {}
        community.attributes["matches"] = community_hits[community.id]
    relevant_communities.sort(
        key=lambda c: (c.attributes["matches"], c.rank),
        reverse=True,
    )
    for community in relevant_communities:
        del community.attributes["matches"]

    # Build context within the token budget
    context_text, context_data = construct_community_context(
        community_reports=relevant_communities,
        token_encoder=token_encoder,
        use_community_summary=use_summaries,
        max_tokens=maximum_tokens,
        single_batch=True,
    )
    if isinstance(context_text, list):
        context_text = "\n\n".join(context_text)
    return (context_text, context_data)
Local Search Prompt
LOCAL_SEARCH_PROMPT = """
Role: Assistant responding to questions using provided data tables
Goal: Generate response of specified length incorporating data and relevant knowledge
Output Requirements:
- Use markdown formatting with appropriate sections
- Include data references: [Data: Dataset (ids)]
- Maximum 5 record IDs per reference
- Do not fabricate information
- State inability to answer if data is insufficient
Reference Format:
"Entity X owns Company Y [Data: Sources (15, 16), Reports (1), Entities (5, 7)]"
Response Length: {response_format}
Data Context: {context_data}
Query: {user_query}
"""
Practical Considerations
GraphRAG's architecture offers significant advantages for complex reasoning tasks but requires careful resource planning. The multi-stage pipeline, particularly entity extraction and community report generation, involves extensive LLM invocations, making token consumption the primary cost factor. Query mode selection depends on the information need: global search excels at holistic questions spanning multiple documents, while local search provides detailed entity-focused responses.
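A back-of-envelope estimator for the extraction stage's token bill. Every number here (chunk size, prompt overhead, output size, per-token prices) is an illustrative assumption, not a GraphRAG default or a real price list:

```python
def estimate_extraction_cost(
    num_documents: int,
    tokens_per_document: int,
    chunk_size: int = 1200,
    prompt_overhead: int = 1500,   # extraction prompt + few-shot examples (assumed)
    output_tokens_per_chunk: int = 600,  # assumed average extraction output
    input_price: float = 2.5e-6,   # $/input token (illustrative)
    output_price: float = 1e-5,    # $/output token (illustrative)
) -> dict:
    """Rough cost estimate for the entity-extraction stage alone."""
    total_tokens = num_documents * tokens_per_document
    num_chunks = -(-total_tokens // chunk_size)  # ceiling division
    # Each chunk is sent with the full prompt template prepended
    input_tokens = num_chunks * (chunk_size + prompt_overhead)
    output_tokens = num_chunks * output_tokens_per_chunk
    return {
        "chunks": num_chunks,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "estimated_cost": input_tokens * input_price + output_tokens * output_price,
    }

# 1,000 documents of ~5,000 tokens each
cost = estimate_extraction_cost(num_documents=1000, tokens_per_document=5000)
```

Note that element summarization, community reports, and gleaning passes add further calls on top of this, so the real bill is a multiple of the extraction-only figure.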
The framework's extensibility through custom workflow configurations enables adaptation to diverse domains, though optimal performance typically requires domain-specific tuning of extraction schemas and community detection parameters.