Transform the way you work with Go channels using modern, composable pipelines
Documentation β’ Features β’ Installation β’ Quick Start β’ Examples β’ API Reference
chankit is a powerful Go library that brings functional programming patterns to Go channels. Built with generics for complete type safety, it transforms verbose channel operations into elegant, readable pipelines.
// Traditional Go: Verbose, nested, hard to maintain ch := make(chan int) go func() { for i := 1; i <= 100; i++ { ch <- i } close(ch) }() ch2 := make(chan int) go func() { defer close(ch2) for v := range ch { ch2 <- v * v } }() result := []int{} for v := range ch2 { if v%2 == 0 { result = append(result, v) } }// chankit: Clean, expressive, maintainable result := chankit.RangePipeline(ctx, 1, 101, 1). Map(func(x int) any { return x * x }). Filter(func(x any) bool { return x.(int)%2 == 0 }). ToSlice()| Control the flow of data with powerful timing operators
Transform and process data functionally
| Create channels from various sources
Built for real-world use
|
go get github.com/utkarsh5026/chankitimport "github.com/utkarsh5026/chankit/chankit"package main import ( "context" "fmt" "github.com/utkarsh5026/chankit/chankit" ) func main() { ctx := context.Background() // Create a pipeline: numbers 1-100, square them, keep evens, take first 10 result := chankit.RangePipeline(ctx, 1, 101, 1). Map(func(x int) any { return x * x }). Filter(func(x any) bool { return x.(int)%2 == 0 }). Take(10). ToSlice() fmt.Println(result) // Output: [4 16 36 64 100 144 196 256 324 400] }Scenario: Process a stream of numbers - square them, filter for even results, skip the first 10, and take the next 20.
package main import ( "context" "fmt" "github.com/utkarsh5026/chankit/chankit" ) func main() { ctx := context.Background() // Process numbers 1-100 result := chankit.RangePipeline(ctx, 1, 101, 1). Map(func(x int) any { return x * x }). // Square each number Filter(func(x any) bool { // Keep only even squares return x.(int)%2 == 0 }). Skip(10). // Skip first 10 results Take(20). // Take next 20 ToSlice() // Collect to slice fmt.Printf("Processed %d values: %v\n", len(result), result) }Output:
Processed 20 values: [484 576 676 784 900 1024 1156 1296 1444 1600 1764 1936 2116 2304 2500 2704 2916 3136 3364 3600] Scenario: Build a real-time event processing system that debounces user input, filters events, and batches them for processing.
package main import ( "context" "fmt" "time" "github.com/utkarsh5026/chankit/chankit" ) type Event struct { Type string Timestamp time.Time Data string } func main() { ctx := context.Background() // Simulate event stream events := make(chan Event, 100) go func() { defer close(events) for i := 0; i < 50; i++ { events <- Event{ Type: "click", Timestamp: time.Now(), Data: fmt.Sprintf("event-%d", i), } time.Sleep(10 * time.Millisecond) } }() // Process events: debounce, filter, and batch batches := chankit.From(ctx, events). Debounce(50 * time.Millisecond). // Wait for 50ms silence Filter(func(e Event) bool { // Filter specific events return e.Type == "click" }). Batch(5, 200*time.Millisecond) // Batch 5 or every 200ms // Process batches for batch := range batches { fmt.Printf("Processing batch of %d events\n", len(batch)) } }Scenario: Transform user data by filtering active users, extracting emails, and processing in batches.
package main import ( "context" "fmt" "strings" "time" "github.com/utkarsh5026/chankit/chankit" ) type User struct { ID int Name string Email string Active bool } func main() { ctx := context.Background() // Sample users users := []User{ {1, "Alice", "alice@example.com", true}, {2, "Bob", "bob@example.com", false}, {3, "Charlie", "charlie@example.com", true}, {4, "Diana", "diana@example.com", true}, {5, "Eve", "eve@example.com", false}, } // Transform: active users β emails β uppercase β batch batches := chankit.FromSlice(ctx, users). Filter(func(u User) bool { return u.Active }). // Active users only Map(func(u User) any { return u.Email }). // Extract emails Map(func(e any) any { // Uppercase emails return strings.ToUpper(e.(string)) }). Batch(2, 100*time.Millisecond) // Batch for processing // Process email batches for batch := range batches { fmt.Printf("Email batch: %v\n", batch) } // Output: // Email batch: [ALICE@EXAMPLE.COM CHARLIE@EXAMPLE.COM] // Email batch: [DIANA@EXAMPLE.COM] }Scenario: Process high-frequency sensor readings, debounce them to reduce noise, transform values, and calculate running averages.
package main import ( "context" "fmt" "math/rand" "time" "github.com/utkarsh5026/chankit/chankit" ) type SensorReading struct { SensorID string Value float64 Timestamp time.Time } func main() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() // Simulate sensor data stream readings := make(chan SensorReading, 100) go func() { defer close(readings) for { select { case <-ctx.Done(): return case readings <- SensorReading{ SensorID: "temp-01", Value: 20 + rand.Float64()*10, Timestamp: time.Now(), }: time.Sleep(10 * time.Millisecond) } } }() // Process: debounce β transform to Celsius β calculate average processed := chankit.From(ctx, readings). Debounce(100 * time.Millisecond). // Reduce noise Map(func(r SensorReading) any { // Transform value return r.Value * 1.5 // Simulate transformation }). Take(10) // Take first 10 // Calculate average sum := 0.0 count := 0 processed.ForEach(func(v any) { sum += v.(float64) count++ fmt.Printf("Reading #%d: %.2f\n", count, v.(float64)) }) if count > 0 { fmt.Printf("\nAverage: %.2f\n", sum/float64(count)) } }Scenario: Implement an efficient search system that waits for the user to stop typing before executing the search.
package main import ( "context" "fmt" "strings" "time" "github.com/utkarsh5026/chankit/chankit" ) func performSearch(query string) []string { // Simulate search results database := []string{"apple", "application", "apply", "banana", "band", "bandana"} results := []string{} for _, item := range database { if strings.Contains(strings.ToLower(item), strings.ToLower(query)) { results = append(results, item) } } return results } func main() { ctx := context.Background() // Simulate user typing userInput := make(chan string, 10) go func() { defer close(userInput) queries := []string{"a", "ap", "app", "appl", "apple"} for _, q := range queries { userInput <- q time.Sleep(50 * time.Millisecond) } }() // Debounce and search results := chankit.From(ctx, userInput). Debounce(200 * time.Millisecond). // Wait for typing to stop Filter(func(q string) bool { return len(q) >= 2 }).// Min 2 chars Map(func(q string) any { // Perform search fmt.Printf("Searching for: %s\n", q) return performSearch(q) }) // Display results results.ForEach(func(r any) { fmt.Printf("Results: %v\n", r) }) }Scenario: Make API calls at a controlled rate to avoid hitting rate limits.
package main import ( "context" "fmt" "time" "github.com/utkarsh5026/chankit/chankit" ) type APIRequest struct { ID int Endpoint string } func callAPI(req APIRequest) string { // Simulate API call return fmt.Sprintf("Response for request %d", req.ID) } func main() { ctx := context.Background() // Create 20 API requests requests := make([]APIRequest, 20) for i := 0; i < 20; i++ { requests[i] = APIRequest{ ID: i + 1, Endpoint: "/api/data", } } // Process at most 10 requests per second (100ms interval) fmt.Println("Making rate-limited API calls...") start := time.Now() chankit.FromSlice(ctx, requests). FixedInterval(100 * time.Millisecond). // 10 per second max Tap(func(req APIRequest) { // Log progress fmt.Printf("[%s] Processing request %d\n", time.Since(start).Round(time.Millisecond), req.ID) }). Map(func(req APIRequest) any { // Make API call return callAPI(req) }). ForEach(func(resp any) { // Handle responses // Process response }) fmt.Printf("Completed in %s\n", time.Since(start).Round(time.Millisecond)) }| Method | Description | Example |
|---|---|---|
RangePipeline(ctx, start, end, step) | Create from numeric range | RangePipeline(ctx, 1, 100, 1) |
FromSlice(ctx, slice) | Create from slice | FromSlice(ctx, []int{1,2,3}) |
From(ctx, channel) | Create from existing channel | From(ctx, myChan) |
NewPipeline[T](ctx) | Create empty pipeline | NewPipeline[int](ctx) |
| Method | Description | Example |
|---|---|---|
Map(fn) | Transform each value | .Map(func(x int) any { return x * 2 }) |
MapTo[R](fn) | Type-safe map | MapTo(p, func(x int) string { ... }) |
Filter(fn) | Keep matching values | .Filter(func(x int) bool { return x > 10 }) |
FlatMap(fn) | Transform and flatten | .FlatMap(func(x int) <-chan int { ... }) |
| Method | Description | Example |
|---|---|---|
Take(n) | Take first N values | .Take(10) |
Skip(n) | Skip first N values | .Skip(5) |
TakeWhile(fn) | Take while predicate true | .TakeWhile(func(x int) bool { return x < 100 }) |
SkipWhile(fn) | Skip while predicate true | .SkipWhile(func(x int) bool { return x < 0 }) |
First() | Get first value | value, ok := pipeline.First() |
Last() | Get last value | value, ok := pipeline.Last() |
| Method | Description | Use Case | Example |
|---|---|---|---|
Throttle(duration) | Emit at fixed intervals, drop extras | UI updates, high-frequency events | .Throttle(100*time.Millisecond) |
Debounce(duration) | Wait for silence before emitting | Search boxes, form validation | .Debounce(300*time.Millisecond) |
FixedInterval(duration) | Pace values without dropping | Rate-limited API calls | .FixedInterval(100*time.Millisecond) |
Batch(size, timeout) | Group into batches | Bulk database inserts | .Batch(100, 5*time.Second) |
| Method | Description | Example |
|---|---|---|
Merge(channels...) | Combine multiple channels | p1.Merge(p2.Chan(), p3.Chan()) |
ZipWith(other) | Pair values from two channels | ZipWith(p1, p2.Chan()) |
| Method | Description | Blocking | Example |
|---|---|---|---|
ToSlice() | Collect all values | Yes | result := pipeline.ToSlice() |
Reduce(fn, initial) | Aggregate values | Yes | .Reduce(func(a,b int) int { return a+b }, 0) |
Count() | Count values | Yes | count := pipeline.Count() |
ForEach(fn) | Execute for each | Yes | .ForEach(func(x int) { fmt.Println(x) }) |
Any(fn) | Check if any match | Yes | .Any(func(x int) bool { return x > 10 }) |
All(fn) | Check if all match | Yes | .All(func(x int) bool { return x > 0 }) |
Chan() | Get underlying channel | No | ch := pipeline.Chan() |
| Method | Description | Example |
|---|---|---|
Tap(fn) | Observe without modifying | .Tap(func(x int) { log.Println(x) }) |
// Multi-stage data processing with logging result := chankit.RangePipeline(ctx, 1, 1000, 1). Tap(func(x int) { log.Printf("Input: %d", x) }). Filter(func(x int) bool { return x%2 == 0 }). Tap(func(x int) { log.Printf("After filter: %d", x) }). Map(func(x int) any { return x * x }). Skip(10). Take(20). Tap(func(x any) { log.Printf("Final: %v", x) }). ToSlice()// Process data through multiple parallel pipelines ctx := context.Background() source := chankit.RangePipeline(ctx, 1, 100, 1) // Fan-out: multiple processing pipelines evens := source.Filter(func(x int) bool { return x%2 == 0 }) odds := source.Filter(func(x int) bool { return x%2 != 0 }) // Fan-in: merge results merged := evens.Merge(odds.Chan()) result := merged.ToSlice()// Different processing based on value result := chankit.FromSlice(ctx, data). Map(func(x int) any { if x > 100 { return x * 2 } else if x > 50 { return x * 1.5 } return x }). ToSlice()All operations respect context cancellation for graceful shutdowns:
// Timeout after 5 seconds ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() result := chankit.RangePipeline(ctx, 1, 1000000, 1). Map(func(x int) any { return expensiveOperation(x) }). ToSlice() // Stops after 5 seconds // Manual cancellation ctx, cancel := context.WithCancel(context.Background()) go func() { time.Sleep(1 * time.Second) cancel() // Stop all operations }() pipeline := chankit.From(ctx, channel). Map(func(x int) any { return process(x) })Control channel buffer sizes for better performance:
// Custom buffer size for high-throughput ch := chankit.Map(ctx, input, fn, chankit.WithBuffer[int](100)) // Auto-sized buffer for SliceToChan ch := chankit.SliceToChan(ctx, slice, chankit.WithBufferAuto[int]())Optimize memory when collecting:
// Pre-allocate if you know the size slice := chankit.ChanToSlice(ctx, ch, chankit.WithCapacity[int](1000))- Use buffered channels for high-throughput scenarios
- Pre-allocate slices when you know the expected size
- Choose the right flow control:
Throttle- Drop values for rate limitingDebounce- Wait for activity to stopFixedInterval- Preserve all valuesBatch- Group for bulk operations
# Run all tests go test ./... # Run with coverage go test -cover ./... # Run specific test go test -v -run TestThrottle ./chankit # Benchmark go test -bench=. ./chankit // Verbose, error-prone ch := make(chan int) go func() { defer close(ch) for i := 0; i < 10; i++ { ch <- i * 2 } }() result := []int{} for v := range ch { if v%4 == 0 { result = append(result, v) } }Issues:
| // Clean, declarative result := chankit.RangePipeline(ctx, 0, 10, 1). Map(func(x int) any { return x * 2 }). Filter(func(x any) bool { return x.(int)%4 == 0 }). ToSlice()Advantages:
|
- Higher-level abstractions - Map, Filter, Reduce for functional programming
- Flow control utilities - Throttle, Debounce, Batch for common patterns
- Type-safe generics - Compile-time type checking
- Composability - Easy chaining for complex pipelines
- Production-tested - Comprehensive test coverage
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
MIT License - see LICENSE file for details.