Pipelines

Pipelines expose an identical API to Redis, however the awaitable returned by calling a pipeline method can only be awaited after the entire pipeline has successfully executed, that is, after exiting the pipeline’s async context manager:

For example:

async def example(client):
    async with client.pipeline(transaction=True) as pipe:
        # commands is a tuple of awaitables
        commands = (
            pipe.flushdb(),
            pipe.set("foo", "bar"),
            pipe.set("bar", "foo"),
            pipe.keys("*"),
        )
    # results can be retrieved from the returns of each command
    # notice this is OUTSIDE of the pipeline block
    assert await asyncio.gather(*commands) == (True, True, True, {b"bar", b"foo"})
    # or via the `.results` property (this returns `tuple[Any]`)
    assert pipe.results == (True, True, True, {b"bar", b"foo"})

Important

The awaitables returned by calling a command on the pipeline instance retain the expected static types. For example:

async with client.pipeline(raise_on_error=False) as pipe:
    pipe.set("foo", 1)
    value = pipe.incr("foo")

v: int = await value

Atomicity & Transactions

In addition, pipelines can also ensure the buffered commands are executed atomically as a group by using the transaction argument.

pipe = r.pipeline(transaction=True)

A common issue occurs when requiring atomic transactions but needing to retrieve values in Redis prior for use within the transaction. For instance, let”s assume that the INCR command didn’t exist and we need to build an atomic version of INCR in Python.

The completely naive implementation could GET the value, increment it in Python, and SET the new value back. However, this is not atomic because multiple clients could be doing this at the same time, each getting the same value from GET.

Enter the WATCH command. WATCH provides the ability to monitor one or more keys prior to starting a transaction. If any of those keys change prior the execution of that transaction, the entire transaction will be canceled and a WatchError will be raised. To implement our own client-side INCR command, we could do something like this:

async def incr(client: coredis.Redis, key: str) -> int:
    while True:
        try:
            async with client.pipeline(transaction=False) as pipe:
                # put a WATCH on the key that holds our sequence value
                async with pipe.watch(key):
                    current_value = await client.get(key)
                    next_value = int(current_value) + 1
                    pipe.set(key, next_value)
        except WatchError:
            # another client must have changed the value between
            # the time we started watching it and the pipeline"s execution.
            # our best bet is to just retry.
            continue
        else:
            # if a WatchError wasn"t raised during execution, everything
            # we just did happened atomically.
            break