本篇文章主要介绍了"Redis分布式锁学习",主要涉及到redis方面的内容,对于.NETjrs看球网直播吧_低调看直播体育app软件下载_低调看体育直播感兴趣的同学可以参考一下:
最近研究了一下Redis分布锁,在github上搜了一下 就拿star最多的 https://github.com/KidFashion/redlock-cs来...
最近研究了一下Redis分布锁,在github上搜了一下
就拿star最多的 https://github.com/KidFashion/redlock-cs来参考。。
但是相比Redisson(java的实现)有以下几点不足:
1.当一个资源解锁的时候,释放锁之后,其他之前等待的锁不能自动重试申请锁。
2.如果设置了一个超时时间,但是确实执行时间超过了超时时间,那么锁会被自动释放,原来持锁的客户端再次解锁的时候会出现问题
也要保证一旦一个客户端持锁,在客户端可用时不会被其他客户端解锁。
3.如果关键业务,可能需要重入场景,无法实现重入锁。
于是我仿照Redisson在c#的实现
测试一下:

TODO:接收事件去重新获取锁
public class RedisLock{
#region Property
//KEYS[1] :需要加锁的key,这里需要是字符串类型。 //ARGV[1] :锁的超时时间,防止死锁 //ARGV[2] :锁的唯一标识 private const String LOCK_SCRIPT = @"
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
";
//– KEYS[1] :需要加锁的key,这里需要是字符串类型。 //– KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}” //– ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。 //– ARGV[2] :锁的超时时间,防止死锁 //– ARGV[3] :锁的唯一标识 private const String UN_LOCK_SCRIPT = @"
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
";
private const double ClockDriveFactor = 0.01;
/// <summary> /// 默认的30秒过期时间 /// summary> private readonly TimeSpan LeaseTimeSpan = new TimeSpan(0, 0, 30);
private readonly ConcurrentDictionary<string, CancellationTokenSource> ExpirationRenewalMap = new ConcurrentDictionary<string, CancellationTokenSource>();
private readonly ConnectionMultiplexer Server;
#endregion
#region Constructor
public RedisLock(ConnectionMultiplexer connection)
{
Server = connection;
Server.PreserveAsyncOrder = false;
}
#endregion
#region Public
/// <summary> /// 加锁 /// summary> /// <param name="resource">锁名param> /// <param name="waitTimeSpan">如果没有锁成功,允许动重试申请锁的最大时长param> /// <param name="leaseTimeSpan">如果锁成功,对于锁(key)的过期时间param> /// <param name="lockObject">锁成功信息包装成对象返回param> /// <returns>true:成功returns> public bool TryLock(RedisKey resource, TimeSpan waitTimeSpan, TimeSpan leaseTimeSpan, out Lock lockObject)
{
lockObject = null;
try {
var startTime = DateTime.Now;
var val = CreateUniqueLockId();
//申请锁,返回还剩余的锁过期时间 var ttl = tryAcquire(resource, val, leaseTimeSpan);
var drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
var validity_time = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);
// 如果为空,表示申请锁成功 if (ttl.IsNull)
{
lockObject = new Lock(resource, val, validity_time);
//开始一个调度程序 ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
return true;
}
// 订阅监听redis消息 Subscriber(resource);
startTime = DateTime.Now;
while (true)
{
// 再次尝试一次申请锁 ttl = tryAcquire(resource, val, leaseTimeSpan);
// 获得锁,返回 if (ttl.IsNull)
{
lockObject = new Lock(resource, val, validity_time);
ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
return true;
}
drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
validity_time = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);
if (validity_time.TotalMilliseconds < 0)
{
//说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。 Console.WriteLine("已经超过了客户端设置的最大wait time,Thread ID:" + Thread.CurrentThread.ManagedThreadId);
return false;
}
}
}
catch (Exception)
{
return false;
}
finally {
// 无论是否获得锁,都要取消订阅解锁消息 UnSubscriber(resource);
}
}
/// <summary> /// 解锁 /// summary> /// <param name="lockObject">锁成功的返回对象param> /// <returns>returns> public RedisResult UnLock(Lock lockObject)
{
if (lockObject == null) return null;
CancelExpirationRenewal(lockObject);
RedisKey[] key = { lockObject.Resource, GetChannelName(lockObject.Resource) };
RedisValue[] values = { Thread.CurrentThread.ManagedThreadId, 10000, lockObject.Value };
return Server.GetDatabase().ScriptEvaluate(UN_LOCK_SCRIPT, key, values);
}
#endregion
#region Private
private void Subscriber(RedisKey resource)
{
Console.WriteLine("Thread ID:" + Thread.CurrentThread.ManagedThreadId + " 订阅广播");
var aa = Thread.CurrentThread.ManagedThreadId;
ISubscriber sub = Server.GetSubscriber();
sub.Subscribe(GetChannelName(resource), (channel, message) =>
{
Console.WriteLine("Thread ID:" + aa + ",收到广播:Thread ID:" + message + " 已解锁");
});
}
private void UnSubscriber(RedisKey resource)
{
ISubscriber sub = Server.GetSubscriber();
sub.Unsubscribe(GetChannelName(resource));
}
private string GetChannelName(RedisKey resource)
{
return "redisson_lock__channel__{" + resource.ToString() + "}";
}
private RedisResult tryAcquire(RedisKey resource, string value, TimeSpan? leaseTimeSpan)
{
if (leaseTimeSpan != null)
{
return LockInnerAsync(resource, leaseTimeSpan.Value, value);
}
else {
return LockInnerAsync(resource, value);
}
}
private RedisResult LockInnerAsync(RedisKey resource, TimeSpan waitTime, string threadId)
{
RedisKey[] key = { resource };
RedisValue[] values = { waitTime.TotalMilliseconds, threadId };
return Server.GetDatabase().ScriptEvaluate(LOCK_SCRIPT, key, values);
}
private RedisResult LockInnerAsync(RedisKey resource, string threadId)
{
var task = LockInnerAsync(resource, this.LeaseTimeSpan, threadId);
return task;
}
protected static string CreateUniqueLockId()
{
return string.Concat(Guid.NewGuid().ToString(), Thread.CurrentThread.ManagedThreadId);
}
protected void setTimeOut(ElapsedEventHandler doWork, int time)
{
System.Timers.Timer timer = new System.Timers.Timer();
timer.Interval = time;
timer.Elapsed += delegate (object s, ElapsedEventArgs args) { timer.Stop(); };
timer.Elapsed += doWork;
timer.Start();
}
protected CancellationTokenSource TaskTimeOut(Func<Lock, bool> action, Lock lockObj, int time)
{
var timeoutCancellationTokenSource = new CancellationTokenSource();
var task = Task.Run(() =>
{
while (!timeoutCancellationTokenSource.IsCancellationRequested)
{
Thread.Sleep(time);
if (action(lockObj))
{
Console.WriteLine("锁:" + lockObj.Value + " 重置过期时间");
}
}
}, timeoutCancellationTokenSource.Token);
return timeoutCancellationTokenSource;
}
private void ScheduleExpirationRenewal(TimeSpan leaseTimeSpan, Lock lockObject)
{
ScheduleExpirationRenewal((lockObj) => Server.GetDatabase().KeyExpire(lockObj.Resource, leaseTimeSpan), lockObject,
Convert.ToInt32(leaseTimeSpan.TotalMilliseconds) / 3);
}
private void ScheduleExpirationRenewal(Func<Lock, bool> action, Lock lockObj, int time)
{
// 保证任务不会被重复创建 if (ExpirationRenewalMap.ContainsKey(lockObj.Resource))
{
return;
}
var task = TaskTimeOut(action, lockObj, time);
//如果已经存在,停止任务,也是为了在极端的并发情况下,保证任务不会被重复创建 if (!ExpirationRenewalMap.TryAdd(lockObj.Resource, task))
{
task.Cancel();
}
}
private void CancelExpirationRenewal(Lock lockObj)
{
CancellationTokenSource task;
if (ExpirationRenewalMap.TryRemove(lockObj.Resource, out task))
{
if (task != null)
{
task.Cancel();
}
}
}
#endregion}
以上就介绍了Redis分布式锁学习,包括了redis方面的内容,希望对.NETjrs看球网直播吧_低调看直播体育app软件下载_低调看体育直播有兴趣的朋友有所帮助。
本文网址链接:http://www.codes51.com/article/detail_1068362.html