From 317adfa8f551f9a61146ddbfac8022da85d14b33 Mon Sep 17 00:00:00 2001 From: Jonathan Wrede Date: Thu, 7 May 2026 21:10:46 +0000 Subject: [PATCH 1/3] fix(trino): Clean up temporary entity tables after retrieval TrinoOfflineStore.get_historical_features() creates a temporary table for the entity DataFrame but never drops it, leaking tables indefinitely. Apply the same context manager pattern used by BigQuery, Redshift, and Athena offline stores: wrap the query in a generator that issues DROP TABLE IF EXISTS in a finally block. Fixes #6306 Signed-off-by: Jonathan Wrede --- .../contrib/trino_offline_store/trino.py | 59 ++++++++++++++----- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 33190bd4635..010f0fe7ca6 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -1,6 +1,8 @@ +import contextlib +import logging import uuid from datetime import date, datetime -from typing import Any, Dict, List, Literal, Optional, Tuple, Union +from typing import Any, Callable, ContextManager, Dict, Iterator, List, Literal, Optional, Tuple, Union import numpy as np import pandas as pd @@ -37,6 +39,8 @@ from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage +logger = logging.getLogger(__name__) + class BasicAuthModel(FeastConfigBaseModel): username: StrictStr @@ -177,14 +181,23 @@ class TrinoOfflineStoreConfig(FeastConfigBaseModel): class TrinoRetrievalJob(RetrievalJob): def __init__( self, - query: str, + query: Union[str, Callable[[], ContextManager[str]]], client: Trino, config: RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None, metadata: Optional[RetrievalMetadata] = None, ): - self._query = query + if not isinstance(query, str): + self._query_generator = query + else: + + @contextlib.contextmanager + def query_generator() -> Iterator[str]: + assert isinstance(query, str) + yield query + + self._query_generator = query_generator self._client = client self._config = config self._full_feature_names = full_feature_names @@ -201,9 +214,10 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]: def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: """Return dataset as Pandas DataFrame synchronously including on demand transforms""" - results = self._client.execute_query(query_text=self._query) - self.pyarrow_schema = results.pyarrow_schema - return results.to_dataframe() + with self._query_generator() as query: + results = self._client.execute_query(query_text=query) + self.pyarrow_schema = results.pyarrow_schema + return results.to_dataframe() def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: """Return payrrow dataset as synchronously including on demand transforms""" @@ -211,7 +225,8 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: def to_sql(self) -> str: """Returns the SQL query that will be executed in Trino to build the historical feature table""" - return self._query + with self._query_generator() as query: + return query def to_trino( self, @@ -234,8 +249,9 @@ def to_trino( destination_table = f"{self._client.catalog}.{self._config.offline_store.dataset}.historical_{today}_{rand_id}" # TODO: Implement the timeout logic - query = f"CREATE TABLE {destination_table} AS ({self._query})" - self._client.execute_query(query_text=query) + with self._query_generator() as query: + create_query = f"CREATE TABLE {destination_table} AS ({query})" + self._client.execute_query(query_text=create_query) return destination_table def persist( @@ -372,19 +388,36 @@ def get_historical_features( ) # Generate the Trino SQL query from the query context + entity_table_ref = table_reference if type(entity_df) is str: - table_reference = f"({entity_df})" + entity_table_ref = f"({entity_df})" query = offline_utils.build_point_in_time_query( query_context, - left_table_query_string=table_reference, + left_table_query_string=entity_table_ref, entity_df_event_timestamp_col=entity_df_event_timestamp_col, entity_df_columns=entity_schema.keys(), query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, full_feature_names=full_feature_names, ) + @contextlib.contextmanager + def query_generator() -> Iterator[str]: + try: + yield query + finally: + if isinstance(entity_df, pd.DataFrame): + try: + client.execute_query( + f"DROP TABLE IF EXISTS {table_reference}" + ) + except Exception: + logger.exception( + "Failed to drop temporary entity table %s", + table_reference, + ) + return TrinoRetrievalJob( - query=query, + query=query_generator, client=client, config=config, full_feature_names=full_feature_names, @@ -483,8 +516,6 @@ def _upload_entity_df_and_get_entity_schema( else: raise InvalidEntityType(type(entity_df)) - # TODO: Ensure that the table expires after some time - def _get_trino_client(config: RepoConfig) -> Trino: auth = None From f1008f23fb4eef98ccb8a892ea8cf3d0ccc19191 Mon Sep 17 00:00:00 2001 From: Jonathan Wrede Date: Thu, 14 May 2026 06:49:24 +0000 Subject: [PATCH 2/3] fix: sort imports for ruff compliance Signed-off-by: Jonathan Wrede --- .../contrib/trino_offline_store/trino.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 010f0fe7ca6..b24f8885a35 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -2,7 +2,18 @@ import logging import uuid from datetime import date, datetime -from typing import Any, Callable, ContextManager, Dict, Iterator, List, Literal, Optional, Tuple, Union +from typing import ( + Any, + Callable, + ContextManager, + Dict, + Iterator, + List, + Literal, + Optional, + Tuple, + Union, +) import numpy as np import pandas as pd @@ -407,9 +418,7 @@ def query_generator() -> Iterator[str]: finally: if isinstance(entity_df, pd.DataFrame): try: - client.execute_query( - f"DROP TABLE IF EXISTS {table_reference}" - ) + client.execute_query(f"DROP TABLE IF EXISTS {table_reference}") except Exception: logger.exception( "Failed to drop temporary entity table %s", From 3ddef730ec07fcd83eb0c542a73b2cd43bc28a18 Mon Sep 17 00:00:00 2001 From: Jonathan Wrede Date: Thu, 14 May 2026 06:59:10 +0000 Subject: [PATCH 3/3] fix: decouple temp table cleanup from query access Avoid dropping the temporary entity table on to_sql() calls. Previously, every method used a context manager that dropped the table on exit, so calling to_sql() before to_df() would destroy the table and cause subsequent queries to fail. Now the query is stored as a plain string and cleanup is handled by a dedicated _drop_temp_table() method called only after query execution (to_df, to_trino). A __del__ fallback ensures cleanup if execution methods are never called. The _cleaned_up flag makes the drop idempotent. Signed-off-by: Jonathan Wrede --- .../contrib/trino_offline_store/trino.py | 67 +++++++++---------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index b24f8885a35..0f77d6e18fc 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -1,13 +1,9 @@ -import contextlib import logging import uuid from datetime import date, datetime from typing import ( Any, - Callable, - ContextManager, Dict, - Iterator, List, Literal, Optional, @@ -192,28 +188,22 @@ class TrinoOfflineStoreConfig(FeastConfigBaseModel): class TrinoRetrievalJob(RetrievalJob): def __init__( self, - query: Union[str, Callable[[], ContextManager[str]]], + query: str, client: Trino, config: RepoConfig, full_feature_names: bool, on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None, metadata: Optional[RetrievalMetadata] = None, + temp_table: Optional[str] = None, ): - if not isinstance(query, str): - self._query_generator = query - else: - - @contextlib.contextmanager - def query_generator() -> Iterator[str]: - assert isinstance(query, str) - yield query - - self._query_generator = query_generator + self._query = query self._client = client self._config = config self._full_feature_names = full_feature_names self._on_demand_feature_views = on_demand_feature_views or [] self._metadata = metadata + self._temp_table = temp_table + self._cleaned_up = False @property def full_feature_names(self) -> bool: @@ -223,12 +213,29 @@ def full_feature_names(self) -> bool: def on_demand_feature_views(self) -> List[OnDemandFeatureView]: return self._on_demand_feature_views + def _drop_temp_table(self) -> None: + if self._cleaned_up or not self._temp_table: + return + self._cleaned_up = True + try: + self._client.execute_query(f"DROP TABLE IF EXISTS {self._temp_table}") + except Exception: + logger.exception( + "Failed to drop temporary entity table %s", + self._temp_table, + ) + + def __del__(self) -> None: + self._drop_temp_table() + def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: """Return dataset as Pandas DataFrame synchronously including on demand transforms""" - with self._query_generator() as query: - results = self._client.execute_query(query_text=query) + try: + results = self._client.execute_query(query_text=self._query) self.pyarrow_schema = results.pyarrow_schema return results.to_dataframe() + finally: + self._drop_temp_table() def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: """Return payrrow dataset as synchronously including on demand transforms""" @@ -236,8 +243,7 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: def to_sql(self) -> str: """Returns the SQL query that will be executed in Trino to build the historical feature table""" - with self._query_generator() as query: - return query + return self._query def to_trino( self, @@ -260,9 +266,11 @@ def to_trino( destination_table = f"{self._client.catalog}.{self._config.offline_store.dataset}.historical_{today}_{rand_id}" # TODO: Implement the timeout logic - with self._query_generator() as query: - create_query = f"CREATE TABLE {destination_table} AS ({query})" + try: + create_query = f"CREATE TABLE {destination_table} AS ({self._query})" self._client.execute_query(query_text=create_query) + finally: + self._drop_temp_table() return destination_table def persist( @@ -411,22 +419,9 @@ def get_historical_features( full_feature_names=full_feature_names, ) - @contextlib.contextmanager - def query_generator() -> Iterator[str]: - try: - yield query - finally: - if isinstance(entity_df, pd.DataFrame): - try: - client.execute_query(f"DROP TABLE IF EXISTS {table_reference}") - except Exception: - logger.exception( - "Failed to drop temporary entity table %s", - table_reference, - ) - return TrinoRetrievalJob( - query=query_generator, + query=query, + temp_table=table_reference if isinstance(entity_df, pd.DataFrame) else None, client=client, config=config, full_feature_names=full_feature_names,