-
Notifications
You must be signed in to change notification settings - Fork 1
Async Engine
The Go SDK ships an optional asyncengine subpackage that turns an
AccountSync engine into a concurrent facade with per-account ordering
guarantees. The package is one possible integration pattern - you are
free to implement your own per-account dispatch (sharded channels, ring
buffer, actor model, third-party libraries) and use the engine directly.
The bundled helper is provided so that callers who don't want to write
their own dispatcher can opt in with a few lines of code.
This page is Go-only. Other SDKs handle concurrency through their own ecosystem primitives.
Use the bundled asyncengine.AsyncEngine when the application:
- builds the engine with
AccountSyncto allow concurrent access across many accounts; - wants the SDK to guarantee that no two operations for the same account ever run inside the engine concurrently;
- prefers
Future-style return values over manual channel/wait-group bookkeeping; - needs graceful and hard stop modes with deadlines.
If FullSync or NoSync is sufficient, or if you already have a
custom per-account dispatcher you are happy with, you do not need this
package.
asyncengine ships two dispatch strategies. Each chosen at build time so
the hot path stays branch-free in steady state.
asyncengine.NewBuilder(engine).Sharded(workers)
- A fixed number of worker channels created up front.
- The account ID is hashed (Fibonacci mix) into one of the shards.
- Routing is one multiply-shift per submit (lock-free, no per-queue RWMutex on the send path); one short shared read-lock is still taken per submit to order against stop, so concurrent submits are not serialized against each other.
Strengths:
- The cheapest hot path: ~constant per submit.
- O(1) memory regardless of how many distinct accounts are active.
- Predictable goroutine count.
Trade-offs:
- A hot account saturates a single shard while the others stay idle.
- No per-account observer signals (queue created/removed never fire).
- Different accounts that hash to the same shard interleave through the same channel.
Pick Sharded when the active account set is broad and roughly balanced
or when raw throughput per submit matters more than per-account
isolation.
asyncengine.NewBuilder(engine).Dynamic().MaxQueues(n).IdleCleanupAfter(d)
- Per-account queue created on first submit.
- A background goroutine retires queues that have been empty and
untouched for
IdleCleanupAfter. -
MaxQueuescaps the live queue count; submits for unknown accounts returnErrQueueLimitpast the cap.MaxQueues(0)removes the cap.
Strengths:
- Full per-account isolation: a slow account does not starve others that happen to share a shard.
- Per-account observer signals (
OnQueueCreated,OnQueueRemoved, per-account latency). - Memory scales with the active set rather than the total population.
Trade-offs:
- An
RWMutexlookup on every submit (read path is RLock, queue creation is WLock). - A periodic cleanup goroutine.
- Slightly higher per-account memory because each queue holds its own channel and worker.
Pick Dynamic when account activity is skewed, when you want per-account
metrics, or when the population is large enough that statically
allocating shards would be wasteful.
MaxQueues defaults to runtime.NumCPU() * 32, a value designed to be
effectively non-restrictive on typical hosts while still bounding
pathological growth (e.g. ephemeral per-request accounts). Override
with MaxQueues(0) for unbounded growth.
Every queued operation returns a future from the pkg/future package,
resolved exactly once — from the worker goroutine in the normal case,
or synchronously on the submitting goroutine if the submit fails
immediately (e.g. ErrStopped, ErrQueueLimit, or a cancelled ctx)
before the task is ever queued. Operations that mirror a
single-value engine call return future.Future[T]; operations whose
synchronous form returns two values plus an error (start stage →
request-or-rejects, main stage → reservation-or-rejects, account adjustment →
batch-error-or-outcomes) return future.Future2[A, B], so the asynchronous
shape lines up exactly with the synchronous tuple instead of a result struct.
Available operations:
-
f.Await(ctx)- block until resolved or ctx fires.Future[T]yields(T, error);Future2[A, B]yields(A, B, error). -
f.Done()- non-blocking check. -
f.TryGet()- non-blocking read. -
f.Wait()- channel that closes on resolution (forselect).
The future and the wrapped engine objects are decoupled: cancelling the
context passed to Await does not stop the underlying engine call; it
only stops the caller from waiting.
AsyncEngine requires the wrapped engine to be built with AccountSync.
Per-account correctness holds in both strategies: no two operations for
the same account ever run inside the engine concurrently, and within one
account every queued operation runs in submit order.
Parallelism across different accounts depends on the strategy.
Dynamic gives full per-account isolation, so distinct accounts are
always processed in parallel. Sharded gives per-shard serialization:
distinct accounts that hash to the same shard share one worker and are
processed one after another (a hot account can block others on its
shard). Neither relaxes the per-account invariant above.
The Threading Contract of the engine itself is respected end-to-end.
AsyncRequest and AsyncReservation keep the same per-account queue
across the boundary: calling Execute on a started request, or Commit
on a reservation, re-enters the same per-account chain.
-
StopGraceful(ctx)refuses new submissions and waits for every already-queued task to run to completion. Returnsctx.Err()if ctx fires before workers drain; the engine is then partially stopped andStopHardmay be invoked to complete the shutdown. -
StopHard(ctx)refuses new submissions, aborts every task that has not yet started withErrStopped, and waits for the currently-running task in each worker to finish. Returnsctx.Err()if ctx fires before the in-flight task in each worker finishes.
When the builder was wired via AccountSyncReadyEngineBuilder.BuildAsync,
the underlying *Engine is released when the stop completes successfully
(returns nil). If StopGraceful or StopHard returns ctx.Err()
(timeout), the engine is not yet released; the caller must complete
the shutdown (call StopHard) before the release happens. When the
caller used asyncengine.NewBuilder(engine) and did not wire
WithStopUnderlying, the engine handle remains owned by the caller.
asyncengine.Observer is an optional diagnostic interface. The default
is NoopObserver. Wire only the methods you need - the package itself
has zero external observability dependencies (no OpenTelemetry,
Prometheus, or logging in the import graph), so you decide where the
signals go.
Available callbacks: OnEnqueue, OnDequeue, OnComplete,
OnSlowSubmit, OnQueueFullBlocked, OnQueueCreated, OnQueueRemoved,
OnSubmitCancelled.
package main
import (
"context"
"log"
"time"
"go.openpit.dev/openpit"
"go.openpit.dev/openpit/model"
"go.openpit.dev/openpit/param"
"go.openpit.dev/openpit/pretrade/policies"
)
func main() {
// Build an AccountSync engine and wrap it into an async facade in one
// chain. BuildAsync is only available on the AccountSync builder; for
// FullSync or NoSync engines the bundled async helper is not used.
asyncBuilder, err := openpit.NewEngineBuilder().
AccountSync().
Builtin(policies.BuildOrderValidation()).
Builtin(
policies.BuildRateLimit().BrokerBarrier(
policies.RateLimitBrokerBarrier{
Limit: policies.RateLimit{
MaxOrders: 100,
Window: time.Second,
},
},
),
).
BuildAsync()
if err != nil {
log.Fatal(err)
}
// Pick a dispatch strategy. Use Sharded for cheap routing across a
// balanced account population; use Dynamic for per-account isolation
// and per-account metrics.
async, err := asyncBuilder.Dynamic().
MaxQueues(0).
IdleCleanupAfter(5 * time.Minute).
Build()
if err != nil {
log.Fatal(err)
}
defer func() {
if err := async.StopGraceful(context.Background()); err != nil {
log.Printf("StopGraceful: %v", err)
}
}()
// Submit a start-stage call. The future resolves once the worker has
// executed the call. AsyncRequest.Execute and Close are queued in the
// same per-account chain so AccountSync is never violated.
usd, err := param.NewAsset("USD")
if err != nil {
log.Fatal(err)
}
aapl, err := param.NewAsset("AAPL")
if err != nil {
log.Fatal(err)
}
order := model.NewOrder()
op := order.EnsureOperationView()
op.SetInstrument(param.NewInstrument(aapl, usd))
op.SetAccountID(param.NewAccountIDFromUint64(99224416))
op.SetSide(param.SideBuy)
price, err := param.NewPriceFromString("185")
if err != nil {
log.Fatal(err)
}
qty, err := param.NewQuantityFromString("100")
if err != nil {
log.Fatal(err)
}
op.SetTradeAmount(param.NewQuantityTradeAmount(qty))
op.SetPrice(price)
request, rejects, err := async.StartPreTrade(
context.Background(),
order,
).Await(context.Background())
if err != nil {
log.Fatal(err)
}
if request == nil {
// Rejected at the start stage; inspect rejects.
_ = rejects
return
}
// The async request preserves AccountSync across the Start - Execute
// boundary by routing Execute through the same per-account queue. The
// future yields the same (reservation, rejects, error) tuple the
// synchronous main stage returns.
reservation, rejects, err := request.Execute(
context.Background(),
).Await(context.Background())
if err != nil {
log.Fatal(err)
}
if reservation == nil {
_ = rejects
return
}
if _, err := reservation.CommitAndClose(
context.Background(),
).Await(context.Background()); err != nil {
log.Fatal(err)
}
}Submit(ctx, accountID, fn) enqueues an arbitrary func() error into
the same per-account queue. Use it to run client-side work atomically
with respect to engine calls on the same account - for example, to
persist an order to a database before Execute, or to update a strategy
book after Commit.
Splitting a logical transaction into two Submit calls "surfaces"
between them: tasks for the same account from elsewhere in the system
can interleave between the two halves. To keep that boundary closed,
bundle the work into a single Submit whose fn does both halves.
- Aborted tasks (hard stop) resolve their future with
ErrStopped. For reservation methods that "and close" (Close,CommitAndClose,RollbackAndClose) the underlying reservation is still released as a safety net even on abort, so the native handle never leaks. - An aborted
AsyncRequest.Executereleases the underlying request before resolving the future. The caller does not need a separate cleanup path. - The
ctxpassed to a submit method controls how long the producer is willing to wait for queue space. Once the task is queued it is the worker's responsibility to run or abort it.
- Threading Contract: per-mode sync contract that AsyncEngine respects.
- Pre-trade Pipeline: the Request/Reservation lifecycle that AsyncRequest/AsyncReservation wraps.
- Getting Started: the synchronous flow that AsyncEngine layers on top of.