~

并发编程范例

select 多路复用

先了解下select多路复用,下面多处会用到。
有时候需要在多个通道上接收,不能只从一个通道上接收,因为任何一个操作都会在完成前阻塞:

select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}

非阻塞模式

有时候我们试图在一个通道上发送或接收,但是不想在通道没有准备好的情况下被阻塞,非阻塞通信。这使用 select 语句也可以做到。select 可以有一个默认情况,它用来指定在没有其他的通信发生时可以立即执行的动作:

select {
case <-abort:
    fmt.Println("Launch aborted!")
    return
default:
    // 不执行任何操作
}

这里的处理逻辑是,通道有值就接收处理,没有则不执行任何操作,继续往下执行。这段代码外是没有for循环的,否则会没有任何延迟的重复执行default空操作,会一直占着CPU。

for select 循环模式

使用select多路复用时,一般会在外层套一个for循环。内存的select是具体要执行的任务,执行一次。外层的for则是重复循环的多次执行任务:

for { //for无限循环,或者for range循环
    select {
        //通过一个channel控制
    }
}

循环方式有两种:

  1. for无限循环,直到收到停止指令
  2. for range 循环,重复执行N次

无限循环:

for {
    select {
    case <-done:
        return
    default:
        // 具体要执行的任务
    }
}

在default分支中是要重复执行的任务。一旦收到done信号,就会退出停止。

for range 循环:

for _, v := range []int{1, 2, 3, 4, 5} {
    select {
    case <-done:
        return
    case resultCh <-s:
    }
}

这种情况,也会有一个done通道,这样可以中途停止。另外还有个resultCh通道,用于将for range获取的值传递给其他goroutine。
这里不使用default分支,应该是不能用,《Go语言圣经》中的示例也是这样的。
default分支用于非阻塞模式,这种通道的操作不能放在default中,default中只能放非阻塞的操作。除非default下还有default,但是最底层的default也不能阻塞。

select timeout 模式

在select多路复用的基础上,如果设置了default分支,那么所有通道都没有准备好的情况,不会等待,而是执行default。
如果就是需要等待某个通道准备好,这时就应该不设置default分支。这样就会阻塞着,直到某个通道准备好了,就能继续。但这样又会有个新问题,就是可能会一直阻塞下去。
这个模式就是设置一个超时时间,在那之前阻塞等待。一旦超时就立刻返回:

select {
case v := result:
    fmt.Println(v)
case <-time.After(5 * time.Second):
    fmt.Println("请求超时")
}

这是一个简单的实现。
如果了解context包,建议优先考虑使用WithTimeout函数做超时取消:

ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
defer cancel()
select {
case v := result:
    fmt.Println(v)
case <-ctx.Done():
    fmt.Println("请求超时")
}

Pipeline 模式

也称流水线模式,适合用来解耦上下游的处理逻辑。类似流水线的生产。每一道工序的输出就是下一道工序的输入,在工序之间通过管道来传递数据。
这里模拟一个有三道工序的例子,首先是第一道工序:

func start(n int) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for i := 0; i < n; i++ {
            time.Sleep(time.Millisecond * 100)
            out <- fmt.Sprint("配件", i)

        }
    }()
    return out
}

函数接收的参数是要制作多少个产品。然后sleep一段时间,模拟制作过程。最后将成功放入管道,传递给下一道工序。而函数的返回值就是这个管道,这个管道将会作为参数传给下一道工序。
然后是第二道工序:

func build(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for c := range in {
            time.Sleep(time.Millisecond * 1000)
            out <- fmt.Sprint("组装", "+", c)
        }
    }()
    return out
}

和上面的代码稍有不同,这次要从管道中接收数据进行处理,之后的部分则一样。
再来是第三道工序:

func end(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for c := range in {
            time.Sleep(time.Millisecond * 200)
            out <- fmt.Sprint("完成", "+", c)
        }
    }()
    return out
}

这里的代码和上面的完全一样。实际也就第一道工序的时候是自己生成数量然后迭代。之后都是从管道接收数据进行迭代,处理后都是把结果传给另一个管道,交给下一道工序上。然后函数返回管道。

最后写一个主函数,将所有的工序串联起来:

func main() {
    begin := time.Now()
    s := start(10)
    b := build(s)
    e := end(b)
    for item := range e {
        fmt.Println(item)
    }
    duration := time.Since(begin)
    fmt.Println("耗时:", duration)
}
// 耗时: 5.3091995s

扇出和扇入模式

上面的流水线模式有一个问题。上面给每一道工序加了一个sleep,并且是不同长度的延迟,可以发现第二道工序的耗时最长,这里就是瓶颈。
不需要复制整条流水线,而只要增加第二道工序的goroutine就可以提高生产效率。这会产生两种模式:扇入和扇出。
扇出,工序1之后,将分出多个分支通向几个工序2,就像一把打开的扇子,这个是扇出。
扇入,几个工序2处理后,又会汇聚到后面的工序3,还是像一把扇子,方向反了,这个就是扇入。
扇出的数据流是发散传递出去,是输出流;扇入的数据流是汇聚进来的,是输入流。

扇出的实现:

s := start(10)
b1 := build(s)
b2 := build(s)
b3 := build(s)

在主函数中,启动多个实例,都将上游的管道传入,这样就完成了扇出。扇出之后,产生了3个管道,这3个管道的数据都是下一道工序要处理的。
这里不修改下一道工序,所以仍然是要从一个管道中接收数据出处。这里要实现一个扇入的函数,从多个管道中接收数据然后从一个管道中输出,实现管道的合并,这就是扇入:

// merge 扇入函数(组件),可以合并多个管道到一个管道中
func merge(ins ...<-chan string) <-chan string {
    var wg sync.WaitGroup
    out := make(chan string)
    // 把一个通道中的数据发送到out中。
    p := func(in <-chan string) {
        defer wg.Done()
        for c := range in {
            out <- c
        }
    }
    // 扇入,启动多个goroutine,每一个处理一个管道
    for _, cs := range ins {
        wg.Add(1)
        go p(cs)
    }
    // 等待所有ins处理完后,关闭out管道
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

这里把merge称作组件,函数不大,并且和业务无关。并且各个merge组件是可以复用的。
在main函数中使用merge组件,完成扇出和扇入:

func main() {
    begin := time.Now()
    s := start(10)
    b1 := build(s) // 扇出
    b2 := build(s)
    b3 := build(s)
    b := merge(b1, b2, b3) // 扇入
    e := end(b)
    for item := range e {
        fmt.Println(item)
    }
    duration := time.Since(begin)
    fmt.Println("耗时:", duration)
}
// 耗时: 2.6091368s

Futures 模式

上面的流水线模式中,各个工序是互相依赖的。后面的工序必须等待上一步的完成才能继续。在有的场景中,一个大任务可以分解成多个小任务,这些小任务之间有些是独立的没有依赖。为了提高效率,这些独立的小任务就可以并发进行。
Futures模式可以理解为未来模式,主goroutine只需要把任务分派给各个子goroutine执行,之后再等待每个子任务等返回结果。同样是三个任务,这里是并行,上面的流水线是串行。
这里写的例子没什么特别的:

package main

import (
    "fmt"
    "time"
)


func water() <-chan string {
    out := make(chan string)
    go func() {
        time.Sleep(time.Second *1)
        out <-"开水"
    }()
    return out
}

func vegetable() <-chan string {
    out := make(chan string)
    go func() {
        time.Sleep(time.Second * 3)
        out <-"炒青菜"
    }()
    return out
}

func rice() <-chan string {
    out := make(chan string)
    go func() {
        time.Sleep(time.Second * 6)
        out <-"米饭"
    }()
    return out
}

func main() {
    w := water()
    v := vegetable()
    r := rice()
    <-w
    fmt.Println("水开了,倒水...")
    <-v
    fmt.Println("菜好了,上菜...")
    <-r
    fmt.Println("饭好了,盛饭...")
    fmt.Println("开动。")
}

这个模式主要的特点就是,每到工序返回的结果,是在未来某个时间要使用的。