From f937c5df5140889c94a0cb3891b882d795db17d8 Mon Sep 17 00:00:00 2001 From: makinzm Date: Sat, 25 Apr 2026 12:36:28 +0900 Subject: [PATCH 1/4] feat: add versioning support to Milvus online store Signed-off-by: makinzm --- .../milvus_online_store/milvus.py | 15 +-- .../feast/infra/online_stores/online_store.py | 8 ++ .../online_store/test_milvus_versioning.py | 101 ++++++++++++++++++ 3 files changed, 117 insertions(+), 7 deletions(-) create mode 100644 sdk/python/tests/unit/infra/online_store/test_milvus_versioning.py diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index e33765c1ecf..497723f3b0e 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -19,6 +19,7 @@ deserialize_entity_key, serialize_entity_key, ) +from feast.infra.online_stores.helpers import compute_table_id from feast.infra.online_stores.online_store import OnlineStore from feast.infra.online_stores.vector_store import VectorStoreConfig from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto @@ -164,7 +165,7 @@ def _get_or_create_collection( ) -> Dict[str, Any]: self.client = self._connect(config) vector_field_dict = {k.name: k for k in table.schema if k.vector_index} - collection_name = _table_id(config.project, table) + collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) if collection_name not in self._collections: # Create a composite key by combining entity fields composite_key_name = _get_composite_key_name(table) @@ -346,7 +347,7 @@ def online_read( requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: self.client = self._connect(config) - collection_name = _table_id(config.project, table) + collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) collection = self._get_or_create_collection(config, table) composite_key_name = _get_composite_key_name(table) @@ -494,7 +495,7 @@ def update( self._get_or_create_collection(config, table) for table in tables_to_delete: - collection_name = _table_id(config.project, table) + collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) if self._collections.get(collection_name, None): self.client.drop_collection(collection_name) self._collections.pop(collection_name, None) @@ -512,7 +513,7 @@ def teardown( ): self.client = self._connect(config) for table in tables: - collection_name = _table_id(config.project, table) + collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) if self._collections.get(collection_name, None): self.client.drop_collection(collection_name) self._collections.pop(collection_name, None) @@ -551,7 +552,7 @@ def retrieve_online_documents_v2( k.name: k.dtype for k in table.entity_columns } self.client = self._connect(config) - collection_name = _table_id(config.project, table) + collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) collection = self._get_or_create_collection(config, table) if not config.online_store.vector_enabled: raise ValueError("Vector search is not enabled in the online store config") @@ -749,8 +750,8 @@ def retrieve_online_documents_v2( return result_list -def _table_id(project: str, table: FeatureView) -> str: - return f"{project}_{table.name}" +def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str: + return compute_table_id(project, table, enable_versioning) def _get_composite_key_name(table: FeatureView) -> str: diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index c3fda86cc5e..5b5daf17575 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -292,6 +292,14 @@ def _check_versioned_read_support(self, grouped_refs): supported_types.append(DynamoDBOnlineStore) except Exception: pass + try: + from feast.infra.online_stores.milvus_online_store.milvus import ( + MilvusOnlineStore, + ) + + supported_types.append(MilvusOnlineStore) + except ImportError: + pass if isinstance(self, tuple(supported_types)): return diff --git a/sdk/python/tests/unit/infra/online_store/test_milvus_versioning.py b/sdk/python/tests/unit/infra/online_store/test_milvus_versioning.py new file mode 100644 index 00000000000..4faad4971e8 --- /dev/null +++ b/sdk/python/tests/unit/infra/online_store/test_milvus_versioning.py @@ -0,0 +1,101 @@ +"""Unit tests for Milvus online store feature view versioning.""" + +from datetime import timedelta +from unittest.mock import MagicMock + +from feast import Entity, FeatureView +from feast.field import Field +from feast.types import Float32 +from feast.value_type import ValueType + + +def _make_feature_view(name="driver_stats", version_number=None, version_tag=None): + entity = Entity( + name="driver_id", + join_keys=["driver_id"], + value_type=ValueType.INT64, + ) + fv = FeatureView( + name=name, + entities=[entity], + ttl=timedelta(days=1), + schema=[Field(name="trips_today", dtype=Float32)], + ) + if version_number is not None: + fv.current_version_number = version_number + if version_tag is not None: + fv.projection.version_tag = version_tag + return fv + + +def _make_config(project="test_project", versioning=False): + config = MagicMock() + config.project = project + config.entity_key_serialization_version = 2 + config.registry.enable_online_feature_view_versioning = versioning + return config + + +class TestTableId: + """Test _table_id with versioning enabled/disabled.""" + + def test_no_versioning(self): + from feast.infra.online_stores.milvus_online_store.milvus import _table_id + + fv = _make_feature_view() + config = _make_config(versioning=False) + assert _table_id(config.project, fv) == "test_project_driver_stats" + + def test_versioning_enabled_with_version(self): + from feast.infra.online_stores.milvus_online_store.milvus import _table_id + + fv = _make_feature_view(version_number=2) + config = _make_config(versioning=True) + assert _table_id(config.project, fv, enable_versioning=True) == "test_project_driver_stats_v2" + + def test_projection_version_tag_takes_priority(self): + from feast.infra.online_stores.milvus_online_store.milvus import _table_id + + fv = _make_feature_view(version_number=1, version_tag=3) + config = _make_config(versioning=True) + assert _table_id(config.project, fv, enable_versioning=True) == "test_project_driver_stats_v3" + + def test_version_zero_no_suffix(self): + from feast.infra.online_stores.milvus_online_store.milvus import _table_id + + fv = _make_feature_view(version_number=0) + config = _make_config(versioning=True) + assert _table_id(config.project, fv, enable_versioning=True) == "test_project_driver_stats" + + def test_versioning_enabled_no_version_set(self): + from feast.infra.online_stores.milvus_online_store.milvus import _table_id + + fv = _make_feature_view() + config = _make_config(versioning=True) + assert _table_id(config.project, fv, enable_versioning=True) == "test_project_driver_stats" + + def test_versioning_disabled_ignores_version(self): + from feast.infra.online_stores.milvus_online_store.milvus import _table_id + + fv = _make_feature_view(version_number=5) + config = _make_config(versioning=False) + assert _table_id(config.project, fv) == "test_project_driver_stats" + + +class TestMilvusVersionedReadSupport: + """Test that MilvusOnlineStore passes _check_versioned_read_support.""" + + def test_allowed_with_version_tag(self): + from feast.infra.online_stores.milvus_online_store.milvus import MilvusOnlineStore + + store = MilvusOnlineStore() + fv = _make_feature_view() + fv.projection.version_tag = 2 + store._check_versioned_read_support([(fv, ["trips_today"])]) + + def test_allowed_without_version_tag(self): + from feast.infra.online_stores.milvus_online_store.milvus import MilvusOnlineStore + + store = MilvusOnlineStore() + fv = _make_feature_view() + store._check_versioned_read_support([(fv, ["trips_today"])]) From b9f187322f2f953d756a49227f34ec37472be09c Mon Sep 17 00:00:00 2001 From: makinzm Date: Sat, 25 Apr 2026 13:56:16 +0900 Subject: [PATCH 2/4] lint: uv run ruff format sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py Signed-off-by: makinzm --- .../milvus_online_store/milvus.py | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index 497723f3b0e..e680d257fc2 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -165,7 +165,9 @@ def _get_or_create_collection( ) -> Dict[str, Any]: self.client = self._connect(config) vector_field_dict = {k.name: k for k in table.schema if k.vector_index} - collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) + collection_name = _table_id( + config.project, table, config.registry.enable_online_feature_view_versioning + ) if collection_name not in self._collections: # Create a composite key by combining entity fields composite_key_name = _get_composite_key_name(table) @@ -347,7 +349,9 @@ def online_read( requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: self.client = self._connect(config) - collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) + collection_name = _table_id( + config.project, table, config.registry.enable_online_feature_view_versioning + ) collection = self._get_or_create_collection(config, table) composite_key_name = _get_composite_key_name(table) @@ -495,7 +499,11 @@ def update( self._get_or_create_collection(config, table) for table in tables_to_delete: - collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) + collection_name = _table_id( + config.project, + table, + config.registry.enable_online_feature_view_versioning, + ) if self._collections.get(collection_name, None): self.client.drop_collection(collection_name) self._collections.pop(collection_name, None) @@ -513,7 +521,11 @@ def teardown( ): self.client = self._connect(config) for table in tables: - collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) + collection_name = _table_id( + config.project, + table, + config.registry.enable_online_feature_view_versioning, + ) if self._collections.get(collection_name, None): self.client.drop_collection(collection_name) self._collections.pop(collection_name, None) @@ -552,7 +564,9 @@ def retrieve_online_documents_v2( k.name: k.dtype for k in table.entity_columns } self.client = self._connect(config) - collection_name = _table_id(config.project, table, config.registry.enable_online_feature_view_versioning) + collection_name = _table_id( + config.project, table, config.registry.enable_online_feature_view_versioning + ) collection = self._get_or_create_collection(config, table) if not config.online_store.vector_enabled: raise ValueError("Vector search is not enabled in the online store config") From a957183f4dd212fe846e87afc1e566b3c82514d4 Mon Sep 17 00:00:00 2001 From: makinzm Date: Sun, 26 Apr 2026 07:39:39 +0900 Subject: [PATCH 3/4] feat: drop all versioned Milvus collections on teardown/update and lint Signed-off-by: makinzm --- .../milvus_online_store/milvus.py | 41 +++++---- .../online_store/test_milvus_versioning.py | 91 +++++++++++++++++-- 2 files changed, 110 insertions(+), 22 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index e680d257fc2..f06fa6e70e3 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -498,15 +498,12 @@ def update( for table in tables_to_keep: self._get_or_create_collection(config, table) + # Always drop the base collection plus any "_v{N}" siblings, regardless of + # the current versioning flag. This handles mixed-state repos where + # versioning was toggled on/off across applies and would otherwise leave + # orphan collections behind in Milvus. for table in tables_to_delete: - collection_name = _table_id( - config.project, - table, - config.registry.enable_online_feature_view_versioning, - ) - if self._collections.get(collection_name, None): - self.client.drop_collection(collection_name) - self._collections.pop(collection_name, None) + self._drop_all_version_collections(config.project, table) def plan( self, config: RepoConfig, desired_registry_proto: RegistryProto @@ -520,15 +517,9 @@ def teardown( entities: Sequence[Entity], ): self.client = self._connect(config) + # See update(): drop base + all "_v{N}" siblings to handle mixed-state repos. for table in tables: - collection_name = _table_id( - config.project, - table, - config.registry.enable_online_feature_view_versioning, - ) - if self._collections.get(collection_name, None): - self.client.drop_collection(collection_name) - self._collections.pop(collection_name, None) + self._drop_all_version_collections(config.project, table) def retrieve_online_documents_v2( self, @@ -763,6 +754,24 @@ def retrieve_online_documents_v2( result_list.append((res_ts, entity_key_proto, res if res else None)) return result_list + def _drop_all_version_collections(self, project: str, table: FeatureView) -> None: + """Drop the base collection and every ``_v{N}`` versioned sibling. + + Mirrors the ``_drop_all_version_tables`` helpers in the MySQL/PostgreSQL + online stores. Always called from ``update`` and ``teardown`` so a + repo that toggles versioning on and off does not leave orphan + collections behind in Milvus. + """ + base = f"{project}_{table.name}" + versioned_prefix = f"{base}_v" + for collection_name in self.client.list_collections(): + if collection_name == base or ( + collection_name.startswith(versioned_prefix) + and collection_name[len(versioned_prefix) :].isdigit() + ): + self.client.drop_collection(collection_name) + self._collections.pop(collection_name, None) + def _table_id(project: str, table: FeatureView, enable_versioning: bool = False) -> str: return compute_table_id(project, table, enable_versioning) diff --git a/sdk/python/tests/unit/infra/online_store/test_milvus_versioning.py b/sdk/python/tests/unit/infra/online_store/test_milvus_versioning.py index 4faad4971e8..c2979d37690 100644 --- a/sdk/python/tests/unit/infra/online_store/test_milvus_versioning.py +++ b/sdk/python/tests/unit/infra/online_store/test_milvus_versioning.py @@ -51,28 +51,40 @@ def test_versioning_enabled_with_version(self): fv = _make_feature_view(version_number=2) config = _make_config(versioning=True) - assert _table_id(config.project, fv, enable_versioning=True) == "test_project_driver_stats_v2" + assert ( + _table_id(config.project, fv, enable_versioning=True) + == "test_project_driver_stats_v2" + ) def test_projection_version_tag_takes_priority(self): from feast.infra.online_stores.milvus_online_store.milvus import _table_id fv = _make_feature_view(version_number=1, version_tag=3) config = _make_config(versioning=True) - assert _table_id(config.project, fv, enable_versioning=True) == "test_project_driver_stats_v3" + assert ( + _table_id(config.project, fv, enable_versioning=True) + == "test_project_driver_stats_v3" + ) def test_version_zero_no_suffix(self): from feast.infra.online_stores.milvus_online_store.milvus import _table_id fv = _make_feature_view(version_number=0) config = _make_config(versioning=True) - assert _table_id(config.project, fv, enable_versioning=True) == "test_project_driver_stats" + assert ( + _table_id(config.project, fv, enable_versioning=True) + == "test_project_driver_stats" + ) def test_versioning_enabled_no_version_set(self): from feast.infra.online_stores.milvus_online_store.milvus import _table_id fv = _make_feature_view() config = _make_config(versioning=True) - assert _table_id(config.project, fv, enable_versioning=True) == "test_project_driver_stats" + assert ( + _table_id(config.project, fv, enable_versioning=True) + == "test_project_driver_stats" + ) def test_versioning_disabled_ignores_version(self): from feast.infra.online_stores.milvus_online_store.milvus import _table_id @@ -86,7 +98,9 @@ class TestMilvusVersionedReadSupport: """Test that MilvusOnlineStore passes _check_versioned_read_support.""" def test_allowed_with_version_tag(self): - from feast.infra.online_stores.milvus_online_store.milvus import MilvusOnlineStore + from feast.infra.online_stores.milvus_online_store.milvus import ( + MilvusOnlineStore, + ) store = MilvusOnlineStore() fv = _make_feature_view() @@ -94,8 +108,73 @@ def test_allowed_with_version_tag(self): store._check_versioned_read_support([(fv, ["trips_today"])]) def test_allowed_without_version_tag(self): - from feast.infra.online_stores.milvus_online_store.milvus import MilvusOnlineStore + from feast.infra.online_stores.milvus_online_store.milvus import ( + MilvusOnlineStore, + ) store = MilvusOnlineStore() fv = _make_feature_view() store._check_versioned_read_support([(fv, ["trips_today"])]) + + +class TestTeardownDropsAllVersions: + """Teardown should drop the base collection AND all versioned collections.""" + + def _build_store_with_collections(self, existing_collections): + from feast.infra.online_stores.milvus_online_store.milvus import ( + MilvusOnlineStore, + ) + + store = MilvusOnlineStore() + store.client = MagicMock() + store.client.list_collections.return_value = existing_collections + store._connect = MagicMock(return_value=store.client) + store._collections = {name: MagicMock() for name in existing_collections} + return store + + def test_teardown_drops_base_and_all_versioned_collections(self): + fv = _make_feature_view() + config = _make_config(versioning=True) + existing = [ + "test_project_driver_stats", + "test_project_driver_stats_v1", + "test_project_driver_stats_v2", + "test_project_other_view", # unrelated, must not be dropped + ] + store = self._build_store_with_collections(existing) + + store.teardown(config, [fv], []) + + dropped = {call.args[0] for call in store.client.drop_collection.call_args_list} + assert dropped == { + "test_project_driver_stats", + "test_project_driver_stats_v1", + "test_project_driver_stats_v2", + } + assert "test_project_other_view" not in dropped + + def test_update_drops_all_versions_for_deleted_table(self): + fv = _make_feature_view() + config = _make_config(versioning=True) + existing = [ + "test_project_driver_stats", + "test_project_driver_stats_v3", + "test_project_driver_stats_v4", + ] + store = self._build_store_with_collections(existing) + + store.update( + config=config, + tables_to_delete=[fv], + tables_to_keep=[], + entities_to_delete=[], + entities_to_keep=[], + partial=False, + ) + + dropped = {call.args[0] for call in store.client.drop_collection.call_args_list} + assert dropped == { + "test_project_driver_stats", + "test_project_driver_stats_v3", + "test_project_driver_stats_v4", + } From 3f2989a0280dbaf6aae4fe49a0f387bd8bf80477 Mon Sep 17 00:00:00 2001 From: makinzm Date: Tue, 28 Apr 2026 17:38:41 +0900 Subject: [PATCH 4/4] chore: fix type to check whether it is None Signed-off-by: makinzm --- .../feast/infra/online_stores/milvus_online_store/milvus.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index f06fa6e70e3..941de3b64cd 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -764,6 +764,7 @@ def _drop_all_version_collections(self, project: str, table: FeatureView) -> Non """ base = f"{project}_{table.name}" versioned_prefix = f"{base}_v" + assert self.client is not None, "Milvus client is not initialized" for collection_name in self.client.list_collections(): if collection_name == base or ( collection_name.startswith(versioned_prefix)