---
title: Streaming Flow Execution
description: Stream real-time output from your CrewAI flow execution
icon: wave-pulse
mode: "wide"
---

## Introduction

CrewAI Flows support streaming output, allowing you to receive real-time updates as your flow executes. This feature enables you to build responsive applications that display results incrementally, provide live progress updates, and create better user experiences for long-running workflows.

## How Flow Streaming Works

When streaming is enabled on a Flow, CrewAI captures and streams output from any crews or LLM calls within the flow. The stream delivers structured chunks containing the content, task context, and agent information as execution progresses.

## Enabling Streaming

To enable streaming, set the `stream` attribute to `True` on your Flow class:

```python Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class ResearchFlow(Flow):
    stream = True  # Enable streaming for the entire flow

    @start()
    def initialize(self):
        return {"topic": "AI trends"}

    @listen(initialize)
    def research_topic(self, data):
        researcher = Agent(
            role="Research Analyst",
            goal="Research topics thoroughly",
            backstory="Expert researcher with analytical skills",
        )

        task = Task(
            description="Research {topic} and provide insights",
            expected_output="Detailed research findings",
            agent=researcher,
        )

        crew = Crew(
            agents=[researcher],
            tasks=[task],
        )

        return crew.kickoff(inputs=data)
```

## Synchronous Streaming

When you call `kickoff()` on a flow with streaming enabled, it returns a `FlowStreamingOutput` object that you can iterate over:

```python Code
flow = ResearchFlow()

# Start streaming execution
streaming = flow.kickoff()

# Iterate over chunks as they arrive
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Access the final result after streaming completes
result = streaming.result
print(f"\n\nFinal output: {result}")
```

### Stream Chunk Information

Each chunk provides context about where it originated in the flow:

```python Code
streaming = flow.kickoff()

for chunk in streaming:
    print(f"Agent: {chunk.agent_role}")
    print(f"Task: {chunk.task_name}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT or TOOL_CALL
```

### Accessing Streaming Properties

The `FlowStreamingOutput` object provides useful properties and methods:

```python Code
streaming = flow.kickoff()

# Iterate and collect chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# After iteration completes
print(f"\nCompleted: {streaming.is_completed}")
print(f"Full text: {streaming.get_full_text()}")
print(f"Total chunks: {len(streaming.chunks)}")
print(f"Final result: {streaming.result}")
```

## Asynchronous Streaming

For async applications, use `kickoff_async()` with async iteration:

```python Code
import asyncio

async def stream_flow():
    flow = ResearchFlow()

    # Start async streaming
    streaming = await flow.kickoff_async()

    # Async iteration over chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Access final result
    result = streaming.result
    print(f"\n\nFinal output: {result}")

asyncio.run(stream_flow())
```

## Streaming with Multi-Step Flows

Streaming works seamlessly across multiple flow steps, including flows that execute multiple crews:

```python Code
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task

class MultiStepFlow(Flow):
    stream = True

    @start()
    def research_phase(self):
        """First crew: Research the topic."""
        researcher = Agent(
            role="Research Analyst",
            goal="Gather comprehensive information",
            backstory="Expert at finding relevant information",
        )

        task = Task(
            description="Research AI developments in healthcare",
            expected_output="Research findings on AI in healthcare",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state["research"] = result.raw
        return result.raw

    @listen(research_phase)
    def analysis_phase(self, research_data):
        """Second crew: Analyze the research."""
        analyst = Agent(
            role="Data Analyst",
            goal="Analyze information and extract insights",
            backstory="Expert at identifying patterns and trends",
        )

        task = Task(
            description="Analyze this research: {research}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"research": research_data})


# Stream across both phases
flow = MultiStepFlow()
streaming = flow.kickoff()

current_step = ""
for chunk in streaming:
    # Track which flow step is executing
    if chunk.task_name != current_step:
        current_step = chunk.task_name
        print(f"\n\n=== {chunk.task_name} ===\n")

    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal analysis: {result}")
```

## Practical Example: Progress Dashboard

Here's a complete example showing how to build a progress dashboard with streaming:

```python Code
import asyncio
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

class ResearchPipeline(Flow):
    stream = True

    @start()
    def gather_data(self):
        researcher = Agent(
            role="Data Gatherer",
            goal="Collect relevant information",
            backstory="Skilled at finding quality sources",
        )

        task = Task(
            description="Gather data on renewable energy trends",
            expected_output="Collection of relevant data points",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        self.state["data"] = result.raw
        return result.raw

    @listen(gather_data)
    def analyze_data(self, data):
        analyst = Agent(
            role="Data Analyst",
            goal="Extract meaningful insights",
            backstory="Expert at data analysis",
        )

        task = Task(
            description="Analyze: {data}",
            expected_output="Key insights and trends",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        return crew.kickoff(inputs={"data": data})


async def run_with_dashboard():
    flow = ResearchPipeline()

    print("="*60)
    print("RESEARCH PIPELINE DASHBOARD")
    print("="*60)

    streaming = await flow.kickoff_async()

    current_agent = ""
    current_task = ""
    chunk_count = 0

    async for chunk in streaming:
        chunk_count += 1

        # Display phase transitions
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            current_agent = chunk.agent_role
            print(f"\n\n📋 Phase: {current_task}")
            print(f"👤 Agent: {current_agent}")
            print("-" * 60)

        # Display text output
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Display tool usage
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 Tool: {chunk.tool_call.tool_name}")

    # Show completion summary
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("PIPELINE COMPLETE")
    print(f"{'='*60}")
    print(f"Total chunks: {chunk_count}")
    print(f"Final output length: {len(str(result))} characters")

asyncio.run(run_with_dashboard())
```

## Streaming with State Management

Streaming works naturally with Flow state management:

```python Code
from pydantic import BaseModel

class AnalysisState(BaseModel):
    topic: str = ""
    research: str = ""
    insights: str = ""

class StatefulStreamingFlow(Flow[AnalysisState]):
    stream = True

    @start()
    def research(self):
        # State is available during streaming
        topic = self.state.topic
        print(f"Researching: {topic}")

        researcher = Agent(
            role="Researcher",
            goal="Research topics thoroughly",
            backstory="Expert researcher",
        )

        task = Task(
            description=f"Research {topic}",
            expected_output="Research findings",
            agent=researcher,
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()

        self.state.research = result.raw
        return result.raw

    @listen(research)
    def analyze(self, research):
        # Access updated state
        print(f"Analyzing {len(self.state.research)} chars of research")

        analyst = Agent(
            role="Analyst",
            goal="Extract insights",
            backstory="Expert analyst",
        )

        task = Task(
            description="Analyze: {research}",
            expected_output="Key insights",
            agent=analyst,
        )

        crew = Crew(agents=[analyst], tasks=[task])
        result = crew.kickoff(inputs={"research": research})

        self.state.insights = result.raw
        return result.raw


# Run with streaming
flow = StatefulStreamingFlow()
streaming = flow.kickoff(inputs={"topic": "quantum computing"})

for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\n\nFinal state:")
print(f"Topic: {flow.state.topic}")
print(f"Research length: {len(flow.state.research)}")
print(f"Insights length: {len(flow.state.insights)}")
```

## Use Cases

Flow streaming is particularly valuable for:

- **Multi-Stage Workflows**: Show progress across research, analysis, and synthesis phases
- **Complex Pipelines**: Provide visibility into long-running data processing flows
- **Interactive Applications**: Build responsive UIs that display intermediate results
- **Monitoring and Debugging**: Observe flow execution and crew interactions in real-time
- **Progress Tracking**: Show users which stage of the workflow is currently executing
- **Live Dashboards**: Create monitoring interfaces for production flows

## Stream Chunk Types

Like crew streaming, flow chunks can be of different types:

### TEXT Chunks

Standard text content from LLM responses:

```python Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)
```

### TOOL_CALL Chunks

Information about tool calls within the flow:

```python Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
        print(f"\nTool: {chunk.tool_call.tool_name}")
        print(f"Args: {chunk.tool_call.arguments}")
```

## Error Handling

Handle errors gracefully during streaming:

```python Code
flow = ResearchFlow()
streaming = flow.kickoff()

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\nSuccess! Result: {result}")

except Exception as e:
    print(f"\nError during flow execution: {e}")
    if streaming.is_completed:
        print("Streaming completed but flow encountered an error")
```

## Cancellation and Resource Cleanup

`FlowStreamingOutput` supports graceful cancellation so that in-flight work stops promptly when the consumer disconnects.

### Async Context Manager

```python Code
streaming = await flow.kickoff_async()

async with streaming:
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)
```

### Explicit Cancellation

```python Code
streaming = await flow.kickoff_async()
try:
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)
finally:
    await streaming.aclose()  # async
    # streaming.close()       # sync equivalent
```

After cancellation, `streaming.is_cancelled` and `streaming.is_completed` are both `True`. Both `aclose()` and `close()` are idempotent.

## Important Notes

- Streaming automatically enables LLM streaming for any crews used within the flow
- You must iterate through all chunks before accessing the `.result` property
- Streaming works with both structured and unstructured flow state
- Flow streaming captures output from all crews and LLM calls in the flow
- Each chunk includes context about which agent and task generated it
- Streaming adds minimal overhead to flow execution

## Combining with Flow Visualization

You can combine streaming with flow visualization to provide a complete picture:

```python Code
# Generate flow visualization
flow = ResearchFlow()
flow.plot("research_flow")  # Creates HTML visualization

# Run with streaming
streaming = flow.kickoff()
for chunk in streaming:
    print(chunk.content, end="", flush=True)

result = streaming.result
print(f"\nFlow complete! View structure at: research_flow.html")
```

By leveraging flow streaming, you can build sophisticated, responsive applications that provide users with real-time visibility into complex multi-stage workflows, making your AI automations more transparent and engaging.