From 81c9d8994d96ecc5e736148e1a322783470c2106 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 11 Apr 2022 13:35:51 -0700 Subject: [PATCH 1/7] chore: Add a source field in the feature view API Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 61 +++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 1a885443b90..2bdbac1c562 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -20,7 +20,7 @@ from feast import utils from feast.base_feature_view import BaseFeatureView -from feast.data_source import DataSource, PushSource +from feast.data_source import DataSource, KafkaSource, KinesisSource, PushSource from feast.entity import Entity from feast.feature import Feature from feast.feature_view_projection import FeatureViewProjection @@ -88,6 +88,7 @@ class FeatureView(BaseFeatureView): tags: Dict[str, str] owner: str materialization_intervals: List[Tuple[datetime, datetime]] + source: Optional[DataSource] @log_exceptions def __init__( @@ -104,6 +105,7 @@ def __init__( description: str = "", owner: str = "", schema: Optional[List[Field]] = None, + source: Optional[DataSource] = None, ): """ Creates a FeatureView object. @@ -126,6 +128,8 @@ def __init__( primary maintainer. schema (optional): The schema of the feature view, including feature, timestamp, and entity columns. + source (optional): The source of data for this group of features. May be a stream source, or a batch source. + If a stream source, the source should contain a batch_source for backfills & batch materialization. Raises: ValueError: A field mapping conflicts with an Entity or a Feature. @@ -163,6 +167,8 @@ def __init__( self.name = _name self.entities = _entities if _entities else [DUMMY_ENTITY_NAME] + self._initialize_sources(_name, batch_source, stream_source, source) + if isinstance(_ttl, Duration): self.ttl = timedelta(seconds=int(_ttl.seconds)) warnings.warn( @@ -199,20 +205,6 @@ def __init__( # current `features` parameter only accepts feature columns. _features = _schema - if stream_source is not None and isinstance(stream_source, PushSource): - if stream_source.batch_source is None or not isinstance( - stream_source.batch_source, DataSource - ): - raise ValueError( - f"A batch_source needs to be specified for feature view `{name}`" - ) - self.batch_source = stream_source.batch_source - else: - if batch_source is None: - raise ValueError( - f"A batch_source needs to be specified for feature view `{name}`" - ) - self.batch_source = batch_source cols = [entity for entity in self.entities] + [ field.name for field in _features @@ -236,9 +228,46 @@ def __init__( owner=owner, ) self.online = online - self.stream_source = stream_source self.materialization_intervals = [] + def _initialize_sources(self, name, batch_source, stream_source, source): + if source: + if ( + isinstance(source, PushSource) + or isinstance(source, KafkaSource) + or isinstance(source, KinesisSource) + ): + self.stream_source = source + if not source.batch_source: + raise ValueError( + f"A batch_source needs to be specified for stream source `{source.name}`" + ) + else: + self.batch_source = source.batch_source + else: + self.batch_source = source + else: + warnings.warn( + "batch_source and stream_source have been deprecated in favor or `source`." + "The deprecated fields will be removed in Feast 0.23.", + DeprecationWarning, + ) + if stream_source is not None and isinstance(stream_source, PushSource): + if stream_source.batch_source is None or not isinstance( + stream_source.batch_source, DataSource + ): + raise ValueError( + f"A batch_source needs to be specified for feature view `{name}`" + ) + self.batch_source = stream_source.batch_source + else: + if batch_source is None: + raise ValueError( + f"A batch_source needs to be specified for feature view `{name}`" + ) + self.batch_source = batch_source + self.source = source + # Note: Python requires redefining hash in child classes that override __eq__ def __hash__(self): return super().__hash__() From a81da90d5a501625c9c93129f7a4544ecbc31026 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 11 Apr 2022 13:47:22 -0700 Subject: [PATCH 2/7] fix reference Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 2bdbac1c562..92780172681 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -259,6 +259,7 @@ def _initialize_sources(self, name, batch_source, stream_source, source): raise ValueError( f"A batch_source needs to be specified for feature view `{name}`" ) + self.stream_source = stream_source self.batch_source = stream_source.batch_source else: if batch_source is None: From 2fed4c829042e83953a31fbadfaaa15a627d838f Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 11 Apr 2022 13:57:28 -0700 Subject: [PATCH 3/7] fix reference Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 92780172681..19c2db55edc 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -266,6 +266,7 @@ def _initialize_sources(self, name, batch_source, stream_source, source): raise ValueError( f"A batch_source needs to be specified for feature view `{name}`" ) + self.stream_source = stream_source self.batch_source = batch_source self.source = source From 3f6bc644074ad280047b4cf785fdc58774b1bb2a Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 11 Apr 2022 14:04:05 -0700 Subject: [PATCH 4/7] fix reference Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 19c2db55edc..5c8f3dab325 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -245,6 +245,7 @@ def _initialize_sources(self, name, batch_source, stream_source, source): else: self.batch_source = source.batch_source else: + self.stream_source = stream_source self.batch_source = source else: warnings.warn( From 3952e121ae5e1ac726f79f2c398ab80b6e7d82de Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 12 Apr 2022 09:28:37 -0700 Subject: [PATCH 5/7] Start updating tests Signed-off-by: Achal Shah --- sdk/python/feast/data_source.py | 2 +- sdk/python/feast/feature_view.py | 6 ++++-- .../feature_repos/universal/feature_views.py | 18 +++++++++--------- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 46df1088db8..ef7000d9ce4 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -730,7 +730,7 @@ def to_proto(self) -> DataSourceProto: name=self.name, type=DataSourceProto.PUSH_SOURCE, push_options=options, - timestamp_field=self.timestamp_field, + timestamp_sdk/python/feast/feature_view.pyself.timestamp_field, description=self.description, tags=self.tags, owner=self.owner, diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 5c8f3dab325..c5cd292081f 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -61,9 +61,9 @@ class FeatureView(BaseFeatureView): can result in extremely computationally intensive queries. batch_source (optional): The batch source of data where this group of features is stored. This is optional ONLY if a push source is specified as the - stream_source, since push sources contain their own batch sources. + stream_source, since push sources contain their own batch sources. This is deprecated in favor of `source`. stream_source (optional): The stream source of data where this group of features - is stored. + is stored. This is deprecated in favor of `source`. schema: The schema of the feature view, including feature, timestamp, and entity columns. features: The list of features defined as part of this feature view. Each @@ -74,6 +74,8 @@ class FeatureView(BaseFeatureView): tags: A dictionary of key-value pairs to store arbitrary metadata. owner: The owner of the feature view, typically the email of the primary maintainer. + source (optional): The source of data for this group of features. May be a stream source, or a batch source. + If a stream source, the source should contain a batch_source for backfills & batch materialization. """ name: str diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 02d8baddadc..a3fcbce32ed 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -31,7 +31,7 @@ def driver_feature_view( entities=["driver"], schema=None if infer_features else [Field(name="value", dtype=dtype)], ttl=timedelta(days=5), - batch_source=data_source, + source=data_source, ) @@ -49,7 +49,7 @@ def global_feature_view( if infer_features else [Feature(name="entityless_value", dtype=value_type)], ttl=timedelta(days=5), - batch_source=data_source, + source=data_source, ) @@ -162,7 +162,7 @@ def create_driver_hourly_stats_feature_view(source, infer_features: bool = False Field(name="acc_rate", dtype=Float32), Field(name="avg_daily_trips", dtype=Int32), ], - batch_source=source, + source=source, ttl=timedelta(hours=2), ) return driver_stats_feature_view @@ -179,7 +179,7 @@ def create_customer_daily_profile_feature_view(source, infer_features: bool = Fa Field(name="avg_passenger_count", dtype=Float32), Field(name="lifetime_trip_count", dtype=Int32), ], - batch_source=source, + source=source, ttl=timedelta(days=2), ) return customer_profile_feature_view @@ -196,7 +196,7 @@ def create_global_stats_feature_view(source, infer_features: bool = False): Feature(name="num_rides", dtype=ValueType.INT32), Feature(name="avg_ride_length", dtype=ValueType.FLOAT), ], - batch_source=source, + source=source, ttl=timedelta(days=2), ) return global_stats_feature_view @@ -209,7 +209,7 @@ def create_order_feature_view(source, infer_features: bool = False): schema=None if infer_features else [Field(name="order_is_success", dtype=Int32)], - batch_source=source, + source=source, ttl=timedelta(days=2), ) @@ -219,7 +219,7 @@ def create_location_stats_feature_view(source, infer_features: bool = False): name="location_stats", entities=["location_id"], schema=None if infer_features else [Field(name="temperature", dtype=Int32)], - batch_source=source, + source=source, ttl=timedelta(days=2), ) return location_stats_feature_view @@ -231,7 +231,7 @@ def create_field_mapping_feature_view(source): entities=[], # Test that Features still work for FeatureViews. features=[Feature(name="feature_name", dtype=ValueType.INT32)], - batch_source=source, + source=source, ttl=timedelta(days=2), ) @@ -252,5 +252,5 @@ def create_pushable_feature_view(batch_source: DataSource): # Test that Features still work for FeatureViews. features=[Feature(name="temperature", dtype=ValueType.INT32)], ttl=timedelta(days=2), - stream_source=push_source, + source=push_source, ) From d4b8ca23eea4a3fe04fb11c51cfed8d267e80f92 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 12 Apr 2022 09:32:29 -0700 Subject: [PATCH 6/7] fixes Signed-off-by: Achal Shah --- sdk/python/feast/data_source.py | 2 +- sdk/python/feast/feature_view.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index ef7000d9ce4..46df1088db8 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -730,7 +730,7 @@ def to_proto(self) -> DataSourceProto: name=self.name, type=DataSourceProto.PUSH_SOURCE, push_options=options, - timestamp_sdk/python/feast/feature_view.pyself.timestamp_field, + timestamp_field=self.timestamp_field, description=self.description, tags=self.tags, owner=self.owner, diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index c5cd292081f..245c748dab0 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -207,7 +207,6 @@ def __init__( # current `features` parameter only accepts feature columns. _features = _schema - cols = [entity for entity in self.entities] + [ field.name for field in _features ] From cb5b31b53d558be52583899a84e37988a33b3139 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 12 Apr 2022 09:36:05 -0700 Subject: [PATCH 7/7] simpify Signed-off-by: Achal Shah --- sdk/python/feast/feature_view.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 245c748dab0..4ef1820a3f4 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -255,12 +255,6 @@ def _initialize_sources(self, name, batch_source, stream_source, source): DeprecationWarning, ) if stream_source is not None and isinstance(stream_source, PushSource): - if stream_source.batch_source is None or not isinstance( - stream_source.batch_source, DataSource - ): - raise ValueError( - f"A batch_source needs to be specified for feature view `{name}`" - ) self.stream_source = stream_source self.batch_source = stream_source.batch_source else: