A generic, concurrency-safe in-memory aggregation buffer for Go. It accumulates high-throughput events keyed by an identifier, merges them with a user-supplied function, and periodically flushes the merged results to a channel for downstream processing.
It is built for the common "many writers, fewer distinct keys" workload — counters, metrics, rate windows, batch coalescing — where reducing N writes into one record per key before it leaves the process saves significant downstream cost.
- How it works
- Installation
- Quick start
- Concurrency contract
- API reference
- Implementation notes
- Benchmarks
- Testing
- License
```text
                Add(key, *T)                       ChanPool() <-chan *T
  writers ─────────────────────► ┌───────────────┐ ──────────────────► consumer
  (many)                         │ sharded maps  │  periodic flush     (drains
                                 │ shard 0..N-1  │  (cleanupInterval)   merged
                                 └───────────────┘                      records)
```
- `Add` hashes the key to one of `N` shards and merges the incoming value into the existing record for that key via your `addFunc` (or stores it as new).
- A background ticker fires every `cleanupInterval` and runs `Cleanup`, which atomically detaches each non-empty shard's map and drains the merged records into the output channel.
- A consumer ranges over `ChanPool()` and handles each flushed record.
The number of shards is derived from runtime.NumCPU() (rounded up to the next power
of two, minimum 4) so writes to different keys rarely contend on the same lock.
Requires Go 1.26+.
```sh
go get github.com/mysamimi/dataAggregator
```

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/mysamimi/dataAggregator"
	"github.com/rs/zerolog"
)

type Metric struct {
	Name  string
	Count *uint64
}

func main() {
	logger := zerolog.New(zerolog.NewConsoleWriter()).With().Timestamp().Logger()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// New[KeyType, ValueType](ctx, flushInterval, channelBufferSize, logger, mergeFunc)
	agg := dataAggregator.New[string, Metric](
		ctx,
		5*time.Second, // flush every 5s
		1000,          // output channel buffer
		&logger,
		func(stored, incoming *Metric) {
			// Merge incoming into the already-stored record.
			atomic.AddUint64(stored.Count, *incoming.Count)
		},
	)

	// Consume flushed records until Shutdown closes the channel.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for m := range agg.ChanPool() {
			fmt.Printf("%s = %d\n", m.Name, *m.Count)
		}
	}()

	// Produce events; identical keys are merged in place.
	c1, c2 := uint64(5), uint64(10)
	agg.Add("api.requests", &Metric{Name: "api.requests", Count: &c1})
	agg.Add("api.requests", &Metric{Name: "api.requests", Count: &c2}) // -> 15

	// Flush everything and close the channel. The consumer keeps draining meanwhile.
	agg.Shutdown()

	// Wait for the consumer; otherwise main may exit before anything is printed.
	<-done
}
```

Important
`addFunc` must access the value type's fields in a concurrency-safe way. It is invoked while the
shard holds an internal per-key lock, so calls for the same key are serialized.
Because a stored value's fields may also be read by your consumer after a flush,
the examples use `sync/atomic` (e.g. `atomic.AddUint64`) on shared fields; follow
that pattern whenever a field is touched from more than one place.
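As an illustration of that pattern with more than one field, the sketch below merges a count and a running maximum using Go's typed atomics (`atomic.Uint64`, `atomic.Int64`, Go 1.19+) instead of the `*uint64` field from the quick start. `Stat`, `mergeStat` and `newStat` are hypothetical names; only the `func(stored, incoming *T)` shape comes from the API.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Stat is a hypothetical value type whose fields may be read outside addFunc.
type Stat struct {
	Count atomic.Uint64
	Max   atomic.Int64
}

func newStat(count uint64, max int64) *Stat {
	s := &Stat{}
	s.Count.Store(count)
	s.Max.Store(max)
	return s
}

// mergeStat has the addFunc shape: fold incoming into stored.
func mergeStat(stored, incoming *Stat) {
	stored.Count.Add(incoming.Count.Load())

	// Atomic "max": retry the compare-and-swap until stored.Max >= v.
	v := incoming.Max.Load()
	for {
		cur := stored.Max.Load()
		if v <= cur || stored.Max.CompareAndSwap(cur, v) {
			return
		}
	}
}

func main() {
	stored := newStat(0, 0)

	// Merge from many goroutines to show the fields stay consistent.
	var wg sync.WaitGroup
	for i := 1; i <= 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			mergeStat(stored, newStat(1, int64(i)))
		}(i)
	}
	wg.Wait()

	fmt.Println(stored.Count.Load(), stored.Max.Load()) // 100 100
}
```

The aggregator already serializes calls for one key, so the atomics here protect readers (such as the consumer) rather than concurrent merges; the goroutines in `main` simply make that safety visible under `go run -race`.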
Important
A consumer must keep draining ChanPool() for the lifetime of the aggregator,
including during Shutdown(). When the output channel is full, Cleanup
re-inserts records instead of dropping them (the zero-loss guarantee). Shutdown
flushes in a loop until the buffer is empty, so a stalled consumer makes it block
forever.
Note
Do not call Add after Shutdown. Once Shutdown closes the output channel,
a subsequent Cleanup/flush of newly added data would attempt to send on a closed
channel. Stop your writers before shutting down.
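Both rules come down to one ordering: start the consumer, stop every writer, call `Shutdown`, then wait for the consumer to finish. The sketch below shows that sequence against a hypothetical in-process stand-in (`aggregator`, `run`) that mimics only the Add/flush/close behavior described here; with the real library you would call `agg.Add`, `agg.Shutdown` and range over `agg.ChanPool()` instead.

```go
package main

import (
	"fmt"
	"sync"
)

// aggregator is a hypothetical stand-in, not the library type.
type aggregator struct {
	mu   sync.Mutex
	data map[string]int
	out  chan int
}

func (a *aggregator) Add(key string, v int) {
	a.mu.Lock()
	a.data[key] += v
	a.mu.Unlock()
}

// Shutdown flushes every record and closes the channel. Each send blocks
// until the consumer receives, which is why the consumer must keep draining.
func (a *aggregator) Shutdown() {
	a.mu.Lock()
	defer a.mu.Unlock()
	for _, v := range a.data {
		a.out <- v
	}
	a.data = map[string]int{}
	close(a.out)
}

func run(writers, perWriter int) int {
	a := &aggregator{data: map[string]int{}, out: make(chan int, 1)}

	// 1. Start the consumer first; it drains until the channel is closed.
	total := 0
	consumerDone := make(chan struct{})
	go func() {
		defer close(consumerDone)
		for v := range a.out {
			total += v
		}
	}()

	var wg sync.WaitGroup
	for w := 0; w < writers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < perWriter; i++ {
				a.Add(fmt.Sprintf("key-%d", w%3), 1)
			}
		}(w)
	}

	wg.Wait()      // 2. Stop all writers: no Add may follow Shutdown.
	a.Shutdown()   // 3. Flush and close the output channel.
	<-consumerDone // 4. Wait until every flushed record was handled.
	return total
}

func main() {
	fmt.Println(run(10, 100)) // 1000
}
```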
Ownership of *T. A pointer handed to Add is owned by the aggregator until it
either (a) is merged into an existing record — Add returns true, and you may reuse
or pool the pointer immediately — or (b) becomes the stored record — Add returns
false, and the aggregator hands it back to you later via ChanPool(). This makes
the type a natural fit for sync.Pool recycling (see TestWithSyncPool_And_Requeue).
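A minimal sketch of that recycling loop, assuming a hypothetical `store` type that mimics the documented `Add` contract (`true` means merged, `false` means stored). It is neither the library's code nor `TestWithSyncPool_And_Requeue`; it only shows the two points where a pointer may go back to a `sync.Pool`: right after a merged `Add`, and after the consumer has finished with a flushed record.

```go
package main

import (
	"fmt"
	"sync"
)

// Metric here is a simplified stand-in with a plain counter.
type Metric struct {
	Name  string
	Count uint64
}

var metricPool = sync.Pool{New: func() any { return new(Metric) }}

// store is a hypothetical stand-in for the aggregator's buffered records.
type store struct {
	mu   sync.Mutex
	recs map[string]*Metric
}

// add mirrors the contract: true = merged (caller owns m again),
// false = m became the stored record (owned until it is flushed).
func (s *store) add(key string, m *Metric) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.recs[key]; ok {
		cur.Count += m.Count
		return true
	}
	s.recs[key] = m
	return false
}

// record takes a pointer from the pool and returns it at once if it was merged.
func record(s *store, name string, n uint64) {
	m := metricPool.Get().(*Metric)
	m.Name, m.Count = name, n
	if s.add(name, m) {
		*m = Metric{}
		metricPool.Put(m)
	}
}

// flush plays the consumer: copy what it needs, then recycle the record.
func flush(s *store) []Metric {
	s.mu.Lock()
	recs := s.recs
	s.recs = map[string]*Metric{}
	s.mu.Unlock()

	out := make([]Metric, 0, len(recs))
	for _, m := range recs {
		out = append(out, *m)
		*m = Metric{}
		metricPool.Put(m)
	}
	return out
}

func main() {
	s := &store{recs: map[string]*Metric{}}
	record(s, "api.requests", 5)
	record(s, "api.requests", 10)
	for _, m := range flush(s) {
		fmt.Printf("%s = %d\n", m.Name, m.Count) // api.requests = 15
	}
}
```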
```go
func New[P comparable, T any](
	ctx context.Context,
	cleanupInterval time.Duration,
	maxPoolSize int,
	logger *zerolog.Logger,
	addFunc func(stored, incoming *T),
) *DataAggregator[P, T]
```

| Method | Description |
|---|---|
| `Add(key P, data *T) bool` | Merge `data` into the record for `key`, or store it if new. Returns `true` when merged into an existing record, `false` when newly stored. |
| `ChanPool() chan *T` | The output channel of flushed records. Range over it in a consumer. |
| `Cleanup()` | Manually flush all current records to the output channel (also runs automatically on the interval). |
| `Shutdown()` | Stop the background loop, flush everything, and close the output channel. Idempotent; safe to call once writers have stopped. |
| `GetItem(key P) *T` | Look up the currently buffered record for `key`, or `nil`. |
| `GetItems() map[P]*T` | Snapshot of all currently buffered records. |
| `GetSize() int` | Total number of buffered records across all shards. |
| `GetShardCount() uint32` | Number of shards (diagnostics). |
| `GetTicker() *time.Ticker` | The flush ticker (diagnostics / interval tuning). |
Type parameters: P is the key (any comparable type); T is the value type,
always handled as *T.
- Sharded maps. State is split across `N` shards, each an `xsync.Map` guarded by an `RWMutex`. The mutex protects the map pointer (so a flush can swap it atomically), while `xsync.Map` provides lock-free per-key updates. `Add` takes only a read lock, so writers to different keys proceed in parallel.
- Key hashing. Strings use FNV-1a; integer kinds use fast bitwise folding; floats hash their IEEE-754 bit pattern (with `-0.0` canonicalized to `+0.0` so equal keys share a shard); other comparable types fall back to a `fmt.Sprint`-based hash. The shard index is `hash & (N-1)`, where `N` is a power of two.
- Swap-and-drain flush. `Cleanup` runs in two phases: a short sequential phase that locks each shard only long enough to swap its map for a fresh one, then a parallel phase that drains the detached maps into the channel. Empty shards are skipped entirely; no goroutine is spawned for them.
- Overlap protection. The ticker uses an atomic compare-and-swap flag; if a flush is still running when the next tick fires, that tick is skipped (and logged at debug level) instead of piling up concurrent flushes.
- Zero data loss. If the output channel is full while draining, the record is re-inserted into the live map rather than dropped, and flushed on a later tick or during `Shutdown`.
Run locally:

```sh
go test -run=^$ -bench=. -benchmem ./...
```

Indicative results (Apple M3 Max, 16 logical CPUs, Go 1.26); numbers are hardware-dependent:

| Benchmark | Workload | Time/op | Allocs/op |
|---|---|---|---|
| `BenchmarkParallelAdd` | ~10k increments to a single hot key across all cores | ~5.1 ms | ~50k |
| `BenchmarkParallelAddMultipleKeys` | 100 keys × 100 increments per core | ~8.8 ms | ~490k |
Most allocations come from the benchmark itself constructing a fresh *T per Add;
reuse a sync.Pool (as shown in the tests) to drive this toward zero in production.
The suite covers aggregation correctness, automatic and manual cleanup, full-pool
re-queuing, graceful shutdown, sync.Pool reuse, and high-concurrency zero-loss
verification (1,000 writers × 1,000 items). It runs clean under the race detector:

```sh
go test -race -count=1 ./...
```