From 4134113d36f30eb8a3680a34cdb323615f6263d6 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 09:48:22 -0800 Subject: [PATCH 01/35] Fix materialization bug Signed-off-by: Kevin Zhang --- sdk/python/feast/infra/online_stores/redis.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 0420751bbde..7fcac794250 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -41,7 +41,7 @@ try: from redis import Redis - from redis.cluster import RedisCluster + from redis.cluster import ClusterNode, RedisCluster except ImportError as e: from feast.errors import FeastExtrasDependencyImportError @@ -160,8 +160,9 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig): online_store_config.connection_string ) if online_store_config.redis_type == RedisType.redis_cluster: - kwargs["startup_nodes"] = startup_nodes - self._client = RedisCluster(**kwargs) + kwargs["startup_nodes"] = [ + ClusterNode(**node) for node in startup_nodes + ] else: kwargs["host"] = startup_nodes[0]["host"] kwargs["port"] = startup_nodes[0]["port"] From 926260b80cdfe81a4049accb36fad88ff24a33a6 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 10:04:43 -0800 Subject: [PATCH 02/35] oops removed something when i was copy pasting Signed-off-by: Kevin Zhang --- sdk/python/feast/infra/online_stores/redis.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 7fcac794250..b557fddc687 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -163,6 +163,7 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig): kwargs["startup_nodes"] = [ ClusterNode(**node) for node in startup_nodes ] + self._client = RedisCluster(**kwargs) else: kwargs["host"] = startup_nodes[0]["host"] kwargs["port"] = startup_nodes[0]["port"] From b4e9ac0a38fcf5f5d2dfc4212351cba483ecac60 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 16:08:49 -0800 Subject: [PATCH 03/35] Test github workflow Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 9 +++ .../online_store/test_e2e_local.py | 63 +++++++++++++++++++ sdk/python/tests/utils/cli_utils.py | 42 +++++++++++++ 3 files changed, 114 insertions(+) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index e04b78ec320..258cbd64542 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -145,6 +145,15 @@ jobs: run: pip install pip-tools - name: Install dependencies run: make install-python-ci-dependencies + - name: Test redis cluster + uses: vishnudxb/redis-cluster@1.0.5 + with: + master1-port: 5000 + master2-port: 5001 + master3-port: 5002 + slave1-port: 5003 + slave2-port: 5004 + slave3-port: 5005 - name: Test python if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak env: diff --git a/sdk/python/tests/integration/online_store/test_e2e_local.py b/sdk/python/tests/integration/online_store/test_e2e_local.py index 79902273440..36506359975 100644 --- a/sdk/python/tests/integration/online_store/test_e2e_local.py +++ b/sdk/python/tests/integration/online_store/test_e2e_local.py @@ -150,3 +150,66 @@ def test_e2e_local() -> None: assert returncode != 0 assert "feast.errors.FeastJoinKeysDuringMaterialization" in str(output) + + +def test_redis_cluster_e2e_local() -> None: + """ + A more comprehensive than "basic" test, using local provider and redis cluster as the online store. + + 1. Create a repo. + 2. Apply + 3. Ingest some data to online store from parquet + 4. Read from the online store to make sure it made it there. + """ + + runner = CliRunner() + with tempfile.TemporaryDirectory() as data_dir: + + # Generate some test data in parquet format. + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = driver_data.create_driver_hourly_stats_df( + driver_entities, start_date, end_date + ) + driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True) + + global_df = driver_data.create_global_daily_stats_df(start_date, end_date) + global_stats_path = os.path.join(data_dir, "global_stats.parquet") + global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True) + + # Note that runner takes care of running apply/teardown for us here. + # We patch python code in example_feature_repo_2.py to set the path to Parquet files. + with runner.local_redis_repo( + get_example_repo("example_feature_repo_2.py") + .replace("%PARQUET_PATH%", driver_stats_path) + .replace("%PARQUET_PATH_GLOBAL%", global_stats_path) + ) as store: + + assert store.repo_path is not None + + # feast materialize + r = runner.run( + [ + "materialize", + start_date.isoformat(), + (end_date - timedelta(days=7)).isoformat(), + ], + cwd=Path(store.repo_path), + ) + + assert r.returncode == 0 + + _assert_online_features(store, driver_df, end_date - timedelta(days=7)) + + # feast materialize-incremental + r = runner.run( + ["materialize-incremental", end_date.isoformat()], + cwd=Path(store.repo_path), + ) + + assert r.returncode == 0 + + _assert_online_features(store, driver_df, end_date) diff --git a/sdk/python/tests/utils/cli_utils.py b/sdk/python/tests/utils/cli_utils.py index 5d6d5722eb1..5487badef26 100644 --- a/sdk/python/tests/utils/cli_utils.py +++ b/sdk/python/tests/utils/cli_utils.py @@ -88,3 +88,45 @@ def local_repo(self, example_repo_py: str, offline_store: str): result = self.run(["teardown"], cwd=repo_path) assert result.returncode == 0 + + @contextmanager + def local_redis_repo(self, example_repo_py: str): + """ + Convenience method to set up all the boilerplate for a local feature repo with a redis cluster. + """ + project_id = "test" + "".join( + random.choice(string.ascii_lowercase + string.digits) for _ in range(10) + ) + + with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + + repo_path = Path(repo_dir_name) + data_path = Path(data_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text( + dedent( + f""" + project: {project_id} + registry: {data_path / "registry.db"} + provider: local + online_store: + type: redis + redis_type: redis_cluster + connection_string: "127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002" + """ + ) + ) + + repo_example = repo_path / "example.py" + repo_example.write_text(example_repo_py) + + result = self.run(["apply"], cwd=repo_path) + assert result.returncode == 0 + + yield FeatureStore(repo_path=str(repo_path), config=None) + + result = self.run(["teardown"], cwd=repo_path) + assert result.returncode == 0 + From 1281b2d841ec1e405b7140ae4625d76cfa9d7517 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 16:09:41 -0800 Subject: [PATCH 04/35] lint Signed-off-by: Kevin Zhang --- sdk/python/tests/utils/cli_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/tests/utils/cli_utils.py b/sdk/python/tests/utils/cli_utils.py index 5487badef26..ca79d6256a9 100644 --- a/sdk/python/tests/utils/cli_utils.py +++ b/sdk/python/tests/utils/cli_utils.py @@ -129,4 +129,3 @@ def local_redis_repo(self, example_repo_py: str): result = self.run(["teardown"], cwd=repo_path) assert result.returncode == 0 - From b404f4f12346217229ec615df0c8281997aa20eb Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 16:23:08 -0800 Subject: [PATCH 05/35] transfer to unit_tests Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 9 --------- .github/workflows/unit_tests.yml | 9 +++++++++ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 258cbd64542..e04b78ec320 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -145,15 +145,6 @@ jobs: run: pip install pip-tools - name: Install dependencies run: make install-python-ci-dependencies - - name: Test redis cluster - uses: vishnudxb/redis-cluster@1.0.5 - with: - master1-port: 5000 - master2-port: 5001 - master3-port: 5002 - slave1-port: 5003 - slave2-port: 5004 - slave3-port: 5005 - name: Test python if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak env: diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index d9552e175e7..e58c7c9e5e2 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -41,6 +41,15 @@ jobs: run: pip install pip-tools - name: Install dependencies run: make install-python-ci-dependencies + - name: Test redis cluster + uses: vishnudxb/redis-cluster@1.0.5 + with: + master1-port: 5000 + master2-port: 5001 + master3-port: 5002 + slave1-port: 5003 + slave2-port: 5004 + slave3-port: 5005 - name: Test Python env: SNOWFLAKE_CI_DEPLOYMENT: ${{ secrets.SNOWFLAKE_CI_DEPLOYMENT }} From f8dab19da446b59893e64ab871dcbcb7a0838f55 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 16:42:55 -0800 Subject: [PATCH 06/35] Brute force implementation Signed-off-by: Kevin Zhang --- .github/workflows/unit_tests.yml | 13 +++++-------- sdk/python/tests/utils/cli_utils.py | 2 +- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index e58c7c9e5e2..ba9d93476fb 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -42,14 +42,11 @@ jobs: - name: Install dependencies run: make install-python-ci-dependencies - name: Test redis cluster - uses: vishnudxb/redis-cluster@1.0.5 - with: - master1-port: 5000 - master2-port: 5001 - master3-port: 5002 - slave1-port: 5003 - slave2-port: 5004 - slave3-port: 5005 + run: | + git clone https://github.com/redis/redis + cd redis + make + cd utils/create-cluster/ - name: Test Python env: SNOWFLAKE_CI_DEPLOYMENT: ${{ secrets.SNOWFLAKE_CI_DEPLOYMENT }} diff --git a/sdk/python/tests/utils/cli_utils.py b/sdk/python/tests/utils/cli_utils.py index ca79d6256a9..a246bece7ba 100644 --- a/sdk/python/tests/utils/cli_utils.py +++ b/sdk/python/tests/utils/cli_utils.py @@ -114,7 +114,7 @@ def local_redis_repo(self, example_repo_py: str): online_store: type: redis redis_type: redis_cluster - connection_string: "127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002" + connection_string: "127.0.0.1:30000,127.0.0.1:30001,127.0.0.1:30002" """ ) ) From c05d2b29bd1d21e787971ec2680a39e3bc4495b1 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 16:51:49 -0800 Subject: [PATCH 07/35] Brute force implementation add gcc Signed-off-by: Kevin Zhang --- .github/workflows/unit_tests.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index ba9d93476fb..a9cdf655787 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -41,6 +41,11 @@ jobs: run: pip install pip-tools - name: Install dependencies run: make install-python-ci-dependencies + - name: Set up GCC + uses: egor-tensin/setup-gcc@v1 + with: + version: latest + platform: x64 - name: Test redis cluster run: | git clone https://github.com/redis/redis From e53c75b4e1247e16eee1fddd0cf1884e07661a48 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 17:02:14 -0800 Subject: [PATCH 08/35] Brute force implementation add gcc Signed-off-by: Kevin Zhang --- .github/workflows/unit_tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index a9cdf655787..7cd509f5171 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -50,6 +50,7 @@ jobs: run: | git clone https://github.com/redis/redis cd redis + cd deps; make hiredis lua jemalloc linenoise make cd utils/create-cluster/ - name: Test Python From eafc27dd590036bcdda948fd360e6714353ab022 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 17:06:12 -0800 Subject: [PATCH 09/35] Brute force implementation add gcc Signed-off-by: Kevin Zhang --- .github/workflows/unit_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 7cd509f5171..dec9cf98e8b 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -50,7 +50,7 @@ jobs: run: | git clone https://github.com/redis/redis cd redis - cd deps; make hiredis lua jemalloc linenoise + make deps/hiredis deps/lua deps/jemalloc deps/linenoise make cd utils/create-cluster/ - name: Test Python From bbbb325cab9be0ff8ae9da579c40a965718ce27c Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 17:13:27 -0800 Subject: [PATCH 10/35] Brute force implementation add gcc Signed-off-by: Kevin Zhang --- .github/workflows/unit_tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index dec9cf98e8b..051271f7dd8 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -48,6 +48,7 @@ jobs: platform: x64 - name: Test redis cluster run: | + sudo apt install build-essential git clone https://github.com/redis/redis cd redis make deps/hiredis deps/lua deps/jemalloc deps/linenoise From ac08a7ba71e8ac7be5c2af1e56b5a968b370884d Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 17:24:28 -0800 Subject: [PATCH 11/35] Continue fixing... Signed-off-by: Kevin Zhang --- .github/workflows/unit_tests.yml | 8 ++------ sdk/python/tests/utils/cli_utils.py | 3 +-- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 051271f7dd8..573c7aed97a 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -48,12 +48,8 @@ jobs: platform: x64 - name: Test redis cluster run: | - sudo apt install build-essential - git clone https://github.com/redis/redis - cd redis - make deps/hiredis deps/lua deps/jemalloc deps/linenoise - make - cd utils/create-cluster/ + docker pull vishnunair/docker-redis-cluster:latest + docker run -d -p 6000:6379 -p 6001:6380 -p 6002:6381 -p 6003:6382 -p 6004:6383 -p 6005:6384 --name redis-cluster vishnunair/docker-redis-cluster - name: Test Python env: SNOWFLAKE_CI_DEPLOYMENT: ${{ secrets.SNOWFLAKE_CI_DEPLOYMENT }} diff --git a/sdk/python/tests/utils/cli_utils.py b/sdk/python/tests/utils/cli_utils.py index a246bece7ba..f2a180964d7 100644 --- a/sdk/python/tests/utils/cli_utils.py +++ b/sdk/python/tests/utils/cli_utils.py @@ -114,11 +114,10 @@ def local_redis_repo(self, example_repo_py: str): online_store: type: redis redis_type: redis_cluster - connection_string: "127.0.0.1:30000,127.0.0.1:30001,127.0.0.1:30002" + connection_string: "127.0.0.1:6000,127.0.0.1:6001,127.0.0.1:6002" """ ) ) - repo_example = repo_path / "example.py" repo_example.write_text(example_repo_py) From fbb8b08e2ec73d5ef07c9c68f4ca324163fefd14 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 15 Feb 2022 17:30:04 -0800 Subject: [PATCH 12/35] Remove gcc setup Signed-off-by: Kevin Zhang --- .github/workflows/unit_tests.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 573c7aed97a..7b79edae6bb 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -41,12 +41,7 @@ jobs: run: pip install pip-tools - name: Install dependencies run: make install-python-ci-dependencies - - name: Set up GCC - uses: egor-tensin/setup-gcc@v1 - with: - version: latest - platform: x64 - - name: Test redis cluster + - name: Setup Redis Cluster run: | docker pull vishnunair/docker-redis-cluster:latest docker run -d -p 6000:6379 -p 6001:6380 -p 6002:6381 -p 6003:6382 -p 6004:6383 -p 6005:6384 --name redis-cluster vishnunair/docker-redis-cluster From ef14ff40faaab6c23a40068b108e7c0acf380258 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 10:40:30 -0800 Subject: [PATCH 13/35] Add integration test Signed-off-by: Kevin Zhang --- sdk/python/tests/conftest.py | 14 ++ .../feature_repos/repo_configuration.py | 4 + .../online_store/test_e2e_local.py | 122 +++++++++--------- .../online_store/test_universal_online.py | 101 +++++++++++++++ sdk/python/tests/utils/cli_utils.py | 78 +++++------ 5 files changed, 219 insertions(+), 100 deletions(-) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 49f32379a3b..055d00fd8d5 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -24,6 +24,7 @@ from _pytest.nodes import Item from feast import FeatureStore +from sdk.python.tests.integration.feature_repos.repo_configuration import REDIS_CLUSTER_CONFIG from tests.data.data_creator import create_dataset from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, @@ -31,6 +32,7 @@ from tests.integration.feature_repos.repo_configuration import ( FULL_REPO_CONFIGS, REDIS_CONFIG, + REDIS_CLUSTER_CONFIG, Environment, construct_test_environment, construct_universal_data_sources, @@ -182,6 +184,18 @@ def cleanup(): request.addfinalizer(cleanup) return e +@pytest.fixture() +def local_redis_cluster_environment(request, worker_id): + e = construct_test_environment( + IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), worker_id=worker_id + ) + + def cleanup(): + e.feature_store.teardown() + + request.addfinalizer(cleanup) + return e + @pytest.fixture(scope="session") def universal_data_sources(request, environment): diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 7de2effc5da..78429d91342 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -46,6 +46,9 @@ DYNAMO_CONFIG = {"type": "dynamodb", "region": "us-west-2"} REDIS_CONFIG = {"type": "redis", "connection_string": "localhost:6379,db=0"} +REDIS_CLUSTER_CONFIG = { + "type": "redis", "redis_type": "redis_cluster", + "connection_string": "127.0.0.1:6000,127.0.0.1:6001,127.0.0.1:6002"} # FULL_REPO_CONFIGS contains the repo configurations (e.g. provider, offline store, # online store, test data, and more parameters) that most integration tests will test @@ -63,6 +66,7 @@ DEFAULT_FULL_REPO_CONFIGS.extend( [ IntegrationTestRepoConfig(online_store=REDIS_CONFIG), + IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), # GCP configurations IntegrationTestRepoConfig( provider="gcp", diff --git a/sdk/python/tests/integration/online_store/test_e2e_local.py b/sdk/python/tests/integration/online_store/test_e2e_local.py index 36506359975..78bdff8ac5b 100644 --- a/sdk/python/tests/integration/online_store/test_e2e_local.py +++ b/sdk/python/tests/integration/online_store/test_e2e_local.py @@ -152,64 +152,64 @@ def test_e2e_local() -> None: assert "feast.errors.FeastJoinKeysDuringMaterialization" in str(output) -def test_redis_cluster_e2e_local() -> None: - """ - A more comprehensive than "basic" test, using local provider and redis cluster as the online store. - - 1. Create a repo. - 2. Apply - 3. Ingest some data to online store from parquet - 4. Read from the online store to make sure it made it there. - """ - - runner = CliRunner() - with tempfile.TemporaryDirectory() as data_dir: - - # Generate some test data in parquet format. - end_date = datetime.now().replace(microsecond=0, second=0, minute=0) - start_date = end_date - timedelta(days=15) - - driver_entities = [1001, 1002, 1003, 1004, 1005] - driver_df = driver_data.create_driver_hourly_stats_df( - driver_entities, start_date, end_date - ) - driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") - driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True) - - global_df = driver_data.create_global_daily_stats_df(start_date, end_date) - global_stats_path = os.path.join(data_dir, "global_stats.parquet") - global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True) - - # Note that runner takes care of running apply/teardown for us here. - # We patch python code in example_feature_repo_2.py to set the path to Parquet files. - with runner.local_redis_repo( - get_example_repo("example_feature_repo_2.py") - .replace("%PARQUET_PATH%", driver_stats_path) - .replace("%PARQUET_PATH_GLOBAL%", global_stats_path) - ) as store: - - assert store.repo_path is not None - - # feast materialize - r = runner.run( - [ - "materialize", - start_date.isoformat(), - (end_date - timedelta(days=7)).isoformat(), - ], - cwd=Path(store.repo_path), - ) - - assert r.returncode == 0 - - _assert_online_features(store, driver_df, end_date - timedelta(days=7)) - - # feast materialize-incremental - r = runner.run( - ["materialize-incremental", end_date.isoformat()], - cwd=Path(store.repo_path), - ) - - assert r.returncode == 0 - - _assert_online_features(store, driver_df, end_date) +# def test_redis_cluster_e2e_local() -> None: +# """ +# A more comprehensive than "basic" test, using local provider and redis cluster as the online store. + +# 1. Create a repo. +# 2. Apply +# 3. Ingest some data to online store from parquet +# 4. Read from the online store to make sure it made it there. +# """ + +# runner = CliRunner() +# with tempfile.TemporaryDirectory() as data_dir: + +# # Generate some test data in parquet format. +# end_date = datetime.now().replace(microsecond=0, second=0, minute=0) +# start_date = end_date - timedelta(days=15) + +# driver_entities = [1001, 1002, 1003, 1004, 1005] +# driver_df = driver_data.create_driver_hourly_stats_df( +# driver_entities, start_date, end_date +# ) +# driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") +# driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True) + +# global_df = driver_data.create_global_daily_stats_df(start_date, end_date) +# global_stats_path = os.path.join(data_dir, "global_stats.parquet") +# global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True) + +# # Note that runner takes care of running apply/teardown for us here. +# # We patch python code in example_feature_repo_2.py to set the path to Parquet files. +# with runner.local_redis_repo( +# get_example_repo("example_feature_repo_2.py") +# .replace("%PARQUET_PATH%", driver_stats_path) +# .replace("%PARQUET_PATH_GLOBAL%", global_stats_path) +# ) as store: + +# assert store.repo_path is not None + +# # feast materialize +# r = runner.run( +# [ +# "materialize", +# start_date.isoformat(), +# (end_date - timedelta(days=7)).isoformat(), +# ], +# cwd=Path(store.repo_path), +# ) + +# assert r.returncode == 0 + +# _assert_online_features(store, driver_df, end_date - timedelta(days=7)) + +# # feast materialize-incremental +# r = runner.run( +# ["materialize-incremental", end_date.isoformat()], +# cwd=Path(store.repo_path), +# ) + +# assert r.returncode == 0 + +# _assert_online_features(store, driver_df, end_date) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 7d6296baa51..f08742357ab 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -137,6 +137,107 @@ def test_write_to_online_store_event_check(local_redis_environment): assert df["string_col"].iloc[1] == "LATEST_VALUE2" assert df["string_col"].iloc[2] == "LATEST_VALUE3" +# TODO: make this work with all universal (all online store types) +@pytest.mark.integration +def test_write_to_online_store_event_check_with_redis_cluster(local_redis_cluster_environment): + if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True": + return + fs = local_redis_cluster_environment.feature_store + + # write same data points 3 with different timestamps + now = pd.Timestamp(datetime.datetime.utcnow()).round("ms") + hour_ago = pd.Timestamp(datetime.datetime.utcnow() - timedelta(hours=1)).round("ms") + latest = pd.Timestamp(datetime.datetime.utcnow() + timedelta(seconds=1)).round("ms") + + data = { + "id": [123, 567, 890], + "string_col": ["OLD_FEATURE", "LATEST_VALUE2", "LATEST_VALUE3"], + "ts_1": [hour_ago, now, now], + } + dataframe_source = pd.DataFrame(data) + with prep_file_source( + df=dataframe_source, event_timestamp_column="ts_1" + ) as file_source: + e = Entity(name="id", value_type=ValueType.STRING) + + # Create Feature View + fv1 = FeatureView( + name="feature_view_123", + features=[Feature(name="string_col", dtype=ValueType.STRING)], + entities=["id"], + batch_source=file_source, + ttl=timedelta(minutes=5), + ) + # Register Feature View and Entity + fs.apply([fv1, e]) + + # data to ingest into Online Store (recent) + data = { + "id": [123], + "string_col": ["hi_123"], + "ts_1": [now], + } + df_data = pd.DataFrame(data) + + # directly ingest data into the Online Store + fs.write_to_online_store("feature_view_123", df_data) + + df = fs.get_online_features( + features=["feature_view_123:string_col"], entity_rows=[{"id": 123}] + ).to_df() + assert df["string_col"].iloc[0] == "hi_123" + + # data to ingest into Online Store (1 hour delayed data) + # should now overwrite features for id=123 because it's less recent data + data = { + "id": [123, 567, 890], + "string_col": ["bye_321", "hello_123", "greetings_321"], + "ts_1": [hour_ago, hour_ago, hour_ago], + } + df_data = pd.DataFrame(data) + + # directly ingest data into the Online Store + fs.write_to_online_store("feature_view_123", df_data) + + df = fs.get_online_features( + features=["feature_view_123:string_col"], + entity_rows=[{"id": 123}, {"id": 567}, {"id": 890}], + ).to_df() + assert df["string_col"].iloc[0] == "hi_123" + assert df["string_col"].iloc[1] == "hello_123" + assert df["string_col"].iloc[2] == "greetings_321" + + # should overwrite string_col for id=123 because it's most recent based on event_timestamp + data = { + "id": [123], + "string_col": ["LATEST_VALUE"], + "ts_1": [latest], + } + df_data = pd.DataFrame(data) + + fs.write_to_online_store("feature_view_123", df_data) + + df = fs.get_online_features( + features=["feature_view_123:string_col"], + entity_rows=[{"id": 123}, {"id": 567}, {"id": 890}], + ).to_df() + assert df["string_col"].iloc[0] == "LATEST_VALUE" + assert df["string_col"].iloc[1] == "hello_123" + assert df["string_col"].iloc[2] == "greetings_321" + + # writes to online store via datasource (dataframe_source) materialization + fs.materialize( + start_date=datetime.datetime.now() - timedelta(hours=12), + end_date=datetime.datetime.utcnow(), + ) + + df = fs.get_online_features( + features=["feature_view_123:string_col"], + entity_rows=[{"id": 123}, {"id": 567}, {"id": 890}], + ).to_df() + assert df["string_col"].iloc[0] == "LATEST_VALUE" + assert df["string_col"].iloc[1] == "LATEST_VALUE2" + assert df["string_col"].iloc[2] == "LATEST_VALUE3" @pytest.mark.integration @pytest.mark.universal diff --git a/sdk/python/tests/utils/cli_utils.py b/sdk/python/tests/utils/cli_utils.py index f2a180964d7..a4b7cc1c9fc 100644 --- a/sdk/python/tests/utils/cli_utils.py +++ b/sdk/python/tests/utils/cli_utils.py @@ -89,42 +89,42 @@ def local_repo(self, example_repo_py: str, offline_store: str): result = self.run(["teardown"], cwd=repo_path) assert result.returncode == 0 - @contextmanager - def local_redis_repo(self, example_repo_py: str): - """ - Convenience method to set up all the boilerplate for a local feature repo with a redis cluster. - """ - project_id = "test" + "".join( - random.choice(string.ascii_lowercase + string.digits) for _ in range(10) - ) - - with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: - - repo_path = Path(repo_dir_name) - data_path = Path(data_dir_name) - - repo_config = repo_path / "feature_store.yaml" - - repo_config.write_text( - dedent( - f""" - project: {project_id} - registry: {data_path / "registry.db"} - provider: local - online_store: - type: redis - redis_type: redis_cluster - connection_string: "127.0.0.1:6000,127.0.0.1:6001,127.0.0.1:6002" - """ - ) - ) - repo_example = repo_path / "example.py" - repo_example.write_text(example_repo_py) - - result = self.run(["apply"], cwd=repo_path) - assert result.returncode == 0 - - yield FeatureStore(repo_path=str(repo_path), config=None) - - result = self.run(["teardown"], cwd=repo_path) - assert result.returncode == 0 + # @contextmanager + # def local_redis_repo(self, example_repo_py: str): + # """ + # Convenience method to set up all the boilerplate for a local feature repo with a redis cluster. + # """ + # project_id = "test" + "".join( + # random.choice(string.ascii_lowercase + string.digits) for _ in range(10) + # ) + + # with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + + # repo_path = Path(repo_dir_name) + # data_path = Path(data_dir_name) + + # repo_config = repo_path / "feature_store.yaml" + + # repo_config.write_text( + # dedent( + # f""" + # project: {project_id} + # registry: {data_path / "registry.db"} + # provider: local + # online_store: + # type: redis + # redis_type: redis_cluster + # connection_string: "127.0.0.1:6000,127.0.0.1:6001,127.0.0.1:6002" + # """ + # ) + # ) + # repo_example = repo_path / "example.py" + # repo_example.write_text(example_repo_py) + + # result = self.run(["apply"], cwd=repo_path) + # assert result.returncode == 0 + + # yield FeatureStore(repo_path=str(repo_path), config=None) + + # result = self.run(["teardown"], cwd=repo_path) + # assert result.returncode == 0 From 99166498c9e31c055df75ed3455fdad08810ec7b Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 10:41:38 -0800 Subject: [PATCH 14/35] fix error Signed-off-by: Kevin Zhang --- sdk/python/tests/conftest.py | 10 +++-- .../feature_repos/repo_configuration.py | 6 ++- .../online_store/test_universal_online.py | 6 ++- sdk/python/tests/utils/cli_utils.py | 40 ------------------- 4 files changed, 16 insertions(+), 46 deletions(-) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 055d00fd8d5..d9a623af131 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -22,17 +22,19 @@ import pandas as pd import pytest from _pytest.nodes import Item +from sdk.python.tests.integration.feature_repos.repo_configuration import ( + REDIS_CLUSTER_CONFIG, +) from feast import FeatureStore -from sdk.python.tests.integration.feature_repos.repo_configuration import REDIS_CLUSTER_CONFIG from tests.data.data_creator import create_dataset from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, ) from tests.integration.feature_repos.repo_configuration import ( FULL_REPO_CONFIGS, - REDIS_CONFIG, REDIS_CLUSTER_CONFIG, + REDIS_CONFIG, Environment, construct_test_environment, construct_universal_data_sources, @@ -184,10 +186,12 @@ def cleanup(): request.addfinalizer(cleanup) return e + @pytest.fixture() def local_redis_cluster_environment(request, worker_id): e = construct_test_environment( - IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), worker_id=worker_id + IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), + worker_id=worker_id, ) def cleanup(): diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 78429d91342..b0b86f84abc 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -47,8 +47,10 @@ DYNAMO_CONFIG = {"type": "dynamodb", "region": "us-west-2"} REDIS_CONFIG = {"type": "redis", "connection_string": "localhost:6379,db=0"} REDIS_CLUSTER_CONFIG = { - "type": "redis", "redis_type": "redis_cluster", - "connection_string": "127.0.0.1:6000,127.0.0.1:6001,127.0.0.1:6002"} + "type": "redis", + "redis_type": "redis_cluster", + "connection_string": "127.0.0.1:6000,127.0.0.1:6001,127.0.0.1:6002", +} # FULL_REPO_CONFIGS contains the repo configurations (e.g. provider, offline store, # online store, test data, and more parameters) that most integration tests will test diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index f08742357ab..a56abaf871c 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -137,9 +137,12 @@ def test_write_to_online_store_event_check(local_redis_environment): assert df["string_col"].iloc[1] == "LATEST_VALUE2" assert df["string_col"].iloc[2] == "LATEST_VALUE3" + # TODO: make this work with all universal (all online store types) @pytest.mark.integration -def test_write_to_online_store_event_check_with_redis_cluster(local_redis_cluster_environment): +def test_write_to_online_store_event_check_with_redis_cluster( + local_redis_cluster_environment, +): if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True": return fs = local_redis_cluster_environment.feature_store @@ -239,6 +242,7 @@ def test_write_to_online_store_event_check_with_redis_cluster(local_redis_cluste assert df["string_col"].iloc[1] == "LATEST_VALUE2" assert df["string_col"].iloc[2] == "LATEST_VALUE3" + @pytest.mark.integration @pytest.mark.universal def test_write_to_online_store(environment, universal_data_sources): diff --git a/sdk/python/tests/utils/cli_utils.py b/sdk/python/tests/utils/cli_utils.py index a4b7cc1c9fc..5d6d5722eb1 100644 --- a/sdk/python/tests/utils/cli_utils.py +++ b/sdk/python/tests/utils/cli_utils.py @@ -88,43 +88,3 @@ def local_repo(self, example_repo_py: str, offline_store: str): result = self.run(["teardown"], cwd=repo_path) assert result.returncode == 0 - - # @contextmanager - # def local_redis_repo(self, example_repo_py: str): - # """ - # Convenience method to set up all the boilerplate for a local feature repo with a redis cluster. - # """ - # project_id = "test" + "".join( - # random.choice(string.ascii_lowercase + string.digits) for _ in range(10) - # ) - - # with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: - - # repo_path = Path(repo_dir_name) - # data_path = Path(data_dir_name) - - # repo_config = repo_path / "feature_store.yaml" - - # repo_config.write_text( - # dedent( - # f""" - # project: {project_id} - # registry: {data_path / "registry.db"} - # provider: local - # online_store: - # type: redis - # redis_type: redis_cluster - # connection_string: "127.0.0.1:6000,127.0.0.1:6001,127.0.0.1:6002" - # """ - # ) - # ) - # repo_example = repo_path / "example.py" - # repo_example.write_text(example_repo_py) - - # result = self.run(["apply"], cwd=repo_path) - # assert result.returncode == 0 - - # yield FeatureStore(repo_path=str(repo_path), config=None) - - # result = self.run(["teardown"], cwd=repo_path) - # assert result.returncode == 0 From 6b8209867e52dc9db6e9cb9333625c671a11b248 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 10:42:37 -0800 Subject: [PATCH 15/35] fix error Signed-off-by: Kevin Zhang --- sdk/python/tests/conftest.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index d9a623af131..ba1d3b2cde9 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -22,9 +22,6 @@ import pandas as pd import pytest from _pytest.nodes import Item -from sdk.python.tests.integration.feature_repos.repo_configuration import ( - REDIS_CLUSTER_CONFIG, -) from feast import FeatureStore from tests.data.data_creator import create_dataset From 89c1da8e6f37e6003e74c13e9a4423ec4e00aa97 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 10:47:07 -0800 Subject: [PATCH 16/35] Add setup Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 5 +++++ .github/workflows/unit_tests.yml | 4 ---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index e04b78ec320..925324e93c5 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -145,6 +145,11 @@ jobs: run: pip install pip-tools - name: Install dependencies run: make install-python-ci-dependencies + - name: Setup Redis Cluster + run: | + docker pull vishnunair/docker-redis-cluster:latest + docker run -d -p 6000:6379 -p 6001:6380 -p 6002:6381 -p 6003:6382 -p 6004:6383 -p 6005:6384 --name redis-cluster vishnunair/docker-redis-cluster + - name: Setup Redis Cluster - name: Test python if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak env: diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 7b79edae6bb..d9552e175e7 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -41,10 +41,6 @@ jobs: run: pip install pip-tools - name: Install dependencies run: make install-python-ci-dependencies - - name: Setup Redis Cluster - run: | - docker pull vishnunair/docker-redis-cluster:latest - docker run -d -p 6000:6379 -p 6001:6380 -p 6002:6381 -p 6003:6382 -p 6004:6383 -p 6005:6384 --name redis-cluster vishnunair/docker-redis-cluster - name: Test Python env: SNOWFLAKE_CI_DEPLOYMENT: ${{ secrets.SNOWFLAKE_CI_DEPLOYMENT }} From 0e9f1b3c71341067cb3c59652eab360bb5d07233 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 10:53:58 -0800 Subject: [PATCH 17/35] temp fix to get integration tests to work Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 925324e93c5..c68f3e63018 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -14,9 +14,6 @@ on: jobs: build-docker-image: - # all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. - if: (github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'ok-to-test')) - || (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved'))) runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -149,7 +146,6 @@ jobs: run: | docker pull vishnunair/docker-redis-cluster:latest docker run -d -p 6000:6379 -p 6001:6380 -p 6002:6381 -p 6003:6382 -p 6004:6383 -p 6005:6384 --name redis-cluster vishnunair/docker-redis-cluster - - name: Setup Redis Cluster - name: Test python if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak env: From ca3aa7bf0ed8228750357062ad1a4fe9832a7c66 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 10:57:03 -0800 Subject: [PATCH 18/35] temp fix to get integration tests to work Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index c68f3e63018..31b7995573a 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -68,9 +68,6 @@ jobs: outputs: DOCKER_IMAGE_TAG: ${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} integration-test-python: - # all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. - if: (github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'ok-to-test')) - || (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved'))) needs: build-docker-image runs-on: ${{ matrix.os }} strategy: From 03f76f1a3776b2795687a7dde7d17234f72393d6 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 10:58:22 -0800 Subject: [PATCH 19/35] temp fix to get integration tests to work Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 31b7995573a..a3ffefc80eb 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -1,12 +1,5 @@ name: pr-integration-tests -on: - pull_request_target: - types: - - opened - - synchronize - - labeled - # concurrency is currently broken, see details https://github.com/actions/runner/issues/1532 #concurrency: # group: pr-integration-tests-${{ github.event.pull_request.number }} From 6aeb5f4b6cb492b838de8d9eb3e3d04df31afc41 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 11:00:20 -0800 Subject: [PATCH 20/35] temp fix to get integration tests to work Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index a3ffefc80eb..df89c5b1dc5 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -1,5 +1,7 @@ name: pr-integration-tests +on: [push, pull_request] + # concurrency is currently broken, see details https://github.com/actions/runner/issues/1532 #concurrency: # group: pr-integration-tests-${{ github.event.pull_request.number }} From c32ed2175962c20780b3170630f770d60f953cb0 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 11:53:50 -0800 Subject: [PATCH 21/35] Fix integration even more Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 78 +------------------ .../feature_repos/repo_configuration.py | 64 +++++++-------- 2 files changed, 33 insertions(+), 109 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index df89c5b1dc5..54ec39c3803 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -8,62 +8,7 @@ on: [push, pull_request] # cancel-in-progress: true jobs: - build-docker-image: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - with: - # pull_request_target runs the workflow in the context of the base repo - # as such actions/checkout needs to be explicit configured to retrieve - # code from the PR. - ref: refs/pull/${{ github.event.pull_request.number }}/merge - submodules: recursive - - name: Set up QEMU - uses: docker/setup-qemu-action@v1 - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v1 - - name: Set up AWS SDK - uses: aws-actions/configure-aws-credentials@v1 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - aws-region: us-west-2 - - name: Login to Amazon ECR - id: login-ecr - uses: aws-actions/amazon-ecr-login@v1 - - name: Set ECR image tag - id: image-tag - run: echo "::set-output name=DOCKER_IMAGE_TAG::`git rev-parse HEAD`" - - name: Cache Public ECR Image - id: lambda_python_3_9 - uses: actions/cache@v2 - with: - path: ~/cache - key: lambda_python_3_9 - - name: Handle Cache Miss (pull public ECR image & save it to tar file) - if: steps.cache-primes.outputs.cache-hit != 'true' - run: | - mkdir -p ~/cache - docker pull public.ecr.aws/lambda/python:3.9 - docker save public.ecr.aws/lambda/python:3.9 -o ~/cache/lambda_python_3_9.tar - - name: Handle Cache Hit (load docker image from tar file) - if: steps.cache-primes.outputs.cache-hit == 'true' - run: | - docker load -i ~/cache/lambda_python_3_9.tar - - name: Build and push - env: - ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} - ECR_REPOSITORY: feast-python-server - run: | - docker build \ - --file sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile \ - --tag $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} \ - . - docker push $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} - outputs: - DOCKER_IMAGE_TAG: ${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} integration-test-python: - needs: build-docker-image runs-on: ${{ matrix.os }} strategy: fail-fast: false @@ -89,7 +34,7 @@ jobs: # pull_request_target runs the workflow in the context of the base repo # as such actions/checkout needs to be explicit configured to retrieve # code from the PR. - ref: refs/pull/${{ github.event.pull_request.number }}/merge + ref: refs/pull/2/merge submodules: recursive - name: Setup Python uses: actions/setup-python@v2 @@ -97,22 +42,7 @@ jobs: with: python-version: ${{ matrix.python-version }} architecture: x64 - - name: Set up gcloud SDK - uses: google-github-actions/setup-gcloud@v0 - with: - project_id: ${{ secrets.GCP_PROJECT_ID }} - service_account_key: ${{ secrets.GCP_SA_KEY }} export_default_credentials: true - - name: Use gcloud CLI - run: gcloud info - - name: Set up AWS SDK - uses: aws-actions/configure-aws-credentials@v1 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - aws-region: us-west-2 - - name: Use AWS CLI - run: aws sts get-caller-identity - name: Upgrade pip version run: | pip install --upgrade "pip>=21.3.1" @@ -141,14 +71,8 @@ jobs: - name: Test python if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak env: - FEAST_SERVER_DOCKER_IMAGE_TAG: ${{ needs.build-docker-image.outputs.DOCKER_IMAGE_TAG }} FEAST_USAGE: "False" IS_TEST: "True" - SNOWFLAKE_CI_DEPLOYMENT: ${{ secrets.SNOWFLAKE_CI_DEPLOYMENT }} - SNOWFLAKE_CI_USER: ${{ secrets.SNOWFLAKE_CI_USER }} - SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} - SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} - SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} run: pytest -n 8 --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests --integration --durations=5 - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index b0b86f84abc..9cc8e43d583 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -61,43 +61,43 @@ # module will be imported and FULL_REPO_CONFIGS will be extracted from the file. DEFAULT_FULL_REPO_CONFIGS: List[IntegrationTestRepoConfig] = [ # Local configurations - IntegrationTestRepoConfig(), - IntegrationTestRepoConfig(python_feature_server=True), + # IntegrationTestRepoConfig(), + # IntegrationTestRepoConfig(python_feature_server=True), + IntegrationTestRepoConfig(online_store=REDIS_CONFIG), + IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), ] if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True": DEFAULT_FULL_REPO_CONFIGS.extend( [ - IntegrationTestRepoConfig(online_store=REDIS_CONFIG), - IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), # GCP configurations - IntegrationTestRepoConfig( - provider="gcp", - offline_store_creator=BigQueryDataSourceCreator, - online_store="datastore", - ), - IntegrationTestRepoConfig( - provider="gcp", - offline_store_creator=BigQueryDataSourceCreator, - online_store=REDIS_CONFIG, - ), - # AWS configurations - IntegrationTestRepoConfig( - provider="aws", - offline_store_creator=RedshiftDataSourceCreator, - online_store=DYNAMO_CONFIG, - python_feature_server=True, - ), - IntegrationTestRepoConfig( - provider="aws", - offline_store_creator=RedshiftDataSourceCreator, - online_store=REDIS_CONFIG, - ), - # Snowflake configurations - IntegrationTestRepoConfig( - provider="aws", # no list features, no feature server - offline_store_creator=SnowflakeDataSourceCreator, - online_store=REDIS_CONFIG, - ), + # IntegrationTestRepoConfig( + # provider="gcp", + # offline_store_creator=BigQueryDataSourceCreator, + # online_store="datastore", + # ), + # IntegrationTestRepoConfig( + # provider="gcp", + # offline_store_creator=BigQueryDataSourceCreator, + # online_store=REDIS_CONFIG, + # ), + # # AWS configurations + # IntegrationTestRepoConfig( + # provider="aws", + # offline_store_creator=RedshiftDataSourceCreator, + # online_store=DYNAMO_CONFIG, + # python_feature_server=True, + # ), + # IntegrationTestRepoConfig( + # provider="aws", + # offline_store_creator=RedshiftDataSourceCreator, + # online_store=REDIS_CONFIG, + # ), + # # Snowflake configurations + # IntegrationTestRepoConfig( + # provider="aws", # no list features, no feature server + # offline_store_creator=SnowflakeDataSourceCreator, + # online_store=REDIS_CONFIG, + # ), ] ) full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME) From 68c9b616343c2bf89455fe2afa01d6c94b8d1dfb Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 11:55:01 -0800 Subject: [PATCH 22/35] Fix integration even more Signed-off-by: Kevin Zhang --- .../feature_repos/repo_configuration.py | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 9cc8e43d583..da34be352ee 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -69,35 +69,35 @@ if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True": DEFAULT_FULL_REPO_CONFIGS.extend( [ - # GCP configurations - # IntegrationTestRepoConfig( - # provider="gcp", - # offline_store_creator=BigQueryDataSourceCreator, - # online_store="datastore", - # ), - # IntegrationTestRepoConfig( - # provider="gcp", - # offline_store_creator=BigQueryDataSourceCreator, - # online_store=REDIS_CONFIG, - # ), - # # AWS configurations - # IntegrationTestRepoConfig( - # provider="aws", - # offline_store_creator=RedshiftDataSourceCreator, - # online_store=DYNAMO_CONFIG, - # python_feature_server=True, - # ), - # IntegrationTestRepoConfig( - # provider="aws", - # offline_store_creator=RedshiftDataSourceCreator, - # online_store=REDIS_CONFIG, - # ), - # # Snowflake configurations - # IntegrationTestRepoConfig( - # provider="aws", # no list features, no feature server - # offline_store_creator=SnowflakeDataSourceCreator, - # online_store=REDIS_CONFIG, - # ), + GCP configurations + IntegrationTestRepoConfig( + provider="gcp", + offline_store_creator=BigQueryDataSourceCreator, + online_store="datastore", + ), + IntegrationTestRepoConfig( + provider="gcp", + offline_store_creator=BigQueryDataSourceCreator, + online_store=REDIS_CONFIG, + ), + # AWS configurations + IntegrationTestRepoConfig( + provider="aws", + offline_store_creator=RedshiftDataSourceCreator, + online_store=DYNAMO_CONFIG, + python_feature_server=True, + ), + IntegrationTestRepoConfig( + provider="aws", + offline_store_creator=RedshiftDataSourceCreator, + online_store=REDIS_CONFIG, + ), + # Snowflake configurations + IntegrationTestRepoConfig( + provider="aws", # no list features, no feature server + offline_store_creator=SnowflakeDataSourceCreator, + online_store=REDIS_CONFIG, + ), ] ) full_repo_configs_module = os.environ.get(FULL_REPO_CONFIGS_MODULE_ENV_NAME) From caeb7570ee31bf79f2f48989cd892a324f0fe973 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 11:55:31 -0800 Subject: [PATCH 23/35] Fix lint Signed-off-by: Kevin Zhang --- .../tests/integration/feature_repos/repo_configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index da34be352ee..f6fd05c58de 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -69,7 +69,7 @@ if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True": DEFAULT_FULL_REPO_CONFIGS.extend( [ - GCP configurations + # GCP configurations IntegrationTestRepoConfig( provider="gcp", offline_store_creator=BigQueryDataSourceCreator, From 8b9f2f156a8181959f409da435385201156965f9 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 11:58:18 -0800 Subject: [PATCH 24/35] only run one test Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 54ec39c3803..283b7ba3c1b 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -73,7 +73,7 @@ jobs: env: FEAST_USAGE: "False" IS_TEST: "True" - run: pytest -n 8 --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests --integration --durations=5 + run: pytest -n 8 --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests/integration/online_store/test_universal_online.py --integration --durations=5 - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 with: From 1f060e09e3e8cad4f88b85fa756dd8324c844094 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 14:20:16 -0800 Subject: [PATCH 25/35] Do some more integration testing Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 2 +- .../tests/integration/feature_repos/repo_configuration.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 283b7ba3c1b..4fa6ca1f61e 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -67,7 +67,7 @@ jobs: - name: Setup Redis Cluster run: | docker pull vishnunair/docker-redis-cluster:latest - docker run -d -p 6000:6379 -p 6001:6380 -p 6002:6381 -p 6003:6382 -p 6004:6383 -p 6005:6384 --name redis-cluster vishnunair/docker-redis-cluster + docker run -d -p 6001:6379 -p 6002:6380 -p 6003:6381 -p 6004:6382 -p 6005:6383 -p 6006:6384 --name redis-cluster vishnunair/docker-redis-cluster - name: Test python if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak env: diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index f6fd05c58de..c464b9c3a2e 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -49,7 +49,7 @@ REDIS_CLUSTER_CONFIG = { "type": "redis", "redis_type": "redis_cluster", - "connection_string": "127.0.0.1:6000,127.0.0.1:6001,127.0.0.1:6002", + "connection_string": "127.0.0.1:6001,127.0.0.1:6002,127.0.0.1:6003", } # FULL_REPO_CONFIGS contains the repo configurations (e.g. provider, offline store, From 931dc234812a5ea6ef33188a4a483666886e34c3 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 14:25:13 -0800 Subject: [PATCH 26/35] Do some more integration testing by adding bug to make sure no false positives Signed-off-by: Kevin Zhang --- .../tests/integration/feature_repos/repo_configuration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index c464b9c3a2e..0f981f46202 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -49,7 +49,7 @@ REDIS_CLUSTER_CONFIG = { "type": "redis", "redis_type": "redis_cluster", - "connection_string": "127.0.0.1:6001,127.0.0.1:6002,127.0.0.1:6003", + "connection_string": "127.0.0.1:8001,127.0.0.1:8002,127.0.0.1:8003", } # FULL_REPO_CONFIGS contains the repo configurations (e.g. provider, offline store, From 41f7c756e7cc5e82f44d21aef78795c6fadc6d49 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 14:30:04 -0800 Subject: [PATCH 27/35] Integration testing works Signed-off-by: Kevin Zhang --- .../tests/integration/feature_repos/repo_configuration.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 0f981f46202..80795c8fe45 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -49,7 +49,8 @@ REDIS_CLUSTER_CONFIG = { "type": "redis", "redis_type": "redis_cluster", - "connection_string": "127.0.0.1:8001,127.0.0.1:8002,127.0.0.1:8003", + # Redis Cluster Port Forwarding is setup in "pr_integration_tests.yaml" under "Setup Redis Cluster". + "connection_string": "127.0.0.1:6001,127.0.0.1:6002,127.0.0.1:6003", } # FULL_REPO_CONFIGS contains the repo configurations (e.g. provider, offline store, From 174af8ee4c4eedb897a8625bb38fccca6d25e336 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 14:36:17 -0800 Subject: [PATCH 28/35] Clean upu code Signed-off-by: Kevin Zhang --- .../feature_repos/repo_configuration.py | 9 +-- .../online_store/test_e2e_local.py | 63 ------------------- .../online_store/test_universal_online.py | 2 - 3 files changed, 5 insertions(+), 69 deletions(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 80795c8fe45..976c807acb2 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -62,14 +62,15 @@ # module will be imported and FULL_REPO_CONFIGS will be extracted from the file. DEFAULT_FULL_REPO_CONFIGS: List[IntegrationTestRepoConfig] = [ # Local configurations - # IntegrationTestRepoConfig(), - # IntegrationTestRepoConfig(python_feature_server=True), - IntegrationTestRepoConfig(online_store=REDIS_CONFIG), - IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), + IntegrationTestRepoConfig(), + IntegrationTestRepoConfig(python_feature_server=True), ] if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True": DEFAULT_FULL_REPO_CONFIGS.extend( [ + # Redis configurations + IntegrationTestRepoConfig(online_store=REDIS_CONFIG), + IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), # GCP configurations IntegrationTestRepoConfig( provider="gcp", diff --git a/sdk/python/tests/integration/online_store/test_e2e_local.py b/sdk/python/tests/integration/online_store/test_e2e_local.py index 78bdff8ac5b..79902273440 100644 --- a/sdk/python/tests/integration/online_store/test_e2e_local.py +++ b/sdk/python/tests/integration/online_store/test_e2e_local.py @@ -150,66 +150,3 @@ def test_e2e_local() -> None: assert returncode != 0 assert "feast.errors.FeastJoinKeysDuringMaterialization" in str(output) - - -# def test_redis_cluster_e2e_local() -> None: -# """ -# A more comprehensive than "basic" test, using local provider and redis cluster as the online store. - -# 1. Create a repo. -# 2. Apply -# 3. Ingest some data to online store from parquet -# 4. Read from the online store to make sure it made it there. -# """ - -# runner = CliRunner() -# with tempfile.TemporaryDirectory() as data_dir: - -# # Generate some test data in parquet format. -# end_date = datetime.now().replace(microsecond=0, second=0, minute=0) -# start_date = end_date - timedelta(days=15) - -# driver_entities = [1001, 1002, 1003, 1004, 1005] -# driver_df = driver_data.create_driver_hourly_stats_df( -# driver_entities, start_date, end_date -# ) -# driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") -# driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True) - -# global_df = driver_data.create_global_daily_stats_df(start_date, end_date) -# global_stats_path = os.path.join(data_dir, "global_stats.parquet") -# global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True) - -# # Note that runner takes care of running apply/teardown for us here. -# # We patch python code in example_feature_repo_2.py to set the path to Parquet files. -# with runner.local_redis_repo( -# get_example_repo("example_feature_repo_2.py") -# .replace("%PARQUET_PATH%", driver_stats_path) -# .replace("%PARQUET_PATH_GLOBAL%", global_stats_path) -# ) as store: - -# assert store.repo_path is not None - -# # feast materialize -# r = runner.run( -# [ -# "materialize", -# start_date.isoformat(), -# (end_date - timedelta(days=7)).isoformat(), -# ], -# cwd=Path(store.repo_path), -# ) - -# assert r.returncode == 0 - -# _assert_online_features(store, driver_df, end_date - timedelta(days=7)) - -# # feast materialize-incremental -# r = runner.run( -# ["materialize-incremental", end_date.isoformat()], -# cwd=Path(store.repo_path), -# ) - -# assert r.returncode == 0 - -# _assert_online_features(store, driver_df, end_date) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index a56abaf871c..748d8ff91ff 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -137,8 +137,6 @@ def test_write_to_online_store_event_check(local_redis_environment): assert df["string_col"].iloc[1] == "LATEST_VALUE2" assert df["string_col"].iloc[2] == "LATEST_VALUE3" - -# TODO: make this work with all universal (all online store types) @pytest.mark.integration def test_write_to_online_store_event_check_with_redis_cluster( local_redis_cluster_environment, From d818c0af82bd13101d165ac74b5a5f1bb1fa2425 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 14:49:48 -0800 Subject: [PATCH 29/35] Add redis cluster script for starting a redis cluster Signed-off-by: Kevin Zhang --- scripts/create-cluster | 109 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100755 scripts/create-cluster diff --git a/scripts/create-cluster b/scripts/create-cluster new file mode 100755 index 00000000000..8ec3945c0aa --- /dev/null +++ b/scripts/create-cluster @@ -0,0 +1,109 @@ +# Settings +# Make sure you run "brew install redis" +# Assumes you are running a Mac OS environment +BIN_PATH="/opt/homebrew/bin" +CLUSTER_HOST=127.0.0.1 +PORT=6000 +TIMEOUT=2000 +# Creates a cluster at ports 6001-6006 with 3 masters 6001-6003 and 3 slaves 6004-6006 +NODES=6 +REPLICAS=1 +PROTECTED_MODE=yes +ADDITIONAL_OPTIONS="" + +if [ -a config.sh ] +then + source "config.sh" +fi + +# Computed vars +ENDPORT=$((PORT+NODES)) + +if [ "$1" == "start" ] +then + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + echo "Starting $PORT" + $BIN_PATH/redis-server --port $PORT --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes ${ADDITIONAL_OPTIONS} + done + exit 0 +fi + +if [ "$1" == "create" ] +then + HOSTS="" + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + HOSTS="$HOSTS $CLUSTER_HOST:$PORT" + done + OPT_ARG="" + if [ "$2" == "-f" ]; then + OPT_ARG="--cluster-yes" + fi + $BIN_PATH/redis-cli --cluster create $HOSTS --cluster-replicas $REPLICAS $OPT_ARG + exit 0 +fi + +if [ "$1" == "stop" ] +then + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + echo "Stopping $PORT" + $BIN_PATH/redis-cli -p $PORT shutdown nosave + done + exit 0 +fi + +if [ "$1" == "watch" ] +then + PORT=$((PORT+1)) + while [ 1 ]; do + clear + date + $BIN_PATH/redis-cli -p $PORT cluster nodes | head -30 + sleep 1 + done + exit 0 +fi + +if [ "$1" == "tail" ] +then + INSTANCE=$2 + PORT=$((PORT+INSTANCE)) + tail -f ${PORT}.log + exit 0 +fi + +if [ "$1" == "tailall" ] +then + tail -f *.log + exit 0 +fi + +if [ "$1" == "clean" ] +then + echo "Cleaning *.log" + rm -rf *.log + echo "Cleaning appendonly-*" + rm -rf appendonly-* + echo "Cleaning dump-*.rdb" + rm -rf dump-*.rdb + echo "Cleaning nodes-*.conf" + rm -rf nodes-*.conf + exit 0 +fi + +if [ "$1" == "clean-logs" ] +then + echo "Cleaning *.log" + rm -rf *.log + exit 0 +fi + +echo "Usage: $0 [start|create|stop|watch|clean|clean-logs|call]" +echo "start -- Launch Redis Cluster instances." +echo "create [-f] -- Create a cluster using redis-cli --cluster create." +echo "stop -- Stop Redis Cluster instances." +echo "watch -- Show CLUSTER NODES output (first 30 lines) of first node." +echo "clean -- Remove all instances data, logs, configs." +echo "clean-logs -- Remove just instances logs." From b2903f8bd2d7a25035621a3fb6489195ba287353 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 15:00:37 -0800 Subject: [PATCH 30/35] Reset integration yml file Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 95 +++++++++++++++++++++- 1 file changed, 91 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 4fa6ca1f61e..71fe6fc227d 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -1,6 +1,11 @@ name: pr-integration-tests -on: [push, pull_request] +on: + pull_request_target: + types: + - opened + - synchronize + - labeled # concurrency is currently broken, see details https://github.com/actions/runner/issues/1532 #concurrency: @@ -8,7 +13,68 @@ on: [push, pull_request] # cancel-in-progress: true jobs: + build-docker-image: + # all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. + if: (github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'ok-to-test')) + || (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved'))) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + # pull_request_target runs the workflow in the context of the base repo + # as such actions/checkout needs to be explicit configured to retrieve + # code from the PR. + ref: refs/pull/${{ github.event.pull_request.number }}/merge + submodules: recursive + - name: Set up QEMU + uses: docker/setup-qemu-action@v1 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + - name: Set up AWS SDK + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-west-2 + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + - name: Set ECR image tag + id: image-tag + run: echo "::set-output name=DOCKER_IMAGE_TAG::`git rev-parse HEAD`" + - name: Cache Public ECR Image + id: lambda_python_3_9 + uses: actions/cache@v2 + with: + path: ~/cache + key: lambda_python_3_9 + - name: Handle Cache Miss (pull public ECR image & save it to tar file) + if: steps.cache-primes.outputs.cache-hit != 'true' + run: | + mkdir -p ~/cache + docker pull public.ecr.aws/lambda/python:3.9 + docker save public.ecr.aws/lambda/python:3.9 -o ~/cache/lambda_python_3_9.tar + - name: Handle Cache Hit (load docker image from tar file) + if: steps.cache-primes.outputs.cache-hit == 'true' + run: | + docker load -i ~/cache/lambda_python_3_9.tar + - name: Build and push + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + ECR_REPOSITORY: feast-python-server + run: | + docker build \ + --file sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile \ + --tag $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} \ + . + docker push $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} + outputs: + DOCKER_IMAGE_TAG: ${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} integration-test-python: + # all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. + if: (github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'ok-to-test')) + || (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved'))) + needs: build-docker-image runs-on: ${{ matrix.os }} strategy: fail-fast: false @@ -34,7 +100,7 @@ jobs: # pull_request_target runs the workflow in the context of the base repo # as such actions/checkout needs to be explicit configured to retrieve # code from the PR. - ref: refs/pull/2/merge + ref: refs/pull/${{ github.event.pull_request.number }}/merge submodules: recursive - name: Setup Python uses: actions/setup-python@v2 @@ -42,7 +108,22 @@ jobs: with: python-version: ${{ matrix.python-version }} architecture: x64 + - name: Set up gcloud SDK + uses: google-github-actions/setup-gcloud@v0 + with: + project_id: ${{ secrets.GCP_PROJECT_ID }} + service_account_key: ${{ secrets.GCP_SA_KEY }} export_default_credentials: true + - name: Use gcloud CLI + run: gcloud info + - name: Set up AWS SDK + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-west-2 + - name: Use AWS CLI + run: aws sts get-caller-identity - name: Upgrade pip version run: | pip install --upgrade "pip>=21.3.1" @@ -71,9 +152,15 @@ jobs: - name: Test python if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak env: + FEAST_SERVER_DOCKER_IMAGE_TAG: ${{ needs.build-docker-image.outputs.DOCKER_IMAGE_TAG }} FEAST_USAGE: "False" IS_TEST: "True" - run: pytest -n 8 --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests/integration/online_store/test_universal_online.py --integration --durations=5 + SNOWFLAKE_CI_DEPLOYMENT: ${{ secrets.SNOWFLAKE_CI_DEPLOYMENT }} + SNOWFLAKE_CI_USER: ${{ secrets.SNOWFLAKE_CI_USER }} + SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} + SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} + SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} + run: pytest -n 8 --cov=./ --cov-report=xml --verbose --color=yes sdk/python/tests --integration --durations=5 - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 with: @@ -82,4 +169,4 @@ jobs: flags: integrationtests env_vars: OS,PYTHON fail_ci_if_error: true - verbose: true + verbose: true \ No newline at end of file From 0b5dabd28312a5e3157c3601e421066d169dab3d Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 15:01:51 -0800 Subject: [PATCH 31/35] lint Signed-off-by: Kevin Zhang --- .../tests/integration/online_store/test_universal_online.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 748d8ff91ff..2e48770e47f 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -137,6 +137,7 @@ def test_write_to_online_store_event_check(local_redis_environment): assert df["string_col"].iloc[1] == "LATEST_VALUE2" assert df["string_col"].iloc[2] == "LATEST_VALUE3" + @pytest.mark.integration def test_write_to_online_store_event_check_with_redis_cluster( local_redis_cluster_environment, From 1a4a1553ed9f3b93db4299b230050d77707d114d Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 15:54:41 -0800 Subject: [PATCH 32/35] Clean up Signed-off-by: Kevin Zhang --- .github/workflows/pr_integration_tests.yml | 2 +- docs/how-to-guides/adding-or-reusing-tests.md | 26 +++++++++-- scripts/create-cluster | 44 +++++++------------ 3 files changed, 39 insertions(+), 33 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 71fe6fc227d..000f9e97286 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -169,4 +169,4 @@ jobs: flags: integrationtests env_vars: OS,PYTHON fail_ci_if_error: true - verbose: true \ No newline at end of file + verbose: true diff --git a/docs/how-to-guides/adding-or-reusing-tests.md b/docs/how-to-guides/adding-or-reusing-tests.md index 1730abe2096..21ba0214f56 100644 --- a/docs/how-to-guides/adding-or-reusing-tests.md +++ b/docs/how-to-guides/adding-or-reusing-tests.md @@ -79,7 +79,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n datasets["global"], datasets["entity"], ) - + # ... more test code customer_fv, driver_fv, driver_odfv, order_fv, global_fv = ( @@ -93,7 +93,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n feature_service = FeatureService( "convrate_plus100", features=[ - feature_views["driver"][["conv_rate"]], + feature_views["driver"][["conv_rate"]], feature_views["driver_odfv"] ], ) @@ -138,7 +138,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n assert_frame_equal( expected_df, actual_df_from_df_entities, check_dtype=False, ) - + # ... more test code ``` {% endtab %} @@ -186,6 +186,24 @@ def your_test(environment: Environment): your_fv = driver_feature_view(data_source) entity = driver(value_type=ValueType.UNKNOWN) fs.apply([fv, entity]) - + # ... run test ``` + +### Running your own redis cluster for testing + +* Install redis on your computer. If you are a mac user, you should be able to `brew install redis`. + * Running `redis-server --help` and `redis-cli --help` should show corresponding help menus. +* Run `cd scripts/create-cluster` and run `./create-cluster start` then `./create-cluster create` to start the server. You should see output that looks like this: +~~~~ +Starting 6001 +Starting 6002 +Starting 6003 +Starting 6004 +Starting 6005 +Starting 6006 +~~~~ +* You should be able to run the integration tests and have the redis cluster tests pass. +* If you would like to run your own redis cluster, you can run the above commands with your own specified ports and connect to the newly configured cluster. +* To stop the cluster, run `./create-cluster stop` and then `./create-cluster clean`. + diff --git a/scripts/create-cluster b/scripts/create-cluster index 8ec3945c0aa..06a1afd2322 100755 --- a/scripts/create-cluster +++ b/scripts/create-cluster @@ -1,11 +1,13 @@ # Settings # Make sure you run "brew install redis" -# Assumes you are running a Mac OS environment -BIN_PATH="/opt/homebrew/bin" + +# BIN_PATH="/opt/homebrew/bin" +REDIS_CLI=`which redis-cli` +REDIS_SERVER=`which redis-server` CLUSTER_HOST=127.0.0.1 -PORT=6000 -TIMEOUT=2000 # Creates a cluster at ports 6001-6006 with 3 masters 6001-6003 and 3 slaves 6004-6006 +PORT=${2:-6000} +TIMEOUT=2000 NODES=6 REPLICAS=1 PROTECTED_MODE=yes @@ -24,7 +26,7 @@ then while [ $((PORT < ENDPORT)) != "0" ]; do PORT=$((PORT+1)) echo "Starting $PORT" - $BIN_PATH/redis-server --port $PORT --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes ${ADDITIONAL_OPTIONS} + $REDIS_SERVER --port $PORT --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes ${ADDITIONAL_OPTIONS} done exit 0 fi @@ -40,7 +42,7 @@ then if [ "$2" == "-f" ]; then OPT_ARG="--cluster-yes" fi - $BIN_PATH/redis-cli --cluster create $HOSTS --cluster-replicas $REPLICAS $OPT_ARG + $REDIS_CLI --cluster create $HOSTS --cluster-replicas $REPLICAS $OPT_ARG exit 0 fi @@ -49,7 +51,7 @@ then while [ $((PORT < ENDPORT)) != "0" ]; do PORT=$((PORT+1)) echo "Stopping $PORT" - $BIN_PATH/redis-cli -p $PORT shutdown nosave + $REDIS_CLI -p $PORT shutdown nosave done exit 0 fi @@ -60,26 +62,12 @@ then while [ 1 ]; do clear date - $BIN_PATH/redis-cli -p $PORT cluster nodes | head -30 + $REDIS_CLI -p $PORT cluster nodes | head -30 sleep 1 done exit 0 fi -if [ "$1" == "tail" ] -then - INSTANCE=$2 - PORT=$((PORT+INSTANCE)) - tail -f ${PORT}.log - exit 0 -fi - -if [ "$1" == "tailall" ] -then - tail -f *.log - exit 0 -fi - if [ "$1" == "clean" ] then echo "Cleaning *.log" @@ -101,9 +89,9 @@ then fi echo "Usage: $0 [start|create|stop|watch|clean|clean-logs|call]" -echo "start -- Launch Redis Cluster instances." -echo "create [-f] -- Create a cluster using redis-cli --cluster create." -echo "stop -- Stop Redis Cluster instances." -echo "watch -- Show CLUSTER NODES output (first 30 lines) of first node." -echo "clean -- Remove all instances data, logs, configs." -echo "clean-logs -- Remove just instances logs." +echo "start [PORT] -- Launch Redis Cluster instances." +echo "create [PORT] [-f] -- Create a cluster using redis-cli --cluster create." +echo "stop [PORT] -- Stop Redis Cluster instances." +echo "watch [PORT] -- Show CLUSTER NODES output (first 30 lines) of first node." +echo "clean -- Remove all instances data, logs, configs." +echo "clean-logs -- Remove just instances logs." From 52f239d1bca7cbe88bb4998b4986e2473ffef1fd Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 15:58:46 -0800 Subject: [PATCH 33/35] Fix how to guide lint Signed-off-by: Kevin Zhang --- docs/how-to-guides/adding-or-reusing-tests.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docs/how-to-guides/adding-or-reusing-tests.md b/docs/how-to-guides/adding-or-reusing-tests.md index 21ba0214f56..5a29342d6e1 100644 --- a/docs/how-to-guides/adding-or-reusing-tests.md +++ b/docs/how-to-guides/adding-or-reusing-tests.md @@ -79,7 +79,6 @@ def test_historical_features(environment, universal_data_sources, full_feature_n datasets["global"], datasets["entity"], ) - # ... more test code customer_fv, driver_fv, driver_odfv, order_fv, global_fv = ( @@ -112,7 +111,6 @@ def test_historical_features(environment, universal_data_sources, full_feature_n ] ) store.apply(feast_objects) - # ... more test code job_from_df = store.get_historical_features( @@ -132,13 +130,11 @@ def test_historical_features(environment, universal_data_sources, full_feature_n full_feature_names=full_feature_names, ) actual_df_from_df_entities = job_from_df.to_df() - # ... more test code assert_frame_equal( expected_df, actual_df_from_df_entities, check_dtype=False, ) - # ... more test code ``` {% endtab %} From a21e13f1b24c917235496c1f5a13c1cfc50dfb25 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 17:17:01 -0800 Subject: [PATCH 34/35] add fixtures and remove excess code Signed-off-by: Kevin Zhang --- sdk/python/tests/conftest.py | 18 +-- .../online_store/test_universal_online.py | 104 ------------------ 2 files changed, 2 insertions(+), 120 deletions(-) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index ba1d3b2cde9..0593891f149 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -171,24 +171,10 @@ def cleanup(): return e -@pytest.fixture() +@pytest.fixture(scope="session", params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG], ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]]) def local_redis_environment(request, worker_id): e = construct_test_environment( - IntegrationTestRepoConfig(online_store=REDIS_CONFIG), worker_id=worker_id - ) - - def cleanup(): - e.feature_store.teardown() - - request.addfinalizer(cleanup) - return e - - -@pytest.fixture() -def local_redis_cluster_environment(request, worker_id): - e = construct_test_environment( - IntegrationTestRepoConfig(online_store=REDIS_CLUSTER_CONFIG), - worker_id=worker_id, + IntegrationTestRepoConfig(online_store=request.param), worker_id=worker_id ) def cleanup(): diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 2e48770e47f..7d6296baa51 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -138,110 +138,6 @@ def test_write_to_online_store_event_check(local_redis_environment): assert df["string_col"].iloc[2] == "LATEST_VALUE3" -@pytest.mark.integration -def test_write_to_online_store_event_check_with_redis_cluster( - local_redis_cluster_environment, -): - if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True": - return - fs = local_redis_cluster_environment.feature_store - - # write same data points 3 with different timestamps - now = pd.Timestamp(datetime.datetime.utcnow()).round("ms") - hour_ago = pd.Timestamp(datetime.datetime.utcnow() - timedelta(hours=1)).round("ms") - latest = pd.Timestamp(datetime.datetime.utcnow() + timedelta(seconds=1)).round("ms") - - data = { - "id": [123, 567, 890], - "string_col": ["OLD_FEATURE", "LATEST_VALUE2", "LATEST_VALUE3"], - "ts_1": [hour_ago, now, now], - } - dataframe_source = pd.DataFrame(data) - with prep_file_source( - df=dataframe_source, event_timestamp_column="ts_1" - ) as file_source: - e = Entity(name="id", value_type=ValueType.STRING) - - # Create Feature View - fv1 = FeatureView( - name="feature_view_123", - features=[Feature(name="string_col", dtype=ValueType.STRING)], - entities=["id"], - batch_source=file_source, - ttl=timedelta(minutes=5), - ) - # Register Feature View and Entity - fs.apply([fv1, e]) - - # data to ingest into Online Store (recent) - data = { - "id": [123], - "string_col": ["hi_123"], - "ts_1": [now], - } - df_data = pd.DataFrame(data) - - # directly ingest data into the Online Store - fs.write_to_online_store("feature_view_123", df_data) - - df = fs.get_online_features( - features=["feature_view_123:string_col"], entity_rows=[{"id": 123}] - ).to_df() - assert df["string_col"].iloc[0] == "hi_123" - - # data to ingest into Online Store (1 hour delayed data) - # should now overwrite features for id=123 because it's less recent data - data = { - "id": [123, 567, 890], - "string_col": ["bye_321", "hello_123", "greetings_321"], - "ts_1": [hour_ago, hour_ago, hour_ago], - } - df_data = pd.DataFrame(data) - - # directly ingest data into the Online Store - fs.write_to_online_store("feature_view_123", df_data) - - df = fs.get_online_features( - features=["feature_view_123:string_col"], - entity_rows=[{"id": 123}, {"id": 567}, {"id": 890}], - ).to_df() - assert df["string_col"].iloc[0] == "hi_123" - assert df["string_col"].iloc[1] == "hello_123" - assert df["string_col"].iloc[2] == "greetings_321" - - # should overwrite string_col for id=123 because it's most recent based on event_timestamp - data = { - "id": [123], - "string_col": ["LATEST_VALUE"], - "ts_1": [latest], - } - df_data = pd.DataFrame(data) - - fs.write_to_online_store("feature_view_123", df_data) - - df = fs.get_online_features( - features=["feature_view_123:string_col"], - entity_rows=[{"id": 123}, {"id": 567}, {"id": 890}], - ).to_df() - assert df["string_col"].iloc[0] == "LATEST_VALUE" - assert df["string_col"].iloc[1] == "hello_123" - assert df["string_col"].iloc[2] == "greetings_321" - - # writes to online store via datasource (dataframe_source) materialization - fs.materialize( - start_date=datetime.datetime.now() - timedelta(hours=12), - end_date=datetime.datetime.utcnow(), - ) - - df = fs.get_online_features( - features=["feature_view_123:string_col"], - entity_rows=[{"id": 123}, {"id": 567}, {"id": 890}], - ).to_df() - assert df["string_col"].iloc[0] == "LATEST_VALUE" - assert df["string_col"].iloc[1] == "LATEST_VALUE2" - assert df["string_col"].iloc[2] == "LATEST_VALUE3" - - @pytest.mark.integration @pytest.mark.universal def test_write_to_online_store(environment, universal_data_sources): From 9c59fccc04093c0e614199bff5faa5da722bc52f Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 16 Feb 2022 17:19:05 -0800 Subject: [PATCH 35/35] lint Signed-off-by: Kevin Zhang --- sdk/python/tests/conftest.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 0593891f149..624b610fe43 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -171,7 +171,11 @@ def cleanup(): return e -@pytest.fixture(scope="session", params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG], ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]]) +@pytest.fixture( + scope="session", + params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG], + ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]], +) def local_redis_environment(request, worker_id): e = construct_test_environment( IntegrationTestRepoConfig(online_store=request.param), worker_id=worker_id