通道Channel

通道Channel简介和使用

Posted by Ted on February 11, 2023

1、Channel介绍

通过共享内存来通讯和通过通讯来共享内存是并发编程中的两种编程风格。

当通过共享内存来通讯的时候,我们需要一些传统的并发同步技术(比如互斥锁)来避免数据竞争。

Go提供了一种独特的并发同步技术来实现通过通讯来共享内存。此技术即为Channel。 我们可以把一个Channel看作是在一个程序内部的一个先进先出(FIFO:first in first out)数据队列。 一些协程可以向此Channel发送数据,另外一些协程可以从此Channel接收数据。Go Channel可以帮助程序员轻松地避免数据竞争。

尽管Go也支持几种传统的数据同步技术,但是只有Channel为一等公民。 Channel是Go中的一种类型,所以我们可以无需引进任何代码包就可以使用Channel。 几种传统的数据同步技术提供在sync和sync/atomic标准库包中。

实事求是地说,每种并发同步技术都有它们各自的最佳应用场景,但是Channel的应用范围更广。 使用Channel来做同步常常可以使得代码看上去更整洁和易于理解。

2、通道类型和值

通道类型

和数组、切片以及映射类型一样,每个通道类型也有一个元素类型。 一个通道只能传送它的(通道类型的)元素类型的值。

通道可以是双向的,也可以是单向的。假设T是一个元素类型

  • chan T :双向通道类型。
  • chan<- T :单向可发送通道类型,编译器不允许从此类型的值中接收数据。
  • <-chan T :单向可接收通道类型。 编译器不允许向此类型的值中发送数据。

通道箭头<-总是指向左边。类型T总是在最右。

双向通道chan T可以被隐式转换为单向通道类型chan<- T和<-chan T,但反之不行(即使显式也不行)。 类型chan<- T和<-chan T的值也不能相互转换。

通道零值

通道类型的零值也使用nil来表示。

一个非零通道值必须通过内置的make函数来创建。 比如make(chan int, 10)将创建一个元素类型为int,容量为10的通道值。 第二个参数指定了欲创建的通道的容量。此第二个实参是可选的,它的默认值为0。

缓存、非缓存

  • 一个容量为0的通道值称为一个非缓冲通道(unbuffered channel);
  • 一个容量不为0的通道值称为一个缓冲通道(buffered channel)。

通道的堵塞:

  • 如果 buffered channel 中还有数据,那么,从这个 channel 接收数据的时候就不会阻塞,
  • 如果 buffered channel 还未满(“满”指达到其容量),给它发送数据也不会阻塞,否则就会阻塞。
  • unbuffered chan 只有读写都准备好之后才不会阻塞,这也是很多使用 unbuffered chan 时的常见 Bug。
  • nil 是 chan 的零值,是一种特殊的 chan,对值是 nil 的 chan 的发送接收调用者总是会阻塞。

3、通道操作

Go中有五种通道相关的操作:

  1. 关闭通道
  2. 向通道内发送值
  3. 从通道内接收值
  4. 查询通道容量
  5. 查询通道内队列长度

假设一个通道(值)为ch,下面列出了这五种操作的语法或者函数调用。

3.1 关闭通道:

 close(ch) 

传给close函数调用的实参必须为一个通道,并且此通道值不能为单向接收的。

3.2 向通道ch发送一个值v:

 ch <- v  

v必须能够赋值给通道ch的元素类型。 ch不能为单向接收通道。 <-称为数据发送操作符。

3.3 从通道ch接收一个值:

v = <-ch
v, sentBeforeClosed = <-ch

接收数据时,还可以返回两个值。第一个值是返回的 chan 中的元素,很多人不太熟悉的是第二个值。第二个值是 bool 类型,代表是否成功地从 chan 中读取到一个值,如果第二个参数是 false,chan 已经被 close 而且 chan 中没有缓存的数据,这个时候,第一个值是零值。所以,如果从 chan 读取到一个零值,可能是 sender 真正发送的零值,也可能是 closed 的并且没有缓存元素产生的零值。

3.4 查询一个通道的容量:

cap(ch) 

其中cap是一个已经在容器类型一文中介绍过的内置函数。 cap的返回值的类型为内置类型int。

3.5 查询一个通道的长度:

           ```        
            len(ch)      
           ```

其中len是一个已经在容器类型一文中介绍过的内置函数。 len的返回值的类型也为内置类型int。 一个通道的长度是指当前有多少个已被发送到此通道但还未被接收出去的元素值。

4、channel 的五种应用场景

img

1、消息交流

channel 的底层是一个循环队列,当队列的长度大于 0 的 时候,它会被当做线程安全队列和 buffer。利用这个特性,一个 goroutine 可以安全的往 channel 中存放数据,另一个 goroutine 可以安全的从 channel 中读取数据,这样就实现了 goroutine 之间的消息交流。

2、数据传递

数据传递类似游戏“击鼓传花”。鼓响时,花(或者其它物件)从一个人手里传到下一个人,数据就类似这里的花。

3、信号通知

channel 类型有这样一个特性:如果 channel 为空,那么 recevier 接收数据的时候就会阻塞,直到有新的数据进来或者 channel 被关闭。

利用这个特性,就可以实现 wait/notify 设计模式。另外还有一个经常碰到的场景,实现程序的 graceful shutdown。

func main() {
  go func() {
      ...... // 执行业务处理
    }()

  // 处理CTRL+C等中断信号
  termChan := make(chan os.Signal)
  signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
  <-termChan 

  // 执行退出之前的清理动作
  doCleanup()
  
  fmt.Println("优雅退出")
}

4、锁

利用 channel 我们也能实现锁的功能。

sync.Mutex 通过修改持有锁标记位的状态达到占有锁的目的,因此 channel 可以通过转移这个标记位的所有权实现占有锁。

具体代码如下:

// 使用chan实现互斥锁
type Mutex struct {
    ch chan struct{}
}

// 使用锁需要初始化
func NewMutex() *Mutex {
    mu := &Mutex{make(chan struct{}, 1)}
    mu.ch <- struct{}{}
    return mu
}

// 请求锁,直到获取到
func (m *Mutex) Lock() {
    <-m.ch
}

// 解锁
func (m *Mutex) Unlock() {
    select {
    case m.ch <- struct{}{}:
    default:
        panic("unlock of unlocked mutex")
    }
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
    select {
    case <-m.ch:
        return true
    default:
    }
    return false
}

// 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
    timer := time.NewTimer(timeout)
    select {
    case <-m.ch:
        timer.Stop()
        return true
    case <-timer.C:
    }
    return false
}

// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
    return len(m.ch) == 0
}

func main() {
    m := NewMutex()
    ok := m.TryLock()
    fmt.Printf("locked v %v\n", ok)
    ok = m.TryLock()
    fmt.Printf("locked %v\n", ok)
}

这里实现锁主要利用了向满 channel 发送数组或从空 channel 接收数据会阻塞的特性。另外,利用 select 很容易实现 TryLock 和 Timeout 的功能。

5、任务编排

通过 WaitGroup,我们能很容易的实现 等待一组 goroutine 完成任务这种任务编排需求。同样,我们也可以用 channel 实现。

但是如果任务编排再复杂一些呢?如果面试官出了下面这个题目:

有一批任务需要处理,但是机器资源有限,只能承受100的并发度,该如何实现?

一种解决方案就是使用 channel,代码如下:

func task(ch chan struct{}) {
    <-ch
    //执行任务
    time.Sleep(time.Second)
    ch <- struct{}{}
    return
}

func concurrency100() {
    ch := make(chan struct{}, 100)

    for i := 0; i < 100; i++ {
        ch <- struct{}{}
    }

    for {
        go task(ch)
    }
}

利用 sender 给满员的 channel 发送数据会阻塞的特性,就实现了并发度始终维持在 100 的需求。