
Async Engine

Eugene Palchukovsky edited this page Jun 4, 2026 · 1 revision

Async Engine (Go)

The Go SDK ships an optional asyncengine subpackage that turns an AccountSync engine into a concurrent facade with per-account ordering guarantees. The package is one possible integration pattern - you are free to implement your own per-account dispatch (sharded channels, ring buffer, actor model, third-party libraries) and use the engine directly. The bundled helper is provided so that callers who don't want to write their own dispatcher can opt in with a few lines of code.

This page is Go-only. Other SDKs handle concurrency through their own ecosystem primitives.

When to Use

Use the bundled asyncengine.AsyncEngine when the application:

  • builds the engine with AccountSync to allow concurrent access across many accounts;
  • wants the SDK to guarantee that no two operations for the same account ever run inside the engine concurrently;
  • prefers Future-style return values over manual channel/wait-group bookkeeping;
  • needs graceful and hard stop modes with deadlines.

If FullSync or NoSync is sufficient, or if you already have a custom per-account dispatcher you are happy with, you do not need this package.

Choosing Between Strategies

asyncengine ships two dispatch strategies. The strategy is chosen at build time, so the hot path stays branch-free in steady state.

Sharded

asyncengine.NewBuilder(engine).Sharded(workers)

  • A fixed number of worker channels created up front.
  • The account ID is hashed (Fibonacci mix) into one of the shards.
  • Routing is one multiply-shift per submit: it is lock-free, with no per-queue RWMutex on the send path. Each submit still takes one short shared read-lock to order itself against stop; because that lock is shared, concurrent submits are not serialized against each other.

Strengths:

  • The cheapest hot path: ~constant per submit.
  • O(1) memory regardless of how many distinct accounts are active.
  • Predictable goroutine count.

Trade-offs:

  • A hot account saturates a single shard while the others stay idle.
  • No per-account observer signals (queue created/removed never fire).
  • Different accounts that hash to the same shard interleave through the same channel.

Pick Sharded when the active account set is broad and roughly balanced or when raw throughput per submit matters more than per-account isolation.
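
The routing step described above can be sketched as follows. This is a minimal illustration of Fibonacci hashing in general, not the package's actual code: the function name shardFor and the power-of-two shard count are assumptions made for the sketch, and the real Sharded(workers) strategy may reduce the hash differently.

```go
package main

import "fmt"

// fibMul is 2^64 divided by the golden ratio, the usual 64-bit
// Fibonacci hashing multiplier.
const fibMul = 0x9E3779B97F4A7C15

// shardFor maps an account ID onto one of 1<<bits shards with a single
// multiply and shift. bits must be in [1, 63]; requiring a power-of-two
// shard count is an assumption of this sketch.
func shardFor(accountID uint64, bits uint) uint64 {
	return (accountID * fibMul) >> (64 - bits)
}

func main() {
	const bits = 3 // 8 shards
	for _, id := range []uint64{1, 2, 3, 99224416} {
		fmt.Printf("account %d -> shard %d\n", id, shardFor(id, bits))
	}
}
```

The same account ID always lands on the same shard, which is what preserves per-account ordering; two different accounts may still land on the same shard and then share its worker.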

Dynamic

asyncengine.NewBuilder(engine).Dynamic().MaxQueues(n).IdleCleanupAfter(d)

  • Per-account queue created on first submit.
  • A background goroutine retires queues that have been empty and untouched for IdleCleanupAfter.
  • MaxQueues caps the live queue count; submits for unknown accounts return ErrQueueLimit past the cap. MaxQueues(0) removes the cap.

Strengths:

  • Full per-account isolation: a slow account cannot starve others, because no two accounts ever share a worker.
  • Per-account observer signals (OnQueueCreated, OnQueueRemoved, per-account latency).
  • Memory scales with the active set rather than the total population.

Trade-offs:

  • An RWMutex lookup on every submit (the read path takes RLock; queue creation takes the write lock, Lock).
  • A periodic cleanup goroutine.
  • Slightly higher per-account memory because each queue holds its own channel and worker.

Pick Dynamic when account activity is skewed, when you want per-account metrics, or when the population is large enough that statically allocating shards would be wasteful.

MaxQueues defaults to runtime.NumCPU() * 32, a value designed to be effectively non-restrictive on typical hosts while still bounding pathological growth (e.g. ephemeral per-request accounts). Override with MaxQueues(0) for unbounded growth.
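
The lookup-or-create path and the queue cap can be pictured with the sketch below. It is an illustration under stated assumptions, not the package's implementation: registry, queueFor, and errQueueLimit are hypothetical names, the queue capacity of 16 is arbitrary, and the per-queue worker and idle cleanup are left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errQueueLimit stands in for the package's ErrQueueLimit in this sketch.
var errQueueLimit = errors.New("queue limit reached")

// registry is a hypothetical per-account queue table.
type registry struct {
	mu        sync.RWMutex
	queues    map[uint64]chan func()
	maxQueues int // 0 means no cap
}

func newRegistry(maxQueues int) *registry {
	return &registry{queues: make(map[uint64]chan func()), maxQueues: maxQueues}
}

// queueFor returns the queue for an account, creating it on first use.
func (r *registry) queueFor(accountID uint64) (chan func(), error) {
	// Fast path: known accounts only need the shared read lock.
	r.mu.RLock()
	q, ok := r.queues[accountID]
	r.mu.RUnlock()
	if ok {
		return q, nil
	}

	// Slow path: take the write lock and re-check, because another
	// goroutine may have created the queue in between.
	r.mu.Lock()
	defer r.mu.Unlock()
	if q, ok := r.queues[accountID]; ok {
		return q, nil
	}
	if r.maxQueues > 0 && len(r.queues) >= r.maxQueues {
		return nil, errQueueLimit
	}
	q = make(chan func(), 16)
	r.queues[accountID] = q
	return q, nil
}

func main() {
	r := newRegistry(2)
	for _, id := range []uint64{1, 2, 1, 3} {
		_, err := r.queueFor(id)
		fmt.Printf("account %d: err=%v\n", id, err)
	}
}
```

With a cap of 2, the repeated submit for account 1 reuses its existing queue, while the first submit for account 3 is refused. Known accounts are never affected by the cap.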

Result Delivery: Future

Every queued operation returns a future from the pkg/future package. The future is resolved exactly once: from the worker goroutine in the normal case, or synchronously on the submitting goroutine if the submit fails immediately (e.g. ErrStopped, ErrQueueLimit, or a cancelled ctx) before the task is ever queued. Operations that mirror a single-value engine call return future.Future[T]. Operations whose synchronous form returns two values plus an error (start stage → request-or-rejects, main stage → reservation-or-rejects, account adjustment → batch-error-or-outcomes) return future.Future2[A, B], so the asynchronous shape matches the synchronous tuple rather than wrapping it in a result struct.

Available operations:

  • f.Await(ctx) - block until resolved or ctx fires. Future[T] yields (T, error); Future2[A, B] yields (A, B, error).
  • f.Done() - non-blocking check.
  • f.TryGet() - non-blocking read.
  • f.Wait() - channel that closes on resolution (for select).

The future and the wrapped engine objects are decoupled: cancelling the context passed to Await does not stop the underlying engine call; it only stops the caller from waiting.
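
The decoupling between waiting and working is easier to see in a toy future. The type below is a hypothetical stand-in written for this page, not the pkg/future implementation; only the method names Await and Wait are taken from the list above, and everything else (newFuture, resolve, the field layout) is assumed.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// future is a hypothetical single-value future; it is not pkg/future.
type future[T any] struct {
	once sync.Once
	done chan struct{}
	val  T
	err  error
}

func newFuture[T any]() *future[T] {
	return &future[T]{done: make(chan struct{})}
}

// resolve stores the result; only the first call has any effect.
func (f *future[T]) resolve(v T, err error) {
	f.once.Do(func() {
		f.val, f.err = v, err
		close(f.done)
	})
}

// Wait returns a channel that is closed on resolution.
func (f *future[T]) Wait() <-chan struct{} { return f.done }

// Await blocks until the future resolves or ctx fires. A ctx error only
// ends the wait; it does not touch the work behind the future.
func (f *future[T]) Await(ctx context.Context) (T, error) {
	select {
	case <-f.done:
		return f.val, f.err
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

func main() {
	f := newFuture[int]()
	go func() {
		time.Sleep(50 * time.Millisecond) // simulated engine call
		f.resolve(42, nil)
	}()

	// The first wait gives up early; the work keeps running.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	_, err := f.Await(ctx)
	cancel()
	fmt.Println("first await:", err)

	// A second wait with a live context sees the result.
	v, err := f.Await(context.Background())
	fmt.Println("second await:", v, err)
}
```

In this sketch a timed-out Await can simply be retried later, because the result is stored in the future rather than handed to a specific waiter.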

Threading

AsyncEngine requires the wrapped engine to be built with AccountSync. Per-account correctness holds in both strategies: no two operations for the same account ever run inside the engine concurrently, and within one account every queued operation runs in submit order.

Parallelism across different accounts depends on the strategy. Dynamic gives full per-account isolation, so distinct accounts are always processed in parallel. Sharded gives per-shard serialization: distinct accounts that hash to the same shard share one worker and are processed one after another (a hot account can block others on its shard). Neither relaxes the per-account invariant above.

The Threading Contract of the engine itself is respected end-to-end.

AsyncRequest and AsyncReservation keep the same per-account queue across the boundary: calling Execute on a started request, or Commit on a reservation, re-enters the same per-account chain.

Stop Modes

  • StopGraceful(ctx) refuses new submissions and waits for every already-queued task to run to completion. Returns ctx.Err() if ctx fires before workers drain; the engine is then partially stopped and StopHard may be invoked to complete the shutdown.
  • StopHard(ctx) refuses new submissions, aborts every task that has not yet started with ErrStopped, and waits for the currently-running task in each worker to finish. Returns ctx.Err() if ctx fires before the in-flight task in each worker finishes.
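
The difference between the two modes comes down to what happens to tasks that are queued but not yet started. The sketch below models that with a single worker; it is illustrative only. The names task, worker, runAll, and errStopped are hypothetical, the ctx deadline is left out, and the worker is started only after the stop signal so that the outcome is deterministic.

```go
package main

import (
	"errors"
	"fmt"
)

var errStopped = errors.New("stopped") // stands in for ErrStopped

// task pairs a unit of work with the channel its result is reported on.
type task struct {
	run    func() error
	result chan error // buffered, capacity 1
}

// worker runs tasks in order. After abort is closed, tasks that have not
// started are resolved with errStopped instead of being run.
func worker(queue <-chan task, abort <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	for t := range queue {
		select {
		case <-abort:
			t.result <- errStopped
		default:
			t.result <- t.run()
		}
	}
}

// runAll queues n tasks, stops the worker, and returns each task's result.
func runAll(n int, hard bool) []error {
	queue := make(chan task, n)
	abort := make(chan struct{})
	done := make(chan struct{})
	tasks := make([]task, n)
	for i := range tasks {
		tasks[i] = task{run: func() error { return nil }, result: make(chan error, 1)}
		queue <- tasks[i]
	}
	if hard {
		close(abort) // hard stop: pending tasks are aborted
	}
	close(queue) // both modes: refuse new submissions
	go worker(queue, abort, done)
	<-done // wait for the worker to drain or abort its queue

	out := make([]error, n)
	for i, t := range tasks {
		out[i] = <-t.result
	}
	return out
}

func main() {
	fmt.Println("graceful:", runAll(3, false))
	fmt.Println("hard:    ", runAll(3, true))
}
```

In both modes every task's result is delivered exactly once; the modes differ only in whether a pending task runs or is resolved with the stop error.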

When the builder was wired via AccountSyncReadyEngineBuilder.BuildAsync, the underlying *Engine is released when the stop completes successfully (returns nil). If StopGraceful or StopHard returns ctx.Err() (timeout), the engine is not yet released; the caller must complete the shutdown (call StopHard) before the release happens. When the caller used asyncengine.NewBuilder(engine) and did not wire WithStopUnderlying, the engine handle remains owned by the caller.

Observer

asyncengine.Observer is an optional diagnostic interface. The default is NoopObserver. Wire only the methods you need - the package itself has zero external observability dependencies (no OpenTelemetry, Prometheus, or logging in the import graph), so you decide where the signals go.

Available callbacks: OnEnqueue, OnDequeue, OnComplete, OnSlowSubmit, OnQueueFullBlocked, OnQueueCreated, OnQueueRemoved, OnSubmitCancelled.
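
One common Go way to wire only the methods you need is to embed a no-op implementation and override a single callback. The sketch below shows that pattern with local types. The callback signatures are not given on this page, so the two-method observer interface, its parameter lists, and the failureCounter type are all assumptions made for illustration; consult the package for the real Observer definition.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errRejected = errors.New("rejected") // sample error for the demo

// observer is a hypothetical two-method subset; the real callback
// signatures are not shown on this page and are assumed here.
type observer interface {
	OnEnqueue(accountID uint64)
	OnComplete(accountID uint64, err error)
}

// noopObserver ignores every signal.
type noopObserver struct{}

func (noopObserver) OnEnqueue(uint64)         {}
func (noopObserver) OnComplete(uint64, error) {}

// failureCounter embeds the no-op and overrides only OnComplete.
type failureCounter struct {
	noopObserver
	failures atomic.Int64
}

// OnComplete counts completions that carried an error. It may be called
// from many worker goroutines, hence the atomic counter.
func (c *failureCounter) OnComplete(_ uint64, err error) {
	if err != nil {
		c.failures.Add(1)
	}
}

func main() {
	c := &failureCounter{}
	var obs observer = c
	obs.OnEnqueue(1)
	obs.OnComplete(1, nil)
	obs.OnComplete(1, errRejected)
	fmt.Println("failures:", c.failures.Load())
}
```

Because the package has no observability dependencies of its own, a counter like this is where an application would forward the signal to its own metrics or logging stack.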

Example

package main

import (
 "context"
 "log"
 "time"

 "go.openpit.dev/openpit"
 "go.openpit.dev/openpit/model"
 "go.openpit.dev/openpit/param"
 "go.openpit.dev/openpit/pretrade/policies"
)

func main() {
 // Build an AccountSync engine and wrap it into an async facade in one
 // chain. BuildAsync is only available on the AccountSync builder; for
 // FullSync or NoSync engines the bundled async helper is not used.
 asyncBuilder, err := openpit.NewEngineBuilder().
  AccountSync().
  Builtin(policies.BuildOrderValidation()).
  Builtin(
   policies.BuildRateLimit().BrokerBarrier(
    policies.RateLimitBrokerBarrier{
     Limit: policies.RateLimit{
      MaxOrders: 100,
      Window:    time.Second,
     },
    },
   ),
  ).
  BuildAsync()
 if err != nil {
  log.Fatal(err)
 }

 // Pick a dispatch strategy. Use Sharded for cheap routing across a
 // balanced account population; use Dynamic for per-account isolation
 // and per-account metrics.
 async, err := asyncBuilder.Dynamic().
  MaxQueues(0).
  IdleCleanupAfter(5 * time.Minute).
  Build()
 if err != nil {
  log.Fatal(err)
 }
 defer func() {
  if err := async.StopGraceful(context.Background()); err != nil {
   log.Printf("StopGraceful: %v", err)
  }
 }()

 // Submit a start-stage call. The future resolves once the worker has
 // executed the call. AsyncRequest.Execute and Close are queued in the
 // same per-account chain so AccountSync is never violated.
 usd, err := param.NewAsset("USD")
 if err != nil {
  log.Fatal(err)
 }
 aapl, err := param.NewAsset("AAPL")
 if err != nil {
  log.Fatal(err)
 }
 order := model.NewOrder()
 op := order.EnsureOperationView()
 op.SetInstrument(param.NewInstrument(aapl, usd))
 op.SetAccountID(param.NewAccountIDFromUint64(99224416))
 op.SetSide(param.SideBuy)
 price, err := param.NewPriceFromString("185")
 if err != nil {
  log.Fatal(err)
 }
 qty, err := param.NewQuantityFromString("100")
 if err != nil {
  log.Fatal(err)
 }
 op.SetTradeAmount(param.NewQuantityTradeAmount(qty))
 op.SetPrice(price)

 request, rejects, err := async.StartPreTrade(
  context.Background(),
  order,
 ).Await(context.Background())
 if err != nil {
  log.Fatal(err)
 }
 if request == nil {
  // Rejected at the start stage; inspect rejects.
  _ = rejects
  return
 }

 // The async request preserves AccountSync across the Start - Execute
 // boundary by routing Execute through the same per-account queue. The
 // future yields the same (reservation, rejects, error) tuple the
 // synchronous main stage returns.
 reservation, rejects, err := request.Execute(
  context.Background(),
 ).Await(context.Background())
 if err != nil {
  log.Fatal(err)
 }
 if reservation == nil {
  _ = rejects
  return
 }

 if _, err := reservation.CommitAndClose(
  context.Background(),
 ).Await(context.Background()); err != nil {
  log.Fatal(err)
 }
}

Submitting Caller-Owned Work

Submit(ctx, accountID, fn) enqueues an arbitrary func() error into the same per-account queue. Use it to run client-side work atomically with respect to engine calls on the same account - for example, to persist an order to a database before Execute, or to update a strategy book after Commit.

A logical transaction split across two Submit calls is not atomic: tasks for the same account submitted from elsewhere in the system can run between the two halves. To keep that boundary closed, bundle the work into a single Submit whose fn does both halves.
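
The interleaving risk can be reproduced with a toy per-account queue. The sketch below does not use the SDK: serialQueue, submit, and drain are hypothetical stand-ins for one account's queue, and the string labels only mark where each piece of work ran.

```go
package main

import "fmt"

// serialQueue is a hypothetical stand-in for one account's queue: a single
// worker runs submitted functions strictly in submit order.
type serialQueue struct {
	tasks chan func()
	done  chan struct{}
}

func newSerialQueue() *serialQueue {
	q := &serialQueue{tasks: make(chan func(), 16), done: make(chan struct{})}
	go func() {
		defer close(q.done)
		for fn := range q.tasks {
			fn()
		}
	}()
	return q
}

func (q *serialQueue) submit(fn func()) { q.tasks <- fn }

// drain stops accepting work and waits for queued tasks to finish.
func (q *serialQueue) drain() { close(q.tasks); <-q.done }

// split submits the two halves separately; other work can land between.
func split() []string {
	var log []string
	q := newSerialQueue()
	q.submit(func() { log = append(log, "persist") })
	q.submit(func() { log = append(log, "other") }) // from elsewhere
	q.submit(func() { log = append(log, "execute") })
	q.drain()
	return log
}

// bundled runs both halves inside one task, so nothing can interleave.
func bundled() []string {
	var log []string
	q := newSerialQueue()
	q.submit(func() {
		log = append(log, "persist")
		log = append(log, "execute")
	})
	q.submit(func() { log = append(log, "other") })
	q.drain()
	return log
}

func main() {
	fmt.Println("split:  ", split())
	fmt.Println("bundled:", bundled())
}
```

In the split version the unrelated task runs between the two halves; in the bundled version it can only run before or after the pair.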

Lifecycle Notes

  • Aborted tasks (hard stop) resolve their future with ErrStopped. For reservation methods that "and close" (Close, CommitAndClose, RollbackAndClose) the underlying reservation is still released as a safety net even on abort, so the native handle never leaks.
  • An aborted AsyncRequest.Execute releases the underlying request before resolving the future. The caller does not need a separate cleanup path.
  • The ctx passed to a submit method controls how long the producer is willing to wait for queue space. Once the task is queued it is the worker's responsibility to run or abort it.
