Code
from logging import error
from typing import TypedDict, Dict, Any
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_deepseek import ChatDeepSeek
from langgraph.graph import StateGraph, END
# 1.define state type
class Workflowstate(TypedDict):
data: Any
status: str
error: str
# 2 定义工具函数
@tool
def load_data() -> str:
"""加载数据工具:加载并返回数据"""
print("step1:load data")
return "data"
@tool
def process_data(data: str) -> str:
"""处理数据工具:处理输入的数据并返回处理后的结果"""
print("step2:process data")
if not data:
raise ValueError("not process data")
return "processed_data"
@tool
def generate_result(data: str) -> str:
"""生成结果工具:根据处理后的数据生成最终结果"""
print("step3:generate result")
if not data or data != "processed_data":
raise ValueError("data is not correct")
return "result"
@tool
def save_result(result: str) -> str:
"""保存结果工具:将生成的结果保存并返回状态"""
print("step4:save result")
if not result or result != "result":
raise ValueError("not save result")
return "done"
# 3 定义工作流节点
def step_load_data(state: Workflowstate) -> Workflowstate:
try:
result = load_data.invoke({})
return {"data": result, "status": "load successful", "error": ""}
except Exception as e:
return {"data": None, "status": "load failed", "error": str(e)}
def step_process_data(state: Workflowstate) -> Workflowstate:
try:
result = process_data.invoke({"data": state["data"]})
return {**state, "data": result, "status": "processed successful"}
except Exception as e:
return {**state, "status": "process failed", "error": str(e)}
def step_generate_result(state: Workflowstate) -> Workflowstate:
try:
result = generate_result.invoke({"data": state["data"]})
return {**state, "data": result, "status": "generated successful"}
except Exception as e:
return {**state, "status": "generated failed", "error": str(e)}
def step_save_result(state: Workflowstate) -> Workflowstate:
try:
result = save_result.invoke({"result": state["data"]})
return {**state, "data": result, "status": "saved successful"}
except Exception as e:
return {**state, "status": "saved failed", "error": str(e)}
# 4 使用langgraph构建工作流
def build_workflow():
workflow = StateGraph(Workflowstate)
workflow.add_node("load_data", step_load_data)
workflow.add_node("process_data", step_process_data)
workflow.add_node("generate_result", step_generate_result)
workflow.add_node("save_result", step_save_result)
# add edge
workflow.add_edge("load_data", "process_data")
workflow.add_edge("process_data", "generate_result")
workflow.add_edge("generate_result", "save_result")
workflow.add_edge("save_result", END)
# 设置入口点
workflow.set_entry_point("load_data")
return workflow.compile()
# 5 错误处理和条件分支版本
def build_advanced_workflow():
workflow = StateGraph(Workflowstate)
# add node
workflow.add_node("load_data", step_load_data)
workflow.add_node("process_data", step_process_data)
workflow.add_node("generate_result", step_generate_result)
workflow.add_node("save_result", step_save_result)
# 添加错误处理节点
def handle_error(state: Workflowstate) -> Workflowstate:
print(f"错误处理:{state['error']}")
return {**state, "status": "error_handled"}
workflow.add_node("handle_error", handle_error)
# 条件路由函数
def route_next_step(state: Workflowstate):
if state["error"]:
return "handle_error"
return None
# 添加有条件边
workflow.add_conditional_edges(
"load_data",
route_next_step,
{None: "process_data", "handle_error": "handle_error"},
)
workflow.add_conditional_edges(
"process_data",
route_next_step,
{None: "generate_result", "handle_error": "handle_error"},
)
workflow.add_conditional_edges(
"generate_result",
route_next_step,
{None: "save_result", "handle_error": "handle_error"},
)
# 设置正常结束
workflow.add_edge("save_result", END)
workflow.add_edge("handle_error", END)
# 设置入口
workflow.set_entry_point("load_data")
# 返回编译后的工作流
return workflow.compile()
def run_workflow(advanced: bool = False):
print("\n======begin run workflow========\n")
# create init state
initial_state = {"data": None, "status": "init", "error": ""}
# build & run workflow
if advanced:
workflow = build_advanced_workflow()
else:
workflow = build_workflow()
final_state = workflow.invoke(initial_state)
##output result
print("\n======workflow end=======\n")
print(f"final state:{final_state['status']}")
print(f"result:{final_state['data']}")
if final_state['error']:
print(f"error:{final_state['error']}")
if __name__=='__main__':
# 正常工作流
print("\n=== 正常工作流 ===")
run_workflow(advanced=True)
# 测试错误处理
print("\n=== 错误处理测试 ===")
@tool
def load_data_with_error() -> str:
"""故意引发错误的加载数据工具"""
print("测试错误处理:引发错误")
raise ValueError("测试错误:数据加载失败")
return "data"
# 备份原始函数
original_load_data = globals()["load_data"]
# 替换为错误版本
globals()["load_data"] = load_data_with_error
# 运行测试
run_workflow(advanced=True)
# 恢复原始函数
globals()["load_data"] = original_load_data
文章评论