Rill is a lightweight toolkit that brings composable concurrency to Go, making it easier to build concurrent programs from simple, reusable parts. It reduces boilerplate while preserving Go's natural channel-based model and backpressure behavior.
简体中文: Rill 是一个轻量级的 Go 并发编程工具包,使您能够从简单、可复用的组件构建并发程序。它在保留 Go 原生通道模型和背压行为的同时,减少了样板代码。
- Features
- Go Version Requirements
- Installation
- Quick Start
- Usage Examples
- Core Concepts
- API Reference
- Contributing
- License
| English | 中文 | Description |
|---|---|---|
| Make common tasks easier | 简化常见任务 | Provides a cleaner and safer way of solving common concurrency problems |
| Composable and clean | 可组合且简洁 | Functions take Go channels as inputs and return new, transformed channels as outputs |
| Centralized error handling | 集中式错误处理 | Errors are automatically propagated through a pipeline |
| Stream processing | 流处理 | Handle potentially infinite streams, processing items as they arrive |
| Advanced operations | 高级操作 | Batching, ordered fan-in, map-reduce, stream splitting, merging, and more |
| Custom extensions | 自定义扩展 | Easy to write custom functions compatible with the library |
| Lightweight | 轻量级 | Small, type-safe, channel-based API with zero external dependencies |
| Order preservation | 顺序保持 | Choose between unordered (faster) or ordered (consistent) processing |
| Requirement | Version | Reason |
|---|---|---|
| Minimum | Go 1.18 | Uses generics (any type) |
| Recommended | Go 1.21+ | Better performance and tooling |
| Declared | Go 1.20 | As specified in go.mod |
Note: This library uses Go generics extensively. Go 1.17 and earlier versions are not supported.
简体中文: 需要 Go 1.18 或更高版本(使用泛型特性),推荐使用 Go 1.21+。
go get -u github.com/bycall/rill-goHere's a practical example demonstrating a concurrent data processing pipeline:
这是一个并发数据处理管道的实际示例:
package main
import (
"fmt"
"time"
"github.com/bycall/rill-go"
)
func main() {
// Convert a slice to a stream (data source)
ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
// Process each item concurrently with 3 workers
processed := rill.Map(ids, 3, func(id int) (int, error) {
time.Sleep(10 * time.Millisecond)
return id * 2, nil
})
// Filter results - keep only numbers greater than 10
filtered := rill.Filter(processed, 2, func(n int) (bool, error) {
return n > 10, nil
})
// Consume and print results with 2 workers
err := rill.ForEach(filtered, 2, func(n int) error {
fmt.Printf("Processed: %d\n", n)
return nil
})
fmt.Println("Error:", err)
}Processing items in batches can significantly improve performance when working with external services or databases:
import "time"
ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
// Batch by size 3 with 100ms timeout
batches := rill.Batch(ids, 3, 100*time.Millisecond)
err := rill.ForEach(batches, 1, func(batch []int) error {
fmt.Printf("Processing batch: %v\n", batch)
return nil
})numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
evens := rill.Filter(numbers, 2, func(n int) (bool, error) {
return n%2 == 0, nil
})Use ordered versions when you need to maintain input order:
ids := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil)
processed := rill.OrderedMap(ids, 3, func(id int) (int, error) {
return id * 2, nil
})Errors are automatically propagated through the pipeline:
stream := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil)
processed := rill.Map(stream, 2, func(n int) (int, error) {
if n == 3 {
return 0, fmt.Errorf("error at %d", n)
}
return n * 2, nil
})
err := rill.Err(processed)In Rill, a stream is a channel of Try[A] containers. A Try[A] container holds either a value of type A or an error:
type Try[A any] struct {
Value A
Error error
}- Non-blocking functions (
Map,Filter,Batch) take a stream as input and return a new stream immediately - Blocking functions (
ForEach,Reduce,ToSlice) consume a stream and return a final result or error
Most functions accept a n parameter that controls the number of concurrent workers:
// 3 concurrent workers
result := rill.Map(input, 3, mapperFunc)FromSlice(slice []A, err error) Stream[A]- Convert a slice to a streamFromChan(values <-chan A, err error) Stream[A]- Convert a channel to a streamFromChans(values <-chan A, errs <-chan error) Stream[A]- Convert two channels to a streamGenerate(f func(send func(A), sendErr func(error))) Stream[A]- Create a stream from a generator
Map(in Stream[A], n int, f func(A) (B, error)) Stream[B]- Transform each itemOrderedMap(in Stream[A], n int, f func(A) (B, error)) Stream[B]- Transform preserving orderFilter(in Stream[A], n int, f func(A) (bool, error)) Stream[A]- Filter itemsOrderedFilter(in Stream[A], n int, f func(A) (bool, error)) Stream[A]- Filter preserving orderFilterMap(in Stream[A], n int, f func(A) (B, bool, error)) Stream[B]- Filter and transformFlatMap(in Stream[A], n int, f func(A) Stream[B]) Stream[B]- Transform and flattenOrderedFlatMap(in Stream[A], n int, f func(A) Stream[B]) Stream[B]- FlatMap preserving orderCatch(in Stream[A], n int, f func(error) error) Stream[A]- Catch and handle errorsOrderedCatch(in Stream[A], n int, f func(error) error) Stream[A]- Catch errors preserving order
ForEach(in Stream[A], n int, f func(A) error) error- Apply a function to each itemErr(in Stream[A]) error- Return the first errorFirst(in Stream[A]) (value A, found bool, err error)- Get the first itemAny(in Stream[A], n int, f func(A) (bool, error)) (bool, error)- Check if any item satisfies the predicateAll(in Stream[A], n int, f func(A) (bool, error)) (bool, error)- Check if all items satisfy the predicateCount(in Stream[A]) (int, error)- Count itemsToSlice(in Stream[A]) ([]A, error)- Collect items into a slice
Batch(in Stream[A], size int, timeout time.Duration) Stream[[]A]- Group items into batchesUnbatch(in Stream[[]A]) Stream[A]- Flatten batches into individual items
Merge(streams ...Stream[A]) Stream[A]- Merge multiple streamsConcat(streams ...Stream[A]) Stream[A]- Concatenate multiple streamsZip(in1 Stream[A], in2 Stream[B]) Stream[[2]any]- Zip two streams together
Take(in Stream[A], n int) Stream[A]- Take first n itemsSkip(in Stream[A], n int) Stream[A]- Skip first n itemsDistinct(in Stream[A]) Stream[A]- Remove duplicatesDistinctBy(in Stream[A], keyFunc func(A) K) Stream[A]- Remove duplicates by keyRepeat(value A, count int) Stream[A]- Repeat a valueRange(start, end int) Stream[int]- Create a range streamTap(in Stream[A], f func(A)) Stream[A]- Apply a function for side effects
Wrap(value A, err error) Try[A]- Wrap a value and errorToChans(in Stream[A]) (<-chan A, <-chan error)- Split a stream into channelsDiscard(in Stream[A])- Discard remaining items
Contributions are welcome! Before submitting a pull request, please consider:
- Focus on generic, widely applicable solutions
- Keep the API surface clean and focused
- Avoid adding external dependencies
- Add tests for any new features
- For major changes, please open an issue first to discuss the approach
This library is inspired by and builds upon the excellent work in destel/rill. Special thanks to the original author!
MIT License - see LICENSE file for details.