重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本篇文章为大家展示了Jedis中怎么实现分布式锁,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
目前创新互联已为超过千家的企业提供了网站建设、域名、虚拟主机、网站托管、服务器租用、企业网站设计、昆玉网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
package com.xxx.arch.seq.client.redis; import java.io.Closeable; import java.util.*; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.*; import com.xxx.arch.seq.constant.Constants; /** * Jedis配置实例封装类(兼容单节点连接池和集群节点) * * @author zhangyang * @createDate 2019-01-22 * @since 2.x */ public class JedisConfig { private static volatile JedisConfig redisConfig; //当前模式:1单例,2哨兵 3集群Cluster private int singleton; //jedis连接池 private JedisPool jedisPool; private JedisSentinelPool sentinelPool; private Jedis jedis; //jeids集群 private JedisCluster jedisCluster; private JedisConfig() { Properties redisProp = new Properties(); redisProp.setProperty("arch.seq.redis.host", Constants.ARCH_SEQ_REDIS_NODES); redisProp.setProperty("arch.seq.redis.password", Constants.ARCH_SEQ_REDIS_PASSWORD); redisProp.setProperty("arch.seq.redis.sentinel.master", Constants.ARCH_SEQ_REDIS_SENTINEL_MASTER); String hostConf = redisProp.getProperty("arch.seq.redis.host"); if (hostConf == null) { throw new RuntimeException("get redis configuration error"); } if ("${arch.seq.redis.host}".equals(hostConf)) { throw new RuntimeException("please check occ var \"arch.seq.redis.host\""); } if(!hostConf.contains(",")&&!hostConf.contains(">>")){ singleton = 1; }else if(hostConf.contains(">>")){ singleton=2; }else{ singleton=3; } if (singleton==1) { initJedisPool(redisProp); } else if(singleton==2){ initJedisSentinel(redisProp); }else{ initJedisCluster(redisProp); } } private void initJedisPool(Properties redisProp) { String[] hostConf = redisProp.getProperty("arch.seq.redis.host").split(":"); this.jedisPool = new JedisPool(new JedisPoolConfig(), hostConf[0], Integer.valueOf(hostConf[1]), 0, redisProp.getProperty("arch.seq.redis.password")); } private void initJedisCluster(Properties redisProp) { String[] hostConfList = redisProp.getProperty("arch.seq.redis.host").split(","); Setnodes = new HashSet<>(); String[] hostConf; for (String hc : hostConfList) { hostConf = hc.split(":"); nodes.add(new HostAndPort(hostConf[0], Integer.valueOf(hostConf[1]))); } jedisCluster = new JedisCluster(nodes, 0, 0, 4, redisProp.getProperty("arch.seq.redis.password"), new GenericObjectPoolConfig()); } private void initJedisSentinel(Properties redisProp) { String[] hostConfList = redisProp.getProperty("arch.seq.redis.host").split(">>"); Set sentinels = new HashSet(); String[] hostConf; for (String hc : hostConfList) { hostConf= hc.split(":"); sentinels.add(new HostAndPort(hostConf[0], Integer.valueOf(hostConf[1])).toString()); } sentinelPool = new JedisSentinelPool(redisProp.getProperty("arch.seq.redis.sentinel.master"), sentinels,redisProp.getProperty("arch.seq.redis.password")); jedis = sentinelPool.getResource(); } public static JedisConfig getInstance() { if (redisConfig == null) { synchronized (JedisConfig.class) { if (redisConfig == null) { redisConfig = new JedisConfig(); } } } return redisConfig; } public JedisConn getConn() { if(singleton==1){ return new JedisConn(jedisPool.getResource()); } if(singleton==2){ return new JedisConn(sentinelPool.getResource()); } if(singleton==3){ return new JedisConn(jedisCluster); } return null; } /** * redis连接封装类,支持单机和集群,支持常规操作,支持分布式锁 */ public static class JedisConn implements Closeable { private JedisCommands invoke; public JedisConn(JedisCommands invoke) { this.invoke = invoke; } /** * 设置一个必须是不存在的值 * * @param key - 关键字 * @param value * @return 1-成功 0-失败 */ public Long setnx(String key, String value) { return invoke.setnx(key, value); } /** * 获得一个值 * * @param key - 关键字 * @return */ public String get(String key) { return invoke.get(key); } /** * 更新一个值 * * @param key - 关键字 * @param value - 值 * @return */ public String set(String key, String value) { return invoke.set(key, value); } /** * 更新一个值,并返回更新前的老值 * * @param key - 关键字 * @param value - 值 * @return 更新前的老值 */ public String getSet(String key, String value) { return invoke.getSet(key, value); } /** * 删除一个值 * * @param key - 关键字 */ public void del(String key) { invoke.del(key); } /** * 递增一个值,并返回最新值 * * @param key - 关键字 * @return 最新值 */ public Long incr(String key) { return invoke.incr(key); } /** * 递增一个值,并返回最新值 * * @param key - 关键字 * @return 最新值 */ public Long incr(String key, long total) { return invoke.incrBy(key, total); } /** * 设置过期时间 * * @param key - 关键字 * @param expireTime - 过期时间,毫秒 * @return */ public Long expire(String key, long expireTime) { return invoke.pexpire(key, expireTime); } private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX";//NX是不存在时才set private static final String SET_WITH_EXPIRE_TIME = "PX";//默认毫秒, 解释:EX是秒,PX是毫秒 /** * 尝试获取分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public boolean tryLock(String lockKey, String requestId, long expireTime) { String result = invoke.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } private static final Long RELEASE_SUCCESS = 1L; /** * 释放分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public boolean unLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = evalScript(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } private Object evalScript(String script, List keys, List args) { return (invoke instanceof Jedis) ? ((Jedis)invoke).eval(script, keys, args) : ((JedisCluster)invoke).eval(script, keys, args); } public void close() { if (invoke instanceof Jedis) { ((Jedis) invoke).close(); } } } }
package com.xxx.arch.seq.core; import com.xxx.arch.seq.client.redis.JedisConfig; import com.xxx.arch.seq.task.ContinuationOfLifeTask; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.*; /** * 基于redis 的分布式锁 */ @Slf4j public final class DistributedLock { //续命任务延迟队列 private static final DelayQueueQUEUE = new DelayQueue<>(); //续命任务映射缓存 private static final Map CACHE = new ConcurrentHashMap<>(); //延长锁时间的守护线程 private static final ExecutorService CONTINUATION_OF_LIFE_EXECUTOR = Executors.newSingleThreadExecutor(); private static final long TIMEOUT = 1000; //限制最大长度 private static final int SIZE = 5000; static { /** * 延长锁时间的核心线程代码 */ CONTINUATION_OF_LIFE_EXECUTOR.execute(() -> { while (true){ //获取优先级最高的任务 ContinuationOfLifeTask task; try { task = QUEUE.take(); } catch (InterruptedException e) { continue; } if (task == null){ continue; } //验证是否活跃 long nowTime = System.currentTimeMillis(); if (task.isActive() && !task.isDiscarded(nowTime)){ //是否可以执行 if (task.isExecute(nowTime)){ task.execute(); //验证是否还需要续命 if (task.isActive() && task.checkCount()){ QUEUE.add(task); }else { //清理不需要任务的缓存 CACHE.remove(task.getId()); } }else { //清理不需要任务的缓存 //如果是时间没到不能执行的 不需要删除,一般不存在 if (nowTime >= task.getEndTime()){ CACHE.remove(task.getId()); } } }else { //清理过期的或者不活跃的任务 CACHE.remove(task.getId()); } } }); } private DistributedLock(){} /** * 获得分布式锁 * * @param lockKey - 分布式锁的key,保证全局唯一 * @param requestId - 本次请求的唯一ID,可用UUID等生成 * @param expireTime - 锁获取后,使用的最长时间,毫秒 * @param flagCount - 延续锁的次数 * @return - 是否成功获取锁 */ public static boolean getDistributeLock(String lockKey, String requestId, long expireTime,int flagCount) { JedisConfig.JedisConn conn = null; try { conn = JedisConfig.getInstance().getConn(); //获取锁 if (QUEUE.size() < SIZE && conn.tryLock(lockKey, requestId, expireTime)){ //创建一个续命任务 ContinuationOfLifeTask task = ContinuationOfLifeTask.build(lockKey, requestId, expireTime, flagCount); //如果放入队列超时 或者失败 if (!QUEUE.offer(task, TIMEOUT, TimeUnit.MILLISECONDS)){ //释放锁 releaseDistributeLock(lockKey, requestId); //返回锁获取失败 return false; } //设置缓存 CACHE.put(lockKey + requestId, task); return true; } return false; } finally { if (conn != null) { conn.close(); } } } /** * 获取分布式锁 * 默认是延长3次锁寿命 * @param lockKey 分布式锁的key,保证全局唯一 * @param requestId 本次请求的唯一ID,可用UUID等生成 * @param expireTime 锁获取后,使用的最长时间,毫秒 * @return */ public static boolean getDefaultDistributeLock(String lockKey, String requestId, long expireTime) { return getDistributeLock(lockKey, requestId, expireTime, 3); } /** * 获取永久分布式锁(默认24小时) * 使用时候记得一定要释放锁 * @param lockKey * @param requestId * @return */ public static boolean getPermanentDistributedLock(String lockKey, String requestId){ return getDistributeLock(lockKey, requestId, 10000, 6 * 60 * 24); } /** * 释放分布式锁 * * @param lockKey - 分布式锁的key,保证全局唯一 * @param requestId - 本次请求的唯一ID,可用UUID等生成 * @return */ public static boolean releaseDistributeLock(String lockKey, String requestId) { JedisConfig.JedisConn conn = null; try { ContinuationOfLifeTask task = CACHE.remove(lockKey + requestId); if (task != null){ task.setActive(false); QUEUE.remove(task); } conn = JedisConfig.getInstance().getConn(); return conn.unLock(lockKey, requestId); } finally { if (conn != null) { conn.close(); } } } }
package com.xxx.arch.seq.task; import com.xxx.arch.seq.client.redis.JedisConfig; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 续命任务类 */ @Slf4j public class ContinuationOfLifeTask implements Delayed { private String id; //结束时间 即为需要续命的时间 private long endTime; //是否还存活 private volatile boolean active; //锁的key private String lockKey; //锁超时时间 private long timeout; //锁的持续时间 private long expireTime; //锁的续命次数 -1 代表无限 private int flagCount; //续命次数统计 count 不能大于 flagCount private int count; private ContinuationOfLifeTask(String id, String lockKey, long expireTime, long endTime, long timeout, int flagCount) { this.id = id; this.lockKey = lockKey; this.expireTime = expireTime; this.endTime = endTime; this.timeout = timeout; this.flagCount = flagCount; this.active = true; this.count = 0; } public void execute() { //该续命任务是否还存活 if (active) { JedisConfig.JedisConn conn = null; // 当前次数是否小于指定续命次数 // 当前时间是否大于结束时间 if (flagCount > count) { //重试次数 int retryCount = 0; // 当前时间是否大于过期时间 while (System.currentTimeMillis() >= endTime && retryCount < 3) { try { // 续命延期锁的过期时间 (conn = JedisConfig.getInstance().getConn()).expire(lockKey, expireTime); long expiration = expireTime / 10; //保证最少提前100毫秒 timeout = System.currentTimeMillis() + expireTime; //更新结束时间 endTime = timeout - (expiration > 100 ? expiration : 100); //增加执行次数 count++; if (log.isDebugEnabled()) { log.debug("【续命】锁关键字:{},续期:{}毫秒,计数:{}", lockKey, expireTime, count); } break; } catch (Exception e) { try { log.error(e.getMessage(), e); retryCount++; Thread.sleep(100L); } catch (InterruptedException ie) { log.error(e.getMessage(), e); } } finally { if (conn != null) { conn.close(); } } } } } } /** * 是否可以执行 必须是活跃且执行次数没有到最大值 * 且时间没有过期的任务才能执行 * * @return */ public boolean isExecute(long nowTime) { return nowTime >= endTime && nowTime <= timeout && flagCount >= count; } /** * 是否丢弃 * * @return */ public boolean isDiscarded(long nowTime) { return nowTime > timeout || flagCount <= count; } public boolean checkCount() { return count < flagCount; } public static final ContinuationOfLifeTask build(String lockKey, String requestId, long expireTime, int flagCount) { if (StringUtils.isAnyBlank(lockKey, requestId)) { throw new IllegalArgumentException("lockKey Can't be blank !"); } //校验入参如果锁定时间低于 1000 毫秒 延长到 1000 毫秒 if (expireTime < 1000) { expireTime = 1000; } //校验 锁的续命次数 如果小于 -1 则默认等于3 if (flagCount < -1) { flagCount = 3; } long expiration = expireTime / 10; //保证最少提前100毫秒 long timeout = System.currentTimeMillis() + expireTime; long endTime = timeout - (expiration > 500 ? expiration : 500); return new ContinuationOfLifeTask(lockKey + requestId, lockKey, expireTime, endTime, timeout, flagCount); } public long getEndTime() { return endTime; } public ContinuationOfLifeTask setEndTime(long endTime) { this.endTime = endTime; return this; } public boolean isActive() { return active; } public ContinuationOfLifeTask setActive(boolean active) { this.active = active; return this; } public String getLockKey() { return lockKey; } public ContinuationOfLifeTask setLockKey(String lockKey) { this.lockKey = lockKey; return this; } public long getExpireTime() { return expireTime; } public ContinuationOfLifeTask setExpireTime(long expireTime) { this.expireTime = expireTime; return this; } public int getFlagCount() { return flagCount; } public ContinuationOfLifeTask setFlagCount(int flagCount) { this.flagCount = flagCount; return this; } public String getId() { return id; } public void setId(String id) { this.id = id; } @Override public long getDelay(TimeUnit unit) { return unit.convert((endTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); } }
package com.xxx.arch.seq.constant; import com.ctrip.framework.apollo.Config; import com.ctrip.framework.apollo.ConfigService; import org.apache.commons.lang3.StringUtils; public class Constants { //apollo公共的ZK配置集群NameSpace public static final String ZK_NAME_SPACE = "33.zk"; public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence-redis"; // public static final String REDIS_SEQUEN_NAME_SPACE = "33.sequence"; public static final String ARCH_SEQ_ZOOKEEPER_CONNECT_STRING = getConfig(ZK_NAME_SPACE,"zk.address", ""); public static final String ARCH_SEQ_REDIS_NODES = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.nodes", ""); public static final String ARCH_SEQ_REDIS_SENTINEL_MASTER = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.sentinel.master", ""); public static final String ARCH_SEQ_REDIS_PASSWORD = getConfig(REDIS_SEQUEN_NAME_SPACE,"arch.seq.redis.common.key", ""); public static String getConfig(String nameSpace,String key,String defultValue){ if(StringUtils.isBlank(nameSpace)){ return ""; } Config config = ConfigService.getConfig(nameSpace); return config.getProperty(key,defultValue); } }
上述内容就是Jedis中怎么实现分布式锁,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。