feat(qwp): add QuestDB facade with pooled Sender and Query APIs #28
Merged
QwpWebSocketSenderTest reached into QwpWebSocketSender via getDeclaredField/getDeclaredMethod for five members. Restoring totalBufferedBytes after an accidental delete made the reflection visible -- removing it altogether is the better fix.
QwpWebSocketSender changes:
- applyServerBatchSizeLimit promoted to @testonly public (production callers still use it; the annotation flags it as a test seam too).
- getPendingBytes promoted to @testonly public, deduped against the former private getter (the one internal caller at the auto-flush guard now binds to the public version).
- New @testonly public getters: getEffectiveAutoFlushBytes, getServerMaxBatchSize.
- New @testonly public setConnectedForTest(boolean) so tests can fake the post-handshake state without reflection.
- totalBufferedBytes restored as @testonly public (kept by the test and QwpTotalBufferedBytesBenchmark; otherwise dead).
QwpWebSocketSenderTest:
- Five reflection-based static helpers (~50 LOC) deleted.
- Three java.lang.reflect imports removed.
- All call sites now go through direct method invocation.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
testLargeSegmentCountReopensInOrder used to assert wall-clock elapsed < 5s as a proxy for "sortByBaseSeq stayed O(N log N)". The bound was generous for any modern machine but tight enough to flake on loaded Windows CI runners doing 2048 mmap-creates + 2048 mmap-opens.
Replace the timing assertion with a comparison-count assertion:
- SegmentRing tracks the total baseSeq comparisons performed by sortByBaseSeq in a static counter (one += per partition pass, bumping by 3 + (hi - lo - 1) to cover median-of-three plus the partition loop).
- @testonly public accessors getSortComparisons() / resetSortComparisons() let the test reset before the run and read after.
- testLargeSegmentCountReopensInOrder now asserts comparisons < 5 * N * log2(N), which sits about 30x below the O(N^2) regression value and 1.5x above the expected O(N log N) count.
Detects a true regression on first commit, deterministic across hardware. Production cost is one volatile-free += per partition pass, dwarfed by the mmap I/O the same path does on every segment.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
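The comparison-count idea can be illustrated with a small standalone sort. This is a sketch only: it is not the SegmentRing code, the class name and the `medianOfThree` switch are hypothetical, the counter is bumped by 3 + (hi - lo) rather than the commit's 3 + (hi - lo - 1), and `bound` rounds log2(N) up to an integer. It shows that a median-of-three quicksort on shuffled input stays under 5 * N * log2(N), while a last-element pivot on already-sorted input (a stand-in for an O(N^2) regression) blows through it.

```java
import java.util.Random;

// Illustrative sketch; names are hypothetical and do not come from SegmentRing.
final class SortComparisonBoundSketch {
    // Static counter, bumped once per partition pass (single-threaded use only).
    private static long comparisons;

    static long getSortComparisons() { return comparisons; }

    static void resetSortComparisons() { comparisons = 0; }

    // Ceiling used by the assertion: 5 * N * ceil(log2(N)), for n > 1.
    static long bound(int n) {
        int ceilLog2 = 32 - Integer.numberOfLeadingZeros(n - 1);
        return 5L * n * ceilLog2;
    }

    // Quicksort over a[lo..hi] inclusive, Lomuto partition.
    static void sort(long[] a, int lo, int hi, boolean medianOfThree) {
        if (lo >= hi) return;
        if (medianOfThree) {
            int mid = lo + ((hi - lo) >>> 1);
            if (a[mid] < a[lo]) swap(a, mid, lo);
            if (a[hi] < a[lo]) swap(a, hi, lo);
            if (a[hi] < a[mid]) swap(a, hi, mid);
            swap(a, mid, hi); // move the median to a[hi], where the pivot is read
            comparisons += 3;
        }
        long pivot = a[hi];
        int store = lo;
        for (int i = lo; i < hi; i++) {
            if (a[i] < pivot) swap(a, i, store++);
        }
        comparisons += hi - lo; // one comparison per element in the partition loop
        swap(a, store, hi);
        sort(a, lo, store - 1, medianOfThree);
        sort(a, store + 1, hi, medianOfThree);
    }

    static void swap(long[] a, int i, int j) {
        long t = a[i];
        a[i] = a[j];
        a[j] = t;
    }

    public static void main(String[] args) {
        int n = 2048;
        long[] a = new long[n];
        for (int i = 0; i < n; i++) a[i] = i;
        Random rnd = new Random(42);
        for (int i = n - 1; i > 0; i--) swap(a, i, rnd.nextInt(i + 1));
        resetSortComparisons();
        sort(a, 0, n - 1, true);
        System.out.println(getSortComparisons() + " comparisons, bound " + bound(n));
    }
}
```

Because the assertion reads a counter rather than a clock, the result does not depend on how loaded the machine running the test is.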
Three independent fixes to the new QWP pooling layer surfaced by code review against the design/egress-api-ergonomics-review notes.
SenderPool.close() left ThreadLocal references on every thread that had previously called pinToCurrentThread(). ThreadLocal#remove only clears the calling thread's slot, so a subsequent db.sender() on a different thread took the cached-pin early-return and handed back a wrapper whose delegate had just been closed by close(). The next operation on it dereferenced a closed native handle. Add an invalidated flag on PooledSender that close() flips on every wrapper under the lock; pinToCurrentThread() and releaseCurrentThread() check the flag and drop the stale ThreadLocal slot so borrow() reports "QuestDB handle is closed" instead.
PooledSender.close() returned broken wrappers to the pool when delegate.flush() threw. The Sender contract intentionally does NOT clear its buffer on flush failure (to permit retry), so the next borrower inherited the failed rows; on WebSocket transport the delegate itself was terminally broken. Route the failure path through a new SenderPool.discardBroken() that marks the wrapper invalidated, removes it from the pool under the lock, signals one waiter, and closes the delegate outside the lock. The invalidated flag does double duty: a broken thread-pinned slot is also rejected by pinToCurrentThread() so the next db.sender() re-borrows a fresh slot.
QwpQueryClient.cancel() between submit() returning and executeOnce() writing currentRequestId was a silent no-op: cancel() read -1 and skipped the wire-send. The window covers worker-wakeup latency, bind encoding (user code), and every failover retry's backoff. Latch a pendingCancel flag in cancel() before reading currentRequestId; execute() clears it once at the outermost entry; executeOnce() reads it immediately after the currentRequestId assignment and re-issues io.requestCancel() if set. The latch is intentionally not cleared inside executeOnce(), so failover retries also honor the cancel.
Tests:
- SenderPoolTest: testPinAfterCloseRejectsStaleEntry, testReleaseAfterCloseIsSafe, testBrokenSenderIsNotReturnedToPool.
- QwpQueryClientUnitTest: testCancelDuringDispatchWindowLatchesPendingIntent, testExecuteEntryClearsStalePendingCancel, backed by a new @testonly isPendingCancelForTest seam.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
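The pendingCancel latch described in the third fix can be sketched on its own. The class below is a simplified stand-in, not QwpQueryClient: the `beforeDispatch` hook (standing in for the dispatch window), the `attempts` parameter (standing in for failover retries), the request-id counter, and the recorded `cancelsSent` list are all assumptions made so the ordering can be observed without a server.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the latch ordering; names other than pendingCancel,
// currentRequestId, cancel, execute and executeOnce are hypothetical.
final class PendingCancelSketch {
    private volatile long currentRequestId = -1;
    private volatile boolean pendingCancel;
    private final List<Long> cancelsSent = new ArrayList<>(); // guarded by 'this'
    private long nextRequestId = 1; // touched only by the executing thread

    // May be called from any thread.
    void cancel() {
        pendingCancel = true;       // latch first, so a later executeOnce() sees it
        long id = currentRequestId; // then read the id
        if (id != -1) requestCancel(id);
    }

    // beforeDispatch stands in for wakeup latency, bind encoding and retry backoff.
    void execute(Runnable beforeDispatch, int attempts) {
        pendingCancel = false;      // cleared once, at the outermost entry
        for (int i = 0; i < attempts; i++) {
            beforeDispatch.run();
            executeOnce();
        }
    }

    private void executeOnce() {
        long id = nextRequestId++;
        currentRequestId = id;
        // Re-issue a cancel that arrived while no request id was published.
        // The latch is deliberately not cleared here, so retries honor it too.
        if (pendingCancel) requestCancel(id);
        currentRequestId = -1;
    }

    // Stand-in for the wire send; a cancel may be sent twice for one id,
    // which this sketch assumes is harmless.
    private synchronized void requestCancel(long id) { cancelsSent.add(id); }

    synchronized List<Long> cancelsSent() { return new ArrayList<>(cancelsSent); }

    boolean isPendingCancel() { return pendingCancel; }

    public static void main(String[] args) {
        PendingCancelSketch client = new PendingCancelSketch();
        int[] calls = {0};
        // Cancel arrives during the first dispatch window; two attempts run.
        client.execute(() -> { if (calls[0]++ == 0) client.cancel(); }, 2);
        System.out.println("cancels sent for request ids: " + client.cancelsSent());
    }
}
```

Writing the latch before reading `currentRequestId` is what closes the window: whichever side runs second observes the other's write.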
drain(timeoutMillis) flushes every buffered row and blocks until the server has acknowledged the resulting frame, or until the caller's timeout elapses. The shape mirrors the implicit drain that close() already runs -- target = flushAndGetSequence(), then awaitAckedFsn(target, timeoutMillis) -- but with the timeout chosen at the call site instead of at builder time via close_flush_timeout_millis.
Useful for callers that want different drain budgets in different shutdown scenarios (long pre-batch checkpoint vs. tight per-iteration cleanup), or that want to observe drain progress without closing the sender. Pairs cleanly with close()'s implicit drain: drain(short) returns false on timeout without throwing, the caller can take an action (log, retry, switch to a different shutdown path), and a subsequent close() still runs its own bounded drain through close_flush_timeout_millis.
Default implementation lives on the Sender interface so every transport picks it up:
- WebSocket/QWP: target = real FSN; awaitAckedFsn blocks until acked.
- HTTP / TCP / UDP (no FSN tracking): flushAndGetSequence returns -1, awaitAckedFsn(-1, _) returns true immediately. drain becomes "flush and report success" on these transports, which is the most honest thing a non-FSN transport can do.
PooledSender forwards to its delegate so pool callers see the underlying Sender's semantics unchanged.
Tests (CloseDrainTest):
- testDrainBlocksUntilAckArrivesAndReturnsTrue: delayed-ACK server, drain(5000) must wait, return true; the subsequent close() is near-instant because the drain consumed the wait.
- testDrainReturnsFalseOnTimeoutAndSenderStillUsable: silent server, drain(200) returns false (not throws). Sender continues to accept writes after.
- testDrainNonZeroTimeoutOnFastServerReturnsImmediately: fast server, drain(5000) returns promptly -- the budget is an upper bound, not a floor.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
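The default-method shape can be sketched with a cut-down interface. Everything here is a stand-in for illustration: `DrainSketchSender` is not the real Sender interface, the `long` parameter and return types are assumed, and the two implementations are fakes (one with no FSN tracking, one whose ACKs are delivered by calling `ack`).

```java
// Illustrative sketch; the interface and both implementations are hypothetical.
interface DrainSketchSender {
    long flushAndGetSequence();

    boolean awaitAckedFsn(long targetFsn, long timeoutMillis);

    // Flush, then wait for the ACK of the resulting frame, bounded by the
    // caller's timeout. Returns false on timeout instead of throwing.
    default boolean drain(long timeoutMillis) {
        long target = flushAndGetSequence();
        return awaitAckedFsn(target, timeoutMillis);
    }
}

// Transport without FSN tracking: drain degrades to "flush and report success".
final class NoFsnSender implements DrainSketchSender {
    @Override public long flushAndGetSequence() { return -1; }

    @Override public boolean awaitAckedFsn(long targetFsn, long timeoutMillis) { return true; }
}

// Fake FSN-tracking transport; ack(fsn) plays the role of the server's ACK.
final class FsnTrackingSender implements DrainSketchSender {
    private long publishedFsn = -1;
    private long ackedFsn = -1;
    private int pendingRows;

    synchronized void write() { pendingRows++; }

    @Override public synchronized long flushAndGetSequence() {
        if (pendingRows > 0) {
            pendingRows = 0;
            publishedFsn++;
        }
        return publishedFsn;
    }

    synchronized void ack(long fsn) {
        ackedFsn = Math.max(ackedFsn, fsn);
        notifyAll();
    }

    @Override public synchronized boolean awaitAckedFsn(long targetFsn, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (ackedFsn < targetFsn) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) return false;
            try {
                wait(Math.max(1, remainingNanos / 1_000_000L)); // wait(0) would block forever
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}

final class DrainSketchDemo {
    public static void main(String[] args) {
        FsnTrackingSender sender = new FsnTrackingSender();
        sender.write();
        System.out.println("silent server: " + sender.drain(50));
        sender.ack(0);
        System.out.println("after ack: " + sender.drain(50));
        System.out.println("no FSN tracking: " + new NoFsnSender().drain(0));
    }
}
```

A caller can therefore try a short drain, react to a `false` result, and still rely on a later close() to run its own bounded wait.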
The old 5s default was a "tight" budget that worked for benchmark-shape workloads (small batches, fast loopback, fresh server) but routinely fell short on real ones: slow consumers, catch-up replicas, chunky payloads against small server send buffers, GC pauses, network weather. When the budget runs out, close() throws and silently drops acked-but-unverified rows in memory mode -- which is a much worse failure mode than "close took a few seconds longer than I expected".
The recent CI fragmentation work surfaced this concretely: a 25M-row async-mode test hit the close drain timeout because the server couldn't ACK the backlog within 5s; pinning the value to 60s gives the backlog enough wall-clock to drain even under the adversarial chunk sizes the fuzzer picks.
60s was chosen as the upper end of what feels "still bounded" without being open-ended -- humans typing Ctrl-C will tolerate up to about a minute of "shutdown in progress" before reaching for kill -9.
The existing escape hatches stay in place:
- set close_flush_timeout_millis=0 or -1 for fast close (data lost in memory mode, recovered by the next sender in SF mode);
- set close_flush_timeout_millis to any positive value for a custom budget;
- or, new in fd3f8bb, call Sender.drain(timeoutMillis) explicitly before close() to choose the wait per call-site instead of per-builder.
Test note: CloseDrainTest had two stale comments referring to the 5s default by name; updated to 60s. Behaviour-wise the tests still rely on "non-zero default, server takes ~800-1500 ms to ACK, close() must wait at least half that" -- a wider default does not invalidate any of those assertions.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SenderPool.close()'s teardown loop iterated `all` outside the ReentrantLock with the comment "Snapshot of underlying Senders to close" -- but the code iterated the live ArrayList, not a snapshot. Concurrently, discardBroken acquired the lock and called all.remove(s) regardless of whether the pool was already shutting down. ArrayList is not thread-safe; close()'s for (int i = 0; i < all.size(); i++) loop reads `size` and indexed elements without synchronisation while discardBroken's fastRemove updates size and shifts elements down.
Reachable interleaving:
1. Application calls db.close() -> pool.close() on thread A.
2. Thread B is in a try-with-resources block holding a borrowed PooledSender. The block exits, PooledSender.close() runs.
3. A acquires the lock, sets closed=true, marks every wrapper invalidated, releases the lock, starts the teardown loop, closes B's delegate.
4. B's delegate.flush() throws (closed socket); broken=true routes to discardBroken.
5. discardBroken acquires the lock (A no longer holds it), removes B's wrapper from `all`, releases the lock, calls delegate.close() on B's already-closed delegate.
Outcomes range from IndexOutOfBoundsException out of close() (propagating up the shutdown path), through native-handle leaks when the iteration cursor and the remove shift miss each other, to two threads simultaneously inside delegate.close() on the same Sender (whose teardown frees native scratch and joins a daemon -- not safe to enter from concurrent threads).
Fix has two parts:
- close() takes a snapshot under the lock (PooledSender[] snapshot = all.toArray(...)) and iterates the snapshot outside the lock. The teardown loop is now immune to concurrent mutation of `all`, regardless of whether mutators cooperate.
- discardBroken bails on `closed`. Once the pool is shutting down, close()'s snapshot loop owns the delegate close; mutating `all` here would race that iteration (with the snapshot in place that is no longer a correctness issue, but it is still wasted work) and the delegate.close() below would be a double-close. Add an if (closed) return; immediately under the lock; the read is serialised against close()'s closed=true write because both happen under `lock`.
Red test (testDiscardBrokenAfterCloseDoesNotMutatePool) drives the scenario deterministically on a single thread: borrow a sender, write a row so flush has something to send, pool.close() (which closes the delegate), then sender.close() (whose flush throws, broken=true, route to discardBroken). Before the fix pool.totalSize() drops from 1 to 0 because discardBroken removed the wrapper from `all` post-close; after the fix it stays at 1.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
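Both parts of the fix can be shown in a reduced pool. This is a sketch under assumptions, not the SenderPool source: `Slot` replaces PooledSender, `closeDelegate` merely counts calls so a double-close is observable, and `add` is a hypothetical helper for populating the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch; Slot, add() and closeCount are hypothetical.
final class SnapshotPoolSketch {
    static final class Slot {
        final AtomicInteger closeCount = new AtomicInteger();

        void closeDelegate() { closeCount.incrementAndGet(); }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final List<Slot> all = new ArrayList<>(); // guarded by lock
    private boolean closed;                           // guarded by lock

    Slot add() {
        lock.lock();
        try {
            Slot s = new Slot();
            all.add(s);
            return s;
        } finally {
            lock.unlock();
        }
    }

    void close() {
        Slot[] snapshot;
        lock.lock();
        try {
            closed = true;
            snapshot = all.toArray(new Slot[0]); // copy while holding the lock
        } finally {
            lock.unlock();
        }
        // Iterate the private copy outside the lock; later mutations of
        // 'all' cannot disturb this loop.
        for (Slot s : snapshot) s.closeDelegate();
    }

    void discardBroken(Slot s) {
        lock.lock();
        try {
            if (closed) return; // close() owns teardown once the pool is closed
            all.remove(s);
        } finally {
            lock.unlock();
        }
        s.closeDelegate(); // closed outside the lock
    }

    int totalSize() {
        lock.lock();
        try {
            return all.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SnapshotPoolSketch pool = new SnapshotPoolSketch();
        Slot slot = pool.add();
        pool.close();
        pool.discardBroken(slot); // arrives after close, as in the commit's step 5
        System.out.println("size=" + pool.totalSize() + " closes=" + slot.closeCount.get());
    }
}
```

The early return sits inside the locked region, so it is ordered against the `closed = true` write without any extra synchronisation.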
QueryClientPool.createUnlocked() constructs a QwpQueryClient via QwpQueryClient.fromConfig() and then calls client.connect(). The construction step field-initialises QwpBindValues, which allocates 8192 NATIVE_DEFAULT bytes through NativeBufferWriter. When connect() throws -- server unreachable, role mismatch (target=primary against a REPLICA), TLS handshake error, etc. -- the client reference escapes: the catch in acquire() (lines 126-133) only has handles to inFlightCreations and the lock, and the pre-warm cleanup in the constructor (lines 94-102) iterates `for (i < built)` over `all`, which never received the failed worker. Every connect failure during pool growth leaks one bindValues buffer; a flapping endpoint accumulates the leak per attempt.
createUnlocked() now catches RuntimeException from connect(), calls client.close(), and rethrows. QwpQueryClient.close() guards against pre-connect state via closedFlag and runs bindValues.close() in a finally block, so closing a never-connected client releases the NativeBufferWriter scratch without touching the still-null ioThread or webSocketClient.
QueryClientPoolLeakTest covers both the pre-warm and lazy-create paths. It points a QwpQueryClient at a FakeStatusServer that returns HTTP 421 and X-QuestDB-Role: REPLICA while the client requests target=primary -- the endpoint walk rejects the only option and connect() throws HttpClientException deterministically. The test asserts that Unsafe.getMemUsedByTag(NATIVE_DEFAULT) returns to the pre-failure baseline. Before the fix both tests fail with an 8192-byte delta; after the fix the delta is zero.
The QueryClientPool constructor and acquire() become public to match SenderPool's testability surface (its constructor and borrow() are already public). QueryWorker was already public.
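The catch, close, rethrow shape of the fix looks roughly like the following. It is a sketch with stand-ins: `FakeClient` is not QwpQueryClient, the static `nativeBytesInUse` counter plays the role of the memory-tag accounting the test reads, and the `reachable` flag is a hypothetical way to force connect() to fail.

```java
// Illustrative sketch; FakeClient, nativeBytesInUse and 'reachable' are hypothetical.
final class ConnectFailureCleanupSketch {
    // Stand-in for native memory accounting.
    static long nativeBytesInUse;

    static final class FakeClient implements AutoCloseable {
        private static final int SCRATCH_BYTES = 8192;
        private final boolean reachable;
        private boolean closed;

        FakeClient(boolean reachable) {
            this.reachable = reachable;
            nativeBytesInUse += SCRATCH_BYTES; // allocated at construction, before connect()
        }

        void connect() {
            if (!reachable) throw new IllegalStateException("connect failed");
        }

        @Override public void close() {
            if (closed) return; // safe to call on a never-connected client
            closed = true;
            nativeBytesInUse -= SCRATCH_BYTES;
        }
    }

    // Construct, connect, and release the half-built client if connect() throws.
    static FakeClient createUnlocked(boolean reachable) {
        FakeClient client = new FakeClient(reachable);
        try {
            client.connect();
        } catch (RuntimeException e) {
            client.close(); // without this the scratch buffer would leak
            throw e;
        }
        return client;
    }

    public static void main(String[] args) {
        long baseline = nativeBytesInUse;
        try {
            createUnlocked(false);
        } catch (IllegalStateException expected) {
            System.out.println("connect failed, leaked bytes: " + (nativeBytesInUse - baseline));
        }
    }
}
```

The cleanup has to live where the reference is still in scope; once the exception leaves `createUnlocked`, no caller holds a handle to close.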
pinToCurrentThread() caches the borrowed PooledSender in a
ThreadLocal (threadAffine) so subsequent db.sender() calls on the
same thread skip the borrow path. PooledSender.close() -- the
public Sender#close() the user reaches for in
try (Sender s = db.sender()) { ... } -- flushed and returned the
wrapper to the pool, but never cleared the caller's
ThreadLocal entry. A subsequent db.sender() on the same thread
would re-read the cached entry, see isInvalidated() = false, and
return the wrapper even though another consumer had since
borrowed the slot. Both consumers then wrote to the same
underlying Sender; ILP rows interleaved on the wire.
Reachable interleaving:
1. Thread A: db.sender() -> S, inUse=true, A's TL=S.
2. Thread A: S.close() -> inUse=false, S returned to pool;
A's TL still references S.
3. Thread B: db.sender() -> borrow() polls S from `available`,
markInUse=true, returns S to B.
4. Thread A: db.sender() -> TL.get() = S, !invalidated -> returns
S. A and B now share S.
The earlier defensive idea of "also check pinned.isInUse() in
pinToCurrentThread()" only narrows the window: it catches the
case where A re-pins before B borrows, but once B's borrow has
set inUse=true the guard passes and the bug repeats.
Fix is structural: clear the pin in PooledSender.close() before
the slot becomes borrowable again.
- New package-private SenderPool.clearPinIfCurrent(s) removes the
current thread's TL entry iff it currently references s.
- PooledSender.close() calls it in the finally block, BEFORE
pool.giveBack(this) / pool.discardBroken(this). The order
matters: if we cleared after giveBack, a concurrent borrower
could grab the slot while the TL still pointed at the wrapper,
and a re-pin on this thread would hand the (now in-use)
wrapper back -- the same race this clear is meant to close.
clearPinIfCurrent is also called on the discardBroken path. There
it is redundant -- markInvalidated() means the existing
isInvalidated() check in pinToCurrentThread() would already clear
the TL on the next pin -- but uniformly clearing is simpler and
makes the close() contract symmetric across the broken/healthy
branches.
Behaviour change: db.sender() after s.close() on the same thread
no longer returns the same wrapper instance. It goes through
borrow() again and may return a different slot, or block on an
empty pool. This is the correct semantics -- close() releases
the pin; releaseSender() remains the documented release path
for code that wants to keep using the same wrapper without
flushing.
Red tests (in SenderPoolTest):
- testPinAfterUserCloseDoesNotShareWrapper: same-thread
reproducer, pool size 1 so the in-between borrow
deterministically receives the just-returned slot. Bug
exposed at the wrapper-identity level without timing. Before
fix, pinToCurrentThread() returns the (now in-use) wrapper;
after fix, it tries to borrow from an empty pool and trips
the 100ms acquireTimeout, throwing LineSenderException.
- testPinAfterUserCloseDoesNotShareWrapperCrossThread: mirrors
the originally reported A/B scenario with two threads sequenced
via CountDownLatch. Same identity assertion. Same green/red
flip across the fix.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
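The ordering argument above (clear the pin, then give the slot back) can be reproduced in a reduced pool. This is a sketch, not the SenderPool source: `Wrapper` replaces PooledSender, flushing and invalidation are omitted, and an exhausted pool throws IllegalStateException immediately instead of waiting for an acquire timeout and throwing LineSenderException.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch; simplified from the behaviour described in the commit.
final class PinClearSketch {
    final class Wrapper implements AutoCloseable {
        boolean inUse; // guarded by the pool monitor

        @Override public void close() {
            clearPinIfCurrent(this); // must happen BEFORE the slot is borrowable
            giveBack(this);
        }
    }

    private final Deque<Wrapper> available = new ArrayDeque<>(); // guarded by 'this'
    private final ThreadLocal<Wrapper> threadAffine = new ThreadLocal<>();

    PinClearSketch(int size) {
        for (int i = 0; i < size; i++) available.add(new Wrapper());
    }

    synchronized Wrapper borrow() {
        Wrapper w = available.poll();
        if (w == null) throw new IllegalStateException("pool exhausted");
        w.inUse = true;
        return w;
    }

    // Thread-affine path: reuse the cached wrapper, else borrow and cache.
    Wrapper pinToCurrentThread() {
        Wrapper pinned = threadAffine.get();
        if (pinned != null) return pinned;
        Wrapper w = borrow();
        threadAffine.set(w);
        return w;
    }

    // Removes the calling thread's pin only if it references w.
    void clearPinIfCurrent(Wrapper w) {
        if (threadAffine.get() == w) threadAffine.remove();
    }

    synchronized void giveBack(Wrapper w) {
        w.inUse = false;
        available.add(w);
    }

    public static void main(String[] args) {
        PinClearSketch pool = new PinClearSketch(1);
        Wrapper first = pool.pinToCurrentThread();
        first.close();                  // releases the pin, returns the slot
        Wrapper other = pool.borrow();  // a second consumer takes the only slot
        try {
            pool.pinToCurrentThread();
            System.out.println("shared wrapper handed out");
        } catch (IllegalStateException e) {
            System.out.println("re-pin refused: " + e.getMessage());
        }
    }
}
```

With a pool of size 1 the in-between borrow always receives the just-returned slot, which is why the same-thread reproducer needs no timing.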
The parent questdb repo's PR title validator already accepts qwp as a subtype, but the client repo's validator did not. Add it here so QWP- scoped PRs can be titled consistently across the two repos. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Contributor
[PR Coverage check]😍 pass : 531 / 1043 (50.91%) file detail
Summary
- `QuestDB` handle pools both ingest (Sender) and query connections behind one configuration string. Construct once, share across threads.
- `db.executeSql(sql, handler)` for no-bind one-shots; `db.query().sql(...).binds(...).handler(...).submit()` for parameterised queries returning a `Completion` you can `await`, `await(timeout, unit)`, `cancel`, or `isDone`.
- `db.borrowSender()` (try-with-resources, close flushes and returns to pool) and `db.sender()` (thread-affine, zero borrow overhead for hot producer threads).
- `min` / `max` / `idleTimeoutMillis` / `maxLifetimeMillis`. A single daemon housekeeper sweeps both pools.
- `sender_pool_min`, `sender_pool_max`, `query_pool_min`, `query_pool_max`, `acquire_timeout_ms`, `idle_timeout_ms`, `max_lifetime_ms`. Explicit builder calls after `fromConfig` still win.
- `http::` maps to `ws::` on the query side and vice versa, so the common single-port deployment is one line.
Backwards compatibility
- `Sender` API is unchanged. Code using `Sender.fromConfig(...)` directly is not affected.
- `QuestDB.connect(...)` or `QuestDB.builder()`.
Tradeoffs
- `QuestDB` handle owns one daemon housekeeper thread plus one dispatch thread per pooled query client. Defaults are `min=1`, `max=4` per pool, so at idle one handle holds two server-facing connections.
- `db.query()` is single-flight per thread by design (zero-GC steady state). One thread that needs multiple in-flight queries calls `db.newQuery()`, which allocates per call.
Test plan
- `mvn -pl core test`
- `ConfigStringTranslatorTest` -- schema translation, pool-key extraction, error paths.
- `SenderPoolTest` -- borrow / return / exhaustion / thread affinity / elastic growth / idle reaping / `availableSize` invariants.
- `QuestDBBuilderTest` -- validation, partial-build cleanup, connect-string pool keys.
- `QueryWorkerTest` -- `client()` getter coverage.
- `questdb` PR as `QuestDBFacadeE2ETest`.
- `QuestDBExamples.java` covers the common patterns.