diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index 654c4b9f938..4802599866d 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -352,11 +352,14 @@ feature_server: push: true # push request counters materialization: true # materialization counters & duration freshness: true # feature freshness gauges + offline_features: true # offline store retrieval counters & latency + audit_logging: false # structured JSON audit logs (see below) ``` Any category set to `false` will emit no metrics and start no background threads (e.g., setting `freshness: false` prevents the registry polling -thread from starting). All categories default to `true`. +thread from starting). All categories default to `true` except +`audit_logging`, which defaults to `false`. ### Available metrics @@ -375,6 +378,9 @@ thread from starting). All categories default to `true`. | `feast_materialization_result_total` | Counter | `feature_view`, `status` | `materialization` | Materialization runs (success/failure) | | `feast_materialization_duration_seconds` | Histogram | `feature_view` | `materialization` | Materialization duration per feature view | | `feast_feature_freshness_seconds` | Gauge | `feature_view`, `project` | `freshness` | Seconds since last materialization | +| `feast_offline_store_request_total` | Counter | `method`, `status` | `offline_features` | Total offline store retrieval requests | +| `feast_offline_store_request_latency_seconds` | Histogram | `method` | `offline_features` | Latency of offline store retrieval operations | +| `feast_offline_store_row_count` | Histogram | `method` | `offline_features` | Rows returned by offline store retrieval | ### Per-ODFV transformation metrics @@ -405,6 +411,70 @@ The `odfv_name` label lets you filter or group by individual ODFV, and the `mode` label (`python`, `pandas`, `substrait`) lets you compare transformation engines. +### Audit logging + +Feast can emit structured JSON audit log entries for every online and offline +feature retrieval. These are written via the standard `feast.audit` Python +logger, so you can route them to a dedicated file, SIEM, or log aggregator +independently of application logs. + +Audit logging is **disabled by default**. Enable it in `feature_store.yaml`: + +```yaml +feature_server: + type: local + metrics: + enabled: true + audit_logging: true +``` + +**Online audit log** (emitted per `/get-online-features` call): + +```json +{ + "event": "online_feature_request", + "timestamp": "2026-05-11T08:30:00.123456+00:00", + "requestor_id": "user@example.com", + "entity_keys": ["driver_id"], + "entity_count": 3, + "feature_views": ["driver_hourly_stats"], + "feature_count": 3, + "status": "success", + "latency_ms": 12.34 +} +``` + +**Offline audit log** (emitted per `RetrievalJob.to_arrow()` call): + +```json +{ + "event": "offline_feature_retrieval", + "timestamp": "2026-05-11T08:31:00.456789+00:00", + "method": "to_arrow", + "start_time": "2026-05-11T08:30:59.226789+00:00", + "end_time": "2026-05-11T08:31:00.456789+00:00", + "feature_views": ["driver_hourly_stats"], + "feature_count": 3, + "row_count": 500, + "status": "success", + "duration_ms": 1230.0 +} +``` + +The `requestor_id` field in online audit logs is populated from the +security manager's current user when authentication is configured, and +falls back to `"anonymous"` otherwise. + +To route audit logs to a separate file: + +```python +import logging + +handler = logging.FileHandler("/var/log/feast/audit.log") +handler.setFormatter(logging.Formatter("%(message)s")) +logging.getLogger("feast.audit").addHandler(handler) +``` + ### Scraping with Prometheus ```yaml diff --git a/infra/feast-operator/config/samples/v1_featurestore_serving.yaml b/infra/feast-operator/config/samples/v1_featurestore_serving.yaml index f60640624c9..412499412e6 100644 --- a/infra/feast-operator/config/samples/v1_featurestore_serving.yaml +++ b/infra/feast-operator/config/samples/v1_featurestore_serving.yaml @@ -26,8 +26,8 @@ spec: push: true # push/write request counters materialization: true # materialization counters and duration histograms freshness: false # feature freshness gauges (can be expensive at scale) - # Example: when a future SDK adds "registry_sync", enable it here - # registry_sync: false + offline_features: true # offline store retrieval counters, latency, row count + audit_logging: false # structured JSON audit logs via the feast.audit logger offlinePushBatching: enabled: true batchSize: 1000 # max rows per offline write batch diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 81359222797..bba91130db3 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -152,28 +152,71 @@ class ChatRequest(BaseModel): messages: List[ChatMessage] -def _resolve_feature_counts( +def _parse_feature_info( features: Union[List[str], "feast.FeatureService"], ) -> tuple: - """Return (feature_count, feature_view_count) from the resolved features. + """Return ``(feature_view_names, feature_count)`` from resolved features. ``features`` is either a list of ``"feature_view:feature"`` strings or a ``FeatureService`` with ``feature_view_projections``. + + Returns: + (fv_names, feat_count) where fv_names is a list of unique feature + view name strings and feat_count is the total number of features. """ from feast.feature_service import FeatureService + from feast.utils import _parse_feature_ref if isinstance(features, FeatureService): projections = features.feature_view_projections - fv_count = len(projections) + fv_names = [p.name for p in projections] feat_count = sum(len(p.features) for p in projections) elif isinstance(features, list): feat_count = len(features) - fv_names = {ref.split(":")[0].split("@")[0] for ref in features if ":" in ref} - fv_count = len(fv_names) + fv_names = list({_parse_feature_ref(ref)[0] for ref in features if ":" in ref}) else: + fv_names = [] feat_count = 0 - fv_count = 0 - return str(feat_count), str(fv_count) + return fv_names, feat_count + + +def _resolve_feature_counts( + features: Union[List[str], "feast.FeatureService"], +) -> tuple: + """Return ``(feature_count_str, feature_view_count_str)`` for Prometheus labels.""" + fv_names, feat_count = _parse_feature_info(features) + return str(feat_count), str(len(fv_names)) + + +def _emit_online_audit( + request: GetOnlineFeaturesRequest, + features: Union[List[str], "feast.FeatureService"], + entity_count: int, + status: str, + latency_ms: float, +): + """Best-effort audit log emission for online feature requests.""" + try: + from feast.permissions.security_manager import get_security_manager + + requestor_id = "anonymous" + sm = get_security_manager() + if sm and sm.current_user: + requestor_id = sm.current_user.username or "anonymous" + + fv_names, feat_count = _parse_feature_info(features) + + feast_metrics.emit_online_audit_log( + requestor_id=requestor_id, + entity_keys=list(request.entities.keys()), + entity_count=entity_count, + feature_views=fv_names, + feature_count=feat_count, + status=status, + latency_ms=latency_ms, + ) + except Exception: + logger.warning("Failed to emit online audit log", exc_info=True) async def _get_features( @@ -390,11 +433,22 @@ async def get_online_features(request: GetOnlineFeaturesRequest) -> Any: include_feature_view_version_metadata=request.include_feature_view_version_metadata, ) - if store._get_provider().async_supported.online.read: - response = await store.get_online_features_async(**read_params) # type: ignore - else: - response = await run_in_threadpool( - lambda: store.get_online_features(**read_params) # type: ignore + audit_start_ms = time.monotonic() * 1000 + audit_status = "success" + try: + if store._get_provider().async_supported.online.read: + response = await store.get_online_features_async(**read_params) # type: ignore + else: + response = await run_in_threadpool( + lambda: store.get_online_features(**read_params) # type: ignore + ) + except Exception: + audit_status = "error" + raise + finally: + audit_latency_ms = time.monotonic() * 1000 - audit_start_ms + _emit_online_audit( + request, features, entity_count, audit_status, audit_latency_ms ) response_dict = await run_in_threadpool( diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py index df324dc57d3..14ad2fe505e 100644 --- a/sdk/python/feast/infra/feature_servers/base_config.py +++ b/sdk/python/feast/infra/feature_servers/base_config.py @@ -82,6 +82,17 @@ class MetricsConfig(FeastConfigBaseModel): """Emit per-feature-view freshness gauges (feast_feature_freshness_seconds).""" + offline_features: StrictBool = True + """Emit offline store retrieval metrics + (feast_offline_store_request_total, + feast_offline_store_request_latency_seconds, + feast_offline_store_row_count).""" + + audit_logging: StrictBool = False + """Emit structured JSON audit log entries for online and offline + feature requests via the ``feast.audit`` logger. Captures requestor + identity, entity keys, feature views, row counts, and latency.""" + class BaseFeatureServerConfig(FeastConfigBaseModel): """Base Feature Server config that should be extended""" diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 4ae0c680c3b..24e1e743953 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -11,9 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import time import warnings from abc import ABC -from datetime import datetime +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import ( TYPE_CHECKING, @@ -70,6 +72,23 @@ def __init__( self.max_event_timestamp = max_event_timestamp +def _extract_retrieval_metadata(job: "RetrievalJob") -> tuple: + """Return ``(feature_view_names, feature_count)`` from a RetrievalJob's metadata.""" + from feast.utils import _parse_feature_ref + + try: + meta = job.metadata + if meta: + feature_count = len(meta.features) + feature_views = list( + {_parse_feature_ref(ref)[0] for ref in meta.features if ":" in ref} + ) + return feature_views, feature_count + except (NotImplementedError, AttributeError): + pass + return [], 0 + + class RetrievalJob(ABC): """A RetrievalJob manages the execution of a query to retrieve data from the offline store.""" @@ -152,7 +171,51 @@ def to_arrow( validation_reference (optional): The validation to apply against the retrieved dataframe. timeout (optional): The query timeout if applicable. """ - features_table = self._to_arrow_internal(timeout=timeout) + start_wall = time.monotonic() + status_label = "success" + row_count = 0 + try: + features_table = self._to_arrow_internal(timeout=timeout) + row_count = features_table.num_rows + except Exception: + status_label = "error" + raise + finally: + try: + from feast import metrics as feast_metrics + + elapsed = time.monotonic() - start_wall + + if feast_metrics._config.offline_features: + feast_metrics.offline_store_request_total.labels( + method="to_arrow", status=status_label + ).inc() + feast_metrics.offline_store_request_latency_seconds.labels( + method="to_arrow" + ).observe(elapsed) + feast_metrics.offline_store_row_count.labels( + method="to_arrow" + ).observe(row_count) + + if feast_metrics._config.audit_logging: + feature_views, feature_count = _extract_retrieval_metadata(self) + end_dt = datetime.now(tz=timezone.utc) + start_dt = end_dt - timedelta(seconds=elapsed) + feast_metrics.emit_offline_audit_log( + method="to_arrow", + feature_views=feature_views, + feature_count=feature_count, + row_count=row_count, + status=status_label, + start_time=start_dt.isoformat(), + end_time=end_dt.isoformat(), + duration_ms=elapsed * 1000, + ) + except Exception: + logging.getLogger(__name__).debug( + "Failed to record offline store metrics", exc_info=True + ) + if self.on_demand_feature_views: # Build a mapping of ODFV name to requested feature names # This ensures we only return the features that were explicitly requested diff --git a/sdk/python/feast/metrics.py b/sdk/python/feast/metrics.py index 694f25a687e..13a855d587b 100644 --- a/sdk/python/feast/metrics.py +++ b/sdk/python/feast/metrics.py @@ -42,6 +42,7 @@ """ import atexit +import json import logging import os import shutil @@ -51,7 +52,7 @@ from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timezone -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, List, Optional import psutil @@ -123,6 +124,8 @@ class _MetricsFlags: push: bool = False materialization: bool = False freshness: bool = False + offline_features: bool = False + audit_logging: bool = False _config = _MetricsFlags() @@ -144,6 +147,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag push=True, materialization=True, freshness=True, + offline_features=True, + audit_logging=False, ) return _MetricsFlags( enabled=True, @@ -153,6 +158,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag push=getattr(metrics_config, "push", True), materialization=getattr(metrics_config, "materialization", True), freshness=getattr(metrics_config, "freshness", True), + offline_features=getattr(metrics_config, "offline_features", True), + audit_logging=getattr(metrics_config, "audit_logging", False), ) @@ -260,6 +267,33 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag multiprocess_mode="max", ) +# --------------------------------------------------------------------------- +# Offline store retrieval metrics +# --------------------------------------------------------------------------- +offline_store_request_total = Counter( + "feast_offline_store_request_total", + "Total offline store retrieval requests", + ["method", "status"], +) +offline_store_request_latency_seconds = Histogram( + "feast_offline_store_request_latency_seconds", + "Latency of offline store retrieval operations in seconds", + ["method"], + buckets=(0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0), +) +offline_store_row_count = Histogram( + "feast_offline_store_row_count", + "Number of rows returned by offline store retrieval", + ["method"], + buckets=(100, 1000, 10000, 100000, 500000, 1000000, 5000000), +) + +# --------------------------------------------------------------------------- +# Audit logger — separate from the main feast logger so operators can +# route SOX-style audit entries to a dedicated sink. +# --------------------------------------------------------------------------- +audit_logger = logging.getLogger("feast.audit") + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -388,6 +422,72 @@ def track_materialization( ) +def emit_online_audit_log( + *, + requestor_id: str, + entity_keys: List[str], + entity_count: int, + feature_views: List[str], + feature_count: int, + status: str, + latency_ms: float, +): + """Emit a structured JSON audit log entry for an online feature request.""" + if not _config.audit_logging: + return + audit_logger.info( + _json_dumps( + { + "event": "online_feature_request", + "timestamp": datetime.now(tz=timezone.utc).isoformat(), + "requestor_id": requestor_id, + "entity_keys": entity_keys, + "entity_count": entity_count, + "feature_views": feature_views, + "feature_count": feature_count, + "status": status, + "latency_ms": round(latency_ms, 2), + } + ) + ) + + +def emit_offline_audit_log( + *, + method: str, + feature_views: List[str], + feature_count: int, + row_count: int, + status: str, + start_time: str, + end_time: str, + duration_ms: float, +): + """Emit a structured JSON audit log entry for an offline feature retrieval.""" + if not _config.audit_logging: + return + audit_logger.info( + _json_dumps( + { + "event": "offline_feature_retrieval", + "timestamp": datetime.now(tz=timezone.utc).isoformat(), + "method": method, + "start_time": start_time, + "end_time": end_time, + "feature_views": feature_views, + "feature_count": feature_count, + "row_count": row_count, + "status": status, + "duration_ms": round(duration_ms, 2), + } + ) + ) + + +def _json_dumps(obj: dict) -> str: + return json.dumps(obj, separators=(",", ":")) + + def update_feature_freshness( store: "FeatureStore", ) -> None: @@ -507,6 +607,8 @@ def start_metrics_server( push=True, materialization=True, freshness=True, + offline_features=True, + audit_logging=False, ) from prometheus_client import CollectorRegistry, make_wsgi_app diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 28fe86602ad..0ba74a23d37 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -1,594 +1,594 @@ -import base64 -import importlib -import json -import logging -import os -import random -import re -import sys -import tempfile -from importlib.abc import Loader -from importlib.machinery import ModuleSpec -from pathlib import Path -from typing import List, Optional, Set, Union - -import click -from click.exceptions import BadParameter - -from feast import PushSource -from feast.batch_feature_view import BatchFeatureView -from feast.constants import FEATURE_STORE_YAML_ENV_NAME -from feast.data_source import DataSource, KafkaSource, KinesisSource -from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add -from feast.entity import Entity -from feast.feature_service import FeatureService -from feast.feature_store import FeatureStore -from feast.feature_view import DUMMY_ENTITY, FeatureView -from feast.file_utils import replace_str_in_file -from feast.infra.registry.base_registry import BaseRegistry -from feast.infra.registry.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry -from feast.names import adjectives, animals -from feast.on_demand_feature_view import OnDemandFeatureView -from feast.permissions.permission import Permission -from feast.project import Project -from feast.repo_config import RepoConfig -from feast.repo_contents import RepoContents -from feast.stream_feature_view import StreamFeatureView - -logger = logging.getLogger(__name__) - - -def py_path_to_module(path: Path) -> str: - return ( - str(path.relative_to(os.getcwd()))[: -len(".py")] - .replace("./", "") - .replace("/", ".") - .replace("\\", ".") - ) - - -def read_feastignore(repo_root: Path) -> List[str]: - """Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths""" - feast_ignore = repo_root / ".feastignore" - if not feast_ignore.is_file(): - return [] - lines = feast_ignore.read_text().strip().split("\n") - ignore_paths = [] - for line in lines: - # Remove everything after the first occurance of "#" symbol (comments) - if line.find("#") >= 0: - line = line[: line.find("#")] - # Strip leading or ending whitespaces - line = line.strip() - # Add this processed line to ignore_paths if it's not empty - if len(line) > 0: - ignore_paths.append(line) - return ignore_paths - - -def get_ignore_files(repo_root: Path, ignore_paths: List[str]) -> Set[Path]: - """Get all ignore files that match any of the user-defined ignore paths""" - ignore_files = set() - for ignore_path in set(ignore_paths): - # ignore_path may contains matchers (* or **). Use glob() to match user-defined path to actual paths - for matched_path in repo_root.glob(ignore_path): - if matched_path.is_file(): - # If the matched path is a file, add that to ignore_files set - ignore_files.add(matched_path.resolve()) - else: - # Otherwise, list all Python files in that directory and add all of them to ignore_files set - ignore_files |= { - sub_path.resolve() - for sub_path in matched_path.glob("**/*.py") - if sub_path.is_file() - } - return ignore_files - - -def get_repo_files(repo_root: Path) -> List[Path]: - """Get the list of all repo files, ignoring undesired files & directories specified in .feastignore""" - # Read ignore paths from .feastignore and create a set of all files that match any of these paths - ignore_paths = read_feastignore(repo_root) + [ - ".git", - ".feastignore", - ".venv", - "**/.ipynb_checkpoints", - "**/.pytest_cache", - "**/__pycache__", - ] - ignore_files = get_ignore_files(repo_root, ignore_paths) - - # List all Python files in the root directory (recursively) - repo_files = { - p.resolve() - for p in repo_root.glob("**/*.py") - if p.is_file() and "__init__.py" != p.name - } - # Ignore all files that match any of the ignore paths in .feastignore - repo_files -= ignore_files - - # Sort repo_files to read them in the same order every time - return sorted(repo_files) - - -def parse_repo(repo_root: Path) -> RepoContents: - """ - Collects unique Feast object definitions from the given feature repo. - - Specifically, if an object foo has already been added, bar will still be added if - (bar == foo), but not if (bar is foo). This ensures that import statements will - not result in duplicates, but defining two equal objects will. - """ - res = RepoContents( - projects=[], - data_sources=[], - entities=[], - feature_views=[], - feature_services=[], - on_demand_feature_views=[], - stream_feature_views=[], - permissions=[], - ) - - for repo_file in get_repo_files(repo_root): - module_path = py_path_to_module(repo_file) - module = importlib.import_module(module_path) - - for attr_name in dir(module): - obj = getattr(module, attr_name) - - if isinstance(obj, DataSource) and not any( - (obj is ds) for ds in res.data_sources - ): - res.data_sources.append(obj) - - # Handle batch sources defined within stream sources. - if ( - isinstance(obj, PushSource) - or isinstance(obj, KafkaSource) - or isinstance(obj, KinesisSource) - ): - batch_source = obj.batch_source - - if batch_source and not any( - (batch_source is ds) for ds in res.data_sources - ): - res.data_sources.append(batch_source) - if ( - isinstance(obj, FeatureView) - and not any((obj is fv) for fv in res.feature_views) - and not isinstance(obj, StreamFeatureView) - and not isinstance(obj, BatchFeatureView) - ): - res.feature_views.append(obj) - - # Handle batch sources defined with feature views. - batch_source = obj.batch_source - if batch_source is not None and not any( - (batch_source is ds) for ds in res.data_sources - ): - res.data_sources.append(batch_source) - - # Handle stream sources defined with feature views. - if obj.stream_source: - stream_source = obj.stream_source - if not any((stream_source is ds) for ds in res.data_sources): - res.data_sources.append(stream_source) - elif isinstance(obj, StreamFeatureView) and not any( - (obj is sfv) for sfv in res.stream_feature_views - ): - res.stream_feature_views.append(obj) - - # Handle batch sources defined with feature views. - batch_source = obj.batch_source - if batch_source is not None and not any( - (batch_source is ds) for ds in res.data_sources - ): - res.data_sources.append(batch_source) - assert obj.stream_source - stream_source = obj.stream_source - if not any((stream_source is ds) for ds in res.data_sources): - res.data_sources.append(stream_source) - elif isinstance(obj, BatchFeatureView) and not any( - (obj is bfv) for bfv in res.feature_views - ): - res.feature_views.append(obj) - - # Handle batch sources defined with feature views. - batch_source = obj.batch_source - if batch_source is not None and not any( - (batch_source is ds) for ds in res.data_sources - ): - res.data_sources.append(batch_source) - elif isinstance(obj, Entity) and not any( - (obj is entity) for entity in res.entities - ): - res.entities.append(obj) - elif isinstance(obj, FeatureService) and not any( - (obj is fs) for fs in res.feature_services - ): - res.feature_services.append(obj) - elif isinstance(obj, OnDemandFeatureView) and not any( - (obj is odfv) for odfv in res.on_demand_feature_views - ): - res.on_demand_feature_views.append(obj) - elif isinstance(obj, Permission) and not any( - (obj is p) for p in res.permissions - ): - res.permissions.append(obj) - elif isinstance(obj, Project) and not any((obj is p) for p in res.projects): - res.projects.append(obj) - - res.entities.append(DUMMY_ENTITY) - return res - - -def plan( - repo_config: RepoConfig, - repo_path: Path, - skip_source_validation: bool, - skip_feature_view_validation: bool = False, -): - os.chdir(repo_path) - repo = _get_repo_contents(repo_path, repo_config.project, repo_config) - for project in repo.projects: - repo_config.project = project.name - store, registry = _get_store_and_registry(repo_config) - # TODO: When we support multiple projects in a single repo, we should filter repo contents by project - if not skip_source_validation: - provider = store._get_provider() - data_sources = [ - t.batch_source for t in repo.feature_views if t.batch_source is not None - ] - # Make sure the data source used by this feature view is supported by Feast - for data_source in data_sources: - provider.validate_data_source(store.config, data_source) - - registry_diff, infra_diff, _ = store.plan( - repo, skip_feature_view_validation=skip_feature_view_validation - ) - click.echo(registry_diff.to_string()) - click.echo(infra_diff.to_string()) - - -def _get_repo_contents( - repo_path, - project_name: Optional[str] = None, - repo_config: Optional[RepoConfig] = None, -): - sys.dont_write_bytecode = True - repo = parse_repo(repo_path) - - if len(repo.projects) < 1: - if project_name: - print( - f"No project found in the repository. Using project name {project_name} defined in feature_store.yaml" - ) - project_description = ( - repo_config.project_description if repo_config else None - ) - repo.projects.append( - Project(name=project_name, description=project_description or "") - ) - else: - print( - "No project found in the repository. Either define Project in repository or define a project in feature_store.yaml" - ) - sys.exit(1) - elif len(repo.projects) == 1: - if repo.projects[0].name != project_name: - print( - "Project object name should match with the project name defined in feature_store.yaml" - ) - sys.exit(1) - else: - print( - "Multiple projects found in the repository. Currently no support for multiple projects" - ) - sys.exit(1) - - return repo - - -def _get_store_and_registry(repo_config): - store = FeatureStore(config=repo_config) - registry = store.registry - return store, registry - - -def extract_objects_for_apply_delete(project, registry, repo): - # TODO(achals): This code path should be refactored to handle added & kept entities separately. - ( - _, - objs_to_delete, - objs_to_update, - objs_to_add, - ) = extract_objects_for_keep_delete_update_add(registry, project, repo) - - all_to_apply: List[ - Union[ - Entity, - FeatureView, - OnDemandFeatureView, - StreamFeatureView, - FeatureService, - ] - ] = [] - for object_type in FEAST_OBJECT_TYPES: - to_apply = set(objs_to_add[object_type]).union(objs_to_update[object_type]) - all_to_apply.extend(to_apply) - - all_to_delete: List[ - Union[ - Entity, - FeatureView, - OnDemandFeatureView, - StreamFeatureView, - FeatureService, - ] - ] = [] - for object_type in FEAST_OBJECT_TYPES: - all_to_delete.extend(objs_to_delete[object_type]) - - return ( - all_to_apply, - all_to_delete, - set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union( - set(objs_to_update[FeastObjectType.FEATURE_VIEW]) - ), - objs_to_delete[FeastObjectType.FEATURE_VIEW], - ) - - -def apply_total_with_repo_instance( - store: FeatureStore, - project_name: str, - registry: BaseRegistry, - repo: RepoContents, - skip_source_validation: bool, - skip_feature_view_validation: bool = False, - no_promote: bool = False, -): - if not skip_source_validation: - provider = store._get_provider() - data_sources = [ - t.batch_source for t in repo.feature_views if t.batch_source is not None - ] - # Make sure the data source used by this feature view is supported by Feast - for data_source in data_sources: - provider.validate_data_source(store.config, data_source) - - # For each object in the registry, determine whether it should be kept or deleted. - ( - all_to_apply, - all_to_delete, - views_to_keep, - views_to_delete, - ) = extract_objects_for_apply_delete(project_name, registry, repo) - - try: - if store._should_use_plan(): - # Planning phase - compute diffs first without progress bars - registry_diff, infra_diff, new_infra = store.plan( - repo, - skip_feature_view_validation=skip_feature_view_validation, - ) - click.echo(registry_diff.to_string()) - - # Only show progress bars if there are actual infrastructure changes - progress_ctx = None - if len(infra_diff.infra_object_diffs) > 0: - from feast.diff.apply_progress import ApplyProgressContext - - progress_ctx = ApplyProgressContext() - progress_ctx.start_overall_progress() - - # Apply phase - store._apply_diffs( - registry_diff, - infra_diff, - new_infra, - progress_ctx=progress_ctx, - no_promote=no_promote, - ) - click.echo(infra_diff.to_string()) - else: - # Legacy apply path - no progress bars for legacy path - store.apply( - all_to_apply, - objects_to_delete=all_to_delete, - partial=False, - skip_feature_view_validation=skip_feature_view_validation, - no_promote=no_promote, - ) - log_infra_changes(views_to_keep, views_to_delete) - finally: - # Cleanup is handled in the new _apply_diffs method - pass - - -def log_infra_changes( - views_to_keep: Set[FeatureView], views_to_delete: Set[FeatureView] -): - from colorama import Fore, Style - - for view in views_to_keep: - click.echo( - f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" - ) - for view in views_to_delete: - click.echo( - f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{view.name}{Style.RESET_ALL}" - ) - - -def create_feature_store( - ctx: click.Context, -) -> FeatureStore: - repo = ctx.obj["CHDIR"] - # If we received a base64 encoded version of feature_store.yaml, use that - config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME) - if config_base64: - print("Received base64 encoded feature_store.yaml") - config_bytes = base64.b64decode(config_base64) - # Create a new unique directory for writing feature_store.yaml - repo_path = Path(tempfile.mkdtemp()) - with open(repo_path / "feature_store.yaml", "wb") as f: - f.write(config_bytes) - return FeatureStore(repo_path=str(repo_path.resolve())) - else: - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) - - -def apply_total( - repo_config: RepoConfig, - repo_path: Path, - skip_source_validation: bool, - skip_feature_view_validation: bool = False, - no_promote: bool = False, -): - os.chdir(repo_path) - repo = _get_repo_contents(repo_path, repo_config.project, repo_config) - for project in repo.projects: - repo_config.project = project.name - store, registry = _get_store_and_registry(repo_config) - if not is_valid_name(project.name): - print( - f"{project.name} is not valid. Project name should only have " - f"alphanumerical values, underscores, and hyphens but not start with an underscore or hyphen." - ) - sys.exit(1) - # TODO: When we support multiple projects in a single repo, we should filter repo contents by project. Currently there is no way to associate Feast objects to project. - print(f"Applying changes for project {project.name}") - apply_total_with_repo_instance( - store, - project.name, - registry, - repo, - skip_source_validation, - skip_feature_view_validation, - no_promote=no_promote, - ) - - -def teardown(repo_config: RepoConfig, repo_path: Optional[str]): - # Cannot pass in both repo_path and repo_config to FeatureStore. - feature_store = FeatureStore(repo_path=repo_path, config=repo_config) - feature_store.teardown() - - -def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str: - """For debugging only: output contents of the metadata registry""" - registry_config = repo_config.registry - project = repo_config.project - registry = Registry( - project, - registry_config=registry_config, - repo_path=repo_path, - auth_config=repo_config.auth_config, - ) - registry_dict = registry.to_dict(project=project) - return json.dumps(registry_dict, indent=2, sort_keys=True) - - -def cli_check_repo(repo_path: Path, fs_yaml_file: Path): - sys.path.append(str(repo_path)) - if not fs_yaml_file.exists(): - print( - f"Can't find feature repo configuration file at {fs_yaml_file}. " - "Make sure you're running feast from an initialized feast repository." - ) - sys.exit(1) - - -def init_repo(repo_name: str, template: str, repo_path: Optional[str] = None): - import os - from pathlib import Path - from shutil import copytree - - from colorama import Fore, Style - - # Validate project name - if not is_valid_name(repo_name): - raise BadParameter( - message="Name should be alphanumeric values, underscores, and hyphens but not start with an underscore or hyphen", - param_hint="PROJECT_DIRECTORY", - ) - - # Determine where to create the repository - if repo_path: - # User specified a custom path - target_path = Path(repo_path).resolve() - target_path.mkdir(parents=True, exist_ok=True) - display_path = repo_path - else: - # Default behavior: create subdirectory with project name - target_path = Path(os.path.join(Path.cwd(), repo_name)) - target_path.mkdir(exist_ok=True) - display_path = repo_name - - repo_config_path = target_path / "feature_store.yaml" - - if repo_config_path.exists(): - print( - f"The directory {Style.BRIGHT + Fore.GREEN}{display_path}{Style.RESET_ALL} contains an existing feature " - f"store repository that may cause a conflict" - ) - print() - sys.exit(1) - - # Copy template directory - template_path = str(Path(Path(__file__).parent / "templates" / template).absolute()) - if not os.path.exists(template_path): - raise IOError(f"Could not find template {template}") - copytree(template_path, str(target_path), dirs_exist_ok=True) - - # Rename gitignore files back to .gitignore - for gitignore_path in target_path.rglob("gitignore"): - gitignore_path.rename(gitignore_path.with_name(".gitignore")) - - # Seed the repository - bootstrap_path = target_path / "bootstrap.py" - if os.path.exists(bootstrap_path): - import importlib.util - - spec = importlib.util.spec_from_file_location("bootstrap", str(bootstrap_path)) - assert isinstance(spec, ModuleSpec) - bootstrap = importlib.util.module_from_spec(spec) - assert isinstance(spec.loader, Loader) - spec.loader.exec_module(bootstrap) - bootstrap.bootstrap() # type: ignore - os.remove(bootstrap_path) - - # Template the feature_store.yaml file - feature_store_yaml_path = target_path / "feature_repo" / "feature_store.yaml" - replace_str_in_file( - feature_store_yaml_path, "project: my_project", f"project: {repo_name}" - ) - - # Remove the __pycache__ folder if it exists - import shutil - - shutil.rmtree(target_path / "__pycache__", ignore_errors=True) - - import click - - click.echo() - click.echo( - f"Creating a new Feast repository in {Style.BRIGHT + Fore.GREEN}{target_path}{Style.RESET_ALL}." - ) - click.echo() - - -def is_valid_name(name: str) -> bool: - """A name should be alphanumeric values, underscores, and hyphens but not start with an underscore""" - return ( - not name.startswith(("_", "-")) and re.compile(r"[^\w-]+").search(name) is None - ) - - -def generate_project_name() -> str: - """Generates a unique project name""" - return f"{random.choice(adjectives)}_{random.choice(animals)}" +import base64 +import importlib +import json +import logging +import os +import random +import re +import sys +import tempfile +from importlib.abc import Loader +from importlib.machinery import ModuleSpec +from pathlib import Path +from typing import List, Optional, Set, Union + +import click +from click.exceptions import BadParameter + +from feast import PushSource +from feast.batch_feature_view import BatchFeatureView +from feast.constants import FEATURE_STORE_YAML_ENV_NAME +from feast.data_source import DataSource, KafkaSource, KinesisSource +from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_store import FeatureStore +from feast.feature_view import DUMMY_ENTITY, FeatureView +from feast.file_utils import replace_str_in_file +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.registry.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry +from feast.names import adjectives, animals +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.permissions.permission import Permission +from feast.project import Project +from feast.repo_config import RepoConfig +from feast.repo_contents import RepoContents +from feast.stream_feature_view import StreamFeatureView + +logger = logging.getLogger(__name__) + + +def py_path_to_module(path: Path) -> str: + return ( + str(path.relative_to(os.getcwd()))[: -len(".py")] + .replace("./", "") + .replace("/", ".") + .replace("\\", ".") + ) + + +def read_feastignore(repo_root: Path) -> List[str]: + """Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths""" + feast_ignore = repo_root / ".feastignore" + if not feast_ignore.is_file(): + return [] + lines = feast_ignore.read_text().strip().split("\n") + ignore_paths = [] + for line in lines: + # Remove everything after the first occurance of "#" symbol (comments) + if line.find("#") >= 0: + line = line[: line.find("#")] + # Strip leading or ending whitespaces + line = line.strip() + # Add this processed line to ignore_paths if it's not empty + if len(line) > 0: + ignore_paths.append(line) + return ignore_paths + + +def get_ignore_files(repo_root: Path, ignore_paths: List[str]) -> Set[Path]: + """Get all ignore files that match any of the user-defined ignore paths""" + ignore_files = set() + for ignore_path in set(ignore_paths): + # ignore_path may contains matchers (* or **). Use glob() to match user-defined path to actual paths + for matched_path in repo_root.glob(ignore_path): + if matched_path.is_file(): + # If the matched path is a file, add that to ignore_files set + ignore_files.add(matched_path.resolve()) + else: + # Otherwise, list all Python files in that directory and add all of them to ignore_files set + ignore_files |= { + sub_path.resolve() + for sub_path in matched_path.glob("**/*.py") + if sub_path.is_file() + } + return ignore_files + + +def get_repo_files(repo_root: Path) -> List[Path]: + """Get the list of all repo files, ignoring undesired files & directories specified in .feastignore""" + # Read ignore paths from .feastignore and create a set of all files that match any of these paths + ignore_paths = read_feastignore(repo_root) + [ + ".git", + ".feastignore", + ".venv", + "**/.ipynb_checkpoints", + "**/.pytest_cache", + "**/__pycache__", + ] + ignore_files = get_ignore_files(repo_root, ignore_paths) + + # List all Python files in the root directory (recursively) + repo_files = { + p.resolve() + for p in repo_root.glob("**/*.py") + if p.is_file() and "__init__.py" != p.name + } + # Ignore all files that match any of the ignore paths in .feastignore + repo_files -= ignore_files + + # Sort repo_files to read them in the same order every time + return sorted(repo_files) + + +def parse_repo(repo_root: Path) -> RepoContents: + """ + Collects unique Feast object definitions from the given feature repo. + + Specifically, if an object foo has already been added, bar will still be added if + (bar == foo), but not if (bar is foo). This ensures that import statements will + not result in duplicates, but defining two equal objects will. + """ + res = RepoContents( + projects=[], + data_sources=[], + entities=[], + feature_views=[], + feature_services=[], + on_demand_feature_views=[], + stream_feature_views=[], + permissions=[], + ) + + for repo_file in get_repo_files(repo_root): + module_path = py_path_to_module(repo_file) + module = importlib.import_module(module_path) + + for attr_name in dir(module): + obj = getattr(module, attr_name) + + if isinstance(obj, DataSource) and not any( + (obj is ds) for ds in res.data_sources + ): + res.data_sources.append(obj) + + # Handle batch sources defined within stream sources. + if ( + isinstance(obj, PushSource) + or isinstance(obj, KafkaSource) + or isinstance(obj, KinesisSource) + ): + batch_source = obj.batch_source + + if batch_source and not any( + (batch_source is ds) for ds in res.data_sources + ): + res.data_sources.append(batch_source) + if ( + isinstance(obj, FeatureView) + and not any((obj is fv) for fv in res.feature_views) + and not isinstance(obj, StreamFeatureView) + and not isinstance(obj, BatchFeatureView) + ): + res.feature_views.append(obj) + + # Handle batch sources defined with feature views. + batch_source = obj.batch_source + if batch_source is not None and not any( + (batch_source is ds) for ds in res.data_sources + ): + res.data_sources.append(batch_source) + + # Handle stream sources defined with feature views. + if obj.stream_source: + stream_source = obj.stream_source + if not any((stream_source is ds) for ds in res.data_sources): + res.data_sources.append(stream_source) + elif isinstance(obj, StreamFeatureView) and not any( + (obj is sfv) for sfv in res.stream_feature_views + ): + res.stream_feature_views.append(obj) + + # Handle batch sources defined with feature views. + batch_source = obj.batch_source + if batch_source is not None and not any( + (batch_source is ds) for ds in res.data_sources + ): + res.data_sources.append(batch_source) + assert obj.stream_source + stream_source = obj.stream_source + if not any((stream_source is ds) for ds in res.data_sources): + res.data_sources.append(stream_source) + elif isinstance(obj, BatchFeatureView) and not any( + (obj is bfv) for bfv in res.feature_views + ): + res.feature_views.append(obj) + + # Handle batch sources defined with feature views. + batch_source = obj.batch_source + if batch_source is not None and not any( + (batch_source is ds) for ds in res.data_sources + ): + res.data_sources.append(batch_source) + elif isinstance(obj, Entity) and not any( + (obj is entity) for entity in res.entities + ): + res.entities.append(obj) + elif isinstance(obj, FeatureService) and not any( + (obj is fs) for fs in res.feature_services + ): + res.feature_services.append(obj) + elif isinstance(obj, OnDemandFeatureView) and not any( + (obj is odfv) for odfv in res.on_demand_feature_views + ): + res.on_demand_feature_views.append(obj) + elif isinstance(obj, Permission) and not any( + (obj is p) for p in res.permissions + ): + res.permissions.append(obj) + elif isinstance(obj, Project) and not any((obj is p) for p in res.projects): + res.projects.append(obj) + + res.entities.append(DUMMY_ENTITY) + return res + + +def plan( + repo_config: RepoConfig, + repo_path: Path, + skip_source_validation: bool, + skip_feature_view_validation: bool = False, +): + os.chdir(repo_path) + repo = _get_repo_contents(repo_path, repo_config.project, repo_config) + for project in repo.projects: + repo_config.project = project.name + store, registry = _get_store_and_registry(repo_config) + # TODO: When we support multiple projects in a single repo, we should filter repo contents by project + if not skip_source_validation: + provider = store._get_provider() + data_sources = [ + t.batch_source for t in repo.feature_views if t.batch_source is not None + ] + # Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + provider.validate_data_source(store.config, data_source) + + registry_diff, infra_diff, _ = store.plan( + repo, skip_feature_view_validation=skip_feature_view_validation + ) + click.echo(registry_diff.to_string()) + click.echo(infra_diff.to_string()) + + +def _get_repo_contents( + repo_path, + project_name: Optional[str] = None, + repo_config: Optional[RepoConfig] = None, +): + sys.dont_write_bytecode = True + repo = parse_repo(repo_path) + + if len(repo.projects) < 1: + if project_name: + print( + f"No project found in the repository. Using project name {project_name} defined in feature_store.yaml" + ) + project_description = ( + repo_config.project_description if repo_config else None + ) + repo.projects.append( + Project(name=project_name, description=project_description or "") + ) + else: + print( + "No project found in the repository. Either define Project in repository or define a project in feature_store.yaml" + ) + sys.exit(1) + elif len(repo.projects) == 1: + if repo.projects[0].name != project_name: + print( + "Project object name should match with the project name defined in feature_store.yaml" + ) + sys.exit(1) + else: + print( + "Multiple projects found in the repository. Currently no support for multiple projects" + ) + sys.exit(1) + + return repo + + +def _get_store_and_registry(repo_config): + store = FeatureStore(config=repo_config) + registry = store.registry + return store, registry + + +def extract_objects_for_apply_delete(project, registry, repo): + # TODO(achals): This code path should be refactored to handle added & kept entities separately. + ( + _, + objs_to_delete, + objs_to_update, + objs_to_add, + ) = extract_objects_for_keep_delete_update_add(registry, project, repo) + + all_to_apply: List[ + Union[ + Entity, + FeatureView, + OnDemandFeatureView, + StreamFeatureView, + FeatureService, + ] + ] = [] + for object_type in FEAST_OBJECT_TYPES: + to_apply = set(objs_to_add[object_type]).union(objs_to_update[object_type]) + all_to_apply.extend(to_apply) + + all_to_delete: List[ + Union[ + Entity, + FeatureView, + OnDemandFeatureView, + StreamFeatureView, + FeatureService, + ] + ] = [] + for object_type in FEAST_OBJECT_TYPES: + all_to_delete.extend(objs_to_delete[object_type]) + + return ( + all_to_apply, + all_to_delete, + set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union( + set(objs_to_update[FeastObjectType.FEATURE_VIEW]) + ), + objs_to_delete[FeastObjectType.FEATURE_VIEW], + ) + + +def apply_total_with_repo_instance( + store: FeatureStore, + project_name: str, + registry: BaseRegistry, + repo: RepoContents, + skip_source_validation: bool, + skip_feature_view_validation: bool = False, + no_promote: bool = False, +): + if not skip_source_validation: + provider = store._get_provider() + data_sources = [ + t.batch_source for t in repo.feature_views if t.batch_source is not None + ] + # Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + provider.validate_data_source(store.config, data_source) + + # For each object in the registry, determine whether it should be kept or deleted. + ( + all_to_apply, + all_to_delete, + views_to_keep, + views_to_delete, + ) = extract_objects_for_apply_delete(project_name, registry, repo) + + try: + if store._should_use_plan(): + # Planning phase - compute diffs first without progress bars + registry_diff, infra_diff, new_infra = store.plan( + repo, + skip_feature_view_validation=skip_feature_view_validation, + ) + click.echo(registry_diff.to_string()) + + # Only show progress bars if there are actual infrastructure changes + progress_ctx = None + if len(infra_diff.infra_object_diffs) > 0: + from feast.diff.apply_progress import ApplyProgressContext + + progress_ctx = ApplyProgressContext() + progress_ctx.start_overall_progress() + + # Apply phase + store._apply_diffs( + registry_diff, + infra_diff, + new_infra, + progress_ctx=progress_ctx, + no_promote=no_promote, + ) + click.echo(infra_diff.to_string()) + else: + # Legacy apply path - no progress bars for legacy path + store.apply( + all_to_apply, + objects_to_delete=all_to_delete, + partial=False, + skip_feature_view_validation=skip_feature_view_validation, + no_promote=no_promote, + ) + log_infra_changes(views_to_keep, views_to_delete) + finally: + # Cleanup is handled in the new _apply_diffs method + pass + + +def log_infra_changes( + views_to_keep: Set[FeatureView], views_to_delete: Set[FeatureView] +): + from colorama import Fore, Style + + for view in views_to_keep: + click.echo( + f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" + ) + for view in views_to_delete: + click.echo( + f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{view.name}{Style.RESET_ALL}" + ) + + +def create_feature_store( + ctx: click.Context, +) -> FeatureStore: + repo = ctx.obj["CHDIR"] + # If we received a base64 encoded version of feature_store.yaml, use that + config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME) + if config_base64: + print("Received base64 encoded feature_store.yaml") + config_bytes = base64.b64decode(config_base64) + # Create a new unique directory for writing feature_store.yaml + repo_path = Path(tempfile.mkdtemp()) + with open(repo_path / "feature_store.yaml", "wb") as f: + f.write(config_bytes) + return FeatureStore(repo_path=str(repo_path.resolve())) + else: + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) + + +def apply_total( + repo_config: RepoConfig, + repo_path: Path, + skip_source_validation: bool, + skip_feature_view_validation: bool = False, + no_promote: bool = False, +): + os.chdir(repo_path) + repo = _get_repo_contents(repo_path, repo_config.project, repo_config) + for project in repo.projects: + repo_config.project = project.name + store, registry = _get_store_and_registry(repo_config) + if not is_valid_name(project.name): + print( + f"{project.name} is not valid. Project name should only have " + f"alphanumerical values, underscores, and hyphens but not start with an underscore or hyphen." + ) + sys.exit(1) + # TODO: When we support multiple projects in a single repo, we should filter repo contents by project. Currently there is no way to associate Feast objects to project. + print(f"Applying changes for project {project.name}") + apply_total_with_repo_instance( + store, + project.name, + registry, + repo, + skip_source_validation, + skip_feature_view_validation, + no_promote=no_promote, + ) + + +def teardown(repo_config: RepoConfig, repo_path: Optional[str]): + # Cannot pass in both repo_path and repo_config to FeatureStore. + feature_store = FeatureStore(repo_path=repo_path, config=repo_config) + feature_store.teardown() + + +def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str: + """For debugging only: output contents of the metadata registry""" + registry_config = repo_config.registry + project = repo_config.project + registry = Registry( + project, + registry_config=registry_config, + repo_path=repo_path, + auth_config=repo_config.auth_config, + ) + registry_dict = registry.to_dict(project=project) + return json.dumps(registry_dict, indent=2, sort_keys=True) + + +def cli_check_repo(repo_path: Path, fs_yaml_file: Path): + sys.path.append(str(repo_path)) + if not fs_yaml_file.exists(): + print( + f"Can't find feature repo configuration file at {fs_yaml_file}. " + "Make sure you're running feast from an initialized feast repository." + ) + sys.exit(1) + + +def init_repo(repo_name: str, template: str, repo_path: Optional[str] = None): + import os + from pathlib import Path + from shutil import copytree + + from colorama import Fore, Style + + # Validate project name + if not is_valid_name(repo_name): + raise BadParameter( + message="Name should be alphanumeric values, underscores, and hyphens but not start with an underscore or hyphen", + param_hint="PROJECT_DIRECTORY", + ) + + # Determine where to create the repository + if repo_path: + # User specified a custom path + target_path = Path(repo_path).resolve() + target_path.mkdir(parents=True, exist_ok=True) + display_path = repo_path + else: + # Default behavior: create subdirectory with project name + target_path = Path(os.path.join(Path.cwd(), repo_name)) + target_path.mkdir(exist_ok=True) + display_path = repo_name + + repo_config_path = target_path / "feature_store.yaml" + + if repo_config_path.exists(): + print( + f"The directory {Style.BRIGHT + Fore.GREEN}{display_path}{Style.RESET_ALL} contains an existing feature " + f"store repository that may cause a conflict" + ) + print() + sys.exit(1) + + # Copy template directory + template_path = str(Path(Path(__file__).parent / "templates" / template).absolute()) + if not os.path.exists(template_path): + raise IOError(f"Could not find template {template}") + copytree(template_path, str(target_path), dirs_exist_ok=True) + + # Rename gitignore files back to .gitignore + for gitignore_path in target_path.rglob("gitignore"): + gitignore_path.rename(gitignore_path.with_name(".gitignore")) + + # Seed the repository + bootstrap_path = target_path / "bootstrap.py" + if os.path.exists(bootstrap_path): + import importlib.util + + spec = importlib.util.spec_from_file_location("bootstrap", str(bootstrap_path)) + assert isinstance(spec, ModuleSpec) + bootstrap = importlib.util.module_from_spec(spec) + assert isinstance(spec.loader, Loader) + spec.loader.exec_module(bootstrap) + bootstrap.bootstrap() # type: ignore + os.remove(bootstrap_path) + + # Template the feature_store.yaml file + feature_store_yaml_path = target_path / "feature_repo" / "feature_store.yaml" + replace_str_in_file( + feature_store_yaml_path, "project: my_project", f"project: {repo_name}" + ) + + # Remove the __pycache__ folder if it exists + import shutil + + shutil.rmtree(target_path / "__pycache__", ignore_errors=True) + + import click + + click.echo() + click.echo( + f"Creating a new Feast repository in {Style.BRIGHT + Fore.GREEN}{target_path}{Style.RESET_ALL}." + ) + click.echo() + + +def is_valid_name(name: str) -> bool: + """A name should be alphanumeric values, underscores, and hyphens but not start with an underscore""" + return ( + not name.startswith(("_", "-")) and re.compile(r"[^\w-]+").search(name) is None + ) + + +def generate_project_name() -> str: + """Generates a unique project name""" + return f"{random.choice(adjectives)}_{random.choice(animals)}" diff --git a/sdk/python/tests/unit/test_metrics.py b/sdk/python/tests/unit/test_metrics.py index bffde73dd91..abf2a35e389 100644 --- a/sdk/python/tests/unit/test_metrics.py +++ b/sdk/python/tests/unit/test_metrics.py @@ -18,9 +18,14 @@ import pytest from feast.metrics import ( + emit_offline_audit_log, + emit_online_audit_log, feature_freshness_seconds, materialization_duration_seconds, materialization_result_total, + offline_store_request_latency_seconds, + offline_store_request_total, + offline_store_row_count, online_features_entity_count, online_features_request_count, online_features_status_total, @@ -42,13 +47,11 @@ ) -@pytest.fixture(autouse=True) -def _enable_metrics(): - """Enable all metric categories for each test, then restore.""" +def _all_enabled_flags(): + """Return a _MetricsFlags with every category enabled.""" import feast.metrics as m - original = m._config - m._config = m._MetricsFlags( + return m._MetricsFlags( enabled=True, resource=True, request=True, @@ -56,7 +59,18 @@ def _enable_metrics(): push=True, materialization=True, freshness=True, + offline_features=True, + audit_logging=True, ) + + +@pytest.fixture(autouse=True) +def _enable_metrics(): + """Enable all metric categories for each test, then restore.""" + import feast.metrics as m + + original = m._config + m._config = _all_enabled_flags() yield m._config = original @@ -1081,3 +1095,640 @@ def test_separate_from_read_transform_metric(self): assert abs(read_delta - 0.01) < 0.001 assert abs(write_delta - 0.05) < 0.001 + + +class TestOfflineStoreMetrics: + """Tests for the offline store Prometheus metrics (RED pattern).""" + + def test_request_total_increments_on_success(self): + before = offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + + offline_store_request_total.labels(method="to_arrow", status="success").inc() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + == before + 1 + ) + + def test_request_total_increments_on_error(self): + before = offline_store_request_total.labels( + method="to_arrow", status="error" + )._value.get() + + offline_store_request_total.labels(method="to_arrow", status="error").inc() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="error" + )._value.get() + == before + 1 + ) + + def test_latency_histogram_records(self): + before_sum = offline_store_request_latency_seconds.labels( + method="to_arrow" + )._sum.get() + + offline_store_request_latency_seconds.labels(method="to_arrow").observe(2.5) + + after_sum = offline_store_request_latency_seconds.labels( + method="to_arrow" + )._sum.get() + assert pytest.approx(after_sum - before_sum, abs=0.01) == 2.5 + + def test_row_count_histogram_records(self): + before_sum = offline_store_row_count.labels(method="to_arrow")._sum.get() + + offline_store_row_count.labels(method="to_arrow").observe(1000) + + after_sum = offline_store_row_count.labels(method="to_arrow")._sum.get() + assert pytest.approx(after_sum - before_sum, abs=1) == 1000 + + def test_different_methods_tracked_independently(self): + before_a = offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + before_b = offline_store_request_total.labels( + method="other", status="success" + )._value.get() + + offline_store_request_total.labels(method="to_arrow", status="success").inc() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + == before_a + 1 + ) + assert ( + offline_store_request_total.labels( + method="other", status="success" + )._value.get() + == before_b + ) + + +class TestEmitAuditLogs: + """Tests for structured JSON audit log emission.""" + + def test_emit_online_audit_log_writes_json(self): + import json + import logging + + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_online_audit_log( + requestor_id="user@example.com", + entity_keys=["driver_id", "customer_id"], + entity_count=10, + feature_views=["driver_fv", "order_fv"], + feature_count=5, + status="success", + latency_ms=42.0, + ) + + mock_info.assert_called_once() + logged_json = mock_info.call_args[0][0] + record = json.loads(logged_json) + + assert record["event"] == "online_feature_request" + assert record["requestor_id"] == "user@example.com" + assert record["entity_keys"] == ["driver_id", "customer_id"] + assert record["entity_count"] == 10 + assert record["feature_views"] == ["driver_fv", "order_fv"] + assert record["feature_count"] == 5 + assert record["status"] == "success" + assert record["latency_ms"] == pytest.approx(42.0) + assert "timestamp" in record + + def test_emit_online_audit_log_noop_when_disabled(self): + import logging + + import feast.metrics as m + + m._config = m._MetricsFlags(enabled=True, audit_logging=False) + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_online_audit_log( + requestor_id="user@example.com", + entity_keys=["driver_id"], + entity_count=1, + feature_views=["driver_fv"], + feature_count=1, + status="success", + latency_ms=10.0, + ) + mock_info.assert_not_called() + + def test_emit_offline_audit_log_writes_json(self): + import json + import logging + + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_offline_audit_log( + method="to_arrow", + feature_views=["driver_fv"], + feature_count=3, + row_count=500, + status="success", + start_time="2026-04-27T12:00:00+00:00", + end_time="2026-04-27T12:00:01+00:00", + duration_ms=1230.0, + ) + + mock_info.assert_called_once() + logged_json = mock_info.call_args[0][0] + record = json.loads(logged_json) + + assert record["event"] == "offline_feature_retrieval" + assert "timestamp" in record + assert record["method"] == "to_arrow" + assert record["feature_views"] == ["driver_fv"] + assert record["feature_count"] == 3 + assert record["row_count"] == 500 + assert record["status"] == "success" + assert record["duration_ms"] == pytest.approx(1230.0) + assert record["start_time"] == "2026-04-27T12:00:00+00:00" + assert record["end_time"] == "2026-04-27T12:00:01+00:00" + + def test_emit_offline_audit_log_noop_when_disabled(self): + import logging + + import feast.metrics as m + + m._config = m._MetricsFlags(enabled=True, audit_logging=False) + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_offline_audit_log( + method="to_arrow", + feature_views=["fv"], + feature_count=1, + row_count=10, + status="success", + start_time="t0", + end_time="t1", + duration_ms=500.0, + ) + mock_info.assert_not_called() + + def test_emit_online_audit_log_with_error_status(self): + import json + import logging + + _audit_logger = logging.getLogger("feast.audit") + with patch.object(_audit_logger, "info") as mock_info: + emit_online_audit_log( + requestor_id="unknown", + entity_keys=[], + entity_count=0, + feature_views=[], + feature_count=0, + status="error", + latency_ms=1.0, + ) + + record = json.loads(mock_info.call_args[0][0]) + assert record["status"] == "error" + + +class TestBuildMetricsFlagsOfflineAndAudit: + """Tests for the new offline_features and audit_logging flags.""" + + def test_no_config_defaults_for_new_flags(self): + from feast.metrics import build_metrics_flags + + flags = build_metrics_flags(None) + assert flags.offline_features is True + assert flags.audit_logging is False + + def test_explicit_enable(self): + from types import SimpleNamespace + + from feast.metrics import build_metrics_flags + + mc = SimpleNamespace( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + offline_features=True, + audit_logging=True, + ) + flags = build_metrics_flags(mc) + assert flags.offline_features is True + assert flags.audit_logging is True + + def test_explicit_disable(self): + from types import SimpleNamespace + + from feast.metrics import build_metrics_flags + + mc = SimpleNamespace( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + offline_features=False, + audit_logging=False, + ) + flags = build_metrics_flags(mc) + assert flags.offline_features is False + assert flags.audit_logging is False + + def test_missing_new_attrs_fall_back_to_defaults(self): + from types import SimpleNamespace + + from feast.metrics import build_metrics_flags + + mc = SimpleNamespace( + enabled=True, + resource=True, + request=True, + online_features=True, + push=True, + materialization=True, + freshness=True, + ) + flags = build_metrics_flags(mc) + assert flags.offline_features is True + assert flags.audit_logging is False + + +class TestExtractRetrievalMetadata: + """Tests for _extract_retrieval_metadata helper.""" + + def test_extracts_feature_views_and_count(self): + from feast.infra.offline_stores.offline_store import ( + RetrievalMetadata, + _extract_retrieval_metadata, + ) + + job = MagicMock() + job.metadata = RetrievalMetadata( + features=[ + "driver_fv:conv_rate", + "driver_fv:acc_rate", + "vehicle_fv:mileage", + ], + keys=["driver_id"], + ) + + fv_names, feat_count = _extract_retrieval_metadata(job) + assert feat_count == 3 + assert set(fv_names) == {"driver_fv", "vehicle_fv"} + + def test_returns_empty_when_no_metadata(self): + from feast.infra.offline_stores.offline_store import ( + _extract_retrieval_metadata, + ) + + job = MagicMock() + job.metadata = None + + fv_names, feat_count = _extract_retrieval_metadata(job) + assert fv_names == [] + assert feat_count == 0 + + def test_handles_not_implemented_metadata(self): + from feast.infra.offline_stores.offline_store import ( + _extract_retrieval_metadata, + ) + + job = MagicMock() + type(job).metadata = property( + lambda self: (_ for _ in ()).throw(NotImplementedError()) + ) + + fv_names, feat_count = _extract_retrieval_metadata(job) + assert fv_names == [] + assert feat_count == 0 + + +class TestRetrievalJobToArrowInstrumentation: + """Tests for the metrics/audit instrumentation in RetrievalJob.to_arrow().""" + + def _make_job( + self, table, on_demand_fvs=None, metadata=None, raise_on_internal=None + ): + """Create a concrete RetrievalJob subclass for testing.""" + from feast.infra.offline_stores.offline_store import RetrievalJob + + class _TestJob(RetrievalJob): + def __init__(self): + self._table = table + self._odfvs = on_demand_fvs or [] + self._metadata = metadata + self._raise = raise_on_internal + + def _to_arrow_internal(self, timeout=None): + if self._raise: + raise self._raise + return self._table + + @property + def full_feature_names(self): + return False + + @property + def on_demand_feature_views(self): + return self._odfvs + + @property + def metadata(self): + return self._metadata + + return _TestJob() + + def test_success_increments_counter_and_records_latency(self): + import pyarrow as pa + + table = pa.table({"col": [1, 2, 3]}) + job = self._make_job(table) + + before_count = offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + before_latency = offline_store_request_latency_seconds.labels( + method="to_arrow" + )._sum.get() + + result = job.to_arrow() + + assert result.num_rows == 3 + assert ( + offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + == before_count + 1 + ) + assert ( + offline_store_request_latency_seconds.labels(method="to_arrow")._sum.get() + > before_latency + ) + + def test_error_increments_error_counter(self): + job = self._make_job(None, raise_on_internal=RuntimeError("query failed")) + + before_error = offline_store_request_total.labels( + method="to_arrow", status="error" + )._value.get() + + with pytest.raises(RuntimeError, match="query failed"): + job.to_arrow() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="error" + )._value.get() + == before_error + 1 + ) + + def test_row_count_recorded_on_success(self): + import pyarrow as pa + + table = pa.table({"a": list(range(500))}) + job = self._make_job(table) + + before_sum = offline_store_row_count.labels(method="to_arrow")._sum.get() + + job.to_arrow() + + assert ( + offline_store_row_count.labels(method="to_arrow")._sum.get() + >= before_sum + 500 + ) + + def test_row_count_recorded_when_zero(self): + import pyarrow as pa + + table = pa.table({"a": pa.array([], type=pa.int64())}) + job = self._make_job(table) + + hist = offline_store_row_count.labels(method="to_arrow") + before_bucket = hist._buckets[0].get() + + job.to_arrow() + + assert hist._buckets[0].get() == before_bucket + 1 + + def test_metrics_skipped_when_offline_features_disabled(self): + import pyarrow as pa + + import feast.metrics as m + + m._config = m._MetricsFlags( + enabled=True, offline_features=False, audit_logging=False + ) + + table = pa.table({"col": [1, 2]}) + job = self._make_job(table) + + before_count = offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + + job.to_arrow() + + assert ( + offline_store_request_total.labels( + method="to_arrow", status="success" + )._value.get() + == before_count + ) + + def test_audit_log_emitted_on_success(self): + import pyarrow as pa + + from feast.infra.offline_stores.offline_store import RetrievalMetadata + + meta = RetrievalMetadata( + features=["driver_fv:conv_rate", "driver_fv:acc_rate"], + keys=["driver_id"], + ) + table = pa.table({"col": [1, 2, 3]}) + job = self._make_job(table, metadata=meta) + + with patch("feast.metrics.emit_offline_audit_log") as mock_audit: + job.to_arrow() + + mock_audit.assert_called_once() + call_kwargs = mock_audit.call_args[1] + assert call_kwargs["method"] == "to_arrow" + assert call_kwargs["status"] == "success" + assert call_kwargs["row_count"] == 3 + assert call_kwargs["feature_count"] == 2 + assert set(call_kwargs["feature_views"]) == {"driver_fv"} + + def test_audit_log_skipped_when_disabled(self): + import pyarrow as pa + + import feast.metrics as m + + m._config = m._MetricsFlags( + enabled=True, offline_features=True, audit_logging=False + ) + + table = pa.table({"col": [1]}) + job = self._make_job(table) + + with patch("feast.metrics.emit_offline_audit_log") as mock_audit: + job.to_arrow() + mock_audit.assert_not_called() + + def test_instrumentation_failure_does_not_mask_query_error(self): + """If metrics code itself throws, the original query error still propagates.""" + import pyarrow as pa + + table = pa.table({"col": [1]}) + job = self._make_job(table) + + with patch( + "feast.metrics._config", + new_callable=lambda: property( + lambda self: (_ for _ in ()).throw(RuntimeError("metrics broken")) + ), + ): + result = job.to_arrow() + assert result.num_rows == 1 + + +class TestParseFeatureInfo: + """Tests for _parse_feature_info in feature_server.""" + + def test_feature_ref_list(self): + from feast.feature_server import _parse_feature_info + + refs = ["driver_fv:conv_rate", "driver_fv:acc_rate", "vehicle_fv:mileage"] + fv_names, feat_count = _parse_feature_info(refs) + assert feat_count == 3 + assert set(fv_names) == {"driver_fv", "vehicle_fv"} + + def test_empty_list(self): + from feast.feature_server import _parse_feature_info + + fv_names, feat_count = _parse_feature_info([]) + assert fv_names == [] + assert feat_count == 0 + + def test_feature_service(self): + from feast.feature_server import _parse_feature_info + + proj1 = MagicMock() + proj1.name = "driver_fv" + proj1.features = [MagicMock(), MagicMock()] + proj2 = MagicMock() + proj2.name = "order_fv" + proj2.features = [MagicMock()] + + fs_svc = MagicMock() + fs_svc.feature_view_projections = [proj1, proj2] + + from feast.feature_service import FeatureService + + fs_svc.__class__ = FeatureService + + fv_names, feat_count = _parse_feature_info(fs_svc) + assert feat_count == 3 + assert fv_names == ["driver_fv", "order_fv"] + + def test_strips_version_suffix(self): + from feast.feature_server import _parse_feature_info + + refs = ["driver_fv@v2:conv_rate"] + fv_names, feat_count = _parse_feature_info(refs) + assert feat_count == 1 + assert fv_names == ["driver_fv"] + + +class TestEmitOnlineAudit: + """Tests for the _emit_online_audit helper in feature_server.""" + + def test_emits_audit_log_with_anonymous_user(self): + from feast.feature_server import GetOnlineFeaturesRequest, _emit_online_audit + + request = GetOnlineFeaturesRequest( + entities={"driver_id": [1, 2]}, + features=["driver_fv:conv_rate"], + ) + + with ( + patch("feast.feature_server.feast_metrics") as mock_metrics, + patch( + "feast.permissions.security_manager.get_security_manager", + return_value=None, + ), + ): + _emit_online_audit( + request=request, + features=request.features, + entity_count=2, + status="success", + latency_ms=15.0, + ) + + mock_metrics.emit_online_audit_log.assert_called_once() + kwargs = mock_metrics.emit_online_audit_log.call_args[1] + assert kwargs["requestor_id"] == "anonymous" + assert kwargs["entity_keys"] == ["driver_id"] + assert kwargs["entity_count"] == 2 + assert kwargs["status"] == "success" + + def test_emits_audit_log_with_authenticated_user(self): + from feast.feature_server import GetOnlineFeaturesRequest, _emit_online_audit + + request = GetOnlineFeaturesRequest( + entities={"driver_id": [1]}, + features=["driver_fv:conv_rate"], + ) + + mock_sm = MagicMock() + mock_sm.current_user.username = "jdoe" + + with ( + patch("feast.feature_server.feast_metrics") as mock_metrics, + patch( + "feast.permissions.security_manager.get_security_manager", + return_value=mock_sm, + ), + ): + _emit_online_audit( + request=request, + features=request.features, + entity_count=1, + status="success", + latency_ms=10.0, + ) + + kwargs = mock_metrics.emit_online_audit_log.call_args[1] + assert kwargs["requestor_id"] == "jdoe" + + def test_does_not_raise_on_failure(self): + from feast.feature_server import GetOnlineFeaturesRequest, _emit_online_audit + + request = GetOnlineFeaturesRequest( + entities={"driver_id": [1]}, + features=["driver_fv:conv_rate"], + ) + + with patch( + "feast.permissions.security_manager.get_security_manager", + side_effect=RuntimeError("auth broken"), + ): + _emit_online_audit( + request=request, + features=request.features, + entity_count=1, + status="error", + latency_ms=5.0, + )