# Source code for langgraph_agent_toolkit.core.observability.base
import asyncio
import os
from abc import ABC, abstractmethod
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, Dict, List, Literal, Optional, TypeVar, Union, cast
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts.chat import (
AIMessagePromptTemplate,
BaseMessage,
HumanMessagePromptTemplate,
MessagesPlaceholder,
SystemMessagePromptTemplate,
)
from langgraph_agent_toolkit.core.observability.types import ChatMessageDict, MessageRole
T = TypeVar("T")

# Type for prompt templates that can be provided to push_prompt:
# a raw template string, or a list of role/content message dicts.
PromptTemplateType = Union[str, List[ChatMessageDict]]

# Type for the return value of pull_prompt: a parsed chat template,
# a raw string, a platform-native dict, or None.
PromptReturnType = Union[ChatPromptTemplate, str, dict, None]
[docs]
class BaseObservabilityPlatform(ABC):
"""Base class for observability platforms.
This is a lightweight base class that provides common utilities for
observability platforms. It does NOT perform any disk I/O operations
to avoid blocking in async contexts.
"""
[docs]
def __init__(self, remote_first: bool = False):
"""Initialize the observability platform.
Args:
remote_first: If True, prioritize remote prompts over local cache.
"""
self._required_vars: List[str] = []
self._remote_first = remote_first
@property
def remote_first(self) -> bool:
return self._remote_first
@property
def required_vars(self) -> List[str]:
return self._required_vars
@required_vars.setter
def required_vars(self, value: List[str]) -> None:
self._required_vars = value
[docs]
def validate_environment(self) -> bool:
"""Validate that required environment variables are set."""
missing_vars = [var for var in self._required_vars if not os.environ.get(var)]
if missing_vars:
raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
return True
[docs]
@staticmethod
def requires_env_vars(func: Callable[..., T]) -> Callable[..., T]:
"""Validate environment variables before calling a method."""
@wraps(func)
def wrapper(self, *args, **kwargs):
self.validate_environment()
return func(self, *args, **kwargs)
return wrapper
[docs]
@abstractmethod
def get_callback_handler(self, **kwargs) -> Any:
"""Get the callback handler for the observability platform."""
pass
[docs]
@abstractmethod
def before_shutdown(self) -> None:
"""Perform any necessary cleanup before shutdown."""
pass
[docs]
@abstractmethod
def record_feedback(self, run_id: str, key: str, score: float, **kwargs) -> None:
"""Record feedback for a run."""
pass
[docs]
@abstractmethod
def push_prompt(
self,
name: str,
prompt_template: PromptTemplateType,
metadata: Optional[Dict[str, Any]] = None,
force_create_new_version: bool = True,
) -> None:
"""Push a prompt to the observability platform."""
pass
[docs]
@abstractmethod
def pull_prompt(
self,
name: str,
template_format: Literal["f-string", "mustache", "jinja2"] = "f-string",
**kwargs,
) -> PromptReturnType:
"""Pull a prompt from the observability platform."""
pass
[docs]
async def apull_prompt(
self,
name: str,
template_format: Literal["f-string", "mustache", "jinja2"] = "f-string",
**kwargs,
) -> PromptReturnType:
"""Async version of pull_prompt. Runs synchronous version in thread pool."""
return await asyncio.to_thread(self.pull_prompt, name, template_format, **kwargs)
[docs]
@abstractmethod
def delete_prompt(self, name: str) -> None:
"""Delete a prompt from the observability platform."""
pass
[docs]
@contextmanager
def trace_context(self, run_id: str, **kwargs):
"""Create a trace context for the execution.
Override in subclasses for platform-specific implementation.
Args:
run_id: The run ID to use as trace ID
**kwargs: Additional context parameters (user_id, input, etc.)
Yields:
None (or platform-specific context object)
"""
yield
def _convert_to_chat_prompt(self, prompt_template: PromptTemplateType) -> ChatPromptTemplate:
if isinstance(prompt_template, str):
return ChatPromptTemplate.from_template(prompt_template)
elif isinstance(prompt_template, list) and all(isinstance(msg, dict) for msg in prompt_template):
messages = []
for msg in prompt_template:
role = msg.get("role", "")
content = msg.get("content", "")
match role.lower():
case MessageRole.SYSTEM:
messages.append(SystemMessage(content=content))
case MessageRole.HUMAN | MessageRole.USER:
messages.append(HumanMessage(content=content))
case MessageRole.AI | MessageRole.ASSISTANT:
messages.append(AIMessage(content=content))
case MessageRole.PLACEHOLDER | MessageRole.MESSAGES_PLACEHOLDER:
messages.append(MessagesPlaceholder(variable_name=content))
case _:
raise ValueError(f"Unknown message role: {role}")
return ChatPromptTemplate.from_messages(messages)
else:
return cast(ChatPromptTemplate, prompt_template)
def _process_messages_from_prompt(
self,
messages: List[Any],
template_format: Literal["f-string", "mustache", "jinja2"] = "f-string",
) -> List[Any]:
MESSAGE_TYPE_MAP = {
MessageRole.SYSTEM: SystemMessagePromptTemplate,
MessageRole.HUMAN: HumanMessagePromptTemplate,
MessageRole.USER: HumanMessagePromptTemplate,
MessageRole.AI: AIMessagePromptTemplate,
MessageRole.ASSISTANT: AIMessagePromptTemplate,
}
processed_messages = []
for msg in messages:
if isinstance(msg, MessagesPlaceholder):
processed_messages.append(msg)
continue
if isinstance(msg, BaseMessage):
msg_type = MessageRole(msg.type)
if msg_type in MESSAGE_TYPE_MAP:
template_class = MESSAGE_TYPE_MAP[msg_type]
processed_messages.append(
template_class.from_template(msg.content, template_format=template_format)
)
continue
if isinstance(msg, dict) and "role" in msg and "content" in msg:
role, content = msg["role"], msg["content"]
if role in MESSAGE_TYPE_MAP:
template_class = MESSAGE_TYPE_MAP[role]
processed_messages.append(template_class.from_template(content, template_format=template_format))
continue
if role.lower() in (MessageRole.PLACEHOLDER, MessageRole.MESSAGES_PLACEHOLDER):
processed_messages.append(MessagesPlaceholder(variable_name=content))
continue
if isinstance(msg, tuple) and len(msg) == 2:
role, content = msg
if role in MESSAGE_TYPE_MAP:
template_class = MESSAGE_TYPE_MAP[role]
processed_messages.append(template_class.from_template(content, template_format=template_format))
continue
if role.lower() in (MessageRole.PLACEHOLDER, MessageRole.MESSAGES_PLACEHOLDER):
processed_messages.append(MessagesPlaceholder(variable_name=content))
continue
processed_messages.append(msg)
return processed_messages
def _process_prompt_object(
self,
prompt_obj: Any,
template_format: Literal["f-string", "mustache", "jinja2"] = "f-string",
) -> ChatPromptTemplate:
if isinstance(prompt_obj, ChatPromptTemplate):
return prompt_obj
if hasattr(prompt_obj, "messages") and isinstance(prompt_obj.messages, list):
processed_messages = self._process_messages_from_prompt(
prompt_obj.messages, template_format=template_format
)
if processed_messages:
return ChatPromptTemplate.from_messages(processed_messages)
elif isinstance(prompt_obj, list):
if all(isinstance(item, dict) and "role" in item and "content" in item for item in prompt_obj):
processed_messages = self._process_messages_from_prompt(prompt_obj, template_format=template_format)
if processed_messages:
return ChatPromptTemplate.from_messages(processed_messages)
if isinstance(prompt_obj, str):
return ChatPromptTemplate.from_template(prompt_obj, template_format=template_format)
raise ValueError(f"Could not process prompt object of type {type(prompt_obj)}")