并发编程范例
select 多路复用
先了解下select多路复用,下面多处会用到。
有时候需要在多个通道上接收,不能只从一个通道上接收,因为任何一个操作都会在完成前阻塞:
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
非阻塞模式
有时候我们试图在一个通道上发送或接收,但是不想在通道没有准备好的情况下被阻塞,非阻塞通信。这使用 select 语句也可以做到。select 可以有一个默认情况,它用来指定在没有其他的通信发生时可以立即执行的动作:
select {
case <-abort:
fmt.Println("Launch aborted!")
return
default:
// 不执行任何操作
}
这里的处理逻辑是,通道有值就接收处理,没有则不执行任何操作,继续往下执行。这段代码外是没有for循环的,否则会没有任何延迟的重复执行default空操作,会一直占着CPU。
for select 循环模式
使用select多路复用时,一般会在外层套一个for循环。内存的select是具体要执行的任务,执行一次。外层的for则是重复循环的多次执行任务:
for { //for无限循环,或者for range循环
select {
//通过一个channel控制
}
}
循环方式有两种:
- for无限循环,直到收到停止指令
- for range 循环,重复执行N次
无限循环:
for {
select {
case <-done:
return
default:
// 具体要执行的任务
}
}
在default分支中是要重复执行的任务。一旦收到done信号,就会退出停止。
for range 循环:
for _, v := range []int{1, 2, 3, 4, 5} {
select {
case <-done:
return
case resultCh <-s:
}
}
这种情况,也会有一个done通道,这样可以中途停止。另外还有个resultCh通道,用于将for range获取的值传递给其他goroutine。
这里不使用default分支,应该是不能用,《Go语言圣经》中的示例也是这样的。
default分支用于非阻塞模式,这种通道的操作不能放在default中,default中只能放非阻塞的操作。除非default下还有default,但是最底层的default也不能阻塞。
select timeout 模式
在select多路复用的基础上,如果设置了default分支,那么所有通道都没有准备好的情况,不会等待,而是执行default。
如果就是需要等待某个通道准备好,这时就应该不设置default分支。这样就会阻塞着,直到某个通道准备好了,就能继续。但这样又会有个新问题,就是可能会一直阻塞下去。
这个模式就是设置一个超时时间,在那之前阻塞等待。一旦超时就立刻返回:
select {
case v := result:
fmt.Println(v)
case <-time.After(5 * time.Second):
fmt.Println("请求超时")
}
这是一个简单的实现。
如果了解context包,建议优先考虑使用WithTimeout函数做超时取消:
ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
defer cancel()
select {
case v := result:
fmt.Println(v)
case <-ctx.Done():
fmt.Println("请求超时")
}
Pipeline 模式
也称流水线模式,适合用来解耦上下游的处理逻辑。类似流水线的生产。每一道工序的输出就是下一道工序的输入,在工序之间通过管道来传递数据。
这里模拟一个有三道工序的例子,首先是第一道工序:
func start(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 0; i < n; i++ {
time.Sleep(time.Millisecond * 100)
out <- fmt.Sprint("配件", i)
}
}()
return out
}
函数接收的参数是要制作多少个产品。然后sleep一段时间,模拟制作过程。最后将成功放入管道,传递给下一道工序。而函数的返回值就是这个管道,这个管道将会作为参数传给下一道工序。
然后是第二道工序:
func build(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
time.Sleep(time.Millisecond * 1000)
out <- fmt.Sprint("组装", "+", c)
}
}()
return out
}
和上面的代码稍有不同,这次要从管道中接收数据进行处理,之后的部分则一样。
再来是第三道工序:
func end(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
time.Sleep(time.Millisecond * 200)
out <- fmt.Sprint("完成", "+", c)
}
}()
return out
}
这里的代码和上面的完全一样。实际也就第一道工序的时候是自己生成数量然后迭代。之后都是从管道接收数据进行迭代,处理后都是把结果传给另一个管道,交给下一道工序上。然后函数返回管道。
最后写一个主函数,将所有的工序串联起来:
func main() {
begin := time.Now()
s := start(10)
b := build(s)
e := end(b)
for item := range e {
fmt.Println(item)
}
duration := time.Since(begin)
fmt.Println("耗时:", duration)
}
// 耗时: 5.3091995s
扇出和扇入模式
上面的流水线模式有一个问题。上面给每一道工序加了一个sleep,并且是不同长度的延迟,可以发现第二道工序的耗时最长,这里就是瓶颈。
不需要复制整条流水线,而只要增加第二道工序的goroutine就可以提高生产效率。这会产生两种模式:扇入和扇出。
扇出,工序1之后,将分出多个分支通向几个工序2,就像一把打开的扇子,这个是扇出。
扇入,几个工序2处理后,又会汇聚到后面的工序3,还是像一把扇子,方向反了,这个就是扇入。
扇出的数据流是发散传递出去,是输出流;扇入的数据流是汇聚进来的,是输入流。
扇出的实现:
s := start(10)
b1 := build(s)
b2 := build(s)
b3 := build(s)
在主函数中,启动多个实例,都将上游的管道传入,这样就完成了扇出。扇出之后,产生了3个管道,这3个管道的数据都是下一道工序要处理的。
这里不修改下一道工序,所以仍然是要从一个管道中接收数据出处。这里要实现一个扇入的函数,从多个管道中接收数据然后从一个管道中输出,实现管道的合并,这就是扇入:
// merge 扇入函数(组件),可以合并多个管道到一个管道中
func merge(ins ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
// 把一个通道中的数据发送到out中。
p := func(in <-chan string) {
defer wg.Done()
for c := range in {
out <- c
}
}
// 扇入,启动多个goroutine,每一个处理一个管道
for _, cs := range ins {
wg.Add(1)
go p(cs)
}
// 等待所有ins处理完后,关闭out管道
go func() {
wg.Wait()
close(out)
}()
return out
}
这里把merge称作组件,函数不大,并且和业务无关。并且各个merge组件是可以复用的。
在main函数中使用merge组件,完成扇出和扇入:
func main() {
begin := time.Now()
s := start(10)
b1 := build(s) // 扇出
b2 := build(s)
b3 := build(s)
b := merge(b1, b2, b3) // 扇入
e := end(b)
for item := range e {
fmt.Println(item)
}
duration := time.Since(begin)
fmt.Println("耗时:", duration)
}
// 耗时: 2.6091368s
Futures 模式
上面的流水线模式中,各个工序是互相依赖的。后面的工序必须等待上一步的完成才能继续。在有的场景中,一个大任务可以分解成多个小任务,这些小任务之间有些是独立的没有依赖。为了提高效率,这些独立的小任务就可以并发进行。
Futures模式可以理解为未来模式,主goroutine只需要把任务分派给各个子goroutine执行,之后再等待每个子任务等返回结果。同样是三个任务,这里是并行,上面的流水线是串行。
这里写的例子没什么特别的:
package main
import (
"fmt"
"time"
)
func water() <-chan string {
out := make(chan string)
go func() {
time.Sleep(time.Second *1)
out <-"开水"
}()
return out
}
func vegetable() <-chan string {
out := make(chan string)
go func() {
time.Sleep(time.Second * 3)
out <-"炒青菜"
}()
return out
}
func rice() <-chan string {
out := make(chan string)
go func() {
time.Sleep(time.Second * 6)
out <-"米饭"
}()
return out
}
func main() {
w := water()
v := vegetable()
r := rice()
<-w
fmt.Println("水开了,倒水...")
<-v
fmt.Println("菜好了,上菜...")
<-r
fmt.Println("饭好了,盛饭...")
fmt.Println("开动。")
}
这个模式主要的特点就是,每到工序返回的结果,是在未来某个时间要使用的。