From e7eec5250c926908a326b5ebf87d099e39f21e5c Mon Sep 17 00:00:00 2001 From: ntkathole Date: Mon, 9 Mar 2026 21:17:43 +0530 Subject: [PATCH] feat: Added odfv transformations metrics Signed-off-by: ntkathole --- .../components/open-telemetry.md | 6 + .../feature-servers/python-feature-server.md | 56 +++-- docs/reference/feature-store-yaml.md | 2 +- sdk/python/feast/feature_store.py | 148 +++++++------ .../infra/feature_servers/base_config.py | 9 +- .../feast/infra/online_stores/online_store.py | 36 ++++ sdk/python/feast/metrics.py | 54 ++++- sdk/python/feast/on_demand_feature_view.py | 34 ++- sdk/python/feast/utils.py | 26 ++- sdk/python/tests/unit/test_metrics.py | 198 +++++++++++++++++- .../tests/unit/test_on_demand_feature_view.py | 172 +++++++++++++++ 11 files changed, 649 insertions(+), 92 deletions(-) diff --git a/docs/getting-started/components/open-telemetry.md b/docs/getting-started/components/open-telemetry.md index bdffad1d27b..bbad5ed296e 100644 --- a/docs/getting-started/components/open-telemetry.md +++ b/docs/getting-started/components/open-telemetry.md @@ -144,6 +144,12 @@ Once configured, you can monitor various metrics including: - `feast_feature_server_memory_usage`: Memory utilization of the feature server - `feast_feature_server_cpu_usage`: CPU usage statistics +- `feast_feature_server_request_latency_seconds`: Request latency with feature count dimensions +- `feast_feature_server_online_store_read_duration_seconds`: Online store read phase duration +- `feast_feature_server_transformation_duration_seconds`: ODFV read-path transformation duration (per ODFV, requires `track_metrics=True`) +- `feast_feature_server_write_transformation_duration_seconds`: ODFV write-path transformation duration (per ODFV, requires `track_metrics=True`) - Additional custom metrics based on your configuration +For the full list of metrics, see the [Python Feature Server reference](../../reference/feature-servers/python-feature-server.md#available-metrics). + These metrics can be visualized using Prometheus and other compatible monitoring tools. diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index bf288b191ef..654c4b9f938 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -360,18 +360,50 @@ thread from starting). All categories default to `true`. ### Available metrics -| Metric | Type | Labels | Description | -|--------|------|--------|-------------| -| `feast_feature_server_cpu_usage` | Gauge | — | Process CPU usage % | -| `feast_feature_server_memory_usage` | Gauge | — | Process memory usage % | -| `feast_feature_server_request_total` | Counter | `endpoint`, `status` | Total requests per endpoint | -| `feast_feature_server_request_latency_seconds` | Histogram | `endpoint`, `feature_count`, `feature_view_count` | Request latency with p50/p95/p99 support | -| `feast_online_features_request_total` | Counter | — | Total online feature retrieval requests | -| `feast_online_features_entity_count` | Histogram | — | Entity rows per online feature request | -| `feast_push_request_total` | Counter | `push_source`, `mode` | Push requests by source and mode | -| `feast_materialization_total` | Counter | `feature_view`, `status` | Materialization runs (success/failure) | -| `feast_materialization_duration_seconds` | Histogram | `feature_view` | Materialization duration per feature view | -| `feast_feature_freshness_seconds` | Gauge | `feature_view`, `project` | Seconds since last materialization | +| Metric | Type | Labels | Category | Description | +|--------|------|--------|----------|-------------| +| `feast_feature_server_cpu_usage` | Gauge | — | `resource` | Process CPU usage % | +| `feast_feature_server_memory_usage` | Gauge | — | `resource` | Process memory usage % | +| `feast_feature_server_request_total` | Counter | `endpoint`, `status` | `request` | Total requests per endpoint | +| `feast_feature_server_request_latency_seconds` | Histogram | `endpoint`, `feature_count`, `feature_view_count` | `request` | Request latency with p50/p95/p99 support | +| `feast_online_features_request_total` | Counter | — | `online_features` | Total online feature retrieval requests | +| `feast_online_features_entity_count` | Histogram | — | `online_features` | Entity rows per online feature request | +| `feast_feature_server_online_store_read_duration_seconds` | Histogram | — | `online_features` | Online store read phase duration (sync and async) | +| `feast_feature_server_transformation_duration_seconds` | Histogram | `odfv_name`, `mode` | `online_features` | ODFV read-path transformation duration (requires `track_metrics=True` on the ODFV) | +| `feast_feature_server_write_transformation_duration_seconds` | Histogram | `odfv_name`, `mode` | `online_features` | ODFV write-path transformation duration (requires `track_metrics=True` on the ODFV) | +| `feast_push_request_total` | Counter | `push_source`, `mode` | `push` | Push requests by source and mode | +| `feast_materialization_result_total` | Counter | `feature_view`, `status` | `materialization` | Materialization runs (success/failure) | +| `feast_materialization_duration_seconds` | Histogram | `feature_view` | `materialization` | Materialization duration per feature view | +| `feast_feature_freshness_seconds` | Gauge | `feature_view`, `project` | `freshness` | Seconds since last materialization | + +### Per-ODFV transformation metrics + +The `transformation_duration_seconds` and `write_transformation_duration_seconds` +metrics are gated behind **two** conditions — both must be true for any +instrumentation to run: + +1. **Server-level**: the `online_features` category must be enabled in the + metrics configuration. +2. **ODFV-level**: the `OnDemandFeatureView` must have `track_metrics=True`. + +This defaults to `False`, so no ODFV incurs timing overhead unless explicitly +opted in: + +```python +from feast.on_demand_feature_view import on_demand_feature_view + +@on_demand_feature_view( + sources=[my_feature_view, my_request_source], + schema=[Field(name="output", dtype=Float64)], + track_metrics=True, # opt in to transformation timing +) +def my_transform(inputs: pd.DataFrame) -> pd.DataFrame: + ... +``` + +The `odfv_name` label lets you filter or group by individual ODFV, +and the `mode` label (`python`, `pandas`, `substrait`) lets you compare +transformation engines. ### Scraping with Prometheus diff --git a/docs/reference/feature-store-yaml.md b/docs/reference/feature-store-yaml.md index 7411c673576..c287ddbc73a 100644 --- a/docs/reference/feature-store-yaml.md +++ b/docs/reference/feature-store-yaml.md @@ -40,7 +40,7 @@ feature_server: enabled: true # Enable Prometheus metrics server on port 8000 resource: true # CPU / memory gauges request: true # endpoint latency histograms & request counters - online_features: true # online feature retrieval counters + online_features: true # online feature retrieval counters + store read & ODFV transform timing push: true # push request counters materialization: true # materialization counters & duration histograms freshness: true # per-feature-view freshness gauges diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index fe0e7967345..8e16b0a30a4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2097,76 +2097,104 @@ def _transform_on_demand_feature_view_df( Raises: Exception: For unsupported OnDemandFeatureView modes """ - if feature_view.mode == "python" and isinstance( - feature_view.feature_transformation, PythonTransformation - ): - input_dict = ( - df.to_dict(orient="records")[0] - if feature_view.singleton - else df.to_dict(orient="list") + _should_track = False + try: + from feast.metrics import _config as _metrics_config + + _should_track = _metrics_config.online_features and getattr( + feature_view, "track_metrics", False ) + except Exception: + pass - if feature_view.singleton: - transformed_rows = [] + if _should_track: + import time as _time - for i, row in df.iterrows(): - output = feature_view.feature_transformation.udf(row.to_dict()) - if i == 0: - transformed_rows = output - else: - for k in output: - if isinstance(output[k], list): - transformed_rows[k].extend(output[k]) - else: - transformed_rows[k].append(output[k]) - - transformed_data = pd.DataFrame(transformed_rows) - else: - transformed_data = feature_view.feature_transformation.udf(input_dict) + _t0 = _time.monotonic() - if feature_view.write_to_online_store: - entities = [ - self.get_entity(entity) for entity in (feature_view.entities or []) - ] - join_keys = [entity.join_key for entity in entities if entity] - join_keys = [k for k in join_keys if k in input_dict.keys()] - transformed_df = ( - pd.DataFrame(transformed_data) - if not isinstance(transformed_data, pd.DataFrame) - else transformed_data - ) - input_df = pd.DataFrame( - [input_dict] if feature_view.singleton else input_dict + try: + if feature_view.mode == "python" and isinstance( + feature_view.feature_transformation, PythonTransformation + ): + input_dict = ( + df.to_dict(orient="records")[0] + if feature_view.singleton + else df.to_dict(orient="list") ) - if input_df.shape[0] == transformed_df.shape[0]: + + if feature_view.singleton: + transformed_rows = [] + + for i, row in df.iterrows(): + output = feature_view.feature_transformation.udf(row.to_dict()) + if i == 0: + transformed_rows = output + else: + for k in output: + if isinstance(output[k], list): + transformed_rows[k].extend(output[k]) + else: + transformed_rows[k].append(output[k]) + + transformed_data = pd.DataFrame(transformed_rows) + else: + transformed_data = feature_view.feature_transformation.udf( + input_dict + ) + + if feature_view.write_to_online_store: + entities = [ + self.get_entity(entity) + for entity in (feature_view.entities or []) + ] + join_keys = [entity.join_key for entity in entities if entity] + join_keys = [k for k in join_keys if k in input_dict.keys()] + transformed_df = ( + pd.DataFrame(transformed_data) + if not isinstance(transformed_data, pd.DataFrame) + else transformed_data + ) + input_df = pd.DataFrame( + [input_dict] if feature_view.singleton else input_dict + ) + if input_df.shape[0] == transformed_df.shape[0]: + for k in input_dict: + if k not in transformed_data: + transformed_data[k] = input_dict[k] + transformed_df = pd.DataFrame(transformed_data) + else: + transformed_df = pd.merge( + transformed_df, + input_df, + how="left", + on=join_keys, + ) + else: + # overwrite any transformed features and update the dictionary for k in input_dict: if k not in transformed_data: transformed_data[k] = input_dict[k] - transformed_df = pd.DataFrame(transformed_data) - else: - transformed_df = pd.merge( - transformed_df, - input_df, - how="left", - on=join_keys, - ) - else: - # overwrite any transformed features and update the dictionary - for k in input_dict: - if k not in transformed_data: - transformed_data[k] = input_dict[k] - return pd.DataFrame(transformed_data) + return pd.DataFrame(transformed_data) - elif feature_view.mode == "pandas" and isinstance( - feature_view.feature_transformation, PandasTransformation - ): - transformed_df = feature_view.feature_transformation.udf(df) - for col in df.columns: - transformed_df[col] = df[col] - return transformed_df - else: - raise Exception("Unsupported OnDemandFeatureView mode") + elif feature_view.mode == "pandas" and isinstance( + feature_view.feature_transformation, PandasTransformation + ): + transformed_df = feature_view.feature_transformation.udf(df) + for col in df.columns: + transformed_df[col] = df[col] + return transformed_df + else: + raise Exception("Unsupported OnDemandFeatureView mode") + finally: + if _should_track: + from feast.metrics import track_write_transformation + + track_write_transformation( + feature_view.name, + feature_view.mode, + _time.monotonic() - _t0, + ) def _validate_vector_features(self, feature_view, df: pd.DataFrame) -> None: """ diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py index d6b650ced15..df324dc57d3 100644 --- a/sdk/python/feast/infra/feature_servers/base_config.py +++ b/sdk/python/feast/infra/feature_servers/base_config.py @@ -62,7 +62,12 @@ class MetricsConfig(FeastConfigBaseModel): online_features: StrictBool = True """Emit online feature retrieval metrics (feast_online_features_request_total, - feast_online_features_entity_count).""" + feast_online_features_entity_count, + feast_feature_server_online_store_read_duration_seconds, + feast_feature_server_transformation_duration_seconds, + feast_feature_server_write_transformation_duration_seconds). + ODFV transformation metrics additionally require track_metrics=True + on the OnDemandFeatureView definition.""" push: StrictBool = True """Emit push/write request counters @@ -70,7 +75,7 @@ class MetricsConfig(FeastConfigBaseModel): materialization: StrictBool = True """Emit materialization success/failure counters and duration histograms - (feast_materialization_total, + (feast_materialization_result_total, feast_materialization_duration_seconds).""" freshness: StrictBool = True diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index b77185229d5..49cb2c55ef2 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -185,6 +185,19 @@ def get_online_features( native_entity_values=True, ) + _track_read = False + try: + from feast.metrics import _config as _metrics_config + + _track_read = _metrics_config.online_features + except Exception: + pass + + if _track_read: + import time as _time + + _read_start = _time.monotonic() + for table, requested_features in grouped_refs: # Get the correct set of entity values with the correct join keys. table_entity_values, idxs, output_len = utils._get_unique_entities( @@ -218,6 +231,11 @@ def get_online_features( output_len, ) + if _track_read: + from feast.metrics import track_online_store_read + + track_online_store_read(_time.monotonic() - _read_start) + if requested_on_demand_feature_views: utils._augment_response_with_on_demand_transforms( online_features_response, @@ -293,6 +311,19 @@ async def query_table(table, requested_features): return idxs, read_rows, output_len + _track_read = False + try: + from feast.metrics import _config as _metrics_config + + _track_read = _metrics_config.online_features + except Exception: + pass + + if _track_read: + import time as _time + + _read_start = _time.monotonic() + all_responses = await asyncio.gather( *[ query_table(table, requested_features) @@ -318,6 +349,11 @@ async def query_table(table, requested_features): output_len, ) + if _track_read: + from feast.metrics import track_online_store_read + + track_online_store_read(_time.monotonic() - _read_start) + if requested_on_demand_feature_views: utils._augment_response_with_on_demand_transforms( online_features_response, diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py index be2b068d32c..7786af6f2f5 100644 --- a/sdk/python/feast/metrics.py +++ b/sdk/python/feast/metrics.py @@ -211,8 +211,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag # --------------------------------------------------------------------------- # Materialization metrics # --------------------------------------------------------------------------- -materialization_total = Counter( - "feast_materialization_total", +materialization_result_total = Counter( + "feast_materialization_result_total", "Total materialization runs per feature view", ["feature_view", "status"], ) @@ -223,6 +223,27 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag buckets=(1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0, 3600.0), ) +# --------------------------------------------------------------------------- +# Sub-request timing — online store reads and ODFV transformations +# --------------------------------------------------------------------------- +online_store_read_duration_seconds = Histogram( + "feast_feature_server_online_store_read_duration_seconds", + "Duration of the online store read phase in seconds (covers all table reads including parallel async)", + buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0), +) +transformation_duration_seconds = Histogram( + "feast_feature_server_transformation_duration_seconds", + "Duration of on-demand feature view transformations on read in seconds", + ["odfv_name", "mode"], + buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0), +) +write_transformation_duration_seconds = Histogram( + "feast_feature_server_write_transformation_duration_seconds", + "Duration of on-demand feature view transformations on write in seconds", + ["odfv_name", "mode"], + buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0), +) + # --------------------------------------------------------------------------- # Feature freshness metrics — "max" shows the worst-case staleness across # processes (freshness is identical regardless of which process computes it). @@ -306,6 +327,31 @@ def track_push(push_source: str, mode: str): push_request_count.labels(push_source=push_source, mode=mode).inc() +def track_online_store_read(duration_seconds: float): + """Record the duration of the online store read phase.""" + if not _config.online_features: + return + online_store_read_duration_seconds.observe(duration_seconds) + + +def track_transformation(odfv_name: str, mode: str, duration_seconds: float): + """Record the duration of an on-demand feature view read-path transformation.""" + if not _config.online_features: + return + transformation_duration_seconds.labels(odfv_name=odfv_name, mode=mode).observe( + duration_seconds + ) + + +def track_write_transformation(odfv_name: str, mode: str, duration_seconds: float): + """Record the duration of an on-demand feature view write-path transformation.""" + if not _config.online_features: + return + write_transformation_duration_seconds.labels( + odfv_name=odfv_name, mode=mode + ).observe(duration_seconds) + + def track_materialization( feature_view_name: str, success: bool, duration_seconds: float ): @@ -313,7 +359,9 @@ def track_materialization( if not _config.materialization: return status = "success" if success else "failure" - materialization_total.labels(feature_view=feature_view_name, status=status).inc() + materialization_result_total.labels( + feature_view=feature_view_name, status=status + ).inc() materialization_duration_seconds.labels(feature_view=feature_view_name).observe( duration_seconds ) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 6430675f4e7..eaf3ca88ed8 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -131,6 +131,8 @@ class OnDemandFeatureView(BaseFeatureView): maintainer. """ + _TRACK_METRICS_TAG = "feast:track_metrics" + name: str entities: Optional[List[str]] features: List[Field] @@ -143,6 +145,7 @@ class OnDemandFeatureView(BaseFeatureView): owner: str write_to_online_store: bool singleton: bool + track_metrics: bool udf: Optional[FunctionType] udf_string: Optional[str] aggregations: List[Aggregation] @@ -163,6 +166,7 @@ def __init__( # noqa: C901 owner: str = "", write_to_online_store: bool = False, singleton: bool = False, + track_metrics: bool = False, aggregations: Optional[List[Aggregation]] = None, ): """ @@ -189,6 +193,11 @@ def __init__( # noqa: C901 the online store for faster retrieval. singleton (optional): A boolean that indicates whether the transformation is executed on a singleton (only applicable when mode="python"). + track_metrics (optional): Whether to emit Prometheus timing metrics + (``feast_feature_server_transformation_duration_seconds``) for + this ODFV. Defaults to ``False``. Set to ``True`` to opt in + to per-ODFV transformation duration tracking when the server + is started with metrics enabled. aggregations (optional): List of aggregations to apply before transformation. """ super().__init__( @@ -255,6 +264,7 @@ def __init__( # noqa: C901 raise ValueError( ODFVErrorMessages.singleton_mode_requires_python(self.mode) ) + self.track_metrics = track_metrics self.aggregations = aggregations or [] def _add_source_to_collections(self, odfv_source: OnDemandSourceType) -> None: @@ -318,6 +328,7 @@ def __copy__(self): owner=self.owner, write_to_online_store=self.write_to_online_store, singleton=self.singleton, + track_metrics=self.track_metrics, ) fv.entities = self.entities fv.features = self.features @@ -345,6 +356,7 @@ def __eq__(self, other): or self.write_to_online_store != other.write_to_online_store or sorted(self.entity_columns) != sorted(other.entity_columns) or self.singleton != other.singleton + or self.track_metrics != other.track_metrics or self.aggregations != other.aggregations ): return False @@ -471,6 +483,12 @@ def to_proto(self) -> OnDemandFeatureViewProto: feature_transformation = transformation_to_proto(self.feature_transformation) + tags = dict(self.tags) if self.tags else {} + if self.track_metrics: + tags[self._TRACK_METRICS_TAG] = "true" + else: + tags.pop(self._TRACK_METRICS_TAG, None) + spec = OnDemandFeatureViewSpec( name=self.name, entities=self.entities or None, @@ -482,7 +500,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: feature_transformation=feature_transformation, mode=self.mode, description=self.description, - tags=self.tags, + tags=tags, owner=self.owner, write_to_online_store=self.write_to_online_store, singleton=self.singleton or False, @@ -519,6 +537,13 @@ def from_proto( on_demand_feature_view_proto ) + # Extract track_metrics from proto tags and strip the internal key + # so it doesn't leak into user-facing self.tags. + proto_tags = dict(on_demand_feature_view_proto.spec.tags) + track_metrics = ( + proto_tags.pop(cls._TRACK_METRICS_TAG, "false").lower() == "true" + ) + # Create the OnDemandFeatureView object on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, @@ -527,10 +552,11 @@ def from_proto( feature_transformation=transformation, mode=on_demand_feature_view_proto.spec.mode or "pandas", description=on_demand_feature_view_proto.spec.description, - tags=dict(on_demand_feature_view_proto.spec.tags), + tags=proto_tags, owner=on_demand_feature_view_proto.spec.owner, write_to_online_store=optional_fields["write_to_online_store"], singleton=optional_fields["singleton"], + track_metrics=track_metrics, aggregations=optional_fields["aggregations"], ) @@ -1118,6 +1144,7 @@ def on_demand_feature_view( owner: str = "", write_to_online_store: bool = False, singleton: bool = False, + track_metrics: bool = False, explode: bool = False, ): """ @@ -1140,6 +1167,8 @@ def on_demand_feature_view( the online store for faster retrieval. singleton (optional): A boolean that indicates whether the transformation is executed on a singleton (only applicable when mode="python"). + track_metrics (optional): Whether to emit Prometheus timing metrics for this ODFV. + Defaults to False. Set to True to opt in when the server is started with metrics. explode (optional): A boolean that indicates whether the transformation explodes the input data into multiple rows. """ @@ -1164,6 +1193,7 @@ def decorator(user_function): write_to_online_store=write_to_online_store, entities=entities, singleton=singleton, + track_metrics=track_metrics, udf=user_function, udf_string=udf_string, ) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 511186066c6..24d9d2f89a4 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -696,11 +696,27 @@ def _augment_response_with_on_demand_transforms( initial_response_arrow: Optional[pyarrow.Table] = None initial_response_dict: Optional[Dict[str, List[Any]]] = None + def _is_metrics_active(): + try: + from feast.metrics import _config + + return _config.online_features + except Exception: + return False + + _metrics_active = _is_metrics_active() + # Apply on demand transformations and augment the result rows odfv_result_names = set() for odfv_name, _feature_refs in odfv_feature_refs.items(): odfv = requested_odfv_map[odfv_name] if not odfv.write_to_online_store: + _should_track = _metrics_active and getattr(odfv, "track_metrics", False) + if _should_track: + import time as _time + + _transform_start = _time.monotonic() + # Apply aggregations if configured. if odfv.aggregations: if odfv.mode == "python": @@ -721,11 +737,12 @@ def _augment_response_with_on_demand_transforms( odfv.entities, odfv.mode, ) + continue # Apply transformation. Note: aggregations and transformation configs are mutually exclusive # TODO: Fix to make it work for having both aggregation and transformation # ticket: https://github.com/feast-dev/feast/issues/5689 - elif odfv.mode == "python": + if odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict( @@ -742,6 +759,13 @@ def _augment_response_with_on_demand_transforms( f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'." ) + if _should_track: + from feast.metrics import track_transformation + + track_transformation( + odfv_name, odfv.mode, _time.monotonic() - _transform_start + ) + transformed_features = ( transformed_features_dict if odfv.mode == "python" diff --git a/sdk/python/tests/unit/test_metrics.py b/sdk/python/tests/unit/test_metrics.py index ba014064669..17aaa5d1337 100644 --- a/sdk/python/tests/unit/test_metrics.py +++ b/sdk/python/tests/unit/test_metrics.py @@ -20,17 +20,23 @@ from feast.metrics import ( feature_freshness_seconds, materialization_duration_seconds, - materialization_total, + materialization_result_total, online_features_entity_count, online_features_request_count, + online_store_read_duration_seconds, push_request_count, request_count, request_latency, track_materialization, track_online_features_entities, + track_online_store_read, track_push, track_request_latency, + track_transformation, + track_write_transformation, + transformation_duration_seconds, update_feature_freshness, + write_transformation_duration_seconds, ) @@ -201,12 +207,12 @@ def test_track_push_noop_when_disabled(self): def test_track_materialization_noop_when_disabled(self): self._all_off() - before = materialization_total.labels( + before = materialization_result_total.labels( feature_view="fv_disabled", status="success" )._value.get() track_materialization("fv_disabled", success=True, duration_seconds=1.0) assert ( - materialization_total.labels( + materialization_result_total.labels( feature_view="fv_disabled", status="success" )._value.get() == before @@ -267,12 +273,12 @@ def test_online_features_disabled_but_materialization_enabled(self): assert online_features_request_count._value.get() == before_of # materialization should still record - before_mat = materialization_total.labels( + before_mat = materialization_result_total.labels( feature_view="fv_gran", status="success" )._value.get() track_materialization("fv_gran", success=True, duration_seconds=1.0) assert ( - materialization_total.labels( + materialization_result_total.labels( feature_view="fv_gran", status="success" )._value.get() == before_mat + 1 @@ -298,7 +304,7 @@ def test_only_resource_enabled(self): before_push = push_request_count.labels( push_source="x", mode="offline" )._value.get() - before_mat = materialization_total.labels( + before_mat = materialization_result_total.labels( feature_view="fv_res", status="success" )._value.get() @@ -315,7 +321,7 @@ def test_only_resource_enabled(self): == before_push ) assert ( - materialization_total.labels( + materialization_result_total.labels( feature_view="fv_res", status="success" )._value.get() == before_mat @@ -445,24 +451,24 @@ def test_increments_push_counter(self): class TestTrackMaterialization: def test_success_counter(self): - before = materialization_total.labels( + before = materialization_result_total.labels( feature_view="fv1", status="success" )._value.get() track_materialization("fv1", success=True, duration_seconds=1.5) assert ( - materialization_total.labels( + materialization_result_total.labels( feature_view="fv1", status="success" )._value.get() == before + 1 ) def test_failure_counter(self): - before = materialization_total.labels( + before = materialization_result_total.labels( feature_view="fv2", status="failure" )._value.get() track_materialization("fv2", success=False, duration_seconds=0.5) assert ( - materialization_total.labels( + materialization_result_total.labels( feature_view="fv2", status="failure" )._value.get() == before + 1 @@ -824,3 +830,173 @@ def test_cleanup_runs_in_owner_process(self, tmp_path): m._prometheus_mp_dir = original_dir m._owns_mp_dir = original_owns m._owner_pid = original_pid + + +class TestTrackOnlineStoreRead: + """Tests for the online store read duration metric.""" + + def test_records_duration(self): + before_sum = online_store_read_duration_seconds._sum.get() + + track_online_store_read(0.123) + + assert online_store_read_duration_seconds._sum.get() >= before_sum + 0.123 + + def test_noop_when_online_features_disabled(self): + import feast.metrics as m + + m._config = m._MetricsFlags(enabled=True, online_features=False) + + before_sum = online_store_read_duration_seconds._sum.get() + + track_online_store_read(0.5) + + assert online_store_read_duration_seconds._sum.get() == before_sum + + m._config = m._MetricsFlags( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + ) + + +class TestTrackTransformation: + """Tests for the ODFV transformation duration metric.""" + + def test_records_python_mode(self): + labels = ("my_odfv", "python") + before = transformation_duration_seconds._metrics.get(labels, None) + before_sum = before._sum.get() if before else 0.0 + + track_transformation("my_odfv", "python", 0.042) + + sample = transformation_duration_seconds._metrics[labels] + assert sample._sum.get() >= before_sum + 0.042 + + def test_records_pandas_mode(self): + labels = ("my_odfv", "pandas") + before = transformation_duration_seconds._metrics.get(labels, None) + before_sum = before._sum.get() if before else 0.0 + + track_transformation("my_odfv", "pandas", 0.15) + + sample = transformation_duration_seconds._metrics[labels] + assert sample._sum.get() >= before_sum + 0.15 + + def test_noop_when_online_features_disabled(self): + import feast.metrics as m + + m._config = m._MetricsFlags(enabled=True, online_features=False) + + labels = ("disabled_odfv", "python") + before = transformation_duration_seconds._metrics.get(labels, None) + before_sum = before._sum.get() if before else 0.0 + + track_transformation("disabled_odfv", "python", 1.0) + + sample = transformation_duration_seconds._metrics.get(labels, None) + after_sum = sample._sum.get() if sample else 0.0 + assert after_sum == before_sum + + m._config = m._MetricsFlags( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + ) + + def test_multiple_odfvs_tracked_independently(self): + labels_a = ("odfv_a", "python") + labels_b = ("odfv_b", "pandas") + before_a = transformation_duration_seconds._metrics.get(labels_a, None) + before_a_sum = before_a._sum.get() if before_a else 0.0 + before_b = transformation_duration_seconds._metrics.get(labels_b, None) + before_b_sum = before_b._sum.get() if before_b else 0.0 + + track_transformation("odfv_a", "python", 0.01) + track_transformation("odfv_b", "pandas", 0.05) + + sample_a = transformation_duration_seconds._metrics[labels_a] + sample_b = transformation_duration_seconds._metrics[labels_b] + assert sample_a._sum.get() >= before_a_sum + 0.01 + assert sample_b._sum.get() >= before_b_sum + 0.05 + + +class TestTrackWriteTransformation: + """Tests for the write-path ODFV transformation duration metric.""" + + def test_records_python_mode(self): + labels = ("write_odfv", "python") + before = write_transformation_duration_seconds._metrics.get(labels, None) + before_sum = before._sum.get() if before else 0.0 + + track_write_transformation("write_odfv", "python", 0.033) + + sample = write_transformation_duration_seconds._metrics[labels] + assert sample._sum.get() >= before_sum + 0.033 + + def test_records_pandas_mode(self): + labels = ("write_odfv", "pandas") + before = write_transformation_duration_seconds._metrics.get(labels, None) + before_sum = before._sum.get() if before else 0.0 + + track_write_transformation("write_odfv", "pandas", 0.12) + + sample = write_transformation_duration_seconds._metrics[labels] + assert sample._sum.get() >= before_sum + 0.12 + + def test_noop_when_online_features_disabled(self): + import feast.metrics as m + + m._config = m._MetricsFlags(enabled=True, online_features=False) + + labels = ("disabled_write_odfv", "python") + before = write_transformation_duration_seconds._metrics.get(labels, None) + before_sum = before._sum.get() if before else 0.0 + + track_write_transformation("disabled_write_odfv", "python", 1.0) + + sample = write_transformation_duration_seconds._metrics.get(labels, None) + after_sum = sample._sum.get() if sample else 0.0 + assert after_sum == before_sum + + m._config = m._MetricsFlags( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + ) + + def test_separate_from_read_transform_metric(self): + """Write and read transform metrics are independent histograms.""" + read_labels = ("shared_odfv", "python") + write_labels = ("shared_odfv", "python") + + read_before = transformation_duration_seconds._metrics.get(read_labels, None) + read_before_sum = read_before._sum.get() if read_before else 0.0 + write_before = write_transformation_duration_seconds._metrics.get( + write_labels, None + ) + write_before_sum = write_before._sum.get() if write_before else 0.0 + + track_transformation("shared_odfv", "python", 0.01) + track_write_transformation("shared_odfv", "python", 0.05) + + read_after = transformation_duration_seconds._metrics[read_labels] + write_after = write_transformation_duration_seconds._metrics[write_labels] + + read_delta = read_after._sum.get() - read_before_sum + write_delta = write_after._sum.get() - write_before_sum + + assert abs(read_delta - 0.01) < 0.001 + assert abs(write_delta - 0.05) < 0.001 diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 505146aa612..07e9e6a0b94 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -418,3 +418,175 @@ def another_transform(features_df: pd.DataFrame) -> pd.DataFrame: deserialized = OnDemandFeatureView.from_proto(proto) assert deserialized.name == CUSTOM_FUNCTION_NAME + + +def test_track_metrics_defaults_to_false(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + odfv = OnDemandFeatureView( + name="metrics-default-odfv", + sources=[feature_view], + schema=[Field(name="output1", dtype=Float32)], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + ) + assert odfv.track_metrics is False + + +def test_track_metrics_true_persists_via_proto(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + odfv = OnDemandFeatureView( + name="tracked-metrics-odfv", + sources=[feature_view], + schema=[Field(name="output1", dtype=Float32)], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + track_metrics=True, + ) + assert odfv.track_metrics is True + + proto = odfv.to_proto() + assert proto.spec.tags.get("feast:track_metrics") == "true" + + restored = OnDemandFeatureView.from_proto(proto) + assert restored.track_metrics is True + assert "feast:track_metrics" not in restored.tags, ( + "Internal feast:track_metrics tag leaked into user-facing self.tags " + "after proto round-trip" + ) + + +def test_track_metrics_proto_roundtrip_preserves_user_tags(): + """User tags must survive a proto round-trip without internal tag pollution.""" + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + user_tags = {"team": "ml-platform", "priority": "high"} + odfv = OnDemandFeatureView( + name="tagged-odfv", + sources=[feature_view], + schema=[Field(name="output1", dtype=Float32)], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + tags=user_tags, + track_metrics=True, + ) + assert odfv.tags == user_tags + + proto = odfv.to_proto() + restored = OnDemandFeatureView.from_proto(proto) + + assert restored.tags == user_tags + assert restored.track_metrics is True + + +def test_track_metrics_false_not_stored_in_tags(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + odfv = OnDemandFeatureView( + name="no-metrics-odfv", + sources=[feature_view], + schema=[Field(name="output1", dtype=Float32)], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + track_metrics=False, + ) + proto = odfv.to_proto() + assert "feast:track_metrics" not in proto.spec.tags + + restored = OnDemandFeatureView.from_proto(proto) + assert restored.track_metrics is False + + +def test_copy_preserves_track_metrics(): + """__copy__ must carry track_metrics so FeatureService projections keep timing enabled.""" + import copy + + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + odfv = OnDemandFeatureView( + name="tracked-odfv", + sources=[feature_view], + schema=[Field(name="output1", dtype=Float32)], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + track_metrics=True, + ) + assert odfv.track_metrics is True + + copied = copy.copy(odfv) + assert copied.track_metrics is True, ( + "__copy__ lost track_metrics; ODFV timing metrics will be silently disabled " + "when using FeatureService projections" + ) + + +def test_eq_considers_track_metrics(): + """__eq__ must distinguish ODFVs that differ only in track_metrics.""" + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + common = dict( + name="eq-odfv", + sources=[feature_view], + schema=[Field(name="output1", dtype=Float32)], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + ) + odfv_tracked = OnDemandFeatureView(**common, track_metrics=True) + odfv_untracked = OnDemandFeatureView(**common, track_metrics=False) + + assert odfv_tracked != odfv_untracked