[flink] Fix Debezium Avro deserialization for multi topic source #7871
cc @JingsongLi thanks! :)
cc @XiaoHongbo-Hope could you PTAL thanks!
Sure.
thanks!
cc @JingsongLi @XiaoHongbo-Hope gentle reminder could PTAL minor bug fix thanks!
Gentle reminder thanks!
JingsongLi left a comment
Review of the fix
The change is correct and minimal. The root cause is clear: findOneTopic() picks the first topic from the config at construction time and caches it as a field, but in a multi-topic source (semicolon-separated list or topic-pattern), messages from other topics arrive and get deserialized/routed using the wrong topic name. This causes both a Schema Registry subject mismatch (crash) and incorrect CdcSourceRecord routing.
Correctness: Using message.topic() per record is the right fix. It resolves both problems:
- The Confluent Schema Registry subject lookup now uses the actual topic the message originated from.
- The CdcSourceRecord constructed downstream already received the local topic variable (which previously held the cached field), so it will now carry the correct per-record topic for table routing.
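To make the two points above concrete, here is a minimal, dependency-free sketch of the difference between a cached topic and a per-record topic. It is not the PR's code: `TopicRoutingSketch`, `Message`, `CachedTopicResolver`, `PerRecordTopicResolver`, and the topic names `orders` and `users` are all hypothetical, and the `<topic>-value` subject naming assumes the registry's default topic-name strategy is in use.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: none of these classes are Paimon, Flink, or Kafka APIs.
final class TopicRoutingSketch {

    // Hypothetical stand-in for a Kafka record; only the topic matters here.
    static final class Message {
        private final String topic;

        Message(String topic) {
            this.topic = topic;
        }

        String topic() {
            return topic;
        }
    }

    // Pre-fix behaviour: the topic is resolved once and reused for every record.
    static final class CachedTopicResolver {
        private final String topic;

        CachedTopicResolver(List<String> configuredTopics) {
            // Mimics picking the first configured topic at construction time.
            this.topic = configuredTopics.get(0);
        }

        String topicFor(Message message) {
            return topic; // ignores where the message actually came from
        }
    }

    // Post-fix behaviour: the topic is read from each incoming record.
    static final class PerRecordTopicResolver {
        String topicFor(Message message) {
            return message.topic();
        }
    }

    // Assumption: default topic-name strategy, so the value subject is "<topic>-value".
    static String valueSubject(String topic) {
        return topic + "-value";
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders", "users");
        CachedTopicResolver cached = new CachedTopicResolver(topics);
        PerRecordTopicResolver perRecord = new PerRecordTopicResolver();
        for (String t : topics) {
            Message m = new Message(t);
            System.out.println(t
                    + ": cached subject=" + valueSubject(cached.topicFor(m))
                    + ", per-record subject=" + valueSubject(perRecord.topicFor(m)));
        }
    }
}
```

In this sketch a message from the second topic resolves to the first topic's subject under the cached resolver, which is the shape of both the schema mismatch and the misrouting described above. The per-record resolver returns the message's own topic in every case.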
Consistency with existing code: The JSON counterpart (KafkaDebeziumJsonDeserializationSchema) already uses message.topic() directly, so this brings the Avro schema in line with the established pattern.
One minor observation: Removing the topic field also removes the only usage of KafkaActionUtils.findOneTopic in the deserialization path. The import of KafkaActionUtils is still needed (for extractKafkaMetadata), so there is no dead-import issue. However, it may be worth confirming that CI still passes, given that findOneTopic in the topic-pattern case used to connect eagerly to the Kafka admin client at construction time. Removing that call means the admin connection no longer happens when the deserialization schema is instantiated, which is a positive side effect (no unnecessary admin call for Avro schemas).
Suggestion for future improvement (non-blocking): Consider adding a targeted unit test that exercises multi-topic deserialization with distinct schemas to prevent regression. The PR description mentions "Existing UTs" pass, which is sufficient for merge, but a dedicated test for this scenario would strengthen coverage.
Overall this is a clean, well-scoped bug fix. LGTM.
+1
Purpose
findOneTopic at init time, this gets reused for every incoming record.
SerializationException: The given schema does not match.
The same cached topic is also passed into CdcSourceRecord, misrouting records to the wrong table.
Tests