Flows: Event-Driven Workflows

Overview

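Flows is CrewAI's event-driven workflow system for building production-grade automation that needs fine-grained control. Unlike the autonomous collaboration of Crews, Flows provide deterministic execution paths and explicit state management.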

Crews vs Flows

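| Feature | Crews | Flows |
| --- | --- | --- |
| Control style | Autonomous collaboration | Fine-grained control |
| Execution path | Decided dynamically | Deterministic |
| State management | Implicit | Explicit |
| Typical use | Flexible tasks | Complex business processes |
| Debugging difficulty | Harder | Easier |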

Core decorators

@start - Entry point

Marks a method as an entry point of the flow:

python
from crewai.flow.flow import Flow, start

class MyFlow(Flow):
    @start()
    def begin(self):
        """Flow entry point"""
        return {"status": "started"}

@listen - Listener

Runs a method after another method finishes and receives that method's output:

python
from crewai.flow.flow import Flow, start, listen

class MyFlow(Flow):
    @start()
    def fetch_data(self):
        return {"data": [1, 2, 3]}

    @listen(fetch_data)
    def process_data(self, data):
        """Runs after fetch_data completes"""
        return {"processed": sum(data["data"])}
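
To make the trigger relationship concrete, here is a toy, standard-library-only model of what `@start` and `@listen` express. It is a sketch and not CrewAI's implementation: `MiniFlow`, `mini_start`, and `mini_listen` are hypothetical names invented for this illustration, and the sketch only handles a single linear chain.

```python
# Toy model of start/listen dispatch. NOT CrewAI's implementation;
# every name here is hypothetical and exists only for illustration.

def mini_start(func):
    """Mark a method as the entry point."""
    func._is_start = True
    return func


def mini_listen(upstream):
    """Mark a method as running after `upstream` finishes."""
    def decorator(func):
        func._listens_to = upstream.__name__
        return func
    return decorator


class MiniFlow:
    def kickoff(self):
        names = [n for n in dir(self) if not n.startswith("_")]
        methods = [getattr(self, n) for n in names if callable(getattr(self, n))]
        # Run the single start method first.
        current = next(m for m in methods if getattr(m, "_is_start", False))
        result = current()
        # Follow the chain: find whichever method listens to the one that just ran.
        while True:
            nxt = next(
                (m for m in methods
                 if getattr(m, "_listens_to", None) == current.__name__),
                None,
            )
            if nxt is None:
                return result
            result = nxt(result)  # the upstream output becomes the argument
            current = nxt


class DemoFlow(MiniFlow):
    @mini_start
    def fetch_data(self):
        return {"data": [1, 2, 3]}

    @mini_listen(fetch_data)
    def process_data(self, data):
        return {"processed": sum(data["data"])}


result = DemoFlow().kickoff()
print(result)
```

The point of the sketch is the data flow: the return value of the upstream method is passed as the argument of the listening method, and the last method's return value is the result of the run.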

@router - Router

Routes execution to different branches based on a condition:

python
from crewai.flow.flow import Flow, start, listen, router

class MyFlow(Flow):
    @start()
    def check_status(self):
        # Store the score in flow state so the router can read it
        self.state["score"] = 85
        return {"score": 85}

    @router(check_status)
    def route_decision(self):
        """Route based on the score"""
        score = self.state["score"]
        if score >= 80:
            return "high_score"
        elif score >= 60:
            return "medium_score"
        return "low_score"

    @listen("high_score")
    def handle_high(self):
        return "Excellent!"

    @listen("medium_score")
    def handle_medium(self):
        return "Good job!"

    @listen("low_score")
    def handle_low(self):
        return "Keep trying!"
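
One way to keep routing logic easy to test is to put the thresholds in a plain function and have the router method call it. The sketch below uses only the standard library; `route_by_score`, `HANDLERS`, and `dispatch` are hypothetical names for illustration, and the thresholds are the ones from the example above.

```python
# Hypothetical helper: the routing rule as a plain, testable function.
def route_by_score(score: float) -> str:
    """Map a score to the label that a router method would return."""
    if score >= 80:
        return "high_score"
    if score >= 60:
        return "medium_score"
    return "low_score"


# Stand-ins for the @listen("...") handlers, keyed by route label.
HANDLERS = {
    "high_score": lambda: "Excellent!",
    "medium_score": lambda: "Good job!",
    "low_score": lambda: "Keep trying!",
}


def dispatch(score: float):
    """Return the chosen label and the output of the matching handler."""
    label = route_by_score(score)
    return label, HANDLERS[label]()


for value in (85, 60, 59):
    print(value, dispatch(value))
```

Inside a flow, the router method could then simply return `route_by_score(...)`, so boundary cases such as exactly 80 or exactly 60 can be checked without running a flow.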

Conditional operators

or_ - Any condition

Triggers when any one of the listed conditions is met:

python
from crewai.flow.flow import Flow, listen, or_

class MyFlow(Flow):
    @listen(or_("task_a", "task_b"))
    def handle_either(self):
        """Triggered when task_a or task_b completes"""
        pass

and_ - All conditions

Triggers only when all of the listed conditions are met:

python
from crewai.flow.flow import Flow, listen, and_

class MyFlow(Flow):
    @listen(and_("task_a", "task_b"))
    def handle_both(self):
        """Triggered once both task_a and task_b have completed"""
        pass
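
The difference between the two operators can be modeled with plain set logic. The sketch below is a standard-library illustration only: `or_condition` and `and_condition` are hypothetical names, not CrewAI functions, and the sketch models just the boolean condition, not how or when CrewAI actually fires a listener.

```python
# Toy model of "any" versus "all" trigger conditions (not CrewAI code).
def or_condition(*names):
    """True as soon as at least one named task has completed."""
    wanted = set(names)
    return lambda completed: bool(wanted & set(completed))


def and_condition(*names):
    """True only when every named task has completed."""
    wanted = set(names)
    return lambda completed: wanted <= set(completed)


either = or_condition("task_a", "task_b")
both = and_condition("task_a", "task_b")

completed = set()
print(either(completed), both(completed))   # nothing finished yet

completed.add("task_a")
print(either(completed), both(completed))   # only task_a finished

completed.add("task_b")
print(either(completed), both(completed))   # both finished
```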

State management

Basic (unstructured) state

python
from crewai.flow.flow import Flow, start, listen

class MyFlow(Flow):
    @start()
    def init_state(self):
        self.state["counter"] = 0
        return self.state

    @listen(init_state)
    def increment(self):
        self.state["counter"] += 1
        return self.state

Structured state (recommended)

Define type-safe state with a Pydantic model:

python
from pydantic import BaseModel
from crewai.flow.flow import Flow, start, listen

class AnalysisState(BaseModel):
    """Type-safe state definition"""
    data: list = []
    processed: bool = False
    result: str = ""
    confidence: float = 0.0

class AnalysisFlow(Flow[AnalysisState]):
    @start()
    def fetch_data(self):
        self.state.data = [1, 2, 3, 4, 5]
        return self.state

    @listen(fetch_data)
    def process(self):
        self.state.result = str(sum(self.state.data))
        self.state.processed = True
        self.state.confidence = 0.95
        return self.state

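Integrating Crews

Flows are most useful when combined with Crews: the flow controls the order and branching, while each Crew handles an open-ended step.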

python
from crewai.flow.flow import Flow, start, listen, router, or_
from crewai import Crew, Agent, Task, Process
from pydantic import BaseModel

class MarketState(BaseModel):
    sentiment: str = "neutral"
    confidence: float = 0.0
    recommendations: list = []

class MarketAnalysisFlow(Flow[MarketState]):

    @start()
    def fetch_market_data(self):
        """Fetch market data"""
        self.state.sentiment = "analyzing"
        return {"sector": "tech", "timeframe": "1W"}

    @listen(fetch_market_data)
    def analyze_with_crew(self, market_data):
        """Run the analysis with a Crew"""
        analyst = Agent(
            role="Senior Market Analyst",
            goal="Conduct deep market analysis",
            backstory="Veteran analyst known for identifying patterns"
        )

        researcher = Agent(
            role="Data Researcher",
            goal="Gather and validate market data",
            backstory="Expert at correlating data sources"
        )

        analysis_task = Task(
            description="Analyze {sector} sector for {timeframe}",
            expected_output="Detailed analysis with confidence score",
            agent=analyst
        )

        research_task = Task(
            description="Validate the analysis with data",
            expected_output="Supporting evidence",
            agent=researcher
        )

        crew = Crew(
            agents=[analyst, researcher],
            tasks=[analysis_task, research_task],
            process=Process.sequential,
            verbose=True
        )

        result = crew.kickoff(inputs=market_data)
        self.state.confidence = 0.85  # placeholder; in practice, extract this from result
        return result

    @router(analyze_with_crew)
    def determine_action(self):
        """Decide the next step based on confidence"""
        if self.state.confidence > 0.8:
            return "high_confidence"
        elif self.state.confidence > 0.5:
            return "medium_confidence"
        return "low_confidence"

    @listen("high_confidence")
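    def execute_strategy(self):
        """High confidence: execute the strategy"""
        strategist = Agent(
            role="Strategy Expert",
            goal="Develop strategy",
            backstory="Experienced strategist"  # placeholder; Agent requires a backstory
        )
        strategy_crew = Crew(
            agents=[strategist],
            tasks=[Task(
                description="Create action plan",
                expected_output="Step-by-step plan",
                agent=strategist  # a sequential crew needs an agent for each task
            )]
        )
        return strategy_crew.kickoff()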

    @listen(or_("medium_confidence", "low_confidence"))
    def gather_more_data(self):
        """Low or medium confidence: gather more data"""
        self.state.recommendations.append("Gather more data")
        return "Additional analysis required"
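
The example above hard-codes the confidence value and notes that it should come from the crew's result. A minimal sketch of that extraction step follows, assuming the crew's final text contains a line such as `Confidence: 0.85` or `Confidence: 85%`. That output format is an assumption you would need to request in the task's `expected_output`, and `extract_confidence` is a hypothetical helper, not part of CrewAI.

```python
import re

# Matches "confidence: 0.85", "Confidence = 85%", and similar (assumed format).
_CONFIDENCE_RE = re.compile(
    r"confidence\s*[:=]\s*([0-9]*\.?[0-9]+)\s*(%?)", re.IGNORECASE
)


def extract_confidence(text: str, default: float = 0.0) -> float:
    """Pull a confidence score in [0, 1] out of free text, or return `default`."""
    match = _CONFIDENCE_RE.search(text)
    if match is None:
        return default
    value = float(match.group(1))
    if match.group(2) == "%":
        value /= 100.0
    # Clamp so a malformed value cannot push the router out of range.
    return min(max(value, 0.0), 1.0)


sample = "The tech sector outlook is positive.\nConfidence: 0.85"
print(extract_confidence(sample))
```

In the flow, the hard-coded assignment could then become something like `self.state.confidence = extract_confidence(result.raw)`, where `result.raw` is the crew's raw text output. Returning a low default when nothing matches means a missing score routes to the cautious branch rather than the high-confidence one.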

Flow diagram examples

Simple linear flow

python
from crewai.flow.flow import Flow, start, listen

class LinearFlow(Flow):
    @start()
    def step1(self):
        return "Step 1 done"

    @listen(step1)
    def step2(self, result):
        return "Step 2 done"

    @listen(step2)
    def step3(self, result):
        return "Final result"

Conditional branching flow

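python
from crewai.flow.flow import Flow, start, listen, router

class BranchingFlow(Flow):
    @start()
    def evaluate(self):
        # Save the value to state so the router can read it
        self.state["value"] = 75
        return {"value": 75}

    @router(evaluate)
    def route(self):
        if self.state["value"] > 80:
            return "high"
        return "low"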

    @listen("high")
    def high_branch(self):
        return "High path"

    @listen("low")
    def low_branch(self):
        return "Low path"

Parallel execution flow

python
from crewai.flow.flow import Flow, start, listen, and_

class ParallelFlow(Flow):
    @start()
    def begin(self):
        return "Start"

    @listen(begin)
    def task_a(self, _):
        return "A done"

    @listen(begin)
    def task_b(self, _):
        return "B done"

    @listen(and_(task_a, task_b))
    def merge(self):
        return "Both completed"
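
The fan-out and fan-in shape of this flow can be illustrated with plain `asyncio`. The sketch below is a standard-library stand-in and makes no claim about how CrewAI schedules listeners internally; the function names simply mirror the example above.

```python
import asyncio


async def task_a(start_value):
    await asyncio.sleep(0.01)  # simulate I/O-bound work
    return f"A saw {start_value}"


async def task_b(start_value):
    await asyncio.sleep(0.01)  # simulate I/O-bound work
    return f"B saw {start_value}"


async def run_parallel():
    start_value = "Start"
    # Fan out: both tasks receive the same upstream value and run concurrently.
    a_result, b_result = await asyncio.gather(
        task_a(start_value), task_b(start_value)
    )
    # Fan in: the merge step runs only after both results are available.
    return f"merged: {a_result} + {b_result}"


merged = asyncio.run(run_parallel())
print(merged)
```

`asyncio.gather` returns results in the order the awaitables were passed, so the merge step does not depend on which task finishes first.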

Running a Flow

Synchronous execution

python
flow = MyFlow()
result = flow.kickoff()
print(result)

Asynchronous execution

python
import asyncio

async def main():
    flow = MyFlow()
    result = await flow.kickoff_async()
    print(result)

asyncio.run(main())

Passing inputs

python
flow = MyFlow()
result = flow.kickoff(inputs={"topic": "AI", "date": "2025-01-01"})

Visualization

CrewAI can render a Flow as an interactive diagram:

python
flow = MyFlow()
flow.plot("my_flow")  # generates an interactive HTML visualization

Best practices

1. State design

python
from pydantic import BaseModel
from crewai.flow.flow import Flow

# Recommended: use a Pydantic model
class TaskState(BaseModel):
    input_data: dict = {}
    intermediate_results: list = []
    final_output: str = ""
    errors: list = []

class MyFlow(Flow[TaskState]):
    pass

2. Error handling

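python
from crewai.flow.flow import Flow, start, listen, router

class RobustFlow(Flow):
    @start()
    def risky_operation(self):
        try:
            # external_api_call is a placeholder for your own method
            result = self.external_api_call()
            self.state["success"] = True
            self.state["data"] = result
        except Exception as e:
            # Record the failure in state so the router can see it
            self.state["success"] = False
            self.state["error"] = str(e)
        return self.state

    @router(risky_operation)
    def handle_result(self):
        if self.state.get("success"):
            return "success_path"
        return "error_path"

    @listen("error_path")
    def handle_error(self):
        # Retry logic or notification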
        pass
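
The retry logic mentioned in `handle_error` could look roughly like the following. This is a standard-library sketch of retry with exponential backoff, not a CrewAI feature: `call_with_retry` and `flaky` are hypothetical names, and the attempt count and delays are arbitrary illustrative values.

```python
import time


def call_with_retry(operation, attempts=3, base_delay=0.01, sleep=time.sleep):
    """Call `operation` up to `attempts` times, doubling the delay after each failure.

    Returns the same {"success": ..., ...} shape used in the example above.
    """
    last_error = None
    for attempt in range(attempts):
        try:
            return {"success": True, "data": operation()}
        except Exception as exc:
            last_error = exc
            if attempt < attempts - 1:
                sleep(base_delay * (2 ** attempt))  # 1x, 2x, 4x, ...
    return {"success": False, "error": str(last_error)}


# Demo: an operation that fails twice, then succeeds.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "payload"


outcome = call_with_retry(flaky, sleep=lambda _seconds: None)  # skip real waiting
print(outcome, calls["count"])
```

Passing `sleep` as a parameter keeps the helper easy to test, because a test can replace the real delay with a no-op.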

3. Modular design

python
# Encapsulate complex logic in separate helper methods
class ModularFlow(Flow):
    def _validate_input(self, data):
        """Validate the input"""
        return all(k in data for k in ["required_field"])

    def _transform_data(self, data):
        """Transform the data (assumes every value is a string)"""
        return {k: v.upper() for k, v in data.items()}

    @start()
    def process(self):
        if not self._validate_input(self.state):
            raise ValueError("Invalid input")
        return self._transform_data(self.state)

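Comparison with LangGraph

| Feature | CrewAI Flows | LangGraph |
| --- | --- | --- |
| Syntax | Decorators | Graph definition |
| State | Pydantic/dict | TypedDict |
| Routing | @router + return value | Conditional edges |
| Parallelism | and_/or_ | Branching nodes |
| Learning curve | Lower | Higher |
| Flexibility | Medium | |
| Debugging | Easier | Harder |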

Code comparison

CrewAI Flow

python
from crewai.flow.flow import Flow, start, listen

class MyFlow(Flow):
    @start()
    def step1(self):
        return "data"

    @listen(step1)
    def step2(self, data):
        return f"processed: {data}"

LangGraph

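python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

# Shared state schema passed between nodes
class State(TypedDict, total=False):
    data: str
    result: str

def step1(state):
    return {"data": "data"}

def step2(state):
    return {"result": f"processed: {state['data']}"}

graph = StateGraph(State)
graph.add_node("step1", step1)
graph.add_node("step2", step2)
graph.add_edge(START, "step1")  # entry point
graph.add_edge("step1", "step2")
graph.add_edge("step2", END)
app = graph.compile()  # compile before invoking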

Next section: 17.5 Framework Comparison

Released under the MIT License. Content copyright belongs to the author.