You are viewing a single comment's thread from:

RE: LangGraph开发实战

in #starnote29 days ago

在实际应用中,流式输出尤其适用于需要快速反馈的业务场景,如聊天机器人,因为大语言模型可能需要几秒钟才能生成对查询的完整响应,这远远慢于应用程序对最终用户的响应速度约为 200-300 毫秒的阈值,如果是涉及多个大模型调用的复杂应用程序,这种延时会变得更加明显。让应用程序感觉响应更快的关键策略是显示中间进度;即,通过 token 流式传输大模型Token的输出,以此来显著提升用户体验。而在开发阶段,利用流式输出功能可以准确追踪到事件的具体执行阶段,并捕获相关数据,从而接入不同逻辑的数据处理和决策流程。是我们在应用开发中必须理解和掌握的技术点。

流式输出功能在LangGraph 框架中的实现方式比较简单,因为LangGraph底层是基于 LangChain 构建的,所有就直接把LangChain中的回调系统拿过来使用了。在LangChain中的流式输出是:以块的形式传输最终输出,即一旦监测到有可用的块,就直接生成它。最常见和最关键的流数据是大模型本身生成的输出。 大模型通常需要时间才能生成完整的响应,通过实时流式传输输出,用户可以在生成时看到部分结果,这可以提供即时反馈并有助于减少用户的等待时间。

LangGraph框架中的工作流中由各个步骤的节点和边组成。这里的流式传输涉及在各个节点请求更新时跟踪图状态的变化。这样可以更精细地监控工作流中当前处于活动状态的节点,并在工作流经过不同阶段时提供有关工作流状态的实时更新。其实现方式也是和LangChain一样通过.stream.astream方法执行流式输出,只不过适配到了图结构中。调用.stream.astream方法时可以指定几种不同的模式,即:

  • "values" :在图中的每个步骤之后流式传输状态的完整值。
  • "updates" :在图中的每个步骤之后将更新流式传输到状态。如果在同一步骤中进行多个更新(例如运行多个节点),则这些更新将单独流式传输。
  • "debug" :在整个图的执行过程中流式传输尽可能多的信息,主要用于调试程序。
  • "messages":记录每个messages中的增量token
  • "custom":自定义流,通过LangGraph 的 StreamWriter方法
async def main():
    async for event in graph.astream_events({"messages": ["what is labubu"]}):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            print(event["data"]["chunk"].content, flush=True)

asyncio.run(main())