[对比学习LangChain和MAF-16]基于Checkpoint的持久化

[对比学习LangChain和MAF-16]基于Checkpoint的持久化

LangGraph和MAF的Workflow真的很像,它们的核心目标非常一致,都是解决大模型逻辑链脆弱、非确定性带来的失控问题,将Agent转化为可控、稳定、可扩展的业务流水线。但是它们只是看上去很像,实际上在底层的实现又完全不一样。这篇文章我们就来讨论两者在基于Checkpoint的持久化方面的异同。

1. LangGraph

为了应对生产环境中的网络中断或长周期任务,两者都内置了基于Checkpointing的持久化机制,保障工作流在异常中断后可恢复、可重试。为了让用户更好地理解Checkpointing的概念,我们先来演示一个利用Checkpoint恢复执行的例子。

1.1 从Checkpoint所在的地方开始执行

在如下这段程序中,我们基于指定的状态类型State创建了一个StateGraph,并为它添加了四个节点foobarbazqux。每个节点在执行时都会将自己的名称写入一个名为nodes的状态成员中。我们将foo设置为入口节点,qux设置为出口节点,并采用Sequential的方式将四个节点串联起来。为了跟踪四个节点的执行,我们在每个节点的执行函数中将节点名称写入一个全局列表log中。

fromtypingimportAnnotated,Callable,Any,Required,TypedDictfromdotenvimportload_dotenvfromlanggraph.graphimportStateGraphfromlanggraph.checkpoint.memoryimportInMemorySaverfromlangchain_core.runnablesimportRunnableConfigimportasyncio,operator load_dotenv()log=[]classState(TypedDict):nodes:Required[Annotated[list[str],operator.add]]defbuild_node(node_id:str)->Callable[[State],dict[str,Any]]:defhandle(state:State)->dict[str,Any]:log.append(node_id)return{"nodes":[node_id]}returnhandle checkpointer=InMemorySaver()nodes={node_id:build_node(node_id)fornode_idin["foo","bar","baz","qux"]}agent=(StateGraph(State).add_node("foo",nodes["foo"])#type: ignore.add_node("bar",nodes["bar"])#type: ignore.add_node("baz",nodes["baz"])#type: ignore.add_node("qux",nodes["qux"])#type: ignore.set_entry_point("foo").set_finish_point("qux").add_edge("foo","bar").add_edge("bar","baz").add_edge("baz","qux").compile(checkpointer=checkpointer))asyncdefmain():config:RunnableConfig={"configurable":{"thread_id":"thread_001"}}input:State={"nodes":[]}awaitagent.ainvoke(input=input,config=config)history=list(agent.get_state_history(config=config))print("Printing the history:")forstateinhistory:print(f"{state.metadata.get('step')}:{state.values}")#type: ignoreprint("\nReplaying the history:")forstateinhistory:log.clear()awaitagent.ainvoke(input=None,config=state.config)print(f"Replayed{state.metadata.get('step')}:{log}")#type: ignoreasyncio.run(main())

为了支持基于Checkpointing的持久化,我们在compile方法对StateGraph进行编译的时候,指定一个InMemorySaver对象作为Checkpointer,它会在每个Superstep完成的时候创建针对当前状态创建对应的Checkpoint,并存储在内存中。由于Checkpointing是基于Thread ID进行持久化的,所以我们在调用编译生成的Agent对象时,在作为参数的RunnableConfig中指定了一个Thread ID。

在Agent调用结束后,我们调用它的get_state_history方法将表示历史状态的StateSnapshot列表收集起来。每个StateSnapshot都对应一个创建的Checkpoint,并提供额外的元数据。我们将Checkpoint对应的Superstep编号和状态值(所有通道值)打印出来。接下来我们遍历这个StateSnapshot列表,从StateSnapshotconfig字段中提取出RunnableConfig,并将其作为参数调用Agent对象,其目的是从Checkpoint所在的地方开始执行。

从如下的输出可以看出,整个过程涉及6个Superstep,序号从-1到4。foo节点在Superstep 1执行之后,将自身的名称写入状态中,barbazqux节点依次执行,最终在Superstep 4完成整个工作流的执行。与之对应,如果我们从最初的两个Superstep(编号分别为-1和0)处恢复执行,四个节点会依次执行。但是若从Superstep 1开始支持,此时Checkpoint记录的是节点foo执行后的状态,所以会从节点bar开始执行,以此类推。

The state history: 4:{'nodes': ['foo', 'bar', 'baz', 'qux']} 3:{'nodes': ['foo', 'bar', 'baz']} 2:{'nodes': ['foo', 'bar']} 1:{'nodes': ['foo']} 0:{'nodes': []} -1:{'nodes': []} Replaying the history: Replayed from checkpoints[4]:[] Replayed from checkpoints[3]:['qux'] Replayed from checkpoints[2]:['baz', 'qux'] Replayed from checkpoints[1]:['bar', 'baz', 'qux'] Replayed from checkpoints[0]:['foo', 'bar', 'baz', 'qux'] Replayed from checkpoints[-1]:['foo', 'bar', 'baz', 'qux']

1.2 基于通道的Checkpointing

LangGraph与MAF Workflow的最大不同之处在于,状态图并不直接用于执行,而是先将其编译成一个Actor模型,并将整个状态拆分为具有不同类型的通道。整个Actor模型由无状态的节点和存储状态的通道组成,状态图中节点之间的边转换成节点和通道之间的订阅关系。Actor模型的执行并非基于状态图定义的消息路由,而是基于节点针对通道变更的订阅。

由于Agent的状态完全集中在通道中,所以成功完成的Superstep来说,Checkpointing过程变得异常的简单:只需要持久化通道的状态转换成Checkpoint就可以了。但是对因异常或者中断尚未完成的Superstep来说,需要成功执行的节点针对通道写入意图存储起来,这样才能既保证成功执行的节点不再重复执行,又能保证它们针对通道的写入在当前Superstep中被正确的执行。我们将这种更新称为PendingWrite,除了描述节点针对通道写入之外,PendingWrite还可以描述如下的未决状态:

  • 节点成功执行,它们它的处理方法根本不涉及通道的写入;
  • 节点在执行过程中抛出异常,应该将异常信息记录下来;
  • 节点在执行过程中被中断,应该将中断信息记录下来;
  • 从某个中断点处恢复执行,应该将提供的ResumeValue记录下来。

对于LangGraph基于Checkpointing机制来说,被持久化的不仅仅是存储通道最终状态的Checkpoint,还包括PendingWrite、元数据以及一些调用时采用的配置。持久化的数据基本上可以表示为如下这个名为CheckpointTuple的元组。

classCheckpointTuple(NamedTuple):config:RunnableConfig checkpoint:Checkpoint metadata:CheckpointMetadata parent_config:RunnableConfig|None=Nonepending_writes:list[PendingWrite]|None=NonePendingWrite=tuple[str,str,Any]

LangGraph的Checkpointing基本上可以视为针对上面这个CheckpointTuple持久化,以及如何从持久化的CheckpointTuple恢复现场。当前具体的实现远不止我们说的这么简单,具体的机制可以参考我如下这几篇文章:

  • 基于Checkpoint的持久化
  • 持久化状态的提取
  • 非常规Pending Write的持久化
  • 三种持久化模式的差异
  • 梳理Agent的执行流程
  • 回到过去,开启平行世界-上篇
  • 回到过去,开启平行世界-下篇

2. MAF Worflow

虽然MAF Workflow的Checkpointing持久化机制也是基于Superstep进行,但是由于底层的执行引擎的差异,导致持久化的实现方式与LangGraph有很大的不同。不过在具体介绍之前,我们先来演示一个利用Checkpoint恢复执行的例子。

2.1 从Checkpoint所在的地方开始执行

前面我们利用LangGraph演示了如何从Checkpoint所在的地方开始执行,下面我们利用MAF Workflow演示同样的功能。我们定义了辅助方法CreateExecutor,它会根据提供的Executor的ID创建一个FunctionExecutor<string,string>类型的Executor对象。该对象在执行的时候,会将当前ID写入log中以利于跟踪每个节点的执行。我们调用此方法创建了foobarbazqux四个Executor,并以Sequential模式将它们编排成按序执行的Workflow。

usingMicrosoft.Agents.AI.Workflows;usingSystem.Diagnostics;List<string>log=[];varrandom=newRandom();varworkflow=BuildWorkflow();varcheckpointManager=CheckpointManager.CreateInMemory();varrun=awaitInProcessExecution.Default.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow,"start");awaitrun.RunToCompletionAsync();varcheckpoints=run.Checkpoints;Debug.Assert(checkpoints.Count==4);for(varindex=0;index<4;index++){log.Clear();awaitrun.RestoreCheckpointAsync(checkpoints[index]);awaitrun.RunToCompletionAsync();Console.WriteLine($"Restore from Checkpoints[{index}]: [{string.Join(",",log)}]");}WorkflowBuildWorkflow(){varfoo=CreateExecutor("Foo");varbar=CreateExecutor("Bar");varbaz=CreateExecutor("Baz");varqux=CreateExecutor("Qux");returnnewWorkflowBuilder(foo).AddEdge(source:foo,target:bar).AddEdge(source:bar,target:baz).AddEdge(source:baz,target:qux).Build();}ExecutorBindingCreateExecutor(stringid)=>newFunc<string,ValueTask<string>>(asyncinput=>{log.Add(id);awaitTask.Delay(random.Next(100,500));returnid;}).BindAsExecutor(id);

在以流的形式执行Workflow之前,我们调用WithCheckpointing指定了一个通过调用CheckpointManager.CreateInMemory创建的CheckpointManager对象,它会帮助我们创建Checkpoint并将其存储在内存中。Workflow执行完成后,我们将StreamingRun对象的Checkpoints属性存储的CheckpointInfo收集起来。通过断言,我们知道这里只有4个Checkpoint被创。按照一个Superstep一个Checkpoint的原则,意味着这里只涉及4个Superstep(LangGraph涉及6个Superstep)。

我们遍历这个CheckpointInfo列表,调用StreamingRun对象的RestoreCheckpointAsync方法将Workflow恢复到指定的Checkpoint所在的地方,然后调用RunToCompletionAsync方法继续执行Workflow。通过打印log,我们可以看到每个Checkpoint对应的Superstep编号和节点执行顺序。

Restore from Checkpoints[0]: [Bar,Baz,Qux] Restore from Checkpoints[1]: [Baz,Qux] Restore from Checkpoints[2]: [Qux] Restore from Checkpoints[3]: []

同样是第一个执行的节点,foo在LangGraph中式在第三个Superstep中执行的(编号为1),而在MAF Workflow中是在第一个Superstep中执行的(编号为0)。当我们从第一个Checkpoint开始恢复执行时,foo已经成功执行,所以它不会再被执行,barbazqux依次执行,以此类推。

2.2 基于消息路由的Checkpointing

由于LangGraph创建的Agent是以Actor模型的形式运行的,并且所有的状态都集中在通道中,所以它的Checkpointing机制主要围绕通道进行,整个设计变得很简单。而MAF Workflow的采用消息路由的方法执行,并将状态控制在IWorkflowContext上下文中,所以它不仅需要持久化未处理的消息,还需要持久化上下文中的状态。对于跨越多个Superstep的FanInEdge,它还将针对某个Superstep的中间状态记录下来。

除此之外,两者对Checkpoint这个对象的定义也不一样。LangGraph的Checkpoint对象主要是针对通道的状态进行持久化,并利用PendingWrite来记录未决状态。而Checkpoint在MAF Workflow中的表示的时整个持久化的状态,相当于我们CheckpointTuple元组。MAF将Checkpointing的细节全部隐藏了起来,所以很多核心的类型都是internal类型,其中就包括承载所有持久化信息的如下这个Checkpoint类型。

internalsealedclassCheckpoint{publicboolIsInitial=>StepNumber==-1;publicintStepNumber{get;}publicWorkflowInfoWorkflow{get;}publicRunnerStateDataRunnerData{get;}publicDictionary<ScopeKey,PortableValue>StateData{get;}=newDictionary<ScopeKey,PortableValue>();publicDictionary<EdgeId,PortableValue>EdgeStateData{get;}=newDictionary<EdgeId,PortableValue>();publicCheckpointInfo?Parent{get;}}

属性成员说明如下:

  • StepNumberCheckpoint对应的Superstep编号;
  • Workflow:描述Workflow的WorkflowInfo对象;
  • RunnerData:描述Workflow执行器状态的RunnerStateData对象;
  • StateData:一个字典,Key是ScopeKey对象,Value是PortableValue对象,用于存储Workflow中不同Scope维度的状态数据;
  • EdgeStateData:一个字典,Key是EdgeId对象,Value是PortableValue对象,用于存储Workflow中不同Edge维度的状态数据;
  • Parent:一个可选的CheckpointInfo对象,指向上一个Checkpoint

关于Checkpoint对象以及MAF Workflow具体的Checkpointing机制的更多细节,可以参考我如下这几篇文章:

  • Workflow基于Checkpointing的持久化
  • 关于Checkpointing的一个严重Bug