Skip to content

Commit e71ba54

Browse files
authored
Add ExceptionInfo type for with_exception_handling (#38173)
* Add ExceptionInfo type for with_exception_handling * lint * changes
1 parent 0e8953c commit e71ba54

3 files changed

Lines changed: 25 additions & 19 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
encode finished bitset. SentinelBitSetCoder and BitSetCoder are state
7474
compatible. Both coders can decode encoded bytes from the other coder
7575
([#38139](https://github.com/apache/beam/issues/38139)).
76+
* (Python) Added type alias for with_exception_handling to be used for typehints. ([#38173](https://github.com/apache/beam/issues/38173)).
7677

7778
## Breaking Changes
7879

sdks/python/apache_beam/transforms/core.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2312,6 +2312,24 @@ def FlatMapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name
23122312
return pardo
23132313

23142314

2315+
# Element format emitted on the dead-letter tag by `with_exception_handling()`:
2316+
# (exception_class, repr(exception), formatted_traceback_lines).
2317+
# The first slot is the bare metaclass `type` rather than `Type[BaseException]`
2318+
# because the runtime value is `type(exn)`, a class object whose only
2319+
# universally-correct hint is `type`. Beam has no parametric `Type[...]`
2320+
# constraint that would yield a non-pickle coder anyway, so narrowing here
2321+
# would over-promise without changing the wire format.
2322+
ExceptionInfo = typehints.Tuple[type, str, typehints.List[str]]
2323+
2324+
2325+
class DeadLetter:
2326+
"""Type hint for a dead-letter element: (original_element, ExceptionInfo).
2327+
2328+
Use as ``DeadLetter[T]`` in ``with_output_types(...)`` etc."""
2329+
def __class_getitem__(cls, element_type):
2330+
return typehints.Tuple[element_type, ExceptionInfo]
2331+
2332+
23152333
class _ExceptionHandlingWrapper(ptransform.PTransform):
23162334
"""Implementation of ParDo.with_exception_handling."""
23172335
def __init__(
@@ -2455,13 +2473,7 @@ def expand(self, pcoll):
24552473
main_output_type = self._fn.infer_output_type(pcoll.element_type)
24562474
tagged_type_hints = dict(self._fn.get_type_hints().tagged_output_types())
24572475

2458-
# Dead letter format: Tuple[element, Tuple[exception_type, repr, traceback]]
2459-
dead_letter_type = typehints.Tuple[pcoll.element_type,
2460-
typehints.Tuple[type,
2461-
str,
2462-
typehints.List[str]]]
2463-
2464-
tagged_type_hints[self._dead_letter_tag] = dead_letter_type
2476+
tagged_type_hints[self._dead_letter_tag] = DeadLetter[pcoll.element_type]
24652477
pardo = pardo.with_output_types(main_output_type, **tagged_type_hints)
24662478

24672479
all_tags = tuple(set(self._extra_tags or ()) | {self._dead_letter_tag})

sdks/python/apache_beam/transforms/core_test.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from apache_beam.options.pipeline_options import PipelineOptions
3434
from apache_beam.testing.util import assert_that
3535
from apache_beam.testing.util import equal_to
36+
from apache_beam.transforms.core import DeadLetter
3637
from apache_beam.transforms.resources import ResourceHint
3738
from apache_beam.transforms.userstate import BagStateSpec
3839
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
@@ -559,9 +560,7 @@ def test_with_exception_handling_then_with_outputs(self):
559560
self.assertEqual(results.main.element_type, int)
560561
self.assertEqual(results.threes.element_type, int)
561562
self.assertEqual(results.fives.element_type, str)
562-
self.assertEqual(
563-
results.bad.element_type,
564-
typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]])
563+
self.assertEqual(results.bad.element_type, DeadLetter[int])
565564

566565
def test_with_outputs_then_with_exception_handling(self):
567566
"""Direction 2: .with_outputs().with_exception_handling()"""
@@ -582,9 +581,7 @@ def test_with_outputs_then_with_exception_handling(self):
582581
self.assertEqual(results.main.element_type, int)
583582
self.assertEqual(results.threes.element_type, int)
584583
self.assertEqual(results.fives.element_type, str)
585-
self.assertEqual(
586-
results.bad.element_type,
587-
typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]])
584+
self.assertEqual(results.bad.element_type, DeadLetter[int])
588585

589586
def test_with_outputs_then_with_exception_handling_custom_dead_letter_tag(
590587
self):
@@ -603,9 +600,7 @@ def test_with_outputs_then_with_exception_handling_custom_dead_letter_tag(
603600
bad_elements = results.errors | beam.Keys()
604601
assert_that(bad_elements, equal_to([2]), 'errors')
605602
self.assertEqual(results.threes.element_type, int)
606-
self.assertEqual(
607-
results.errors.element_type,
608-
typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]])
603+
self.assertEqual(results.errors.element_type, DeadLetter[int])
609604

610605
def test_with_exception_handling_then_with_outputs_custom_dead_letter_tag(
611606
self):
@@ -624,9 +619,7 @@ def test_with_exception_handling_then_with_outputs_custom_dead_letter_tag(
624619
bad_elements = results.errors | beam.Keys()
625620
assert_that(bad_elements, equal_to([2]), 'errors')
626621
self.assertEqual(results.threes.element_type, int)
627-
self.assertEqual(
628-
results.errors.element_type,
629-
typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]])
622+
self.assertEqual(results.errors.element_type, DeadLetter[int])
630623

631624
def test_exception_handling_no_with_outputs_backward_compat(self):
632625
"""Without with_outputs(), behavior is unchanged."""

0 commit comments

Comments
 (0)