Skip to content

Commit 53f8367

Browse files
authored
Portable Date Type (Python changes) (#38078)
* portable date python changes
* trigger ITs
* typo
* add todo link
* disable managed transforms
1 parent e1f0262 commit 53f8367

9 files changed

Lines changed: 56 additions & 12 deletions
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 15
3+
"modification": 1
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 16
3+
"modification": 1
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 0
3+
"modification": 1
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 2
3+
"modification": 1
44
}

sdks/python/apache_beam/io/jdbc.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,11 @@ def __init__(
360360
of the output PCollection elements. This bypasses automatic
361361
schema inference during pipeline construction.
362362
"""
363+
# override new portable Date type with the current Jdbc type
364+
# TODO(https://github.com/apache/beam/issues/28359):
365+
# switch JdbcIO to return portable Date type
366+
LogicalType.register_logical_type(JdbcDateType)
367+
363368
classpath = classpath or DEFAULT_JDBC_CLASSPATH
364369

365370
dataSchema = None

sdks/python/apache_beam/portability/common_urns.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,4 @@
9292
var_bytes = LogicalTypes.Enum.VAR_BYTES
9393
fixed_char = LogicalTypes.Enum.FIXED_CHAR
9494
var_char = LogicalTypes.Enum.VAR_CHAR
95+
date = LogicalTypes.Enum.DATE

sdks/python/apache_beam/transforms/managed_iceberg_it_test.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# limitations under the License.
1616
#
1717

18+
import datetime
1819
import os
1920
import unittest
2021
import uuid
@@ -33,13 +34,13 @@
3334
"EXPANSION_JARS environment var is not provided, "
3435
"indicating that jars have not been built")
3536
class ManagedIcebergIT(unittest.TestCase):
36-
WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java"
37+
WAREHOUSE = "gs://temp-storage-for-end-to-end-tests"
3738

3839
def setUp(self):
3940
self.test_pipeline = TestPipeline(is_integration_test=True)
4041
self.args = self.test_pipeline.get_full_options_as_args()
4142
self.args.extend([
42-
'--experiments=enable_managed_transforms',
43+
# '--experiments=enable_managed_transforms',
4344
])
4445

4546
def _create_row(self, num: int):
@@ -49,16 +50,24 @@ def _create_row(self, num: int):
4950
bytes_=bytes(num),
5051
bool_=(num % 2 == 0),
5152
float_=(num + float(num) / 100),
52-
arr_=[num, num, num])
53+
arr_=[num, num, num],
54+
date_=datetime.date.today() - datetime.timedelta(days=num))
5355

5456
def test_write_read_pipeline(self):
57+
biglake_catalog_props = {
58+
'type': 'rest',
59+
'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
60+
'warehouse': self.WAREHOUSE,
61+
'header.x-goog-user-project': 'apache-beam-testing',
62+
'rest.auth.type': 'google',
63+
'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
64+
'header.X-Iceberg-Access-Delegation': 'vended-credentials',
65+
'rest-metrics-reporting-enabled': 'false'
66+
}
5567
iceberg_config = {
5668
"table": "test_iceberg_write_read.test_" + uuid.uuid4().hex,
5769
"catalog_name": "default",
58-
"catalog_properties": {
59-
"type": "hadoop",
60-
"warehouse": self.WAREHOUSE,
61-
}
70+
"catalog_properties": biglake_catalog_props
6271
}
6372

6473
rows = [self._create_row(i) for i in range(100)]

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
bytes <-----> BYTES
3535
ByteString ------> BYTES
3636
Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
37+
datetime.date <---> LogicalType(urn="beam:logical_type:date:v1")
3738
Decimal <-----> LogicalType(urn="beam:logical_type:fixed_decimal:v1")
3839
Mapping <-----> MapType
3940
Sequence <-----> ArrayType
@@ -1004,6 +1005,33 @@ def to_language_type(self, value):
10041005
return Timestamp(seconds=int(value.seconds), micros=int(value.micros))
10051006

10061007

1008+
@LogicalType._register_internal
1009+
class Date(NoArgumentLogicalType[datetime.date, np.int64]):
1010+
"""Date logical type that handles ``datetime.date``, days since epoch."""
1011+
EPOCH = datetime.date(1970, 1, 1)
1012+
1013+
@classmethod
1014+
def urn(cls):
1015+
return common_urns.date.urn
1016+
1017+
@classmethod
1018+
def representation_type(cls):
1019+
# type: () -> type
1020+
return np.int64
1021+
1022+
@classmethod
1023+
def language_type(cls):
1024+
return datetime.date
1025+
1026+
def to_representation_type(self, value):
1027+
# type: (datetime.date) -> np.int64
1028+
return (value - self.EPOCH).days
1029+
1030+
def to_language_type(self, value):
1031+
# type: (np.int64) -> datetime.date
1032+
return self.EPOCH + datetime.timedelta(days=value)
1033+
1034+
10071035
@LogicalType._register_internal
10081036
class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]):
10091037
"""A logical type for PythonCallableSource objects."""
@@ -1244,7 +1272,6 @@ def argument(self):
12441272
# TODO: A temporary fix for missing jdbc logical types.
12451273
# See the discussion in https://github.com/apache/beam/issues/35738 for
12461274
# more detail.
1247-
@LogicalType._register_internal
12481275
class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
12491276
"""
12501277
For internal use only; no backwards-compatibility guarantees.

sdks/python/apache_beam/typehints/schemas_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
# pytype: skip-file
2121

2222
import dataclasses
23+
import datetime
2324
import itertools
2425
import pickle
2526
import unittest
@@ -105,6 +106,7 @@ class ComplexSchema(NamedTuple):
105106
optional_array: Optional[Sequence[np.float32]]
106107
array_optional: Sequence[Optional[bool]]
107108
timestamp: Timestamp
109+
date: datetime.date
108110

109111

110112
def get_test_beam_fieldtype_protos():

0 commit comments

Comments
 (0)