Migrating from 5.x to 6.0¶
Summary¶
coredis 6.0 is a major architectural rewrite focused on Structured Concurrency,
predictable resource lifetimes, and safer defaults. While most APIs remain familiar,
several core behaviors have changed.
Key highlights¶
- anyio integration
All async operations now run on
anyio. This enables structured concurrency, ensuring tasks, connections, and background resources are tied to explicit lifetimes. Many classes of bugs related to leaked connections or cancelled tasks are eliminated. Bothasyncioandtriobackends are supported.- Async context-managed classes
All user-facing classes representing Redis concepts (clients, connection pools, Pub/Sub, pipelines, stream consumers, and Sentinel clients) now support the async context manager protocol and must be used as such. This is the only way to initialize and cleanup the underlying resources used by each class.
- Pipeline auto-execution
Pipelines execute automatically when their context exits. Explicit
pipeline.execute()calls no longer exist.- RESP3 support only
RESP protocol version
2support has been removed.- Simplified connection pools
All connection pools now block when exhausted by default. Dedicated blocking pool variants have been removed.
Breaking Changes¶
Async Context Managers Required¶
All user-facing classes that interact with Redis must be used as async context managers to ensure proper initialization and cleanup.
import asyncio
import coredis
async def main():
client = coredis.Redis(host="127.0.0.1", port=6379)
await client.set("key", "1")
# Explicit disconnection of connection pool
client.connection_pool.disconnect()
asyncio.run(main())
import anyio
import coredis
async def main():
client = coredis.Redis(host="127.0.0.1", port=6379)
async with client:
await client.set("key", "1")
# Client and connection pool are automatically cleaned up
anyio.run(main, backend="asyncio") # or "trio"
Important
This applies to the following classes:
Pipeline¶
Pipelines instances must be used as async context managers. Execution happens automatically when the context exits.
Important
Pipeline instances can no longer be awaited and must be used as async context managers
Pipelines can no longer be reused
The
watchesparameter has been removed from the pipeline constructorwatch()is now a context manager that handles MULTI/EXEC automaticallypipeline.execute()has been removed — pipelines execute automatically on context exit. Useresultsto retrieve full results or await individual commands.
Auto-execution on Context Exit¶
import asyncio
import coredis
async def main():
client = coredis.Redis()
pipe = await client.pipeline()
pipe.set("foo", 1)
pipe.incr("foo")
results = await pipe.execute()
assert results == (True, 2)
asyncio.run(main())
import anyio
import coredis
async def main():
client = coredis.Redis()
async with client:
async with client.pipeline() as pipe:
pipe.set("foo", 1)
result = pipe.incr("foo")
# Pipeline auto-executes when exiting context manager
value = await result
assert value == 2
assert (True, 2) == pipe.results
anyio.run(main)
Transaction Watches¶
import asyncio
import coredis
async def main():
client = coredis.Redis()
async with await client.pipeline(transaction=True) as pipe:
await pipe.watch("key")
# Note that in 5.x the pipe would switch to immediate
# excecution mode once a watch was issued and was no longer
# the 'pipeline'.
value = await pipe.get("key")
# Only after the multi is issued does the pipeline go back
# to stacking commands.
pipe.multi()
pipe.set("key", int(value or 0) + 1)
pipe.get("key")
results = await pipe.execute()
print(results[-1])
asyncio.run(main())
import anyio
import coredis
async def main():
client = coredis.Redis()
async with client:
async with client.pipeline(transaction=False) as pipe:
async with pipe.watch("key"):
# Just use the client for any immediate executions
value = await client.get("key")
pipe.set("key", int(value or 0) + 1)
value = pipe.get("key")
# Executes atomically when exiting watch context
print(await value)
anyio.run(main)
Connection Pool¶
Important
Connection pool instances must be used as async context managers
BlockingConnectionPoolremoved —ConnectionPoolis now blockingBlockingClusterConnectionPoolremoved —ClusterConnectionPoolis now blockingTimeoutErroris now raised when a connection cannot be acquired from the pool within thetimeoutinstead ofConnectionError. (This would normally only happen if all the connections in the pool are being used for blocking commands, as non blocking commands are multiplexed over shared connections)
import coredis
import asyncio
from coredis import ConnectionPool, BlockingConnectionPool
async def main():
# Non-blocking pool
client = coredis.Redis(connection_pool=ConnectionPool(max_connections=2))
try:
await asyncio.gather(*(client.blpop(["fubar"], timeout=1) for _ in range(3)))
except coredis.exceptions.ConnectionError as err:
print("Failed with too many connections attempted")
# Blocking pool
client = coredis.Redis(connection_pool=BlockingConnectionPool(max_connections=2))
await asyncio.gather(*(client.blpop(["fubar"], timeout=1) for _ in range(3)))
# Blocking pool with timeout
client = coredis.Redis(connection_pool=BlockingConnectionPool(max_connections=2, timeout=1))
try:
await asyncio.gather(*(client.blpop(["fubar"], timeout=5) for _ in range(3)))
except coredis.exceptions.ConnectionError as err:
print("Failed with timeout")
asyncio.run(main())
import coredis
import asyncio
async def main():
async with coredis.Redis(
connection_pool=coredis.pool.ConnectionPool(max_connections=2)
) as client:
print(await asyncio.gather(*(client.blpop(["fubar"], timeout=1) for _ in range(3))))
async with coredis.Redis(
connection_pool=coredis.pool.ConnectionPool(max_connections=2, timeout=1)
) as client:
try:
print(await asyncio.gather(*(client.blpop(["fubar"], timeout=5) for _ in range(3))))
except TimeoutError: # Note that a `TimeoutError` is raised instead of `ConnectionError`
print("Failed with timeout")
asyncio.run(main())
Client-Side Caching¶
Important
import asyncio
import coredis
from coredis.cache import TrackingCache
async def main():
cache=TrackingCache(max_size_bytes=128 * 1024 * 1024)
client = coredis.Redis(cache=cache)
await client.set("fubar", 1)
await client.get("fubar")
await client.get("fubar")
print(cache.stats)
asyncio.run(main())
import anyio
import coredis
from coredis.patterns.cache import LRUCache
async def main():
cache=LRUCache(max_keys=10000)
client = coredis.Redis(cache=cache)
async with client:
await client.set("fubar", 1)
await client.get("fubar")
await client.get("fubar")
print(cache.stats)
anyio.run(main)
PubSub¶
Important
PubSub classes must be used as async context managers
pubsub()now only accepts keyword argumentsSubscriptions are validated and may raise
TimeoutErrorif an acknowledgement isn’t received within the configuredsubscription_timeout(defaults to 1 second)
import asyncio
import coredis
async def main():
client = coredis.Redis()
pubsub = client.pubsub()
await pubsub.subscribe("channel")
async for message in pubsub:
print(message)
asyncio.run(main())
import anyio
import coredis
async def main():
client = coredis.Redis()
async with client:
async with client.pubsub() as pubsub:
await pubsub.subscribe("channel")
async for message in pubsub:
print(message)
anyio.run(main)
Stream Consumer¶
Important
Stream consumer instances must be async context managers
Stream consumers have moved from the
coredis.streammodule tocoredis.patterns.streams. Update your imports accordingly
import asyncio
import coredis
from coredis.stream import Consumer
async def main():
client = coredis.Redis()
consumer = await Consumer(client, streams=["one", "two"])
stream, entry = await consumer.get_entry()
asyncio.run(main())
import anyio
import coredis
async def main():
async with coredis.Redis() as client:
async with client.xconsumer(streams=["one", "two"]) as consumer:
stream, entry = await consumer.get_entry()
async for stream, entry in consumer:
print(stream, entry)
async with client.xconsumer(
streams=["one", "two"],
group="group-a",
consumer="consumer-1",
) as group_consumer:
async for stream, entry in group_consumer:
print(stream, entry)
anyio.run(main)
Sentinel¶
Important
Sentinel instances must be used as async context managers
import anyio
import coredis
async def main():
sentinel = coredis.Sentinel(sentinels=[("localhost", 26379)])
async with sentinel:
primary = sentinel.primary_for("svc")
replica = sentinel.replica_for("svc")
async with primary, replica:
await primary.set("fubar", 1)
await replica.get("fubar")
anyio.run(main)
Scripting¶
LUA Scripts¶
Important
Any script function stub wrapped with
coredis.commands.Script.wraps()must annotate key arguments withKeyTto distinguish it from script argsAny function stub wrapped with
coredis.commands.Script.wraps()should be a synchronous function that returns an instance of the genericCommandRequest
import asyncio
import coredis
async def main():
client = coredis.Redis()
@client.register_script("return {KEYS[1], ARGV[1]}").wraps(key_spec=["key"])
async def echo_key_value(key: str, value: str) -> list[bytes]:
...
k, v = await echo_key_value("co", "redis")
print(f"{k!r}={v!r}")
asyncio.run(main())
import asyncio
import coredis
from coredis.typing import KeyT
from coredis.commands import CommandRequest
async def main():
client = coredis.Redis()
@client.register_script("return {KEYS[1], ARGV[1]}").wraps()
def echo_key_value(key: KeyT, value: str) -> CommandRequest[list[bytes]]:
...
async with client:
k, v = await echo_key_value("co", "redis")
print(f"{k!r}={v!r}")
asyncio.run(main())
Library Functions¶
Important
Library.wrapsreplaced bycoredis.commands.function.wraps()Any Library function type stub wrapped with
wrapsmust annotate key arguments withKeyTto distinguish it from script argsAny Library function stub wrapped with
coredis.commands.Script.wraps()should be a synchronous function that returns an instance of the genericCommandRequest
import asyncio
from typing import AnyStr
import coredis
from coredis.commands import CommandRequest, Library
class EchoLib(Library[AnyStr]):
NAME = "echolib"
CODE = """
#!lua name=echolib
redis.register_function('echo_key_value', function(k, a)
return {k[1], a[1]}
end)
"""
@Library.wraps("echo_key_value", key_spec=["key"])
def echo_key_value(self, key: str, value: str): ...
async def main():
client = coredis.Redis()
lib = await EchoLib(client, replace=True)
print(await lib.echo_key_value("fubar", 1))
asyncio.run(main())
import anyio
from typing import AnyStr
import coredis
from coredis.commands import Library
from coredis.commands.function import wraps
from coredis.typing import KeyT, ValueT
class EchoLib(Library[AnyStr]):
NAME = "echolib"
CODE = """
#!lua name=echolib
redis.register_function('echo_key_value', function(k, a)
return {k[1], a[1]}
end)
"""
@wraps()
def echo_key_value(self, key: KeyT, value: ValueT) -> CommandRequest[list[bytes]]: ...
async def main():
async with coredis.Redis() as client:
lib = await EchoLib(client, replace=True)
print(await lib.echo_key_value("fubar", 1))
anyio.run(main)
Lock API¶
Important
import asyncio
import coredis
from coredis.recipes.locks import LuaLock
async def main():
client = coredis.Redis()
async with LuaLock(client, "mylock", timeout=1.0):
print("locked!")
asyncio.run(main())
import anyio
import coredis
async def main():
async with coredis.Redis() as client:
async with client.lock("mylock", timeout=1.0):
print("locked!")
anyio.run(main)
Removals¶
RESP2 protocol support
Monitor wrapper
RedisGraph support
Migration Checklist¶
☐
RedisandRedisClusterclients must be used as async context managers☐ Replace all instances of
coredis.BlockingConnectionPoolandcoredis.BlockingClusterConnectionPoolwithConnectionPoolorClusterConnectionPool.☐ Pipelines must be used as async context managers; remove all explicit
execute()calls.☐ PubSub consumers must be used as async context managers
☐ Stream consumers must be used as async context managers
☐ Sentinel clients and derived primary/replica instances must be used as async context managers
☐ Replace
coredis.cache.TrackingCachewithLRUCache☐ Update cache imports from
coredis.cachetocoredis.patterns.cache☐ Update stream consumer imports from
coredis.streamtocoredis.patterns.streams☐ Update lock imports from
coredis.recipes.lockstocoredis.patterns.lock☐ Replace all usage of
BlockingConnectionPool/BlockingClusterConnectionPoolwithConnectionPoolorClusterConnectionPool☐ Add
KeyTtype annotations to scripts and library function keys☐ Replace
Library.wrapswithwraps()☐ Handle
TimeoutErrorinstead ofConnectionErrorfor connection pool timeouts