重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

一篇详解Redis--延时队列

redis的 list 数据结构常用来作为 异步消息队列 使用,使用 rpush/lpush 操作 入队 ,使用 lpop/rpop 来操作 出队

专注于为中小企业提供成都网站设计、成都网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业睢县免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了超过千家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

一篇详解Redis -- 延时队列

> rpush my-queue apple banana pear
(integer) 3
> llen my-queue
(integer) 3
> lpop my-queue
"apple"
> llen my-queue
(integer) 2
> lpop my-queue
"banana"
> llen my-queue
(integer) 1
> lpop my-queue
"pear"
> llen my-queue
(integer) 0
> lpop my-queue
(nil)


空队列

  1. 如果队列为空,客户端会陷入 pop的死循环 , 空轮询 不仅拉高了 客户端的CPU , Redis的QPS 也会被拉高
  2. 如果空轮询的客户端有几十个, Redis的慢查询 也会显著增加,可以尝试让客户端线程 sleep 1s
  3. 但睡眠会导致消息的 延迟增大 ,可以使用 blpop/brpop (blocking, 阻塞读 )
  4. 阻塞读在队列没有数据时,会立即进入 休眠 状态,一旦有数据到来,会立即被 唤醒 , 消息延迟几乎为0


空闲连接

  1. 如果线程一直阻塞在那里,Redis的客户端连接就成了 闲置连接
  2. 闲置过久, 服务器 一般会 主动断开 连接, 减少闲置的资源占用 ,此时 blpop/brpop 会 抛出异常


锁冲突处理

  1. 分布式锁 加锁失败 的处理策略
  2. 直接抛出异常 ,通知用户稍后重试
  3. sleep 后再重试
  4. 将请求转移到 延时队列 ,过一会重试
  5. 抛出异常
  6. 这种方式比较适合由 用户直接发起 的请求
  7. sleep
  8. sleep会 阻塞 当前的消息处理线程,从而导致队列的后续消息处理出现 延迟
  9. 如果 碰撞比较频繁 ,sleep方案不合适
  10. 延时队列
  11. 比较适合异步消息处理的场景,通过将当前冲突的请求转移到另一个队列 延后处理 来 避免冲突


延时队列

  1. 可以通过Redis的 zset 来实现延时队列
  2. 将消息序列化成一个字符串作为zet的 value ,将该消息的 到期处理时间 作为 score
  3. 然后 多线程轮询 zset获取 到期的任务 进行处理
  4. 多线程是为了保障 可用性 ,但同时要考虑 并发安全 ,确保 任务不能被多次执行

public class RedisDelayingQueue {
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 private static class TaskItem {
 private String id;
 private T msg;
 }
 private Type taskType = new TypeReference>() {
 }.getType();
 private Jedis jedis;
 private String queueKey;
 public RedisDelayingQueue(Jedis jedis, String queueKey) {
 this.jedis = jedis;
 this.queueKey = queueKey;
 }
 public void delay(T msg) {
 TaskItem task = new TaskItem<>(UUID.randomUUID().toString(), msg);
 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task));
 }
 public void loop() {
 // 可以进一步优化,通过Lua脚本将zrangeByScore和zrem统一挪到Redis服务端进行原子化操作,减少抢夺失败出现的资源浪费
 while (!Thread.interrupted()) {
 // 只取一条
 Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
 if (values.isEmpty()) {
 try {
 Thread.sleep(500);
 } catch (InterruptedException e) {
 break;
 }
 continue;
 }
 String s = values.iterator().next();
 if (jedis.zrem(queueKey, s) > 0) {
 // zrem是多线程多进程争夺任务的关键
 TaskItem task = JSON.parseObject(s, taskType);
 this.handleMsg(task.msg);
 }
 }
 }
 private void handleMsg(T msg) {
 try {
 System.out.println(msg);
 } catch (Throwable ignored) {
 // 一定要捕获异常,避免因为个别任务处理问题导致循环异常退出
 }
 }
 public static void main(String[] args) {
 final RedisDelayingQueue queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo");
 Thread producer = new Thread() {
 @Override
 public void run() {
 for (int i = 0; i < 10; i++) {
 queue.delay("zhongmingmao" + i);
 }
 }
 };
 Thread consumer = new Thread() {
 @Override
 public void run() {
 queue.loop();
 }
 };
 producer.start();
 consumer.start();
 try {
 producer.join();
 Thread.sleep(6000);
 consumer.interrupt();
 consumer.join();
 } catch (InterruptedException ignored) {
 }
 }
}

文章标题:一篇详解Redis--延时队列
分享链接:http://cqcxhl.com/article/pscpsj.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP