Go协程

摘要

https://blog.csdn.net/u011304970/article/details/75096323?locationNum=3&fps=1

https://www.cnblogs.com/chenny7/p/4498322.html

目前几种主流的并发模型:

  • 多线程,每个线程一次处理一个请求,在当前请求处理完成之前不会接收其它请求;但在高并发环境下,多线程的开销比较大;
  • 基于回调的异步IO,如Nginx服务器使用的epoll模型,这种模式通过事件驱动的方式使用异步IO,使服务器持续运转,但人的思维模式是串行的,大量回调函数会把流程分割,对于问题本身的反应不够自然;
  • 协程,不需要抢占式调度,可以有效提高线程的任务并发性,而避免多线程的缺点;但原生支持协程的语言还很少。

协程coroutine是Go语言中的轻量级线程实现,由Go运行时runtime管理。

在一个函数调用前加上go关键字,这次调用就会在一个新的goroutine中并发执行。当被调用的函数返回时,这个goroutine也自动结束。需要注意的是,如果这个函数有返回值,那么这个返回值会被丢弃。

goroutine

goroutine是一个轻量级的执行线程。假设有一个函数调用f(s),要在goroutine中调用此函数,请使用go f(s)。 这个新的goroutine将与调用同时执行。

https://www.zhihu.com/question/20862617#answer-6525538

https://studygolang.com/articles/1633

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import "fmt"

func f(from string) {
for i := 0; i < 3; i++ {
fmt.Println(from, ":", i)
}
}

func main() {
f("direct")

go f("goroutine")

go func(msg string) {
fmt.Println(msg)
}("going")

var input string
fmt.Scanln(&input)
fmt.Println("done")
}

channel

goroutine之间通过channel来通讯,可以认为channel是一个管道或者先进先出的队列。你可以从一个goroutine中向channel发送数据,在另一个goroutine中取出这个值。

1
2
3
var channel chan int = make(chan int)
// 或
channel := make(chan int)

生产者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
)

func main(){
buf:=make(chan int)
flg := make(chan int)
go producer(buf)
go consumer(buf, flg)
<-flg //等待接受完成
}

func producer(c chan int){
defer close(c) // 关闭channel
for i := 0; i < 10; i++{
c <- i // 阻塞,直到数据被消费者取走后,才能发送下一条数据
}
}

func consumer(c, f chan int){
for{
if v, ok := <-c; ok{
fmt.Print(v) // 阻塞,直到生产者放入数据后继续读取数据
}else{
break
}
}
f<-1 //发送数据,通知main函数已接受完成
}

单向通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func producer(c chan<-int){
defer close(c) // 关闭channel
for i := 0; i < 10; i++{
c <- i // 阻塞,直到数据被消费者取走后,才能发送下一条数据
}
}

func consumer(c <-chan int, f chan<-int){
for{
if v, ok := <-c; ok{
fmt.Print(v) // 阻塞,直到生产者放入数据后继续读取数据
}else{
break
}
}
f<-1 //发送数据,通知main函数已接受完成
}

生产消费缓冲

向带缓冲的channel发送数据时,只有缓冲区满时,发送操作才会被阻塞。当缓冲区空时,接收才会阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import (

"fmt"
"time"
)
func produce(p chan<- int) {
for i := 0; i < 10; i++ {
p <- i
fmt.Println("send:", i)
}
}
func consumer(c <-chan int) {
for i := 0; i < 10; i++ {
v := <-c
fmt.Println("receive:", v)
}
}
func main() {
// 缓冲区可以存储10个int类型的整数
//在执行生产者线程的时候,线程就不会阻塞,一次性将10个整数存入channel
ch := make(chan int, 10)
go produce(ch)
go consumer(ch)
time.Sleep(1 * time.Second)
}

channle超时机制

当一个channelread/write阻塞时,会被一直阻塞下去,直到channel关闭。产生一个异常退出程序。channel内部没有超时的定时器。但我们可以用select来实现channel的超时机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"time"
"fmt"
)

func main(){
c := make(chan int)
select{
case <- c:
fmt.Println("没有数据")
case <-time.After(5* time.Second):
fmt.Println("超时退出")
}
}

sync.WaitGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"runtime"
"fmt"
"sync"
"database/sql"
_ "github.com/go-sql-driver/mysql"
"time"
)

//定义任务队列
var waitgroup sync.WaitGroup

func xtgxiso(num int) {
//fmt.Println(num)
db, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/test?charset=utf8")
if err != nil {
fmt.Println(err)
}
defer db.Close()
rows, err := db.Query("select sleep(1) as a")
if err != nil {
fmt.Println(err)
}
defer rows.Close()
var a string
for rows.Next() {
err = rows.Scan(&a)
if err != nil {
fmt.Println(err)
} else {
//fmt.Println(a)
}
}
waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)
}

func main() {
//记录开始时间
start := time.Now()
//设置最大的可同时使用的CPU核数和实际cpu核数一致
runtime.GOMAXPROCS(1)
for i := 1; i <= 10; i++ {
waitgroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
go xtgxiso(i)
}
waitgroup.Wait() //Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
//记录结束时间
end := time.Now()
//输出执行时间,单位为秒。
fmt.Println(end.Sub(start).Seconds())
}

select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import "time"
import "fmt"

func main() {

c1 := make(chan string)
c2 := make(chan string)

go func() {
time.Sleep(time.Second * 1)
c1 <- "one"
}()
go func() {
time.Sleep(time.Second * 2)
c2 <- "two"
}()

for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"fmt"
"time"
)

func main() {
timeout := make(chan bool, 1)
go func() {
fmt.Println("------------ 子任务1--------------")
t1 := time.Now().UnixNano()
fmt.Println(t1)
fmt.Println("这个一定会执行")

time.Sleep(3 * time.Second)
// timeout <- true
timeout <- true
}()

fmt.Println("首先逻辑还是响应 main 函数")

go func() {
fmt.Println("------------ 子任务2--------------")
t2 := time.Now().UnixNano()
fmt.Println(t2)
fmt.Println("相当于fork一个子任务在进行")
}()

ch := make(chan int)
select {
case <-ch:
case <-timeout:
fmt.Println("------------ 回到main函数 --------------")
fmt.Println("task is timeout!")
}

fmt.Println("main 函数本身的输出")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
)

func main(){
c := make(chan int)
quit := make(chan int)
go func(){
for i := 0; i < 10; i++{
fmt.Printf("%d ", <-c)
}
quit <- 1
}()
testMuti(c, quit)
}

func testMuti(c, quit chan int){
x, y := 0, 1
for {
select{
case c<-x:
x, y = y, x+y
case <-quit:
fmt.Print("\nquit")
return
}
}
}

runtime

runtime包中有几个处理goroutine的函数

runtime.GOMAXPROCS(1)

  • 计算密集型使用多核心要快很多
  • io密集型使用多核心很有可能比单核心要慢

下面是计算密集型使用多核心的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"runtime"
"fmt"
"sync"
"time"
)

//定义任务队列
var waitgroup sync.WaitGroup

func xtgxiso(num int) {
for i:=1;i<=1000000000;i++{
num = num+i
num = num-i
num = num*i
num = num/i
}
waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)
}

func main() {
//记录开始时间
start := time.Now()
//设置最大的可同时使用的CPU核数和实际cpu核数一致
runtime.GOMAXPROCS(1)
for i := 1; i <= 10; i++ {
waitgroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
go xtgxiso(i)
}
waitgroup.Wait() //Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞 //记录结束时间
end := time.Now()
//输出执行时间,单位为秒。
fmt.Println(end.Sub(start).Seconds())
}

runtime.NumCPU()

返回了cpu核数

runtime.NumGoroutine()

返回当前进程的goroutine线程数。即便我们没有开启新的goroutine

1
2
3
4
5
6
7
8
9
10
package main

import (
"runtime"
"fmt"
)
func main(){
fmt.Println(runtime.NumCPU())
fmt.Println(runtime.NumGoroutine())
}

runtime.Gosched

Gosched()让当前正在执行的goroutine放弃CPU执行权限。调度器安排其他正在等待的线程运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"runtime"
"fmt"
)

func main(){
go sayHello()
go sayWorld()
var str string
fmt.Scan(&str)
}

func sayHello(){
for i := 0; i < 10; i++{
fmt.Print("hello ")
runtime.Gosched()
}
}

func sayWorld(){
for i := 0; i < 10; i++ {
fmt.Println("world")
runtime.Gosched()
}
}

runtime.Goexit

runtime.Goexit()函数用于终止当前的goroutine,单defer函数将会继续被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"runtime"
"fmt"
)

func test(){
defer func(){
fmt.Println(" in defer")
}()
for i := 0; i < 10; i++{
fmt.Print(i)
if i > 5{
runtime.Goexit()
}
}
}

func main(){
go test()
// 等待输入,在这里用来阻止主线程关闭
var str string
fmt.Scan(&str)
}