go中协程与通道的一些用法

背景

go的goroutine和channel为了并发执行做了非常多的优化,
同时也产生了许多使用技巧.这些暂时不适合写在初步教程当中.

main自动等待子协程结束

不用任何方法,需要等待足够时间

1
2
3
4
5
6
7
8
9
10
11
func test() {
time.Sleep(time.Millisecond * 500) // 假设此处随机
fmt.Println("goroutine")
}

func main() {
for i := 0; i < 10; i++ {
go test()
}
time.Sleep(time.Second) // 此处需要等待足够长的时间
}

使用 信号量模式,专门用一个通道保存结束信号.

  1. 每个goroutine在结束后都会向通道中传递信号.
  2. 启动了多少个goroutine,就需要从通道中取出多少个信号
  3. 否则就阻塞main,不让结束.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func test(end chan bool) {
time.Sleep(time.Millisecond * 500)
fmt.Println("goroutine")
end <- true // 值无所谓,仅仅利用协程的阻塞特性
}

func main() {
end := make(chan bool)
for i := 0; i < 10; i++ {
go test(end)
// <-end // 若此处有end,在通道没有缓存的情况下,会每0.5s一个"goroutine"
}
for i := 0; i < 10; i++ {
<-end // 写在此处,则不妨碍0s时10个goroutine一同启动
}
}

工厂模式

传统的

  1. 在main函数里新建一个channel
  2. 另外声明一个函数f,将1中建立的channel作为参数
  3. 函数f中向channel中发送数据

一定情景下可以变成

  1. 函数f自己新建channel
  2. 函数f向新建的channel中发送数据
  3. 函数f返回这个装有数据的channel

优点

  1. 整体感更强,能够封装一些连带操作(比如启动一个后台监视程序等等)并将这些连带操作对外屏蔽
  2. 有时(通道带关闭时)可以用出其他语言generator的感觉
  3. 一定程度上可以使用只读/只写通道来保护数据

缺点:

  1. 写了非常多的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func pump() <-chan int {              // 返回一个只读的chan
ch := make(chan int)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}()
return ch
}

func main() {
for x := range pump() {
fmt.Println(x)
}
}

充分利用多核CPU

许多运算

  • 排序
  • 查找
  • 简单累加

按照传统方式,都是一个线程来完成,不能很好利用多核CPU的功能.

原则: 让那些不要求严格顺序的操作,同一时刻开始

加快for循环的速度

1
2
3
4
5
6
7
8
9
10
11
12
func heavyWork(i int) {
time.Sleep(time.Millisecond * 100)
fmt.Println(i)
}

func main() {
start := time.Now()
for i := 0; i < 100; i++ {
heavyWork(i)
}
fmt.Println(time.Since(start)) // 10.032s
}

go能够简单地发起数万甚至数百万的goroutine,
可以在for中直接发起多个goroutine来节省时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func heavyWork(i int, end chan bool) {
time.Sleep(time.Millisecond * 100) // 尤其是当此处为IO操作,CPU不需要做什么时
fmt.Println(i)
end <- true
}

func main() {
start := time.Now()
finish := make(chan bool, 100) // 建议流出缓冲

for i := 0; i < 100; i++ {
go heavyWork(i, finish)
}

for i := 0; i < 100; i++ {
<-finish
}
fmt.Println(time.Since(start)) // 102.085ms
}

同一时刻开始 对于IO任务来说本身就是极大的性能提升

Futures模式

futures翻译作期货也可.

分别同时开始两个不特别相关的任务,等到全部准备好,立即开始剩下的工作.
所谓等到全部准备好,依靠通道的阻塞来完成.

改进前

1
2
3
4
5
func InverseProduct(a Matrix, b Matrix) {
a_inv := Inverse(a)
b_inv := Inverse(b) // a_inv的计算和b_inv的计算并不需要严格有序
return Product(a_inv, b_inv)
}

改进后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func InverseProduct(a Matrix, b Matrix) {
a_inv_future := InverseFuture(a) // 以工厂模式返回一个通道,用于接收结果
b_inv_future := InverseFuture(b)
a_inv := <-a_inv_future
b_inv := <-b_inv_future // 这里的两句会阻塞到两个结果都已经计算完成
return Product(a_inv, b_inv) // 然后计算即可
}

func InverseFuture(a Matrix) {
future := make(chan Matrix) // 0. 初始化一个通道
go func() {
future <- Inverse(a) // 1. 开始计算 2. 计算结果送入通道
}()
return future // 3. 返回这个通道
}

恢复运行

并发的协程有很多,其中有一些会报错,
但不希望报错影响整体的运行.
因此每一个协程的代码里都放上defer以防止panic.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func generater() chan int {
c := make(chan int, 10)
defer close(c)

for i := 0; i < 10; i++ {
c <- i
}
return c
}

func test(upstream chan int, next chan int) {
defer func() {
if err := recover(); err != nil {
fmt.Println("error in goroutine", err)
}
}()
i := <-upstream
res := 2520 / i
next <- res
}

func main() {
res := make(chan int, 10)
up := generater()

for i := 0; i < 10; i++ {
go test(up, res)
}

for i := 0; i < 9; i++ { // 有一个出错,因此通道里只有9个值
fmt.Println(<-res)
}
}

generator

go本身没有generator一说,但可以写出类似的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// main外部的generateInteger()需要使用
// 因此在外部声明
var resume chan int

func integers() chan int {
yield := make(chan int)
count := 0
go func() {
for {
yield <- count
count++
}
}()
return yield
}

func generateInteger() int {
return <-resume
}

func main() {
resume = integers()
fmt.Println(generateInteger()) // 0
fmt.Println(generateInteger()) // 1
fmt.Println(generateInteger()) // 2
}

管道模式

适合于数据的流动

  1. 数据从上游channel来
  2. 本地处理后
  3. 结果放入下游channel
1
2
3
4
5
6
func oneStep(in <-chan int, out chan<- string) {
for inValue := range in {
result := f(inValue)
out <- result
}
}

filter模式

不仅仅能够像管道模式一样,来一个处理一个,并放入下游.
还可以像流式编程一样,过滤掉一部分.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func source() <-chan int {
numbers := make(chan int, 20)
func() {
for i := 0; i < 20; i++ {
numbers <- i
}
close(numbers)
}()
return numbers
}

func filter(in <-chan int, out chan<- int) {
for {
time.Sleep(time.Millisecond * 100)
if i, ok := <-in; ok {
if i%2 == 0 {
out <- i
}
} else {
break
}
}
close(out)
}

func main() {
numbers := source()
out := make(chan int)

go filter(numbers, out)
for x := range out {
fmt.Println(x)
}
}

/*
0
2
4
...
18
*/

再比如计算素数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 上游生成大于2的正整数
func generate() chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
ch <- i
}
}()
return ch
}

// filter中对每个上游的数字,判断是否为素数
// 不需要再次遍历是之前所有数字的倍数
// 只需要不是最新产生的素数的倍数即可
func filter(in chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
time.Sleep(time.Millisecond * 100)
if i := <-in; i%prime != 0 {
out <- i
}
}
}()
return out
}

func sieve() chan int {
out := make(chan int)
go func() {
ch := generate()
for {
prime := <-ch // 第一个素数是2
ch = filter(ch, prime)
out <- prime // 收集素数到out通道
}
}()
return out
}

func main() {
primes := sieve()
for {
fmt.Println(<-primes)
}
}

阻塞和生产者消费者模式

本质上是两个协程互相同步与互斥

select搭配tick和after,实现定时或超时功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func main() {
// tick无法手动关闭,如果想关闭需要使用ticker
tick := time.Tick(time.Millisecond * 500)
ticker := time.NewTicker(time.Millisecond * 1000)
defer ticker.Stop()
boom := time.After(time.Second * 3)

for {
select {
case <-tick:
fmt.Println("tick")
case <-ticker.C:
fmt.Println("tock")
case <-boom:
fmt.Println("boom!")
return
}
}
}

/*
tick
tick 同时到来tick与tock,随机做了选择
tock 但剩下的不会丢掉
tick
tick 同时到来tick与tock,随机做了选择
tock
tick
tick
boom! 这次是boom!赢了tock
*/

用途比如

  1. client.Call 的回复放在一个通道中,等待消息返回,或者在超时后提示失败
  2. 从不同数据库查到的信息放在不同通道,等待一个比较快的返回后就结束

信号量模式

实现互斥锁的一种方式就是 信号量,
这里主要写信号量的一种, PV原语

可以使用带缓冲的channel来实现PV原语的模拟

  1. 初始化一个缓冲长度等于资源数n的通道
  2. P(n)向通道中放入消息,相当于占用资源
  3. V(n)从通道中取出消息,相当于释放资源
  4. 当通道缓冲已全被沾满,无法再放入,P(n)就会阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
func (s semaphore) P(n int) {
e := new(Empty)
for i := 0; i < n; i++ {
s <- e
}
}

// release n resouces
func (s semaphore) V(n int) {
for i:= 0; i < n; i++{
<- s
}
}

在PV原语的基础上可以发展出一些常用的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 锁/解锁模式
func (s semaphore) Lock() {
s.P(1)
}

func (s semaphore) Unlock(){
s.V(1)
}


// 等待/信号模式,借此来完成所有的n个协程都完成,才能进行下一步的同步效果
func (s semaphore) Wait(n int) { // 让所有人等待
s.P(n)
}

func (s semaphore) Signal() { // 每个完成工作的通行
s.V(1)
}

限制并发数量

依靠一个缓冲等于限制量的辅助通道.(比如这里的 sem)

  1. 服务器级别分配资源,比如for中向通道书写入一个内容,资源减少1.
  2. 每个请求的处理中,请求完成后释放资源,从通道中取一个内容出来.
  3. 通道缓冲沾满时,服务器的处理会被暂时阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var sem = make(chan int, MAXREQS)

func process(r *Request) {
// Do something 做任何事
}
func handle(r *Request) {
process(r)
// 处理结束,释放"资源"
<-sem
}
func Server(queue chan *Request) {
for {
// 先分配资源
// 若分配不到,则此处阻塞直到资源释放
sem <- 1

// 再处理内容
request := <-queue
go handle(request)
}
}
func main() {
queue := make(chan *Request)
go Server(queue)
}

链式计算

效果上更像是一个玩具,
一节接一节搭建好过山车的轨道.
然后只要将小车推下去即可.

保证的确有这么多goroutine同时在跑,还没有结束.
用来验证go能同时容纳多少个goroutine.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const N = 1e6

func f(left, right chan int) { left <- 1 + <-right }

func main() {
start := time.Now()
leftmost := make(chan int) // 可以理解为位于0
var left, right chan int = nil, leftmost // 此刻的left位于-1而right位于0

for i := 0; i < N; i++ {
left, right = right, make(chan int) // 之后left与right均向右移动
go f(left, right) // 搭建一条从左到右的,相互连接,上一个完毕下一个才能开始的一连串处理
}

right <- 0 // 从右端放入数据,开始移动
x := <-leftmost // 等待左端数据输出

fmt.Println(x)
fmt.Println(time.Since(start)) // 1e6大约用时1.27s
}

典型的客户端/服务端模式

首先有个 Request 结构体,里面是客户端的请求信息,
以及一个要求回复数据时信息的通道

1
2
3
4
type Request struct {
a, b int;
replyc chan int;
}

服务端代码需要为每一个请求启用一个goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 本质级别,需要处理参数a,b并返回一个结果
type binOp func(a, b int) int

// 协程角度,需要将处理后的结果放入要求的通道中
func run(op binOp, req *Request) {
req.replyc <- op(req.a, req.b)
}

// 再包一层,整个服务器角度
func server(op binOp, service chan *Request, quit chan bool) {
for {
select {
case req := <-service: // 获取一个请求,得不到就会阻塞
go run(op, req) // 为请求开启goroutine
case <-quit: // 如果需要退出就退出
return
}
}
}

// 再包一层,基础架构级别
// 准备一些外部设施(获取请求的通道,退出信号的通道,启用服务器)
// 不需要channel来实现server,可以不用这个包装
func startServer(op binOp) (reqChan chan *Request, quit chan bool) {
reqChan = make(chan *Request);
quit = make(chan bool)
go server(op, reqChan, quit);
return reqChan, quit
}

测试用的客户端,写在同一个包里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
request, quit := startServer(func(a, b int) int { return a + b })
const N = 100
var reqs [N]Request
for i := 0; i < N; i++ {
// 取出并包装一个请求
req := &reqs[i]
req.a = i
req.b = i + N
req.replyc = make(chan int)

// 放入请求通道
request <- req
}

// checks:
for i := N - 1; i >= 0; i-- { // doesn’t matter what order
fmt.Println(<-reqs[i].replyc)
}

quit <- true
}

多段chan的数据流思想

一段chan,每个元素做各种处理.(这是在一个线程中完成的)
效率上不如多段chan,每段chan只做一种处理.

改进前

1
2
3
4
5
6
7
8
func SerialProcessData (in <- chan *Data, out <- chan *Data) {
for data := range in {
tmpA := PreprocessData(data)
tmpB := ProcessStepA(tmpA)
tmpC := ProcessStepB(tmpB)
out <- PostProcessData(tmpC)
}
}

改进后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func ParallelProcessData(in <-chan *Data, out <-chan *Data) {
// 准备通道,缓冲区大小需要调优
preOut := make(chan *Data, 100)
stepAOut := make(chan *Data, 100)
stepBOut := make(chan *Data, 100)
stepCOut := make(chan *Data, 100)

// 在多个线程中分别启动处理
go PreprocessData(in, preOut)
go ProcessStepA(preOut, stepAOut)
go ProcessStepB(stepAOut, stepBOut)
go ProcessStepC(stepBOut, stepCOut)
go PostProcessData(stepCOut, out)
}

不使用sync包并发读写对象

一种思想是

  1. 为每一个资源,都分配一个协程
  2. 协程从通道接收各种函数指令(好在通道的确可以接收函数)并执行操作
  3. 原先的读取与写入都要转换成相应的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 资源
type Person struct {
Name string
salary float64
chF chan func()
}

// 有时struct也推荐使用工厂模式,将一系列操作封装在一起
func NewPerson(name string, salary float64) *Person {
p := &Person{name, salary, make(chan func())}
go p.backend() // 每个资源配一个监视操作的协程
return p
}
func (p *Person) backend() {
// 监视操作的内容则是每当遇到一个指令函数f(),则运行之
for f := range p.chF {
f()
}
}

func (p *Person) SetSalary(sal float64) {
// 不再是直接写数据,而是封装到函数中
// 毕竟人关心的是函数的处理结果,而不是哪个核心执行了这个函数
p.chF <- func() { p.salary = sal }
// 函数虽然不带参数,但巧妙地利用作用域,让函数实际上拥有变量
}

func (p *Person) Salary() float64 {
fChan := make(chan float64)
// 操作上不要求返回数值
// 而是将数据放入一个事先准备好的通道
// 毕竟人关心的是得到数据即可,数据是否要再取一下不是特别重要
p.chF <- func() { fChan <- p.salary }
return <-fChan
}
func (p *Person) String() string {
return "Person - name is: " + p.Name + " - salary is: " +
strconv.FormatFloat(p.Salary(), 'f', 2, 64)
}
func main() {
bs := NewPerson("Smith Bill", 2500.5)
fmt.Println(bs)
bs.SetSalary(4000.25)
fmt.Println("Salary changed:")
fmt.Println(bs)
}

参考

  1. 主要参考