2022-07-05 22:50:00

go的channel咋用

好东西,有空翻译 1. 讲channel概念的go101-channel 2. 讲channel关闭的go101-channel-closing

One suggestion (made by Rob Pike) for concurrent programming is don’t (let computations) communicate by sharing memory, (let them) share memory by communicating (through channels). (We can view each computation as a goroutine in Go programming.)

channel closing principle

  • don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders
  • don’t close (or send values to) closed channels

优雅关闭例子

例子一

M receivers, one sender, the sender says “no more sends” by closing the data channel

This is the simplest situation, just let the sender close the data channel when it doesn’t want to send more.
一个发送者,多个接收者,由发送者关闭channel,接收者中for range ch能感知channel关闭而退出。

package main import ( "time" "math/rand" "sync" "log" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 100000 const NumReceivers = 100 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int) // the sender go func() { for { if value := rand.Intn(Max); value == 0 { // The only sender can close the // channel at any time safely. close(dataCh) return } else { dataCh <- value } } }() // receivers for i := 0; i < NumReceivers; i++ { go func() { defer wgReceivers.Done() // Receive values until dataCh is // closed and the value buffer queue // of dataCh becomes empty. for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() }

例子二

One receiver, N senders, the only receiver says “please stop sending more” by closing an additional signal channel

This is a situation a little more complicated than the above one. We can’t let the receiver close the data channel to stop data transferring, for doing this will break the channel closing principle. But we can let the receiver close an additional signal channel to notify senders to stop sending values.

一个接收者,多个发送者,可以另外启动一个stop channel,来让接收者控制,如果接收者决定停止,那就close stop channel,在发送者那通过select监听stop channel的关闭

package main import ( "time" "math/rand" "sync" "log" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 100000 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(1) // ... dataCh := make(chan int) stopCh := make(chan struct{}) // stopCh is an additional signal channel. // Its sender is the receiver of channel // dataCh, and its receivers are the // senders of channel dataCh. // senders for i := 0; i < NumSenders; i++ { go func() { for { // The try-receive operation is to try // to exit the goroutine as early as // possible. For this specified example, // it is not essential. select { case <- stopCh: return default: } // Even if stopCh is closed, the first // branch in the second select may be // still not selected for some loops if // the send to dataCh is also unblocked. // But this is acceptable for this // example, so the first select block // above can be omitted. select { case <- stopCh: return case dataCh <- rand.Intn(Max): } } }() } // the receiver go func() { defer wgReceivers.Done() for value := range dataCh { if value == Max-1 { // The receiver of channel dataCh is // also the sender of stopCh. It is // safe to close the stop channel here. close(stopCh) return } log.Println(value) } }() // ... wgReceivers.Wait() }

In this example, the channel dataCh is never closed. Yes, channels don’t have to be closed. A channel will be eventually garbage collected if no goroutines reference it any more, whether it is closed or not. So the gracefulness of closing a channel here is not to close the channel.

例子三

M receivers, N senders, any one of them says “let’s end the game” by notifying a moderator to close an additional signal channel

This is a the most complicated situation. We can’t let any of the receivers and the senders close the data channel. And we can’t let any of the receivers close an additional signal channel to notify all senders and receivers to exit the game. Doing either will break the channel closing principle. However, we can introduce a moderator role to close the additional signal channel. One trick in the following example is how to use a try-send operation to notify the moderator to close the additional signal channel.

package main import ( "time" "math/rand" "sync" "log" "strconv" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 100000 const NumReceivers = 10 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int) stopCh := make(chan struct{}) // stopCh is an additional signal channel. // Its sender is the moderator goroutine shown // below, and its receivers are all senders // and receivers of dataCh. toStop := make(chan string, 1) // The channel toStop is used to notify the // moderator to close the additional signal // channel (stopCh). Its senders are any senders // and receivers of dataCh, and its receiver is // the moderator goroutine shown below. // It must be a buffered channel. var stoppedBy string // moderator go func() { stoppedBy = <-toStop close(stopCh) }() // senders for i := 0; i < NumSenders; i++ { go func(id string) { for { value := rand.Intn(Max) if value == 0 { // Here, the try-send operation is // to notify the moderator to close // the additional signal channel. select { case toStop <- "sender#" + id: default: } return } // The try-receive operation here is to // try to exit the sender goroutine as // early as possible. Try-receive and // try-send select blocks are specially // optimized by the standard Go // compiler, so they are very efficient. select { case <- stopCh: return default: } // Even if stopCh is closed, the first // branch in this select block might be // still not selected for some loops // (and for ever in theory) if the send // to dataCh is also non-blocking. If // this is unacceptable, then the above // try-receive operation is essential. select { case <- stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) } // receivers for i := 0; i < NumReceivers; i++ { go func(id string) { defer wgReceivers.Done() for { // Same as the sender goroutine, the // try-receive operation here is to // try to exit the receiver goroutine // as early as possible. select { case <- stopCh: return default: } // Even if stopCh is closed, the first // branch in this select block might be // still not selected for some loops // (and forever in theory) if the receive // from dataCh is also non-blocking. If // this is not acceptable, then the above // try-receive operation is essential. select { case <- stopCh: return case value := <-dataCh: if value == Max-1 { // Here, the same trick is // used to notify the moderator // to close the additional // signal channel. select { case toStop <- "receiver#" + id: default: } return } log.Println(value) } } }(strconv.Itoa(i)) } // ... wgReceivers.Wait() log.Println("stopped by", stoppedBy) }

例子四

A variant of the “M receivers, one sender” situation: the close request is made by a third-party goroutine

Sometimes, it is needed that the close signal must be made by a third-party goroutine. For such cases, we can use an extra signal channel to notify the sender to close the data channel. For example,

一个发送者,多接收者,由第三方G关闭

package main import ( "time" "math/rand" "sync" "log" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 100000 const NumReceivers = 100 const NumThirdParties = 15 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int) closing := make(chan struct{}) // signal channel closed := make(chan struct{}) // The stop function can be called // multiple times safely. stop := func() { select { case closing<-struct{}{}: <-closed case <-closed: } } // some third-party goroutines for i := 0; i < NumThirdParties; i++ { go func() { r := 1 + rand.Intn(3) time.Sleep(time.Duration(r) * time.Second) stop() }() } // the sender go func() { defer func() { close(closed) close(dataCh) }() for { select{ case <-closing: return default: } select{ case <-closing: return case dataCh <- rand.Intn(Max): } } }() // receivers for i := 0; i < NumReceivers; i++ { go func() { defer wgReceivers.Done() for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() }

例子六

A variant of the “N sender” situation: the data channel must be closed to tell receivers that data sending is over

In the solutions for the above N-sender situations, to hold the channel closing principle, we avoid closing the data channels. However, sometimes, it is required that the data channels must be closed in the end to let receivers know data sending is over. For such cases, we can translate a N-sender situation to a one-sender situation by using a middle channel. The middle channel has only one sender, so that we can close it instead of closing the original data channel.

package main import ( "time" "math/rand" "sync" "log" "strconv" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 1000000 const NumReceivers = 10 const NumSenders = 1000 const NumThirdParties = 15 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int) // will be closed middleCh := make(chan int) // will never be closed closing := make(chan string) // signal channel closed := make(chan struct{}) var stoppedBy string // The stop function can be called // multiple times safely. stop := func(by string) { select { case closing <- by: <-closed case <-closed: } } // the middle layer go func() { exit := func(v int, needSend bool) { close(closed) if needSend { dataCh <- v } close(dataCh) } for { select { case stoppedBy = <-closing: exit(0, false) return case v := <- middleCh: select { case stoppedBy = <-closing: exit(v, true) return case dataCh <- v: } } } }() // some third-party goroutines for i := 0; i < NumThirdParties; i++ { go func(id string) { r := 1 + rand.Intn(3) time.Sleep(time.Duration(r) * time.Second) stop("3rd-party#" + id) }(strconv.Itoa(i)) } // senders for i := 0; i < NumSenders; i++ { go func(id string) { for { value := rand.Intn(Max) if value == 0 { stop("sender#" + id) return } select { case <- closed: return default: } select { case <- closed: return case middleCh <- value: } } }(strconv.Itoa(i)) } // receivers for range [NumReceivers]struct{}{} { go func() { defer wgReceivers.Done() for value := range dataCh { log.Println(value) } }() } // ... wgReceivers.Wait() log.Println("stopped by", stoppedBy) }

结语

There are no situations which will force you to break the channel closing principle. If you encounter such a situation, please rethink your design and rewrite you code.

Programming with Go channels is like making art.

逃不出以上几条,如果有,那就是你菜!

本文链接:https://troy.wang/post/go-channel.html

-- EOF --