Pipelines¶
Pipelines expose an API “similar” to Redis
with the exception
that calling any redis command returns the pipeline instance itself.
To retrieve the actual results of each command queued in the pipeline you must call
execute()
For example:
async def example(client):
async with await client.pipeline() as pipe:
await pipe.delete(['bar'])
await pipe.set('bar', 'foo')
await pipe.execute() # needs to be called explicitly
Here are more examples:
async def example(client):
async with await client.pipeline(transaction=True) as pipe:
# will return self to send another command
pipe = await (await pipe.flushdb()).set('foo', 'bar')
# can also directly send command
await pipe.set('bar', 'foo')
# commands will be buffered
await pipe.keys('*')
res = await pipe.execute()
# results should be in order corresponding to your command
assert res == (True, True, True, set([b'bar', b'foo']))
For ease of use, all commands being buffered into the pipeline return the pipeline object itself. Which enable you to use it like the example provided.
Atomicity & Transactions¶
In addition, pipelines can also ensure the buffered commands are executed
atomically as a group by using the transaction
argument.
pipe = r.pipeline(transaction=True)
A common issue occurs when requiring atomic transactions but needing to retrieve values in Redis prior for use within the transaction. For instance, let’s assume that the INCR command didn’t exist and we need to build an atomic version of INCR in Python.
The completely naive implementation could GET the value, increment it in Python, and SET the new value back. However, this is not atomic because multiple clients could be doing this at the same time, each getting the same value from GET.
Enter the WATCH command. WATCH provides the ability to monitor one or more keys
prior to starting a transaction. If any of those keys change prior the
execution of that transaction, the entire transaction will be canceled and a
WatchError
will be raised. To implement our own client-side INCR command, we
could do something like this:
async def example():
async with await r.pipeline() as pipe:
while True:
try:
# put a WATCH on the key that holds our sequence value
await pipe.watch('OUR-SEQUENCE-KEY')
# after WATCHing, the pipeline is put into immediate execution
# mode until we tell it to start buffering commands again.
# this allows us to get the current value of our sequence
current_value = await pipe.get('OUR-SEQUENCE-KEY')
next_value = int(current_value) + 1
# now we can put the pipeline back into buffered mode with MULTI
pipe.multi()
await pipe.set('OUR-SEQUENCE-KEY', next_value)
# and finally, execute the pipeline (the set command)
await pipe.execute()
# if a WatchError wasn't raised during execution, everything
# we just did happened atomically.
break
except WatchError:
# another client must have changed 'OUR-SEQUENCE-KEY' between
# the time we started WATCHing it and the pipeline's execution.
# our best bet is to just retry.
continue
Note that, because the Pipeline must bind to a single connection for the
duration of a WATCH, care must be taken to ensure that the connection is
returned to the connection pool by calling the reset()
method. If the
Pipeline
is used as a context manager (as in the example above) reset()
will be called automatically. Of course you can do this the manual way by
explicitly calling reset()
:
async def example():
async with await r.pipeline() as pipe:
while 1:
try:
await pipe.watch('OUR-SEQUENCE-KEY')
...
await pipe.execute()
break
except WatchError:
continue
finally:
await pipe.reset()
A convenience method transaction()
exists for handling all the
boilerplate of handling and retrying watch errors. It takes a callable that
should expect a single parameter, a pipeline object, and any number of keys to
be watched. Our client-side INCR command above can be written like this,
which is much easier to read:
async def client_side_incr(pipe) -> int:
current_value = await pipe.get('OUR-SEQUENCE-KEY') or 0
next_value = int(current_value) + 1
pipe.multi()
await pipe.set('OUR-SEQUENCE-KEY', next_value)
return next_value
await r.transaction(client_side_incr, 'OUR-SEQUENCE-KEY')
# (True,)
await r.transaction(client_side_incr, 'OUR-SEQUENCE-KEY', value_from_callable=True)
# 2