fix(storage): serve the write buffer on read so acked records are consumable (read-after-ack)#149
Open
kamir wants to merge 3 commits into
Open
fix(storage): serve the write buffer on read so acked records are consumable (read-after-ack)#149kamir wants to merge 3 commits into
kamir wants to merge 3 commits into
Conversation
added 3 commits
June 4, 2026 16:48
AppendBatch returns an AppendResult (the produce ACK basis) as soon as the batch is buffered, but flush-to-segment only happens when a WriteBuffer threshold trips, and flushing is evaluated only inside AppendBatch (no background flusher). Read serves flushed segments only and returns ErrOffsetOutOfRange for buffered offsets. So a just-acked record whose partition then goes quiet stays unreadable (and is lost on broker restart, since the buffer is in-memory), violating Kafka read-after-ack. This test appends 10 batches with flush thresholds set so nothing flushes, then asserts every acked offset is readable. It FAILS on the current code (offset 0 -> ErrOffsetOutOfRange) and must pass once Read serves the buffer (or produce flushes before acking under acks=all). Existing tests use MaxBytes:1 so every append flushes immediately, which is why this path was never exercised. Refs: scalytics UPSTREAM/2026-06-04-kafscale-consume-readpath.md
…sumable PartitionLog.Read served only flushed segments and returned ErrOffsetOutOfRange for any offset still in the in-memory WriteBuffer. Because flush is append-triggered (ShouldFlush is evaluated only inside AppendBatch; there is no background flusher), a partition that goes quiet below the flush threshold keeps its just-acked tail in the buffer, where it was unreadable — breaking Kafka's read-after-ack contract (observed end-to-end as 1015 acked -> 588 readable on v1.6.0). Read now falls back to the buffer when the offset is not in a flushed segment: new WriteBuffer.RecordsFrom(offset, maxBytes) returns the buffered batch bytes for the requested offset onward, non-destructively. The fetch handler (cmd/broker fetch -> plog.Read) picks this up unchanged. Makes TestPartitionLogReadAfterAckBeforeFlush pass; full pkg/storage, pkg/broker and cmd/broker suites stay green. Note: this fixes READABILITY (read-after-ack). Durability-on-restart is separate — the buffer is still in-memory, so acked-but-unflushed records are lost if the broker restarts before flush. A complete acks=all guarantee additionally needs flush-before-ack or a WAL; tracked separately. Refs: scalytics UPSTREAM/2026-06-04-kafscale-consume-readpath.md
…ross rotations Appends 30 batches with MaxBatches=3 (frequent flush rotations) and asserts every acked offset stays readable. Passes over MemoryS3, isolating the end-to-end '1019 acked -> ~32 readable' loss OUT of the pkg/storage state machine (no segment overwrite, no drop across rotations). The live loss is therefore in the real S3 client / proxy fetch-forward / concurrency, not the storage logic.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Make
PartitionLog.Readserve the in-memory write buffer in addition to flushedsegments, so an acknowledged-but-not-yet-flushed record is consumable
(Kafka read-after-ack).
When this matters (scope)
This affects the configuration where the per-acknowledgement flush is disabled
(
KAFSCALE_PRODUCE_SYNC_FLUSH=false) or foracks=0produces. There, anacknowledged record can sit in the in-memory
WriteBufferuntil a size/intervalthreshold flushes it;
Readserved only flushed segments and returnedErrOffsetOutOfRangefor the buffered tail, so a consumer reading up to thehigh-watermark could miss just-acknowledged records.
In the default configuration (
KAFSCALE_PRODUCE_SYNC_FLUSH=true) everyacknowledged produce is flushed immediately, so this change is a no-op there. It
is a correctness hardening for the flush-disabled /
acks=0path.Root cause
AppendBatchappends to the in-memoryWriteBufferand returns the assignedoffset (the ack basis). A flush to a segment happens on a
WriteBufferthreshold, or per acknowledged produce in the default path.
Readscanned only flushed segments and returnedErrOffsetOutOfRangeon amiss; it never consulted the buffer.
Fix
WriteBuffer.RecordsFrom(offset, maxBytes): a non-destructive read of thebuffered batch bytes from a given offset onward.
Readfalls back to it when the offset is not in a flushed segment.No wire-protocol change; the fetch handler calls
PartitionLog.Readunchanged.Tests
TestPartitionLogReadAfterAckBeforeFlush(new): appends with flush thresholdsset so nothing flushes, asserts every acked offset is readable. Fails before,
passes after.
TestPartitionLogMultiFlushAllOffsetsReadable(new): every acked offset staysreadable across many flush rotations.
go test ./pkg/storage/... ./pkg/broker/... ./cmd/broker/...all green.Correction to an earlier version of this description
An earlier draft attributed a large "acknowledged but unreadable at volume"
effect to a flush/segment data-loss path. That was wrong. It was traced to a test
that produced one record per produce request against the flush-on-ack broker
(one record per segment, expensive to read back one segment at a time, so the
consumer hit its read deadline after a fraction). The data was durable and
complete; a batched producer round-trips byte-clean. This PR is scoped only to
the read-after-ack consistency described above.