Redis Modules

coredis contains built-in support for a few popular Redis Modules.

The commands exposed by the individual modules follow the same API conventions as the core built-in commands and can be accessed via the appropriate command group property (such as json, cf, timeseries, etc.) of Redis or RedisCluster.

For example:

client = coredis.Redis()
async with client:
    # RedisJSON
    await client.json.get("key")
    # RediSearch
    await client.search.search("index", "*")
    # RedisBloom:BloomFilter
    await client.bf.reserve("bf", 0.001, 1000)
    # RedisBloom:CuckooFilter
    await client.cf.reserve("cf", 1000)
    # RedisTimeSeries
    await client.timeseries.add("ts", 1, 1)

Module commands can also be used in pipelines (and transactions) by accessing them via the command group property in the same way as described above.

For example:

async with client.pipeline(transaction=True) as pipe:
    pipe.json.get("key")
    pipe.json.get("key")
    pipe.search.search("index", "*")
    pipe.bf.reserve("bf", 0.001, 1000)
    pipe.cf.reserve("cf", 1000)
    pipe.timeseries.add("ts", 1, 1)

RedisJSON

RedisJSON adds native support for storing and retrieving JSON documents.

To access the commands exposed by the module, use the json property or manually instantiate the Json class with an instance of Redis or RedisCluster.

Get/set operations:

import coredis
client = coredis.Redis()

async with client:
    await client.json.set(
        "key1", ".", {"a": 1, "b": [1, 2, 3], "c": "str"}
    )
    assert 1 == await client.json.get("key1", ".a")
    assert [1,2,3] == await client.json.get("key1", ".b")
    assert "str" == await client.json.get("key1", ".c")

    await client.json.set("key2", ".", {"a": 2, "b": [4,5,6], "c": ["str"]})

    # multi get
    assert ["str", ["str"]] == await client.json.mget(["key1", "key2"], ".c")

Clear versus Delete:

await client.json.set(
    "key1", ".", {"a": 1, "b": [1, 2, 3], "c": "str", "d": {"e": []}}
)

# a numeric value
assert 1 == await client.json.clear("key1", ".a")
assert 0 == await client.json.get("key1", ".a")
assert 1 == await client.json.delete("key1", ".a")
assert {"b", "c", "d"} == set(await client.json.objkeys("key1"))

# an array
assert 1 == await client.json.clear("key1", ".b")
assert [] == await client.json.get("key1", ".b")
assert 1 == await client.json.delete("key1", ".b")
assert {"d", "c"} == set(await client.json.objkeys("key1"))

# a string
assert 0 == await client.json.clear("key1", ".c")
assert "str" == await client.json.get("key1", ".c")
assert 1 == await client.json.delete("key1", ".c")
assert ["d"] == await client.json.objkeys("key1")

# an object
assert 1 == await client.json.clear("key1", ".d")
assert {} == await client.json.get("key1", ".d")
assert 1 == await client.json.delete("key1", ".d")
assert [] == await client.json.objkeys("key1")

Array operations:

await client.json.set("key", ".", [])
assert 1 == await client.json.arrappend("key", [1], ".")
assert 0 == await client.json.arrindex("key", ".", 1)
assert 3 == await client.json.arrappend("key", [2, 3], ".")
assert 4 == await client.json.arrinsert("key", ".", 0, [-1])
assert [-1, 1, 2, 3] == await client.json.get("key", ".")

For more details refer to the API documentation for Json

Note

By default, coredis uses the json module from the Python standard library to serialize inputs and deserialize responses to JsonType. If, however, the orjson package is installed, coredis will transparently use it for improved performance.

RediSearch

RediSearch adds support for indexing hash and JSON data types and performing search, aggregation, suggestion & autocompletion.

Search & Aggregation

To access the search & aggregation related commands exposed by the module, use the search property or manually instantiate the Search class with an instance of Redis or RedisCluster.

Create an Index

Since creating an index requires a non-trivial assembly of arguments that are sent to the FT.CREATE command, the coredis method create() accepts a collection of Field instances as its schema argument.

The example below creates two similar indices for JSON and hash data that demonstrate some common field definitions:

import coredis
import coredis.modules
client = coredis.Redis(decode_responses=True)
async with client:
    # Create an index on json documents
    await client.search.create("json_index", on=coredis.PureToken.JSON, schema = [
        coredis.modules.search.Field('$.name', coredis.PureToken.TEXT, alias='name'),
        coredis.modules.search.Field('$.country', coredis.PureToken.TEXT, alias='country'),
        coredis.modules.search.Field('$.population', coredis.PureToken.NUMERIC, alias='population'),
        coredis.modules.search.Field("$.location", coredis.PureToken.GEO, alias='location'),
        coredis.modules.search.Field('$.iso_tags', coredis.PureToken.TAG, alias='iso_tags'),
        coredis.modules.search.Field('$.summary_vector', coredis.PureToken.VECTOR, alias='summary_vector',
            algorithm="FLAT",
            attributes={
                "DIM": 768,
                "DISTANCE_METRIC": "COSINE",
                "TYPE": "FLOAT32",
            }
        )

    ], prefixes=['json:city:'])

    # or on all hashes that start with a prefix ``city:``
    await client.search.create("hash_index", on=coredis.PureToken.HASH, schema = [
        coredis.modules.search.Field('name', coredis.PureToken.TEXT),
        coredis.modules.search.Field('country', coredis.PureToken.TEXT),
        coredis.modules.search.Field('population', coredis.PureToken.NUMERIC),
        coredis.modules.search.Field("location", coredis.PureToken.GEO),
        coredis.modules.search.Field('iso_tags', coredis.PureToken.TAG, separator=","),
        coredis.modules.search.Field('summary_vector', coredis.PureToken.VECTOR,
            algorithm="FLAT",
            attributes={
                "DIM": 768,
                "DISTANCE_METRIC": "COSINE",
                "TYPE": "FLOAT32",
            }
        )
    ], prefixes=['city:'])

To populate the indices, we can add some sample city data (a sample that can be used with the above index definitions can be found in the coredis repository) using a pipeline for performance:

import requests
import numpy

cities = requests.get(
    "https://raw.githubusercontent.com/alisaifee/coredis/master/tests/modules/data/city_index.json"
).json()

async with client.pipeline(transaction=False) as pipe:
    for name, fields in cities.items():
        pipe.json.set(f"json:city:{name}", f".", {
            "name": name,
            "country": fields["country"],
            "population": int(fields["population"]),
            "location": f"{fields['lng']},{fields['lat']}",
            "iso_tags": fields["iso_tags"],
            "summary_vector": fields["summary_vector"],
        })

        pipe.hset(f"city:{name}", {
            "name": name,
            "country": fields["country"],
            "population": fields["population"],
            "location": f"{fields['lng']},{fields['lat']}",
            "iso_tags": ",".join(fields["iso_tags"]),
            "summary_vector": numpy.asarray(fields["summary_vector"]).astype(numpy.float32).tobytes(),
        })

Note

Take special note of how the population (numeric field), iso_tags (tag field) & summary_vector (vector field) fields are encoded differently for hashes versus JSON documents.

Inspect the index information:

json_index_info = await client.search.info("json_index")
hash_index_info = await client.search.info("hash_index")


assert (0.0, 50) == (json_index_info["indexing"], json_index_info["num_docs"])
assert (0.0, 50) == (hash_index_info["indexing"], hash_index_info["num_docs"])

Aggregation

To perform aggregations, use the aggregate() method, which provides the interface to the FT.AGGREGATE command.

To simplify construction of transformation steps in the aggregation pipeline a few helper dataclasses are provided to construct the pipeline steps for the transforms parameter.

The results of the aggregation are represented by the SearchAggregationResult class.

Group by country, count the cities in each group, and sort by that count in descending order:

aggregations = await client.search.aggregate(
    "hash_index",
    "*",
    load="*",
    transforms=[
        # group by country=>count
        coredis.modules.search.Group(
            "@country", [
                coredis.modules.search.Reduce("count", [0], "city_count"),
             ]
        ),
    ],
    sortby={"@city_count": coredis.PureToken.DESC},
)

assert "China" == aggregations.results[0]["country"]
assert 18 == int(aggregations.results[0]["city_count"])

Filter, Group->Reduce, Apply, Group:

aggregations = await client.search.aggregate(
    "hash_index",
    "*",
    load="*",
    transforms=[
        # include only cities with population greater than 20 million
        coredis.modules.search.Filter(
            '@population > 20000000'
        ),
        # group by country=>{count, average_city_population}
        coredis.modules.search.Group(
            "@country", [
                coredis.modules.search.Reduce("count", [0], "city_count"),
                coredis.modules.search.Reduce("avg", [1, '@population'], "average_city_population")
             ]
        ),
        # apply a transformation of average_city_population -> floor(log(average_city_population)) (natural log)
        coredis.modules.search.Apply(
            "floor(log(@average_city_population))",
            "average_population_bucket"
        ),
        # group by average_population_bucket=>countries
        coredis.modules.search.Group(
            "@average_population_bucket", [
                coredis.modules.search.Reduce("tolist", [1, "@country"], "countries"),
             ]
        ),
    ],
)
assert aggregations.results[0] == {
    'average_population_bucket': '16',
    'countries': ['Brazil', 'South Korea', 'Egypt', 'Mexico']
}
assert aggregations.results[1] == {
    'average_population_bucket': '17',
    'countries': ['Japan', 'Indonesia', 'China', 'Philippines', 'India']
}

For more details refer to the API documentation for Search

Autocomplete

To access the commands from the SUGGEST group of the RediSearch module, use the autocomplete property or manually instantiate the Autocomplete class with an instance of Redis or RedisCluster.

Add some terms, each with an associated score to a group:

await client.autocomplete.sugadd("cities", "New Milton", 5.0)
await client.autocomplete.sugadd("cities", "New Port", 4.0)
await client.autocomplete.sugadd("cities", "New Richmond", 3.0)
await client.autocomplete.sugadd("cities", "New Albany", 2.0)
await client.autocomplete.sugadd("cities", "New York", 1.0)

Fetch some suggestions:

suggestions = await client.autocomplete.sugget("cities", "new")
assert 5 == len(suggestions)
assert "New Milton" == suggestions[0].string

suggestions = await client.autocomplete.sugget("cities", "new po")
assert "New Port" == suggestions[0].string

Boost the score of a term:

await client.autocomplete.sugadd("cities", "New York", 5.0, increment_score=True)

suggestions = await client.autocomplete.sugget("cities", "new")
assert "New York" == suggestions[0].string

For more details refer to the API documentation for Autocomplete

RedisBloom

The probabilistic data structures exposed by RedisBloom can be accessed through coredis using the following properties (or explicitly by instantiating the associated module exposed by the property):

BloomFilter

Redis.bf / RedisCluster.bf

import coredis
client = coredis.Redis()
async with client:
    # create filter
    await client.bf.reserve("filter", 0.1, 1000)

    # add items
    await client.bf.add("filter", 1)
    await client.bf.madd("filter", [2,3,4])

    # test for inclusion
    assert await client.bf.exists("filter", 1)
    assert (True, False) == await client.bf.mexists("filter", [2,5])

    # or
    assert await coredis.modules.BloomFilter(client).exists("filter", 1)
    ...

For more details refer to the API documentation for BloomFilter

CuckooFilter

Redis.cf / RedisCluster.cf

# create filter
await client.cf.reserve("filter", 1000)

# add items
assert await client.cf.add("filter", 1)
assert not await client.cf.addnx("filter", 1)

# test for inclusion
assert await client.cf.exists("filter", 1)
assert 1 == await client.cf.count("filter", 1)

# delete an item
assert await client.cf.delete("filter", 1)

# test for inclusion
assert not await client.cf.exists("filter", 1)
assert 0 == await client.cf.count("filter", 1)

For more details refer to the API documentation for CuckooFilter

CountMinSketch

Redis.cms / RedisCluster.cms

# create a sketch
await client.cms.initbydim("sketch", 2, 50)

# increment the counts for multiple entries
assert (1, 2) == await client.cms.incrby("sketch", {"a": 1, "b": 2})

# query the count for multiple entries
assert (1, 2, 0) == await client.cms.query("sketch", ["a", "b", "c"])

For more details refer to the API documentation for CountMinSketch

TopK

Redis.topk / RedisCluster.topk

import string
import itertools
import random

# create a top-3
await client.topk.reserve("top3", 3)

# add entries
letters = list(itertools.chain(*[k[0]*k[1] for k in list(enumerate(string.ascii_lowercase))]))
random.shuffle(letters)
await client.topk.add("top3", letters)

# get top 3 letters
assert (b'z', b'y', b'x') == await client.topk.list("top3")

For more details refer to the API documentation for TopK

TDigest

Redis.tdigest / RedisCluster.tdigest

# create a digest
await client.tdigest.create("digest")

# add some values
await client.tdigest.add("digest", 1, [1, 2, 3, 4])

# add some more values
await client.tdigest.add("digest", 1, [1, 2, 3, 4])

# get the rank & reverse ranks
assert (1.0, 1.0, 2.0) == await client.tdigest.byrank("digest", [0, 1, 2])
assert (6.0, 5.0, 4.0) == await client.tdigest.byrevrank("digest", [0, 1, 2])

# get the quantiles
assert (1.0, 3.0, 6.0) == await client.tdigest.quantile("digest", [0, 0.5, 1])

For more details refer to the API documentation for TDigest

RedisTimeSeries

RedisTimeSeries adds a time series data structure to Redis that allows ingesting and querying time series data.

To access the commands exposed by the module, use the timeseries property or manually instantiate the TimeSeries class with an instance of Redis or RedisCluster.

The below examples use random temperature data captured every few minutes for different rooms in a house.

Create a few timeseries with different labels (create()):

import coredis
from datetime import datetime, timedelta

rooms = {"bedroom", "lounge", "bathroom"}
client = coredis.Redis(port=9379)

async with client:
    for room in rooms:
        assert await client.timeseries.create(f"temp:{room}", labels={"room": room})

Create compaction rules for hourly and daily averages (createrule()):

for room in rooms:
    assert await client.timeseries.create(
        f"temp:{room}:hourly:avg", labels={"room": room, "compaction": "hourly"}
    )
    assert await client.timeseries.create(
        f"temp:{room}:daily:avg", labels={"room": room, "compaction": "daily"}
    )

    assert await client.timeseries.createrule(
        f"temp:{room}", f"temp:{room}:hourly:avg",
        coredis.PureToken.AVG, timedelta(hours=1)
    )
    assert await client.timeseries.createrule(
        f"temp:{room}", f"temp:{room}:daily:avg",
        coredis.PureToken.AVG, timedelta(hours=24)
    )

Populate a year of random sample data (add()):

import random
cur = datetime.fromtimestamp(0)
async with client.pipeline(transaction=True) as pipe:
    while cur < datetime(1971, 1, 1, 0, 0, 0):
        cur += timedelta(minutes=random.randint(1, 60))
        for room in rooms:
            pipe.timeseries.add(f"temp:{room}", cur, random.randint(15, 30))

Query for the latest temperature in each room (get()):

for room in rooms:
    print(await client.timeseries.get(f"temp:{room}"))

Query for latest temperature in all rooms (mget()):

print(await client.timeseries.mget(
    filters=[f"room=({','.join(rooms)})", "compaction="])
)

Query for daily averages by individual room (range() & mrange()):

# using individual range queries on the compacted timeseries
for room in rooms:
    print(await client.timeseries.range(
        f"temp:{room}:daily:avg", 0, datetime(1971, 1, 1),
    ))

# using individual range queries with an aggregation on the original timeseries
for room in rooms:
    print(await client.timeseries.range(
        f"temp:{room}", 0, datetime(1971, 1, 1),
        aggregator=coredis.PureToken.AVG,
        bucketduration=timedelta(hours=24),
    ))

# using a multi range query on the compacted series
print(
    await client.timeseries.mrange(
        0, datetime(1971, 1, 1), filters=["compaction=daily"]
    )
)

# using a multi range query with aggregation on the original series
print(
    await client.timeseries.mrange(
        0, datetime(1971, 1, 1),
        aggregator=coredis.PureToken.AVG,
        bucketduration=timedelta(hours=24),
        filters=[f"room=({','.join(rooms)})", "compaction="]
    )
)

For more details refer to the API documentation for TimeSeries