ASP源码.NET源码PHP源码JSP源码JAVA源码DELPHI源码PB源码VC源码VB源码Android源码

Redis分布式锁学习

来源:网络整理     时间:2016-05-14     关键词:redis

本篇文章主要介绍了"Redis分布式锁学习",主要涉及到redis方面的内容,对于.NETjrs看球网直播吧_低调看直播体育app软件下载_低调看体育直播感兴趣的同学可以参考一下: 最近研究了一下Redis分布锁,在github上搜了一下 就拿star最多的 https://github.com/KidFashion/redlock-cs来...

最近研究了一下Redis分布锁,在github上搜了一下 

就拿star最多的 https://github.com/KidFashion/redlock-cs来参考。。

但是相比Redisson(java的实现)有以下几点不足:

1.当一个资源解锁的时候,释放锁之后,其他之前等待的锁不能自动重试申请锁。

2.如果设置了一个超时时间,但是确实执行时间超过了超时时间,那么锁会被自动释放,原来持锁的客户端再次解锁的时候会出现问题

  也要保证一旦一个客户端持锁,在客户端可用时不会被其他客户端解锁。

3.如果关键业务,可能需要重入场景,无法实现重入锁。

 


于是我仿照Redisson在c#的实现
测试一下:redis 命令,redis windows,redis入门指南,redis 安装,redis java,php redis,redis 下载,redis 集群,redis可视化工具redis 命令,redis windows,redis入门指南,redis 安装,redis java,php redis,redis 下载,redis 集群,redis可视化工具
TODO:接收事件去重新获取锁 

public class RedisLock{

    #region Property
    //KEYS[1] :需要加锁的key,这里需要是字符串类型。    //ARGV[1] :锁的超时时间,防止死锁    //ARGV[2] :锁的唯一标识    private const String LOCK_SCRIPT = @"
                if (redis.call('exists', KEYS[1]) == 0) then 
                    redis.call('hset', KEYS[1], ARGV[2], 1);
                    redis.call('pexpire', KEYS[1], ARGV[1]);
                    return nil;
                end;
                if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
                    redis.call('hincrby', KEYS[1], ARGV[2], 1);
                    redis.call('pexpire', KEYS[1], ARGV[1]);
                    return nil;
                end;
                return redis.call('pttl', KEYS[1]);
            ";

    //– KEYS[1] :需要加锁的key,这里需要是字符串类型。    //– KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}”    //– ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。    //– ARGV[2] :锁的超时时间,防止死锁    //– ARGV[3] :锁的唯一标识    private const String UN_LOCK_SCRIPT = @"
                if (redis.call('exists', KEYS[1]) == 0) then 
                    redis.call('publish', KEYS[2], ARGV[1]); 
                    return 1;
                end;
                if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
                    return nil;
                end;
                local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
                if (counter > 0) then
                    redis.call('pexpire', KEYS[1], ARGV[2]); 
                    return 0;
                else
                    redis.call('del', KEYS[1]);
                    redis.call('publish', KEYS[2], ARGV[1]);
                    return 1;
                end;
                return nil;
            ";
    private const double ClockDriveFactor = 0.01;

    /// <summary>    /// 默认的30秒过期时间    /// summary>    private readonly TimeSpan LeaseTimeSpan = new TimeSpan(0, 0, 30);

    private readonly ConcurrentDictionary<stringCancellationTokenSource> ExpirationRenewalMap =  new ConcurrentDictionary<stringCancellationTokenSource>();

    private readonly ConnectionMultiplexer Server; 
    #endregion
    #region Constructor

    public RedisLock(ConnectionMultiplexer connection)
    {
        Server = connection;
        Server.PreserveAsyncOrder = false;
    }

    #endregion
    #region Public

    /// <summary>    /// 加锁    /// summary>    /// <param name="resource">锁名param>    /// <param name="waitTimeSpan">如果没有锁成功,允许动重试申请锁的最大时长param>    /// <param name="leaseTimeSpan">如果锁成功,对于锁(key)的过期时间param>    /// <param name="lockObject">锁成功信息包装成对象返回param>    /// <returns>true:成功returns>    public bool TryLock(RedisKey resource, TimeSpan waitTimeSpan, TimeSpan leaseTimeSpan, out Lock lockObject)
    {
        lockObject = null;
        try        {
            var startTime = DateTime.Now;
            var val = CreateUniqueLockId();
            //申请锁,返回还剩余的锁过期时间            var ttl = tryAcquire(resource, val, leaseTimeSpan);
            var drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
            var validity_time = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);

            // 如果为空,表示申请锁成功            if (ttl.IsNull)
            {
                lockObject = new Lock(resource, val, validity_time);
                //开始一个调度程序                ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
                return true;
            }
            // 订阅监听redis消息            Subscriber(resource);

            startTime = DateTime.Now;
            while (true)
            {

                // 再次尝试一次申请锁                ttl = tryAcquire(resource, val, leaseTimeSpan);
                // 获得锁,返回                if (ttl.IsNull)
                {
                    lockObject = new Lock(resource, val, validity_time);
                    ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
                    return true;
                }

                drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
                validity_time = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);
                if (validity_time.TotalMilliseconds < 0)
                {
                    //说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。                    Console.WriteLine("已经超过了客户端设置的最大wait time,Thread ID:" + Thread.CurrentThread.ManagedThreadId);
                    return false;
                }
            }
        }
        catch (Exception)
        {
            return false;
        }
        finally        {
            // 无论是否获得锁,都要取消订阅解锁消息            UnSubscriber(resource);
        }

    }

    /// <summary>    /// 解锁    /// summary>    /// <param name="lockObject">锁成功的返回对象param>    /// <returns>returns>    public RedisResult UnLock(Lock lockObject)
    {
        if (lockObject == nullreturn null;
        CancelExpirationRenewal(lockObject);
        RedisKey[] key = { lockObject.Resource, GetChannelName(lockObject.Resource) };
        RedisValue[] values = { Thread.CurrentThread.ManagedThreadId, 10000, lockObject.Value };
        return Server.GetDatabase().ScriptEvaluate(UN_LOCK_SCRIPT, key, values);
    } 
    #endregion
    #region Private
    private void Subscriber(RedisKey resource)
    {
        Console.WriteLine("Thread ID:" + Thread.CurrentThread.ManagedThreadId + " 订阅广播");
        var aa = Thread.CurrentThread.ManagedThreadId;
        ISubscriber sub = Server.GetSubscriber();
        sub.Subscribe(GetChannelName(resource), (channel, message) =>
        {
            Console.WriteLine("Thread ID:" + aa + ",收到广播:Thread ID:" + message + " 已解锁");
        });

    }

    private void UnSubscriber(RedisKey resource)
    {
        ISubscriber sub = Server.GetSubscriber();
        sub.Unsubscribe(GetChannelName(resource));
    }

    private string GetChannelName(RedisKey resource)
    {
        return "redisson_lock__channel__{" + resource.ToString() + "}";
    }

    private RedisResult tryAcquire(RedisKey resource, string value, TimeSpan? leaseTimeSpan)
    {
        if (leaseTimeSpan != null)
        {
            return LockInnerAsync(resource, leaseTimeSpan.Value, value);
        }
        else        {
            return LockInnerAsync(resource, value);
        }
    }

    private RedisResult LockInnerAsync(RedisKey resource, TimeSpan waitTime, string threadId)
    {
        RedisKey[] key = { resource };
        RedisValue[] values = { waitTime.TotalMilliseconds, threadId };
        return Server.GetDatabase().ScriptEvaluate(LOCK_SCRIPT, key, values);
    }

    private RedisResult LockInnerAsync(RedisKey resource, string threadId)
    {
        var task = LockInnerAsync(resource, this.LeaseTimeSpan, threadId);
        return task;
    }

    protected static string CreateUniqueLockId()
    {
        return string.Concat(Guid.NewGuid().ToString(), Thread.CurrentThread.ManagedThreadId);
    }

    protected void setTimeOut(ElapsedEventHandler doWork, int time)

    {
        System.Timers.Timer timer = new System.Timers.Timer();
        timer.Interval = time;
        timer.Elapsed += delegate (object s, ElapsedEventArgs args) { timer.Stop(); };
        timer.Elapsed += doWork;
        timer.Start();
    }

    protected CancellationTokenSource TaskTimeOut(Func<Lockbool> action, Lock lockObj, int time)
    {
        var timeoutCancellationTokenSource = new CancellationTokenSource();
        var task = Task.Run(() =>
        {
            while (!timeoutCancellationTokenSource.IsCancellationRequested)
            {
                Thread.Sleep(time);
                if (action(lockObj))
                {
                    Console.WriteLine("锁:" + lockObj.Value + " 重置过期时间");
                }

            }

        }, timeoutCancellationTokenSource.Token);
        return timeoutCancellationTokenSource;
    }
    private void ScheduleExpirationRenewal(TimeSpan leaseTimeSpan, Lock lockObject)
    {
        ScheduleExpirationRenewal((lockObj) => Server.GetDatabase().KeyExpire(lockObj.Resource, leaseTimeSpan), lockObject,
            Convert.ToInt32(leaseTimeSpan.TotalMilliseconds) / 3);
    }

    private void ScheduleExpirationRenewal(Func<Lockbool> action, Lock lockObj, int time)
    {
        // 保证任务不会被重复创建        if (ExpirationRenewalMap.ContainsKey(lockObj.Resource))
        {
            return;
        }

        var task = TaskTimeOut(action, lockObj, time);

        //如果已经存在,停止任务,也是为了在极端的并发情况下,保证任务不会被重复创建        if (!ExpirationRenewalMap.TryAdd(lockObj.Resource, task))
        {
            task.Cancel();
        }

    }

    private void CancelExpirationRenewal(Lock lockObj)
    {
        CancellationTokenSource task;
        if (ExpirationRenewalMap.TryRemove(lockObj.Resource, out task))
        {
            if (task != null)
            {
                task.Cancel();
            }
        }

    } 
    #endregion}

以上就介绍了Redis分布式锁学习,包括了redis方面的内容,希望对.NETjrs看球网直播吧_低调看直播体育app软件下载_低调看体育直播有兴趣的朋友有所帮助。

本文网址链接:http://www.codes51.com/article/detail_1068362.html

相关图片

相关文章