Locks

coredis.recipes.locks

Distributed lock with Lua scripts

The implementation is based on the distributed locking pattern described in the Redis documentation.

When used with a RedisCluster instance, acquiring the lock also ensures that the token set by the acquire() method is replicated to at least n/2 replicas, where n is the number of replicas of the shard that holds the lock. This is done using the ensure_replication() context manager.

The implementation uses the following Lua scripts:

  1. Release the lock

    -- KEYS[1] - lock name
    -- ARGV[1] - token
    -- return 1 if the lock was released, otherwise 0
    
    local token = redis.call('get', KEYS[1])
    if not token or token ~= ARGV[1] then
        return 0
    end
    redis.call('del', KEYS[1])
    return 1
    
  2. Extend the lock

    -- KEYS[1] - lock name
    -- ARGV[1] - token
    -- ARGV[2] - additional milliseconds
    -- return 1 if the lock's time was extended, otherwise 0
    local token = redis.call('get', KEYS[1])
    if not token or token ~= ARGV[1] then
        return 0
    end
    local expiration = redis.call('pttl', KEYS[1])
    if not expiration then
        expiration = 0
    end
    if expiration < 0 then
        return 0
    end
    redis.call('pexpire', KEYS[1], expiration + ARGV[2])
    return 1
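
The logic of the two scripts can be followed more easily in a pure-Python model. The sketch below is only an illustration under stated assumptions: FakeRedis is a hypothetical in-memory stand-in for the handful of commands the scripts call, and release and extend are illustrative function names, not part of coredis. It also does not model the atomicity that Redis provides when running a Lua script.

```python
import time


class FakeRedis:
    """In-memory stand-in for GET, SET PX, PTTL, PEXPIRE and DEL (hypothetical helper)."""

    def __init__(self):
        self._data = {}  # key -> (value, absolute expiry in ms, or None for no expiry)

    @staticmethod
    def _now_ms():
        return int(time.monotonic() * 1000)

    def set(self, key, value, px=None):
        expires_at = None if px is None else self._now_ms() + px
        self._data[key] = (value, expires_at)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at is not None and expires_at <= self._now_ms():
            del self._data[key]  # lazily drop expired keys
            return None
        return value

    def pttl(self, key):
        if self.get(key) is None:
            return -2  # key does not exist
        expires_at = self._data[key][1]
        if expires_at is None:
            return -1  # key exists but has no expiry
        return max(0, expires_at - self._now_ms())

    def pexpire(self, key, ms):
        value = self.get(key)
        if value is None:
            return 0
        self._data[key] = (value, self._now_ms() + ms)
        return 1

    def delete(self, key):
        return 1 if self._data.pop(key, None) is not None else 0


def release(store, name, token):
    """Mirror of script 1: delete the key only if it still holds our token."""
    current = store.get(name)
    if current is None or current != token:
        return 0
    store.delete(name)
    return 1


def extend(store, name, token, additional_ms):
    """Mirror of script 2: add time only if we own the lock and it has an expiry."""
    current = store.get(name)
    if current is None or current != token:
        return 0
    remaining = store.pttl(name)
    if remaining < 0:  # -1: no expiry set, -2: key vanished in the meantime
        return 0
    store.pexpire(name, remaining + additional_ms)
    return 1
```

The token comparison is what prevents one client from releasing or extending a lock that has since expired and been acquired by another client.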
    
class LuaLock(client: Redis | RedisCluster, name: StringT, timeout: float | None = None, sleep: float = 0.1, blocking: bool = True, blocking_timeout: float | None = None)

A shared, distributed lock using Lua scripts.

The lock can be used with both coredis.Redis and coredis.RedisCluster either explicitly or as an async context manager:

import asyncio
import coredis
from coredis.exceptions import LockError
from coredis.recipes.locks import LuaLock

async def test():
    client = coredis.Redis()
    async with LuaLock(client, "mylock", timeout=1.0):
        # do stuff
        await asyncio.sleep(0.5)
        # lock is implicitly released when the context manager exits
    try:
        async with LuaLock(client, "mylock", timeout=1.0):
            # do stuff that takes too long
            await asyncio.sleep(1)
            # lock will raise upon exiting the context manager
    except LockError as err:
        # roll back stuff
        print(f"Expected error: {err}")
    lock = LuaLock(client, "mylock", timeout=1.0)
    await lock.acquire()
    # do stuff
    await asyncio.sleep(0.5)
    # do more stuff
    await lock.extend(1.0)
    await lock.release()

asyncio.run(test())

Parameters:
  • timeout – the maximum lifetime of the lock. By default (None), the lock remains held until release() is called. timeout can be specified as a float or an integer, both representing the number of seconds before the lock expires.

  • sleep – the number of seconds to sleep per loop iteration when the lock is in blocking mode and another client currently holds the lock.

  • blocking – whether calling acquire() should block until the lock has been acquired or fail immediately. In the latter case acquire() returns False and the lock is not acquired. Defaults to True.

  • blocking_timeout – the maximum amount of time in seconds to spend trying to acquire the lock. A value of None means to keep trying forever. blocking_timeout can be specified as a float or an integer, both representing the number of seconds to wait.
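
How sleep, blocking and blocking_timeout interact can be sketched as a simple retry loop. This is a minimal illustration rather than the coredis implementation: acquire_with_retry and try_acquire are hypothetical names, and returning False once blocking_timeout has elapsed is an assumption made for the sketch.

```python
import asyncio
import time


async def acquire_with_retry(try_acquire, sleep=0.1, blocking=True, blocking_timeout=None):
    """Call `try_acquire` (a coroutine function returning bool) until it
    succeeds, blocking is disabled, or blocking_timeout seconds have passed."""
    deadline = None
    if blocking_timeout is not None:
        deadline = time.monotonic() + blocking_timeout
    while True:
        if await try_acquire():
            return True
        if not blocking:
            # non-blocking mode: a single attempt only
            return False
        if deadline is not None and time.monotonic() + sleep > deadline:
            # the next attempt would land after the deadline, so give up
            # (assumption: signal the timeout by returning False)
            return False
        await asyncio.sleep(sleep)
```

A smaller sleep value retries more often, which reduces the delay before a freed lock is noticed at the cost of more round trips to the server.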

async acquire() → bool

Uses SET with the NX option to acquire the lock. If the lock is used with a cluster client, the coredis.RedisCluster.ensure_replication() context manager is used to ensure that the command was replicated to at least half the replicas of the shard where the lock is acquired.

Raises:

LockError
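
The role of the NX option and the token can be illustrated with a small in-memory model. The sketch below makes several assumptions: a plain dict stands in for the Redis keyspace, set_nx and try_acquire are hypothetical helpers, the token is generated with uuid4 purely for illustration, and key expiry is left out.

```python
import uuid


def set_nx(store, key, value):
    """Model of SET key value NX: write only if the key does not exist yet."""
    if key in store:
        return False
    store[key] = value
    return True


def try_acquire(store, name):
    """Return a fresh token if the lock was acquired, otherwise None."""
    token = uuid.uuid4().hex  # unique per attempt, identifies the owner
    return token if set_nx(store, name, token) else None
```

Because every successful attempt stores a different token, a later release or extend can verify that the key still belongs to the caller.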

async release() → None

Releases the already acquired lock.

Raises:

LockReleaseError

async extend(additional_time: float) → bool

Adds more time to an already acquired lock.

Parameters:

additional_time – can be specified as an integer or a float, both representing the number of seconds to add.

Raises:

LockExtensionError

property replication_factor: int

Number of replicas the lock must be replicated to before it is considered acquired.