---
title: Streaming na Execução da Crew
description: Transmita saída em tempo real da execução da sua crew no CrewAI
icon: wave-pulse
mode: "wide"
---

## Introdução

O CrewAI fornece a capacidade de transmitir saída em tempo real durante a execução da crew, permitindo que você exiba resultados conforme são gerados, em vez de esperar que todo o processo seja concluído. Este recurso é particularmente útil para construir aplicações interativas, fornecer feedback ao usuário e monitorar processos de longa duração.

## Como o Streaming Funciona

Quando o streaming está ativado, o CrewAI captura respostas do LLM e chamadas de ferramentas conforme acontecem, empacotando-as em chunks estruturados que incluem contexto sobre qual task e agent está executando. Você pode iterar sobre esses chunks em tempo real e acessar o resultado final quando a execução for concluída.

## Ativando o Streaming

Para ativar o streaming, defina o parâmetro `stream` como `True` ao criar sua crew:

```python Code
from crewai import Agent, Crew, Task

# Crie seus agentes e tasks
researcher = Agent(
    role="Research Analyst",
    goal="Gather comprehensive information on topics",
    backstory="You are an experienced researcher with excellent analytical skills.",
)

task = Task(
    description="Research the latest developments in AI",
    expected_output="A detailed report on recent AI advancements",
    agent=researcher,
)

# Ativar streaming
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True  # Ativar saída em streaming
)
```

## Streaming Síncrono

Quando você chama `kickoff()` em uma crew com streaming ativado, ele retorna um objeto `CrewStreamingOutput` que você pode iterar para receber chunks conforme chegam:

```python Code
# Iniciar execução com streaming
streaming = crew.kickoff(inputs={"topic": "artificial intelligence"})

# Iterar sobre chunks conforme chegam
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Acessar o resultado final após o streaming completar
result = streaming.result
print(f"\n\nSaída final: {result.raw}")
```

### Informações do Chunk de Stream

Cada chunk fornece contexto rico sobre a execução:

```python Code
streaming = crew.kickoff(inputs={"topic": "AI"})

for chunk in streaming:
    print(f"Task: {chunk.task_name} (índice {chunk.task_index})")
    print(f"Agent: {chunk.agent_role}")
    print(f"Content: {chunk.content}")
    print(f"Type: {chunk.chunk_type}")  # TEXT ou TOOL_CALL
    if chunk.tool_call:
        print(f"Tool: {chunk.tool_call.tool_name}")
        print(f"Arguments: {chunk.tool_call.arguments}")
```

### Acessando Resultados do Streaming

O objeto `CrewStreamingOutput` fornece várias propriedades úteis:

```python Code
streaming = crew.kickoff(inputs={"topic": "AI"})

# Iterar e coletar chunks
for chunk in streaming:
    print(chunk.content, end="", flush=True)

# Após a iteração completar
print(f"\nCompletado: {streaming.is_completed}")
print(f"Texto completo: {streaming.get_full_text()}")
print(f"Todos os chunks: {len(streaming.chunks)}")
print(f"Resultado final: {streaming.result.raw}")
```

## Streaming Assíncrono

Para aplicações assíncronas, você pode usar `akickoff()` (async nativo) ou `kickoff_async()` (baseado em threads) com iteração assíncrona:

### Async Nativo com `akickoff()`

O método `akickoff()` fornece execução async nativa verdadeira em toda a cadeia:

```python Code
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    # Iniciar streaming async nativo
    streaming = await crew.akickoff(inputs={"topic": "AI"})

    # Iteração assíncrona sobre chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Acessar resultado final
    result = streaming.result
    print(f"\n\nSaída final: {result.raw}")

asyncio.run(stream_crew())
```

### Async Baseado em Threads com `kickoff_async()`

Para integração async mais simples ou compatibilidade retroativa:

```python Code
import asyncio

async def stream_crew():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    # Iniciar streaming async baseado em threads
    streaming = await crew.kickoff_async(inputs={"topic": "AI"})

    # Iteração assíncrona sobre chunks
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)

    # Acessar resultado final
    result = streaming.result
    print(f"\n\nSaída final: {result.raw}")

asyncio.run(stream_crew())
```

<Note>
Para cargas de trabalho de alta concorrência, `akickoff()` é recomendado pois usa async nativo para execução de tasks, operações de memória e recuperação de conhecimento. Consulte o guia [Iniciar Crew de Forma Assíncrona](/pt-BR/learn/kickoff-async) para mais detalhes.
</Note>

## Streaming com kickoff_for_each

Ao executar uma crew para múltiplas entradas com `kickoff_for_each()`, o streaming funciona de forma diferente dependendo se você usa síncrono ou assíncrono:

### kickoff_for_each Síncrono

Com `kickoff_for_each()` síncrono, você obtém uma lista de objetos `CrewStreamingOutput`, um para cada entrada:

```python Code
crew = Crew(
    agents=[researcher],
    tasks=[task],
    stream=True
)

inputs_list = [
    {"topic": "AI in healthcare"},
    {"topic": "AI in finance"}
]

# Retorna lista de saídas de streaming
streaming_outputs = crew.kickoff_for_each(inputs=inputs_list)

# Iterar sobre cada saída de streaming
for i, streaming in enumerate(streaming_outputs):
    print(f"\n=== Entrada {i + 1} ===")
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\n\nResultado {i + 1}: {result.raw}")
```

### kickoff_for_each_async Assíncrono

Com `kickoff_for_each_async()` assíncrono, você obtém um único `CrewStreamingOutput` que produz chunks de todas as crews conforme chegam concorrentemente:

```python Code
import asyncio

async def stream_multiple_crews():
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True
    )

    inputs_list = [
        {"topic": "AI in healthcare"},
        {"topic": "AI in finance"}
    ]

    # Retorna saída de streaming única para todas as crews
    streaming = await crew.kickoff_for_each_async(inputs=inputs_list)

    # Chunks de todas as crews chegam conforme são gerados
    async for chunk in streaming:
        print(f"[{chunk.task_name}] {chunk.content}", end="", flush=True)

    # Acessar todos os resultados
    results = streaming.results  # Lista de objetos CrewOutput
    for i, result in enumerate(results):
        print(f"\n\nResultado {i + 1}: {result.raw}")

asyncio.run(stream_multiple_crews())
```

## Tipos de Chunk de Stream

Chunks podem ser de diferentes tipos, indicados pelo campo `chunk_type`:

### Chunks TEXT

Conteúdo de texto padrão de respostas do LLM:

```python Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TEXT:
        print(chunk.content, end="", flush=True)
```

### Chunks TOOL_CALL

Informações sobre chamadas de ferramentas sendo feitas:

```python Code
for chunk in streaming:
    if chunk.chunk_type == StreamChunkType.TOOL_CALL:
        print(f"\nChamando ferramenta: {chunk.tool_call.tool_name}")
        print(f"Argumentos: {chunk.tool_call.arguments}")
```

## Exemplo Prático: Construindo uma UI com Streaming

Aqui está um exemplo completo mostrando como construir uma aplicação interativa com streaming:

```python Code
import asyncio
from crewai import Agent, Crew, Task
from crewai.types.streaming import StreamChunkType

async def interactive_research():
    # Criar crew com streaming ativado
    researcher = Agent(
        role="Research Analyst",
        goal="Provide detailed analysis on any topic",
        backstory="You are an expert researcher with broad knowledge.",
    )

    task = Task(
        description="Research and analyze: {topic}",
        expected_output="A comprehensive analysis with key insights",
        agent=researcher,
    )

    crew = Crew(
        agents=[researcher],
        tasks=[task],
        stream=True,
        verbose=False
    )

    # Obter entrada do usuário
    topic = input("Digite um tópico para pesquisar: ")

    print(f"\n{'='*60}")
    print(f"Pesquisando: {topic}")
    print(f"{'='*60}\n")

    # Iniciar execução com streaming
    streaming = await crew.kickoff_async(inputs={"topic": topic})

    current_task = ""
    async for chunk in streaming:
        # Mostrar transições de task
        if chunk.task_name != current_task:
            current_task = chunk.task_name
            print(f"\n[{chunk.agent_role}] Trabalhando em: {chunk.task_name}")
            print("-" * 60)

        # Exibir chunks de texto
        if chunk.chunk_type == StreamChunkType.TEXT:
            print(chunk.content, end="", flush=True)

        # Exibir chamadas de ferramentas
        elif chunk.chunk_type == StreamChunkType.TOOL_CALL and chunk.tool_call:
            print(f"\n🔧 Usando ferramenta: {chunk.tool_call.tool_name}")

    # Mostrar resultado final
    result = streaming.result
    print(f"\n\n{'='*60}")
    print("Análise Completa!")
    print(f"{'='*60}")
    print(f"\nUso de Tokens: {result.token_usage}")

asyncio.run(interactive_research())
```

## Casos de Uso

O streaming é particularmente valioso para:

- **Aplicações Interativas**: Fornecer feedback em tempo real aos usuários enquanto os agentes trabalham
- **Tasks de Longa Duração**: Mostrar progresso para pesquisa, análise ou geração de conteúdo
- **Depuração e Monitoramento**: Observar comportamento e tomada de decisão dos agentes em tempo real
- **Experiência do Usuário**: Reduzir latência percebida mostrando resultados incrementais
- **Dashboards ao Vivo**: Construir interfaces de monitoramento que exibem status de execução da crew

## Cancelamento e Limpeza de Recursos

`CrewStreamingOutput` suporta cancelamento gracioso para que o trabalho em andamento pare imediatamente quando o consumidor desconecta.

### Gerenciador de Contexto Assíncrono

```python Code
streaming = await crew.akickoff(inputs={"topic": "AI"})

async with streaming:
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)
```

### Cancelamento Explícito

```python Code
streaming = await crew.akickoff(inputs={"topic": "AI"})
try:
    async for chunk in streaming:
        print(chunk.content, end="", flush=True)
finally:
    await streaming.aclose()  # assíncrono
    # streaming.close()       # equivalente síncrono
```

Após o cancelamento, `streaming.is_cancelled` e `streaming.is_completed` são ambos `True`. Tanto `aclose()` quanto `close()` são idempotentes.

## Notas Importantes

- O streaming ativa automaticamente o streaming do LLM para todos os agentes na crew
- Você deve iterar através de todos os chunks antes de acessar a propriedade `.result`
- Para `kickoff_for_each_async()` com streaming, use `.results` (plural) para obter todas as saídas
- O streaming adiciona overhead mínimo e pode realmente melhorar a performance percebida
- Cada chunk inclui contexto completo (task, agente, tipo de chunk) para UIs ricas

## Tratamento de Erros

Trate erros durante a execução com streaming:

```python Code
streaming = crew.kickoff(inputs={"topic": "AI"})

try:
    for chunk in streaming:
        print(chunk.content, end="", flush=True)

    result = streaming.result
    print(f"\nSucesso: {result.raw}")

except Exception as e:
    print(f"\nErro durante o streaming: {e}")
    if streaming.is_completed:
        print("O streaming foi completado mas ocorreu um erro")
```

Ao aproveitar o streaming, você pode construir aplicações mais responsivas e interativas com o CrewAI, fornecendo aos usuários visibilidade em tempo real da execução dos agentes e resultados.