Blue Streams: ワークフローのオーケストレーション

これまでの投稿では、エージェント型フレームワークBlue」を紹介し、「エンタープライズ向けエージェント型の要件や、Blueのアーキテクチャ内にある様々なレジストリを通じてBlueがエンタープライズ統合をどのように支援するかを詳しく説明しました。

このブログでは、ワークフローによる複雑なタスクの実行を容易にするために、Blue のもう一つの重要な抽象化であるストリームについて説明します。私たちは、ストリームがオーケストレーションに不可欠な抽象化であることを説明し、なぜストリームが、企業のスケーラビリティ要件に対応するエージェント型ワークフローのデータと制御のオーケストレーションに適した抽象化であるかを論じます。

ストリームとは何か?ストリーム処理とは?

コンピュータサイエンスにおいて、ストリーム(stream)とは本質的に動的なデータ構造であり、逐次到着するデータ要素の連なりを表します。例えば、センサーからのデータは、一連の測定値のストリームとして表現できます。
ストリーム処理(stream processing)は、ストリームを計算の第一級オブジェクトとして扱い、データの分配・消費・処理を行う演算子とともに扱うプログラミングパラダイムです。map、filter、split、reduceなどの演算子によって、複雑なワークフローを構成し、幅広い種類のデータ計算をサポートすることが可能です。
実際、ストリーム処理はオペレーティングシステムや分散システムなどの基盤技術から、データ処理や分析といった応用的な分野に至るまで、コンピューティングの様々な領域で応用されています。

stream processing

Blueにおけるストリーム

Blueにおいて、ストリームはオーケストレーションの中核をなす抽象概念です。ストリームは、データと制御の分配を可能にする仕組みとして機能します。したがって、ストリームはBlueにおけるエージェントやその他のコンポーネント間の通信の基盤を形成しています。

なぜストリームなのか?

Blueにおけるストリームの具体的な説明に入る前に、なぜストリームがエージェントのワークフローに適した抽象概念なのかを考えてみましょう。

第一に、ストリームは複雑なワークフローを効率的かつ効果的に表現でき、データとタスクによって作業を自然に分解することができます。ストリームは動的な構造でもあり、データ内容とストリーム間の接続はいつでも作成・更新できるため、データの処理方法にスケーラビリティと柔軟性を持たせることができます。

第二に、ストリームは可観測性(observability)と制御性(controllability)の向上にも寄与します。データと制御のフローを永続的なデータ構造で明示的に表現することで、デバッグやパフォーマンス上の問題の調査が可能になります。可観測性は、プロダクトデザインの観点だけでなく、ソフトウェアの検証の観点からも非常に重要です。また、エンドユーザーにとっては、システムの出力に対する信頼性を高めるのに役立ちます。

さらに、実運用の観点から見ると、ストリームは”役割の分離”(separation of concerns)を実現する役割も果たし、結果としてシステムの信頼性を向上させます。ストリームは、エージェントやその他のコンポーネント間の通信バスとして機能し、これらのエージェントやコンポーネントを独立して構成・デプロイ・スケールできるようにします。これにより、パフォーマンスと信頼性の両面で利点が得られます。

最後に、ストリームはLLM/AIの世界においても馴染みのある概念です。実際、LLMにデータを入力し、出力を受け取る流れは、ストリームコンピューティングの仕組みと非常によく似ています。多くのLLM APIは、データが生成されると同時に処理できるようにストリーミング出力を提供しています。
テキストのみを扱うLLMを超えて、より構造化されたデータを扱うようになると、ストリームはエージェントとの連携のための基盤として拡張可能な仕組みになります。このようにストリームはデータの内容(データセマンティクスの曖昧性を減らす)とデータの流れ(処理の曖昧性を減らす)の両方に構造を与えることで、システム全体の制御性を高めます。

ストリームはBlueでどのように機能するのか?

stream messages

Blueにおいて、ストリームとは本質的に「メッセージ」と呼ばれるデータの列です。ストリームには一意のIDが付与され、タグによって内容の属性を記述できます。例えば、あるストリームにUSERタグが付けられている場合、そのストリームの内容があるユーザーから発信されたものであることを示します。

各ストリームは、ストリームの開始を表す特別なメッセージBOS(Beginning of Stream)で始まり、複数のメッセージを挟んで、ストリームの終了を示す特別なメッセージEOS(End of Stream)で終わります。BOSとEOSによって、ストリームを処理するエージェント(consumer)は、計算やリソースの初期化および終了処理を行うことができます。

ストリームメッセージ

ストリーム内の各メッセージは、3つの要素からなるタプルで構成されます。 k はメッセージのキーまたはID、 v はデータ、 t はメッセージのタグを表します。 k はデータを一意に識別するために使われるほか、アグリゲーションのように計算のキーとして使用することも可能です。 vはメッセージ内のデータを表し、 floatstr などの基本的なデータ型から、複合データを表すJSON形式まで、様々な構造化データを表現できるよう標準化されています。 t はデータの意味(セマンティクス)を示すタグであり、DATAやCONTROLといった基本タグから、よりアプリケーション固有のセマンティクスまで、複数の値を取ることができます。例えば、ストリームが複数の履歴書(レジュメ)からなる文書のシーケンスを含んでいる場合、それぞれのメッセージはレジュメとして扱われ、レジュメID、本文、DOCやRESUMEといったタグを持つことができます。

Blueのメッセージの大半はデータメッセージであり、エージェントが取得し処理するためのデータを提供します。しかし、Blueにはコントロール(制御)メッセージも存在します。これは本質的に、エージェントがストリームを処理させるための指示を表します。例えば、あるエージェントが他のエージェントに何かを計算させたい場合、メッセージを出力して依頼できます。これはツールの呼び出し(tool calling)に似ていますが、ツールだけでなく他のエージェントも対象とする点で拡張的です。これらのコントロールメッセージは、「実行時に動的に生成される指示(インタプリタ形式のコードのようなもの)」として表現され、エージェント同士を結びつける明確な役割を担います。さらに、これらの指示はデータと同様に明示的に表現され、コンピュータシステムにおける命令キュー(instruction queue)のような役割も果たします。コントロールメッセージは、どのエージェントでも使用でき、別のエージェントに処理を依頼したり、全体の実行フローを管理するプランナーエージェントが利用したりすることも可能です。これらのメッセージは、任意のストリームに含めることも、制御専用のストリームに分離して構成することもできます。

Calculations

コントロールメッセージも、k(キー/ID)、v(コントロールメッセージの構造を記述するための特定の構造を持つ値)、t(そのメッセージがコントロールおよび指示メッセージであることを明示的に示すタグ)という、同じタプル形式のコンポーネントを持ちます。エージェントの実行命令を表す場合、実行対象のエージェント(A)と、任意のプロパティ(P)、処理対象のストリーム(S)が含まれます。これにより、コントロールメッセージがどのエージェントにどのような操作をさせるべきかを具体的に記述することができます。そしてtにより、このメッセージが制御および指示用であることが明確にタグ付けされます。

ストリームとエージェント

Blueにおいて、エージェントはストリームデータの主要な”消費者”であり、”生産生成者”でもあります。エージェントは、ストリーム内のすべてのメッセージに対して実行される プロセッサ関数(processor function) を定義し、必要に応じて新しいストリームにデータを書き込むことも可能です。エージェントは複数の入力・出力パラメータを持つことができ、入力ストリームはプロセッサの特定の入力パラメータにマッピングされ、出力データは特定の出力パラメータに割り当てられるようになっています。例えば、インテント分類エージェント(intent classifier agent)は、 TEXTのような名前付き入力パラメータと、INTENTのような出力パラメータを持つことができます。さらに、エージェントはデータ処理をガイドするプロパティも持ちます。例えば、インテント分類エージェントは、インテント識別に使用するモデルを指定する modelプロパティを定義することができます。

streams and agents

エージェントは、各入力パラメータに対して宣言されたリスナー(listener)を通じてストリームに接続し、処理を行うことができます。例えば、インテント分類エージェントは、 入力パラメータTEXTに対して、USERというタグが付いたストリームを監視(listen)するよう指定できます。さらに、エージェントは、先に述べたように、ストリーム内の明示的な指示メッセージ(コントロールメッセージ)を通じてストリーム処理を開始することも可能です。

このように、ストリームデータプロセッサとしてのエージェントモデルは、処理の流れを明確かつシンプルに保ち、アプリケーション内でのデータフローの理解と管理を容易にします。

次に、このモデルを用いて複雑なワークフローがどのように構築されるのかを説明します。

複雑なワークフロー

ワークフローとは本質的に、特定の目的を達成するためにデータ上で実行される一連のタスクを指します。エージェント型の統合管理においては、これはエージェントとその処理対象となるデータを整理し、全体として望ましい成果を生み出すことに対応します。ワークフローは、ノードがプロセッサ(処理単位)を、エッジがエージェント間の入力および出力データの接続を表す有向非巡回グラフ(DAG)として表現できます。

Blueにおいては、ノードは複数の入力と出力を持つエージェントであり、エッジはエージェント間で対応する入力と出力を明示的に接続するものです。例えば、 NL2SQLの出力ストリームSQLを、SQL EXECUTORエージェントの入力パラメータSQLに接続することで、クエリの実行を行うことができます。

complex workflows

ワークフローは、単に実行順序を定義するだけでなく、データとタスクを分解して複雑な処理パイプラインを表現することも可能です。こうした複雑なワークフローの表現にはストリームが非常に適しています。

簡単に言えば、データの連続は自然に反復処理iterationに対応し、エージェントによるフィルタリングは条件分岐conditional statementとして機能します。 出力されたデータは、様々なストリームに分配され、 それぞれのストリームで特定の処理を実行することで、プロシージャ呼び出しのような挙動を再現し、抽象化やスコープの明確化に貢献します。さらに、後でストリーム同士を結合することで、結果のアグリゲーションも可能になります。

Blueでは、ストリームが本来持つこうした複雑なワークフロー表現の能力に加え、エージェントの実行指示のようなコントロールメッセージも含めることができるため、さらに表現力が強化されています。特にプランナーエージェントのようなエージェントは、これらの仕組みを活用して、多くのプログラミング言語に匹敵する表現力を持つ計画(プラン)を生成することが可能です。

それでは、具体的な例を見てみましょう。

例:不動産分析
Home data analyzed by data, by tax

あなたは今、家の購入を検討しているとしましょう。興味のある地域はいくつもあり、物件の選択肢も豊富です。それぞれの家には異なる長所と短所があり、広い敷地を持つもの、良い学校がある地域にあるもの、公共交通機関へのアクセスが便利な場所にあるものなど様々です。また、あなた自身や家族にとっての重要な要素もいくつかあります。例えば、予算に限りがあり、中学生の子どもがいるため教育環境を重視しているかもしれません。さらに、パートナーが都心で働いていれば、地下鉄の駅に近い場所を希望するかもしれません。

このように多くの条件が関係し、優先順位も異なる複雑な意思決定ワークフローに、私たちはどのように対応すればよいのでしょうか?

第一に、Blueは作業を分散し、それぞれに適切なスコープを設定することで容易にスケール可能です。例えば、複数の物件を異なるストリームに分けて処理することができます。これはまるで、複数の不動産エージェントに異なる地域や物件に注目するよう依頼し、並行して調査を進めてもらった上で結果を報告してもらうようなものです。

第二に、各物件は異なるため、それぞれを個別に精査し、それぞれのトレードオフを考慮する必要があります。Blueを使えば、各物件に特化したプランを作成し、分析・優先順位付け・深掘りを行い、その物件に最適なデータを使って検討を支援することができます。 例えば、ある物件では土地モデルを活用して敷地条件を詳細に分析したり、別の物件では学校に関するレビューのデータベースをもとに、学校のレベルを評価することが可能です。

最後に、Blueはそれらのすべての調査結果を統合し、各エージェントからの情報を集約することができます。様々な物件のレポートを比較・対照し、住宅の状態や地域の学校のレベルといったトレードオフを確認することができます。Blueは調査内容を説明しながら、最終的な購入判断を支援してくれる強力なパートナーになるのです。

ストリームはどのようにして Blue の独自性を生み出しているのか?

Blueにおいて、ストリームは中心的な統合管理の抽象概念であり、表現力と制御性の点で他のフレームワークとの差別化要素となっています。

多くの既存手法では、自然言語がエージェントの入出力データの唯一の形式として扱われています。しかし、自然言語は曖昧さを含むため、テキストのみを扱うアプローチはビジネス用途において問題を引き起こす可能性があります。一方、Blueではエージェント間でやり取りされるデータは構造化されており、セマンティクスと制御の両面で利点があります。

また、データの流れやエージェント間の通信パターンにおいて、Blueは非常に優れた表現力を発揮します。ストリームベースのワークフローによって、Blueのエージェント型ワークフローは多くのプログラミング言語に匹敵する計算能力を持つことが可能になります。

ワークフローは、事前に設計されたもの(エージェントの入出力を明示的に接続する)、プランナーによってオンデマンドで構築されるもの、またはエージェントが宣言する興味に基づくアドホックなもの(リスナー経由)として設計することができます。これにより、計画と調整を必要とする複雑で知識集約的なタスクにも対応できます。さらに、エージェントはコントロールメッセージを生成することで、他のエージェントへの委任のようなワークフローの計画や調整を行うことも可能です。

最後に、ストリームはエージェント型ワークフローにおける「コンテキスト」としての役割も果たします。これは、Blueと他のエージェント型フレームワークとの重要な違いです。Blueでは、タスクに合わせてコンテキストのスコープを明示的に設定できるのに対し、他の多くのフレームワークでは処理が進むにつれてコンテキストを無制限に拡張し続けてしまい、その結果として処理が遅くなり、品質も低下するという問題が生じがちです。

まとめ

私たちは「エージェント型ワークフロー」をエンタープライズで実用化するうえで、ストリーム処理が不可欠な要素であると強く信じています。ストリームは幅広いワークフローとその複雑性に対応しながら、正確性、拡張性、使いやすさを促進するための適切な抽象化とスコープを提供します。

次回の記事

次回は、これまで紹介してきた内容をすべて統合し、Blueのエージェント型フレームワーク全体のアーキテクチャと、その中で各コンポーネントや抽象概念がどのように連携して動作するかを解説します。また、Blueアーキテクチャに特有の設計要素と、他のエージェント型フレームワークに対して持つ優位性についても議論します。

今後もプランナーや高度なストリーム機能の分野を中心に、Blueのさらなる進展に合わせて追加の記事を投稿予定です。

まずは、Blue v0.9 を実際にご覧ください。アイデア、質問、貢献のお申し出などがあれば、ぜひ コミュニティを通じてご連絡ください。

執筆者:Eser Kandogan, MegagonLabs

この記事をシェアする
13 Min Read
November 7, 2025
「混合シグナル(Mixed Signals)」は、視覚言語モデル(VLM)の隠れたバイアスを明らかにし、ヘルスケア、RAG システム、AI の安全性に対して重大な示唆を与えています。
13 Min Read
May 8, 2025
エージェントによるワークフローをサポートするために、企業システムはどのように進化できるでしょうか?本記事では、AIエージェントやデータ、サービスを、スケーラビリティと可観測性があり、制御可能なエンタープライズ・アプリケーションに統合するためにデザインされたフレームワークであるBlueの概念的基盤を探ります。
7 Min Read
May 5, 2025
私たちは、自然言語処理における喫緊かつ未開拓のトピックである「複数文書推論」に取り組む3つの新しい論文を紹介します。これらの論文は、大規模言語モデル(LLM)が複数の情報源にまたがる複雑性をどのように扱うかについて、厳密なベンチマーク、新しい方法論、経験的洞察を提供します。