Skip to content

Commit c2c1384

Browse files
authored
Support on demand feature views in feature services (#1849)
* Support on demand feature views in feature services Signed-off-by: Achal Shah <achals@gmail.com> * Fix copy paste Signed-off-by: Achal Shah <achals@gmail.com>
1 parent 2d52ce7 commit c2c1384

5 files changed

Lines changed: 148 additions & 5 deletions

File tree

sdk/python/feast/feature_service.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from feast.feature_table import FeatureTable
77
from feast.feature_view import FeatureView
88
from feast.feature_view_projection import FeatureViewProjection
9+
from feast.on_demand_feature_view import OnDemandFeatureView
910
from feast.protos.feast.core.FeatureService_pb2 import (
1011
FeatureService as FeatureServiceProto,
1112
)
@@ -38,7 +39,9 @@ class FeatureService:
3839
def __init__(
3940
self,
4041
name: str,
41-
features: List[Union[FeatureTable, FeatureView, FeatureViewProjection]],
42+
features: List[
43+
Union[FeatureTable, FeatureView, OnDemandFeatureView, FeatureViewProjection]
44+
],
4245
tags: Optional[Dict[str, str]] = None,
4346
description: Optional[str] = None,
4447
):
@@ -51,7 +54,11 @@ def __init__(
5154
self.name = name
5255
self.features = []
5356
for feature in features:
54-
if isinstance(feature, FeatureTable) or isinstance(feature, FeatureView):
57+
if (
58+
isinstance(feature, FeatureTable)
59+
or isinstance(feature, FeatureView)
60+
or isinstance(feature, OnDemandFeatureView)
61+
):
5562
self.features.append(FeatureViewProjection.from_definition(feature))
5663
elif isinstance(feature, FeatureViewProjection):
5764
self.features.append(feature)

sdk/python/feast/on_demand_feature_view.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from feast.errors import RegistryInferenceFailure
1010
from feast.feature import Feature
1111
from feast.feature_view import FeatureView
12+
from feast.feature_view_projection import FeatureViewProjection
1213
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
1314
OnDemandFeatureView as OnDemandFeatureViewProto,
1415
)
@@ -132,6 +133,16 @@ def get_transformed_features_df(
132133
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
133134
return df_with_transformed_features
134135

136+
def __getitem__(self, item) -> FeatureViewProjection:
137+
assert isinstance(item, list)
138+
139+
referenced_features = []
140+
for feature in self.features:
141+
if feature.name in item:
142+
referenced_features.append(feature)
143+
144+
return FeatureViewProjection(self.name, referenced_features)
145+
135146
def infer_features_from_batch_source(self, config: RepoConfig):
136147
"""
137148
Infers the set of features associated to this feature view from the input source.

sdk/python/tests/integration/feature_repos/universal/feature_views.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def conv_rate_plus_100_feature_view(
5757
)
5858

5959

60-
def create_driver_hourly_stats_feature_view(source, infer_features: bool = True):
60+
def create_driver_hourly_stats_feature_view(source, infer_features: bool = False):
6161
driver_stats_feature_view = FeatureView(
6262
name="driver_stats",
6363
entities=["driver"],

sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from pytz import utc
99

1010
from feast import utils
11+
from feast.feature_service import FeatureService
1112
from feast.feature_view import FeatureView
1213
from feast.infra.offline_stores.offline_utils import (
1314
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
@@ -183,9 +184,22 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
183184
feature_views["global"],
184185
)
185186

187+
feature_service = FeatureService(
188+
"convrate_plus100",
189+
features=[feature_views["driver"][["conv_rate"]], feature_views["driver_odfv"]],
190+
)
191+
186192
feast_objects = []
187193
feast_objects.extend(
188-
[customer_fv, driver_fv, driver_odfv, global_fv, driver(), customer()]
194+
[
195+
customer_fv,
196+
driver_fv,
197+
driver_odfv,
198+
global_fv,
199+
driver(),
200+
customer(),
201+
feature_service,
202+
]
189203
)
190204
store.apply(feast_objects)
191205

@@ -312,6 +326,14 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
312326
assert_frame_equal(
313327
expected_df, actual_df_from_df_entities, check_dtype=False,
314328
)
329+
assert_feature_service_correctness(
330+
store,
331+
feature_service,
332+
full_feature_names,
333+
orders_df,
334+
expected_df,
335+
event_timestamp,
336+
)
315337

316338
# on demand features is only plumbed through to to_df for now.
317339
table_from_df_entities: pd.DataFrame = job_from_df.to_arrow().to_pandas()
@@ -330,3 +352,54 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
330352
.reset_index(drop=True)
331353
)
332354
assert_frame_equal(actual_df_from_df_entities_for_table, table_from_df_entities)
355+
356+
357+
def response_feature_name(feature: str, full_feature_names: bool) -> str:
358+
if feature in {"conv_rate", "avg_daily_trips"} and full_feature_names:
359+
return f"driver_stats__{feature}"
360+
361+
if feature in {"conv_rate_plus_100"} and full_feature_names:
362+
return f"conv_rate_plus_100__{feature}"
363+
364+
return feature
365+
366+
367+
def assert_feature_service_correctness(
368+
store, feature_service, full_feature_names, orders_df, expected_df, event_timestamp
369+
):
370+
371+
job_from_df = store.get_historical_features(
372+
entity_df=orders_df,
373+
features=feature_service,
374+
full_feature_names=full_feature_names,
375+
)
376+
377+
actual_df_from_df_entities = job_from_df.to_df()
378+
379+
expected_df: pd.DataFrame = (
380+
expected_df.sort_values(
381+
by=[event_timestamp, "order_id", "driver_id", "customer_id"]
382+
)
383+
.drop_duplicates()
384+
.reset_index(drop=True)
385+
)
386+
expected_df = expected_df[
387+
[
388+
event_timestamp,
389+
"order_id",
390+
"driver_id",
391+
"customer_id",
392+
response_feature_name("conv_rate", full_feature_names),
393+
"conv_rate_plus_100",
394+
]
395+
]
396+
actual_df_from_df_entities = (
397+
actual_df_from_df_entities[expected_df.columns]
398+
.sort_values(by=[event_timestamp, "order_id", "driver_id", "customer_id"])
399+
.drop_duplicates()
400+
.reset_index(drop=True)
401+
)
402+
403+
assert_frame_equal(
404+
expected_df, actual_df_from_df_entities, check_dtype=False,
405+
)

sdk/python/tests/integration/online_store/test_universal_online.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import pandas as pd
55
import pytest
66

7+
from feast import FeatureService
78
from tests.integration.feature_repos.repo_configuration import (
89
construct_universal_feature_views,
910
)
@@ -17,9 +18,15 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name
1718
fs = environment.feature_store
1819
entities, datasets, data_sources = universal_data_sources
1920
feature_views = construct_universal_feature_views(data_sources)
21+
22+
feature_service = FeatureService(
23+
"convrate_plus100",
24+
features=[feature_views["driver"][["conv_rate"]], feature_views["driver_odfv"]],
25+
)
26+
2027
feast_objects = []
2128
feast_objects.extend(feature_views.values())
22-
feast_objects.extend([driver(), customer()])
29+
feast_objects.extend([driver(), customer(), feature_service])
2330
fs.apply(feast_objects)
2431
fs.materialize(environment.start_date, environment.end_date)
2532

@@ -114,6 +121,16 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name
114121
][0]
115122
)
116123

124+
assert_feature_service_correctness(
125+
fs,
126+
feature_service,
127+
entity_rows,
128+
full_feature_names,
129+
drivers_df,
130+
customers_df,
131+
global_df,
132+
)
133+
117134

118135
def response_feature_name(feature: str, full_feature_names: bool) -> str:
119136
if (
@@ -147,3 +164,38 @@ def get_latest_feature_values_from_dataframes(
147164
latest_global_row = global_df.loc[global_df["event_timestamp"].idxmax()].to_dict()
148165

149166
return {**latest_customer_row, **latest_driver_row, **latest_global_row}
167+
168+
169+
def assert_feature_service_correctness(
170+
fs,
171+
feature_service,
172+
entity_rows,
173+
full_feature_names,
174+
drivers_df,
175+
customers_df,
176+
global_df,
177+
):
178+
feature_service_response = fs.get_online_features(
179+
features=feature_service,
180+
entity_rows=entity_rows,
181+
full_feature_names=full_feature_names,
182+
)
183+
assert feature_service_response is not None
184+
185+
feature_service_online_features_dict = feature_service_response.to_dict()
186+
feature_service_keys = feature_service_online_features_dict.keys()
187+
188+
assert (
189+
len(feature_service_keys) == len(feature_service.features) + 2
190+
) # Add two for the driver id and the customer id entity keys.
191+
192+
for i, entity_row in enumerate(entity_rows):
193+
df_features = get_latest_feature_values_from_dataframes(
194+
drivers_df, customers_df, global_df, entity_row
195+
)
196+
assert (
197+
feature_service_online_features_dict[
198+
response_feature_name("conv_rate_plus_100", full_feature_names)
199+
][i]
200+
== df_features["conv_rate"] + 100
201+
)

0 commit comments

Comments
 (0)