# Source code for langgraph_agent_toolkit.core.observability.base
import os
from abc import ABC, abstractmethod
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, Dict, List, Literal, Optional, TypeVar, cast
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts.chat import (
AIMessagePromptTemplate,
BaseMessage,
HumanMessagePromptTemplate,
MessagesPlaceholder,
SystemMessagePromptTemplate,
)
from langgraph_agent_toolkit.core.observability.types import MessageRole, PromptReturnType, PromptTemplateType
T = TypeVar("T")
# [docs]
class BaseObservabilityPlatform(ABC):
"""Base class for observability platforms.
This is a lightweight base class that provides common utilities for
observability platforms. It does NOT perform any disk I/O operations
to avoid blocking in async contexts.
"""
[docs]
def __init__(self, remote_first: bool = False):
"""Initialize the observability platform.
Args:
remote_first: If True, prioritize remote prompts over local cache.
"""
self._required_vars: List[str] = []
self._remote_first = remote_first
@property
def remote_first(self) -> bool:
return self._remote_first
@property
def required_vars(self) -> List[str]:
return self._required_vars
@required_vars.setter
def required_vars(self, value: List[str]) -> None:
self._required_vars = value
[docs]
def validate_environment(self) -> bool:
"""Validate that required environment variables are set."""
missing_vars = [var for var in self._required_vars if not os.environ.get(var)]
if missing_vars:
raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
return True
[docs]
@staticmethod
def requires_env_vars(func: Callable[..., T]) -> Callable[..., T]:
"""Validate environment variables before calling a method."""
@wraps(func)
def wrapper(self, *args, **kwargs):
self.validate_environment()
return func(self, *args, **kwargs)
return wrapper
[docs]
@abstractmethod
def get_callback_handler(self, **kwargs) -> Any:
"""Get the callback handler for the observability platform."""
pass
[docs]
@abstractmethod
def before_shutdown(self) -> None:
"""Perform any necessary cleanup before shutdown."""
pass
[docs]
@abstractmethod
def record_feedback(self, run_id: str, key: str, score: float, **kwargs) -> None:
"""Record feedback for a run."""
pass
[docs]
@abstractmethod
def push_prompt(
self,
name: str,
prompt_template: PromptTemplateType,
metadata: Optional[Dict[str, Any]] = None,
force_create_new_version: bool = True,
) -> None:
"""Push a prompt to the observability platform."""
pass
[docs]
@abstractmethod
def pull_prompt(
self,
name: str,
template_format: Literal["f-string", "mustache", "jinja2"] = "f-string",
**kwargs,
) -> PromptReturnType:
"""Pull a prompt from the observability platform."""
pass
[docs]
@abstractmethod
def delete_prompt(self, name: str) -> None:
"""Delete a prompt from the observability platform."""
pass
[docs]
@contextmanager
def trace_context(self, run_id: str, **kwargs):
"""Create a trace context for the execution.
Override in subclasses for platform-specific implementation.
Args:
run_id: The run ID to use as trace ID
**kwargs: Additional context parameters (user_id, input, etc.)
Yields:
None (or platform-specific context object)
"""
yield
def _convert_to_chat_prompt(self, prompt_template: PromptTemplateType) -> ChatPromptTemplate:
if isinstance(prompt_template, str):
return ChatPromptTemplate.from_template(prompt_template)
elif isinstance(prompt_template, list) and all(isinstance(msg, dict) for msg in prompt_template):
messages = []
for msg in prompt_template:
role = msg.get("role", "")
content = msg.get("content", "")
match role.lower():
case MessageRole.SYSTEM:
messages.append(SystemMessage(content=content))
case MessageRole.HUMAN | MessageRole.USER:
messages.append(HumanMessage(content=content))
case MessageRole.AI | MessageRole.ASSISTANT:
messages.append(AIMessage(content=content))
case MessageRole.PLACEHOLDER | MessageRole.MESSAGES_PLACEHOLDER:
messages.append(MessagesPlaceholder(variable_name=content))
case _:
raise ValueError(f"Unknown message role: {role}")
return ChatPromptTemplate.from_messages(messages)
else:
return cast(ChatPromptTemplate, prompt_template)
def _process_messages_from_prompt(
self,
messages: List[Any],
template_format: Literal["f-string", "mustache", "jinja2"] = "f-string",
) -> List[Any]:
MESSAGE_TYPE_MAP = {
MessageRole.SYSTEM: SystemMessagePromptTemplate,
MessageRole.HUMAN: HumanMessagePromptTemplate,
MessageRole.USER: HumanMessagePromptTemplate,
MessageRole.AI: AIMessagePromptTemplate,
MessageRole.ASSISTANT: AIMessagePromptTemplate,
}
processed_messages = []
for msg in messages:
if isinstance(msg, MessagesPlaceholder):
processed_messages.append(msg)
continue
if isinstance(msg, BaseMessage):
msg_type = MessageRole(msg.type)
if msg_type in MESSAGE_TYPE_MAP:
template_class = MESSAGE_TYPE_MAP[msg_type]
processed_messages.append(
template_class.from_template(msg.content, template_format=template_format)
)
continue
if isinstance(msg, dict) and "role" in msg and "content" in msg:
role, content = msg["role"], msg["content"]
if role in MESSAGE_TYPE_MAP:
template_class = MESSAGE_TYPE_MAP[role]
processed_messages.append(template_class.from_template(content, template_format=template_format))
continue
if role.lower() in (MessageRole.PLACEHOLDER, MessageRole.MESSAGES_PLACEHOLDER):
processed_messages.append(MessagesPlaceholder(variable_name=content))
continue
if isinstance(msg, tuple) and len(msg) == 2:
role, content = msg
if role in MESSAGE_TYPE_MAP:
template_class = MESSAGE_TYPE_MAP[role]
processed_messages.append(template_class.from_template(content, template_format=template_format))
continue
if role.lower() in (MessageRole.PLACEHOLDER, MessageRole.MESSAGES_PLACEHOLDER):
processed_messages.append(MessagesPlaceholder(variable_name=content))
continue
processed_messages.append(msg)
return processed_messages
def _process_prompt_object(
self,
prompt_obj: Any,
template_format: Literal["f-string", "mustache", "jinja2"] = "f-string",
) -> ChatPromptTemplate:
if isinstance(prompt_obj, ChatPromptTemplate):
return prompt_obj
if hasattr(prompt_obj, "messages") and isinstance(prompt_obj.messages, list):
processed_messages = self._process_messages_from_prompt(
prompt_obj.messages, template_format=template_format
)
if processed_messages:
return ChatPromptTemplate.from_messages(processed_messages)
elif isinstance(prompt_obj, list):
if all(isinstance(item, dict) and "role" in item and "content" in item for item in prompt_obj):
processed_messages = self._process_messages_from_prompt(prompt_obj, template_format=template_format)
if processed_messages:
return ChatPromptTemplate.from_messages(processed_messages)
if isinstance(prompt_obj, str):
return ChatPromptTemplate.from_template(prompt_obj, template_format=template_format)
raise ValueError(f"Could not process prompt object of type {type(prompt_obj)}")