mysamimi/dataAggregator

dataAggregator

A generic, concurrency-safe in-memory aggregation buffer for Go. It accumulates high-throughput events keyed by an identifier, merges them with a user-supplied function, and periodically flushes the merged results to a channel for downstream processing.

It is built for the common "many writers, fewer distinct keys" workload — counters, metrics, rate windows, batch coalescing — where reducing N writes into one record per key before it leaves the process saves significant downstream cost.


How it works

                 Add(key, *T)                          ChanPool() <-chan *T
 writers  ───────────────────────►  ┌───────────────┐  ──────────────────►  consumer
 (many)                             │  sharded maps  │   periodic flush       (drains
                                    │  shard 0..N-1  │   (cleanupInterval)     merged
                                    └───────────────┘                          records)
  1. Add hashes the key to one of N shards and merges the incoming value into the existing record for that key via your addFunc (or stores it as new).
  2. A background ticker fires every cleanupInterval and runs Cleanup, which atomically detaches each non-empty shard's map and drains the merged records into the output channel.
  3. A consumer ranges over ChanPool() and handles each flushed record.

The number of shards is derived from runtime.NumCPU() (rounded up to the next power of two, minimum 4) so writes to different keys rarely contend on the same lock.


Installation

Requires Go 1.26+.

go get github.com/mysamimi/dataAggregator

Quick start

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/mysamimi/dataAggregator"
	"github.com/rs/zerolog"
)

type Metric struct {
	Name  string
	Count *uint64
}

func main() {
	logger := zerolog.New(zerolog.NewConsoleWriter()).With().Timestamp().Logger()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// New[KeyType, ValueType](ctx, flushInterval, channelBufferSize, logger, mergeFunc)
	agg := dataAggregator.New[string, Metric](
		ctx,
		5*time.Second, // flush every 5s
		1000,          // output channel buffer
		&logger,
		func(stored, incoming *Metric) {
			// Merge incoming into the already-stored record.
			atomic.AddUint64(stored.Count, *incoming.Count)
		},
	)

	// Consume flushed records; done is closed once the channel is drained.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for m := range agg.ChanPool() {
			fmt.Printf("%s = %d\n", m.Name, *m.Count)
		}
	}()

	// Produce events; identical keys are merged in place.
	c1, c2 := uint64(5), uint64(10)
	agg.Add("api.requests", &Metric{Name: "api.requests", Count: &c1})
	agg.Add("api.requests", &Metric{Name: "api.requests", Count: &c2}) // -> 15

	// Flush everything and close the channel. The consumer keeps draining
	// meanwhile; wait for it so main does not exit before the output is printed.
	agg.Shutdown()
	<-done
}

Concurrency contract

Important

addFunc must update the stored value's fields in a concurrency-safe way. It is invoked while the shard holds an internal per-key lock, so calls for the same key are serialized. A stored value's fields may also be read by your consumer after a flush, so the examples use sync/atomic (e.g. atomic.AddUint64) on shared fields. Follow that pattern whenever a field is touched from more than one place.
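Here is one way to apply this pattern when a value has more than one shared field. Stats and mergeStats are hypothetical. The merge adds counts with an atomic add and keeps a running maximum with a compare-and-swap loop. The demo calls mergeStats directly from many goroutines rather than through the aggregator, to show the fields stay consistent even without per-key serialization.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Stats is a hypothetical value type with two shared fields.
type Stats struct {
	Count atomic.Uint64 // running total
	Max   atomic.Uint64 // largest single observation
}

// mergeStats has the addFunc shape func(stored, incoming *T). Every write to
// stored goes through sync/atomic, so concurrent readers never see torn values.
func mergeStats(stored, incoming *Stats) {
	stored.Count.Add(incoming.Count.Load())
	in := incoming.Max.Load()
	for {
		cur := stored.Max.Load()
		// Stop if the stored max already dominates, or once our CAS wins.
		if in <= cur || stored.Max.CompareAndSwap(cur, in) {
			return
		}
	}
}

func main() {
	stored := &Stats{}
	var wg sync.WaitGroup
	for i := uint64(1); i <= 100; i++ {
		wg.Add(1)
		go func(v uint64) {
			defer wg.Done()
			in := &Stats{}
			in.Count.Store(1)
			in.Max.Store(v)
			mergeStats(stored, in)
		}(i)
	}
	wg.Wait()
	fmt.Println(stored.Count.Load(), stored.Max.Load()) // 100 100
}
```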

Important

A consumer must keep draining ChanPool() for the lifetime of the aggregator, including during Shutdown(). When the output channel is full, Cleanup re-inserts records instead of dropping them (the zero-loss guarantee). Shutdown flushes in a loop until the buffer is empty, so a stalled consumer makes it block forever.
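The re-insert behavior can be pictured as a non-blocking send. This is a minimal sketch built on a select with a default case. Record and drain are hypothetical, and the real library re-inserts leftovers into its shard maps rather than returning a slice. The loop at the end shows why a Shutdown-style "flush until empty" only terminates while something keeps receiving.

```go
package main

import "fmt"

// Record is a hypothetical flushed value.
type Record struct{ Key string }

// drain hands each detached record to out without blocking. Records that
// do not fit are returned so the caller can re-insert them instead of dropping them.
func drain(detached []*Record, out chan<- *Record) (requeue []*Record) {
	for _, r := range detached {
		select {
		case out <- r:
			// Delivered to the consumer.
		default:
			// Channel full: keep the record for a later flush.
			requeue = append(requeue, r)
		}
	}
	return requeue
}

func main() {
	out := make(chan *Record, 2) // small buffer, no consumer yet
	left := drain([]*Record{{"a"}, {"b"}, {"c"}}, out)
	fmt.Println(len(out), len(left)) // 2 1

	// Repeat until nothing is left; this ends only because we receive below.
	for len(left) > 0 {
		<-out // stand-in for a consumer draining the channel
		left = drain(left, out)
	}
	fmt.Println(len(out), len(left)) // 2 0
}
```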

Note

Do not call Add after Shutdown. Once Shutdown closes the output channel, any later Cleanup/flush of newly added data would attempt to send on a closed channel, which panics in Go. Stop your writers before shutting down.
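The underlying rule is plain Go: a send on a closed channel panics, even when the channel's buffer still has room. The self-contained illustration below is independent of this library. sendAfterClose is a hypothetical helper.

```go
package main

import "fmt"

// sendAfterClose reports whether sending on an already-closed channel panics.
func sendAfterClose() (panicked bool) {
	ch := make(chan int, 1) // buffered: free space does not make the send legal
	close(ch)
	defer func() {
		// recover returns the runtime error "send on closed channel".
		panicked = recover() != nil
	}()
	ch <- 1
	return false
}

func main() {
	fmt.Println(sendAfterClose()) // true
}
```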

Ownership of *T. A pointer handed to Add is owned by the aggregator until it either (a) is merged into an existing record — Add returns true, and you may reuse or pool the pointer immediately — or (b) becomes the stored record — Add returns false, and the aggregator hands it back to you later via ChanPool(). This makes the type a natural fit for sync.Pool recycling (see TestWithSyncPool_And_Requeue).
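The ownership rule can be followed with a small wrapper around Add. This is a sketch under stated assumptions. addPooled and newFakeAdd are hypothetical, and add stands in for DataAggregator.Add so the example runs without the library. The fake is single-goroutine and map-backed.

```go
package main

import (
	"fmt"
	"sync"
)

// Metric is a simplified value type (plain counter) for this demo.
type Metric struct {
	Name  string
	Count uint64
}

var metricPool = sync.Pool{New: func() any { return new(Metric) }}

// addPooled takes a Metric from the pool, fills it, and passes it to add.
// true: merged into an existing record, so the pointer is ours again and is
// recycled. false: it became the stored record and the aggregator owns it.
func addPooled(add func(key string, m *Metric) bool, name string, n uint64) bool {
	m := metricPool.Get().(*Metric)
	m.Name, m.Count = name, n
	merged := add(name, m)
	if merged {
		*m = Metric{} // reset before reuse
		metricPool.Put(m)
	}
	return merged
}

// newFakeAdd returns a single-goroutine, map-backed stand-in for Add.
func newFakeAdd() (func(string, *Metric) bool, map[string]*Metric) {
	stored := map[string]*Metric{}
	return func(key string, m *Metric) bool {
		if s, ok := stored[key]; ok {
			s.Count += m.Count // merge into the existing record
			return true
		}
		stored[key] = m // new record: keep the pointer
		return false
	}, stored
}

func main() {
	add, stored := newFakeAdd()
	fmt.Println(addPooled(add, "api.requests", 5))  // false: stored
	fmt.Println(addPooled(add, "api.requests", 10)) // true: merged, recycled
	fmt.Println(stored["api.requests"].Count)       // 15
}
```

With the real aggregator, the stored pointers come back through ChanPool(). A consumer following the same rule would return each record to the pool once it has finished processing it.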


API reference

func New[P comparable, T any](
	ctx context.Context,
	cleanupInterval time.Duration,
	maxPoolSize int,
	logger *zerolog.Logger,
	addFunc func(stored, incoming *T),
) *DataAggregator[P, T]

| Method | Description |
|---|---|
| Add(key P, data *T) bool | Merge data into the record for key, or store it if new. Returns true when merged into an existing record, false when newly stored. |
| ChanPool() chan *T | The output channel of flushed records. Range over it in a consumer. |
| Cleanup() | Manually flush all current records to the output channel (also runs automatically on the interval). |
| Shutdown() | Stop the background loop, flush everything, and close the output channel. Idempotent; safe to call once writers have stopped. |
| GetItem(key P) *T | Look up the currently buffered record for key, or nil. |
| GetItems() map[P]*T | Snapshot of all currently buffered records. |
| GetSize() int | Total number of buffered records across all shards. |
| GetShardCount() uint32 | Number of shards (diagnostics). |
| GetTicker() *time.Ticker | The flush ticker (diagnostics / interval tuning). |

Type parameters: P is the key (any comparable type); T is the value type, always handled as *T.


Implementation notes

  • Sharded maps. State is split across N shards, each an xsync.Map guarded by an RWMutex. The mutex protects the map pointer (so a flush can swap it atomically), while xsync.Map handles per-key updates without a shard-wide lock. Add takes only a read lock, so writers to different keys proceed in parallel.
  • Key hashing. Strings use FNV-1a; integer kinds use fast bitwise folding; floats hash their IEEE-754 bit pattern (with -0.0 canonicalized to +0.0 so equal keys share a shard); other comparable types fall back to a fmt.Sprint-based hash. The shard index is hash & (N-1), where N is a power of two.
  • Swap-and-drain flush. Cleanup runs in two phases: a short sequential phase that locks each shard only long enough to swap its map for a fresh one, then a parallel phase that drains the detached maps into the channel. Empty shards are skipped entirely — no goroutine is spawned for them.
  • Overlap protection. The ticker uses an atomic compare-and-swap flag; if a flush is still running when the next tick fires, that tick is skipped (and logged at debug level) instead of piling up concurrent flushes.
  • Zero data loss. If the output channel is full while draining, the record is re-inserted into the live map rather than dropped, and flushed on a later tick or during Shutdown.
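The swap-and-drain flush and the overlap guard can be sketched together. This is a simplified model, not the library's code. shard and flusher are hypothetical, and each shard holds a plain map[string]int instead of an xsync.Map. The full-channel re-insert path is omitted here, so sends simply block.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// shard is a simplified shard: a plain map behind an RWMutex.
// (A plain map would need the write lock for Add; xsync.Map avoids that.)
type shard struct {
	mu sync.RWMutex
	m  map[string]int
}

type flusher struct {
	shards  []*shard
	running atomic.Bool // set while a flush is in progress
}

// flush swaps each non-empty shard's map for a fresh one, then drains the
// detached maps in parallel. It returns false if a flush is already running.
func (f *flusher) flush(out chan<- int) bool {
	if !f.running.CompareAndSwap(false, true) {
		return false // overlapping tick: skip it
	}
	defer f.running.Store(false)

	// Phase 1: sequential swap, holding each lock only for the pointer swap.
	var detached []map[string]int
	for _, s := range f.shards {
		s.mu.Lock()
		if len(s.m) > 0 {
			detached = append(detached, s.m)
			s.m = make(map[string]int)
		}
		s.mu.Unlock()
	}

	// Phase 2: parallel drain, one goroutine per non-empty shard only.
	var wg sync.WaitGroup
	for _, m := range detached {
		wg.Add(1)
		go func(m map[string]int) {
			defer wg.Done()
			for _, v := range m {
				out <- v // blocking send, for brevity
			}
		}(m)
	}
	wg.Wait()
	return true
}

func main() {
	f := &flusher{shards: []*shard{
		{m: map[string]int{"a": 1, "b": 2}},
		{m: map[string]int{}}, // empty: skipped, no goroutine
		{m: map[string]int{"c": 3}},
	}}
	out := make(chan int, 10)
	ok := f.flush(out)
	fmt.Println(ok, len(out)) // true 3
}
```

Swapping first keeps every critical section down to a pointer assignment. Writers therefore never wait on the channel sends, which happen later on maps that no writer can reach any more.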

Benchmarks

Run locally:

go test -run=^$ -bench=. -benchmem ./...

Indicative results (Apple M3 Max, 16 logical CPUs, Go 1.26) — hardware-dependent:

| Benchmark | Workload | Time/op | Allocs/op |
|---|---|---|---|
| BenchmarkParallelAdd | ~10k increments to a single hot key across all cores | ~5.1 ms | ~50k |
| BenchmarkParallelAddMultipleKeys | 100 keys × 100 increments per core | ~8.8 ms | ~490k |

Most allocations come from the benchmark itself constructing a fresh *T per Add; reuse a sync.Pool (as shown in the tests) to drive this toward zero in production.


Testing

The suite covers aggregation correctness, automatic and manual cleanup, full-pool re-queuing, graceful shutdown, sync.Pool reuse, and high-concurrency zero-loss verification (1,000 writers × 1,000 items). It runs clean under the race detector:

go test -race -count=1 ./...

License

MIT
