Source code for langgraph_agent_toolkit.core.observability.base

import asyncio
import os
from abc import ABC, abstractmethod
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, Dict, List, Literal, Optional, TypeVar, Union, cast

from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts.chat import (
    AIMessagePromptTemplate,
    BaseMessage,
    HumanMessagePromptTemplate,
    MessagesPlaceholder,
    SystemMessagePromptTemplate,
)

from langgraph_agent_toolkit.core.observability.types import ChatMessageDict, MessageRole


T = TypeVar("T")

# Type for prompt templates that can be provided to push_prompt
PromptTemplateType = Union[str, List[ChatMessageDict]]

# Type for the return value of pull_prompt
PromptReturnType = Union[ChatPromptTemplate, str, dict, None]


[docs] class BaseObservabilityPlatform(ABC): """Base class for observability platforms. This is a lightweight base class that provides common utilities for observability platforms. It does NOT perform any disk I/O operations to avoid blocking in async contexts. """
[docs] def __init__(self, remote_first: bool = False): """Initialize the observability platform. Args: remote_first: If True, prioritize remote prompts over local cache. """ self._required_vars: List[str] = [] self._remote_first = remote_first
@property def remote_first(self) -> bool: return self._remote_first @property def required_vars(self) -> List[str]: return self._required_vars @required_vars.setter def required_vars(self, value: List[str]) -> None: self._required_vars = value
[docs] def validate_environment(self) -> bool: """Validate that required environment variables are set.""" missing_vars = [var for var in self._required_vars if not os.environ.get(var)] if missing_vars: raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") return True
[docs] @staticmethod def requires_env_vars(func: Callable[..., T]) -> Callable[..., T]: """Validate environment variables before calling a method.""" @wraps(func) def wrapper(self, *args, **kwargs): self.validate_environment() return func(self, *args, **kwargs) return wrapper
[docs] @abstractmethod def get_callback_handler(self, **kwargs) -> Any: """Get the callback handler for the observability platform.""" pass
[docs] @abstractmethod def before_shutdown(self) -> None: """Perform any necessary cleanup before shutdown.""" pass
[docs] @abstractmethod def record_feedback(self, run_id: str, key: str, score: float, **kwargs) -> None: """Record feedback for a run.""" pass
[docs] @abstractmethod def push_prompt( self, name: str, prompt_template: PromptTemplateType, metadata: Optional[Dict[str, Any]] = None, force_create_new_version: bool = True, ) -> None: """Push a prompt to the observability platform.""" pass
[docs] @abstractmethod def pull_prompt( self, name: str, template_format: Literal["f-string", "mustache", "jinja2"] = "f-string", **kwargs, ) -> PromptReturnType: """Pull a prompt from the observability platform.""" pass
[docs] async def apull_prompt( self, name: str, template_format: Literal["f-string", "mustache", "jinja2"] = "f-string", **kwargs, ) -> PromptReturnType: """Async version of pull_prompt. Runs synchronous version in thread pool.""" return await asyncio.to_thread(self.pull_prompt, name, template_format, **kwargs)
[docs] @abstractmethod def delete_prompt(self, name: str) -> None: """Delete a prompt from the observability platform.""" pass
[docs] @contextmanager def trace_context(self, run_id: str, **kwargs): """Create a trace context for the execution. Override in subclasses for platform-specific implementation. Args: run_id: The run ID to use as trace ID **kwargs: Additional context parameters (user_id, input, etc.) Yields: None (or platform-specific context object) """ yield
def _convert_to_chat_prompt(self, prompt_template: PromptTemplateType) -> ChatPromptTemplate: if isinstance(prompt_template, str): return ChatPromptTemplate.from_template(prompt_template) elif isinstance(prompt_template, list) and all(isinstance(msg, dict) for msg in prompt_template): messages = [] for msg in prompt_template: role = msg.get("role", "") content = msg.get("content", "") match role.lower(): case MessageRole.SYSTEM: messages.append(SystemMessage(content=content)) case MessageRole.HUMAN | MessageRole.USER: messages.append(HumanMessage(content=content)) case MessageRole.AI | MessageRole.ASSISTANT: messages.append(AIMessage(content=content)) case MessageRole.PLACEHOLDER | MessageRole.MESSAGES_PLACEHOLDER: messages.append(MessagesPlaceholder(variable_name=content)) case _: raise ValueError(f"Unknown message role: {role}") return ChatPromptTemplate.from_messages(messages) else: return cast(ChatPromptTemplate, prompt_template) def _process_messages_from_prompt( self, messages: List[Any], template_format: Literal["f-string", "mustache", "jinja2"] = "f-string", ) -> List[Any]: MESSAGE_TYPE_MAP = { MessageRole.SYSTEM: SystemMessagePromptTemplate, MessageRole.HUMAN: HumanMessagePromptTemplate, MessageRole.USER: HumanMessagePromptTemplate, MessageRole.AI: AIMessagePromptTemplate, MessageRole.ASSISTANT: AIMessagePromptTemplate, } processed_messages = [] for msg in messages: if isinstance(msg, MessagesPlaceholder): processed_messages.append(msg) continue if isinstance(msg, BaseMessage): msg_type = MessageRole(msg.type) if msg_type in MESSAGE_TYPE_MAP: template_class = MESSAGE_TYPE_MAP[msg_type] processed_messages.append( template_class.from_template(msg.content, template_format=template_format) ) continue if isinstance(msg, dict) and "role" in msg and "content" in msg: role, content = msg["role"], msg["content"] if role in MESSAGE_TYPE_MAP: template_class = MESSAGE_TYPE_MAP[role] processed_messages.append(template_class.from_template(content, template_format=template_format)) continue if role.lower() in (MessageRole.PLACEHOLDER, MessageRole.MESSAGES_PLACEHOLDER): processed_messages.append(MessagesPlaceholder(variable_name=content)) continue if isinstance(msg, tuple) and len(msg) == 2: role, content = msg if role in MESSAGE_TYPE_MAP: template_class = MESSAGE_TYPE_MAP[role] processed_messages.append(template_class.from_template(content, template_format=template_format)) continue if role.lower() in (MessageRole.PLACEHOLDER, MessageRole.MESSAGES_PLACEHOLDER): processed_messages.append(MessagesPlaceholder(variable_name=content)) continue processed_messages.append(msg) return processed_messages def _process_prompt_object( self, prompt_obj: Any, template_format: Literal["f-string", "mustache", "jinja2"] = "f-string", ) -> ChatPromptTemplate: if isinstance(prompt_obj, ChatPromptTemplate): return prompt_obj if hasattr(prompt_obj, "messages") and isinstance(prompt_obj.messages, list): processed_messages = self._process_messages_from_prompt( prompt_obj.messages, template_format=template_format ) if processed_messages: return ChatPromptTemplate.from_messages(processed_messages) elif isinstance(prompt_obj, list): if all(isinstance(item, dict) and "role" in item and "content" in item for item in prompt_obj): processed_messages = self._process_messages_from_prompt(prompt_obj, template_format=template_format) if processed_messages: return ChatPromptTemplate.from_messages(processed_messages) if isinstance(prompt_obj, str): return ChatPromptTemplate.from_template(prompt_obj, template_format=template_format) raise ValueError(f"Could not process prompt object of type {type(prompt_obj)}")