Go语言-异步任务

摘要

本文内容转自网络,个人学习记录使用,请勿传播

github.com/astaxie/beego/toolbox

beegotoolbox模块,包括了以下几个功能:

  • 健康检查
  • 性能调试
  • 访问统计
  • 计划任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
"github.com/astaxie/beego/toolbox"
"time"
)

func InitTask(){
// 创建任务
tk := toolbox.NewTask("generateWarning", "*/1 * * * * *", GenerateWarning)
//tk := toolbox.NewTask("myTask", "* * * * * *", func() error { fmt.Println("hello world"); return nil })
// 立即执行任务
err := tk.Run()
if err != nil {
fmt.Println(err)
}
// 添加任务
toolbox.AddTask("generateWarning",tk)
}

func GenerateWarning() error {
fmt.Println("hello job")
return nil
}


func main() {
// 定时任务
InitTask()
toolbox.StartTask()
time.Sleep(100 * time.Second)
defer toolbox.StopTask()
}

函数 NewTask(func NewTask(tname string, spec string, f TaskFunc))会返回一个新的任务,它需要三个参数:

  • tname表示任务名称
  • spec为任务时间描述
  • f为要执行的函数。

spec详解

符号 含义 示例
* 表示任何时间
, 表示分割 如第三段里:2,4,表示 2 点和 4 点执行
- 表示一个段 如第三端里: 1-5,就表示 1 到 5 点
/n 表示每个n的单位执行一次 如第三段里,*/1, 就表示每隔 1 个小时执行一次命令。也可以写成1-23/1

github.com/jasonlvhit/gocron

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main

import (
"fmt"
"time"

"github.com/jasonlvhit/gocron"
)

func task() {
fmt.Println("I am running task.")
}

func taskWithParams(a int, b string) {
fmt.Println(a, b)
}

func main() {
//可并发运行多个任务
//注意 interval>1时调用sAPi
gocron.Every(1).Second().Do(task)
gocron.Every(2).Seconds().Do(task)
gocron.Every(1).Minute().Do(task)
gocron.Every(2).Minutes().Do(task)
gocron.Every(1).Hour().Do(task)
gocron.Every(2).Hours().Do(task)
gocron.Every(1).Day().Do(task)
gocron.Every(2).Days().Do(task)
gocron.Every(1).Week().Do(task)
gocron.Every(2).Weeks().Do(task)

// Do jobs with params
gocron.Every(1).Second().Do(taskWithParams, 1, "hello")
//在cron所有操作最后调用 start函数,否则start之后调用的操作无效不执行
//<-gocron.Start()
//在task执行过程中 禁止异常退出
gocron.Every(1).Minute().DoSafely(taskWithParams, 1, "hello")

// Do jobs on specific weekday
// 支持在具体某一天、某天的某一时刻、每y-M-d h-m-s 执行任务
gocron.Every(1).Monday().Do(task)
gocron.Every(1).Thursday().Do(task)

// Do a job at a specific time - 'hour:min:sec' - seconds optional
gocron.Every(1).Day().At("10:30").Do(task)
gocron.Every(1).Monday().At("18:30").Do(task)
gocron.Every(1).Tuesday().At("18:30:59").Do(task)

// Begin job immediately upon start
gocron.Every(1).Hour().From(gocron.NextTick()).Do(task)

// Begin job at a specific date/time
t := time.Date(2019, time.November, 10, 15, 0, 0, 0, time.Local)
gocron.Every(1).Hour().From(&t).Do(task)

// NextRun gets the next running time
_, time := gocron.NextRun()
fmt.Println(time)

// Remove a specific job
gocron.Remove(task)

// Clear all scheduled jobs
gocron.Clear()

// Start all the pending jobs
<-gocron.Start()

// also, you can create a new scheduler
// to run two schedulers concurrently
//可同时创建一个新的任务调度 2个schedulers 同时执行
s := gocron.NewScheduler()
s.Every(3).Seconds().Do(task)
<-s.Start()

var lockerImplementation gocron.Locker
//防止多个集群中任务同时执行 task 实现lock接口
//两行代码,对cron 设置lock实现,执行task时调用Lock方法再Do task
gocron.SetLocker(lockerImplementation)
gocron.Every(1).Hour().Lock().Do(task)

<-gocron.Start()
}

这里需要关注一下,gocron对lock的实现,从代码上看Job结构体的lock属性,用于控制多实例job并发执行。但项目woner提到的 multiple instances 指的并不是跨服务器的多实例,而是在同一应用服务 里的多任务实例(也就是1个app服务中多个任务,粒度是只在统一应用内)。如果跨server则lock需要自行依赖redis或其他分布式锁来管理。通过读源码的run方法,j.lock来控制job并发,但一旦跨server job.lock属性是没法共享的。这里doc上给的解释有点歧义,需要注意。

github.com/jiajunhuang/gotasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/jiajunhuang/gotasks"
//"github.com/jiajunhuang/gotasks/metrics"
)

const (
uniqueJobName = "a-unique-job-name"
redisURL = "redis://127.0.0.1:6379/0"
queueName = "job-queue-name"
)

func worker() {
// setup signal handler
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT)

ctx, cancel := context.WithCancel(context.Background())
go func() {
log.Printf("gonna listen on SIGINT...")
s := <-sigChan
switch s {
case syscall.SIGINT:
cancel()
default:
}
}()

gotasks.Run(ctx, queueName)
}

func main() {
// register tasks
handler1 := func(args gotasks.ArgsMap) (gotasks.ArgsMap, error) {
time.Sleep(time.Duration(1) * time.Second)
return args, nil
}
handler2 := func(args gotasks.ArgsMap) (gotasks.ArgsMap, error) {
time.Sleep(time.Duration(1) * time.Second)
return args, nil
}
// if handler1 failed, the task will stop, but if handler2 failed(return a non-nil error)
// handler2 will be retry 3 times, and sleep 100 ms each time
gotasks.Register(uniqueJobName, handler1, gotasks.Reentrant(handler2, gotasks.WithMaxTimes(3), gotasks.WithSleepyMS(10)))

// set broker
gotasks.UseRedisBroker(redisURL, gotasks.WithRedisTaskTTL(1000))

// enqueue
// or you can use a queue:
queue := gotasks.NewQueue(queueName, gotasks.WithMaxLimit(10)) // max limit is max concurrency per worker, default is 10
queue.Enqueue(uniqueJobName, gotasks.MapToArgsMap(map[string]interface{}{})) // or gotasks.StructToArgsMap

// or you can integrate metrics handler yourself in your own web app
// go metrics.RunServer(":2121")
worker()
}