"""Synchronous driver protocol implementation."""
import logging
from abc import abstractmethod
from time import perf_counter
from typing import TYPE_CHECKING, Any, ClassVar, Final, cast, final, overload
from mypy_extensions import mypyc_attr
from sqlspec.core import SQL, StackResult, create_arrow_result
from sqlspec.core.result import DMLResult
from sqlspec.core.stack import StackOperation, StatementStack
from sqlspec.driver._common import (
CommonDriverAttributesMixin,
DataDictionaryDialectMixin,
DataDictionaryMixin,
ExecutionResult,
StackExecutionObserver,
SyncExceptionHandler,
_raise_database_exception,
describe_stack_statement,
handle_single_row_error,
)
from sqlspec.driver._query_cache import CachedQuery
from sqlspec.driver._sql_helpers import DEFAULT_PRETTY
from sqlspec.driver._sql_helpers import convert_to_dialect as _convert_to_dialect_impl
from sqlspec.driver._storage_helpers import stringify_storage_target
from sqlspec.driver._stream import EagerSyncRowSource, SyncRowStream
from sqlspec.exceptions import ImproperConfigurationError, StackExecutionError
from sqlspec.observability import _runtime as observability_runtime
from sqlspec.storage import StorageBridgeJob, StorageDestination, StorageFormat, StorageTelemetry, SyncStoragePipeline
from sqlspec.utils.arrow_helpers import convert_dict_to_arrow_with_schema
from sqlspec.utils.logging import get_logger, log_with_context
from sqlspec.utils.schema import ValueT, to_value_type
if TYPE_CHECKING:
from collections.abc import Sequence
from sqlglot.dialects.dialect import DialectType
from sqlspec.builder import QueryBuilder
from sqlspec.core import ArrowResult, SQLResult, Statement, StatementConfig, StatementFilter
from sqlspec.data_dictionary import ColumnMetadata, ForeignKeyMetadata, IndexMetadata, TableMetadata, VersionInfo
from sqlspec.typing import ArrowReturnFormat, ArrowTable, SchemaT, StatementParameters
__all__ = ("SyncDataDictionaryBase", "SyncDriverAdapterBase", "SyncPoolConnectionContext", "SyncPoolSessionFactory")
_LOGGER_NAME: Final[str] = "sqlspec.driver"
logger = get_logger(_LOGGER_NAME)
@mypyc_attr(allow_interpreted_subclasses=True)
class SyncPoolConnectionContext:
"""Generic sync connection context using pool.get_connection() pattern.
Subclass per adapter for type-safe ``provide_connection()`` return annotations.
"""
__slots__ = ("_config", "_ctx")
def __init__(self, config: Any) -> None:
self._config = config
self._ctx: Any = None
def __enter__(self) -> Any:
pool = self._config.provide_pool()
self._ctx = pool.get_connection()
return self._ctx.__enter__()
def __exit__(
self, exc_type: "type[BaseException] | None", exc_val: "BaseException | None", exc_tb: Any
) -> "bool | None":
if self._ctx:
return cast("bool | None", self._ctx.__exit__(exc_type, exc_val, exc_tb))
return None
@mypyc_attr(allow_interpreted_subclasses=True)
class SyncPoolSessionFactory:
"""Generic sync session factory using pool.get_connection() pattern.
Subclass per adapter for type-safe ``acquire_connection()`` return annotations.
"""
__slots__ = ("_config", "_ctx")
def __init__(self, config: Any) -> None:
self._config = config
self._ctx: Any = None
def acquire_connection(self) -> Any:
pool = self._config.provide_pool()
self._ctx = pool.get_connection()
return self._ctx.__enter__()
def release_connection(self, _conn: Any, **kwargs: Any) -> None:
if self._ctx is None:
return
self._ctx.__exit__(None, None, None)
self._ctx = None
@mypyc_attr(allow_interpreted_subclasses=True)
class SyncDriverAdapterBase(CommonDriverAttributesMixin):
"""Base class for synchronous database drivers.
This class includes flattened storage and SQL translation methods that were
previously in StorageDriverMixin and SQLTranslatorMixin. The flattening
eliminates cross-trait attribute access that caused mypyc segmentation faults.
Method Organization:
1. Core dispatch methods (the execution engine)
2. Transaction management (abstract methods)
3. Public API - execution methods
4. Public API - query methods (select/fetch variants)
5. Arrow API methods
6. Stack execution
7. Storage API methods
8. Utility methods
9. Private/internal methods
"""
__slots__ = ()
dialect: "DialectType | None" = None
@property
@abstractmethod
def data_dictionary(self) -> "SyncDataDictionaryBase":
"""Get the data dictionary for this driver.
Returns:
Data dictionary instance for metadata queries
"""
def set_migration_session_schema(self, schema: str) -> None:
"""Set the default schema for migration SQL when supported.
Args:
schema: Schema requested for the current migration session.
"""
log_with_context(logger, logging.DEBUG, "migration.schema.noop", schema=schema, driver=type(self).__name__)