Source code for sqlspec.driver._sync

"""Synchronous driver protocol implementation."""

import logging
from abc import abstractmethod
from time import perf_counter
from typing import TYPE_CHECKING, Any, ClassVar, Final, cast, final, overload

from mypy_extensions import mypyc_attr

from sqlspec.core import SQL, StackResult, create_arrow_result
from sqlspec.core.result import DMLResult
from sqlspec.core.stack import StackOperation, StatementStack
from sqlspec.driver._common import (
    CommonDriverAttributesMixin,
    DataDictionaryDialectMixin,
    DataDictionaryMixin,
    ExecutionResult,
    StackExecutionObserver,
    SyncExceptionHandler,
    _raise_database_exception,
    describe_stack_statement,
    handle_single_row_error,
)
from sqlspec.driver._query_cache import CachedQuery
from sqlspec.driver._sql_helpers import DEFAULT_PRETTY
from sqlspec.driver._sql_helpers import convert_to_dialect as _convert_to_dialect_impl
from sqlspec.driver._storage_helpers import stringify_storage_target
from sqlspec.driver._stream import EagerSyncRowSource, SyncRowStream
from sqlspec.exceptions import ImproperConfigurationError, StackExecutionError
from sqlspec.observability import _runtime as observability_runtime
from sqlspec.storage import StorageBridgeJob, StorageDestination, StorageFormat, StorageTelemetry, SyncStoragePipeline
from sqlspec.utils.arrow_helpers import convert_dict_to_arrow_with_schema
from sqlspec.utils.logging import get_logger, log_with_context
from sqlspec.utils.schema import ValueT, to_value_type

if TYPE_CHECKING:
    from collections.abc import Sequence

    from sqlglot.dialects.dialect import DialectType

    from sqlspec.builder import QueryBuilder
    from sqlspec.core import ArrowResult, SQLResult, Statement, StatementConfig, StatementFilter
    from sqlspec.data_dictionary import ColumnMetadata, ForeignKeyMetadata, IndexMetadata, TableMetadata, VersionInfo
    from sqlspec.typing import ArrowReturnFormat, ArrowTable, SchemaT, StatementParameters

__all__ = ("SyncDataDictionaryBase", "SyncDriverAdapterBase", "SyncPoolConnectionContext", "SyncPoolSessionFactory")

_LOGGER_NAME: Final[str] = "sqlspec.driver"
logger = get_logger(_LOGGER_NAME)


@mypyc_attr(allow_interpreted_subclasses=True)
class SyncPoolConnectionContext:
    """Generic sync connection context using pool.get_connection() pattern.

    Subclass per adapter for type-safe ``provide_connection()`` return annotations.
    """

    __slots__ = ("_config", "_ctx")

    def __init__(self, config: Any) -> None:
        self._config = config
        self._ctx: Any = None

    def __enter__(self) -> Any:
        pool = self._config.provide_pool()
        self._ctx = pool.get_connection()
        return self._ctx.__enter__()

    def __exit__(
        self, exc_type: "type[BaseException] | None", exc_val: "BaseException | None", exc_tb: Any
    ) -> "bool | None":
        if self._ctx:
            return cast("bool | None", self._ctx.__exit__(exc_type, exc_val, exc_tb))
        return None


@mypyc_attr(allow_interpreted_subclasses=True)
class SyncPoolSessionFactory:
    """Generic sync session factory using pool.get_connection() pattern.

    Subclass per adapter for type-safe ``acquire_connection()`` return annotations.
    """

    __slots__ = ("_config", "_ctx")

    def __init__(self, config: Any) -> None:
        self._config = config
        self._ctx: Any = None

    def acquire_connection(self) -> Any:
        pool = self._config.provide_pool()
        self._ctx = pool.get_connection()
        return self._ctx.__enter__()

    def release_connection(self, _conn: Any, **kwargs: Any) -> None:
        if self._ctx is None:
            return
        self._ctx.__exit__(None, None, None)
        self._ctx = None


@mypyc_attr(allow_interpreted_subclasses=True)
class SyncDriverAdapterBase(CommonDriverAttributesMixin):
    """Base class for synchronous database drivers.

    This class includes flattened storage and SQL translation methods that were
    previously in StorageDriverMixin and SQLTranslatorMixin. The flattening
    eliminates cross-trait attribute access that caused mypyc segmentation faults.

    Method Organization:
        1. Core dispatch methods (the execution engine)
        2. Transaction management (abstract methods)
        3. Public API - execution methods
        4. Public API - query methods (select/fetch variants)
        5. Arrow API methods
        6. Stack execution
        7. Storage API methods
        8. Utility methods
        9. Private/internal methods
    """

    __slots__ = ()

    dialect: "DialectType | None" = None

    @property
    @abstractmethod
    def data_dictionary(self) -> "SyncDataDictionaryBase":
        """Get the data dictionary for this driver.

        Returns:
            Data dictionary instance for metadata queries
        """

    def set_migration_session_schema(self, schema: str) -> None:
        """Set the default schema for migration SQL when supported.

        Args:
            schema: Schema requested for the current migration session.
        """
        log_with_context(logger, logging.DEBUG, "migration.schema.noop", schema=schema, driver=type(self).__name__)
def set_migration_non_transactional_schema(self, schema: str) -> None: """Set the default schema for non-transactional migration SQL when supported. Args: schema: Schema requested for the current migration session. """ self.set_migration_session_schema(schema)