Go语言-协程

摘要

本文内容来源于网络,个人收集整理,请勿传播

目前几种主流的并发模型:

  • 多线程,每个线程一次处理一个请求,在当前请求处理完成之前不会接收其它请求;但在高并发环境下,多线程的开销比较大;
  • 基于回调的异步IO,如Nginx服务器使用的epoll模型,这种模式通过事件驱动的方式使用异步IO,使服务器持续运转,但人的思维模式是串行的,大量回调函数会把流程分割,对于问题本身的反应不够自然;
  • 协程,不需要抢占式调度,可以有效提高线程的任务并发性,而避免多线程的缺点;但原生支持协程的语言还很少。

协程coroutine是Go语言中的轻量级线程实现,由Go运行时runtime管理。

在一个函数调用前加上go关键字,这次调用就会在一个新的goroutine中并发执行。当被调用的函数返回时,这个goroutine也自动结束。需要注意的是,如果这个函数有返回值,那么这个返回值会被丢弃。

协程是 golang 并发的最小单元,类似于其他语言的线程,只不过线程的实现借助了操作系统的实现,每次线程的调度都是一次系统调用,需要从用户态切换到内核态,这是一项非常耗时的操作,因此一般的程序里面线程太多会导致大量的性能耗费在线程切换上。而在 golang 内部实现了这种调度,协程在这种调度下面的切换非常的轻量级,成百上千的协程跑在一个 golang 程序里面是很正常的事情

goroutine

goroutine是一个轻量级的执行线程。假设有一个函数调用f(s),要在goroutine中调用此函数,请使用go f(s)。 这个新的goroutine将与调用同时执行。

https://www.zhihu.com/question/20862617#answer-6525538

https://studygolang.com/articles/1633

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import "fmt"

func f(from string) {
for i := 0; i < 3; i++ {
fmt.Println(from, ":", i)
}
}

func main() {
f("direct")

go f("goroutine")

go func(msg string) {
fmt.Println(msg)
}("going")

var input string
fmt.Scanln(&input)
fmt.Println("done")
}

channel

goroutine之间通过channel来通讯,可以认为channel是一个管道或者先进先出的队列。你可以从一个goroutine中向channel发送数据,在另一个goroutine中取出这个值。

1
2
3
4
5
6
7
8
9
var channel chan int = make(chan int)
// 或
channel := make(chan int)

ic := make(chan int, 10) // 申明一个通道
ic <- 10 // 往通道里面放
i := <- ic // 从通道里面取

close(ic) // 关闭通道

生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
)

func main(){
buf:=make(chan int)
flg := make(chan int)
go producer(buf)
go consumer(buf, flg)
<-flg //等待接受完成
}

func producer(c chan int){
defer close(c) // 关闭channel
for i := 0; i < 10; i++{
c <- i // 阻塞,直到数据被消费者取走后,才能发送下一条数据
}
}

func consumer(c, f chan int){
for{
if v, ok := <-c; ok{
fmt.Print(v) // 阻塞,直到生产者放入数据后继续读取数据
}else{
break
}
}
f<-1 //发送数据,通知main函数已接受完成
}

单向通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func producer(c chan<-int){
defer close(c) // 关闭channel
for i := 0; i < 10; i++{
c <- i // 阻塞,直到数据被消费者取走后,才能发送下一条数据
}
}

func consumer(c <-chan int, f chan<-int){
for{
if v, ok := <-c; ok{
fmt.Print(v) // 阻塞,直到生产者放入数据后继续读取数据
}else{
break
}
}
f<-1 //发送数据,通知main函数已接受完成
}

生产消费缓冲

向带缓冲的channel发送数据时,只有缓冲区满时,发送操作才会被阻塞。当缓冲区空时,接收才会阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import (

"fmt"
"time"
)
func produce(p chan<- int) {
for i := 0; i < 10; i++ {
p <- i
fmt.Println("send:", i)
}
}
func consumer(c <-chan int) {
for i := 0; i < 10; i++ {
v := <-c
fmt.Println("receive:", v)
}
}
func main() {
// 缓冲区可以存储10个int类型的整数
//在执行生产者线程的时候,线程就不会阻塞,一次性将10个整数存入channel
ch := make(chan int, 10)
go produce(ch)
go consumer(ch)
time.Sleep(1 * time.Second)
}

channle超时机制

当一个channelread/write阻塞时,会被一直阻塞下去,直到channel关闭。产生一个异常退出程序。channel内部没有超时的定时器。但我们可以用select来实现channel的超时机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"time"
"fmt"
)

func main(){
c := make(chan int)
select{
case <- c:
fmt.Println("没有数据")
case <-time.After(5* time.Second):
fmt.Println("超时退出")
}
}

多个消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"SingleRisk/collection"
"fmt"
"time"
)

func main() {
t := collection.GetNodeList()
fmt.Println(t)
// 创建缓冲通道
ch := make(chan int, 10)
consNum := 2
flg := make(chan bool, consNum)
for i := 1; i <= consNum; i++ {
go consumer(ch, flg)
}
go produce(ch)
for i := 1; i <= consNum; i++ {
<-flg
}
}

// 生产者
func produce(p chan<- int) {
for i := 0; i < 15; i++ {
p <- i
fmt.Println("send:", i)
}
close(p)
}

//消费者
func consumer(c <-chan int, f chan bool) {
for {
v, ok := <-c
if ok {
fmt.Println("reveive", v)
// 模拟数据处理
time.Sleep(time.Second)
} else {
fmt.Println("数据消费完成")
break
}
}
f <- true
}

结合sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
type Product struct {
name int
value int
}

func producer(wg *sync.WaitGroup, products chan<- Product, name int, stop *bool) {
for !*stop {
product := Product{name: name, value: rand.Int()}
products <- product
fmt.Printf("producer %v produce a product: %#v\n", name, product)
time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond)
}
wg.Done()
}

func consumer(wg *sync.WaitGroup, products <-chan Product, name int) {
for product := range products {
fmt.Printf("consumer %v consume a product: %#v\n", name, product)
time.Sleep(time.Duration(200+rand.Intn(1000)) * time.Millisecond)
}
wg.Done()
}


func main() {
var wgp sync.WaitGroup
var wgc sync.WaitGroup
stop := false
products := make(chan Product, 10)

// 创建 5 个生产者和 5 个消费者
for i := 0; i < 5; i++ {
go producer(&wgp, products, i, &stop)
go consumer(&wgc, products, i)
wgp.Add(1)
wgc.Add(1)
}

time.Sleep(time.Duration(1) * time.Second)
stop = true // 设置生产者终止信号
wgp.Wait() // 等待生产者退出
close(products) // 关闭通道
wgc.Wait() // 等待消费者退出
}

sync.WaitGroup

1
2
3
4
var wg sync.WaitGroup  // 申明一个信号量
wg.Add(1) // 信号量加一
wg.Done() // 信号量减一
wg.Wait() // 信号量为正时阻塞,直到信号量为0时被唤醒

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"runtime"
"fmt"
"sync"
"database/sql"
_ "github.com/go-sql-driver/mysql"
"time"
)

//定义任务队列
var waitgroup sync.WaitGroup

func xtgxiso(num int) {
//fmt.Println(num)
db, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/test?charset=utf8")
if err != nil {
fmt.Println(err)
}
defer db.Close()
rows, err := db.Query("select sleep(1) as a")
if err != nil {
fmt.Println(err)
}
defer rows.Close()
var a string
for rows.Next() {
err = rows.Scan(&a)
if err != nil {
fmt.Println(err)
} else {
//fmt.Println(a)
}
}
waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)
}

func main() {
//记录开始时间
start := time.Now()
//设置最大的可同时使用的CPU核数和实际cpu核数一致
runtime.GOMAXPROCS(1)
for i := 1; i <= 10; i++ {
waitgroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
go xtgxiso(i)
}
waitgroup.Wait() //Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
//记录结束时间
end := time.Now()
//输出执行时间,单位为秒。
fmt.Println(end.Sub(start).Seconds())
}

select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import "time"
import "fmt"

func main() {

c1 := make(chan string)
c2 := make(chan string)

go func() {
time.Sleep(time.Second * 1)
c1 <- "one"
}()
go func() {
time.Sleep(time.Second * 2)
c2 <- "two"
}()

for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"time"
)

func main() {
timeout := make(chan bool, 1)
go func() {
fmt.Println("------------ 子任务1--------------")
t1 := time.Now().UnixNano()
fmt.Println(t1)
fmt.Println("这个一定会执行")

time.Sleep(3 * time.Second)
// timeout <- true
timeout <- true
}()

fmt.Println("首先逻辑还是响应 main 函数")

go func() {
fmt.Println("------------ 子任务2--------------")
t2 := time.Now().UnixNano()
fmt.Println(t2)
fmt.Println("相当于fork一个子任务在进行")
}()

ch := make(chan int)
select {
case <-ch:
case <-timeout:
fmt.Println("------------ 回到main函数 --------------")
fmt.Println("task is timeout!")
}

fmt.Println("main 函数本身的输出")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
)

func main(){
c := make(chan int)
quit := make(chan int)
go func(){
for i := 0; i < 10; i++{
fmt.Printf("%d ", <-c)
}
quit <- 1
}()
testMuti(c, quit)
}

func testMuti(c, quit chan int){
x, y := 0, 1
for {
select{
case c<-x:
x, y = y, x+y
case <-quit:
fmt.Print("\nquit")
return
}
}
}

runtime

runtime包中有几个处理goroutine的函数

runtime.GOMAXPROCS(1)

  • 计算密集型使用多核心要快很多
  • io密集型使用多核心很有可能比单核心要慢

下面是计算密集型使用多核心的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"runtime"
"fmt"
"sync"
"time"
)

//定义任务队列
var waitgroup sync.WaitGroup

func xtgxiso(num int) {
for i:=1;i<=1000000000;i++{
num = num+i
num = num-i
num = num*i
num = num/i
}
waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)
}

func main() {
//记录开始时间
start := time.Now()
//设置最大的可同时使用的CPU核数和实际cpu核数一致
runtime.GOMAXPROCS(1)
for i := 1; i <= 10; i++ {
waitgroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
go xtgxiso(i)
}
waitgroup.Wait() //Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞 //记录结束时间
end := time.Now()
//输出执行时间,单位为秒。
fmt.Println(end.Sub(start).Seconds())
}

runtime.NumCPU()

返回了cpu核数

runtime.NumGoroutine()

返回当前进程的goroutine线程数。即便我们没有开启新的goroutine

1
2
3
4
5
6
7
8
9
10
package main

import (
"runtime"
"fmt"
)
func main(){
fmt.Println(runtime.NumCPU())
fmt.Println(runtime.NumGoroutine())
}

runtime.Gosched

Gosched()让当前正在执行的goroutine放弃CPU执行权限。调度器安排其他正在等待的线程运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"runtime"
"fmt"
)

func main(){
go sayHello()
go sayWorld()
var str string
fmt.Scan(&str)
}

func sayHello(){
for i := 0; i < 10; i++{
fmt.Print("hello ")
runtime.Gosched()
}
}

func sayWorld(){
for i := 0; i < 10; i++ {
fmt.Println("world")
runtime.Gosched()
}
}

runtime.Goexit

runtime.Goexit()函数用于终止当前的goroutine,单defer函数将会继续被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"runtime"
"fmt"
)

func test(){
defer func(){
fmt.Println(" in defer")
}()
for i := 0; i < 10; i++{
fmt.Print(i)
if i > 5{
runtime.Goexit()
}
}
}

func main(){
go test()
// 等待输入,在这里用来阻止主线程关闭
var str string
fmt.Scan(&str)
}