# Source code for sqlspec.observability._runtime

"""Runtime helpers that bundle lifecycle, observer, and span orchestration."""

import re
from typing import TYPE_CHECKING, Any, cast

from sqlspec.observability._common import compute_sql_hash, get_trace_context, resolve_db_system
from sqlspec.observability._config import LoggingConfig, ObservabilityConfig, StatementObserver
from sqlspec.observability._dispatcher import LifecycleDispatcher, LifecycleHook
from sqlspec.observability._observer import create_event, create_statement_observer
from sqlspec.observability._spans import SpanManager
from sqlspec.utils.correlation import CorrelationContext
from sqlspec.utils.type_guards import has_span_attribute

if TYPE_CHECKING:
    from collections.abc import Iterable

    from sqlspec.storage import StorageTelemetry

# Public names of this module. ``compute_sql_hash`` is imported from
# ``sqlspec.observability._common`` above and re-exported here.
__all__ = ("ObservabilityRuntime", "compute_sql_hash")


# Matches one single-quoted string literal. Inside the literal a doubled quote
# ('') is consumed as an escaped quote instead of ending the match.
# NOTE(review): not referenced in the visible part of this file; presumably used
# when redacting literals from SQL text - confirm against the rest of the module.
_LITERAL_PATTERN = re.compile(r"'(?:''|[^'])*'")


class ObservabilityRuntime:
    """Aggregates dispatchers, observers, spans, and custom metrics.

    Each instance keeps its own copy of the observability configuration together
    with the lifecycle dispatcher, span manager, and statement observers that
    are built from that configuration at construction time.
    """

    # Instance attributes are declared up front through ``__slots__``.
    __slots__ = (
        "_is_idle_cached",
        "_metrics",
        "_redaction",
        "_statement_observers",
        "bind_key",
        "config",
        "config_name",
        "lifecycle",
        "span_manager",
    )

    # Allow test injection with fake span managers (mypyc strict typing workaround)
    span_manager: "Any"

    def __init__(
        self, config: ObservabilityConfig | None = None, *, bind_key: str | None = None, config_name: str | None = None
    ) -> None:
        """Build the runtime from an optional observability configuration.

        Args:
            config: Configuration to copy. A default ``ObservabilityConfig`` is
                created when this is falsy.
            bind_key: Optional bind key stored on the runtime.
            config_name: Name stored on the runtime; falls back to
                ``"SQLSpecConfig"`` when falsy.
        """
        # Work on a private copy so the caller's config object is never mutated.
        resolved = ObservabilityConfig() if not config else config.copy()
        if resolved.logging is None:
            resolved.logging = LoggingConfig()

        self.config = resolved
        self.bind_key = bind_key
        self.config_name = config_name or "SQLSpecConfig"
        self.lifecycle = LifecycleDispatcher(cast("dict[str, Iterable[LifecycleHook]] | None", resolved.lifecycle))
        self.span_manager = SpanManager(resolved.telemetry)

        # Configured observers come first; the print_sql observer is appended last.
        registered: list[StatementObserver] = []
        configured_observers = resolved.statement_observers
        if configured_observers:
            registered += configured_observers
        if resolved.print_sql:
            registered.append(create_statement_observer(resolved.logging))
        self._statement_observers = tuple(registered)

        redaction = resolved.redaction
        self._redaction = redaction.copy() if redaction else None
        self._metrics: dict[str, float] = {}

        # Cache only the lifecycle/observer part of the idle check; the span
        # manager may be swapped out in tests, so it is consulted on every access.
        self._is_idle_cached = not (self.lifecycle.is_enabled or self._statement_observers)
@property def is_idle(self) -> bool: """Return True when no observability features are active. A runtime is idle if it has no lifecycle hooks, no statement observers, and telemetry spans are disabled. Drivers can use this to skip expensive context construction. """ # Fast path: lifecycle and observers state is cached (immutable after init) # span_manager is checked each time as it can be replaced for testing return self._is_idle_cached and not self.span_manager.is_enabled @property def has_statement_observers(self) -> bool: """Return True when any observers are registered.""" return bool(self._statement_observers) @property def diagnostics_key(self) -> str: """Derive diagnostics key from bind key or configuration name.""" if self.bind_key: return self.bind_key return self.config_name def base_context(self) -> dict[str, Any]: """Return the base payload for lifecycle events.""" context = {"config": self.config_name} if self.bind_key: context["bind_key"] = self.bind_key correlation_id = CorrelationContext.get() if correlation_id: context["correlation_id"] = correlation_id return context