BB-780: Configure CRR queue processor poll interval #2749
LGTM: clean, well-scoped change. The new config key wires correctly through the existing BackbeatConsumer path (which already validates and defaults maxPollIntervalMs). Schema validation, test coverage, and docker-entrypoint mapping are all solid.
Codecov Report
✅ All modified and coverable lines are covered by tests.
... and 3 files with indirect coverage changes
@@ Coverage Diff @@
## bugfix/BB-782-deflake-consumer-metrics-test #2749 +/- ##
===============================================================================
- Coverage 74.34% 74.06% -0.28%
===============================================================================
Files 201 201
Lines 13485 13486 +1
===============================================================================
- Hits 10025 9988 -37
- Misses 3450 3488 +38
Partials 10 10
TDD: specify the contract first. The replication queue processor accepts an optional maxPollIntervalMs in its config block; tests/config.json sets a distinctive value (350000) so the pass-through is provable, the value is optional, and values below 45000 are rejected. These tests fail until the next commit adds the schema and wiring (both pushed together so CI sees the green tip).
max.poll.interval.ms was pinned to its 5-minute default. Lowering mpu_parts_concurrency to protect Metadata makes a large-MPU replication exceed it, which evicts the queue processor's consumer and redelivers the partition, losing the partial transfer and leaving orphan parts. Let the replication queue processor set an optional maxPollIntervalMs in its config block (min 45000, the librdkafka session.timeout.ms floor); when unset, it falls back to BackbeatConsumer's existing 300000 default. Scoped to the queue processor, the only consumer with the slow-task problem (the status processor does fast metadata writes). Adds a matching EXTENSIONS_REPLICATION_QUEUE_PROCESSOR_MAX_POLL_INTERVAL_MS mapping.
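The rules in this commit message (optional key, 45000 floor, 300000 fallback) can be sketched in plain JavaScript. This is only an illustration under assumptions: `resolveMaxPollIntervalMs` is a hypothetical helper, not a Backbeat function. In the PR itself the floor is enforced by the Joi schema and the fallback by BackbeatConsumer.

```javascript
// Hypothetical helper (not part of Backbeat): mirrors the rules stated in the
// commit message, without depending on Joi or BackbeatConsumer.
const MAX_POLL_INTERVAL_MS_MIN = 45000;      // floor named in the commit message
const MAX_POLL_INTERVAL_MS_DEFAULT = 300000; // BackbeatConsumer default per the commit message

function resolveMaxPollIntervalMs(queueProcessorConfig) {
    const value = queueProcessorConfig.maxPollIntervalMs;
    // Unset: deployments that configure nothing keep the 5-minute default.
    if (value === undefined) {
        return MAX_POLL_INTERVAL_MS_DEFAULT;
    }
    if (typeof value !== 'number' || Number.isNaN(value)) {
        throw new TypeError('maxPollIntervalMs must be a number');
    }
    // Below the floor: reject instead of silently clamping.
    if (value < MAX_POLL_INTERVAL_MS_MIN) {
        throw new RangeError(
            `maxPollIntervalMs must be >= ${MAX_POLL_INTERVAL_MS_MIN}`);
    }
    return value;
}
```

Rejecting an out-of-range value (rather than clamping it) matches the "values below 45000 are rejected" contract described in the test commit.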
7ceacca to efc1331
LGTM: wiring is correct end-to-end: env var (docker-entrypoint.sh) -> Joi-validated config (ReplicationConfigValidator.js) -> QueueProcessor -> BackbeatConsumer. The property name maxPollIntervalMs matches BackbeatConsumer's existing interface, which already accepted it with a 300000 default, so unset deployments are unchanged. The 45000 floor matches librdkafka's session.timeout.ms constraint. Tests cover pass-through, optional-when-unset, and below-minimum rejection.
This is handled
    groupId: joi.string().required(),
    retry: qpRetryJoi,
    concurrency: joi.number().greater(0).default(10),
    maxPollIntervalMs: joi.number().min(MAX_POLL_INTERVAL_MS_MIN),
Should you also validate a maximum?
What happens if we set 1 hour, or 1 day? I suppose that if the consumer crashes, Kafka will not notice until the full interval has elapsed, so we might wait a long time before another consumer can take its place.
nicolas2bert
left a comment
Let's add a maximum of 30 minutes, which leaves 1 hour for a task to finish (drain window); that should be more than enough for all our cases.
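A bounded check along the lines suggested in this thread might look like the sketch below. It is an assumption-laden illustration: the 30-minute ceiling is the reviewer's proposal (the thread does not confirm it was merged), and `checkMaxPollIntervalMs` and `POLL_INTERVAL_MAX_MS` are hypothetical names, not identifiers from the PR.

```javascript
// Hypothetical bounds check (names are illustrative, not from the PR).
const POLL_INTERVAL_MIN_MS = 45000;          // existing floor from the schema
const POLL_INTERVAL_MAX_MS = 30 * 60 * 1000; // proposed ceiling: 30 minutes = 1800000 ms

function checkMaxPollIntervalMs(value) {
    // The key stays optional: an unset value is valid.
    if (value === undefined) {
        return { ok: true };
    }
    if (typeof value !== 'number' || Number.isNaN(value)) {
        return { ok: false, error: 'maxPollIntervalMs must be a number' };
    }
    if (value < POLL_INTERVAL_MIN_MS) {
        return { ok: false, error: `must be >= ${POLL_INTERVAL_MIN_MS}` };
    }
    if (value > POLL_INTERVAL_MAX_MS) {
        return { ok: false, error: `must be <= ${POLL_INTERVAL_MAX_MS}` };
    }
    return { ok: true };
}
```

With these bounds, the 1-hour and 1-day values raised in the review question would both be rejected at config load time.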
Note to reviewers: the target branch carries another fix, unrelated to this one; we are still debating whether to add it. It was needed for a green CI.
Intent: why does this change exist?
Lowering mpu_parts_concurrency (to stop large MPUs overloading Metadata) makes a single large-MPU replication run longer than Kafka's 5-min max.poll.interval.ms, so the queue processor's consumer gets evicted mid-transfer: the partition is redelivered and you get duplicate work, orphan sproxyd parts, and ghost processes. This exposes that timeout so an operator can raise it and let long transfers finish. It's the short-term mitigation from the "CRR on large MPUs" root-cause plan; the real fix (keep polling while a slow task runs) is separate, larger work.
System impact: what's affected, including downstream?
Only the replication queue processor's Kafka consumer. The value rides through to BackbeatConsumer, which already accepted maxPollIntervalMs, so there are no lib/ changes. Downstream: federation has to render the key for S3C to use it (S3C-11270); Zenko sets it through config directly.
Preserved behavior: what explicitly stays the same?
Unset behaves exactly as today: BackbeatConsumer's existing 300000 default applies, so deployments that set nothing are unchanged. Every other consumer (status processor, lifecycle, GC, notification, metrics, queue populator) is deliberately untouched: this is the shortest, lowest-risk fix for the incident, and the queue processor is the only consumer with the documented slow-task problem. Making the timeout global, or tuning the other consumers, is intentionally left for a separate consistent methodology rather than bolted on here.
Intended change: what's different after this PR?
extensions.replication.queueProcessor.maxPollIntervalMs is now an optional config key (minimum 45000, librdkafka's session.timeout.ms floor, below which the client refuses to start) wired into the consumer, plus a KAFKA_..._MAX_POLL_INTERVAL_MS entrypoint mapping for container deployments.
Verification: how do we know this worked, or how would we know if it didn't?
New unit tests assert the value passes through from config, stays optional, and is rejected below 45000. Built TDD-style: the first commit adds the failing tests, the second implements the schema + wiring. Check out the test commit alone and the spec goes red, proving the tests actually bite.
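For orientation, the key path named in this description can be pictured as the nested fragment below. Only the path extensions.replication.queueProcessor.maxPollIntervalMs and the 350000 test value come from the PR; the surrounding object is trimmed to that one key, and `getQueueProcessorMaxPollIntervalMs` is a hypothetical lookup helper, not Backbeat code.

```javascript
// Illustrative fragment: real config files carry many more keys.
const exampleConfig = {
    extensions: {
        replication: {
            queueProcessor: {
                // Distinctive value the commit message says tests/config.json uses.
                maxPollIntervalMs: 350000,
            },
        },
    },
};

// Hypothetical lookup: yields undefined when the key (or any parent) is unset,
// which is the case where the consumer's own default would apply.
function getQueueProcessorMaxPollIntervalMs(cfg) {
    return cfg?.extensions?.replication?.queueProcessor?.maxPollIntervalMs;
}
```

Using a value different from the 300000 default is what makes the pass-through assertion meaningful: a test that saw 300000 could not tell configured from defaulted.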