Migrating from 5.x to 6.0

Summary

coredis 6.0 is a major architectural rewrite focused on Structured Concurrency, predictable resource lifetimes, and safer defaults. While most APIs remain familiar, several core behaviors have changed.

Key highlights

anyio integration

All async operations now run on anyio. This enables structured concurrency, ensuring tasks, connections, and background resources are tied to explicit lifetimes. Many classes of bugs related to leaked connections or cancelled tasks are eliminated. Both asyncio and trio backends are supported.

Async context-managed classes

All user-facing classes representing Redis concepts (clients, connection pools, Pub/Sub, pipelines, stream consumers, and Sentinel clients) now support the async context manager protocol and must be used as such. This is the only way to initialize and cleanup the underlying resources used by each class.

Pipeline auto-execution

Pipelines execute automatically when their context exits. Explicit pipeline.execute() calls no longer exist.

RESP3 support only

RESP protocol version 2 support has been removed.

Simplified connection pools

All connection pools now block when exhausted by default. Dedicated blocking pool variants have been removed.

Breaking Changes

Async Context Managers Required

All user-facing classes that interact with Redis must be used as async context managers to ensure proper initialization and cleanup.

import asyncio
import coredis

async def main():
    client = coredis.Redis(host="127.0.0.1", port=6379)
    await client.set("key", "1")

    # Explicit disconnection of connection pool
    client.connection_pool.disconnect()

asyncio.run(main())
import anyio
import coredis

async def main():
    client = coredis.Redis(host="127.0.0.1", port=6379)
    async with client:
        await client.set("key", "1")
    # Client and connection pool are automatically cleaned up

anyio.run(main, backend="asyncio")  # or "trio"

Important

This applies to the following classes:

Pipeline

Pipelines instances must be used as async context managers. Execution happens automatically when the context exits.

Important

  • Pipeline instances can no longer be awaited and must be used as async context managers

  • Pipelines can no longer be reused

  • The watches parameter has been removed from the pipeline constructor

  • watch() is now a context manager that handles MULTI/EXEC automatically

  • pipeline.execute() has been removed — pipelines execute automatically on context exit. Use results to retrieve full results or await individual commands.

Auto-execution on Context Exit

import asyncio
import coredis

async def main():
    client = coredis.Redis()
    pipe = await client.pipeline()
    pipe.set("foo", 1)
    pipe.incr("foo")
    results = await pipe.execute()
    assert results == (True, 2)

asyncio.run(main())
import anyio
import coredis

async def main():
    client = coredis.Redis()
    async with client:
        async with client.pipeline() as pipe:
            pipe.set("foo", 1)
            result = pipe.incr("foo")

        # Pipeline auto-executes when exiting context manager
        value = await result
        assert value == 2
        assert (True, 2) == pipe.results

anyio.run(main)

Transaction Watches

import asyncio
import coredis

async def main():
    client = coredis.Redis()
    async with await client.pipeline(transaction=True) as pipe:
        await pipe.watch("key")
        # Note that in 5.x the pipe would switch to immediate
        # excecution mode once a watch was issued and was no longer
        # the 'pipeline'.
        value = await pipe.get("key")
        # Only after the multi is issued does the pipeline go back
        # to stacking commands.
        pipe.multi()
        pipe.set("key", int(value or 0) + 1)
        pipe.get("key")
        results = await pipe.execute()
        print(results[-1])

asyncio.run(main())
import anyio
import coredis

async def main():
    client = coredis.Redis()
    async with client:
        async with client.pipeline(transaction=False) as pipe:
            async with pipe.watch("key"):
                # Just use the client for any immediate executions
                value = await client.get("key")
                pipe.set("key", int(value or 0) + 1)
                value = pipe.get("key")
            # Executes atomically when exiting watch context
        print(await value)
anyio.run(main)

Connection Pool

Important

  • Connection pool instances must be used as async context managers

  • BlockingConnectionPool removed — ConnectionPool is now blocking

  • BlockingClusterConnectionPool removed — ClusterConnectionPool is now blocking

  • TimeoutError is now raised when a connection cannot be acquired from the pool within the timeout instead of ConnectionError. (This would normally only happen if all the connections in the pool are being used for blocking commands, as non blocking commands are multiplexed over shared connections)

import coredis
import asyncio

from coredis import ConnectionPool, BlockingConnectionPool

async def main():
    # Non-blocking pool
    client = coredis.Redis(connection_pool=ConnectionPool(max_connections=2))
    try:
        await asyncio.gather(*(client.blpop(["fubar"], timeout=1) for _ in range(3)))
    except coredis.exceptions.ConnectionError as err:
        print("Failed with too many connections attempted")

    # Blocking pool
    client = coredis.Redis(connection_pool=BlockingConnectionPool(max_connections=2))
    await asyncio.gather(*(client.blpop(["fubar"], timeout=1) for _ in range(3)))

    # Blocking pool with timeout
    client = coredis.Redis(connection_pool=BlockingConnectionPool(max_connections=2, timeout=1))
    try:
        await asyncio.gather(*(client.blpop(["fubar"], timeout=5) for _ in range(3)))
    except coredis.exceptions.ConnectionError as err:
        print("Failed with timeout")

asyncio.run(main())
import coredis
import asyncio

async def main():
    async with coredis.Redis(
        connection_pool=coredis.pool.ConnectionPool(max_connections=2)
    ) as client:
        print(await asyncio.gather(*(client.blpop(["fubar"], timeout=1) for _ in range(3))))

    async with coredis.Redis(
        connection_pool=coredis.pool.ConnectionPool(max_connections=2, timeout=1)
    ) as client:
        try:
            print(await asyncio.gather(*(client.blpop(["fubar"], timeout=5) for _ in range(3))))
        except TimeoutError:  # Note that a `TimeoutError` is raised instead of `ConnectionError`
            print("Failed with timeout")

asyncio.run(main())

Client-Side Caching

Important

  • coredis.cache.TrackingCache replaced by LRUCache

  • Cache classes moved from coredis.cache to coredis.patterns.cache

  • max_size_bytes removed — only max_keys is supported

import asyncio
import coredis
from coredis.cache import TrackingCache

async def main():
    cache=TrackingCache(max_size_bytes=128 * 1024 * 1024)
    client = coredis.Redis(cache=cache)
    await client.set("fubar", 1)
    await client.get("fubar")
    await client.get("fubar")
    print(cache.stats)

asyncio.run(main())
import anyio
import coredis
from coredis.patterns.cache import LRUCache

async def main():
    cache=LRUCache(max_keys=10000)
    client = coredis.Redis(cache=cache)
    async with client:
      await client.set("fubar", 1)
      await client.get("fubar")
      await client.get("fubar")
      print(cache.stats)

anyio.run(main)

PubSub

Important

  • PubSub classes must be used as async context managers

  • pubsub() now only accepts keyword arguments

  • Subscriptions are validated and may raise TimeoutError if an acknowledgement isn’t received within the configured subscription_timeout (defaults to 1 second)

import asyncio
import coredis

async def main():
    client = coredis.Redis()
    pubsub = client.pubsub()
    await pubsub.subscribe("channel")
    async for message in pubsub:
        print(message)

asyncio.run(main())
import anyio
import coredis

async def main():
    client = coredis.Redis()
    async with client:
        async with client.pubsub() as pubsub:
            await pubsub.subscribe("channel")
            async for message in pubsub:
                print(message)

anyio.run(main)

Stream Consumer

Important

  • Stream consumer instances must be async context managers

  • Stream consumers have moved from the coredis.stream module to coredis.patterns.streams. Update your imports accordingly

import asyncio
import coredis
from coredis.stream import Consumer

async def main():
    client = coredis.Redis()
    consumer = await Consumer(client, streams=["one", "two"])
    stream, entry = await consumer.get_entry()

asyncio.run(main())
import anyio
import coredis

async def main():
    async with coredis.Redis() as client:
        async with client.xconsumer(streams=["one", "two"]) as consumer:
            stream, entry = await consumer.get_entry()
            async for stream, entry in consumer:
                print(stream, entry)

        async with client.xconsumer(
            streams=["one", "two"],
            group="group-a",
            consumer="consumer-1",
        ) as group_consumer:
            async for stream, entry in group_consumer:
                print(stream, entry)

anyio.run(main)

Sentinel

Important

Sentinel instances must be used as async context managers

import anyio
import coredis

async def main():
    sentinel = coredis.Sentinel(sentinels=[("localhost", 26379)])
    async with sentinel:
        primary = sentinel.primary_for("svc")
        replica = sentinel.replica_for("svc")
        async with primary, replica:
            await primary.set("fubar", 1)
            await replica.get("fubar")

anyio.run(main)

Scripting

LUA Scripts

Important

import asyncio
import coredis

async def main():
    client = coredis.Redis()

    @client.register_script("return {KEYS[1], ARGV[1]}").wraps(key_spec=["key"])
    async def echo_key_value(key: str, value: str) -> list[bytes]:
      ...

    k, v = await echo_key_value("co", "redis")
    print(f"{k!r}={v!r}")

asyncio.run(main())
import asyncio
import coredis
from coredis.typing import KeyT
from coredis.commands import CommandRequest

async def main():
    client = coredis.Redis()

    @client.register_script("return {KEYS[1], ARGV[1]}").wraps()
    def echo_key_value(key: KeyT, value: str) -> CommandRequest[list[bytes]]:
      ...

    async with client:
        k, v = await echo_key_value("co", "redis")
        print(f"{k!r}={v!r}")

asyncio.run(main())

Library Functions

Important

import asyncio
from typing import AnyStr

import coredis
from coredis.commands import CommandRequest, Library

class EchoLib(Library[AnyStr]):
    NAME = "echolib"
    CODE = """
    #!lua name=echolib

    redis.register_function('echo_key_value', function(k, a)
      return {k[1], a[1]}
    end)
    """

    @Library.wraps("echo_key_value", key_spec=["key"])
    def echo_key_value(self, key: str, value: str): ...

async def main():
    client = coredis.Redis()
    lib = await EchoLib(client, replace=True)
    print(await lib.echo_key_value("fubar", 1))


asyncio.run(main())
import anyio
from typing import AnyStr

import coredis
from coredis.commands import Library
from coredis.commands.function import wraps
from coredis.typing import KeyT, ValueT

class EchoLib(Library[AnyStr]):
    NAME = "echolib"
    CODE = """
    #!lua name=echolib

    redis.register_function('echo_key_value', function(k, a)
      return {k[1], a[1]}
    end)
    """

    @wraps()
    def echo_key_value(self, key: KeyT, value: ValueT) -> CommandRequest[list[bytes]]: ...

async def main():
    async with coredis.Redis() as client:
        lib = await EchoLib(client, replace=True)
        print(await lib.echo_key_value("fubar", 1))

anyio.run(main)

Lock API

Important

  • LuaLock moved from coredis.recipes.locks to Lock

  • Use lock() convenience method for simpler access

import asyncio
import coredis
from coredis.recipes.locks import LuaLock

async def main():
    client = coredis.Redis()
    async with LuaLock(client, "mylock", timeout=1.0):
        print("locked!")

asyncio.run(main())
import anyio
import coredis

async def main():
    async with coredis.Redis() as client:
        async with client.lock("mylock", timeout=1.0):
            print("locked!")

anyio.run(main)

Removals

  • RESP2 protocol support

  • Monitor wrapper

  • RedisGraph support

Migration Checklist

  • Redis and RedisCluster clients must be used as async context managers

  • ☐ Replace all instances of coredis.BlockingConnectionPool and coredis.BlockingClusterConnectionPool with ConnectionPool or ClusterConnectionPool.

  • ☐ Pipelines must be used as async context managers; remove all explicit execute() calls.

  • ☐ PubSub consumers must be used as async context managers

  • ☐ Stream consumers must be used as async context managers

  • ☐ Sentinel clients and derived primary/replica instances must be used as async context managers

  • ☐ Replace coredis.cache.TrackingCache with LRUCache

  • ☐ Update cache imports from coredis.cache to coredis.patterns.cache

  • ☐ Update stream consumer imports from coredis.stream to coredis.patterns.streams

  • ☐ Update lock imports from coredis.recipes.locks to coredis.patterns.lock

  • ☐ Replace all usage of BlockingConnectionPool / BlockingClusterConnectionPool with ConnectionPool or ClusterConnectionPool

  • ☐ Add KeyT type annotations to scripts and library function keys

  • ☐ Replace Library.wraps with wraps()

  • ☐ Replace LuaLock usage with lock() or Lock

  • ☐ Handle TimeoutError instead of ConnectionError for connection pool timeouts