# Source code for langgraph_agent_toolkit.agents.blueprints.bg_task_agent.utils

from typing import Any

from langchain_core.messages import ChatMessage
from langgraph.types import StreamWriter
from pydantic import BaseModel, Field


class CustomData(BaseModel):
    """Custom data being sent by an agent.

    Wraps an arbitrary string-keyed dictionary so it can be converted to a
    LangChain ``ChatMessage`` and handed to a stream writer.
    """

    # Payload carried by the message; forwarded as-is, never copied or altered.
    data: dict[str, Any] = Field(description="The custom data")

    def to_langchain(self) -> ChatMessage:
        """Convert this payload into a LangChain ``ChatMessage``.

        Returns:
            A ``ChatMessage`` with role ``"custom"`` whose content is a
            one-element list holding ``self.data``.
        """
        # The dict is wrapped in a list: content is passed as a list of blocks.
        return ChatMessage(content=[self.data], role="custom")

    def dispatch(self, writer: StreamWriter) -> None:
        """Send this payload through ``writer``.

        Args:
            writer: Callable that is invoked once with the ``ChatMessage``
                produced by :meth:`to_langchain`.
        """
        writer(self.to_langchain())