From 4db29fee52276f780d1436afdcc6516912f03f8e Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Mon, 26 Jun 2023 17:42:01 -0400 Subject: [PATCH 01/10] Add fully-qualified-table-name Redshift prop Signed-off-by: Robin Neufeld --- sdk/python/feast/infra/offline_stores/redshift.py | 2 +- sdk/python/feast/infra/offline_stores/redshift_source.py | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index aba2bda353c..837cf49655d 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -369,7 +369,7 @@ def offline_write_batch( s3_resource=s3_resource, s3_path=f"{config.offline_store.s3_staging_location}/push/{uuid.uuid4()}.parquet", iam_role=config.offline_store.iam_role, - table_name=redshift_options.table, + table_name=redshift_options.fully_qualified_table_name, schema=pa_schema, fail_if_exists=False, ) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 1f80dede076..11cb112c87a 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -293,6 +293,15 @@ def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions): ) return redshift_options + + @property + def fully_qualified_table_name(self) -> str: + if self.database and self.schema: + return f"{self.database}.{self.schema}.{self.table}" + elif self.schema: + return f"{self.schema}.{self.table}" + else: + return self.table def to_proto(self) -> DataSourceProto.RedshiftOptions: """ From 77fc3c5695bcb11f77f617e75a31e35832fda367 Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Mon, 26 Jun 2023 17:45:03 -0400 Subject: [PATCH 02/10] pre-commit Signed-off-by: Robin Neufeld --- sdk/python/feast/infra/offline_stores/redshift_source.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 11cb112c87a..e1df67ab9bb 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -293,7 +293,7 @@ def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions): ) return redshift_options - + @property def fully_qualified_table_name(self) -> str: if self.database and self.schema: From da296fba75fd831ae3be4db6e99e51bd259c4098 Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Fri, 30 Jun 2023 11:24:49 -0400 Subject: [PATCH 03/10] Docstring Signed-off-by: Robin Neufeld --- .../feast/infra/offline_stores/redshift_source.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index e1df67ab9bb..d7775fd2715 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -295,7 +295,18 @@ def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions): return redshift_options @property - def fully_qualified_table_name(self) -> str: + def fully_qualified_table_name(self) -> Optional[str]: + """ + The fully qualified table name of this Redshift table. + + Returns: + A string in the format of <database>.<schema>.<table>. + May be empty or None if the table is not set. 
+ """"" + + if not self.table: + return self.table + if self.database and self.schema: return f"{self.database}.{self.schema}.{self.table}" elif self.schema: From 2edd7acc8b9130da0f5ff636093e0aefe57315e8 Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Fri, 30 Jun 2023 12:06:16 -0400 Subject: [PATCH 04/10] Test fully_qualified_table_name Signed-off-by: Robin Neufeld --- .../infra/offline_stores/redshift_source.py | 8 ++++++ sdk/python/tests/unit/test_data_sources.py | 25 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index d7775fd2715..902b0418abc 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -306,6 +306,14 @@ def fully_qualified_table_name(self) -> Optional[str]: if not self.table: return self.table + + # If the table name is already fully qualified, return it as is + if self.table.count(".") == 2: + return self.table + elif self.table.count(".") == 1 and self.database: + return f"{self.database}.{self.table}" + elif self.table.count(".") == 1: + return self.table if self.database and self.schema: return f"{self.database}.{self.schema}.{self.table}" diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 30b030feb67..9639199438f 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -190,3 +190,28 @@ def test_column_conflict(): timestamp_field="event_timestamp", created_timestamp_column="event_timestamp", ) + +@pytest.mark.parametrize( + "source_kwargs,expected_name", + [ + ({"database": "test_database", "schema": "test_schema", "table": "test_table"}, "test_database.test_schema.test_table"), + ({"database": "test_database", "table": "test_table"}, "test_database.public.test_table"), + ({"table": "test_table"}, "public.test_table"), + 
({"database": "test_database", "table": "b.c"}, "test_database.b.c"), + ({"database": "test_database", "table": "a.b.c"}, "a.b.c"), + ({"database": "test_database", "schema": "test_schema", "query": "select * from abc"}, ""), + ] +) +def test_redshift_fully_qualified_table_name(source_kwargs, expected_name): + redshift_source = RedshiftSource( + name="test_source", + timestamp_field="event_timestamp", + created_timestamp_column="created_timestamp", + field_mapping={"foo": "bar"}, + description="test description", + tags={"test": "test"}, + owner="test@gmail.com", + **source_kwargs + ) + + assert redshift_source.redshift_options.fully_qualified_table_name == expected_name \ No newline at end of file From dc7b9d7fa93d3fd6f9bba3a7c0b874ab78096ac5 Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Fri, 30 Jun 2023 12:09:29 -0400 Subject: [PATCH 05/10] Simplify logic Signed-off-by: Robin Neufeld --- .../infra/offline_stores/redshift_source.py | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 902b0418abc..9a49981efb3 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -307,20 +307,26 @@ def fully_qualified_table_name(self) -> Optional[str]: if not self.table: return self.table - # If the table name is already fully qualified, return it as is - if self.table.count(".") == 2: - return self.table - elif self.table.count(".") == 1 and self.database: - return f"{self.database}.{self.table}" - elif self.table.count(".") == 1: - return self.table + # self.table may already contain the database and schema + parts = self.table.split(".") + if len(parts) == 3: + database, schema, table = parts + elif len(parts) == 2: + database = self.database + schema, table = parts + elif len(parts) == 1: + database = self.database + schema = self.schema + 
table = parts[0] + else: + raise ValueError(f"Invalid table name: {self.table} - can't determine database and schema") - if self.database and self.schema: - return f"{self.database}.{self.schema}.{self.table}" - elif self.schema: - return f"{self.schema}.{self.table}" + if database and schema: + return f"{database}.{schema}.{table}" + elif schema: + return f"{schema}.{table}" else: - return self.table + return table def to_proto(self) -> DataSourceProto.RedshiftOptions: """ From 37a8a84f56de224df458905527ae5f7846c11e69 Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Fri, 30 Jun 2023 12:11:06 -0400 Subject: [PATCH 06/10] pre-commit Signed-off-by: Robin Neufeld --- .../infra/offline_stores/redshift_source.py | 8 +++-- sdk/python/tests/unit/test_data_sources.py | 30 +++++++++++++++---- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 9a49981efb3..0ed8df5e401 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -302,11 +302,11 @@ def fully_qualified_table_name(self) -> Optional[str]: Returns: A string in the format of <database>.<schema>.<table>
. May be empty or None if the table is not set. - """"" + """ if not self.table: return self.table - + # self.table may already contain the database and schema parts = self.table.split(".") if len(parts) == 3: @@ -319,7 +319,9 @@ def fully_qualified_table_name(self) -> Optional[str]: schema = self.schema table = parts[0] else: - raise ValueError(f"Invalid table name: {self.table} - can't determine database and schema") + raise ValueError( + f"Invalid table name: {self.table} - can't determine database and schema" + ) if database and schema: return f"{database}.{schema}.{table}" diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 9639199438f..990c5d3b698 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -191,16 +191,34 @@ def test_column_conflict(): created_timestamp_column="event_timestamp", ) + @pytest.mark.parametrize( "source_kwargs,expected_name", [ - ({"database": "test_database", "schema": "test_schema", "table": "test_table"}, "test_database.test_schema.test_table"), - ({"database": "test_database", "table": "test_table"}, "test_database.public.test_table"), + ( + { + "database": "test_database", + "schema": "test_schema", + "table": "test_table", + }, + "test_database.test_schema.test_table", + ), + ( + {"database": "test_database", "table": "test_table"}, + "test_database.public.test_table", + ), ({"table": "test_table"}, "public.test_table"), ({"database": "test_database", "table": "b.c"}, "test_database.b.c"), ({"database": "test_database", "table": "a.b.c"}, "a.b.c"), - ({"database": "test_database", "schema": "test_schema", "query": "select * from abc"}, ""), - ] + ( + { + "database": "test_database", + "schema": "test_schema", + "query": "select * from abc", + }, + "", + ), + ], ) def test_redshift_fully_qualified_table_name(source_kwargs, expected_name): redshift_source = RedshiftSource( @@ -211,7 +229,7 @@ def 
test_redshift_fully_qualified_table_name(source_kwargs, expected_name): description="test description", tags={"test": "test"}, owner="test@gmail.com", - **source_kwargs + **source_kwargs, ) - assert redshift_source.redshift_options.fully_qualified_table_name == expected_name \ No newline at end of file + assert redshift_source.redshift_options.fully_qualified_table_name == expected_name From 3bd0cdfacdbfb9a986b887b3e8053de23e2768b0 Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Fri, 30 Jun 2023 12:16:18 -0400 Subject: [PATCH 07/10] pre-commit Signed-off-by: Robin Neufeld --- sdk/python/feast/infra/offline_stores/redshift_source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 0ed8df5e401..e312a9b4ce2 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -295,17 +295,17 @@ def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions): return redshift_options @property - def fully_qualified_table_name(self) -> Optional[str]: + def fully_qualified_table_name(self) -> str: """ The fully qualified table name of this Redshift table. - + Returns: A string in the format of <database>.<schema>.<table>
. May be empty or None if the table is not set. """ if not self.table: - return self.table + return "" # self.table may already contain the database and schema parts = self.table.split(".") From bee4e6b5224c303da3e182ae9de5b80b305b9c8d Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Wed, 5 Jul 2023 17:51:19 -0400 Subject: [PATCH 08/10] Test offline_write_batch Signed-off-by: Robin Neufeld --- .../infra/offline_stores/test_redshift.py | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 sdk/python/tests/unit/infra/offline_stores/test_redshift.py diff --git a/sdk/python/tests/unit/infra/offline_stores/test_redshift.py b/sdk/python/tests/unit/infra/offline_stores/test_redshift.py new file mode 100644 index 00000000000..049977489b9 --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/test_redshift.py @@ -0,0 +1,67 @@ +from unittest.mock import MagicMock, patch + +import pandas as pd +import pyarrow as pa + +from feast import FeatureView +from feast.infra.offline_stores import offline_utils +from feast.infra.offline_stores.redshift import ( + RedshiftOfflineStore, + RedshiftOfflineStoreConfig, +) +from feast.infra.offline_stores.redshift_source import RedshiftSource +from feast.infra.utils import aws_utils +from feast.repo_config import RepoConfig + + +@patch.object(aws_utils, "upload_arrow_table_to_redshift") +def test_offline_write_batch( + mock_upload_arrow_table_to_redshift: MagicMock, + simple_dataset_1: pd.DataFrame, +): + repo_config = RepoConfig( + registry="registry", + project="project", + provider="local", + offline_store=RedshiftOfflineStoreConfig( + type="redshift", + region="us-west-2", + cluster_id="cluster_id", + database="database", + user="user", + iam_role="abcdef", + s3_staging_location="s3://bucket/path", + ), + ) + + batch_source = RedshiftSource( + name="test_source", + timestamp_field="ts", + table="table_name", + schema="schema_name", + ) + feature_view = FeatureView( + name="test_view", + 
source=batch_source, + ) + + pa_dataset = pa.Table.from_pandas(simple_dataset_1) + + # patch some more things so that the function can run + def mock_get_pyarrow_schema_from_batch_source(*args, **kwargs) -> pa.Schema: + return pa_dataset.schema, pa_dataset.column_names + + with patch.object( + offline_utils, + "get_pyarrow_schema_from_batch_source", + new=mock_get_pyarrow_schema_from_batch_source, + ): + RedshiftOfflineStore.offline_write_batch( + repo_config, feature_view, pa_dataset, progress=None + ) + + # check that we have included the fully qualified table name + mock_upload_arrow_table_to_redshift.assert_called_once() + + call = mock_upload_arrow_table_to_redshift.call_args_list[0] + assert call.kwargs["table_name"] == "schema_name.table_name" From 786f7861d2ae15ccef2e74232dbd3004220801f6 Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Mon, 17 Jul 2023 16:14:53 -0400 Subject: [PATCH 09/10] Bump to trigger CI Signed-off-by: Robin Neufeld --- sdk/python/feast/infra/offline_stores/redshift_source.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index e312a9b4ce2..07337bc85d1 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -300,7 +300,7 @@ def fully_qualified_table_name(self) -> str: The fully qualified table name of this Redshift table. Returns: - A string in the format of <database>.<schema>.<table>
. + A string in the format of <database>.<schema>.<table>
May be empty or None if the table is not set. """ @@ -359,7 +359,6 @@ def __init__(self, table_ref: str): @staticmethod def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage: - return SavedDatasetRedshiftStorage( table_ref=RedshiftOptions.from_proto(storage_proto.redshift_storage).table ) From 6529b2a216ed75b14d0edf7531ea99ee0aefc056 Mon Sep 17 00:00:00 2001 From: Robin Neufeld Date: Fri, 21 Jul 2023 13:46:48 -0400 Subject: [PATCH 10/10] another bump for ci Signed-off-by: Robin Neufeld --- sdk/python/feast/infra/offline_stores/redshift_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 07337bc85d1..52ab50ba000 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -301,7 +301,7 @@ def fully_qualified_table_name(self) -> str: Returns: A string in the format of <database>.<schema>.<table>
- May be empty or None if the table is not set. + May be empty or None if the table is not set """ if not self.table: