go的sync包与锁

背景

许多源码中都用到sync中的锁,
许多教程中也讲到sync与channel在并发时的优劣问题.

资源争抢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"fmt"
"time"
)

const N = 1e7

type test struct { // 公共的资源
a int
}

type chann chan int

func (t *test) addOne(done chann) {
for i := 0; i < N; i++ {
t.a += 1 // 一个不断加1,N次
done <- 0
}
}

func (t *test) subOne(done chann) {
for i := 0; i < N; i++ {
t.a -= 1 // 一个不断减1,N次
done <- 0
}
}

func main() {
start := time.Now()
t := &test{a: 0}
finished := make(chann, 1000)

go t.addOne(finished)
go t.subOne(finished)

for i := 0; i < N; i++ { // 等待处理结束
<-finished
<-finished
}
fmt.Println(t.a) // 结果不为0
fmt.Println(time.Since(start)) // 平均850ms
}

典型的争抢原因:

  1. 一个任务分几步
    1. 读寄存器
    2. 运算
    3. 写寄存器
  2. A与B都读,A与B都计算,A与B都写.导致有一个的结果被覆盖

结果导致

  1. N次一加一减,结果不为0
  2. N次两个都加,结果不足2N

等等

介绍

sync包提供了异步中常用的工具

  1. Mutex 互斥锁
  2. RWMutex 读写锁
  3. WaitGroup 等待组
  4. Once 单次执行
  5. Cond 条件变量
  6. Pool 临时对象池
  7. Map 并发安全的map

互斥锁

概念和使用都最为简单

  1. 资源上带一个字段
    • 这里为了不引入更多复杂度,使用具名字段
    • 使用匿名字段可以少写一些字母
    • 使用具名字段可以方便编辑器多光标操作,同时注释所有Lock,Unlock
  2. 用在结构体中不需要明确初始化
  3. 操作资源时锁上 resource.mu.Lock()
  4. 操作完毕解开 resource.mu.Unlock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
"fmt"
"sync"
"time"
)

const N = 1e7

type test struct {
a int
mu sync.Mutex
}

type chann chan int

func (t *test) addOne(done chann) {
for i := 0; i < N; i++ {
t.mu.Lock()
t.a += 1
t.mu.Unlock()
done <- 0
}
}

func (t *test) subOne(done chann) {
for i := 0; i < N; i++ {
t.mu.Lock()
t.a -= 1
t.mu.Unlock()
done <- 0
}
}

func main() {
start := time.Now()
t := &test{a: 0}
finished := make(chann, 1000)

go t.addOne(finished)
go t.subOne(finished)

for i := 0; i < N; i++ {
<-finished
<-finished
}
fmt.Println(t.a) // 结果为0
fmt.Println(time.Since(start)) // 平均1.05s,锁占用0.2s
}

功能上,已经能够保证最终结果为0.

性能上,加锁和解锁理论上的确会造成一定的性能浪费.
实际测试中每次的时间不确定,只有大概结果.

读写锁和性能测试

RWMutex,实际上是多读单写锁,简称读写锁.
搭配RLock,RUnlock(),可实现
读写读与写互斥,写与写互斥,读与读不互斥.

  1. Lock(), Unlock(), 普通互斥锁
  2. RLock(), RUnlock(), 读写锁

按照参考的说法,当读写过程耗时1ms.

  1. 读多写少,RWMutex好7倍
  2. 读写相当,RWMutex好1倍
  3. 读少写多,RWMutex与Mutex性能相当

若读写过程耗时下降,则RWMutex的优势也下降.但总体趋势不变
若读写过程耗时上升,整体优劣不发生明显改变

单次执行

有时候特别希望for循环里的一个过程只执行一次.
sync.Once 就是这样的工具.

主要方法:

  1. Do 将被执行的函数放入Do中,即可只执行一次
1
2
3
4
5
6
7
8
9
10
11
func main() {
var once sync.Once

onceBody := func() {
fmt.Println("lambda function")
}

for i := 0; i < 8; i++ {
once.Do(onceBody) // 如果只是onceBody()则执行多次
}
}

等待组

比如要在main函数中,等待所有子goroutine结束再退出.

可以用的方法:

  1. 专门为结束信号量建立一个channel,启动多少个,就从channel取出多少个,取不够就阻塞,取够就说明已经结束,可以退出
  2. WaitGroup

WaitGroup的方法主要有

  1. Add,添加一个等待,但不区分是谁,goroutine应该是公平的
  2. Done,由被等待的goroutine发出,要求将wg指针传递给启动的goroutine函数
  3. Wait,等待所有子goroutine结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"sync"
)

func work(wg *sync.WaitGroup, id int) {
fmt.Println(id)
wg.Done()
}

func main() {
var wg sync.WaitGroup
for i := 0; i < 8; i++ {
wg.Add(1)
go work(&wg, i)
}
wg.Wait() // 若不等待则只打印最后一句话
fmt.Println("all child goroutine finished")
}

条件变量

sync.Cond 条件变量是一种信号通知机制.
用途广泛

  1. 写协程B,必须依赖读协程A的结果,让A读取完后通知B
  2. 其他信息交流,暂时想不到

但缺点也多

  1. 有时需要全局变量,因wait只能带来是否继续而不能等来真正的消息
  2. 书写复杂

因此有时也会用 WaitGroup 等代替

常用的方法有

  1. Signal, 随机通知一个协程
  2. BroadCase, 通知所有协程
  3. Wait, 阻塞并等待通知

为了完成这些功能.需要先

  1. sync.NewCond(locker *sync.Mutex)
  2. cond.L.Lock()cond.L.Unlock() 包裹wait过程
    1. wait本身是 unlock, lock 因此代码中想wait,必须先lock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"sync"
"time"
)

var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)

func main() {
for i := 0; i < 10; i++ {
go func(x int) {
cond.L.Lock()
defer cond.L.Unlock()
cond.Wait() // 收到通知则不再阻塞
fmt.Println("I'm", x) // 收到信号则执行工作,退出(由于没有for)
}(i)
}

// 睡眠1秒,使所有goroutine进入 Wait 阻塞状态
time.Sleep(time.Millisecond * 500)

time.Sleep(time.Millisecond * 500)
fmt.Println("Signal...")
cond.Signal() // I'm 9 // 看似任意选择的一个

time.Sleep(time.Millisecond * 500)
fmt.Println("Signal...")
cond.Signal() // I'm 4

time.Sleep(time.Millisecond * 500)
fmt.Println("Signal...")
cond.Signal() // I'm 5 // 但好像有点像轮盘

time.Sleep(time.Millisecond * 500)
fmt.Println("Broadcast...")
cond.Broadcast() // I'm 3,6,7,1,2,8,0

time.Sleep(time.Second) // 等待结束
}

将wait放在for中,则可以让协程一次次阻塞,
当然,可以在阻塞期间修改一些变量,
阻塞过后可以对新的值进行判断.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package main

import (
"fmt"
"sync"
"time"
)

var i = 5

func odder(cond *sync.Cond, done chan bool) {
cond.L.Lock()
defer func() {
cond.L.Unlock()
done <- true
}()
for {
cond.Wait()

if i <= -1 {
break
}

if i%2 != 0 {
fmt.Println("i is", i, ", odd")
} else {
fmt.Println("i is", i, ", not odd")
}
}
fmt.Println("i(", i, ") <= -1, odder finish")
}

// func odder_no_wait(done chan bool) {
// defer func() {
// done <- true
// }()
// for { // 会以启动时的i值运行,且i不能变,有可能就无限循环下去了

// if i <= -1 {
// break
// }

// if i%2 != 0 {
// fmt.Println("i is", i, ", odd")
// } else {
// fmt.Println("i is", i, ", not odd")
// }
// }
// fmt.Println("i(", i, ") <= -1, odder finish")
// }

func evener(cond *sync.Cond, done chan bool) {
cond.L.Lock()
defer func(cond *sync.Cond, done chan bool) {
cond.L.Unlock()
done <- true
}(cond, done)
for {
cond.Wait()

if i <= -1 {
break
}

if i%2 == 0 {
fmt.Println("i is", i, ", even")
} else {
fmt.Println("i is", i, ", not event")
}
}
fmt.Println("i(", i, ") <= -1, evener finish")
}

func main() {
var l sync.Mutex
cond := sync.NewCond(&l)
end := make(chan bool, 2)

go odder(cond, end)
go evener(cond, end)

// 确保所有 goroutine 都阻塞在wait处
time.Sleep(time.Millisecond * 200)

// 第一批,广播i=3
time.Sleep(time.Millisecond * 200)
cond.L.Lock() // lock的意义何在暂时不清楚
i = 3
fmt.Println("broadcast, i=", i)
cond.Broadcast()
cond.L.Unlock()

// 第二批,选一个协程,告诉它,i=2
time.Sleep(time.Millisecond * 200)
i = 2
fmt.Println("signal, i=", i)
cond.Signal()
// 第三批,第四批,第五批...

// 最后一次,广播i=-1,结束所有协程
time.Sleep(time.Microsecond * 200)
i = -1
fmt.Println("broadcast, i=", i)
cond.Broadcast()

<-end
<-end
}

TODO 临时对象池

并发安全map

并发中的map

一个普通的map
并发地读写,会报错

fatal error: concurrent map read and map write

例子

  1. 两写,没问题
  2. 一写一读,报错
  3. 两写,报错
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

func main() {
test := map[int]int{1: 1}
end := make(chan bool)
const N = 1e5

go func() {
defer func() {
end <- true
}()
for i := 0; i < N; i++ {
test[1] = 1 // 一写
}
}()

go func() {
defer func() {
end <- true
}()
for i := 0; i < N; i++ {
_ = test[1] // 一读
}
}()

<-end
<-end
}

简单的解决办法

有个锁就行.
互斥锁那边写了将锁放在资源里,
这里将锁放在资源外也行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import "sync"

func main() {
test := map[int]int{1: 1}
var s sync.Mutex
end := make(chan bool)
const N = 1e5

go func() {
defer func() {
end <- true
}()
for i := 0; i < N; i++ {
s.Lock()
test[1] = 1 // 写
s.Unlock()
}
}()

go func() {
defer func() {
end <- true
}()
for i := 0; i < N; i++ {
s.Lock()
_ = test[1] // 读
s.Unlock()
}
}()

<-end
<-end
}

不知道为什么 RWMutex.RLock 的组合还是会报错.

优点:

  1. 概念简单
  2. 对场景要求少

缺点:

  1. 频繁的锁的确消耗性能
  2. 粒度粗,相当于表级锁而不是行级

sync的一步到位法

sync提供一种自带锁的map.
主体思想是 空间换时间, 或者称 读写分离

  1. 两个map,一个read map,一个dirty map
  2. 读时优先读read map,没有则找dirty map
    1. 不会立即拷贝read map中没有的内容
    2. 而是当数量到达一定程度,才用dirty map覆盖read map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"sync"
)

func main() {
test := sync.Map{}
test.Store(1, 1)
end := make(chan bool)
const N = 1e5

go func() {
defer func() {
end <- true
}()
for i := 0; i < N; i++ {
test.Store(1, 1) // 写
}
}()

go func() {
defer func() {
end <- true
}()
for i := 0; i < N; i++ {
test.Load(1)
// _, _ = test.Load(1)
}
}()

<-end
<-end
}

优点:

  1. 看起来的确性能好些
  2. 有人在并发量多于4时基本上性能已经比原始的map加锁好些

缺点

  1. 要求读多写少
    1. 如果写太多,则有频繁的dirty map覆盖read map
  2. 依然是表级锁

 

  1. 基础用法

    1. Store 增,改
    2. Load
    3. Delete
    4. LoadAndDelete
    5. LoadOrStore
    6. Range 用于遍历map
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    package main

    import (
    "fmt"
    "sync"
    )

    func main() {
    test := sync.Map{}
    test.Store(1, 1) // 增
    test.Store(1, 2) // 改
    test.Store(2, 3)

    value, _ := test.Load(1) // 查
    fmt.Println(value)

    test.Delete(1) // 删
    if _, ok := test.Load(1); !ok {
    fmt.Println("key 1 deleted")
    }

    fmt.Println(test.LoadAndDelete(2))
    if _, ok := test.Load(2); !ok {
    fmt.Println("key 2 deleted")
    }

    fmt.Println(test.LoadOrStore(1, 1)) // Store(1, 1)
    fmt.Println(test.LoadOrStore(1, 2)) // Load(1)
    test.LoadOrStore(2, 3) // Store(2, 3)

    fmt.Println("traverse")
    test.Range(func(k, v interface{}) bool {
    fmt.Printf("%v:%v\n", k, v)
    return true // 若返回fasle,遍历立即结束
    })
    }

参考

  1. 契机参考1
  2. 契机参考2
  3. 关于Mutex和RWMutex的性能
  4. Cond的易懂介绍
  5. map的并发
  6. map的主要知识点
  7. sync.Map的性能讨论