diff --git a/README.md b/README.md
index ded20b15376..115bd37903f 100644
--- a/README.md
+++ b/README.md
@@ -185,6 +185,7 @@ The list below contains the functionality that contributors are planning to deve
   * [x] [Athena (contrib plugin)](https://docs.feast.dev/reference/data-sources/athena)
   * [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/data-sources/clickhouse)
   * [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/data-sources/oracle)
+  * [x] [MongoDB (contrib plugin)](https://docs.feast.dev/reference/data-sources/mongodb)
   * [x] [Ray source (contrib plugin)](https://docs.feast.dev/reference/data-sources/ray)
   * [x] Kafka / Kinesis sources (via [push support into the online store](https://docs.feast.dev/reference/data-sources/push))
 * **Offline Stores**
@@ -204,6 +205,7 @@ The list below contains the functionality that contributors are planning to deve
   * [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/offline-stores/clickhouse)
   * [x] [Ray (contrib plugin)](https://docs.feast.dev/reference/offline-stores/ray)
   * [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/offline-stores/oracle)
+  * [x] [MongoDB (contrib plugin)](https://docs.feast.dev/reference/offline-stores/mongodb)
   * [x] [Hybrid](https://docs.feast.dev/reference/offline-stores/hybrid)
   * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/customizing-feast/adding-a-new-offline-store)
 * **Online Stores**
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index b29a0ac9ce8..44c1cc09477 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -113,6 +113,7 @@
   * [Athena (contrib)](reference/data-sources/athena.md)
   * [Clickhouse (contrib)](reference/data-sources/clickhouse.md)
   * [Ray (contrib)](reference/data-sources/ray.md)
+  * [MongoDB (contrib)](reference/data-sources/mongodb.md)
 * [Offline stores](reference/offline-stores/README.md)
   * [Overview](reference/offline-stores/overview.md)
   * [Dask](reference/offline-stores/dask.md)
@@ -129,6 +130,7 @@
   * [Ray (contrib)](reference/offline-stores/ray.md)
   * [Oracle (contrib)](reference/offline-stores/oracle.md)
   * [Athena (contrib)](reference/offline-stores/athena.md)
+  * [MongoDB (contrib)](reference/offline-stores/mongodb.md)
   * [Remote Offline](reference/offline-stores/remote-offline-store.md)
   * [Hybrid](reference/offline-stores/hybrid.md)
 * [Online stores](reference/online-stores/README.md)
diff --git a/docs/reference/data-sources/README.md b/docs/reference/data-sources/README.md
index e25a7f6e8ae..24bf18dbe86 100644
--- a/docs/reference/data-sources/README.md
+++ b/docs/reference/data-sources/README.md
@@ -69,3 +69,7 @@ Please see [Data Source](../../getting-started/concepts/data-ingestion.md) for a
 {% content-ref url="ray.md" %}
 [ray.md](ray.md)
 {% endcontent-ref %}
+
+{% content-ref url="mongodb.md" %}
+[mongodb.md](mongodb.md)
+{% endcontent-ref %}
diff --git a/docs/reference/data-sources/mongodb.md b/docs/reference/data-sources/mongodb.md
new file mode 100644
index 00000000000..c1b6eed1bed
--- /dev/null
+++ b/docs/reference/data-sources/mongodb.md
@@ -0,0 +1,113 @@
+# MongoDB source (contrib)
+
+## Description
+
+MongoDB data sources are [MongoDB](https://www.mongodb.com/) collections that can be used as a source for feature data. The `MongoDBSource` points at a MongoDB collection and provides the metadata Feast needs to read historical features from the offline store's collection.
+
+## Examples
+
+Defining a MongoDB source:
+
+```python
+from feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb import (
+    MongoDBSource,
+)
+
+driver_stats_source = MongoDBSource(
+    name="driver_stats",
+    timestamp_field="event_timestamp",
+    created_timestamp_column="created_at",
+)
+```
+
+The `name` field becomes the `feature_view` discriminator stored in every document in the `feature_history` collection.
+
+Configuration options such as `connection_string`, `database`, and `collection` are inherited from the offline store configuration in `feature_store.yaml`.
+
+The full set of configuration options is available [here](https://rtd.feast.dev/en/master/#feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb.MongoDBSource).
+
+## Vector Search
+
+The MongoDB online store supports [Atlas Vector Search](https://www.mongodb.com/docs/atlas/atlas-vector-search/), enabling similarity search over feature embeddings stored in MongoDB Atlas. This is powered by the `$vectorSearch` aggregation stage and requires MongoDB Atlas (or the `mongodb/mongodb-atlas-local` Docker image for local development).
+
+See [PR #6344](https://github.com/feast-dev/feast/pull/6344) for full implementation details.
+
+### Configuration
+
+Enable vector search in your `feature_store.yaml`:
+
+```yaml
+project: my_project
+provider: local
+online_store:
+  type: mongodb
+  connection_string: mongodb+srv://:@cluster.mongodb.net
+  vector_enabled: true
+  similarity: cosine  # cosine | euclidean | dotProduct
+  vector_index_wait_timeout: 60  # seconds to wait for index to become queryable
+  vector_index_wait_poll_interval: 1.0  # seconds between polls
+```
+
+### Defining a Feature View with Vector Index
+
+Mark embedding fields with `vector_index=True` and specify `vector_length`:
+
+```python
+from feast import Entity, FeatureView, Field, FileSource
+from feast.types import Array, Float32, Int64, String
+from datetime import timedelta
+
+item_embeddings = FeatureView(
+    name="item_embeddings",
+    entities=[Entity(name="item_id", join_keys=["item_id"])],
+    schema=[
+        Field(
+            name="embedding",
+            dtype=Array(Float32),
+            vector_index=True,
+            vector_length=384,
+            vector_search_metric="cosine",
+        ),
+        Field(name="title", dtype=String),
+        Field(name="item_id", dtype=Int64),
+    ],
+    source=FileSource(path="items.parquet", timestamp_field="event_timestamp"),
+    ttl=timedelta(hours=24),
+)
+```
+
+When `feast apply` (or `store.update()`) runs with `vector_enabled=True`, Atlas vector search indexes are automatically created for any field with `vector_index=True`. Indexes are also automatically dropped when feature views are removed.
+
+### Retrieving Documents via Vector Search
+
+Use `retrieve_online_documents_v2()` to perform similarity search:
+
+```python
+from feast import FeatureStore
+store = FeatureStore(repo_path=".")
+results = store.retrieve_online_documents_v2(
+    config=repo_config,
+    table=item_embeddings,
+    requested_features=["embedding", "title"],
+    embedding=[0.1, 0.2, ...],  # query vector
+    top_k=5,
+)
+
+# Each result is a (event_timestamp, entity_key_proto, feature_dict) tuple.
+# feature_dict includes a synthetic "distance" key with the vector search score.
+for ts, entity_key, features in results:
+    print(features["title"].string_val, features["distance"].float_val)
+```
+
+### How It Works
+
+- **Index creation**: `update()` creates an Atlas vector search index named `____vs_index` for each vector-indexed field. It waits for the index to reach `READY` status before proceeding.
+- **Query execution**: `retrieve_online_documents_v2()` builds a `$vectorSearch` aggregation pipeline with `numCandidates = max(top_k * 10, 100)` and the specified `limit`.
+- **Score**: Results include a `distance` field populated from `$meta: "vectorSearchScore"`.
+- **BSON compatibility**: Query vectors are coerced to native Python floats to avoid numpy serialization issues.
+- **Idempotency**: Calling `update()` multiple times will not duplicate indexes.
+
+## Supported Types
+
+MongoDB data sources support all eight primitive types (`bytes`, `string`, `int32`, `int64`, `float32`, `float64`, `bool`, `timestamp`) and their corresponding array types. Complex types such as `Map` and `Struct` are preserved through the MongoDB document model.
+For a comparison against other batch data sources, please see [here](overview.md#functionality-matrix).
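The query construction described under "How It Works" in the file above can be sketched as a plain Python function. This is only an illustration under stated assumptions, not the code added by this PR: the function name `build_vector_search_pipeline`, its `index_name` and `path` arguments, and the example values are hypothetical. Only the `numCandidates = max(top_k * 10, 100)` rule, the `limit`, the float coercion, and the `vectorSearchScore` projection into `distance` are taken from the description.

```python
from typing import Any, Dict, List, Sequence


def build_vector_search_pipeline(
    index_name: str,
    path: str,
    query_vector: Sequence[Any],
    top_k: int,
) -> List[Dict[str, Any]]:
    """Build a $vectorSearch aggregation pipeline (illustrative sketch)."""
    # Coerce every element to a native Python float so that numpy scalars
    # never reach the BSON encoder.
    vector = [float(x) for x in query_vector]
    return [
        {
            "$vectorSearch": {
                "index": index_name,  # hypothetical index name
                "path": path,  # hypothetical field that stores the embedding
                "queryVector": vector,
                # Candidate pool: ten times top_k, but never below 100.
                "numCandidates": max(top_k * 10, 100),
                "limit": top_k,
            }
        },
        # Keep the embedding field and expose the score under "distance".
        {
            "$project": {
                "_id": 0,
                path: 1,
                "distance": {"$meta": "vectorSearchScore"},
            }
        },
    ]


pipeline = build_vector_search_pipeline(
    "example_index", "embedding", [1, 0.5, 0], top_k=5
)
```

With `pymongo`, a pipeline of this shape would typically be passed to `Collection.aggregate(pipeline)`. The `$project` stage here keeps only the embedding and the score; the real store presumably returns the requested feature fields as well.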
diff --git a/docs/reference/offline-stores/README.md b/docs/reference/offline-stores/README.md
index 5f4e146326a..1c0d24c8d07 100644
--- a/docs/reference/offline-stores/README.md
+++ b/docs/reference/offline-stores/README.md
@@ -62,6 +62,10 @@ Please see [Offline Store](../../getting-started/components/offline-store.md) fo
 [clickhouse.md](clickhouse.md)
 {% endcontent-ref %}
 
+{% content-ref url="mongodb.md" %}
+[mongodb.md](mongodb.md)
+{% endcontent-ref %}
+
 {% content-ref url="remote-offline-store.md" %}
 [remote-offline-store.md](remote-offline-store.md)
 {% endcontent-ref %}
diff --git a/docs/reference/offline-stores/mongodb.md b/docs/reference/offline-stores/mongodb.md
new file mode 100644
index 00000000000..0e8d1786699
--- /dev/null
+++ b/docs/reference/offline-stores/mongodb.md
@@ -0,0 +1,103 @@
+# MongoDB offline store (contrib)
+
+## Description
+
+The MongoDB offline store provides support for reading [MongoDBSource](../data-sources/mongodb.md).
+* Uses a single shared collection with a compound index for all FeatureViews, distinguished by a `feature_view` discriminator field.
+* Entity dataframes can be provided as a Pandas dataframe. The offline store converts entity identifiers into serialized entity keys for efficient lookup against the collection.
+
+## Getting started
+
+In order to use this offline store, you'll need to run `pip install 'feast[mongodb]'`.
+
+## Example
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: my_project
+registry: data/registry.db
+provider: local
+offline_store:
+  type: feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb.MongoDBOfflineStore
+  connection_string: "mongodb+srv://user:pass@cluster.mongodb.net" # pragma: allowlist secret
+  database: feast
+  collection: feature_history
+online_store:
+  type: mongodb
+  connection_string: "mongodb+srv://user:pass@cluster.mongodb.net" # pragma: allowlist secret
+  database_name: feast_online_store
+  collection_suffix: latest
+  client_kwargs: {}
+```
+{% endcode %}
+
+The full set of configuration options is available in [MongoDBOfflineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.offline_stores.contrib.mongodb_offline_store.mongodb.MongoDBOfflineStoreConfig).
+
+## Data Model
+
+The offline store uses a single shared collection (by default `feature_history`) that stores append-only historical feature rows for all feature views. Each document represents one observation of one entity for one FeatureView at a specific event timestamp:
+
+```json
+{
+  "entity_id": "Binary(...)",
+  "feature_view": "driver_stats",
+  "event_timestamp": "ISODate(2024-01-15T12:00:00Z)",
+  "created_at": "ISODate(2024-01-15T12:01:00Z)",
+  "features": {
+    "conv_rate": 0.72,
+    "acc_rate": 0.91,
+    "avg_daily_trips": 14
+  }
+}
+```
+
+Key properties:
+
+* **Append-only**: Historical data is treated as immutable; corrections are written as new rows with newer `created_at` timestamps rather than in-place updates.
+* **Time-series friendly**: `event_timestamp` represents when the feature value was observed; `created_at` is used as a tie-breaker when multiple observations share the same event timestamp.
+* **Feature grouping by FeatureView**: `feature_view` identifies which FeatureView the row belongs to, so a single collection can host multiple FVs.
+
+A single compound index supports all major query patterns:
+
+```
+(entity_id ASC, feature_view ASC, event_timestamp DESC, created_at DESC)
+```
+
+This index enables efficient range scans over entities and feature views, while ensuring that the most recent observation per `(entity_id, feature_view)` is seen first during aggregation. The index is created lazily on first use and cached per connection string.
+
+## Key Optimizations
+
+* **Scoring vs. training paths**: When each entity appears only once in `entity_df` (scoring/inference — one feature lookup per entity), server-side `$group $first` efficiently returns the single latest value per entity. When the same entity appears at multiple timestamps (training — building a dataset with many historical snapshots per entity), the store retrieves all candidate rows and uses `pd.merge_asof` to select the correct point-in-time value for each request timestamp.
+* **Two-level chunking**: `CHUNK_SIZE` (50,000 rows) controls the size of intermediate DataFrames in memory; `MONGO_BATCH_SIZE` (10,000 entity IDs) limits the query size sent to MongoDB.
+
+## Functionality Matrix
+
+The set of functionality supported by offline stores is described in detail [here](overview.md#functionality).
+Below is a matrix indicating which functionality is supported by the MongoDB offline store.
+
+|                                                                    | MongoDB |
+| :----------------------------------------------------------------- | :------ |
+| `get_historical_features` (point-in-time correct join)             | yes     |
+| `pull_latest_from_table_or_query` (retrieve latest feature values) | yes     |
+| `pull_all_from_table_or_query` (retrieve a saved dataset)          | yes     |
+| `offline_write_batch` (persist dataframes to offline store)        | yes     |
+| `write_logged_features` (persist logged features to offline store) | no      |
+
+Below is a matrix indicating which functionality is supported by `MongoDBRetrievalJob`.
+
+|                                                       | MongoDB |
+| ----------------------------------------------------- | ------- |
+| export to dataframe                                   | yes     |
+| export to arrow table                                 | yes     |
+| export to arrow batches                               | no      |
+| export to SQL                                         | no      |
+| export to data lake (S3, GCS, etc.)                   | no      |
+| export to data warehouse                              | no      |
+| export as Spark dataframe                             | no      |
+| local execution of Python-based on-demand transforms  | yes     |
+| remote execution of Python-based on-demand transforms | no      |
+| persist results in the offline store                  | yes     |
+| preview the query plan before execution               | no      |
+| read partitioned data                                 | no      |
+
+To compare this set of functionality against other offline stores, please see the full [functionality matrix](overview.md#functionality-matrix).
diff --git a/docs/roadmap.md b/docs/roadmap.md
index 017127d7355..e47aa79b573 100644
--- a/docs/roadmap.md
+++ b/docs/roadmap.md
@@ -20,6 +20,7 @@ The list below contains the functionality that contributors are planning to deve
   * [x] [Athena (contrib plugin)](https://docs.feast.dev/reference/data-sources/athena)
   * [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/data-sources/clickhouse)
   * [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/data-sources/oracle)
+  * [x] [MongoDB (contrib plugin)](https://docs.feast.dev/reference/data-sources/mongodb)
   * [x] [Ray source (contrib plugin)](https://docs.feast.dev/reference/data-sources/ray)
   * [x] Kafka / Kinesis sources (via [push support into the online store](https://docs.feast.dev/reference/data-sources/push))
 * **Offline Stores**
@@ -39,6 +40,7 @@ The list below contains the functionality that contributors are planning to deve
   * [x] [Clickhouse (contrib plugin)](https://docs.feast.dev/reference/offline-stores/clickhouse)
   * [x] [Ray (contrib plugin)](https://docs.feast.dev/reference/offline-stores/ray)
   * [x] [Oracle (contrib plugin)](https://docs.feast.dev/reference/offline-stores/oracle)
+  * [x] [MongoDB (contrib plugin)](https://docs.feast.dev/reference/offline-stores/mongodb)
   * [x] [Hybrid](https://docs.feast.dev/reference/offline-stores/hybrid)
   * [x] [Custom offline store support](https://docs.feast.dev/how-to-guides/customizing-feast/adding-a-new-offline-store)
 * **Online Stores**
diff --git a/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/README.md b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/README.md
index 6a30854969c..24446f8c003 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/README.md
+++ b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/README.md
@@ -1,10 +1,9 @@
 # MongoDB Offline Store
 
 This offline store lets you train models and run batch scoring directly from it.
-All feature views share a single collection (`feature_history`). Reads use
+All feature views share a single collection. Reads use
 MongoDB aggregation pipelines with a compound index, so per-entity cost is
-O(log n_observations) regardless of collection size, and K feature views with the same
-entity key collapse into one round-trip instead of K (1 if your data shares a unique id.)
+O(log n_observations) regardless of collection size.
 
 ## Schema
 
diff --git a/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb.py b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb.py
index 07e35f66c15..76aa173cb95 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/mongodb_offline_store/mongodb.py
@@ -17,11 +17,7 @@
 
 Single-collection schema. Key optimizations:
 
-1. K-collapse: feature views that share the same join key set are batched
-   into a single ``$match + $sort`` aggregation instead of K separate find
-   queries. Reduces round-trips from K to |unique join key signatures|.
-
-2. Server-side deduplication (scoring path): when entity_df has unique
+1. Server-side deduplication (scoring path): when entity_df has unique
    entity IDs the aggregation adds a ``$group`` stage that returns at most
    one document per (entity_id, feature_view) pair — O(N×K) transfer
    instead of O(N×P×K). The compound index backs the entire pipeline,
@@ -561,7 +557,7 @@ def get_historical_features(
 
         Training path (repeated entity IDs at different timestamps):
             Omits ``$group`` and uses ``merge_asof`` in Python, matching
-            standard PIT behaviour but still with K-collapsed queries.
+            standard PIT behaviour.
 
         Args:
             strict_pit: When True (default) features whose document timestamp