• C语言中文网教程离线版下载(PDF下载)
    C语言中文网
    • C语言教程
    • C++教程
    • Linux教程
    • Shell脚本
    • socket编程
    • 更多>>
    Go语言教程
    1 Go语言简介
    2 Go语言基本语法
    3 Go语言容器
    4 流程控制
    5 Go语言函数
    6 Go语言结构体
    7 Go语言接口
    8 Go语言包(package)
    8.1 包的基本概念
    8.2 Go语言封装简介及实现细节
    8.3 Go语言GOPATH
    8.4 Go语言常用内置包
    8.5 Go语言自定义包
    8.6 Go语言package
    8.7 Go语言导出包中的标识符
    8.8 Go语言import导入包
    8.9 Go语言工厂模式自动注册
    8.10 Go语言单例模式
    8.11 Go语言sync包与锁
    8.12 Go语言big包
    8.13 示例:使用图像包制作GIF动画
    8.14 Go语言正则表达式:regexp包
    8.15 Go语言time包:时间和日期
    8.16 Go语言os包用法简述
    8.17 Go语言flag包:命令行参数解析
    8.18 Go语言go mod包依赖管理工具
    8.19 示例:使用Go语言生成二维码
    8.20 Go语言Context(上下文)
    8.21 示例:客户信息管理系统
    8.22 示例:使用Go语言发送电子邮件
    8.23 Go语言(Pingo)插件化开发
    8.24 Go语言定时器实现原理及作用
    8.25 Go语言使用定时器实现任务队列
    9 Go语言并发
    10 Go语言反射
    11 Go语言网络编程
    12 Go语言文件处理
    13 Go语言网络爬虫
    14 Go语言编译与工具
    15 “避坑”与技巧
    首页 > Go语言教程 > Go语言包(package) 阅读:365

    Go语言使用定时器实现任务队列

    < 上一页Go语言定时器实现原理及作用 Go语言并发下一页 >

    Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何使用Go语言的定时器实现一个任务队列,非常具有实用价值。

    Go语言中定时器

    一般用法:
    package main
    
    import(
        "fmt"
        "time"
    )
    
    func main() {
        input := make(chan interface{})
        //producer - produce the messages
        go func() {
            for i := 0; i < 5; i++ {
                input <- i
            }
            input <- "hello, world"
        }()
    
        t1 := time.NewTimer(time.Second * 5)
        t2 := time.NewTimer(time.Second * 10)
    
        for {
            select {
                //consumer - consume the messages
                case msg := <-input:
                    fmt.Println(msg)
    
                case <-t1.C:
                    println("5s timer")
                    t1.Reset(time.Second * 5)
    
                case <-t2.C:
                    println("10s timer")
                    t2.Reset(time.Second * 10)
            }
        }
    }
    上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:

    type Timer struct {
        C <-chan Time
        r runtimeTimer
    }

    原来是一个 channel,其实有 GO 基础的都知道,GO 的运算符当出现的 -> 或者 <- 的时候,必然是有一端是指 channel。按照上面的例子来看,就是阻塞在一个 for 循环内,等待到了定时器的 C 从 channel 出来,当获取到值的时候,进行想要的操作。

    设计我们的定时任务队列

    当时的需求是这样,需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,还要能停止掉。

    具体的流程如下图所示:


    定义结构
    type OnceCron struct {
        tasks []*Task   //任务的列队
        add chan *Task  //当遭遇到新任务的时候
        remove chan string  //当遭遇到删除任务的时候
        stop chan struct{}  //当遇到停止信号的时候
        Logger *log.Logger  //日志
    }
    type Job interface {
        Run()     //执行接口
    }
    type Task struct {
        Job  Job   //要执行的任务
        Uuid string   //任务标识,删除时用
        RunTime int64   //执行时间
        Spacing int64   //间隔时间
        EndTime int64   //结束时间
        Number int    //总共要次数
    }

    队列实现

    首先,我们要获得一个队列任务

    func NewCron() *OnceCron常规操作,为了节省篇幅,就不写出来,具体可以看源码,贴在了底部。

    然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。
    func (one *OnceCron) Start() {
        //初始化的时候加入一个一年的长定时器,间隔1小时执行一次
        task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour*24*365).Unix() , func() {
        log.Println("It's a Hour timer!")
        }) //为了代码格式 markdown 里面有个括号改成全角了
        one.tasks = append(one.tasks, task)
        go one.run() //协成执行 防止主进程被阻塞
    }
    执行部分应该是重点的,分成三部:
    • 首先获得一个最先执行的任务
    • 然后产生一个定时器,用于执行任务
    • 进行阻塞判断,获取我们要进行的操作
    package main
    
    import (
        "time"
        "log"
        "github.com/google/uuid"
        "os"
    )
    
    //compatible old name
    type OnceCron struct {
        *TaskScheduler
    }
    
    //only exec cron timer cron
    type TaskScheduler struct {
        tasks  []TaskInterface
        swap   []TaskInterface
        add    chan TaskInterface
        remove chan string
        stop   chan struct{}
        Logger TaskLogInterface
        lock    bool
    }
    
    
    type Lock interface {
        Lock()
        UnLock()
    }
    
    //return old name with OnceCron
    func NewCron() *OnceCron {
        return &OnceCron{
            TaskScheduler:NewScheduler(),
        }
    }
    
    //return a Controller Scheduler
    func NewScheduler() *TaskScheduler {
        return &TaskScheduler{
            tasks:  make([]TaskInterface, 0),
            swap:   make([]TaskInterface, 0),
            add:    make(chan TaskInterface),
            stop:   make(chan struct{}),
            remove: make(chan string),
            Logger: log.New(os.Stdout, "[Control]: ", log.Ldate|log.Ltime|log.Lshortfile),
            lock:   false,
        }
    }
    
    //add spacing time job to list with number
    func (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64, number int, f func()) {
        task := getTaskWithFuncSpacingNumber(spaceTime, number, f)
        scheduler.addTask(task)
    }
    //add spacing time job to list with endTime
    func (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64, endTime int64, f func()) {
        task := getTaskWithFuncSpacing(spaceTime, endTime, f)
        scheduler.addTask(task)
    }
    
    //add func to list
    func (scheduler *TaskScheduler) AddFunc(unixTime int64, f func()) {
        task := getTaskWithFunc(unixTime, f)
        scheduler.addTask(task)
    }
    
    func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) {
        scheduler.addTask(task)
    }
    //add a task to list
    func (scheduler *TaskScheduler) AddTask(task *Task) string {
        if task.RunTime != 0 {
            if task.RunTime < 100000000000 {
                task.RunTime = task.RunTime * int64(time.Second)
            }
            if task.RunTime < time.Now().UnixNano() {
                //延遲1秒
                task.RunTime = time.Now().UnixNano() + int64(time.Second)
            }
        } else {
            if task.Spacing > 0 {
                task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second)
            }else{
                scheduler.Logger.Println("error too add task! Runtime error")
                return ""
            }
        }
    
        if task.Uuid == "" {
            task.Uuid = uuid.New().String()
        }
        return scheduler.addTask(task)
    }
    
    //if lock add to swap
    func (scheduler *TaskScheduler) addTask(task TaskInterface) string  {
        if scheduler.lock {
            scheduler.swap = append(scheduler.swap, task)
            scheduler.add <- task
        } else{
            scheduler.tasks = append(scheduler.tasks, task)
            scheduler.add <- task
        }
    
        return task.GetUuid()
    }
    //new export
    func (scheduler *TaskScheduler) ExportInterface() []TaskInterface {
        return scheduler.tasks
    }
    //compatible old export tasks
    func (scheduler *TaskScheduler) Export() []*Task {
        task := make([]*Task,0)
        for _,v := range scheduler.tasks {
            task = append(task, v.(*Task))
        }
        return task
    }
    
    //stop task with uuid
    func (scheduler *TaskScheduler) StopOnce(uuidStr string) {
        scheduler.remove <- uuidStr
    }
    
    //run Cron
    func (scheduler *TaskScheduler) Start() {
        //初始化的时候加入一个一年的长定时器,间隔1小时执行一次
        task := getTaskWithFuncSpacing(3600, time.Now().Add(time.Hour * 24 * 365).UnixNano(), func() {
            log.Println("It's a Hour timer!")
        })
        scheduler.tasks = append(scheduler.tasks, task)
        go scheduler.run()
    }
    
    //stop all
    func (scheduler *TaskScheduler) Stop() {
        scheduler.stop <- struct{}{}
    }
    
    //run task list
    //if is empty, run a year timer task
    func (scheduler *TaskScheduler) run() {
    
        for {
    
            now := time.Now()
            task, key := scheduler.GetTask()
            runTime := task.GetRunTime()
            i64 := runTime - now.UnixNano()
    
            var d time.Duration
            if i64 < 0 {
                scheduler.tasks[key].SetRuntime(now.UnixNano())
                if task != nil {
                    go task.RunJob()
                }
                scheduler.doAndReset(key)
                continue
            } else {
                sec := runTime / int64(time.Second)
                nsec := runTime % int64(time.Second)
    
                d = time.Unix(sec, nsec).Sub(now)
            }
    
            timer := time.NewTimer(d)
    
            //catch a chan and do something
            for {
                select {
                //if time has expired do task and shift key if is task list
                case <-timer.C:
                    scheduler.doAndReset(key)
                    if task != nil {
                        //fmt.Println(scheduler.tasks[key])
                        go task.RunJob()
                        timer.Stop()
                    }
    
                    //if add task
                case <-scheduler.add:
                    timer.Stop()
                    // remove task with remove uuid
                case uuidstr := <-scheduler.remove:
                    scheduler.removeTask(uuidstr)
                    timer.Stop()
                    //if get a stop single exit
                case <-scheduler.stop:
                    timer.Stop()
                    return
                }
    
                break
            }
        }
    }
    
    //return a task and key In task list
    func (scheduler *TaskScheduler) GetTask() (task TaskGetInterface, tempKey int) {
        scheduler.Lock()
        defer scheduler.UnLock()
    
        min := scheduler.tasks[0].GetRunTime()
        tempKey = 0
    
        for key, task := range scheduler.tasks {
            tTime := task.GetRunTime()
            if min <= tTime {
                continue
            }
            if min > tTime {
                tempKey = key
    
                min = tTime
                continue
            }
        }
    
        task = scheduler.tasks[tempKey]
    
        return task, tempKey
    }
    
    //if add a new task and runtime < now task runtime
    // stop now timer and again
    func (scheduler *TaskScheduler) doAndReset(key int) {
        scheduler.Lock()
        defer scheduler.UnLock()
        //null pointer
        if key < len(scheduler.tasks) {
    
            nowTask := scheduler.tasks[key]
            scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)
    
            if nowTask.GetSpacing() > 0 {
                tTime := nowTask.GetRunTime()
                nowTask.SetRuntime(nowTask.GetSpacing() * int64(time.Second) + tTime)
                number := nowTask.GetRunNumber()
                if number > 1 {
                    nowTask.SetRunNumber(number - 1)
                    scheduler.tasks = append(scheduler.tasks, nowTask)
                } else if nowTask.GetEndTime() >= tTime {
                    scheduler.tasks = append(scheduler.tasks, nowTask)
                }
            }
    
        }
    }
    
    
    //remove task by uuid
    func (scheduler *TaskScheduler) removeTask(uuidStr string) {
        scheduler.Lock()
        defer scheduler.UnLock()
        for key, task := range scheduler.tasks {
            if task.GetUuid() == uuidStr {
                scheduler.tasks = append(scheduler.tasks[:key], scheduler.tasks[key+1:]...)
                break
            }
        }
    }
    
    //lock task []
    func (scheduler *TaskScheduler) Lock() {
        scheduler.lock = true
    }
    
    //unlock task []
    func (scheduler *TaskScheduler) UnLock() {
        scheduler.lock = false
        if len(scheduler.swap) > 0 {
            for _, task := range scheduler.swap {
                scheduler.tasks = append(scheduler.tasks, task)
            }
            scheduler.swap = make([]TaskInterface, 0)
        }
    
    }
    < 上一页Go语言定时器实现原理及作用 Go语言并发下一页 >

    所有教程

    • socket
    • Python基础教程
    • C#教程
    • MySQL函数
    • MySQL
    • C语言入门
    • C语言专题
    • C语言编译器
    • C语言编程实例
    • GCC编译器
    • 数据结构
    • C语言项目案例
    • C++教程
    • OpenCV
    • Qt教程
    • Unity 3D教程
    • UE4
    • STL
    • Redis
    • Android教程
    • JavaScript
    • PHP
    • Mybatis
    • Spring Cloud
    • Maven
    • vi命令
    • Spring Boot
    • Spring MVC
    • Hibernate
    • Linux
    • Linux命令
    • Shell脚本
    • Java教程
    • 设计模式
    • Spring
    • Servlet
    • Struts2
    • Java Swing
    • JSP教程
    • CSS教程
    • TensorFlow
    • 区块链
    • Go语言教程
    • Docker
    • 编程笔记
    • 资源下载
    • 关于我们
    • 汇编语言
    • 大数据
    • 云计算
    • VIP视频

    优秀文章

    • C++友元函数和友元类(C++ friend)详解
    • C语言函数声明
    • C++ pair类模板,STL pair类模板
    • C++11 Lambda表达式(匿名函数)详解
    • Linux mkdir命令:创建目录(文件夹)
    • Shell awk命令详解(格式+使用方法)
    • C++ break和continue用法详解
    • Python创建包,导入包(入门必读)
    • Go语言inject库:依赖注入
    • Java项目实战之九宫格记忆网

    精美而实用的网站,提供C语言、C++、STL、Linux、Shell、Java、Go语言等教程,以及socket、GCC、vi、Swing、设计模式、JSP等专题。

    底部Logo ↑