diff --git a/.secrets.baseline b/.secrets.baseline index 96bf780809c..44e6d7ad765 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1460,14 +1460,14 @@ "filename": "sdk/python/tests/universal/feature_repos/repo_configuration.py", "hashed_secret": "d90e76ef629fb00c95f4e84fec29fbda111e2392", "is_verified": false, - "line_number": 459 + "line_number": 468 }, { "type": "Secret Keyword", "filename": "sdk/python/tests/universal/feature_repos/repo_configuration.py", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 461 + "line_number": 470 } ], "sdk/python/tests/universal/feature_repos/universal/data_sources/file.py": [ @@ -1539,5 +1539,5 @@ } ] }, - "generated_at": "2026-04-07T15:56:56Z" + "generated_at": "2026-04-16T14:45:12Z" } diff --git a/sdk/python/feast/infra/offline_stores/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py index e0a69e53c56..97dbe2f708e 100644 --- a/sdk/python/feast/infra/offline_stores/duckdb.py +++ b/sdk/python/feast/infra/offline_stores/duckdb.py @@ -23,8 +23,12 @@ write_logged_features_ibis, ) from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob +from feast.infra.offline_stores.offline_utils import ( + DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, +) from feast.infra.registry.base_registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.utils import compute_non_entity_date_range, to_naive_utc def _read_data_source(data_source: DataSource, repo_path: str) -> Table: @@ -113,6 +117,73 @@ def _write_data_source( ) +def _build_entity_df_from_sources( + config: RepoConfig, + feature_views: List[FeatureView], + start_date: datetime, + end_date: datetime, + data_source_reader: Callable[[DataSource, str], Table], +) -> pd.DataFrame: + """Build a synthetic entity DataFrame from feature view sources for non-entity retrieval. + + Reads each feature view's backing data source, extracts distinct entity key + combinations within [start_date, end_date], and returns a single DataFrame + with one row per unique entity combination and ``event_timestamp`` set to + ``end_date``. When no entity columns exist across all feature views, a + minimal single-row DataFrame with only the timestamp column is returned. + """ + entity_dfs: List[pd.DataFrame] = [] + + for fv in feature_views: + if fv.batch_source is None: + continue + fv_table = data_source_reader(fv.batch_source, str(config.repo_path)) + + for old_name, new_name in fv.batch_source.field_mapping.items(): + if old_name in fv_table.columns: + fv_table = fv_table.rename({new_name: old_name}) + + timestamp_field = fv.batch_source.timestamp_field + + start_naive = to_naive_utc(start_date) + end_naive = to_naive_utc(end_date) + + fv_table = fv_table.filter( + ibis.and_( + fv_table[timestamp_field] >= ibis.literal(start_naive), + fv_table[timestamp_field] <= ibis.literal(end_naive), + ) + ) + + join_key_map = fv.projection.join_key_map or { + e.name: e.name for e in fv.entity_columns + } + source_join_key_cols = list(join_key_map.keys()) + + if source_join_key_cols: + # join_key_map is {feature_key: entity_key}; ibis rename({new: old}) renames + # old->new, so pass inverted map to rename feature columns to entity names. + distinct_entities = ( + fv_table.select(*source_join_key_cols) + .rename({v: k for k, v in join_key_map.items()}) + .distinct() + .execute() + ) + entity_dfs.append(distinct_entities) + + if not entity_dfs: + return pd.DataFrame({DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]}) + + combined = pd.concat(entity_dfs, ignore_index=True) + + all_cols = list(combined.columns) + combined = combined.drop_duplicates(subset=all_cols).reset_index(drop=True) + + combined[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = end_date + + return combined + + class DuckDBOfflineStoreConfig(FeastConfigBaseModel): type: StrictStr = "duckdb" # """ Offline store type selector""" @@ -154,11 +225,30 @@ def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], - entity_df: Union[pd.DataFrame, str], + entity_df: Optional[Union[pd.DataFrame, str]], registry: BaseRegistry, project: str, full_feature_names: bool = False, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ) -> RetrievalJob: + non_entity_mode = entity_df is None + + if non_entity_mode: + start_date, end_date = compute_non_entity_date_range( + feature_views, + start_date=start_date, + end_date=end_date, + ) + + entity_df = _build_entity_df_from_sources( + config=config, + feature_views=feature_views, + start_date=start_date, + end_date=end_date, + data_source_reader=_read_data_source, + ) + return get_historical_features_ibis( config=config, feature_views=feature_views, diff --git a/sdk/python/tests/integration/offline_store/test_non_entity_mode.py b/sdk/python/tests/integration/offline_store/test_non_entity_mode.py index f17352fb356..9879f331a09 100644 --- a/sdk/python/tests/integration/offline_store/test_non_entity_mode.py +++ b/sdk/python/tests/integration/offline_store/test_non_entity_mode.py @@ -11,8 +11,7 @@ @pytest.mark.integration -@pytest.mark.universal_offline_stores -@pytest.mark.ray_offline_stores_only +@pytest.mark.universal_offline_stores(only=["file", "duckdb"]) def test_non_entity_mode_basic(environment, universal_data_sources): """Test historical features retrieval without entity_df (non-entity mode). diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index af0de479f3e..92d89a74f3a 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -731,7 +731,7 @@ def test_historical_features_field_mapping( @pytest.mark.integration -@pytest.mark.universal_offline_stores(only=["file"]) +@pytest.mark.universal_offline_stores(only=["file", "duckdb"]) def test_historical_features_non_entity_retrieval(environment): """Test get_historical_features with entity_df=None using start_date/end_date. diff --git a/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py b/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py new file mode 100644 index 00000000000..68652a0e61e --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/test_duckdb_non_entity.py @@ -0,0 +1,554 @@ +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock, patch + +import ibis +import pandas as pd + +from feast.entity import Entity +from feast.feature_view import FeatureView, Field +from feast.infra.offline_stores import duckdb as duckdb_mod +from feast.infra.offline_stores.duckdb import ( + DuckDBOfflineStore, + DuckDBOfflineStoreConfig, +) +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.offline_stores.offline_store import RetrievalJob +from feast.repo_config import RepoConfig +from feast.types import Float32, Int64, ValueType + + +def _mock_duckdb_offline_store_config(): + return DuckDBOfflineStoreConfig(type="duckdb") + + +def _mock_entity(): + return [ + Entity( + name="driver_id", + join_keys=["driver_id"], + description="Driver ID", + value_type=ValueType.INT64, + ) + ] + + +def _mock_feature_view(name: str = "driver_stats", ttl: timedelta = None): + return FeatureView( + name=name, + entities=_mock_entity(), + schema=[ + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource( + path="dummy.parquet", + timestamp_field="event_timestamp", + ), + ttl=ttl, + ) + + +def _mock_data_source_reader(src_df): + """Return a data_source_reader that wraps a pandas DataFrame as an ibis memtable.""" + + def reader(data_source, repo_path): + return ibis.memtable(src_df) + + return reader + + +def test_duckdb_non_entity_historical_retrieval_accepts_dates(monkeypatch): + src = pd.DataFrame( + { + "driver_id": [1], + "event_timestamp": pd.to_datetime(["2023-01-01T12:00:00Z"]), + "conv_rate": [0.5], + } + ) + monkeypatch.setattr(duckdb_mod, "_read_data_source", _mock_data_source_reader(src)) + + repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = _mock_feature_view() + + retrieval_job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["driver_stats:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="test_project", + full_feature_names=False, + start_date=datetime(2023, 1, 1, tzinfo=timezone.utc), + end_date=datetime(2023, 1, 2, tzinfo=timezone.utc), + ) + + assert isinstance(retrieval_job, RetrievalJob) + + +class TestNonEntityRetrieval: + """Test suite for non-entity retrieval functionality (entity_df=None) in DuckDB offline store.""" + + def test_duckdb_non_entity_snapshot_ttl_and_dedup(self, monkeypatch): + src = pd.DataFrame( + { + "driver_id": [1, 1, 1, 2, 2], + "event_timestamp": pd.to_datetime( + [ + "2025-01-01T10:00:00Z", + "2025-01-01T10:00:00Z", + "2024-12-30T10:00:00Z", + "2025-01-01T12:00:00Z", + "2025-01-02T11:00:00Z", + ] + ), + "created_ts": pd.to_datetime( + [ + "2025-01-01T10:00:01Z", + "2025-01-01T10:00:02Z", + "2024-12-30T10:00:00Z", + "2025-01-01T12:00:00Z", + "2025-01-02T11:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.05, 0.3, 0.4], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + fv = FeatureView( + name="driver_stats", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource( + path="unused", + timestamp_field="event_timestamp", + created_timestamp_column="created_ts", + ), + ttl=timedelta(days=1), + ) + + repo_config = RepoConfig( + project="proj", + registry="unused", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + end = datetime(2025, 1, 2, 10, 0, tzinfo=timezone.utc) + start = end - timedelta(days=7) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["driver_stats:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="proj", + full_feature_names=False, + start_date=start, + end_date=end, + ) + + df = job.to_df() + + assert set(df["driver_id"]) == {1, 2} + out = df.set_index("driver_id")["conv_rate"].to_dict() + assert out[1] == 0.2 + assert out[2] == 0.3 + + def test_non_entity_mode_with_both_dates_retrieves_data(self, monkeypatch): + src = pd.DataFrame( + { + "driver_id": [1, 1, 1, 2], + "event_timestamp": pd.to_datetime( + [ + "2023-01-01T08:00:00Z", + "2023-01-03T10:00:00Z", + "2023-01-05T12:00:00Z", + "2023-01-08T14:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.3, 0.4], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = FeatureView( + name="test_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=None, + ) + + start_date = datetime(2023, 1, 2, tzinfo=timezone.utc) + end_date = datetime(2023, 1, 7, tzinfo=timezone.utc) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["test_fv:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="test_project", + start_date=start_date, + end_date=end_date, + ) + + df = job.to_df() + + assert job.metadata.min_event_timestamp == end_date + assert job.metadata.max_event_timestamp == end_date + + assert len(df) >= 1 + assert set(df["driver_id"]) == {1} + driver1_data = df[df["driver_id"] == 1] + assert 0.3 in driver1_data["conv_rate"].values + + def test_non_entity_mode_with_end_date_only_calculates_start_from_ttl( + self, monkeypatch + ): + src = pd.DataFrame( + { + "driver_id": [1, 1, 1], + "event_timestamp": pd.to_datetime( + [ + "2023-01-05T08:00:00Z", + "2023-01-06T10:00:00Z", + "2023-01-07T12:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.3], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = FeatureView( + name="test_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=timedelta(days=1), + ) + + end_date = datetime(2023, 1, 7, 12, 0, tzinfo=timezone.utc) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["test_fv:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="test_project", + end_date=end_date, + ) + + df = job.to_df() + + driver1_data = df[df["driver_id"] == 1] + assert len(driver1_data) >= 1 + assert 0.3 in driver1_data["conv_rate"].values + + @patch("feast.utils.datetime") + def test_no_dates_provided_defaults_to_current_time_and_filters_data( + self, mock_datetime, monkeypatch + ): + fixed_now = datetime(2023, 1, 7, 12, 0, 0, tzinfo=timezone.utc) + mock_datetime.now.return_value = fixed_now + mock_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw) + + src = pd.DataFrame( + { + "driver_id": [1, 1, 1], + "event_timestamp": pd.to_datetime( + [ + "2023-01-05T12:00:00Z", + "2023-01-06T18:00:00Z", + "2023-01-07T11:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.3], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + repo_config = RepoConfig( + project="test_project", + registry="test_registry", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = FeatureView( + name="test_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=timedelta(days=1), + ) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["test_fv:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="test_project", + ) + + df = job.to_df() + + driver1_data = df[df["driver_id"] == 1] + assert len(driver1_data) >= 1 + assert 0.3 in driver1_data["conv_rate"].values + + def test_ttl_filtering_excludes_old_rows(self, monkeypatch): + src = pd.DataFrame( + { + "driver_id": [1, 1, 1], + "event_timestamp": pd.to_datetime( + [ + "2025-01-01T08:00:00Z", + "2025-01-01T09:30:00Z", + "2025-01-01T10:00:00Z", + ] + ), + "conv_rate": [0.1, 0.2, 0.3], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + fv = FeatureView( + name="driver_stats", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=timedelta(hours=1), + ) + + repo_config = RepoConfig( + project="proj", + registry="unused", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + end = datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc) + start = end - timedelta(days=1) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["driver_stats:conv_rate"], + entity_df=None, + registry=MagicMock(), + project="proj", + full_feature_names=False, + start_date=start, + end_date=end, + ) + + df = job.to_df() + + driver1_data = df[df["driver_id"] == 1] + assert len(driver1_data) >= 1 + assert ( + 0.2 in driver1_data["conv_rate"].values + or 0.3 in driver1_data["conv_rate"].values + ) + assert 0.1 not in driver1_data["conv_rate"].values + + def test_multiple_feature_views_with_different_ttls(self, monkeypatch): + src1 = pd.DataFrame( + { + "driver_id": [1, 1], + "event_timestamp": pd.to_datetime( + [ + "2025-01-01T08:00:00Z", + "2025-01-01T09:30:00Z", + ] + ), + "age": [25, 26], + } + ) + + src2 = pd.DataFrame( + { + "driver_id": [1, 1], + "event_timestamp": pd.to_datetime( + [ + "2025-01-01T08:00:00Z", + "2025-01-01T09:30:00Z", + ] + ), + "total_trips": [100, 101], + } + ) + + def mock_read_data_source(data_source, repo_path): + if data_source.path == "unused1": + return ibis.memtable(src1) + return ibis.memtable(src2) + + monkeypatch.setattr(duckdb_mod, "_read_data_source", mock_read_data_source) + + fv1 = FeatureView( + name="user_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="age", dtype=Float32), + ], + source=FileSource(path="unused1", timestamp_field="event_timestamp"), + ttl=timedelta(hours=1), + ) + + fv2 = FeatureView( + name="transaction_fv", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="total_trips", dtype=Float32), + ], + source=FileSource(path="unused2", timestamp_field="event_timestamp"), + ttl=timedelta(days=1), + ) + + repo_config = RepoConfig( + project="proj", + registry="unused", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + end = datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc) + start = end - timedelta(days=1) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv1, fv2], + feature_refs=["user_fv:age", "transaction_fv:total_trips"], + entity_df=None, + registry=MagicMock(), + project="proj", + full_feature_names=False, + start_date=start, + end_date=end, + ) + + df = job.to_df() + + assert "age" in df.columns or "user_fv__age" in df.columns + assert ( + "total_trips" in df.columns or "transaction_fv__total_trips" in df.columns + ) + + if "age" in df.columns: + age_values = df["age"].values + assert 26 in age_values + assert 25 not in age_values + + if "total_trips" in df.columns: + trips_values = df["total_trips"].values + assert 101 in trips_values + + def test_entity_df_still_works(self, monkeypatch): + """Verify standard entity_df path is not broken by the changes.""" + src = pd.DataFrame( + { + "driver_id": [1, 2], + "event_timestamp": pd.to_datetime( + ["2025-01-01T10:00:00Z", "2025-01-01T11:00:00Z"] + ), + "conv_rate": [0.5, 0.6], + } + ) + monkeypatch.setattr( + duckdb_mod, "_read_data_source", _mock_data_source_reader(src) + ) + + repo_config = RepoConfig( + project="proj", + registry="unused", + provider="local", + offline_store=_mock_duckdb_offline_store_config(), + ) + + fv = FeatureView( + name="driver_stats", + entities=_mock_entity(), + schema=[ + Field(name="driver_id", dtype=Int64), + Field(name="conv_rate", dtype=Float32), + ], + source=FileSource(path="unused", timestamp_field="event_timestamp"), + ttl=timedelta(days=1), + ) + + entity_df = pd.DataFrame( + { + "driver_id": [1, 2], + "event_timestamp": pd.to_datetime( + ["2025-01-01T12:00:00Z", "2025-01-01T12:00:00Z"] + ), + } + ) + + job = DuckDBOfflineStore.get_historical_features( + config=repo_config, + feature_views=[fv], + feature_refs=["driver_stats:conv_rate"], + entity_df=entity_df, + registry=MagicMock(), + project="proj", + full_feature_names=False, + ) + + df = job.to_df() + + assert set(df["driver_id"]) == {1, 2} + assert len(df) == 2 diff --git a/sdk/python/tests/universal/feature_repos/repo_configuration.py b/sdk/python/tests/universal/feature_repos/repo_configuration.py index ddd952f71dc..9df2b877acc 100644 --- a/sdk/python/tests/universal/feature_repos/repo_configuration.py +++ b/sdk/python/tests/universal/feature_repos/repo_configuration.py @@ -93,6 +93,15 @@ "file": ("local", FileDataSourceCreator), } +try: + from tests.universal.feature_repos.duckdb_repo_configuration import ( + DuckDBDataSourceCreator, + ) + + OFFLINE_STORE_TO_PROVIDER_CONFIG["duckdb"] = ("local", DuckDBDataSourceCreator) +except ImportError: + pass + AVAILABLE_OFFLINE_STORES: List[Tuple[str, Type[DataSourceCreator]]] = [ ("local", FileDataSourceCreator), ("local", RemoteOfflineStoreDataSourceCreator),