Connections
Connection Pools
Both Redis and RedisCluster are backed by a connection pool that manages the underlying connections to the Redis server(s). coredis supports both blocking and non-blocking connection pools; the default is a non-blocking pool.
To explicitly select the type of connection pool, pass the appropriate class as coredis.Redis.connection_pool_cls or coredis.RedisCluster.connection_pool_cls.
A connection pool can also be shared between multiple clients through the coredis.Redis.connection_pool or coredis.RedisCluster.connection_pool parameter.
Non-Blocking Connection Pool
- Standalone: ConnectionPool
- Cluster: ClusterConnectionPool
The default non-blocking connection pools allow at most max_connections connections to be acquired concurrently. Requesting a connection beyond that limit raises an exception instead of waiting.
In the following example, a client is created with max_connections set to 2, but 10 tasks are started concurrently. This means 8 tasks will fail:
import asyncio

import coredis


async def test():
    client = coredis.Redis(max_connections=2)
    # or with cluster
    # client = coredis.RedisCluster(
    #     "localhost", 7000,
    #     max_connections=2, max_connections_per_node=True
    # )
    await client.set("fubar", 1)
    # Start 10 concurrent GETs against a pool that allows only 2 connections.
    results = await asyncio.gather(
        *[asyncio.get_running_loop().create_task(client.get("fubar")) for _ in range(10)],
        return_exceptions=True
    )
    # 8 of the 10 tasks fail because no connection is available for them.
    assert len([r for r in results if isinstance(r, Exception)]) == 8


asyncio.run(test())
Changing max_connections to 10 results in all tasks succeeding:
import asyncio

import coredis


async def test():
    client = coredis.Redis(max_connections=10)
    # or with cluster
    # client = coredis.RedisCluster(
    #     "localhost", 7000,
    #     max_connections=10, max_connections_per_node=True
    # )
    await client.set("fubar", 1)
    # The pool can now serve all 10 concurrent GETs.
    results = await asyncio.gather(
        *[asyncio.get_running_loop().create_task(client.get("fubar")) for _ in range(10)],
        return_exceptions=True
    )
    assert len([r for r in results if isinstance(r, Exception)]) == 0


asyncio.run(test())
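The failure counts in the two examples above can be reproduced without a Redis server. The following is a minimal sketch of the "fail fast when exhausted" behavior, using only asyncio. ToyNonBlockingPool, PoolExhausted, fake_get and run_nonblocking_demo are hypothetical names invented for this illustration; they are not part of coredis and do not reflect how coredis implements its pools.

```python
import asyncio


class PoolExhausted(Exception):
    """Raised by the toy pool when no connection slot is free."""


class ToyNonBlockingPool:
    """Hypothetical stand-in for a non-blocking pool; not coredis code."""

    def __init__(self, max_connections: int) -> None:
        self.max_connections = max_connections
        self.in_use = 0

    def acquire(self) -> None:
        # Fail immediately instead of waiting for a slot to free up.
        if self.in_use >= self.max_connections:
            raise PoolExhausted("too many connections")
        self.in_use += 1

    def release(self) -> None:
        self.in_use -= 1


async def fake_get(pool: ToyNonBlockingPool) -> str:
    pool.acquire()
    try:
        await asyncio.sleep(0.01)  # simulate a network round trip
        return "1"
    finally:
        pool.release()


async def run_nonblocking_demo(max_connections: int, tasks: int = 10) -> int:
    pool = ToyNonBlockingPool(max_connections)
    results = await asyncio.gather(
        *[fake_get(pool) for _ in range(tasks)], return_exceptions=True
    )
    # Count how many tasks ended with an exception.
    return sum(isinstance(r, Exception) for r in results)


failures_with_2 = asyncio.run(run_nonblocking_demo(2))
failures_with_10 = asyncio.run(run_nonblocking_demo(10))
```

In this model every task asks for a slot before any earlier task has released one, so the number of failures is simply the number of tasks minus max_connections.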
Blocking Connection Pool
- Standalone: BlockingConnectionPool
- Cluster: BlockingClusterConnectionPool
Reusing the example from the Non-Blocking Connection Pool section above, but passing the blocking variants of the connection pools as coredis.Redis.connection_pool_cls or coredis.RedisCluster.connection_pool_cls, setting max_connections to 2 does not cause any tasks to fail. Instead, tasks block until one of the 2 connections in the pool becomes available for reuse:
import asyncio

import coredis


async def test():
    client = coredis.Redis(
        connection_pool_cls=coredis.BlockingConnectionPool,
        max_connections=2
    )
    # or with cluster
    # client = coredis.RedisCluster(
    #     "localhost", 7000,
    #     connection_pool_cls=coredis.BlockingClusterConnectionPool,
    #     max_connections=2,
    #     max_connections_per_node=True
    # )
    await client.set("fubar", 1)
    # 10 concurrent GETs take turns on the 2 pooled connections.
    results = await asyncio.gather(
        *[asyncio.get_running_loop().create_task(client.get("fubar")) for _ in range(10)],
        return_exceptions=True
    )
    assert len([r for r in results if isinstance(r, Exception)]) == 0


asyncio.run(test())
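The "wait for a free connection" behavior can be illustrated without a Redis server. The sketch below models it with an asyncio.Semaphore. ToyBlockingPool, fake_get_blocking and run_blocking_demo are hypothetical names made up for this illustration; the code is not taken from coredis and makes no claim about how BlockingConnectionPool is implemented.

```python
import asyncio


class ToyBlockingPool:
    """Hypothetical stand-in for a blocking pool; not coredis code."""

    def __init__(self, max_connections: int) -> None:
        self._slots = asyncio.Semaphore(max_connections)
        self.in_use = 0
        self.peak = 0  # highest number of slots held at the same time

    async def acquire(self) -> None:
        # Wait here until a slot is free instead of raising.
        await self._slots.acquire()
        self.in_use += 1
        self.peak = max(self.peak, self.in_use)

    def release(self) -> None:
        self.in_use -= 1
        self._slots.release()


async def fake_get_blocking(pool: ToyBlockingPool) -> str:
    await pool.acquire()
    try:
        await asyncio.sleep(0.01)  # simulate a network round trip
        return "1"
    finally:
        pool.release()


async def run_blocking_demo(max_connections: int = 2, tasks: int = 10):
    # The pool is created inside the running event loop.
    pool = ToyBlockingPool(max_connections)
    results = await asyncio.gather(
        *[fake_get_blocking(pool) for _ in range(tasks)], return_exceptions=True
    )
    failures = sum(isinstance(r, Exception) for r in results)
    return failures, pool.peak


blocking_failures, blocking_peak = asyncio.run(run_blocking_demo())
```

All 10 tasks complete, and the recorded peak shows that no more than 2 slots were ever held at once. The trade-off is latency: tasks queue behind each other rather than failing.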
Note
For BlockingClusterConnectionPool, max_connections_per_node controls whether the value of max_connections is applied cluster-wide or per node.
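As a rough illustration of the difference, the arithmetic below assumes that "per node" means each node gets its own independent limit of max_connections. connection_ceiling is a hypothetical helper written for this sketch, not a coredis function, and the node count of 3 is an arbitrary example value.

```python
def connection_ceiling(max_connections: int, node_count: int, per_node: bool) -> int:
    """Upper bound on connections held at once across the whole cluster.

    Assumption: with per_node=True every node has its own limit, so the
    cluster-wide bound scales with the number of nodes.
    """
    return max_connections * node_count if per_node else max_connections


# Example with max_connections=2 on a hypothetical 3-node cluster.
ceiling_cluster_wide = connection_ceiling(2, node_count=3, per_node=False)
ceiling_per_node = connection_ceiling(2, node_count=3, per_node=True)
```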
Connection types
coredis ships with three types of connections.
- The default, coredis.connection.Connection, is a normal TCP socket based connection.
- UnixDomainSocketConnection allows clients running on the same device as the server to connect via a Unix domain socket. To use a UnixDomainSocketConnection, pass the unix_socket_path argument, a string containing the path to the Unix domain socket file. Additionally, make sure the unixsocket parameter is defined in your redis.conf file. It is commented out by default.
  r = coredis.Redis(unix_socket_path='/tmp/redis.sock')
- ClusterConnection is essentially Connection, with the addition of ensuring that appropriate READONLY handling is set if configured (coredis.RedisCluster.readonly).
Custom connection classes
You can also create your own connection subclasses by deriving from coredis.connection.BaseConnection. This may be useful if you want to control the socket behavior within an async framework. To instantiate a client class using your own connection, create a connection pool and pass your class as the connection_class argument. Any other keyword arguments passed to the pool are forwarded to that class when connections are initialized.
pool = coredis.ConnectionPool(connection_class=YourConnectionClass,
                              your_arg='...', ...)
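The keyword forwarding described above can be pictured with a small stand-alone model. ToyConnection and ToyPool are hypothetical classes written only for this sketch; a real custom connection would derive from coredis.connection.BaseConnection, and coredis's own pool may store and apply these arguments differently.

```python
class ToyConnection:
    """Hypothetical connection class that accepts one custom argument."""

    def __init__(self, host: str = "localhost", port: int = 6379, your_arg: str = "") -> None:
        self.host = host
        self.port = port
        self.your_arg = your_arg


class ToyPool:
    """Hypothetical pool that remembers extra kwargs for later use."""

    def __init__(self, connection_class=ToyConnection, **connection_kwargs) -> None:
        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs

    def make_connection(self):
        # Every new connection receives the kwargs given to the pool.
        return self.connection_class(**self.connection_kwargs)


toy_pool = ToyPool(connection_class=ToyConnection, your_arg="custom")
conn = toy_pool.make_connection()
```

The point of the pattern is that the pool does not need to know which arguments a custom connection class accepts; it only stores them and hands them over when a connection is created.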