重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
在linux下实现定时器主要有如下方式
超过10余年行业经验,技术领先,服务至上的经营模式,全靠网络和口碑获得客户,为自己降低成本,也就是为客户降低成本。到目前业务范围包括了:网站设计、成都网站制作,成都网站推广,成都网站优化,整体网络托管,小程序开发,微信开发,App定制开发,同时也可以让客户的网站和网络营销和我们一样获得订单和生意!
在这当中 基于时间轮方式实现的定时器 时间复杂度最小,效率最高,然而我们可以通过 优先队列 实现时间轮定时器。
优先队列的实现可以使用最大堆和最小堆,因此在队列中所有的数据都可以定义排序规则自动排序。我们直接通过队列中 pop 函数获取数据,就是我们按照自定义排序规则想要的数据。
在 Golang 中实现一个优先队列异常简单,在 container/head 包中已经帮我们封装了,实现的细节,我们只需要实现特定的接口就可以。
下面是官方提供的例子
因为优先队列底层数据结构是由二叉树构建的,所以我们可以通过数组来保存二叉树上的每一个节点。
改数组需要实现 Go 预先定义的接口 Len , Less , Swap , Push , Pop 和 update 。
timerType结构是定时任务抽象结构
首先的 start 函数,当创建一个 TimeingWheel 时,通过一个 goroutine 来执行 start ,在start中for循环和select来监控不同的channel的状态
通过for循环从队列中取数据,直到该队列为空或者是遇见第一个当前时间比任务开始时间大的任务, append 到 expired 中。因为优先队列中是根据 expiration 来排序的,
所以当取到第一个定时任务未到的任务时,表示该定时任务以后的任务都未到时间。
当 getExpired 函数取出队列中要执行的任务时,当有的定时任务需要不断执行,所以就需要判断是否该定时任务需要重新放回优先队列中。 isRepeat 是通过判断任务中 interval 是否大于 0 判断,
如果大于0 则,表示永久就生效。
防止外部滥用,阻塞定时器协程,框架又一次封装了timer这个包,名为 timer_wapper 这个包,它提供了两种调用方式。
参数和上面的参数一样,只是在第三个参数中使用了任务池,将定时任务放入了任务池中。定时任务的本身执行就是一个 put 操作。
至于put以后,那就是 workers 这个包管理的了。在 worker 包中, 也就是维护了一个任务池,任务池中的任务会有序的执行,方便管理。
本项目用于移动端的数据统计,项目地址: 。开源的数据统计countly做的很好,但是基础免费版的功能实在不够看,因此我就决定用go语言来写了这个项目,一来可以在实践中学习go语言,二来也可以开发功能完整的开源平台。该项目正在开发中,欢迎有兴趣的gopher一起参与。
数据存储方面使用的是mongodb。由于数据统计业务几乎不涉及到事务以及严格的一致性场景,而且mongodb的自动分片功能可以支撑较大的数据量。使用大数据的存储组件的话就太过于重了。因此选用mongodb。
业务逻辑整体基于事件的发布订阅。当收到客户端请求, frontend 会对请求数据进行处理,然后发布响应的事件。 backend 收到事件后进行统计处理。
后台展示基于Vue-Admin-Template开发,本人前端能力基本就是依葫芦画瓢,希望有前端大神来开发后台页面,项目地址:
目前客户端API仅有2个。一个是上报 openApp 打开APP时间,一个是上报 usageTime 一次启动使用时长事件。SDK方面也需要移动端的大神开发,感兴趣的大佬可以一起开发。
下面放一点后台页面的效果图:
GoAnalytics是基于go实现的一个数据统计平台,用于统计移动端的数据指标,比如启动次数、用户增长、活跃用户、留存等指标分析。前端数据展示项目是 goanalytics-web 。目前正在积极开发中,欢迎提交新的需求和pull request。
Go版本需要支持module,本地开发测试
cmd/goanalytics_kafka 和 goanalytics_rmq 是分别基于 kafka 和 rocketmq 的发布订阅功能做的数据发布
和订阅处理,横向扩展能力比 local 高。另外由于 rocketmq 还没有原生基于 go 的客户端(原生客户端正在开发中
2.0.0 road map ),可能会存在问题。
项目结构
├── README.md
├── api
│ ├── authentication 用户认证、管理API
│ ├── middlewares GIN 中间件
│ └── router API route
├── cmd
│ ├── account 生成admin账号命令
│ ├── analytic_local 不依赖消息系统的goanalytics
│ ├── goanalytics_kafka 基于kafak的goanalytics
│ ├── goanalytics_rmq 基于rocketmq的goanalytics
│ └── test_data 生成测试数据命令
├── common
│ └── data.go
├── conf 配置
│ └── conf.go
├── event
│ ├── codec 数据编解码
│ └── pubsub 消息发布订阅
├── go.mod
├── go.sum
├── metric 所有的统计指标在这里实现
│ ├── init.go
│ └── user 用户相关指标的实现
├── schedule
│ └── schedule.go 定时任务调度
├── storage 存储模块
│ ├── counter.go 计数器接口
│ ├── data.go
│ └── mongodb 基于mongodb实现的存储及计数器
└── utils
├── date.go
├── date_test.go
├── errors.go
└── key.go
目前APP业务中启用的定时任务已达到400+,目前管理比较混乱,很多任务运行时占用服务器资源巨大,其中不乏一些非紧急的任务,平时并不会有太大影响,但是当流量高峰来临时,这些定时任务可能会成为压死骆驼的最后一根稻草。为了避免出现这样的问题,我们通常会在高流量来之前去调整一些定时任务的执行间隔时间或者暂停一些不影响服务的定时任务。这样做的弊端是工作量很大,同时难免会有遗漏。由此衍生除了对任务分级的诉求。对任务分级后,高峰流量时,可视情况降级相关等级的定时任务。
PS:设计核心流程的任务等,如支付回调
PS:任务中设计到事务等
基于gocron的任务节点做任务分级,不同级别的任务对应不同的gocron节点。如下图:
把三级任务放在三级节点上跑,如下图:
以此类推,不同级别的任务跑在对应级别的节点上。
当流量高峰来临时,我们想通过停掉所有三级任务来实现快速降级,而这个操作仅仅需要关闭对应节点的连接即可。如下图
PS:这个操作同时会停止所有正在运行的任务
举个例子:目前我的三级任务节点上运行了一个同步数据的任务(预计5分钟左右能执行完),当我把三级任务节点关闭时,这个任务会直接失败,在节点对应的机器上我们可以看到所有进程也被直接kill掉了,即使我的任务是多进程在跑,相应的子进程也会被kill掉。如下:
当前正在服务的三级节点-asgard三级定时任务
当前正在节点-asgard三级定时任务上运行的任务-商品数据整合同步搜索个推库
节点服务器上正在运行的进程
这时候我们关闭asgard三级定时任务这个节点
可以看到任务直接执行失败了
同时,节点服务器上的进程也被kill掉了
由于二级任务可能涉及到事务等操作,非万分紧急情况下不能直接终止,以免导致脏数据的产生。对于这种任务的降级我们不能直接通过节点的方式停止任务。可以通过关闭任务的方式停止。如下:
PS:关闭任务的操作会等当前的任务执行完成再关闭,不会对当前任务产生任何影响
举个例子:
还拿asgard三级定时任务这个节点来看,目前这个节点在链接状态
这个节点下跑了一个任务
同样的,节点服务器上有对应的进程在跑着
这时候,我们关闭这个任务
我们可以看到,关闭这个任务,不会影响正在执行的任务
节点对应的服务器上的任务也正常在跑
PS:这个关闭任务对应的是,完成当前任务后不再执行新的任务。
1、基于gocron的任务节点对任务做分级处理
2、一、二、三级任务的划分
3、服务降级的两种方式:关闭节点关闭任务
利用 Etcd 的Lease租约特性来实现定时功能,同时通过Watch机制来实现多节点情况下只有一个节点执行该任务。通过定时任务库 Cron 的时间字符串解析器Parser来解析任务执行时间。
Etcd
Cron
源码链接