runStreamNode
workflow.runStreamNode() 执行流式节点。它会遍历 node.stream(input, context) 产出的 chunks,逐个触发 chunk hook,并在结束后调用可选 node.finalize(chunks, context, input)。
签名
参数
| 参数 | 类型 | 说明 |
|---|---|---|
node | StreamNodeDefinition | 要执行的流式节点,通常来自 createLLMStreamNode 或流式 createOutputNode。 |
input | Input | 节点输入。 |
context | NodeContext | runtime 上下文,用于 hooks、provider、KV、files、abortSignal。 |
StreamNodeDefinition
| 字段 | 说明 |
|---|---|
name | 节点名。必填。 |
standardName | 标准节点类型名,例如 llm-stream、output。 |
metadata | 结构图和 UI 元信息。 |
stream | 流式执行函数,接收 input 和 NodeContext。必填。 |
finalize | 可选聚合函数,用 chunks 生成最终输出。 |
执行过程
| 步骤 | 说明 |
|---|---|
| 1. 检查中断 | 如果 context.abortSignal 已中断,抛出 execution_aborted。 |
| 2. 发送开始事件 | 触发流式节点开始 hook,并记录 executionId、nodeName、metadata 和 startedAt。 |
| 3. 遍历 stream | 收集每个 chunk,并触发 chunk hook。 |
| 4. finalize | 如果节点提供 finalize,用 chunks 生成最终 output。 |
| 5. 发送完成事件 | 记录 output、duration 和 executionInfo,并触发完成 hook。 |
| 6. 错误包装 | 失败时转换为 stream_interrupted 并写入报告。 |
返回值
| 字段 | 类型 | 说明 |
|---|---|---|
chunks | Chunk[] | 收集到的所有 chunk,顺序与流输出一致。 |
output | Output | undefined | finalize 结果。节点没有 finalize 时为 undefined。 |
错误
| 情况 | 错误类型 | 说明 |
|---|---|---|
| 执行前或流中中断 | execution_aborted 或 stream_interrupted | 中断会在流式遍历中被检查。 |
| stream/finalize 抛错 | stream_interrupted | 错误会写入节点执行报告。 |