Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import contextlib
import sys
import urllib.parse
from dataclasses import asdict
from datetime import datetime, timezone
from typing import (
Expand All @@ -18,6 +20,12 @@
import numpy as np
import pandas as pd
import pyarrow as pa

try:
from adbc_driver_manager import OperationalError
from adbc_driver_postgresql.dbapi import connect
except ImportError:
pass
from jinja2 import BaseLoader, Environment
from psycopg import sql

Expand Down Expand Up @@ -229,6 +237,63 @@ def pull_all_from_table_or_query(
on_demand_feature_views=None,
)

if "adbc_driver_postgresql" in sys.modules:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we instead add this to the config optionally?

@Atry Atry Aug 26, 2024

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@franciscojavierarceo What config?


@staticmethod
def offline_write_batch(
config: RepoConfig,
feature_view: FeatureView,
table: pa.Table,
progress: Optional[Callable[[int], Any]],
):
assert isinstance(config.offline_store, PostgreSQLOfflineStoreConfig)
assert isinstance(feature_view.batch_source, PostgreSQLSource)

(
pa_schema,
column_names,
) = offline_utils.get_pyarrow_schema_from_batch_source(
config, feature_view.batch_source
)
if column_names != table.column_names:
raise ValueError(
f"The input pyarrow table has schema {table.schema} with the incorrect columns {table.column_names}. "
f"The schema is expected to be {pa_schema} with the columns (in this exact order) to be {column_names}."
)

if table.schema != pa_schema:
table = table.cast(pa_schema)

escaped_user = urllib.parse.quote_plus(config.offline_store.user)
escaped_password = urllib.parse.quote_plus(config.offline_store.password)
escaped_host = urllib.parse.quote_plus(config.offline_store.host)
port = config.offline_store.port
escaped_database = urllib.parse.quote_plus(config.offline_store.database)

uri = f"postgresql://{escaped_user}:{escaped_password}@{escaped_host}:{port}/{escaped_database}"
with connect(uri) as conn:
try:
try:
with conn.cursor() as cur:
cur.adbc_ingest(
feature_view.batch_source.name, table, mode="append"
)
except Exception:
conn.rollback()
else:
conn.commit()
except OperationalError:
# If the table does not exist, create it
try:
with conn.cursor() as cur:
cur.adbc_ingest(
feature_view.batch_source.name, table, mode="create"
)
except Exception:
conn.rollback()
else:
conn.commit()


class PostgreSQLRetrievalJob(RetrievalJob):
def __init__(
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/requirements/py3.10-ci-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# This file was autogenerated by uv via the following command:
# uv pip compile --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.10-ci-requirements.txt
adal==1.2.7
# via msrestazure
adbc-driver-manager==0.6.0
# via feast (setup.py)
adbc-driver-postgresql==0.6.0
# via feast (setup.py)
adlfs==0.5.9
# via feast (setup.py)
aiobotocore==2.13.1
aiohttp==3.9.5
# via aiobotocore
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/requirements/py3.9-ci-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# This file was autogenerated by uv via the following command:
# uv pip compile --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.9-ci-requirements.txt
adal==1.2.7
# via msrestazure
adbc-driver-manager==0.6.0
# via feast (setup.py)
adbc-driver-postgresql==0.6.0
# via feast (setup.py)
adlfs==0.5.9
# via feast (setup.py)
aiobotocore==2.13.1
aiohttp==3.9.5
# via aiobotocore
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@

POSTGRES_REQUIRED = [
"psycopg[binary,pool]>=3.0.0,<4",
"adbc-driver-manager==0.6.0",
"adbc-driver-postgresql==0.6.0",
]

OPENTELEMETRY = ["prometheus_client","psutil"]
Expand Down