runStreamNode

workflow.runStreamNode() 执行流式节点。它会遍历 node.stream(input, context) 产出的 chunks,逐个触发 chunk hook,并在结束后调用可选 node.finalize(chunks, context, input)

签名

workflow.runStreamNode<Input, Chunk, Output>(
  node: StreamNodeDefinition<Input, Chunk, Output>,
  input: Input,
  context: NodeContext,
): Promise<{
  chunks: Chunk[];
  output: Output | undefined;
}>

参数

参数类型说明
nodeStreamNodeDefinition要执行的流式节点,通常来自 createLLMStreamNode 或流式 createOutputNode
inputInput节点输入。
contextNodeContextruntime 上下文,用于 hooks、provider、KV、files、abortSignal。

StreamNodeDefinition

字段说明
name节点名。必填。
standardName标准节点类型名,例如 llm-streamoutput
metadata结构图和 UI 元信息。
stream流式执行函数,接收 inputNodeContext。必填。
finalize可选聚合函数,用 chunks 生成最终输出。

执行过程

步骤说明
1. 检查中断如果 context.abortSignal 已中断,抛出 execution_aborted
2. 发送开始事件触发流式节点开始 hook,并记录 executionId、nodeName、metadata 和 startedAt。
3. 遍历 stream收集每个 chunk,并触发 chunk hook。
4. finalize如果节点提供 finalize,用 chunks 生成最终 output。
5. 发送完成事件记录 output、duration 和 executionInfo,并触发完成 hook。
6. 错误包装失败时转换为 stream_interrupted 并写入报告。

返回值

字段类型说明
chunksChunk[]收集到的所有 chunk,顺序与流输出一致。
outputOutput | undefinedfinalize 结果。节点没有 finalize 时为 undefined

错误

情况错误类型说明
执行前或流中中断execution_abortedstream_interrupted中断会在流式遍历中被检查。
stream/finalize 抛错stream_interrupted错误会写入节点执行报告。

示例

const result = await workflow.runStreamNode(streamNode, parsed, context);

return {
  errCode: 0,
  errMessage: "",
  content: result.output ?? "",
};