Merged
16 changes: 16 additions & 0 deletions google/cloud/bigquery/enums.py
@@ -387,3 +387,19 @@ def _generate_next_value_(name, start, count, last_values):
ROUNDING_MODE_UNSPECIFIED = enum.auto()
ROUND_HALF_AWAY_FROM_ZERO = enum.auto()
ROUND_HALF_EVEN = enum.auto()


class BigLakeFileFormat(object):
FILE_FORMAT_UNSPECIFIED = "FILE_FORMAT_UNSPECIFIED"
"""The default unspecified value."""

PARQUET = "PARQUET"
"""Apache Parquet format."""


class BigLakeTableFormat(object):
TABLE_FORMAT_UNSPECIFIED = "TABLE_FORMAT_UNSPECIFIED"
"""The default unspecified value."""

ICEBERG = "ICEBERG"
"""Apache Iceberg format."""
150 changes: 150 additions & 0 deletions google/cloud/bigquery/table.py
@@ -380,6 +380,7 @@ class Table(_TableBase):

_PROPERTY_TO_API_FIELD: Dict[str, Any] = {
**_TableBase._PROPERTY_TO_API_FIELD,
"biglake_configuration": "biglakeConfiguration",
"clustering_fields": "clustering",
"created": "creationTime",
"description": "description",
@@ -431,6 +432,29 @@ def __init__(self, table_ref, schema=None) -> None:

reference = property(_reference_getter)

@property
def biglake_configuration(self):
"""google.cloud.bigquery.table.BigLakeConfiguration: Configuration
for managed tables for Apache Iceberg.

See https://cloud.google.com/bigquery/docs/iceberg-tables for more information.
"""
prop = self._properties.get(
self._PROPERTY_TO_API_FIELD["biglake_configuration"]
)
if prop is not None:
prop = BigLakeConfiguration.from_api_repr(prop)
return prop

@biglake_configuration.setter
def biglake_configuration(self, value):
api_repr = value
if value is not None:
api_repr = value.to_api_repr()
self._properties[
self._PROPERTY_TO_API_FIELD["biglake_configuration"]
] = api_repr

@property
def require_partition_filter(self):
"""bool: If set to true, queries over the partitioned table require a
@@ -3501,6 +3525,132 @@ def to_api_repr(self) -> Dict[str, Any]:
return resource


class BigLakeConfiguration(object):
"""Configuration for managed tables for Apache Iceberg, formerly
known as BigLake.

Args:
connection_id (Optional[str]):
The connection specifying the credentials to be used to read and write to external
storage, such as Cloud Storage. The connection_id can have the form
``{project}.{location}.{connection_id}`` or
``projects/{project}/locations/{location}/connections/{connection_id}``.
storage_uri (Optional[str]):
The fully qualified location prefix of the external folder where table data is
stored. The '*' wildcard character is not allowed. The URI should be in the
format ``gs://bucket/path_to_table/``.
file_format (Optional[str]):
The file format the table data is stored in. See BigLakeFileFormat for available
values.
table_format (Optional[str]):
The table format that metadata-only snapshots are stored in. See BigLakeTableFormat
for available values.
_properties (Optional[dict]):
Private. Used to construct object from API resource.
"""

def __init__(
self,
connection_id: Optional[str] = None,
storage_uri: Optional[str] = None,
file_format: Optional[str] = None,
table_format: Optional[str] = None,
_properties: Optional[dict] = None,
) -> None:
if _properties is None:
_properties = {}
self._properties = _properties
if connection_id is not None:
self.connection_id = connection_id
if storage_uri is not None:
self.storage_uri = storage_uri
if file_format is not None:
self.file_format = file_format
if table_format is not None:
self.table_format = table_format

@property
def connection_id(self) -> Optional[str]:
"""str: The connection specifying the credentials to be used to read and write to external
storage, such as Cloud Storage."""
return self._properties.get("connectionId")

@connection_id.setter
def connection_id(self, value: Optional[str]):
self._properties["connectionId"] = value

@property
def storage_uri(self) -> Optional[str]:
"""str: The fully qualified location prefix of the external folder where table data is
stored."""
return self._properties.get("storageUri")

@storage_uri.setter
def storage_uri(self, value: Optional[str]):
self._properties["storageUri"] = value

@property
def file_format(self) -> Optional[str]:
"""str: The file format the table data is stored in. See BigLakeFileFormat for available
values."""
return self._properties.get("fileFormat")

@file_format.setter
def file_format(self, value: Optional[str]):
self._properties["fileFormat"] = value

@property
def table_format(self) -> Optional[str]:
"""str: The table format the metadata only snapshots are stored in. See BigLakeTableFormat
for available values."""
return self._properties.get("tableFormat")

@table_format.setter
def table_format(self, value: Optional[str]):
self._properties["tableFormat"] = value

def _key(self):
return tuple(sorted(self._properties.items()))

def __eq__(self, other):
if not isinstance(other, BigLakeConfiguration):
return NotImplemented
return self._key() == other._key()

def __ne__(self, other):
return not self == other

def __hash__(self):
return hash(self._key())

def __repr__(self):
key_vals = ["{}={}".format(key, val) for key, val in self._key()]
return "BigLakeConfiguration({})".format(",".join(key_vals))

@classmethod
def from_api_repr(cls, resource: Dict[str, Any]) -> "BigLakeConfiguration":
"""Factory: construct a BigLakeConfiguration given its API representation.

Args:
resource:
BigLakeConfiguration representation returned from the API

Returns:
BigLakeConfiguration parsed from ``resource``.
"""
ref = cls()
ref._properties = resource
return ref

def to_api_repr(self) -> Dict[str, Any]:
"""Construct the API resource representation of this BigLakeConfiguration.

Returns:
BigLakeConfiguration represented as an API resource.
"""
return copy.deepcopy(self._properties)


def _item_to_row(iterator, resource):
"""Convert a JSON row to the native object.

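Putting the pieces together, a rough usage sketch (not part of the diff; the client, project, dataset, bucket, and connection names below are hypothetical) of how the new property and configuration class are intended to be combined when creating a table:

from google.cloud import bigquery
from google.cloud.bigquery.enums import BigLakeFileFormat, BigLakeTableFormat
from google.cloud.bigquery.table import BigLakeConfiguration, Table

client = bigquery.Client()  # assumes application default credentials

table = Table(
    "my-project.my_dataset.my_iceberg_table",  # hypothetical table ID
    schema=[bigquery.SchemaField("id", "INT64")],
)
table.biglake_configuration = BigLakeConfiguration(
    connection_id="my-project.us.my-connection",  # hypothetical connection
    storage_uri="gs://my-bucket/path_to_table/",  # hypothetical bucket path
    file_format=BigLakeFileFormat.PARQUET,
    table_format=BigLakeTableFormat.ICEBERG,
)

# The setter stores the camelCase API representation under
# table._properties["biglakeConfiguration"], so create_table sends it
# to the BigQuery API as-is.
table = client.create_table(table)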
160 changes: 160 additions & 0 deletions tests/unit/test_table.py
@@ -435,6 +435,12 @@ def _make_resource(self):
"sourceFormat": "CSV",
"csvOptions": {"allowJaggedRows": True, "encoding": "encoding"},
},
"biglakeConfiguration": {
"connectionId": "connection",
"storageUri": "uri",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
},
"labels": {"x": "y"},
}

@@ -521,6 +527,15 @@ def _verifyResourceProperties(self, table, resource):
else:
self.assertIsNone(table.encryption_configuration)

if "biglakeConfiguration" in resource:
self.assertIsNotNone(table.biglake_configuration)
self.assertEqual(table.biglake_configuration.connection_id, "connection")
self.assertEqual(table.biglake_configuration.storage_uri, "uri")
self.assertEqual(table.biglake_configuration.file_format, "PARQUET")
self.assertEqual(table.biglake_configuration.table_format, "ICEBERG")
else:
self.assertIsNone(table.biglake_configuration)

def test_ctor(self):
dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
@@ -893,6 +908,60 @@ def test_table_constraints_property_getter(self):
assert isinstance(table_constraints, TableConstraints)
assert table_constraints.primary_key == PrimaryKey(columns=["id"])

def test_biglake_configuration_not_set(self):
dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)

assert table.biglake_configuration is None

def test_biglake_configuration_set(self):
from google.cloud.bigquery.table import BigLakeConfiguration

dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)

table._properties["biglakeConfiguration"] = {
"connectionId": "connection",
"storageUri": "uri",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}

config = table.biglake_configuration

assert isinstance(config, BigLakeConfiguration)
assert config.connection_id == "connection"
assert config.storage_uri == "uri"
assert config.file_format == "PARQUET"
assert config.table_format == "ICEBERG"

def test_biglake_configuration_property_setter(self):
from google.cloud.bigquery.table import BigLakeConfiguration

dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)

config = BigLakeConfiguration(
connection_id="connection",
storage_uri="uri",
file_format="PARQUET",
table_format="ICEBERG",
)
table.biglake_configuration = config

assert table._properties["biglakeConfiguration"] == {
"connectionId": "connection",
"storageUri": "uri",
"fileFormat": "PARQUET",
"tableFormat": "ICEBERG",
}

table.biglake_configuration = None
assert table.biglake_configuration is None

def test_table_constraints_property_setter(self):
from google.cloud.bigquery.table import (
ColumnReference,
@@ -2166,6 +2235,97 @@ def test_ctor_full_resource(self):
assert instance.snapshot_time == expected_time


class TestBigLakeConfiguration(unittest.TestCase):
@staticmethod
def _get_target_class():
from google.cloud.bigquery.table import BigLakeConfiguration

return BigLakeConfiguration

@classmethod
def _make_one(cls, *args, **kwargs):
klass = cls._get_target_class()
return klass(*args, **kwargs)

def test_ctor_empty_resource(self):
instance = self._make_one()
self.assertIsNone(instance.connection_id)
self.assertIsNone(instance.storage_uri)
self.assertIsNone(instance.file_format)
self.assertIsNone(instance.table_format)

def test_ctor_kwargs(self):
instance = self._make_one(
connection_id="conn",
storage_uri="uri",
file_format="FILE",
table_format="TABLE",
)
self.assertEqual(instance.connection_id, "conn")
self.assertEqual(instance.storage_uri, "uri")
self.assertEqual(instance.file_format, "FILE")
self.assertEqual(instance.table_format, "TABLE")

def test_ctor_full_resource(self):
resource = {
"connectionId": "conn",
"storageUri": "uri",
"fileFormat": "FILE",
"tableFormat": "TABLE",
}
instance = self._make_one(_properties=resource)
self.assertEqual(instance.connection_id, "conn")
self.assertEqual(instance.storage_uri, "uri")
self.assertEqual(instance.file_format, "FILE")
self.assertEqual(instance.table_format, "TABLE")

def test_to_api_repr(self):
resource = {
"connectionId": "conn",
"storageUri": "uri",
"fileFormat": "FILE",
"tableFormat": "TABLE",
}
instance = self._make_one(_properties=resource)
self.assertEqual(instance.to_api_repr(), resource)

def test_from_api_repr_partial(self):
klass = self._get_target_class()
api_repr = {"fileFormat": "FILE"}
instance = klass.from_api_repr(api_repr)

self.assertIsNone(instance.connection_id)
self.assertIsNone(instance.storage_uri)
self.assertEqual(instance.file_format, "FILE")
self.assertIsNone(instance.table_format)

def test_comparisons(self):
resource = {
"connectionId": "conn",
"storageUri": "uri",
"fileFormat": "FILE",
"tableFormat": "TABLE",
}

first = self._make_one(_properties=resource)
second = self._make_one(_properties=copy.deepcopy(resource))
# Exercise comparator overloads.
# first and second should be equivalent.
self.assertNotEqual(first, resource)
self.assertEqual(first, second)
self.assertEqual(hash(first), hash(second))

# Update second to ensure that first and second are no longer equivalent.
second.connection_id = "foo"
self.assertNotEqual(first, second)
self.assertNotEqual(hash(first), hash(second))

# Update first with the same change, restoring equivalence.
first.connection_id = "foo"
self.assertEqual(first, second)
self.assertEqual(hash(first), hash(second))


class TestCloneDefinition:
@staticmethod
def _get_target_class():