Connections

Connection Pools

Both Redis and RedisCluster are backed by a connection pool that manages the underlying connections to the Redis server(s). coredis supports both blocking and non-blocking connection pools. By default, clients are allocated a non-blocking connection pool.

To explicitly select the type of connection pool, pass the appropriate class as coredis.Redis.connection_pool_cls or coredis.RedisCluster.connection_pool_cls.

Connection pools can also be shared between multiple clients through the coredis.Redis.connection_pool or coredis.RedisCluster.connection_pool parameter.

Non-Blocking Connection Pool

Standalone

ConnectionPool

Cluster

ClusterConnectionPool

The default non-blocking connection pools allocated to clients allow at most max_connections connections to be acquired concurrently. Requests for further connections raise an exception.

In the following example, a client is created with max_connections set to 2, but 10 tasks are started concurrently. This means 8 tasks will fail:

import coredis
import asyncio

async def test():
    client = coredis.Redis(max_connections=2)
    # or with cluster
    # client = coredis.RedisCluster(
    #   "localhost", 7000,
    #   max_connections=2, max_connections_per_node=True
    # )

    await client.set("fubar", 1)
    results = await asyncio.gather(
        *[asyncio.get_running_loop().create_task(client.get("fubar")) for _ in range(10)],
        return_exceptions=True
    )
    assert len([r for r in results if isinstance(r, Exception)]) == 8

asyncio.run(test())

Changing max_connections to 10 will result in all tasks succeeding:

import coredis
import asyncio

async def test():
    client = coredis.Redis(max_connections=10)
    # or with cluster
    # client = coredis.RedisCluster(
    #   "localhost", 7000,
    #   max_connections=10, max_connections_per_node=True
    # )

    await client.set("fubar", 1)
    results = await asyncio.gather(
        *[asyncio.get_running_loop().create_task(client.get("fubar")) for _ in range(10)],
        return_exceptions=True
    )
    assert len([r for r in results if isinstance(r, Exception)]) == 0

asyncio.run(test())
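
The difference between the two examples above can be reproduced without a Redis server. The following is a toy model only, not coredis's implementation: ToyNonBlockingPool, PoolExhausted, use_connection and count_failures are hypothetical names, and asyncio.sleep stands in for a round trip to the server. It illustrates a pool whose acquire step raises immediately once the limit is reached.

```python
import asyncio


class PoolExhausted(Exception):
    """Hypothetical error raised when no connection slot is free."""


class ToyNonBlockingPool:
    """Toy model of a non-blocking pool: acquire() never waits."""

    def __init__(self, max_connections: int) -> None:
        self.max_connections = max_connections
        self.in_use = 0

    def acquire(self) -> None:
        # Fail immediately instead of waiting for a slot to free up.
        if self.in_use >= self.max_connections:
            raise PoolExhausted("too many connections")
        self.in_use += 1

    def release(self) -> None:
        self.in_use -= 1


async def use_connection(pool: ToyNonBlockingPool) -> str:
    pool.acquire()
    try:
        await asyncio.sleep(0.01)  # stand-in for a command round trip
        return "ok"
    finally:
        pool.release()


async def count_failures(max_connections: int, tasks: int = 10) -> int:
    pool = ToyNonBlockingPool(max_connections)
    results = await asyncio.gather(
        *[use_connection(pool) for _ in range(tasks)],
        return_exceptions=True,
    )
    return sum(isinstance(r, PoolExhausted) for r in results)


failures_with_2 = asyncio.run(count_failures(2))
failures_with_10 = asyncio.run(count_failures(10))
```

In this model the first two tasks take both slots before any of them finishes, so the remaining tasks fail at acquire time; raising the limit to the number of tasks removes the failures.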

Blocking Connection Pool

Standalone

BlockingConnectionPool

Cluster

BlockingClusterConnectionPool

The example from the Non-Blocking Connection Pool section above can be reused with the blocking variants of the connection pools, selected through the coredis.Redis.connection_pool_cls or coredis.RedisCluster.connection_pool_cls parameters. With max_connections set to 2, no tasks fail. Instead, they block until one of the 2 connections in the pool becomes available for reuse:

import coredis
import asyncio

async def test():
    client = coredis.Redis(
        connection_pool_cls=coredis.BlockingConnectionPool,
        max_connections=2
    )
    # or with cluster
    # client = coredis.RedisCluster(
    #    "localhost", 7000,
    #    connection_pool_cls=coredis.BlockingClusterConnectionPool,
    #    max_connections=2,
    #    max_connections_per_node=True
    # )

    await client.set("fubar", 1)
    results = await asyncio.gather(
        *[asyncio.get_running_loop().create_task(client.get("fubar")) for _ in range(10)],
        return_exceptions=True
    )
    assert len([r for r in results if isinstance(r, Exception)]) == 0

asyncio.run(test())
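
The blocking behaviour can likewise be modelled without a server. This is a sketch under stated assumptions, not how coredis implements its blocking pools: ToyBlockingPool, use_connection and run are hypothetical names, and an asyncio.Semaphore is used here purely as one simple way to make callers wait for a free slot.

```python
import asyncio


class ToyBlockingPool:
    """Toy model of a blocking pool: acquire() waits for a free slot."""

    def __init__(self, max_connections: int) -> None:
        self._slots = asyncio.Semaphore(max_connections)
        self.in_use = 0
        self.peak = 0  # highest number of slots held at the same time

    async def acquire(self) -> None:
        # Suspends the caller until another task releases a slot.
        await self._slots.acquire()
        self.in_use += 1
        self.peak = max(self.peak, self.in_use)

    def release(self) -> None:
        self.in_use -= 1
        self._slots.release()


async def use_connection(pool: ToyBlockingPool) -> str:
    await pool.acquire()
    try:
        await asyncio.sleep(0.01)  # stand-in for a command round trip
        return "ok"
    finally:
        pool.release()


async def run(max_connections: int = 2, tasks: int = 10):
    pool = ToyBlockingPool(max_connections)
    results = await asyncio.gather(
        *[use_connection(pool) for _ in range(tasks)],
        return_exceptions=True,
    )
    failures = sum(isinstance(r, Exception) for r in results)
    return failures, pool.peak


failures, peak = asyncio.run(run())
```

All ten tasks complete in this model, but never more than two hold a slot at once; the cost of avoiding errors is that the other tasks wait their turn.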

Note

For BlockingClusterConnectionPool, the max_connections_per_node parameter controls whether the value of max_connections applies cluster-wide or per node.
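
As a rough illustration of what that choice means for the total number of connections, the arithmetic can be sketched as below. connection_ceiling is a hypothetical helper, not part of coredis, and the 3-node cluster is an assumed example; the per-node figure is an upper bound inferred from the note above rather than a documented guarantee.

```python
def connection_ceiling(max_connections: int, node_count: int, per_node: bool) -> int:
    """Upper bound on open connections across the whole cluster (toy arithmetic)."""
    if per_node:
        # Each node may hold up to max_connections connections.
        return max_connections * node_count
    # The limit is shared by all nodes in the cluster.
    return max_connections


# Assumed example: max_connections=2 against a 3-node cluster.
per_node_total = connection_ceiling(2, 3, per_node=True)
cluster_wide_total = connection_ceiling(2, 3, per_node=False)
```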

Connection types

coredis ships with three types of connections.

Custom connection classes

You can also create your own connection subclasses by deriving from coredis.connection.BaseConnection. This may be useful if you want to control the socket behavior within an async framework. To instantiate a client class using your own connection, create a connection pool and pass your class as the connection_class argument. Any other keyword arguments you pass to the pool are forwarded to that class when it is initialized.

pool = coredis.ConnectionPool(connection_class=YourConnectionClass,
                                your_arg='...', ...)