Skip to content

Commit 1f83bf0

Browse files
authored
[yaml]: add kafka parameter explicitly and uncover kafka test (#36603)
* add kafka parameter explicitly and uncover kafka test for a try * fix post commit conflict * fix yapf error
1 parent 3a19ef1 commit 1f83bf0

4 files changed

Lines changed: 32 additions & 28 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"revision": 1
3+
"revision": 2
44
}

sdks/python/apache_beam/yaml/extended_tests/messaging/kafka.yaml

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -50,33 +50,33 @@ pipelines:
5050
# Locally runs fine.
5151
# Kafka read pipeline
5252
# Need a separate read pipeline to make sure the write pipeline is flushed
53-
# - pipeline:
54-
# type: chain
55-
# transforms:
56-
# - type: ReadFromKafka
57-
# config:
58-
# format: "RAW"
59-
# topic: "silly_topic"
60-
# bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
61-
# consumer_config:
62-
# auto.offset.reset: "earliest"
63-
# group.id: "yaml-kafka-test-group"
64-
# max_read_time_seconds: 10 # will read forever if not set
65-
# - type: MapToFields
66-
# config:
67-
# language: python
68-
# fields:
69-
# value:
70-
# callable: |
71-
# # Kafka RAW format reads messages as bytes in the 'payload' field of a Row
72-
# lambda row: row.payload.decode('utf-8')
73-
# output_type: string
74-
# - type: AssertEqual
75-
# config:
76-
# elements:
77-
# - {value: "123"}
78-
# - {value: "456"}
79-
# - {value: "789"}
53+
- pipeline:
54+
type: chain
55+
transforms:
56+
- type: ReadFromKafka
57+
config:
58+
format: "RAW"
59+
topic: "silly_topic"
60+
bootstrap_servers: "{TEMP_BOOTSTAP_SERVER}"
61+
consumer_config:
62+
auto.offset.reset: "earliest"
63+
group.id: "yaml-kafka-test-group"
64+
max_read_time_seconds: 10 # will read forever if not set
65+
- type: MapToFields
66+
config:
67+
language: python
68+
fields:
69+
value:
70+
callable: |
71+
# Kafka RAW format reads messages as bytes in the 'payload' field of a Row
72+
lambda row: row.payload.decode('utf-8')
73+
output_type: string
74+
- type: AssertEqual
75+
config:
76+
elements:
77+
- {value: "123"}
78+
- {value: "456"}
79+
- {value: "789"}
8080

8181
options:
8282
streaming: true

sdks/python/apache_beam/yaml/standard_io.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
'error_handling': 'error_handling'
7575
'file_descriptor_path': 'file_descriptor_path'
7676
'message_name': 'message_name'
77+
'max_read_time_seconds': 'max_read_time_seconds'
7778
'WriteToKafka':
7879
'format': 'format'
7980
'topic': 'topic'

sdks/python/apache_beam/yaml/yaml_provider.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1548,6 +1548,9 @@ def create_transform(
15481548
"""Creates a PTransform instance for the given transform type and arguments.
15491549
"""
15501550
mappings = self._mappings[typ]
1551+
# NOTE: If the `key` is not found in the mappings
1552+
# (e.g. standard_io.yaml), the `key` is passed down
1553+
# as is to the underlying transform.
15511554
remapped_args = {
15521555
mappings.get(key, key): value
15531556
for key, value in args.items()

0 commit comments

Comments
 (0)