diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 5239cfb474d..fb36440d91d 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -1,4 +1,6 @@ import contextlib +import sys +import urllib.parse from dataclasses import asdict from datetime import datetime, timezone from typing import ( @@ -18,6 +20,12 @@ import numpy as np import pandas as pd import pyarrow as pa + +try: + from adbc_driver_manager import OperationalError + from adbc_driver_postgresql.dbapi import connect +except ImportError: + pass from jinja2 import BaseLoader, Environment from psycopg import sql @@ -229,6 +237,63 @@ def pull_all_from_table_or_query( on_demand_feature_views=None, ) + if "adbc_driver_postgresql" in sys.modules: + + @staticmethod + def offline_write_batch( + config: RepoConfig, + feature_view: FeatureView, + table: pa.Table, + progress: Optional[Callable[[int], Any]], + ): + assert isinstance(config.offline_store, PostgreSQLOfflineStoreConfig) + assert isinstance(feature_view.batch_source, PostgreSQLSource) + + ( + pa_schema, + column_names, + ) = offline_utils.get_pyarrow_schema_from_batch_source( + config, feature_view.batch_source + ) + if column_names != table.column_names: + raise ValueError( + f"The input pyarrow table has schema {table.schema} with the incorrect columns {table.column_names}. " + f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}." + ) + + if table.schema != pa_schema: + table = table.cast(pa_schema) + + escaped_user = urllib.parse.quote_plus(config.offline_store.user) + escaped_password = urllib.parse.quote_plus(config.offline_store.password) + escaped_host = urllib.parse.quote_plus(config.offline_store.host) + port = config.offline_store.port + escaped_database = urllib.parse.quote_plus(config.offline_store.database) + + uri = f"postgresql://{escaped_user}:{escaped_password}@{escaped_host}:{port}/{escaped_database}" + with connect(uri) as conn: + try: + try: + with conn.cursor() as cur: + cur.adbc_ingest( + feature_view.batch_source.name, table, mode="append" + ) + except Exception: + conn.rollback() + else: + conn.commit() + except OperationalError: + # If the table does not exist, create it + try: + with conn.cursor() as cur: + cur.adbc_ingest( + feature_view.batch_source.name, table, mode="create" + ) + except Exception: + conn.rollback() + else: + conn.commit() + class PostgreSQLRetrievalJob(RetrievalJob): def __init__( diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 8128eb094d6..f1c9bcfbca7 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -1,5 +1,13 @@ # This file was autogenerated by uv via the following command: # uv pip compile --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.10-ci-requirements.txt +adal==1.2.7 + # via msrestazure +adbc-driver-manager==0.6.0 + # via feast (setup.py) +adbc-driver-postgresql==0.6.0 + # via feast (setup.py) +adlfs==0.5.9 + # via feast (setup.py) aiobotocore==2.13.1 aiohttp==3.9.5 # via aiobotocore diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 58ec69fe2de..682866cd274 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -1,5 +1,13 @@ # This file was autogenerated by uv via the following command: # uv pip compile --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.9-ci-requirements.txt +adal==1.2.7 + # via msrestazure +adbc-driver-manager==0.6.0 + # via feast (setup.py) +adbc-driver-postgresql==0.6.0 + # via feast (setup.py) +adlfs==0.5.9 + # via feast (setup.py) aiobotocore==2.13.1 aiohttp==3.9.5 # via aiobotocore diff --git a/setup.py b/setup.py index a9f9cafacc5..03c85e101e3 100644 --- a/setup.py +++ b/setup.py @@ -101,6 +101,8 @@ POSTGRES_REQUIRED = [ "psycopg[binary,pool]>=3.0.0,<4", + "adbc-driver-manager==0.6.0", + "adbc-driver-postgresql==0.6.0", ] OPENTELEMETRY = ["prometheus_client","psutil"]