更新于 

golang中的Channel

定义

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

关闭有缓冲数据的 channel, 还能读取吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {

c := make(chan int, 10)
c <- 1
c <- 2
c <- 3
close(c)

for {
data, ok := <-c
if !ok {
return
}
/*
输出:
1 true
2 true
3 true
*/
fmt.Println(data, ok)
}
}

答案:由此可见,有缓存通道在通道被关闭时,还是可以正常读取的

那如果是无缓冲通道会怎样?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 func main() {
c := make(chan int)
c <- 1
c <- 2
c <- 3
close(c)
for {
data, ok := <-c
if !ok {
return
}
/*
输出:fatal error: all goroutines are asleep - deadlock!
*/
fmt.Println(data, ok)
}
}

答案:无缓冲通道会发生死锁(deadlock),是因为无缓冲通道在进行写操作时需要有对应的读操作来处理数据。如果没有对通道的读操作,写操作在执行时会阻塞,直到有一个读操作准备好接收数据。因此,当你尝试执行 c <- 1 时,程序会一直等待,此时1并没有被写入到通道 c 中,直到有一个 goroutine 从通道中读取数据。如果没有 goroutine 进行读取,程序将会阻塞在写入操作上,导致死锁并引发 panic。

如何判断channel已经关闭了 ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
c := make(chan int, 10)
c <- 1
c <- 2
c <- 3
close(c) // 此时关闭通道是否意味着ok == false?(不是),需区分情况:有缓冲通道会在所有通道数据都读取完,ok == false,无缓冲通道会阻塞,产生panic
for {
select {
case val, ok := <-c:
if !ok {
fmt.Println("close")
return
}
fmt.Println(ok, val)
default:
}
}
/*
输出:
true 1
true 2
true 3
close
*/
}

channel 的零值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
var c chan int
fmt.Println(c)
for {
select {
case <-c: // select 永远不会走到 case
fmt.Println("fuck")
default:
time.Sleep(time.Second)
fmt.Println(time.Now().Unix())
}
}
/*
输出:
<nil>
1744253527
...
*/
}

nil 通道的特性:当 c 是 nil 时,任何尝试从 c 中接收数据的操作(如 case <-c:)都会阻塞,因为没有任何 goroutine 可以向 nil 通道发送数据。由于 case <-c 永远无法成功执行,select 语句不会进入这个分支。执行 default 分支:由于 case <-c 永远不会成功,select 语句会直接跳到 default 分支。在 default 分支中,程序会执行 time.Sleep(time.Second),然后打印当前的 Unix 时间戳。

如何优雅的关闭channel

单个生产者,多个消费者(由生产者关闭通道)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
const MaxRandomNumber = 20
const NumReceivers = 5
wg := sync.WaitGroup{}
wg.Add(NumReceivers)
dataCh := make(chan int, 10)
// 一个生产者
go func() {
for {
if value := rand.Intn(MaxRandomNumber); value == 0 {
close(dataCh) // 关闭通道
return
} else {
dataCh <- value
}
}
}()

// 多个消费者
for i := 0; i < NumReceivers; i++ {
go func() {
defer wg.Done()
for value := range dataCh {
log.Println(value)
}
}()
}
wg.Wait()
}

多个生产者,单个消费者(由消费者关闭通道)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

const MaxRandomNumber = 20
const NumSenders = 10

dataCh := make(chan int, 10)
stopCh := make(chan struct{})

// 多个生产者
for i := 0; i < NumSenders; i++ {
go func() {
for {
// 生产数据
select {
case <-stopCh:
return
case dataCh <- rand.Intn(MaxRandomNumber):
}
}
}()
}
wg := sync.WaitGroup{}
wg.Add(1)
// 一个消费者
go func() {
defer wg.Done()
defer close(dataCh)
for value := range dataCh {
if value == 0 {
close(stopCh) // 消费关闭stopCh,生产者监听stopCh通道,并跳出生产消息逻辑
return
}
log.Println(value)
}
}()
/*
输出:
6
3
...
*/
wg.Wait()
}

多个生产者,多个消费者(生产者和消费值都有多个,找一个中间人来进行通道关闭)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
const MaxRandomNumber = 100
const NumReceivers = 10
const NumSenders = 10

wg := sync.WaitGroup{}
wg.Add(NumReceivers)

dataCh := make(chan int, 10)
stopCh := make(chan struct{}) // 生产者是中间人, 消费者是 (dataCh所有生产者和消费者)
toStop := make(chan string, 1) // 中间人通道,dataCh所有生产者或者消费者满足关闭条件后,想中间人通道发送数据

var stoppedBy string

// 中间人,收到关闭信号,关闭stopCh通道
go func() {
stoppedBy = <-toStop
close(stopCh)
}()

// 多个生产者
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
// 命中关闭,通知中间人去干关闭的活
if value == 0 {
select {
case toStop <- "sender#" + id:
default:
}
return
}

//尝试尽早退出, 这里不能省略, 因为可能会导致多发送一次?
select {
case <-stopCh:
return
default:
}

// 生产数据
select {
case <-stopCh: // 监听 stopCh关闭信号,跳出循环
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}

// 多个消费者
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wg.Done()

for {
// 尝试尽早退出, 这里不能省略, 因为可能会导致多接收一次?
select {
case <-stopCh:
return
default:
}

//注意此处如果 stopCh 关闭了, 下面也有能 return 不了
//因为dataCh也有可能select 到, 所以上一个 select语句不能省略?

select {
case <-stopCh: 监听 stopCh关闭信号,跳出循环
return
case value := <-dataCh:
//命中关闭,通知中间人去干关闭的活
if value == MaxRandomNumber-1 {
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}

wg.Wait()
log.Println("stopped by", stoppedBy)
}