From a8aea3f288f5f5d236a7f5fb23ba775ab70d8adb Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Sun, 15 Mar 2026 01:56:29 +0530 Subject: [PATCH 01/12] support historical retrieval without entity_df Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../feast/infra/offline_stores/duckdb.py | 91 ++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index e0a69e53c56..7e1b7069b22 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -1,5 +1,5 @@ import os -from datetime import datetime +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Callable, List, Optional, Union @@ -25,6 +25,7 @@ from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.utils import make_tzaware def _read_data_source(data_source: DataSource, repo_path: str) -> Table: @@ -113,6 +114,61 @@ def _write_data_source( ) +DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp" + + +def _build_entity_df_from_sources( + config: RepoConfig, + feature_views: List[FeatureView], + start_date: datetime, + end_date: datetime, + data_source_reader: Callable[[DataSource, str], Table], +) -> pd.DataFrame: + """Derive an entity DataFrame from feature view sources for non-entity retrieval. + + Reads distinct join keys from each feature view source within the time window, + unions them, and adds event_timestamp = end_date for point-in-time joins. + """ + entity_dfs: List[pd.DataFrame] = [] + + for fv in feature_views: + fv_table = data_source_reader(fv.batch_source, str(config.repo_path)) + + for old_name, new_name in fv.batch_source.field_mapping.items(): + if old_name in fv_table.columns: + fv_table = fv_table.rename({new_name: old_name}) + + timestamp_field = fv.batch_source.timestamp_field + + fv_table = fv_table.filter( + ibis.and_( + fv_table[timestamp_field] >= ibis.literal(start_date), + fv_table[timestamp_field] <= ibis.literal(end_date), + ) + ) + + join_key_map = fv.projection.join_key_map or { + e.name: e.name for e in fv.entity_columns + } + join_key_cols = list(join_key_map.values()) + + if join_key_cols: + distinct_entities = fv_table.select(*join_key_cols).distinct().execute() + entity_dfs.append(distinct_entities) + + if not entity_dfs: + return pd.DataFrame({DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]}) + + combined = pd.concat(entity_dfs, ignore_index=True) + + all_cols = list(combined.columns) + combined = combined.drop_duplicates(subset=all_cols).reset_index(drop=True) + + combined[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = end_date + + return combined + + class DuckDBOfflineStoreConfig(FeastConfigBaseModel): type: StrictStr = "duckdb" # """ Offline store type selector""" @@ -154,11 +210,42 @@ def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], - entity_df: Union[pd.DataFrame, str], + entity_df: Optional[Union[pd.DataFrame, str]], registry: BaseRegistry, project: str, full_feature_names: bool = False, + **kwargs, ) -> RetrievalJob: + start_date: Optional[datetime] = kwargs.get("start_date", None) + end_date: Optional[datetime] = kwargs.get("end_date", None) + non_entity_mode = entity_df is None + + if non_entity_mode: + end_date = ( + make_tzaware(end_date) if end_date else datetime.now(timezone.utc) + ) + + if start_date is None: + max_ttl_seconds = 0 + for fv in feature_views: + if fv.ttl and isinstance(fv.ttl, timedelta): + max_ttl_seconds = max( + max_ttl_seconds, int(fv.ttl.total_seconds()) + ) + if max_ttl_seconds > 0: + start_date = end_date - timedelta(seconds=max_ttl_seconds) + else: + start_date = end_date - timedelta(days=30) + start_date = make_tzaware(start_date) + + entity_df = _build_entity_df_from_sources( + config=config, + feature_views=feature_views, + start_date=start_date, + end_date=end_date, + data_source_reader=_read_data_source, + ) + return get_historical_features_ibis( config=config, feature_views=feature_views, From b2bec2e110947114abd7b7a26259d83cad765bcc Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Sun, 15 Mar 2026 23:38:55 +0530 Subject: [PATCH 02/12] test cases Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../offline_stores/test_duckdb_non_entity.py | 496 ++++++++++++++++++ 1 file changed, 496 insertions(+) create mode 100644 sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py diff --git a/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py b/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py new file mode 100644 index 00000000000..ac3d89ab346 --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py @@ -0,0 +1,496 @@ +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock + +import ibis +import pandas as pd + +from feast.entity import Entity +from feast.feature_view import FeatureView, Field +from feast.infra.offline_stores import duckdb as duckdb_mod +from feast.infra.offline_stores.duckdb import ( + DuckDBOfflineStore, + DuckDBOfflineStoreConfig, +) +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.offline_stores.offline_store import RetrievalJob +from feast.repo_config import RepoConfig +from feast.types import Float32, Int64, ValueType + + +def _mock_duckdb_offline_store_config(): + return DuckDBOfflineStoreConfig(type="duckdb") + + +def _mock_entity(): + return [ + Entity( + name="driver_id", + join_keys=["driver_id"], + description="Driver ID", + value_type=ValueType.INT64, + ) + ] + + +def _mock_feature_view(name: str = "driver_stats", ttl: timedelta = None): + return FeatureView( + name=name, + entities=_mock_entity(), + schema=[ + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource( + path="dummy.parquet", + timestamp_field="event_timestamp", + ), + ttl=ttl, + ) + + +def _mock_data_source_reader(src_df): + """Return a data_source_reader that wraps a pandas DataFrame as an ibis memtable.""" + + def reader(data_source, repo_path): + return ibis.memtable(src_df) + + return reader + + +def test_duckdb_non_entity_historical_retrieval_accepts_dates(monkeypatch): + src = pd.DataFrame( + { + "driver_id": [1], + "event_timestamp": pd.to_datetime(["2023-01-01T12:00:00Z"]), + "conv_rate": [0.5], + } + ) + monkeypatch.setattr(duckdb_mod, "_read_data_source", _mock_data_source_reader(src)) + + repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = _mock_feature_view() + + retrieval_job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["driver_stats:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="test_project", + full_feature_names=False, + start_date=datetime(2023, 1, 1, tzinfo=timezone.utc), + end_date=datetime(2023, 1, 2, tzinfo=timezone.utc), + ) + + assert isinstance(retrieval_job, RetrievalJob) + + +class TestNonEntityRetrieval: + """Test suite for non-entity retrieval functionality (entity_df=None) in DuckDB offline store.""" + + def test_duckdb_non_entity_snapshot_ttl_and_dedup(self, monkeypatch): + src = pd.DataFrame( + { + "driver_id": [1, 1, 1, 2, 2], + "event_timestamp": pd.to_datetime( + [ + "2025-01-01T10:00:00Z", + "2025-01-01T10:00:00Z", + "2024-12-30T10:00:00Z", + "2025-01-01T12:00:00Z", + "2025-01-02T11:00:00Z", + ] + ), + "created_ts": pd.to_datetime( + [ + "2025-01-01T10:00:01Z", + "2025-01-01T10:00:02Z", + "2024-12-30T10:00:00Z", + "2025-01-01T12:00:00Z", + "2025-01-02T11:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.05, 0.3, 0.4], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + fv = FeatureView( + name="driver_stats", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource( + path="unused", + timestamp_field="event_timestamp", + created_timestamp_column="created_ts", + ), + ttl=timedelta(days=1), + ) + + repo_config = RepoConfig( + project="proj", + registry="unused", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + end = datetime(2025, 1, 2, 10, 0, tzinfo=timezone.utc) + start = end - timedelta(days=7) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["driver_stats:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="proj", + full_feature_names=False, + start_date=start, + end_date=end, + ) + + df = job.to_df() + + assert set(df["driver_id"]) == {1, 2} + out = df.set_index("driver_id")["conv_rate"].to_dict() + assert out[1] == 0.2 + assert out[2] == 0.3 + + def test_non_entity_mode_with_both_dates_retrieves_data(self, monkeypatch): + src = pd.DataFrame( + { + "driver_id": [1, 1, 1, 2], + "event_timestamp": pd.to_datetime( + [ + "2023-01-01T08:00:00Z", + "2023-01-03T10:00:00Z", + "2023-01-05T12:00:00Z", + "2023-01-08T14:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.3, 0.4], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = FeatureView( + name="test_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=None, + ) + + start_date = datetime(2023, 1, 2, tzinfo=timezone.utc) + end_date = datetime(2023, 1, 7, tzinfo=timezone.utc) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["test_fv:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="test_project", + start_date=start_date, + end_date=end_date, + ) + + df = job.to_df() + + assert job.metadata.min_event_timestamp == end_date + assert job.metadata.max_event_timestamp == end_date + + assert len(df) >= 1 + assert set(df["driver_id"]) == {1} + driver1_data = df[df["driver_id"] == 1] + assert 0.3 in driver1_data["conv_rate"].values + + def test_non_entity_mode_with_end_date_only_calculates_start_from_ttl( + self, monkeypatch + ): + src = pd.DataFrame( + { + "driver_id": [1, 1, 1], + "event_timestamp": pd.to_datetime( + [ + "2023-01-05T08:00:00Z", + "2023-01-06T10:00:00Z", + "2023-01-07T12:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.3], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = FeatureView( + name="test_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=timedelta(days=1), + ) + + end_date = datetime(2023, 1, 7, 12, 0, tzinfo=timezone.utc) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["test_fv:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="test_project", + end_date=end_date, + ) + + df = job.to_df() + + driver1_data = df[df["driver_id"] == 1] + assert len(driver1_data) >= 1 + assert 0.3 in driver1_data["conv_rate"].values + + def test_ttl_filtering_excludes_old_rows(self, monkeypatch): + src = pd.DataFrame( + { + "driver_id": [1, 1, 1], + "event_timestamp": pd.to_datetime( + [ + "2025-01-01T08:00:00Z", + "2025-01-01T09:30:00Z", + "2025-01-01T10:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.3], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + fv = FeatureView( + name="driver_stats", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=timedelta(hours=1), + ) + + repo_config = RepoConfig( + project="proj", + registry="unused", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + end = datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc) + start = end - timedelta(days=1) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["driver_stats:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="proj", + full_feature_names=False, + start_date=start, + end_date=end, + ) + + df = job.to_df() + + driver1_data = df[df["driver_id"] == 1] + assert len(driver1_data) >= 1 + assert ( + 0.2 in driver1_data["conv_rate"].values + or 0.3 in driver1_data["conv_rate"].values + ) + assert 0.1 not in driver1_data["conv_rate"].values + + def test_multiple_feature_views_with_different_ttls(self, monkeypatch): + src1 = pd.DataFrame( + { + "driver_id": [1, 1], + "event_timestamp": pd.to_datetime( + [ + "2025-01-01T08:00:00Z", + "2025-01-01T09:30:00Z", + ] + ), + "age": [25, 26], + } + ) + + src2 = pd.DataFrame( + { + "driver_id": [1, 1], + "event_timestamp": pd.to_datetime( + [ + "2025-01-01T08:00:00Z", + "2025-01-01T09:30:00Z", + ] + ), + "total_trips": [100, 101], + } + ) + + def mock_read_data_source(data_source, repo_path): + if data_source.path == "unused1": + return ibis.memtable(src1) + return ibis.memtable(src2) + + monkeypatch.setattr(duckdb_mod, "_read_data_source", mock_read_data_source) + + fv1 = FeatureView( + name="user_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="age", dtype=Float32), + ], + source=FileSource(path="unused1", timestamp_field="event_timestamp"), + ttl=timedelta(hours=1), + ) + + fv2 = FeatureView( + name="transaction_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="total_trips", dtype=Float32), + ], + source=FileSource(path="unused2", timestamp_field="event_timestamp"), + ttl=timedelta(days=1), + ) + + repo_config = RepoConfig( + project="proj", + registry="unused", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + end = datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc) + start = end - timedelta(days=1) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv1, fv2], + feature_refs=["user_fv:age", "transaction_fv:total_trips"], + entity_df=None, + registry=MagicMock(), + project="proj", + full_feature_names=False, + start_date=start, + end_date=end, + ) + + df = job.to_df() + + assert "age" in df.columns or "user_fv__age" in df.columns + assert ( + "total_trips" in df.columns or "transaction_fv__total_trips" in df.columns + ) + + if "age" in df.columns: + age_values = df["age"].values + assert 26 in age_values + assert 25 not in age_values + + if "total_trips" in df.columns: + trips_values = df["total_trips"].values + assert 101 in trips_values + + def test_entity_df_still_works(self, monkeypatch): + """Verify standard entity_df path is not broken by the changes.""" + src = pd.DataFrame( + { + "driver_id": [1, 2], + "event_timestamp": pd.to_datetime( + ["2025-01-01T10:00:00Z", "2025-01-01T11:00:00Z"] + ), + "conv_rate": [0.5, 0.6], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + repo_config = RepoConfig( + project="proj", + registry="unused", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = FeatureView( + name="driver_stats", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=timedelta(days=1), + ) + + entity_df = pd.DataFrame( + { + "driver_id": [1, 2], + "event_timestamp": pd.to_datetime( + ["2025-01-01T12:00:00Z", "2025-01-01T12:00:00Z"] + ), + } + ) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["driver_stats:conv_rate"], + entity_df=entity_df, + registry=MagicMock(), + project="proj", + full_feature_names=False, + ) + + df = job.to_df() + + assert set(df["driver_id"]) == {1, 2} + assert len(df) == 2 From 14d79e8fdde79f0ba2264c1965478571d7f50769 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Sun, 15 Mar 2026 23:45:51 +0530 Subject: [PATCH 03/12] fix-lint Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- sdk/python/feast/infra/offline_stores/duckdb.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index 7e1b7069b22..df226601f4e 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -124,11 +124,7 @@ def _build_entity_df_from_sources( end_date: datetime, data_source_reader: Callable[[DataSource, str], Table], ) -> pd.DataFrame: - """Derive an entity DataFrame from feature view sources for non-entity retrieval. - Reads distinct join keys from each feature view source within the time window, - unions them, and adds event_timestamp = end_date for point-in-time joins. - """ entity_dfs: List[pd.DataFrame] = [] for fv in feature_views: @@ -157,7 +153,9 @@ def _build_entity_df_from_sources( entity_dfs.append(distinct_entities) if not entity_dfs: - return pd.DataFrame({DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]}) + return pd.DataFrame( + {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]} + ) combined = pd.concat(entity_dfs, ignore_index=True) From 10b7f703269e823a3ae1c4df04e16eb74de5b267 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 18:49:28 +0530 Subject: [PATCH 04/12] fix issue Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- sdk/python/feast/infra/offline_stores/duckdb.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index df226601f4e..0c1c372cec3 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -146,10 +146,15 @@ def _build_entity_df_from_sources( join_key_map = fv.projection.join_key_map or { e.name: e.name for e in fv.entity_columns } - join_key_cols = list(join_key_map.values()) - - if join_key_cols: - distinct_entities = fv_table.select(*join_key_cols).distinct().execute() + source_join_key_cols = list(join_key_map.keys()) + + if source_join_key_cols: + distinct_entities = ( + fv_table.select(*source_join_key_cols) + .rename(join_key_map) + .distinct() + .execute() + ) entity_dfs.append(distinct_entities) if not entity_dfs: From 2d8252b1c56c54452bc41402ad2fa06b54616eda Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 16 Mar 2026 19:07:13 +0530 Subject: [PATCH 05/12] fix rename Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- sdk/python/feast/infra/offline_stores/duckdb.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index 0c1c372cec3..91eba1ab580 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -149,9 +149,11 @@ def _build_entity_df_from_sources( source_join_key_cols = list(join_key_map.keys()) if source_join_key_cols: + # join_key_map is {feature_key: entity_key}; ibis rename({new: old}) renames + # old->new, so pass inverted map to rename feature columns to entity names. distinct_entities = ( fv_table.select(*source_join_key_cols) - .rename(join_key_map) + .rename({v: k for k, v in join_key_map.items()}) .distinct() .execute() ) From c2a6449f79052289ce647a22598448822911b00a Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Tue, 24 Mar 2026 12:02:49 +0530 Subject: [PATCH 06/12] addressed reviews Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .../feast/infra/offline_stores/duckdb.py | 20 +++---- .../offline_store/test_non_entity_mode.py | 3 +- .../test_universal_historical_retrieval.py | 2 +- .../offline_stores/test_duckdb_non_entity.py | 59 ++++++++++++++++++- .../feature_repos/repo_configuration.py | 6 ++ 5 files changed, 76 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index 91eba1ab580..d5b5da185f1 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -23,9 +23,10 @@ write_logged_features_ibis, ) from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob +from feast.infra.offline_stores.offline_utils import DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig -from feast.utils import make_tzaware +from feast.utils import make_tzaware, to_naive_utc def _read_data_source(data_source: DataSource, repo_path: str) -> Table: @@ -114,9 +115,6 @@ def _write_data_source( ) -DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp" - - def _build_entity_df_from_sources( config: RepoConfig, feature_views: List[FeatureView], @@ -124,7 +122,7 @@ def _build_entity_df_from_sources( end_date: datetime, data_source_reader: Callable[[DataSource, str], Table], ) -> pd.DataFrame: - + entity_dfs: List[pd.DataFrame] = [] for fv in feature_views: @@ -136,10 +134,13 @@ def _build_entity_df_from_sources( timestamp_field = fv.batch_source.timestamp_field + start_naive = to_naive_utc(start_date) + end_naive = to_naive_utc(end_date) + fv_table = fv_table.filter( ibis.and_( - fv_table[timestamp_field] >= ibis.literal(start_date), - fv_table[timestamp_field] <= ibis.literal(end_date), + fv_table[timestamp_field] >= ibis.literal(start_naive), + fv_table[timestamp_field] <= ibis.literal(end_naive), ) ) @@ -219,10 +220,9 @@ def get_historical_features( registry: BaseRegistry, project: str, full_feature_names: bool = False, - **kwargs, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: - start_date: Optional[datetime] = kwargs.get("start_date", None) - end_date: Optional[datetime] = kwargs.get("end_date", None) non_entity_mode = entity_df is None if non_entity_mode: diff --git a/sdk/python/tests/integration/offline_store/test_non_entity_mode.py b/sdk/python/tests/integration/offline_store/test_non_entity_mode.py index f17352fb356..9879f331a09 100644 --- a/sdk/python/tests/integration/offline_store/test_non_entity_mode.py +++ b/sdk/python/tests/integration/offline_store/test_non_entity_mode.py @@ -11,8 +11,7 @@ @pytest.mark.integration -@pytest.mark.universal_offline_stores -@pytest.mark.ray_offline_stores_only +@pytest.mark.universal_offline_stores(only=["file", "duckdb"]) def test_non_entity_mode_basic(environment, universal_data_sources): """Test historical features retrieval without entity_df (non-entity mode). diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index af0de479f3e..92d89a74f3a 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -731,7 +731,7 @@ def test_historical_features_field_mapping( @pytest.mark.integration -@pytest.mark.universal_offline_stores(only=["file"]) +@pytest.mark.universal_offline_stores(only=["file", "duckdb"]) def test_historical_features_non_entity_retrieval(environment): """Test get_historical_features with entity_df=None using start_date/end_date. diff --git a/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py b/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py index ac3d89ab346..575b0243891 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta, timezone -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import ibis import pandas as pd @@ -283,6 +283,63 @@ def test_non_entity_mode_with_end_date_only_calculates_start_from_ttl( assert len(driver1_data) >= 1 assert 0.3 in driver1_data["conv_rate"].values + @patch("feast.infra.offline_stores.duckdb.datetime") + def test_no_dates_provided_defaults_to_current_time_and_filters_data( + self, mock_datetime, monkeypatch + ): + fixed_now = datetime(2023, 1, 7, 12, 0, 0, tzinfo=timezone.utc) + mock_datetime.now.return_value = fixed_now + + src = pd.DataFrame( + { + "driver_id": [1, 1, 1], + "event_timestamp": pd.to_datetime( + [ + "2023-01-05T12:00:00Z", + "2023-01-06T18:00:00Z", + "2023-01-07T11:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.3], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = FeatureView( + name="test_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=timedelta(days=1), + ) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["test_fv:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="test_project", + ) + + df = job.to_df() + + driver1_data = df[df["driver_id"] == 1] + assert len(driver1_data) >= 1 + assert 0.3 in driver1_data["conv_rate"].values + def test_ttl_filtering_excludes_old_rows(self, monkeypatch): src = pd.DataFrame( { diff --git a/sdk/python/tests/universal/feature_repos/repo_configuration.py b/sdk/python/tests/universal/feature_repos/repo_configuration.py index ddd952f71dc..454ca12d972 100644 --- a/sdk/python/tests/universal/feature_repos/repo_configuration.py +++ b/sdk/python/tests/universal/feature_repos/repo_configuration.py @@ -91,6 +91,12 @@ OFFLINE_STORE_TO_PROVIDER_CONFIG: Dict[str, Tuple[str, Type[DataSourceCreator]]] = { "file": ("local", FileDataSourceCreator), + "duckdb": ( + "local", + importlib.import_module( + "tests.universal.feature_repos.duckdb_repo_configuration" + ).DuckDBDataSourceCreator, + ), } AVAILABLE_OFFLINE_STORES: List[Tuple[str, Type[DataSourceCreator]]] = [ From 04d72fd36ed167c69edfff6b9e36f237d98d1df3 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Fri, 10 Apr 2026 16:24:45 +0530 Subject: [PATCH 07/12] fix-lint Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- sdk/python/feast/infra/offline_stores/duckdb.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index d5b5da185f1..712c748af44 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -23,7 +23,9 @@ write_logged_features_ibis, ) from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob -from feast.infra.offline_stores.offline_utils import DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL +from feast.infra.offline_stores.offline_utils import ( + DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, +) from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.utils import make_tzaware, to_naive_utc @@ -122,7 +124,6 @@ def _build_entity_df_from_sources( end_date: datetime, data_source_reader: Callable[[DataSource, str], Table], ) -> pd.DataFrame: - entity_dfs: List[pd.DataFrame] = [] for fv in feature_views: From ecb84b851ff195fc1ca4d6b048ec4c8db4f3cd59 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Mon, 13 Apr 2026 19:16:07 +0530 Subject: [PATCH 08/12] fix-lint Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- sdk/python/feast/infra/offline_stores/duckdb.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index 712c748af44..68ec5041ca0 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -162,9 +162,7 @@ def _build_entity_df_from_sources( entity_dfs.append(distinct_entities) if not entity_dfs: - return pd.DataFrame( - {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]} - ) + return pd.DataFrame({DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]}) combined = pd.concat(entity_dfs, ignore_index=True) From 81df5a24e4e2d2c964b3d0fc4b20cd27ac373471 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Thu, 16 Apr 2026 19:52:17 +0530 Subject: [PATCH 09/12] address-reviews Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .secrets.baseline | 20 ++++++++++-- .../feast/infra/offline_stores/duckdb.py | 31 +++++++++---------- .../offline_stores/test_duckdb_non_entity.py | 3 +- .../feature_repos/repo_configuration.py | 15 +++++---- 4 files changed, 42 insertions(+), 27 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index 96bf780809c..bc9743313f5 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1312,6 +1312,13 @@ } ], "sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py": [ + { + "type": "AWS Access Key", + "filename": "sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py", + "hashed_secret": "25910f981e85ca04baf359199dd0bd4a3ae738b6", + "is_verified": false, + "line_number": 34 + }, { "type": "AWS Access Key", "filename": "sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py", @@ -1460,17 +1467,24 @@ "filename": "sdk/python/tests/universal/feature_repos/repo_configuration.py", "hashed_secret": "d90e76ef629fb00c95f4e84fec29fbda111e2392", "is_verified": false, - "line_number": 459 + "line_number": 468 }, { "type": "Secret Keyword", "filename": "sdk/python/tests/universal/feature_repos/repo_configuration.py", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 461 + "line_number": 470 } ], "sdk/python/tests/universal/feature_repos/universal/data_sources/file.py": [ + { + "type": "AWS Access Key", + "filename": "sdk/python/tests/universal/feature_repos/universal/data_sources/file.py", + "hashed_secret": "25910f981e85ca04baf359199dd0bd4a3ae738b6", + "is_verified": false, + "line_number": 256 + }, { "type": "Base64 High Entropy String", "filename": "sdk/python/tests/universal/feature_repos/universal/data_sources/file.py", @@ -1539,5 +1553,5 @@ } ] }, - "generated_at": "2026-04-07T15:56:56Z" + "generated_at": "2026-04-16T14:18:36Z" } diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index 68ec5041ca0..d090093d0ce 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -1,5 +1,5 @@ import os -from datetime import datetime, timedelta, timezone +from datetime import datetime from pathlib import Path from typing import Any, Callable, List, Optional, Union @@ -28,7 +28,7 @@ ) from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig -from feast.utils import make_tzaware, to_naive_utc +from feast.utils import compute_non_entity_date_range, to_naive_utc def _read_data_source(data_source: DataSource, repo_path: str) -> Table: @@ -124,6 +124,14 @@ def _build_entity_df_from_sources( end_date: datetime, data_source_reader: Callable[[DataSource, str], Table], ) -> pd.DataFrame: + """Build a synthetic entity DataFrame from feature view sources for non-entity retrieval. + + Reads each feature view's backing data source, extracts distinct entity key + combinations within [start_date, end_date], and returns a single DataFrame + with one row per unique entity combination and ``event_timestamp`` set to + ``end_date``. When no entity columns exist across all feature views, a + minimal single-row DataFrame with only the timestamp column is returned. + """ entity_dfs: List[pd.DataFrame] = [] for fv in feature_views: @@ -225,23 +233,12 @@ def get_historical_features( non_entity_mode = entity_df is None if non_entity_mode: - end_date = ( - make_tzaware(end_date) if end_date else datetime.now(timezone.utc) + start_date, end_date = compute_non_entity_date_range( + feature_views, + start_date=start_date, + end_date=end_date, ) - if start_date is None: - max_ttl_seconds = 0 - for fv in feature_views: - if fv.ttl and isinstance(fv.ttl, timedelta): - max_ttl_seconds = max( - max_ttl_seconds, int(fv.ttl.total_seconds()) - ) - if max_ttl_seconds > 0: - start_date = end_date - timedelta(seconds=max_ttl_seconds) - else: - start_date = end_date - timedelta(days=30) - start_date = make_tzaware(start_date) - entity_df = _build_entity_df_from_sources( config=config, feature_views=feature_views, diff --git a/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py b/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py index 575b0243891..68652a0e61e 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py @@ -283,12 +283,13 @@ def test_non_entity_mode_with_end_date_only_calculates_start_from_ttl( assert len(driver1_data) >= 1 assert 0.3 in driver1_data["conv_rate"].values - @patch("feast.infra.offline_stores.duckdb.datetime") + @patch("feast.utils.datetime") def test_no_dates_provided_defaults_to_current_time_and_filters_data( self, mock_datetime, monkeypatch ): fixed_now = datetime(2023, 1, 7, 12, 0, 0, tzinfo=timezone.utc) mock_datetime.now.return_value = fixed_now + mock_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw) src = pd.DataFrame( { diff --git a/sdk/python/tests/universal/feature_repos/repo_configuration.py b/sdk/python/tests/universal/feature_repos/repo_configuration.py index 454ca12d972..9df2b877acc 100644 --- a/sdk/python/tests/universal/feature_repos/repo_configuration.py +++ b/sdk/python/tests/universal/feature_repos/repo_configuration.py @@ -91,14 +91,17 @@ OFFLINE_STORE_TO_PROVIDER_CONFIG: Dict[str, Tuple[str, Type[DataSourceCreator]]] = { "file": ("local", FileDataSourceCreator), - "duckdb": ( - "local", - importlib.import_module( - "tests.universal.feature_repos.duckdb_repo_configuration" - ).DuckDBDataSourceCreator, - ), } +try: + from tests.universal.feature_repos.duckdb_repo_configuration import ( + DuckDBDataSourceCreator, + ) + + OFFLINE_STORE_TO_PROVIDER_CONFIG["duckdb"] = ("local", DuckDBDataSourceCreator) +except ImportError: + pass + AVAILABLE_OFFLINE_STORES: List[Tuple[str, Type[DataSourceCreator]]] = [ ("local", FileDataSourceCreator), ("local", RemoteOfflineStoreDataSourceCreator), From ff5471d6ca47f11808f8656b976c1bcadf2ea2f1 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Thu, 16 Apr 2026 20:16:38 +0530 Subject: [PATCH 10/12] fix Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .secrets.baseline | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index bc9743313f5..b5475cce5b0 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1312,13 +1312,6 @@ } ], "sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py": [ - { - "type": "AWS Access Key", - "filename": "sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py", - "hashed_secret": "25910f981e85ca04baf359199dd0bd4a3ae738b6", - "is_verified": false, - "line_number": 34 - }, { "type": "AWS Access Key", "filename": "sdk/python/tests/integration/offline_store/test_s3_custom_endpoint.py", @@ -1553,5 +1546,5 @@ } ] }, - "generated_at": "2026-04-16T14:18:36Z" + "generated_at": "2026-04-16T14:45:12Z" } From da6374078bd39ac3d263c10663f90bdfbb673823 Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Fri, 17 Apr 2026 17:09:20 +0530 Subject: [PATCH 11/12] fix Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- .secrets.baseline | 7 ------- 1 file changed, 7 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index b5475cce5b0..44e6d7ad765 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1471,13 +1471,6 @@ } ], "sdk/python/tests/universal/feature_repos/universal/data_sources/file.py": [ - { - "type": "AWS Access Key", - "filename": "sdk/python/tests/universal/feature_repos/universal/data_sources/file.py", - "hashed_secret": "25910f981e85ca04baf359199dd0bd4a3ae738b6", - "is_verified": false, - "line_number": 256 - }, { "type": "Base64 High Entropy String", "filename": "sdk/python/tests/universal/feature_repos/universal/data_sources/file.py", From 9d6ddec8ffe6440367c759c6208f15634bed6bca Mon Sep 17 00:00:00 2001 From: Vanshika Vanshika Date: Fri, 17 Apr 2026 18:06:25 +0530 Subject: [PATCH 12/12] fix Signed-off-by: Vanshika Vanshika rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED --- sdk/python/feast/infra/offline_stores/duckdb.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index d090093d0ce..97dbe2f708e 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -135,6 +135,8 @@ def _build_entity_df_from_sources( entity_dfs: List[pd.DataFrame] = [] for fv in feature_views: + if fv.batch_source is None: + continue fv_table = data_source_reader(fv.batch_source, str(config.repo_path)) for old_name, new_name in fv.batch_source.field_mapping.items():