背景
许多源码中都用到sync中的锁,
许多教程中也讲到sync与channel在并发时的优劣问题.
资源争抢
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package mainimport ( "fmt" "time" ) const N = 1e7 type test struct { a int } type chann chan int func (t *test) addOne (done chann) { for i := 0 ; i < N; i++ { t.a += 1 done <- 0 } } func (t *test) subOne (done chann) { for i := 0 ; i < N; i++ { t.a -= 1 done <- 0 } } func main () { start := time.Now() t := &test{a: 0 } finished := make (chann, 1000 ) go t.addOne(finished) go t.subOne(finished) for i := 0 ; i < N; i++ { <-finished <-finished } fmt.Println(t.a) fmt.Println(time.Since(start)) }
典型的争抢原因:
一个任务分几步
读寄存器
运算
写寄存器
A与B都读,A与B都计算,A与B都写.导致有一个的结果被覆盖
结果导致
N次一加一减,结果不为0
N次两个都加,结果不足2N
等等
介绍
sync包提供了异步中常用的工具
Mutex 互斥锁
RWMutex 读写锁
WaitGroup 等待组
Once 单次执行
Cond 条件变量
Pool 临时对象池
Map 并发安全的map
互斥锁
概念和使用都最为简单
资源上带一个字段
这里为了不引入更多复杂度,使用具名字段
使用匿名字段可以少写一些字母
使用具名字段可以方便编辑器多光标操作,同时注释所有Lock,Unlock
用在结构体中不需要明确初始化
操作资源时锁上 resource.mu.Lock()
操作完毕解开 resource.mu.Unlock()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package mainimport ( "fmt" "sync" "time" ) const N = 1e7 type test struct { a int mu sync.Mutex } type chann chan int func (t *test) addOne (done chann) { for i := 0 ; i < N; i++ { t.mu.Lock() t.a += 1 t.mu.Unlock() done <- 0 } } func (t *test) subOne (done chann) { for i := 0 ; i < N; i++ { t.mu.Lock() t.a -= 1 t.mu.Unlock() done <- 0 } } func main () { start := time.Now() t := &test{a: 0 } finished := make (chann, 1000 ) go t.addOne(finished) go t.subOne(finished) for i := 0 ; i < N; i++ { <-finished <-finished } fmt.Println(t.a) fmt.Println(time.Since(start)) }
功能上,已经能够保证最终结果为0.
性能上,加锁和解锁理论上的确会造成一定的性能浪费.
实际测试中每次的时间不确定,只有大概结果.
读写锁和性能测试
RWMutex
,实际上是多读单写锁,简称读写锁.
搭配RLock,RUnlock(),可实现
读写读与写互斥,写与写互斥,读与读不互斥.
Lock(), Unlock(), 普通互斥锁
RLock(), RUnlock(), 读写锁
按照参考的说法,当读写过程耗时1ms.
读多写少,RWMutex好7倍
读写相当,RWMutex好1倍
读少写多,RWMutex与Mutex性能相当
若读写过程耗时下降,则RWMutex的优势也下降.但总体趋势不变
若读写过程耗时上升,整体优劣不发生明显改变
单次执行
有时候特别希望for循环里的一个过程只执行一次.
sync.Once
就是这样的工具.
主要方法:
Do 将被执行的函数放入Do中,即可只执行一次
1 2 3 4 5 6 7 8 9 10 11 func main () { var once sync.Once onceBody := func () { fmt.Println("lambda function" ) } for i := 0 ; i < 8 ; i++ { once.Do(onceBody) } }
等待组
比如要在main函数中,等待所有子goroutine结束再退出.
可以用的方法:
专门为结束信号量建立一个channel,启动多少个,就从channel取出多少个,取不够就阻塞,取够就说明已经结束,可以退出
用 WaitGroup
WaitGroup的方法主要有
Add,添加一个等待,但不区分是谁,goroutine应该是公平的
Done,由被等待的goroutine发出,要求将wg指针传递给启动的goroutine函数
Wait,等待所有子goroutine结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package mainimport ( "fmt" "sync" ) func work (wg *sync.WaitGroup, id int ) { fmt.Println(id) wg.Done() } func main () { var wg sync.WaitGroup for i := 0 ; i < 8 ; i++ { wg.Add(1 ) go work(&wg, i) } wg.Wait() fmt.Println("all child goroutine finished" ) }
条件变量
sync.Cond
条件变量是一种信号通知机制.
用途广泛
写协程B,必须依赖读协程A的结果,让A读取完后通知B
其他信息交流,暂时想不到
但缺点也多
有时需要全局变量,因wait只能带来是否继续而不能等来真正的消息
书写复杂
因此有时也会用 WaitGroup
等代替
常用的方法有
Signal
, 随机通知一个协程
BroadCase
, 通知所有协程
Wait
, 阻塞并等待通知
为了完成这些功能.需要先
sync.NewCond(locker *sync.Mutex)
用 cond.L.Lock()
和 cond.L.Unlock()
包裹wait过程
wait本身是 unlock, lock
因此代码中想wait,必须先lock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package mainimport ( "fmt" "sync" "time" ) var locker = new (sync.Mutex)var cond = sync.NewCond(locker)func main () { for i := 0 ; i < 10 ; i++ { go func (x int ) { cond.L.Lock() defer cond.L.Unlock() cond.Wait() fmt.Println("I'm" , x) }(i) } time.Sleep(time.Millisecond * 500 ) time.Sleep(time.Millisecond * 500 ) fmt.Println("Signal..." ) cond.Signal() time.Sleep(time.Millisecond * 500 ) fmt.Println("Signal..." ) cond.Signal() time.Sleep(time.Millisecond * 500 ) fmt.Println("Signal..." ) cond.Signal() time.Sleep(time.Millisecond * 500 ) fmt.Println("Broadcast..." ) cond.Broadcast() time.Sleep(time.Second) }
将wait放在for中,则可以让协程一次次阻塞,
当然,可以在阻塞期间修改一些变量,
阻塞过后可以对新的值进行判断.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 package mainimport ( "fmt" "sync" "time" ) var i = 5 func odder (cond *sync.Cond, done chan bool ) { cond.L.Lock() defer func () { cond.L.Unlock() done <- true }() for { cond.Wait() if i <= -1 { break } if i%2 != 0 { fmt.Println("i is" , i, ", odd" ) } else { fmt.Println("i is" , i, ", not odd" ) } } fmt.Println("i(" , i, ") <= -1, odder finish" ) } func evener (cond *sync.Cond, done chan bool ) { cond.L.Lock() defer func (cond *sync.Cond, done chan bool ) { cond.L.Unlock() done <- true }(cond, done) for { cond.Wait() if i <= -1 { break } if i%2 == 0 { fmt.Println("i is" , i, ", even" ) } else { fmt.Println("i is" , i, ", not event" ) } } fmt.Println("i(" , i, ") <= -1, evener finish" ) } func main () { var l sync.Mutex cond := sync.NewCond(&l) end := make (chan bool , 2 ) go odder(cond, end) go evener(cond, end) time.Sleep(time.Millisecond * 200 ) time.Sleep(time.Millisecond * 200 ) cond.L.Lock() i = 3 fmt.Println("broadcast, i=" , i) cond.Broadcast() cond.L.Unlock() time.Sleep(time.Millisecond * 200 ) i = 2 fmt.Println("signal, i=" , i) cond.Signal() time.Sleep(time.Microsecond * 200 ) i = -1 fmt.Println("broadcast, i=" , i) cond.Broadcast() <-end <-end }
TODO 临时对象池
并发安全map
并发中的map
一个普通的map
并发地读写,会报错
fatal error: concurrent map read and map write
例子
两写,没问题
一写一读,报错
两写,报错
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package mainfunc main () { test := map [int ]int {1 : 1 } end := make (chan bool ) const N = 1e5 go func () { defer func () { end <- true }() for i := 0 ; i < N; i++ { test[1 ] = 1 } }() go func () { defer func () { end <- true }() for i := 0 ; i < N; i++ { _ = test[1 ] } }() <-end <-end }
简单的解决办法
有个锁就行.
互斥锁那边写了将锁放在资源里,
这里将锁放在资源外也行.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package mainimport "sync" func main () { test := map [int ]int {1 : 1 } var s sync.Mutex end := make (chan bool ) const N = 1e5 go func () { defer func () { end <- true }() for i := 0 ; i < N; i++ { s.Lock() test[1 ] = 1 s.Unlock() } }() go func () { defer func () { end <- true }() for i := 0 ; i < N; i++ { s.Lock() _ = test[1 ] s.Unlock() } }() <-end <-end }
不知道为什么 RWMutex.RLock
的组合还是会报错.
优点:
概念简单
对场景要求少
缺点:
频繁的锁的确消耗性能
粒度粗,相当于表级锁而不是行级
sync的一步到位法
sync提供一种自带锁的map.
主体思想是 空间换时间 , 或者称 读写分离
两个map,一个read map,一个dirty map
读时优先读read map,没有则找dirty map
不会立即拷贝read map中没有的内容
而是当数量到达一定程度,才用dirty map覆盖read map
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package mainimport ( "sync" ) func main () { test := sync.Map{} test.Store(1 , 1 ) end := make (chan bool ) const N = 1e5 go func () { defer func () { end <- true }() for i := 0 ; i < N; i++ { test.Store(1 , 1 ) } }() go func () { defer func () { end <- true }() for i := 0 ; i < N; i++ { test.Load(1 ) } }() <-end <-end }
优点:
看起来的确性能好些
有人在并发量多于4时基本上性能已经比原始的map加锁好些
缺点
要求读多写少
如果写太多,则有频繁的dirty map覆盖read map
依然是表级锁
基础用法
Store
增,改
Load
查
Delete
删
LoadAndDelete
LoadOrStore
Range
用于遍历map
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package mainimport ( "fmt" "sync" ) func main () { test := sync.Map{} test.Store(1 , 1 ) test.Store(1 , 2 ) test.Store(2 , 3 ) value, _ := test.Load(1 ) fmt.Println(value) test.Delete(1 ) if _, ok := test.Load(1 ); !ok { fmt.Println("key 1 deleted" ) } fmt.Println(test.LoadAndDelete(2 )) if _, ok := test.Load(2 ); !ok { fmt.Println("key 2 deleted" ) } fmt.Println(test.LoadOrStore(1 , 1 )) fmt.Println(test.LoadOrStore(1 , 2 )) test.LoadOrStore(2 , 3 ) fmt.Println("traverse" ) test.Range(func (k, v interface {}) bool { fmt.Printf("%v:%v\n" , k, v) return true }) }
参考
契机参考1
契机参考2
关于Mutex和RWMutex的性能
Cond的易懂介绍
map的并发
map的主要知识点
sync.Map的性能讨论