Skip to content

[flink] Fix Debezium Avro deserialization for multi topic source#7871

Merged
JingsongLi merged 1 commit into
apache:masterfrom
ArnavBalyan:arnavb/fix-kafka-debezium-avro-multi-topic
May 23, 2026
Merged

[flink] Fix Debezium Avro deserialization for multi topic source#7871
JingsongLi merged 1 commit into
apache:masterfrom
ArnavBalyan:arnavb/fix-kafka-debezium-avro-multi-topic

Conversation

@ArnavBalyan
Copy link
Copy Markdown
Member

@ArnavBalyan ArnavBalyan commented May 15, 2026

Purpose

  • KafkaDebeziumAvroDeserializationSchema returns 1 topic via findOneTopic at init time, this gets reused for every incoming record.
  • With multi topic configs (topic=a;b), any message coming from another topic leads to a crash with SerializationException: The given schema does not match. The same cached topic is also passed into CdcSourceRecord, misrouting records to the wrong table.
  • Read topic from the actual message for deserialize instead of using the cached value at init time.
  • Closes user reported bug [Bug] Kafka Debezium Avro deserialization uses wrong schema subject for multi-topic sources #7859.

Tests

  • Existing UTs

@ArnavBalyan
Copy link
Copy Markdown
Member Author

cc @JingsongLi thanks! :)

@ArnavBalyan
Copy link
Copy Markdown
Member Author

cc @XiaoHongbo-Hope could you PTAL thanks!

@XiaoHongbo-Hope
Copy link
Copy Markdown
Contributor

cc @XiaoHongbo-Hope could you PTAL thanks!

Sure.

@ArnavBalyan
Copy link
Copy Markdown
Member Author

thanks!

@ArnavBalyan
Copy link
Copy Markdown
Member Author

cc @JingsongLi @XiaoHongbo-Hope gentle reminder could PTAL minor bug fix thanks!

@ArnavBalyan
Copy link
Copy Markdown
Member Author

Gentle reminder thanks!

Copy link
Copy Markdown
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review of the fix

The change is correct and minimal. The root cause is clear: findOneTopic() picks the first topic from the config at construction time and caches it as a field, but in a multi-topic source (semicolon-separated list or topic-pattern), messages from other topics arrive and get deserialized/routed using the wrong topic name. This causes both a Schema Registry subject mismatch (crash) and incorrect CdcSourceRecord routing.

Correctness: Using message.topic() per record is the right fix. It resolves both problems:

  1. The Confluent Schema Registry subject lookup now uses the actual topic the message originated from.
  2. The CdcSourceRecord constructed downstream already received the local topic variable (which previously held the cached field), so it will now carry the correct per-record topic for table routing.

Consistency with existing code: The JSON counterpart (KafkaDebeziumJsonDeserializationSchema) already uses message.topic() directly, so this brings the Avro schema in line with the established pattern.

One minor observation: The topic field removal also removes the only usage of KafkaActionUtils.findOneTopic in the deserialization path. The import of KafkaActionUtils is still needed (for extractKafkaMetadata), so no dead-import issue here. However, it may be worth confirming that CI still passes given findOneTopic in the topic-pattern case used to eagerly connect to the Kafka admin client at construction time -- removing that call means the admin connection no longer happens at deserialization-schema instantiation, which is actually a positive side effect (no unnecessary admin call for Avro schemas).

Suggestion for future improvement (non-blocking): Consider adding a targeted unit test that exercises multi-topic deserialization with distinct schemas to prevent regression. The PR description mentions "Existing UTs" pass, which is sufficient for merge, but a dedicated test for this scenario would strengthen coverage.

Overall this is a clean, well-scoped bug fix. LGTM.

@JingsongLi
Copy link
Copy Markdown
Contributor

+1

@JingsongLi JingsongLi merged commit 4d1a44a into apache:master May 23, 2026
11 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants