Source code for langgraph_agent_toolkit.service.routes

from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import RedirectResponse, StreamingResponse
from langchain_core.messages import AnyMessage, RemoveMessage
from langchain_core.runnables import RunnableConfig

from langgraph_agent_toolkit import __version__
from langgraph_agent_toolkit.agents.agent import Agent
from langgraph_agent_toolkit.helper.constants import get_default_agent
from langgraph_agent_toolkit.helper.logging import logger
from langgraph_agent_toolkit.helper.utils import langchain_to_chat_message
from langgraph_agent_toolkit.schema import (
    AddMessagesInput,
    AddMessagesResponse,
    ChatHistory,
    ChatHistoryInput,
    ChatMessage,
    ClearHistoryInput,
    ClearHistoryResponse,
    Feedback,
    FeedbackResponse,
    HealthCheck,
    ServiceMetadata,
    StreamInput,
    UserInput,
)
from langgraph_agent_toolkit.service.utils import (
    _sse_response_example,
    _validate_thread_or_user_id,
    get_agent,
    get_agent_executor,
    get_all_agent_info,
    message_generator,
)


# Create separate routers for private and public endpoints
private_router = APIRouter()
public_router = APIRouter(tags=["public"])


[docs] @private_router.get( "/info", status_code=status.HTTP_200_OK, tags=["info"], summary="Get information about available agents", description="Returns metadata about the service including available agents and default agent.", ) async def info(request: Request) -> ServiceMetadata: return ServiceMetadata( agents=get_all_agent_info(request), default_agent=get_default_agent(), )
[docs] @private_router.post( "/{agent_id}/invoke", status_code=status.HTTP_200_OK, tags=["agent"], summary="Invoke a specific agent to get a response", description="Invoke a specified agent with user input to retrieve a final response.", ) @private_router.post( "/invoke", status_code=status.HTTP_200_OK, tags=["agent"], summary="Invoke an agent to get a response", description="Invoke an agent with user input to retrieve a final response.", ) async def invoke(user_input: UserInput, agent_id: str = None, request: Request = None) -> ChatMessage: """Invoke an agent with user input to retrieve a final response. If agent_id is not provided, the default agent will be used. Use thread_id to persist and continue a multi-turn conversation. run_id kwarg is also attached to messages for recording feedback. """ executor = get_agent_executor(request) if agent_id is None: agent_id = get_default_agent() try: return await executor.invoke( agent_id=agent_id, input=user_input.input, thread_id=user_input.thread_id, user_id=user_input.user_id, model_name=user_input.model_name, model_provider=user_input.model_provider, model_config_key=user_input.model_config_key, agent_config=user_input.agent_config, recursion_limit=user_input.recursion_limit, ) except Exception as e: logger.error(f"An exception occurred: {e}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Unexpected error")
[docs] @private_router.post( "/{agent_id}/stream", status_code=status.HTTP_200_OK, response_class=StreamingResponse, responses=_sse_response_example(), tags=["agent"], summary="Stream a specific agent's response", description="Stream a specified agent's response to a user input, including intermediate messages and tokens.", ) @private_router.post( "/stream", status_code=status.HTTP_200_OK, response_class=StreamingResponse, responses=_sse_response_example(), tags=["agent"], summary="Stream an agent's response", description="Stream an agent's response to a user input, including intermediate messages and tokens.", ) async def stream(user_input: StreamInput, agent_id: str | None = None, request: Request = None) -> StreamingResponse: """Stream an agent's response to a user input, including intermediate messages and tokens. If agent_id is not provided, the default agent will be used. Use thread_id to persist and continue a multi-turn conversation. run_id kwarg is also attached to all messages for recording feedback. Set `stream_tokens=false` to return intermediate messages but not token-by-token. """ if agent_id is None: agent_id = get_default_agent() return StreamingResponse( message_generator(user_input, request, agent_id), media_type="text/event-stream", )
[docs] @private_router.post( "/feedback", status_code=status.HTTP_201_CREATED, tags=["feedback"], summary="Record feedback", description="Record feedback for a run to the configured observability platform.", ) @private_router.post( "/{agent_id}/feedback", status_code=status.HTTP_201_CREATED, tags=["feedback"], summary="Record feedback for a specific agent", description="Record feedback for a run to the configured observability platform for a specific agent.", ) async def feedback(feedback: Feedback, agent_id: str | None = None, request: Request = None) -> FeedbackResponse: """Record feedback for a run to the configured observability platform. This routes the feedback to the appropriate platform based on the agent's configuration. """ try: if agent_id is None: agent_id = get_default_agent() agent = get_agent(request, agent_id) agent.observability.record_feedback( run_id=feedback.run_id, key=feedback.key, score=feedback.score, user_id=feedback.user_id, **feedback.kwargs, ) return FeedbackResponse( run_id=feedback.run_id, message=f"Feedback '{feedback.key}' recorded successfully for run {feedback.run_id}.", ) except ValueError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except Exception as e: logger.error(f"An exception occurred while recording feedback: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Unexpected error recording feedback" )
[docs] @private_router.get( "/history", status_code=status.HTTP_200_OK, tags=["chat"], summary="Get chat history", description="Get chat history for a thread or user.", ) @private_router.get( "/{agent_id}/history", status_code=status.HTTP_200_OK, tags=["chat"], summary="Get chat history for a specific agent", description="Get chat history for a thread or user with a specific agent.", ) async def history( input: ChatHistoryInput = Depends(), agent_id: str | None = None, request: Request = None, ) -> ChatHistory: """Get chat history.""" _validate_thread_or_user_id(input.thread_id, input.user_id) if agent_id is None: agent_id = get_default_agent() agent: Agent = get_agent(request, agent_id) try: state_snapshot = await agent.graph.aget_state( config=RunnableConfig( configurable={ "thread_id": input.thread_id, "user_id": input.user_id, } ) ) messages: list[AnyMessage] = state_snapshot.values["messages"] chat_messages: list[ChatMessage] = [langchain_to_chat_message(m) for m in messages] return ChatHistory(messages=chat_messages) except ValueError as e: logger.error(f"A validation error occurred: {e}") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except Exception as e: logger.error(f"An exception occurred: {e}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Unexpected error")
[docs] @private_router.delete( "/history/clear", status_code=status.HTTP_200_OK, tags=["chat"], summary="Clear chat history", description="Clear chat history for a thread or user.", ) @private_router.delete( "/{agent_id}/history/clear", status_code=status.HTTP_200_OK, tags=["chat"], summary="Clear chat history for a specific agent", description="Clear chat history for a thread or user with a specific agent.", ) async def clear_history( input: ClearHistoryInput, agent_id: str | None = None, request: Request = None, ) -> ClearHistoryResponse: """Clear chat history.""" _validate_thread_or_user_id(input.thread_id, input.user_id) if agent_id is None: agent_id = get_default_agent() agent: Agent = get_agent(request, agent_id) try: state_snapshot = await agent.graph.aget_state( config=RunnableConfig( configurable={ "thread_id": input.thread_id, "user_id": input.user_id, } ) ) messages: list[AnyMessage] = state_snapshot.values["messages"] await agent.graph.aupdate_state( config=RunnableConfig( configurable={ "thread_id": input.thread_id, "user_id": input.user_id, } ), values={"messages": [RemoveMessage(id=m.id) for m in messages]}, ) return ClearHistoryResponse( status="success", thread_id=input.thread_id, user_id=input.user_id, message=f"Cleared {len(messages)} messages from chat history.", ) except Exception as e: logger.error(f"An exception occurred: {e}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Unexpected error")
[docs] @private_router.post( "/history/add_messages", status_code=status.HTTP_201_CREATED, tags=["chat"], summary="Add messages to chat history", description="Add messages to the end of chat history for a thread or user.", ) @private_router.post( "/{agent_id}/history/add_messages", status_code=status.HTTP_201_CREATED, tags=["chat"], summary="Add messages to chat history for a specific agent", description="Add messages to the end of chat history for a thread or user with a specific agent.", ) async def add_messages( input: AddMessagesInput, agent_id: str | None = None, request: Request = None, ) -> AddMessagesResponse: """Add messages to the end of chat history.""" _validate_thread_or_user_id(input.thread_id, input.user_id) if agent_id is None: agent_id = get_default_agent() agent: Agent = get_agent(request, agent_id) try: await agent.graph.aupdate_state( config=RunnableConfig( configurable={ "thread_id": input.thread_id, "user_id": input.user_id, } ), values={"messages": [{"type": m.type, "content": m.content} for m in input.messages]}, ) return AddMessagesResponse( status="success", thread_id=input.thread_id, user_id=input.user_id, message=f"Added {len(input.messages)} messages to chat history.", ) except Exception as e: logger.error(f"An exception occurred: {e}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Unexpected error")
[docs] @public_router.get( "/", summary="API Home", description="Redirects to the API documentation.", ) async def redirect_to_docs() -> RedirectResponse: return RedirectResponse(url="/docs")
[docs] @public_router.get( "/health", tags=["healthcheck"], summary="Health Check", description="Perform a health check to verify the service is running correctly.", response_description="Return HTTP Status Code 200 (OK)", status_code=status.HTTP_200_OK, response_model=HealthCheck, ) async def health_check() -> HealthCheck: """Health check endpoint.""" return HealthCheck( content="healthy", version=__version__, )