前言 这是一篇译文,网上有很多关于这篇文章的翻译,但这并不影响自己想自己翻译这篇文章的的愿望,毕竟自己看重的是自己实践翻译这样一件事情,并且从中得到收获。
原文介绍 原文来自Go101 ,该项目被托管在Github上。
原文链接为How to Gracefully Close Channels ,如果可以,阅读原文是最好的选择。
译文内容 几天前我写了一篇解释Go channel规则 的文章,在reddit 和HN 上这篇文章获得了许多赞同,但是仍然存在很多对Go channel设计细节的批判声。
我收集了一些关于Go channel的以下几点设计以及规则的批判:
在没有设定状态标志的情况下,没有简便的方法去判断一个channel是否已经关闭
调用者不知道channel是否已经关闭的情况下去执行关闭操作是很危险的,因为关闭已关闭的channel会造成panic
调用者不知道channel是否已经关闭的情况下向channel发送数据是很危险的,因为向已关闭的channel发送数据会造成panic
这些批判听起来很合理(事实上并不)。是的,确实没有判断一个channel是否已经被关闭了的内置函数。
如果你可以确定没有数据以后也不会有数据发送至一个channel的话,确实有一个简单的方法去检查channel是否是关闭的。这个方法在上一篇文章 。这里,为了更好的统一,该方法再次被列在了下面的例子中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package mainimport "fmt" type T int func IsClosed (ch <-chan T) bool { select { case <-ch: return true default : } return false } func main () { c := make (chan T) fmt.Println(IsClosed(c)) close (c) fmt.Println(IsClosed(c)) }
正如上面提到的,这并不是判断一个channel是否关闭的通用方法。
实际上,即使有一个简单的内置函数closed
检查channel是否已经被关闭,但是它的作用是非常局限的,就像用于获取当前channel缓冲中的数据个数的内置函数len
一样。原因是在一次内置函数调用刚返回后被校验的channel的状态有可能改变,所以返回值已经不能反应被校验channel的最新状态。虽然当closed(ch)
返回true
时停止向ch
发送数据没问题,但是如果closed(ch)
返回false
,则关闭channel或者继续发送数据到channel是不安全的。
channel关闭准则 使用Go channel的一个基本准则是不要从receiver侧关闭channel,如果channel有多个并发的senders时也不要关闭 。换句话说,我们应该只在只有一个sender时,在sender侧关闭channel。
(下面,我们将上面的准则称之为channel关闭准则 )
当然,这不是一个关闭channel的通用准则。通用准则是不要关闭(或者发送数据到)已经关闭的channel 。如果我们能保证不再有协程关闭或者发送数据到一个没关闭和非空的channel,此时协程便可安全的关闭channel。然而,靠channel的一个receiver或者某一个sender达到这样的保障是需要很大的努力的,并且通常会使代码变得复杂。正相反,坚持上面提到的channel关闭准则 更容易。
粗暴地关闭channel 如果你无论如何要从receiver侧或者多个sender中的一个关闭channel,你可以使用恢复机制 阻止可能的panic让你的程序崩溃。这里有一个例子(假设channel元素类型是T)。
1 2 3 4 5 6 7 8 9 10 11 12 func SafeClose (ch chan T) (justClosed bool ) { defer func () { if recover () != nil { justClosed = false } }() close (ch) return true }
这个方法明显违背了channel关闭准则
同样的想法可以用在向一个有可能已关闭的channel发送数据的情形。
1 2 3 4 5 6 7 8 9 10 func SafeSend (ch chan T, value T) (closed bool ) { defer func () { if recover () != nil { closed = true } }() ch <- value return false }
粗暴的解决方法不仅仅违背了channel关闭准则 ,并且程序有可能出现数据竞争。
礼貌地关闭channel 许多人更喜欢用sync.Once
关闭channel:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type MyChannel struct { C chan T once sync.Once } func NewMyChannel () *MyChannel { return &MyChannel{C: make (chan T)} } func (mc *MyChannel) SafeClose() { mc.once.Do(func () { close (mc.C) }) }
当然,我们也可以用sync.Mutex
来避免多次关闭channel:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 type MyChannel struct { C chan T closed bool mutex sync.Mutex } func NewMyChannel () *MyChannel { return &MyChannel{C: make (chan T)} } func (mc *MyChannel) SafeClose() { mc.mutex.Lock() defer mc.mutex.Unlock() if !mc.closed { close (mc.C) mc.closed = true } } func (mc *MyChannel) IsClosed() bool { mc.mutex.Lock() defer mc.mutex.Unlock() return mc.closed }
这些方法可能很礼貌,但是它们可能无法避免数据竞争。目前,当channel关闭操作和发送操作同时发生时Go不保证没有数据竞争发生。如果同一个channel的SafeClose
函数和发送操作同时发生,有可能发生数据竞争(虽然这种数据竞争通常不会带来多大伤害)。
优雅地关闭channel 上面的SafeSend
函数的一个缺点是,它的调用不能像在select
语句块case
关键字中的发送操作那样使用。SafeSend
和SafeClose
函数的另一个缺点就是很多人,包括我,可能会觉得上面使用panic
/recover
和sync
包的方法并不优雅。接下来,将会介绍一些针对所有场景,纯净的,不使用panic
/recover
和sync
包的channel使用方法。
(在接下来的例子中,通过使用sync.WaitGroup
让例子变得完整,在实际练习中使用它并不总是必要的。)
1. M个receiver,一个sender,sender通过关闭数据channel来终止数据传输
这是最简单的情形,当sender不再想发送数据时让它关闭channel即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package mainimport ( "time" "math/rand" "sync" "log" ) func main () { rand.Seed(time.Now().UnixNano()) log.SetFlags(0 ) const Max = 100000 const NumReceivers = 100 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) dataCh := make (chan int ) go func () { for { if value := rand.Intn(Max); value == 0 { close (dataCh) return } else { dataCh <- value } } }() for i := 0 ; i < NumReceivers; i++ { go func () { defer wgReceivers.Done() for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() }
2. 一个receiver,N个sender,receiver通过关闭额外的信号channel来告诉sender停止发送数据
这是一个比之前一个稍微复杂一点的情形,我们不能让receiver关闭数据channel来停止数据传输,因为这样做会违背channel关闭准则 。但是我们可以让receiver关闭一个额外的信号channel来通知所有的sender停止发送数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 package mainimport ( "time" "math/rand" "sync" "log" ) func main () { rand.Seed(time.Now().UnixNano()) log.SetFlags(0 ) const Max = 100000 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(1 ) dataCh := make (chan int ) stopCh := make (chan struct {}) for i := 0 ; i < NumSenders; i++ { go func () { for { select { case <- stopCh: return default : } select { case <- stopCh: return case dataCh <- rand.Intn(Max): } } }() } go func () { defer wgReceivers.Done() for value := range dataCh { if value == Max-1 { close (stopCh) return } log.Println(value) } }() wgReceivers.Wait() }
正如注释中提到的,信号channel的发送者是数据channel的receiver。信号channel是靠它唯一的sender来关闭的,这坚持了channel关闭准则 。
在这个例子中,dataChan
一直没有被关闭。没错,channel不是必须被关闭,如果一个channel不再有协程引用了,不管是否被关闭,最终都会被垃圾回收器回收。所以这里关闭channel的优雅之处就是不关闭channel。
3. M个receiver,N个sender,它们中的任何一个通过通知中间人来关闭额外的信号channel以停止数据传输
这是最复杂的情形,我们不能让任何一个receiver或者sender关闭数据channel,也不能让任何一个receiver通过关闭信号channel来通知所有sender和所有receiver退出数据传输,任何一种做法都违背了channel关闭准则 。然而,我们可以引进一个可以关闭信号channel的中间者角色,接下来的例子里的技巧是如何利用尝试发送操作的机制来通知中间者关闭信号channel。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 package mainimport ( "time" "math/rand" "sync" "log" "strconv" ) func main () { rand.Seed(time.Now().UnixNano()) log.SetFlags(0 ) const Max = 100000 const NumReceivers = 10 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) dataCh := make (chan int ) stopCh := make (chan struct {}) toStop := make (chan string , 1 ) var stoppedBy string go func () { stoppedBy = <-toStop close (stopCh) }() for i := 0 ; i < NumSenders; i++ { go func (id string ) { for { value := rand.Intn(Max) if value == 0 { select { case toStop <- "sender#" + id: default : } return } select { case <- stopCh: return default : } select { case <- stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) } for i := 0 ; i < NumReceivers; i++ { go func (id string ) { defer wgReceivers.Done() for { select { case <- stopCh: return default : } select { case <- stopCh: return case value := <-dataCh: if value == Max-1 { select { case toStop <- "receiver#" + id: default : } return } log.Println(value) } } }(strconv.Itoa(i)) } wgReceivers.Wait() log.Println("stopped by" , stoppedBy) }
在这个例子中,仍然坚持了channel关闭准则 。
请注意,toStop
channel的缓冲大小是1,这是为了如果在中间者协程准备好从toStop
接收通知之前第一个通知就已经发送过来时避免错过通知。
我们也可以设置toStop
的容量大小为sender和receiver的数量之和,这样我们就不需要带有尝试发送机制的select
语句块的来通知中间人了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ... toStop := make (chan string , NumReceivers + NumSenders) ... value := rand.Intn(Max) if value == 0 { toStop <- "sender#" + id return } ... if value == Max-1 { toStop <- "receiver#" + id return } ...
4. 不一样的”M个receiver,一个sender”情形:由第三方协程发起关闭channel的请求
有时候需要第三方发出关闭信息,对于这样的需求,我们可以利用额外的信号channel去通知sender关闭数据channel。例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 package mainimport ( "time" "math/rand" "sync" "log" ) func main () { rand.Seed(time.Now().UnixNano()) log.SetFlags(0 ) const Max = 100000 const NumReceivers = 100 const NumThirdParties = 15 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) dataCh := make (chan int ) closing := make (chan struct {}) closed := make (chan struct {}) stop := func () { select { case closing<-struct {}{}: <-closed case <-closed: } } for i := 0 ; i < NumThirdParties; i++ { go func () { r := 1 + rand.Intn(3 ) time.Sleep(time.Duration(r) * time.Second) stop() }() } go func () { defer func () { close (closed) close (dataCh) }() for { select { case <-closing: return default : } select { case <-closing: return case dataCh <- rand.Intn(Max): } } }() for i := 0 ; i < NumReceivers; i++ { go func () { defer wgReceivers.Done() for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() }
stop
函数中应用的想法是学习自Roger Peppe的一条评论评论
5. 不一样的”N个sender”情形:为了告诉receiver数据传输已经结束,必须关闭数据channel
在之前提到的N个sender的情形中,为了坚持channel关闭准则 ,我们避免了关闭数据channel。然而,有时候为了让receiver知道数据传输已经完成,最终需要关闭数据channel。对于这种情形,我们可以通过使用一个中间channel将N个sender的情形转换成一个sender的情形,这个中间channel只有一个sender,所以我们可以通过关闭它来替代关闭原始的数据channel。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 package mainimport ( "time" "math/rand" "sync" "log" "strconv" ) func main () { rand.Seed(time.Now().UnixNano()) log.SetFlags(0 ) const Max = 1000000 const NumReceivers = 10 const NumSenders = 1000 const NumThirdParties = 15 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) dataCh := make (chan int ) middleCh := make (chan int ) closing := make (chan string ) closed := make (chan struct {}) var stoppedBy string stop := func (by string ) { select { case closing <- by: <-closed case <-closed: } } go func () { exit := func (v int , needSend bool ) { close (closed) if needSend { dataCh <- v } close (dataCh) } for { select { case stoppedBy = <-closing: exit(0 , false ) return case v := <- middleCh: select { case stoppedBy = <-closing: exit(v, true ) return case dataCh <- v: } } } }() for i := 0 ; i < NumThirdParties; i++ { go func (id string ) { r := 1 + rand.Intn(3 ) time.Sleep(time.Duration(r) * time.Second) stop("3rd-party#" + id) }(strconv.Itoa(i)) } for i := 0 ; i < NumSenders; i++ { go func (id string ) { for { value := rand.Intn(Max) if value == 0 { stop("sender#" + id) return } select { case <- closed: return default : } select { case <- closed: return case middleCh <- value: } } }(strconv.Itoa(i)) } for range [NumReceivers]struct {}{} { go func () { defer wgReceivers.Done() for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() log.Println("stopped by" , stoppedBy) }
其他情形? 应该还有更多的变化情形,但在上面出现的情形是最平常、最基础的情形。通过聪明地使用channel(和其他并发编程技术),我相信每个变化的情形都能找到一个坚持channel关闭准则 的解决方案。
疑问 没有情形会强迫你违背channel关闭准则 ,如果你遇到了这样的情形,请重新思考你的设计并重构你的代码。
用Go语言编程就像是在进行艺术创作。