diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index 76da6ad831d..f3437b08f30 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -386,6 +386,12 @@ def list_data_sources( def apply_data_source( self, data_source: DataSource, project: str, commit: bool = True ): + """Apply a data source to the registry with project-scoped deduplication. + + Filters existing data sources by both name and project (fixes feast-dev/feast#6206), + preserving the original created_timestamp if the source already exists in the + target project. + """ now = _utc_now() if not data_source.created_timestamp: data_source.created_timestamp = now @@ -394,7 +400,10 @@ def apply_data_source( registry = self._prepare_registry_for_changes(project) for idx, existing_data_source_proto in enumerate(registry.data_sources): - if existing_data_source_proto.name == data_source.name: + if ( + existing_data_source_proto.name == data_source.name + and existing_data_source_proto.project == project + ): existing_data_source = DataSource.from_proto(existing_data_source_proto) # Check if the data source has actually changed if existing_data_source == data_source: @@ -423,7 +432,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): for idx, data_source_proto in enumerate( self.cached_registry_proto.data_sources ): - if data_source_proto.name == name: + if data_source_proto.name == name and data_source_proto.project == project: del self.cached_registry_proto.data_sources[idx] if commit: self.commit() diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index fb09395d789..88d827d786a 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -899,6 +899,111 @@ def test_apply_data_source_with_timestamps(test_registry): test_registry.teardown() +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + all_fixtures, +) +def test_apply_data_source_cross_project_isolation(test_registry): + """Test that apply_data_source uses project-scoped filtering. + + Regression test for https://github.com/feast-dev/feast/issues/6206: + applying a data source to one project must not overwrite the data source + with the same name in a different project. + + See: feast-dev/feast#6298 + """ + project_a = "project_a" + project_b = "project_b" + + source_a = FileSource( + name="shared_source_name", + file_format=ParquetFormat(), + path="file://feast/project_a.parquet", + timestamp_field="ts_col", + ) + source_b = FileSource( + name="shared_source_name", + file_format=ParquetFormat(), + path="file://feast/project_b.parquet", + timestamp_field="ts_col", + ) + + test_registry.apply_data_source(source_a, project_a, commit=True) + test_registry.apply_data_source(source_b, project_b, commit=True) + + # Each project should have exactly its own source + sources_a = test_registry.list_data_sources(project_a) + sources_b = test_registry.list_data_sources(project_b) + assert len(sources_a) == 1 + assert len(sources_b) == 1 + + # Paths must be project-specific — not overwritten cross-project + assert sources_a[0].path == "file://feast/project_a.parquet" + assert sources_b[0].path == "file://feast/project_b.parquet" + + # Re-apply source_b with updated path: must not bleed into project_a + source_b_updated = FileSource( + name="shared_source_name", + file_format=ParquetFormat(), + path="file://feast/project_b_v2.parquet", + timestamp_field="ts_col", + ) + test_registry.apply_data_source(source_b_updated, project_b, commit=True) + + sources_a_after = test_registry.list_data_sources(project_a) + assert len(sources_a_after) == 1 + assert sources_a_after[0].path == "file://feast/project_a.parquet", ( + "apply_data_source for project_b must not overwrite project_a's source" + ) + + test_registry.teardown() + + +@pytest.mark.integration +@pytest.mark.parametrize( + "test_registry", + all_fixtures, +) +def test_delete_data_source_project_scoped(test_registry): + """Test that delete_data_source only removes the source from the given project. + + Regression test for https://github.com/feast-dev/feast/issues/6206: + deleting a data source from one project must not delete the data source + with the same name from another project. + """ + project_a = "project_a" + project_b = "project_b" + + source_a = FileSource( + name="shared_source_name", + file_format=ParquetFormat(), + path="file://feast/project_a.parquet", + timestamp_field="ts_col", + ) + source_b = FileSource( + name="shared_source_name", + file_format=ParquetFormat(), + path="file://feast/project_b.parquet", + timestamp_field="ts_col", + ) + + test_registry.apply_data_source(source_a, project_a, commit=True) + test_registry.apply_data_source(source_b, project_b, commit=True) + + # Delete the source from project_a only + test_registry.delete_data_source("shared_source_name", project_a, commit=True) + + # project_a should have no sources; project_b should be unaffected + sources_a = test_registry.list_data_sources(project_a) + sources_b = test_registry.list_data_sources(project_b) + assert len(sources_a) == 0, "Source should be deleted from project_a" + assert len(sources_b) == 1, "Source in project_b must not be deleted" + assert sources_b[0].path == "file://feast/project_b.parquet" + + test_registry.teardown() + + @pytest.mark.integration @pytest.mark.parametrize( "test_registry",