1.介绍
sync
包提供了互斥锁。除了Once
和WaitGroup
类型,其余多数适用于低水平的程序,多数情况下,高水平的同步使用channel
通信性能会更优一些。
2.并发等待组(WaitGroup)
WaitGroup
,即等待一组Goroutine
结束。父Goroutine
调用Add()
方法来设置应等待Goroutine
的数量。每个被等待的Goroutine
在结束时应该调用Done()
方法。与此同时,主Goroutine
可调用Wait()
方法阻塞至所有Goroutine
结束。
2.1 WaitGroup结构
1 2 3 4 5
| type WaitGroup struct { noCopy noCopy state1 [3]uint32 }
|
2.2 方法列表
方法名 |
功能 |
(wg *WaitGroup) Add(delta int) |
等待组的计数器 +1 |
(wg *WaitGroup) Done() |
等待组的计数器 -1 |
(wg *WaitGroup) Wait() |
当等待组计数器不等于0时,阻塞直到0 |
2.3 Add参数取值范围
等待组内部拥有一个计数器,计数器的值可以通过Add(delta int)
方法调用实现计数器的增加和减少。该方法应该在创建新的Goroutine
之前调用。
参数值x取值
取值 |
描述 |
delta < 0 |
x 小于0时,但会报错: panic: sync: negative WaitGroup counter |
delta = 0 |
x 等于0时,会释放Wait() 方法阻塞等待的所有Goroutine |
delta > 0 |
x 大于0时,Wait() 方法会阻塞Goroutine 直到WaitGroup 计数减为0 |
2.4 使用示例
1. 不使用WaitGroup示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package main import ( "fmt" "time" ) func main() { intChan := make(chan int)
go func(intChan chan int) { sum := 0 for i := 1; i<= 50 ; i++ { sum += i } intChan <- sum }(intChan) go func(intChan chan int) { sum := 0 for i := 51; i<= 100 ; i++ { sum += i } intChan <- sum }(intChan) go func(intChan chan int) { sum1 := <- intChan sum2 := <- intChan fmt.Printf("sum1 = %d sum2 = %d \nsum1 + sum2 = %d \n",sum1,sum2,sum1+sum2) }(intChan)
time.Sleep( time.Second) fmt.Println("运行结束") }
|
2.使用WaitGroup示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(3) intChan := make(chan int) go func(intChan chan int, wg *sync.WaitGroup) { sum := 0 for i := 1; i <= 50; i++ { sum += i } intChan <- sum wg.Done() }(intChan, &wg) go func(intChan chan int, wg *sync.WaitGroup) { sum := 0 for i := 51; i <= 100; i++ { sum += i } intChan <- sum wg.Done() }(intChan, &wg) go func(intChan chan int,wg *sync.WaitGroup) { sum1 := <-intChan sum2 := <-intChan fmt.Printf("sum1 = %d sum2 = %d \nsum1 + sum2 = %d \n", sum1, sum2, sum1+sum2) wg.Done() }(intChan,&wg) wg.Wait() fmt.Println("运行结束") }
|
3.互斥锁(Mutex)
Mutex
是一个互斥锁,保证同时只有一个Goroutine可以访问共享资源。Mutex
类型的锁和Goroutine
无关,可以由不同的Goroutine
加锁和解锁。也可以为其他结构体的字段,零值为解锁状态。
3.1 结构介绍
1 2 3 4
| type Mutex struct { state int32 sema uint32 }
|
3.2 方法列表
方法名 |
描述 |
(m *Mutex) Lock() |
方法锁住m ,如果 m 已经加锁,则阻塞直到 m 解锁。 |
(m *Mutex) Unlock() |
解锁 m ,如果 m 未加锁会导致运行时错误。 |
3.3 使用(售票)
需求:模拟多个窗口售票
1.不作为结构体属性使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| package main import ( "fmt" "sync" "time" )
var wg sync.WaitGroup
var mutex sync.Mutex
var ticket int = 10 func main() { wg.Add(3) go saleTicket("窗口A",&wg) go saleTicket("窗口B",&wg) go saleTicket("窗口C",&wg) wg.Wait() fmt.Println("运行结束!") }
func saleTicket(windowName string, wg *sync.WaitGroup) { defer wg.Done() for { mutex.Lock() if ticket > 0{ time.Sleep(10 * time.Millisecond) ticket-- fmt.Printf("%s 卖出一张票,余票: %d \n",windowName,ticket) } else { fmt.Printf("%s 票已卖完! \n",windowName) mutex.Unlock() break } mutex.Unlock() } }
|
2.作为结构体属性使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package main import ( "fmt" "strconv" "sync" "time" )
type ticketPool struct { over int lock sync.Mutex wg sync.WaitGroup }
func (t *ticketPool) sellTicket(windowName string) { defer t.wg.Done() for { t.lock.Lock() if t.over > 0 { time.Sleep(10 * time.Millisecond) t.over-- fmt.Printf("%s 卖出一张票,余票: %d \n", windowName, t.over) } else { t.lock.Unlock() fmt.Printf("%s 票已卖完! \n", windowName) break } t.lock.Unlock() } } func main() { ticketP := ticketPool{over: 10} fmt.Printf("T:%T v: %v \n", ticketP, ticketP) windowNum := 3 ticketP.wg.Add(windowNum) for i:= 1 ; i <= windowNum; i++ { go ticketP.sellTicket("窗口" + strconv.Itoa(i)) } ticketP.wg.Wait() fmt.Println("运行结束!") }
|
4.读写锁(RWMutex)
4.1 结构介绍
RWMutex
是读写互斥锁,简称读写锁。该锁可以同时被多个读取者持有或被唯一个写入者持有。RWMutex
类型锁跟Goroutine
无关,可以由不同的Goroutine
加锁、解锁。RWMutex
也可以创建为其他结构体的字段;零值为解锁状态。
1. RWMutex
锁结构
1 2 3 4 5 6 7
| type RWMutex struct { w Mutex writerSem uint32 readerSem uint32 readerCount int32 readerWait int32 }
|
2. 读写锁堵塞场景
- 写锁需要阻塞写锁:一个协程拥有写锁时,其他协程写锁需要阻塞
- 写锁需要阻塞读锁:一个协程拥有写锁时,其他协程读锁需要阻塞
- 读锁需要阻塞写锁:一个协程拥有读锁时,其他协程写锁需要阻塞
- 读锁不能阻塞读锁:一个协程拥有读锁时,其他协程也可以拥有读锁
4.2 方法列表
方法名 |
描述 |
(rw *RWMutex) RLock() |
获取读锁,当一个协程拥有读锁时,其他协程写锁需要阻塞。 |
(rw *RWMutex) RUnlock() |
释放读锁。 |
(rw *RWMutex) Lock() |
获取写锁,与Mutex完全一致;当一个协程拥有写锁时,其他协程读写锁都需要阻塞 |
(rw *RWMutex) Unlock() |
释放写锁 |
4.3 使用(读写文件)
1.不作为结构体属性使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package main import ( "fmt" "strconv" "sync" )
var fileContext string
var rxMutex sync.RWMutex
var wg sync.WaitGroup func main() { wg.Add(5) for i := 1; i <= 5; i++ { name := "同学-" + strconv.Itoa(i) if i%2 == 0 { go readFile(name) } else { go writeFile(name, strconv.Itoa(i)) } } wg.Wait() fmt.Println("运行结束!") }
func readFile(name string) { defer rxMutex.RUnlock() rxMutex.RLock() fmt.Printf("%s 获取读锁,读取内容为: %s \n", name, fileContext) wg.Done() }
func writeFile(name, s string) { defer rxMutex.Unlock() rxMutex.Lock() fileContext = fileContext + " " + s fmt.Printf("%s 获取写锁,写入内容: %s。 文件内容变成: %s \n", name, s, fileContext) wg.Done() }
|
2.作为结构体属性使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package main import ( "fmt" "strconv" "sync" "time" )
type fileResource struct { content string wg sync.WaitGroup rwLock sync.RWMutex }
func (f *fileResource)readFile(name string) { defer f.rwLock.RUnlock() f.rwLock.RLock() time.Sleep(time.Second) fmt.Printf("%s 获取读锁,读取内容为: %s \n", name, f.content) f.wg.Done() }
func (f *fileResource)writeFile(name, s string) { defer f.rwLock.Unlock() f.rwLock.Lock() time.Sleep(time.Second) f.content = f.content + " " + s fmt.Printf("%s 获取写锁,写入内容: %s。 文件内容变成: %s \n", name, s, f.content) f.wg.Done() } func main() { var file fileResource file.wg.Add(5) for i := 1; i <= 5; i++ { name := "同学-" + strconv.Itoa(i) if i%2 == 0 { go file.readFile(name) } else { go file.writeFile(name, strconv.Itoa(i)) } } file.wg.Wait() fmt.Println("运行结束!") }
|
5.条件变量(Cond)
5.1 介绍
与互斥锁不同,条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程。条件变量总是与互斥锁组合使用,互斥锁为共享数据的访问提供互斥支持,而条件变量可以就共享数据的状态的变化向相关线程发出通知。
使用场景: 我需要完成一项任务,但是这项任务需要满足一定条件才可以执行,否则我就等着。
sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。
我们想象一个非常简单的场景:
有一个协程在异步地接收数据,剩下的多个协程必须等待这个协程接收完数据,才能读取到正确的数据。在这种情况下,如果单纯使用 chan 或互斥锁,那么只能有一个协程可以等待,并读取到数据,没办法通知其他的协程也读取数据。
这个时候,就需要有个全局的变量来标志第一个协程数据是否接受完毕,剩下的协程,反复检查该变量的值,直到满足要求。或者创建多个 channel,每个协程阻塞在一个 channel 上,由接收数据的协程在数据接收完毕后,逐个通知。总之,需要额外的复杂度来完成这件事。
Go 语言在标准库 sync 中内置一个 sync.Cond 用来解决这类问题。
5.2 方法列表
方法名 |
描述 |
NewCond(l Locker) *Cond |
生成一个cond ,需要传入实现Locker 接口的变量。一般是*Mutex 或*RWMutex 类型的值。 |
(c *Cond) Wait() |
等待通知 |
(c *Cond) Signal() |
发送单个通知 |
(c *Cond) Broadcast() |
广播(多个通知) |
5.3 使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package main import ( "fmt" "sync" "time" ) func main() { var mutex sync.Mutex cond := sync.NewCond(&mutex) for i := 1; i <= 10; i++ { go func(i int) { cond.L.Lock() defer cond.L.Unlock() cond.Wait() fmt.Printf("输出:%d ! \n", i) }(i) } time.Sleep(time.Second) fmt.Println("单个通知A!") cond.Signal() time.Sleep(time.Second) fmt.Println("单个通知B!") cond.Signal()
time.Sleep(time.Second) fmt.Println("广播通知!并睡眠1秒,等待其他协程输出!") cond.Broadcast() time.Sleep(time.Second) fmt.Println("运行结束!") }
|
6.一次(Once)
sync.Once
是使Go方法只执行一次的对象实现,作用与 init
函数类似,但也有所不同。区别如下:
- init 函数是在文件包首次被加载的时候执行,且只执行一次
- sync.Onc 是在代码运行中需要的时候执行,且只执行一次
6.1 方法介绍
方法名 |
描述 |
(o *Once) Do(f func()) |
函数只会执行一次,并保证在返回时,传入Do 的函数已经执行完成。多个 goroutine 同时执行 once.Do 的时候,可以保证抢占到 once.Do 执行权的 goroutine 执行完 once.Do 后,其他goroutine`才能得到返回 。 |
6.2 使用示例
示例1: 重复调用只执行一次
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package main import ( "fmt" "strconv" "sync" "time" ) func main() { echo := func() { t := time.Now().Unix() fmt.Printf("输出时间 %v ",strconv.FormatInt(t,10)) } var one sync.Once for i := 1; i< 10 ; i++ { go func(a,b int) { one.Do(echo) }(i,i+1) } time.Sleep(3 * time.Second) fmt.Println("运行结束!") }
|
7.对象池(Pool
)
7.1 为什么使用?
Go
语言是支持垃圾自动回收的。对于一些暂时用不到但是后续会用到的对象,为了提升性能,可以先暂存起来,这虽然会占用一些内存,但是比起销毁了再新建,要节省运行时间。Go
语言专门提供了暂存对象的工具,就是sync.Pool
。
sync.Pool
是一个对象池,它是并发安全的,而且大小是可伸缩的,仅受限于内存。当需要使用对象的时候可以从对象池中直接取出使用。
7.2 数据结构
1 2 3 4 5 6 7 8 9 10 11
| type Pool struct { noCopy noCopy local unsafe.Pointer localSize uintptr
victim unsafe.Pointer victimSize uintptr
New func() interface{} }
|
7.3 方法列表
sync.Pool
提供以下两个公共方法,用来操作对象池。
方法名 |
描述 |
(p *Pool) Put(x interface{}) |
向池中添加对象 |
(p *Pool) Get() interface{} |
从池中获取对象 |
Get
方法是从池中获取对象,如果没有对象则调用New
方法创建生成,如果未设置New
则返回nil
。
7.4 使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package main import ( "fmt" "sync" ) func main() { pool := sync.Pool{ New: func() interface{}{ return make([]string,5) }, } fmt.Printf("不设置直接获取: %v\n",pool.Get()) pool.Put([]string{"Hello","Word"}) fmt.Printf("设置后,第一次获取: %v\n",pool.Get()) fmt.Printf("设置后,第二次获取: %v\n",pool.Get()) }
|
7.5 注意事项
存入sync.Pool
的对象可能会在不通知的情况下被释放,这一点一定要注意。比如一些socket
长连接就不适合存入sync.Pool
内。
8.sync.Map
如果要缓存的数据量不大,可以考虑使用sync.Map
(Go 1.9+版本支持
)。在1.6
版本以前,Go
语言自带标准的map
类型是并发读安全的,但是并发写不安全。
8.1 查询和新增
a.查找方法:
-
Load
: 通过参数key
查询对应的value
,如果不存在则返回nil
;ok
表示是否找到对应的值。
b.新增方法:
-
Store
: 对sync.Map
的更新或新增,参数是键值对
-
LoadOrStore
: 参数为key
和value
。根据参数key
查找对应的value
,如果找到,则不修改原来的值并通过actual
返回,并且loaded
为true
;如果未找到,则存储key-value
并且将存储的value
通过actual
返回,loaded
为false
。
c.使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package main import ( "fmt" "sync" ) func main() { var syncMap sync.Map syncMap.Store("name","张三") load, _ := syncMap.Load("name") fmt.Printf("Store新增->name:%v\n",load) store, loaded := syncMap.LoadOrStore("name", "李四") fmt.Printf("找到则返回旧值-> name:%v loaded:%v \n",store,loaded) age, loaded := syncMap.LoadOrStore("age", 20) fmt.Printf("找不到则新增-> age:%v loaded:%v \n",age,loaded) }
|
8.2 删除
a.方法列表:
-
LoadAndDelete
: 根据参数key
删除对应的value
,如果找到则删除,并通过value
返回删除的值,并设置loaded
为true
;如果未找到,则value
返回nil
,loaded
为false
。
-
Delete
:根据参数key
删除对应的value
。
b.使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package main import ( "fmt" "sync" ) func main() { var syncMap sync.Map syncMap.Store("name","张三") syncMap.Store("age",20)
andDelete, loaded := syncMap.LoadAndDelete("name") fmt.Printf("找到-> val:%v loaded:%v \n",andDelete,loaded) search, ok := syncMap.Load("name") fmt.Printf("删除name后查找-> search:%v ok:%v \n",search,ok)
andDelete2, loaded := syncMap.LoadAndDelete("name2") fmt.Printf("找不到-> val:%v loaded:%v \n",andDelete2,loaded)
syncMap.Delete("age") searchAge, ok := syncMap.Load("name") fmt.Printf("删除age后查找-> searchAge:%v ok:%v \n",searchAge,ok) }
|
8.3 遍历
sync.Map
不能通过for...range
遍历,只能通过包提供的方法Range
进行遍历。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package main import ( "fmt" "sync" ) func main() { var syncMap sync.Map syncMap.Store("name", "张三") syncMap.Store("age", 20) syncMap.Store("home", "天津永和大区") syncMap.Range(func(key, value interface{}) bool { fmt.Printf("key: %v value: %v \n", key, value) return true }) }
|