
Map-Reduce Agents

Overview

The map-reduce pattern enables parallel agent execution with result aggregation. This pattern is ideal for tasks that can be split into independent subtasks, processed in parallel, and then synthesized into a final result.

Architecture

The graph has a simple fan-out/fan-in shape: a mapper node splits the main task into subtasks, worker nodes process those subtasks in parallel, and a reducer node joins the worker outputs into a single result.

When to Use

Use map-reduce when:

  • Tasks are parallelizable: Subtasks can be completed independently
  • Speed matters: Parallel execution is faster than sequential
  • Scale is needed: Large workloads benefit from distributed processing
  • No interdependencies: Workers don't need to coordinate with each other

Common Use Cases

1. Document Analysis

Each worker analyzes a section of a document, and the reducer synthesizes findings.

python
task = "Analyze this research paper in detail"
# Mapper splits into: intro, methods, results, conclusion
# Workers analyze their sections in parallel
# Reducer combines all analyses

2. Multi-Perspective Analysis

Each worker takes a different perspective on the same topic.

python
task = "Analyze AI impact on society"
# Mapper creates: economic, ethical, technological perspectives
# Workers each analyze from their perspective
# Reducer synthesizes all perspectives

3. Large-Scale Data Processing

Each worker processes a chunk of data independently.

python
task = "Process customer feedback from Q4"
# Mapper splits by category or time period
# Workers process their chunks
# Reducer aggregates insights

Key Components

1. State Schema

python
from typing import Annotated
from typing_extensions import TypedDict
import operator

class MapReduceState(TypedDict):
    task: str                                        # Main task
    subtasks: list[str]                              # Created by mapper
    worker_results: Annotated[list[dict], operator.add]  # Accumulated outputs
    final_result: str                                # Aggregated result
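
The Annotated reducer on worker_results is what makes the fan-in safe: when several workers write to that key in the same step, LangGraph merges their updates with operator.add instead of letting the last write win. A minimal sketch of the merge behavior, with hypothetical values:

python
# Two workers return updates for the same key in the same superstep
update_a = {"worker_results": [{"worker_id": 0, "output": "analysis A"}]}
update_b = {"worker_results": [{"worker_id": 1, "output": "analysis B"}]}

# LangGraph applies operator.add to concurrent updates on this key,
# so both entries survive rather than one overwriting the other
merged = update_a["worker_results"] + update_b["worker_results"]
assert len(merged) == 2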

2. Mapper Node

The mapper splits the task into independent subtasks:

python
from pydantic import BaseModel, Field
from langchain_core.messages import SystemMessage, HumanMessage

class MapperOutput(BaseModel):
    subtasks: list[str] = Field(description="List of independent subtasks")
    reasoning: str = Field(description="Explanation of split")

def create_mapper_node(llm, num_workers=3):
    structured_llm = llm.with_structured_output(MapperOutput)

    def mapper(state):
        output = structured_llm.invoke([
            SystemMessage(content=f"Split into {num_workers} subtasks"),
            HumanMessage(content=f"Task: {state['task']}")
        ])
        return {"subtasks": output.subtasks[:num_workers]}

    return mapper

3. Worker Nodes

Workers process subtasks independently:

python
def create_worker_node(llm, worker_id, worker_prompt=""):
    def worker(state):
        subtask = state["subtasks"][worker_id]

        response = llm.invoke([
            SystemMessage(content=f"Process your subtask. {worker_prompt}"),
            HumanMessage(content=f"Subtask: {subtask}")
        ])

        return {
            "worker_results": [{
                "worker_id": worker_id,
                "subtask": subtask,
                "output": response.content,
            }]
        }

    return worker
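
Because a worker is just a function of the state, you can exercise one in isolation before wiring the full graph. A quick sketch with a stubbed state and hypothetical subtask text:

python
# Invoke a single worker directly with a minimal fake state
worker_0 = create_worker_node(llm, worker_id=0)
update = worker_0({"subtasks": ["Analyze the introduction section"]})
print(update["worker_results"][0]["output"])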

4. Reducer Node

The reducer aggregates all worker results:

python
class ReducerOutput(BaseModel):
    final_result: str = Field(description="Synthesized result")
    summary: str = Field(description="Brief summary")

def create_reducer_node(llm):
    structured_llm = llm.with_structured_output(ReducerOutput)

    def reducer(state):
        # Combine all worker outputs
        results_text = "\n\n".join([
            f"Worker {r['worker_id']}: {r['output']}"
            for r in state["worker_results"]
        ])

        output = structured_llm.invoke([
            SystemMessage(content="Synthesize all results"),
            HumanMessage(content=f"Results:\n{results_text}")
        ])

        return {"final_result": output.final_result}

    return reducer

Graph Construction

Basic Graph

python
from langgraph.graph import StateGraph, START, END

workflow = StateGraph(MapReduceState)

# Add nodes
workflow.add_node("mapper", create_mapper_node(llm, num_workers=3))
for i in range(3):
    workflow.add_node(f"worker_{i}", create_worker_node(llm, i))
workflow.add_node("reducer", create_reducer_node(llm))

# Build structure
workflow.add_edge(START, "mapper")

# Fan-out: mapper to all workers (parallel)
for i in range(3):
    workflow.add_edge("mapper", f"worker_{i}")

# Fan-in: all workers to reducer
for i in range(3):
    workflow.add_edge(f"worker_{i}", "reducer")

workflow.add_edge("reducer", END)

graph = workflow.compile()
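
The static fan-out above requires the mapper to return exactly three subtasks. If your LangGraph version exposes the Send API (langgraph.types.Send in recent releases, langgraph.constants in older ones), you can instead fan out dynamically over however many subtasks the mapper produces. A sketch under that assumption, reusing llm, MapReduceState, and the message classes from above on a fresh StateGraph without the static worker nodes:

python
from langgraph.types import Send
from typing_extensions import TypedDict

class WorkerInput(TypedDict):
    subtask: str
    worker_id: int

def dispatch_workers(state: MapReduceState):
    # One Send per subtask; each dict becomes that worker invocation's input
    return [
        Send("worker", {"subtask": s, "worker_id": i})
        for i, s in enumerate(state["subtasks"])
    ]

def worker(state: WorkerInput):
    response = llm.invoke([
        SystemMessage(content="Process your subtask."),
        HumanMessage(content=f"Subtask: {state['subtask']}")
    ])
    return {"worker_results": [{
        "worker_id": state["worker_id"],
        "subtask": state["subtask"],
        "output": response.content,
    }]}

workflow.add_node("worker", worker)
workflow.add_conditional_edges("mapper", dispatch_workers, ["worker"])
workflow.add_edge("worker", "reducer")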

Execution

python
result = graph.invoke({
    "task": "Analyze this document comprehensively",
    "subtasks": [],
    "worker_results": [],
    "final_result": "",
})

print(result["final_result"])
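
To watch the fan-out actually execute, you can stream node-level updates instead of waiting for the final state; stream_mode="updates" yields one entry per completed node:

python
inputs = {"task": "Analyze this document comprehensively",
          "subtasks": [], "worker_results": [], "final_result": ""}

# Worker updates arrive as each parallel branch finishes
for update in graph.stream(inputs, stream_mode="updates"):
    for node_name in update:
        print(f"[{node_name}] completed")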

Advanced Patterns

Specialized Workers

Give each worker a specific role:

python
worker_prompts = [
    "Focus on technical aspects",
    "Focus on business impact",
    "Focus on user experience",
]

for i in range(3):
    workflow.add_node(
        f"worker_{i}",
        create_worker_node(llm, i, worker_prompts[i])
    )

Custom LLMs per Role

Use different models for different roles:

python
from langchain_ollama import ChatOllama
from langgraph_ollama_local.patterns import create_custom_map_reduce_graph

graph = create_custom_map_reduce_graph(
    mapper_llm=ChatOllama(model="llama3.1:70b"),    # Larger for planning
    worker_llm=ChatOllama(model="llama3.1:8b"),     # Smaller for processing
    reducer_llm=ChatOllama(model="llama3.1:70b"),   # Larger for synthesis
    num_workers=5
)

Dynamic Worker Count

Adjust workers based on task complexity:

python
from langgraph_ollama_local.patterns import create_map_reduce_graph

def determine_workers(task: str) -> int:
    """Determine optimal worker count based on task."""
    # Simple heuristic: more workers for longer tasks
    if len(task) > 1000:
        return 5
    elif len(task) > 500:
        return 3
    else:
        return 2

num_workers = determine_workers(task)
graph = create_map_reduce_graph(llm, num_workers=num_workers)

Pattern Comparison

Map-Reduce vs Supervisor Pattern

| Aspect       | Map-Reduce               | Supervisor                 |
|--------------|--------------------------|----------------------------|
| Execution    | Parallel workers         | Sequential agents          |
| Coordination | None (independent)       | High (supervisor decides)  |
| Use Case     | Independent subtasks     | Interdependent tasks       |
| Speed        | Fast (parallel)          | Slower (sequential)        |
| Complexity   | Simple (fan-out/fan-in)  | Complex (routing logic)    |

Map-Reduce vs Hierarchical Teams

| Aspect       | Map-Reduce                    | Hierarchical                   |
|--------------|-------------------------------|--------------------------------|
| Structure    | Flat (mapper-workers-reducer) | Nested (teams and supervisors) |
| Coordination | Minimal                       | High (multi-level)             |
| Scalability  | Horizontal (add workers)      | Vertical (add teams)           |
| Complexity   | Low                           | High                           |

Best Practices

1. Task Decomposition

Ensure subtasks are truly independent:

python
# Good: Independent sections
subtasks = [
    "Analyze introduction section",
    "Analyze methodology section",
    "Analyze results section",
]

# Bad: Sequential dependencies
subtasks = [
    "Read the document",
    "Based on reading, identify themes",  # Depends on first
    "Based on themes, write summary",     # Depends on second
]

2. Load Balancing

Create subtasks of similar complexity:

python
# Good: Balanced subtasks
subtasks = [
    "Analyze chapters 1-3",
    "Analyze chapters 4-6",
    "Analyze chapters 7-9",
]

# Bad: Unbalanced
subtasks = [
    "Analyze chapter 1",
    "Analyze chapters 2-9",  # Much more work
]

3. Error Handling

Handle cases where workers fail:

python
def create_reducer_with_error_handling(expected_workers: int):
    def reducer(state):
        results = state.get("worker_results", [])

        if not results:
            return {"final_result": "No results available"}

        # Flag partial completion so consumers know results are incomplete
        if len(results) < expected_workers:
            partial_note = f"\n\nNote: Only {len(results)}/{expected_workers} workers completed."
        else:
            partial_note = ""

        # Synthesize whatever results are available
        # (synthesize_results stands in for the LLM call in the reducer above)
        synthesis = synthesize_results(results)

        return {"final_result": f"{synthesis}{partial_note}"}

    return reducer

4. Result Quality

Ensure reducer does true synthesis, not just concatenation:

python
# Good: Synthesis with analysis
reducer_prompt = """
Synthesize the worker results by:
1. Identifying common themes across all outputs
2. Resolving any conflicts or inconsistencies
3. Organizing information logically
4. Providing integrated insights
"""

# Bad: Simple concatenation
# Don't just join worker outputs with "\n\n".join()
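
One way to apply these instructions is to pass them as the system message inside the reducer factory shown earlier; a brief sketch of that one-line change:

python
output = structured_llm.invoke([
    SystemMessage(content=reducer_prompt),
    HumanMessage(content=f"Results:\n{results_text}")
])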

Performance Considerations

Worker Count

More workers ≠ always better:

  • Too few: Underutilized parallelism
  • Too many: Overhead, diminishing returns
  • Sweet spot: Usually 3-5 workers for most tasks (see the benchmark sketch below)
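
One way to find the sweet spot empirically is to time a representative task at a few worker counts. A sketch, assuming the create_map_reduce_graph helper used in the complete example below:

python
import time

inputs = {"task": "Analyze this document comprehensively",
          "subtasks": [], "worker_results": [], "final_result": ""}

# Measure wall-clock time per worker count to find the knee of the curve
for n in (2, 3, 5):
    g = create_map_reduce_graph(llm, num_workers=n)
    start = time.perf_counter()
    g.invoke(inputs)
    print(f"{n} workers: {time.perf_counter() - start:.1f}s")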

LLM Selection

Choose models based on role requirements:

python
from langchain_ollama import ChatOllama

# Mapper: Needs good reasoning (larger model)
mapper_llm = ChatOllama(model="llama3.1:70b")

# Workers: Can use smaller models (parallel execution)
worker_llm = ChatOllama(model="llama3.1:8b")

# Reducer: Needs synthesis ability (larger model)
reducer_llm = ChatOllama(model="llama3.1:70b")

Complete Example

python
from langgraph_ollama_local import LocalAgentConfig
from langgraph_ollama_local.patterns import (
    create_map_reduce_graph,
    run_map_reduce_task,
)

# Setup
config = LocalAgentConfig()
llm = config.create_chat_client()

# Create graph
graph = create_map_reduce_graph(
    llm,
    num_workers=3,
    worker_prompt="Provide detailed analysis with examples."
)

# Run task
result = run_map_reduce_task(
    graph,
    """Analyze the environmental, economic, and social impacts
    of renewable energy adoption in developing countries."""
)

print(f"Processed {len(result['worker_results'])} subtasks")
print(f"\nFinal Result:\n{result['final_result']}")

Quiz

Test your understanding of map-reduce agents:

Knowledge Check

What is the primary advantage of the map-reduce pattern?

A. It uses less memory than other patterns
B. It enables parallel processing of independent subtasks for faster execution
C. It requires fewer agents than supervisor patterns
D. It automatically optimizes LLM model selection

Knowledge Check

What is the role of the mapper in the map-reduce pattern?

A. Aggregate and synthesize all worker results
B. Execute all subtasks sequentially
C. Split the main task into independent subtasks that can be processed in parallel
D. Monitor worker performance and handle errors

Knowledge Check

What is the recommended worker count for most map-reduce tasks?

A. 1-2 workers
B. 3-5 workers
C. 10-20 workers
D. As many workers as possible for maximum parallelism

Knowledge Check

What is a key best practice for task decomposition in map-reduce?

A. Create subtasks with sequential dependencies for better flow
B. Ensure subtasks are truly independent with no dependencies between them
C. Make the first subtask much larger to front-load work
D. Always split into exactly 3 subtasks regardless of task size

Knowledge Check

What should the reducer do with worker results?

A. Simply concatenate all worker outputs with newlines
B. Pick the best single worker output and discard others
C. Synthesize results by identifying themes, resolving conflicts, and providing integrated insights
D. Average the numeric scores from each worker