Skip to content

Commit e4e5184

Browse files
committed
chann: close unbounded channel gracefully
1 parent 809a73f commit e4e5184

File tree

3 files changed

+287
-40
lines changed

3 files changed

+287
-40
lines changed

README.md

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
a unified representation of buffered, unbuffered, and unbounded channels in Go
44

5-
```
5+
```go
66
import "golang.design/x/chann"
77
```
88

@@ -12,22 +12,28 @@ This package requires Go 1.18.
1212

1313
Different types of channels:
1414

15-
```
16-
ch := chann.New[int]() // unbounded, capacity unlimited
17-
ch := chann.New[func()](chann.Cap(0)) // unbufferd, capacity 0
18-
ch := chann.New[map[int]float64](chann.Cap(100)) // buffered, capacity 100
15+
```go
16+
ch := chann.New[int]() // unbounded, capacity unlimited
17+
ch := chann.New[func()](chann.Cap(0)) // unbufferd, capacity 0
18+
ch := chann.New[string](chann.Cap(100)) // buffered, capacity 100
1919
```
2020

2121
Send and receive operations:
2222

23-
```
23+
```go
2424
ch.In() <- 42
25-
<-ch.Out() // 42
25+
println(<-ch.Out()) // 42
2626
```
2727

28-
Channel properties:
28+
Close operation:
2929

30+
```go
31+
ch.Close()
3032
```
33+
34+
Channel properties:
35+
36+
```go
3137
ch.ApproxLen() // an (approx. of) length of the channel
3238
ch.Cap() // the capacity of the channel
3339
```

chann.go

Lines changed: 62 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@
77
// Package chann provides a unified representation of buffered,
88
// unbuffered, and unbounded channels in Go.
99
//
10-
// The package is compatible with existing buffered and unbuffered channels.
11-
// For example, in Go, to create a buffered or unbuffered channel, one
12-
// uses built-in function `make` to create a channel:
10+
// The package is compatible with existing buffered and unbuffered
11+
// channels. For example, in Go, to create a buffered or unbuffered
12+
// channel, one uses built-in function `make` to create a channel:
1313
//
14-
// ch := make(chan int) // unbuffered channel
15-
// // or
16-
// ch := make(chan int, 42) // buffered channel
14+
// ch := make(chan int) // unbuffered channel
15+
// ch := make(chan int, 42) // or buffered channel
1716
//
1817
// However, all these channels have a finite capacity for caching, and
1918
// it is impossible to create a channel with unlimited capacity, namely,
@@ -22,20 +21,21 @@
2221
// This package provides the ability to create all possible types of
2322
// channels. To create an unbuffered or a buffered channel:
2423
//
25-
// ch := chann.New[int](chann.Cap(0)) // unbuffered channel
26-
// // or
27-
// ch := chann.New[int](chann.Cap(42)) // buffered channel
24+
// ch := chann.New[int](chann.Cap(0)) // unbuffered channel
25+
// ch := chann.New[int](chann.Cap(42)) // or buffered channel
2826
//
2927
// More importantly, when the capacity of the channel is unspecified,
3028
// or provided as negative values, the created channel is an unbounded
3129
// channel:
3230
//
33-
// ch := chann.New[int]() // unbounded channel
34-
// // or
35-
// ch := chann.New[int](chann.Cap(-42)) // unbounded channel
31+
// ch := chann.New[int]() // unbounded channel
32+
// ch := chann.New[int](chann.Cap(-42)) // or unbounded channel
3633
//
3734
// Furthermore, all channels provides methods to send (In()),
3835
// receive (Out()), and close (Close()).
36+
//
37+
// Note that to close a channel, must use Close() method instead of the
38+
// language built-in method
3939
// Two additional methods: ApproxLen and Cap returns the current status
4040
// of the channel: an approximation of the current length of the channel,
4141
// as well as the current capacity of the channel.
@@ -45,7 +45,9 @@
4545
// with this package.
4646
package chann
4747

48-
import "sync/atomic"
48+
import (
49+
"sync/atomic"
50+
)
4951

5052
// Opt represents an option to configure the created channel. The current possible
5153
// option is Cap.
@@ -78,6 +80,7 @@ func Cap(n int) Opt {
7880
// one, and use Cap to configure the capacity of the channel.
7981
type Chann[T any] struct {
8082
in, out chan T
83+
close chan struct{}
8184
cfg *config
8285
}
8386

@@ -121,58 +124,85 @@ func New[T any](opts ...Opt) *Chann[T] {
121124
case unbounded:
122125
ch.in = make(chan T, 16)
123126
ch.out = make(chan T, 16)
127+
ch.close = make(chan struct{})
124128
ready := make(chan struct{})
129+
var nilT T
130+
125131
go func() {
126132
q := make([]T, 0, 1<<10)
127133
ready <- struct{}{}
128134
for {
129-
e, ok := <-ch.in
130-
if !ok {
131-
close(ch.out)
132-
return
135+
select {
136+
case e, ok := <-ch.in:
137+
if !ok {
138+
panic("chann: send-only channel ch.In() closed unexpectedly")
139+
}
140+
atomic.AddInt64(&ch.cfg.len, 1)
141+
q = append(q, e)
142+
case <-ch.close:
143+
goto closed
133144
}
134-
atomic.AddInt64(&ch.cfg.len, 1)
135-
q = append(q, e)
136145

137146
for len(q) > 0 {
138147
select {
139148
case ch.out <- q[0]:
140149
atomic.AddInt64(&ch.cfg.len, -1)
150+
q[0] = nilT
141151
q = q[1:]
142152
case e, ok := <-ch.in:
143-
if ok {
144-
atomic.AddInt64(&ch.cfg.len, 1)
145-
q = append(q, e)
146-
break
153+
if !ok {
154+
panic("chann: send-only channel ch.In() closed unexpectedly")
147155
}
148-
for _, e := range q {
149-
atomic.AddInt64(&ch.cfg.len, -1)
150-
ch.out <- e
151-
}
152-
close(ch.out)
153-
return
156+
atomic.AddInt64(&ch.cfg.len, 1)
157+
q = append(q, e)
158+
case <-ch.close:
159+
goto closed
154160
}
155161
}
156162
if cap(q) < 1<<5 {
157163
q = make([]T, 0, 1<<10)
158164
}
159165
}
166+
167+
closed:
168+
close(ch.in)
169+
for e := range ch.in {
170+
q = append(q, e)
171+
}
172+
for len(q) > 0 {
173+
select {
174+
case ch.out <- q[0]:
175+
q[0] = nilT // de-reference earlier to help GC
176+
q = q[1:]
177+
default:
178+
}
179+
}
180+
close(ch.out)
181+
close(ch.close)
160182
}()
161183
<-ready
162184
}
163185
return ch
164186
}
165187

166188
// In returns the send channel of the given Chann, which can be used to
167-
// send values to the channel.
189+
// send values to the channel. If one closes the channel using close(),
190+
// it will result in a runtime panic. Instead, use Close() method.
168191
func (ch *Chann[T]) In() chan<- T { return ch.in }
169192

170193
// Out returns the receive channel of the given Chann, which can be used
171194
// to receive values from the channel.
172195
func (ch *Chann[T]) Out() <-chan T { return ch.out }
173196

174-
// Close closesa the channel.
175-
func (ch *Chann[T]) Close() { close(ch.in) }
197+
// Close closes the channel gracefully.
198+
func (ch *Chann[T]) Close() {
199+
switch ch.cfg.typ {
200+
case buffered, unbuffered:
201+
close(ch.in)
202+
default:
203+
ch.close <- struct{}{}
204+
}
205+
}
176206

177207
// ApproxLen returns an approximation of the length of the channel.
178208
//

0 commit comments

Comments
 (0)