Go基础-并发安全

摘要

本文内容转自网络,个人学习记录使用,请勿传播

在go语言中 map 是很重要的数据结构。Map 是一种无序的键值对的集合。Map 最重要的一点是通过 key 来快速检索数据,key 类似于索引,指向数据的值。问题来了,这么安逸的 数据结构,它不是协程安全的 !当多个 协程同时对一个map 进行 读写时,会抛出致命错误。总结一下 想要 做到 协程安全 map 一共有以下三种方法。

不同的协程共享数据的方式除了通道之外还有就是共享变量。虽然 Go 语言官方推荐使用通道的方式来共享数据,但是通过变量来共享才是基础,因为通道在底层也是通过共享变量的方式来实现的。通道的内部数据结构包含一个数组,对通道的读写就是对内部数组的读写。

在并发环境下共享读写变量必须要使用锁来控制数据结构的安全,Go 语言内置了 sync 包,里面包含了我们平时需要经常使用的互斥锁对象 sync.Mutex。Go 语言内置的字典不是线程安全的,所以下面我们尝试使用互斥锁对象来保护字典,让它变成线程安全的字典。

线程不安全的字典

Go 语言内置了数据结构「竞态检查」工具来帮我们检查程序中是否存在线程不安全的代码。当我们在运行代码时,打开 -run 开关,程序就会在内置的通用数据结构中进行埋点检查。竞态检查工具在 Go 1.1 版本中引入,该功能帮助 Go 语言「元团队」找出了 Go 语言标准库中几十个存在线程安全隐患的 bug,这是一个非常了不起的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import "fmt"

func write(d map[string]int) {
d["fruit"] = 2
}

func read(d map[string]int) {
fmt.Println(d["fruit"])
}

func main() {
d := map[string]int{}
go read(d)
write(d)
}

上面的代码明显存在安全隐患,运行下面的竞态检查指令观察输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
$ go run -race main.go
==================
WARNING: DATA RACE
Read at 0x00c420090180 by goroutine 6:
runtime.mapaccess1_faststr()
/usr/local/Cellar/go/1.10.3/libexec/src/runtime/hashmap_fast.go:172 +0x0
main.read()
~/go/src/github.com/pyloque/practice/main.go:10 +0x5d

Previous write at 0x00c420090180 by main goroutine:
runtime.mapassign_faststr()
/usr/local/Cellar/go/1.10.3/libexec/src/runtime/hashmap_fast.go:694 +0x0
main.main()
~/go/src/github.com/pyloque/practice/main.go:6 +0x88

Goroutine 6 (running) created at:
main.main()
~/go/src/github.com/pyloque/practice/main.go:15 +0x59
==================
==================
WARNING: DATA RACE
Read at 0x00c4200927d8 by goroutine 6:
main.read()
~/go/src/github.com/pyloque/practice/main.go:10 +0x70

Previous write at 0x00c4200927d8 by main goroutine:
main.main()
~/go/src/github.com/pyloque/practice/main.go:6 +0x9b

Goroutine 6 (running) created at:
main.main()
~/go/src/github.com/pyloque/practice/main.go:15 +0x59
==================
2
Found 2 data race(s)

竞态检查工具是基于运行时代码检查,而不是通过代码静态分析来完成的。这意味着那些没有机会运行到的代码逻辑中如果存在安全隐患,它是检查不出来的。

线程安全的字典

让字典变的线程安全,就需要对字典的所有读写操作都使用互斥锁保护起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import "fmt"
import "sync"

type SafeDict struct {
data map[string]int
mutex *sync.Mutex
}

func NewSafeDict(data map[string]int) *SafeDict {
return &SafeDict{
data: data,
mutex: &sync.Mutex{},
}
}

func (d *SafeDict) Len() int {
d.mutex.Lock()
defer d.mutex.Unlock()
return len(d.data)
}

func (d *SafeDict) Put(key string, value int) (int, bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
old_value, ok := d.data[key]
d.data[key] = value
return old_value, ok
}

func (d *SafeDict) Get(key string) (int, bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
old_value, ok := d.data[key]
return old_value, ok
}

func (d *SafeDict) Delete(key string) (int, bool) {
d.mutex.Lock()
defer d.mutex.Unlock()
old_value, ok := d.data[key]
if ok {
delete(d.data, key)
}
return old_value, ok
}

func write(d *SafeDict) {
d.Put("banana", 5)
}

func read(d *SafeDict) {
fmt.Println(d.Get("banana"))
}

func main() {
d := NewSafeDict(map[string]int{
"apple": 2,
"pear": 3,
})
go read(d)
write(d)
}

尝试使用竞态检查工具运行上面的代码,会发现没有了刚才一连串的警告输出,说明 Get 和 Put 方法已经做到了协程安全,但是还不能说明 Delete() 方法是否安全,因为它根本没有机会得到运行。

在上面的代码中我们再次看到了 defer 语句的应用场景 —— 释放锁。defer 语句总是要推迟到函数尾部运行,所以如果函数逻辑运行时间比较长,这会导致锁持有的时间较长,这时使用 defer 语句来释放锁未必是一个好注意。

避免锁复制

上面的代码中还有一个需要特别注意的地方是 sync.Mutex 是一个结构体对象,这个对象在使用的过程中要避免被复制 —— 浅拷贝。复制将会导致锁被「分裂」了,也就起不到保护的作用。所以在平时的使用中要尽量使用它的指针类型。读者可以尝试将上面的类型换成非指针类型,然后运行一下竞态检查工具,会看到警告信息再次布满整个屏幕。锁复制存在于结构体变量的赋值、函数参数传递、方法参数传递中,都需要注意。

使用匿名锁字段

在结构体章节,我们知道外部结构体可以自动继承匿名内部结构体的所有方法。如果将上面的 SafeDict 结构体进行改造,将锁字段匿名,就可以稍微简化一下代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import "fmt"
import "sync"

type SafeDict struct {
data map[string]int
*sync.Mutex
}

func NewSafeDict(data map[string]int) *SafeDict {
return &SafeDict{data, &sync.Mutex{}}
}

func (d *SafeDict) Len() int {
d.Lock()
defer d.Unlock()
return len(d.data)
}

func (d *SafeDict) Put(key string, value int) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
d.data[key] = value
return old_value, ok
}

func (d *SafeDict) Get(key string) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
return old_value, ok
}

func (d *SafeDict) Delete(key string) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
if ok {
delete(d.data, key)
}
return old_value, ok
}

func write(d *SafeDict) {
d.Put("banana", 5)
}

func read(d *SafeDict) {
fmt.Println(d.Get("banana"))
}

func main() {
d := NewSafeDict(map[string]int{
"apple": 2,
"pear": 3,
})
go read(d)
write(d)
}

使用读写锁

日常应用中,大多数并发数据结构都是读多写少的,对于读多写少的场合,可以将互斥锁换成读写锁,可以有效提升性能。sync 包也提供了读写锁对象 RWMutex,不同于互斥锁只有两个常用方法 Lock() 和 Unlock(),读写锁提供了四个常用方法,分别是写加锁 Lock()、写释放锁 Unlock()、读加锁 RLock() 和读释放锁 RUnlock()。写锁是拍他锁,加写锁时会阻塞其它协程再加读锁和写锁,读锁是共享锁,加读锁还可以允许其它协程再加读锁,但是会阻塞加写锁。

读写锁在写并发高的情况下性能退化为普通的互斥锁

下面我们将上面代码中 SafeDict 的互斥锁改造成读写锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import "fmt"
import "sync"

type SafeDict struct {
data map[string]int
*sync.RWMutex
}

func NewSafeDict(data map[string]int) *SafeDict {
return &SafeDict{data, &sync.RWMutex{}}
}

func (d *SafeDict) Len() int {
d.RLock()
defer d.RUnlock()
return len(d.data)
}

func (d *SafeDict) Put(key string, value int) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
d.data[key] = value
return old_value, ok
}

func (d *SafeDict) Get(key string) (int, bool) {
d.RLock()
defer d.RUnlock()
old_value, ok := d.data[key]
return old_value, ok
}

func (d *SafeDict) Delete(key string) (int, bool) {
d.Lock()
defer d.Unlock()
old_value, ok := d.data[key]
if ok {
delete(d.data, key)
}
return old_value, ok
}

func write(d *SafeDict) {
d.Put("banana", 5)
}

func read(d *SafeDict) {
fmt.Println(d.Get("banana"))
}

func main() {
d := NewSafeDict(map[string]int{
"apple": 2,
"pear": 3,
})
go read(d)
write(d)
}

线程安全map

map + 锁

这是最常见的一种操作,当要对 map操作的时候就加锁,其他的 协程就等待。下面是代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package util

import "sync"

type SafeMap struct {
Data map[string]interface{}
Lock sync.RWMutex
}

func (this *SafeMap) Get(k string) interface{} {
this.Lock.RLock()
defer this.Lock.RUnlock()
if v, exit := this.Data[k]; exit {
return v
}
return nil
}

func (this *SafeMap) Set(k string, v interface{}) {
this.Lock.Lock()
defer this.Lock.Unlock()
if this.Data == nil {
this.Data = make(map[string]interface{})
}
this.Data[k] = v
}

sync.map

这个是go推出来的 协程安全 map

这里要注意一下 sync.map 不需要 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
var test sync.Map

//设置元素
func set (k,v interface{}){
test.Store(k,v)
}

//获得元素
func get (k interface{}) interface{}{
tem ,exit := test.Load(k)
if exit {
return tem
}
return nil
}

//传入一个 函数 ,sync.map 会内部迭代 ,运行这个函数
func ranggfunc (funcs func(key, value interface{}) bool) {
test.Range(funcs)
}

//删除元素
func del(key interface{}){
test.Delete(key)
}

单协程操作 map ,用 channle 通信

这个思路有点 骚,就是一直由一个协程 操作map ,其他协程 通过 channle 告诉这个协程应该 怎么操作。其实这样子 性能不是很好,因为 channle 底层 也是锁 ,而且 map 存数据 是要 计算hash的 ,之前是 多个协程自己算自己的hash ,现在变成了一个协程计算了。但是这个思路还是可以,不仅仅是 在 map上可以这么操作。socket 通信啊, 全局 唯一对象的调用啊,都可以用此思路。下面给大家看一下我是实现的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main

import (
"fmt"
//"time"
)

var (
ADD interface{} = 1
DEL interface{} = 2
GET interface{} = 3
)


type safeMap struct {
Msq chan *[3] interface{} //['type','id','value',channle]
data map[interface{}]interface{}
chanl chan interface{}
}

func NewSafeMap() *safeMap {
tem := &safeMap{}
tem.init()
return tem
}

func (this *safeMap) init() {
this.Msq = make(chan *[3]interface{},10)
this.data = make(map[interface{}]interface{})
this.chanl = make(chan interface{},0)
go this.run()
}

func (this *safeMap) run() {
for {
select {
case msg := <- this.Msq :
switch msg[0] {
case ADD :
this.dataAdd(msg[1],msg[2])
case DEL :
this.dataDel(msg[1])
case GET :
this.dataGet(msg[1])
}
}
}
}

func (this *safeMap) msqChan (typ,id,val interface{}) *[3]interface{}{
return &[...]interface{}{typ,id,val}
}

//保存 或者更新元素
func (this *safeMap) dataAdd (id , value interface{}) {
this.data[id] = value
}

//删除元素
func (this *safeMap) dataDel (id interface{}) {
delete(this.data,id)
}

//获得元素
func (this *safeMap) dataGet (id interface{}) {
if val ,exit := this.data[id] ;exit {
this.chanl <- val
return
}
this.chanl <- nil
}

//----------------------------------------------------对外接口--------------------------------
func (this *safeMap) Add (id ,value interface{}) {
this.Msq <- this.msqChan(ADD,id,value)
}

func (this *safeMap) Del (id interface{}) {
this.Msq <- this.msqChan(DEL,id ,nil)
}

func (this *safeMap) Get (id interface{}) interface{} {
this.Msq <- this.msqChan(GET,id,nil)
res := <- this.chanl
return res
}

//获得 长度
func (this *safeMap) GetLength() uint32{
return uint32(len(this.data))
}


func main() {
sa := NewSafeMap()

// sa.Add(1,1)
sa.Add(2,3)
fmt.Println(2,sa.Get(2))
sa.Del(2)
fmt.Println(2,sa.Get(2))
}