摘要:之分布式鎖的實現(xiàn)方案如何優(yōu)雅地實現(xiàn)分布式鎖博客地址分布式鎖關鍵詞分布式鎖是控制分布式系統(tǒng)之間同步訪問共享資源的一種方式。
關鍵詞Redis之分布式鎖的實現(xiàn)方案 - 如何優(yōu)雅地實現(xiàn)分布式鎖(JAVA)
博客地址 https://blog.piaoruiqing.cn/2019/05/19/redis分布式鎖/
分布式鎖: 是控制分布式系統(tǒng)之間同步訪問共享資源的一種方式。
spring-data-redis: Spring針對redis的封裝, 配置簡單, 提供了與Redis存儲交互的抽象封裝, 十分優(yōu)雅, 也極具擴展性, 推薦讀一讀源碼
Lua: Lua 是一種輕量小巧的腳本語言, 可在redis執(zhí)行.
前言本文闡述了Redis分布式鎖的一種簡單JAVA實現(xiàn)及優(yōu)化進階, 實現(xiàn)了自動解鎖、自定義異常、重試、注解鎖等功能, 嘗試用更優(yōu)雅簡潔的代碼完成分布式鎖.
需求互斥性: 在分布式系統(tǒng)環(huán)境下, 一個鎖只能被一個線程持有.
高可用: 不會發(fā)生死鎖、即使客戶端崩潰也可超時釋放鎖.
非阻塞: 獲取鎖失敗即返回.
方案Redis具有極高的性能, 且其命令對分布式鎖支持友好, 借助SET命令即可實現(xiàn)加鎖處理.
實現(xiàn)SET
EX seconds -- Set the specified expire time, in seconds.
PX milliseconds -- Set the specified expire time, in milliseconds.
NX -- Only set the key if it does not already exist.
XX -- Only set the key if it already exist.
簡單實現(xiàn)
做法為set if not exist(如果不存在則賦值), redis命令為原子操作, 所以多帶帶使用set命令時不用擔心并發(fā)導致異常.
具體代碼實現(xiàn)如下: (spring-data-redis:2.1.6)
依賴引入<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
<version>2.1.4.RELEASEversion>
dependency>
配置RedisTemplate
@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
StringRedisSerializer keySerializer = new StringRedisSerializer();
RedisSerializer<");new StringRedisSerializer();
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(factory);
template.setKeySerializer(keySerializer);
template.setHashKeySerializer(keySerializer);
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
簡單的分布式鎖實現(xiàn)
/**
* try lock
* @author piaoruiqing
*
* @param key lock key
* @param value value
* @param timeout timeout
* @param unit time unit
* @return
*/
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
以上代碼即完成了一個簡單的分布式鎖功能:
其中redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); 即為執(zhí)行redis命令:
redis> set dlock:test-try-lock a EX 10 NX
OK
redis> set dlock:test-try-lock a EX 10 NX
null
早期版本spring-data-redis分布式鎖實現(xiàn)及注意事項
方法Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit);是在2.1版本中新增的, 早期版本中setIfAbsent無法同時指定過期時間, 若先使用setIfAbsent再設置key的過期時間, 會存在產生死鎖的風險, 故舊版本中需要使用另外的寫法進行實現(xiàn). 以spring-data-redis:1.8.20為例
/**
* try lock
* @author piaoruiqing
*
* @param key lock key
* @param value value
* @param timeout timeout
* @param unit time unit
* @return
*/
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
return redisTemplate.execute(new RedisCallback() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
JedisCommands commands = (JedisCommands)connection.getNativeConnection();
String result = commands.set(key, value, "NX", "PX", unit.toMillis(timeout));
return "OK".equals(result);
}
});
}
spring-data-redis:1.8.20默認redis客戶端為jedis, 可通過getNativeConnection直接調用jedis方法進行操作. 新舊版本實現(xiàn)方式最終效果相同.
優(yōu)化進階優(yōu)化一 (自動解鎖及重試)基于AOP實現(xiàn)分布式鎖注解工具 - 不僅能用, 而且好用
自動解鎖、重試: 上一節(jié)針對分布式鎖的簡單實現(xiàn)可滿足基本需求, 但仍有較多可優(yōu)化改進之處, 本小節(jié)將針對分布式鎖自動解鎖及重試進行優(yōu)化
實現(xiàn)AutoCloseable接口, 可使用try-with-resource方便地完成自動解鎖.
/**
* distributed lock
* @author piaoruiqing
*
* @since JDK 1.8
*/
abstract public class DistributedLock implements AutoCloseable {
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
/**
* release lock
* @author piaoruiqing
*/
abstract public void release();
/*
* (non-Javadoc)
* @see java.lang.AutoCloseable#close()
*/
@Override
public void close() throws Exception {
LOGGER.debug("distributed lock close , {}", this.toString());
this.unlock();
}
}
RedisDistributedLock是Redis分布式鎖的抽象, 繼承了DistributedLock并實現(xiàn)了unlock接口.
/**
* redis distributed lock
*
* @author piaoruiqing
* @date: 2019/01/12 23:20
*
* @since JDK 1.8
*/
public class RedisDistributedLock extends DistributedLock {
private RedisOperations operations;
private String key;
private String value;
private static final String COMPARE_AND_DELETE = // (一)
"if redis.call("get",KEYS[1]) == ARGV[1]
" +
"then
" +
" return redis.call("del",KEYS[1])
" +
"else
" +
" return 0
" +
"end";
/**
* @param operations
* @param key
* @param value
*/
public RedisDistributedLock(RedisOperations operations, String key, String value) {
this.operations = operations;
this.key = key;
this.value = value;
}
/*
* (non-Javadoc)
* @see com.piaoruiqing.demo.distributed.lock.DistributedLock#release()
*/
@Override
public void release() { // (二)
List keys = Collections.singletonList(key);
operations.execute(new DefaultRedisScript(COMPARE_AND_DELETE), keys, value);
}
/*
* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "RedisDistributedLock [key=" + key + ", value=" + value + "]";
}
}
(一): 通過Lua腳本進行解鎖, 使對比鎖的值+刪除成為原子操作, 確保解鎖操作的正確性. 簡單來說就是防止刪了別人的鎖. 例如: 線程A方法未執(zhí)行完畢時鎖超時了, 隨后B線程也獲取到了該鎖(key相同), 但此時如果A線程方法執(zhí)行完畢嘗試解鎖, 如果不比對value, 那么A將刪掉B的鎖, 這時候C線程又能加鎖, 業(yè)務將產生更嚴重的混亂.(不要過分依賴分布式鎖, 在數(shù)據(jù)一致性要求較高的情況下, 數(shù)據(jù)庫層面也要進行一定的處理, 例如唯一鍵約束、事務等來確保數(shù)據(jù)的正確)
(二): 使用RedisOperations執(zhí)行Lua腳本進行解鎖操作.
可參閱redis官方文檔
/**
* @author piaoruiqing
* @param key lock key
* @param timeout timeout
* @param retries number of retries
* @param waitingTime retry interval
* @return
* @throws InterruptedException
*/
public DistributedLock acquire(String key, long timeout, int retries, long waitingTime) throws InterruptedException {
final String value
= RandomStringUtils.randomAlphanumeric(4) + System.currentTimeMillis(); // (一)
do {
Boolean result
= stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS); // (二)
if (result) {
return new RedisDistributedLock(stringRedisTemplate, key, value);
}
if (retries > NumberUtils.INTEGER_ZERO) {
TimeUnit.MILLISECONDS.sleep(waitingTime);
}
if(Thread.currentThread().isInterrupted()){
break;
}
} while (retries-- > NumberUtils.INTEGER_ZERO);
return null;
}
(一): 鎖值要保證唯一, 使用4位隨機字符串+時間戳基本可滿足需求 注: UUID.randomUUID()在高并發(fā)情況下性能不佳.
(二): 嘗試加鎖, 代碼中是2.1版本的做法, 早起版本參考上一節(jié)的實現(xiàn).
此代碼已經可以滿足自動解鎖和重試的需求了, 使用方法:
// 根據(jù)key加鎖, 超時時間10000ms, 重試2次, 重試間隔500ms
try(DistributedLock lock = redisLockService.acquire(key, 10000, 2, 500);){
// do something
}
但還可以再優(yōu)雅一點, 將模板代碼封裝起來, 可支持Lambda表達式:
/**
* lock handler
* @author piaoruiqing
*
* @since JDK 1.8
*/
@FunctionalInterface // (一)
public interface LockHandler<T> {
/**
* the logic you want to execute
*
* @author piaoruiqing
*
* @return
* @throws Throwable
*/
T handle() throws Throwable; // (二)
}
(一): 定義函數(shù)式接口, 將業(yè)務邏輯放入Lambda表達式使代碼更加簡潔.
(二): 業(yè)務中的異常不建議在分布式鎖中處理, 直接拋出來更合理.
使用LockHandler完成加鎖的實現(xiàn):
public T tryLock(String key, LockHandler handler, long timeout, boolean autoUnlock, int retries, long waitingTime) throws Throwable {
try (DistributedLock lock = this.acquire(key, timeout, retries, waitingTime);) {
if (lock != null) {
LOGGER.debug("get lock success, key: {}", key);
return handler.handle();
}
LOGGER.debug("get lock fail, key: {}", key);
return null;
}
}
此時可以通過比較優(yōu)雅的方式使用分布式鎖來完成編碼:
@Test
public void testTryLock() throws Throwable {
final String key = "dlock:test-try-lock";
AnyObject anyObject = redisLockService.tryLock(key, () -> {
// do something
return new AnyObject();
}, 10000, true, 0, 0);
}
自定義異常: 前文中針對分布式鎖的封裝可滿足多數(shù)業(yè)務場景, 但是考慮這樣一種情況, 如果業(yè)務本身會返回NULL當前的實現(xiàn)方式可能會存在錯誤的處理, 因為獲取鎖失敗也會返回NULL. 避免返回NULL固然是一種解決方式, 但無法滿足所有的場景, 此時支持自定義異常或許是個不錯的選擇.
實現(xiàn)起來很容易, 在原代碼的基礎之上增加onFailure參數(shù), 如果鎖為空直接拋出異常即可.
public T tryLock(String key, LockHandler handler, long timeout, boolean autoUnlock, int retries, long waitingTime, Class<"); throws Throwable { // (一)
try (DistributedLock lock = this.getLock(key, timeout, retries, waitingTime);) {
if (lock != null) {
LOGGER.debug("get lock success, key: {}", key);
return handler.handle();
}
LOGGER.debug("get lock fail, key: {}", key);
if (null != onFailure) {
throw onFailure.newInstance(); // (二)
}
return null;
}
}
(一): Class<");限定onFailure必須是RuntimeException或其子類. 筆者認為使用RuntimeException在語義上更容易理解. 如有需要使用其他異常也未嘗不可(如獲取鎖失敗需要統(tǒng)一處理等情況).
(二): 反射
優(yōu)化三 (優(yōu)雅地使用注解)結合APO優(yōu)雅地使用注解完成分布式鎖:
為了減小篇幅折疊部分注釋
/**
* distributed lock
* @author piaoruiqing
* @date: 2019/01/12 23:15
*
* @since JDK 1.8
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLockable {
/** timeout of the lock */
long timeout() default 5L;
/** time unit */
TimeUnit unit() default TimeUnit.MILLISECONDS;
/** number of retries */
int retries() default 0;
/** interval of each retry */
long waitingTime() default 0L;
/** key prefix */
String prefix() default "";
/** parameters that construct a key */
String[] argNames() default {};
/** construct a key with parameters */
boolean argsAssociated() default true;
/** whether unlock when completed */
boolean autoUnlock() default true;
/** throw an runtime exception while fail to get lock */
Class<");default NoException.class;
/** no exception */
public static final class NoException extends RuntimeException {
private static final long serialVersionUID = -7821936618527445658L;
}
}
timeout: 超時時間
unit: 時間單位
retries: 重試次數(shù)
waitingTime: 重試間隔時間
prefix: key前綴, 默認為包名+類名+方法名
argNames: 組成key的參數(shù)
注解可使用在方法上, 需要注意的是, 本文注解通過spring AOP實現(xiàn), 故對象內部方法間調用將無效.
/**
* distributed lock aspect
* @author piaoruiqing
* @date: 2019/02/02 22:35
*
* @since JDK 1.8
*/
@Aspect
@Order(10) // (一)
public class DistributedLockableAspect implements KeyGenerator { // (二)
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
@Resource
private RedisLockClient redisLockClient;
/**
* {@link DistributedLockable}
* @author piaoruiqing
*/
@Pointcut(value = "execution(* *(..)) && @annotation(com.github.piaoruiqing.dlock.annotation.DistributedLockable)")
public void distributedLockable() {}
/**
* @author piaoruiqing
*
* @param joinPoint
* @param lockable
* @return
* @throws Throwable
*/
@Around(value = "distributedLockable() && @annotation(lockable)")
public Object around(ProceedingJoinPoint joinPoint, DistributedLockable lockable) throws Throwable {
long start = System.nanoTime();
final String key = this.generate(joinPoint, lockable.prefix(), lockable.argNames(), lockable.argsAssociated()).toString();
Object result = redisLockClient.tryLock(
key, () -> {
return joinPoint.proceed();
},
lockable.unit().toMillis(lockable.timeout()), lockable.autoUnlock(),
lockable.retries(), lockable.unit().toMillis(lockable.waitingTime()),
lockable.onFailure()
);
long end = System.nanoTime();
LOGGER.debug("distributed lockable cost: {} ns", end - start);
return result;
}
}
(一): 切面優(yōu)先級
(二): KeyGenerator為自定義的key生成策略, 使用 prefix+argName+arg作為key, 具體實現(xiàn)見源碼.
此時可以通過注解的方式使用分布式鎖, 這種方式對代碼入侵較小, 且簡潔.
@DistributedLockable(
argNames = {"anyObject.id", "anyObject.name", "param1"},
timeout = 20, unit = TimeUnit.SECONDS,
onFailure = RuntimeException.class
)
public Long distributedLockableOnFaiFailure(AnyObject anyObject, String param1, Object param2, Long timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
LOGGER.info("distributed-lockable: " + System.nanoTime());
} catch (InterruptedException e) {
}
return System.nanoTime();
}
擴展
分布式鎖的實現(xiàn)有多種方式, 可根據(jù)實際場景和需求選擇不同的介質進行實現(xiàn):
Redis: 性能高, 對分布式鎖支持友好, 實現(xiàn)簡單, 多數(shù)場景下表現(xiàn)較好.
Zookeeper: 可靠性較高, 對分布式鎖支持友好, 實現(xiàn)較復雜但有現(xiàn)成的實現(xiàn)可以使用.
數(shù)據(jù)庫: 實現(xiàn)簡單, 可使用樂觀鎖/悲觀鎖實現(xiàn), 性能一般, 高并發(fā)場景下不推薦
結語本文闡述了Redis分布式鎖的JAVA實現(xiàn), 完成了自動解鎖、自定義異常、重試、注解鎖等功能, 源碼見地址.
本實現(xiàn)還有諸多可以優(yōu)化之處, 如:
重入鎖的實現(xiàn)
優(yōu)化重試策略為訂閱Redis事件: 訂閱Redis事件可以進一步優(yōu)化鎖的性能, 可通過wait+notifyAll來替代文中的sleep.
篇幅有限, 后續(xù)再行闡述.
參考文獻redis.io/topics/dist…
martin.kleppmann.com/2016/02/08/…
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/7228.html
摘要:集群實現(xiàn)分布式鎖上面的討論中我們有一個非常重要的假設是單點的。但是其實這已經超出了實現(xiàn)分布式鎖的范圍,單純用沒有命令來實現(xiàn)生成。這個問題用實現(xiàn)分布式鎖暫時無解。結論并不能實現(xiàn)嚴格意義上的分布式鎖。 關于Redis實現(xiàn)分布式鎖的問題,網(wǎng)絡上很多,但是很多人的討論基本就是把原來博主的貼過來,甚至很多面試官也是一知半解經不起推敲就來面候選人,最近結合我自己的學習和資料查閱,整理一下用Redi...
摘要:分布式鎖的作用在單機環(huán)境下,有個秒殺商品的活動,在短時間內,服務器壓力和流量會陡然上升。分布式集群業(yè)務業(yè)務場景下,每臺服務器是獨立存在的。這里就用到了分布式鎖這里簡單介紹一下,以的事務機制來延生。 Redis 分布式鎖的作用 在單機環(huán)境下,有個秒殺商品的活動,在短時間內,服務器壓力和流量會陡然上升。這個就會存在并發(fā)的問題。想要解決并發(fā)需要解決以下問題 1、提高系統(tǒng)吞吐率也就是qps 每...
閱讀 1214·2021-11-24 09:39
閱讀 2137·2021-11-22 13:54
閱讀 2128·2021-09-08 10:45
閱讀 1454·2021-08-09 13:43
閱讀 2991·2019-08-30 15:52
閱讀 3090·2019-08-29 15:38
閱讀 2853·2019-08-26 13:44
閱讀 3059·2019-08-26 13:30