Redis Modules¶
coredis contains built-in support for a few popular Redis Modules.
The commands exposed by the individual modules follow the same API conventions as the core built-in commands and can be accessed via the appropriate command group property (such as json, cf, timeseries, etc.) of Redis or RedisCluster.
For example:
client = coredis.Redis()
# RedisJSON
await client.json.get("key")
# RediSearch
await client.search.search("index", "*")
# RedisBloom:BloomFilter
await client.bf.reserve("bf", 0.001, 1000)
# RedisBloom:CuckooFilter
await client.cf.reserve("cf", 1000)
# RedisTimeSeries
await client.timeseries.add("ts", 1, 1)
# RedisGraph
await client.graph.query("graph", "CREATE (:Node {name: 'Node'})")
Module commands can also be used in Pipelines (and transactions) by accessing them via the command group property in the same way as described above.
For example:
pipeline = await client.pipeline()
await pipeline.json.get("key")
await pipeline.json.get("key")
await pipeline.search.search("index", "*")
await pipeline.bf.reserve("bf", 0.001, 1000)
await pipeline.cf.reserve("cf", 1000)
await pipeline.timeseries.add("ts", 1, 1)
await pipeline.graph.query("graph", "CREATE (:Node {name: 'Node'})")
await pipeline.execute()
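The same pattern applies to transactions; a minimal sketch, assuming the transaction flag of pipeline() is available in your coredis version:
transaction = await client.pipeline(transaction=True)
await transaction.json.set("key", ".", {"a": 1})
await transaction.json.get("key")
# both module commands are queued and executed atomically
await transaction.execute()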
RedisJSON¶
RedisJSON adds native support for storing and retrieving JSON documents.
To access the commands exposed by the module use the json property or manually instantiate the Json class with an instance of Redis or RedisCluster.
Get/set operations:
import coredis
client = coredis.Redis()
await client.json.set(
"key1", ".", {"a": 1, "b": [1, 2, 3], "c": "str"}
)
assert 1 == await client.json.get("key1", ".a")
assert [1,2,3] == await client.json.get("key1", ".b")
assert "str" == await client.json.get("key1", ".c")
await client.json.set("key2", ".", {"a": 2, "b": [4,5,6], "c": ["str"]})
# multi get
assert ["str", ["str"]] == await client.json.mget(["key1", "key2"], ".c")
Clear versus Delete:
await client.json.set(
"key1", ".", {"a": 1, "b": [1, 2, 3], "c": "str", "d": {"e": []}}
)
# a numeric value
assert 1 == await client.json.clear("key1", ".a")
assert 0 == await client.json.get("key1", ".a")
assert 1 == await client.json.delete("key1", ".a")
assert {"b", "c", "d"} == set(await client.json.objkeys("key1"))
# an array
assert 1 == await client.json.clear("key1", ".b")
assert [] == await client.json.get("key1", ".b")
assert 1 == await client.json.delete("key1", ".b")
assert {"d", "c"} == set(await client.json.objkeys("key1"))
# a string
assert 0 == await client.json.clear("key1", ".c")
assert "str" == await client.json.get("key1", ".c")
assert 1 == await client.json.delete("key1", ".c")
assert ["d"] == await client.json.objkeys("key1")
# an object
assert 1 == await client.json.clear("key1", ".d")
assert {} == await client.json.get("key1", ".d")
assert 1 == await client.json.delete("key1", ".d")
assert [] == await client.json.objkeys("key1")
Array operations:
await client.json.set("key", ".", [])
assert 1 == await client.json.arrappend("key", [1], ".")
assert 0 == await client.json.arrindex("key", ".", 1)
assert 3 == await client.json.arrappend("key", [2, 3], ".")
assert 4 == await client.json.arrinsert("key", ".", 0, [-1])
assert [-1, 1, 2, 3] == await client.json.get("key", ".")
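Beyond arrays, numeric and string values can be updated in place. A brief sketch, assuming the coredis method names mirror the JSON.NUMINCRBY and JSON.STRAPPEND commands (the strappend argument order here is an assumption):
await client.json.set("doc", ".", {"hits": 1, "label": "a"})
# increment a numeric value in place (JSON.NUMINCRBY)
await client.json.numincrby("doc", ".hits", 1)
# append to a string value (JSON.STRAPPEND)
await client.json.strappend("doc", "b", ".label")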
For more details refer to the API documentation for Json
RediSearch¶
RediSearch adds support for indexing hash and json datatypes and for performing search, aggregation, suggestion & autocompletion.
Search & Aggregation¶
To access the search & aggregation related commands exposed by the module use the search property or manually instantiate the Search class with an instance of Redis or RedisCluster.
Create an Index¶
Since creating an index requires a non-trivial assembly of arguments that are sent to the FT.CREATE command, the coredis method create() accepts a collection of Field instances as the schema argument.
The example below creates two similar indices for json and hash data, demonstrating some common field definitions:
import coredis
import coredis.modules
client = coredis.Redis(decode_responses=True)
# Create an index on json documents
await client.search.create("json_index", on=coredis.PureToken.JSON, schema = [
coredis.modules.search.Field('$.name', coredis.PureToken.TEXT, alias='name'),
coredis.modules.search.Field('$.country', coredis.PureToken.TEXT, alias='country'),
coredis.modules.search.Field('$.population', coredis.PureToken.NUMERIC, alias='population'),
coredis.modules.search.Field("$.location", coredis.PureToken.GEO, alias='location'),
coredis.modules.search.Field('$.iso_tags', coredis.PureToken.TAG, alias='iso_tags'),
coredis.modules.search.Field('$.summary_vector', coredis.PureToken.VECTOR, alias='summary_vector',
algorithm="FLAT",
attributes={
"DIM": 768,
"DISTANCE_METRIC": "COSINE",
"TYPE": "FLOAT32",
}
)
], prefixes=['json:city:'])
# or on all hashes that start with a prefix ``city:``
await client.search.create("hash_index", on=coredis.PureToken.HASH, schema = [
coredis.modules.search.Field('name', coredis.PureToken.TEXT),
coredis.modules.search.Field('country', coredis.PureToken.TEXT),
coredis.modules.search.Field('population', coredis.PureToken.NUMERIC),
coredis.modules.search.Field("location", coredis.PureToken.GEO),
coredis.modules.search.Field('iso_tags', coredis.PureToken.TAG, separator=","),
coredis.modules.search.Field('summary_vector', coredis.PureToken.VECTOR,
algorithm="FLAT",
attributes={
"DIM": 768,
"DISTANCE_METRIC": "COSINE",
"TYPE": "FLOAT32",
}
)
], prefixes=['city:'])
To populate the indices we can add some sample city data (a sample matching the above index definitions can be found in the coredis repository), using a pipeline for performance:
import requests
import numpy
pipeline = await client.pipeline()
cities = requests.get(
"https://raw.githubusercontent.com/alisaifee/coredis/master/tests/modules/data/city_index.json"
).json()
for name, fields in cities.items():
    await pipeline.json.set(f"json:city:{name}", ".", {
        "name": name,
        "country": fields["country"],
        "population": int(fields["population"]),
        "location": f"{fields['lng']},{fields['lat']}",
        "iso_tags": fields["iso_tags"],
        "summary_vector": fields["summary_vector"],
    })
    await pipeline.hset(f"city:{name}", {
        "name": name,
        "country": fields["country"],
        "population": fields["population"],
        "location": f"{fields['lng']},{fields['lat']}",
        "iso_tags": ",".join(fields["iso_tags"]),
        "summary_vector": numpy.asarray(fields["summary_vector"]).astype(numpy.float32).tobytes(),
    })
await pipeline.execute()
Note
Take special note of how the population (numeric), iso_tags (tag) & summary_vector (vector) fields are handled differently for hashes vs json documents.
Inspect the index information:
json_index_info = await client.search.info("json_index")
hash_index_info = await client.search.info("hash_index")
assert (0.0, 50) == (json_index_info["indexing"], json_index_info["num_docs"])
assert (0.0, 50) == (hash_index_info["indexing"], hash_index_info["num_docs"])
Search¶
Searching for documents is done through the search() function that provides the interface to the FT.SEARCH command. The returned search results are represented by the SearchResult class.
Perform a simple text search:
results = await client.search.search("json_index", "Tok*", returns={"name": None, "country": "country"})
# or with the hash index
# results = await client.search.search("hash_index", "Tok*", returns={"name": None, "country": "country"})
assert results.total == 1
assert results.documents[0].properties["country"] == "Japan"
Perform a geo filtered search:
results = await client.search.search(
"json_index",
"*",
geo_filters={"location": ((67.0011, 24.8607), 1, coredis.PureToken.KM)},
returns={"name": None},
)
assert results.total == 1
assert results.documents[0].properties["name"] == "karachi"
Perform a vector similarity search:
from sentence_transformers import SentenceTransformer
query = SentenceTransformer(
"sentence-transformers/all-distilroberta-v1",
).encode("The fishing village called Edo").astype(numpy.float32).tobytes()
results = await client.search.search(
"json_index",
"*=>[KNN 1 @summary_vector $query_vec as query_score]",
parameters={"query_vec": query},
returns={"name": None},
dialect=2,
)
assert results.documents[0].properties["name"] == "tokyo"
Note
The vector similarity search example above uses a pre-trained sentence transformer model to encode the query string into a vector. The query_vec parameter is then passed to the KNN operator to perform a vector similarity search. This of course requires that the summary_vector field in the index was encoded using the same model (which is the case for the sample data referenced in the earlier example when populating the index).
Aggregation¶
To perform aggregations use the aggregate() method that provides the interface to the FT.AGGREGATE command.
To simplify construction of transformation steps in the aggregation pipeline, a few helper dataclasses are provided to construct the pipeline steps for the transforms parameter.
The results of the aggregation are represented by the SearchAggregationResult class.
Group by country with a count, sorted by count descending:
aggregations = await client.search.aggregate(
"hash_index",
"*",
load="*",
transforms=[
# group by country=>count
coredis.modules.search.Group(
"@country", [
coredis.modules.search.Reduce("count", [0], "city_count"),
]
),
],
sortby={"@city_count": coredis.PureToken.DESC},
)
assert "China" == aggregations.results[0]["country"]
assert 18 == int(aggregations.results[0]["city_count"])
Filter, Group->Reduce, Apply, Group:
aggregations = await client.search.aggregate(
"hash_index",
"*",
load="*",
transforms=[
# include only cities with population greater than 20 million
coredis.modules.search.Filter(
'@population > 20000000'
),
# group by country=>{count, average_city_population}
coredis.modules.search.Group(
"@country", [
coredis.modules.search.Reduce("count", [0], "city_count"),
coredis.modules.search.Reduce("avg", [1, '@population'], "average_city_population")
]
),
# apply a transformation of average_city_population -> floor(log(average_city_population))
coredis.modules.search.Apply(
"floor(log(@average_city_population))",
"average_population_bucket"
),
# group by average_population_bucket=>countries
coredis.modules.search.Group(
"@average_population_bucket", [
coredis.modules.search.Reduce("tolist", [1, "@country"], "countries"),
]
),
],
)
assert aggregations.results[0] == {
'average_population_bucket': '16',
'countries': ['Brazil', 'South Korea', 'Egypt', 'Mexico']
}
assert aggregations.results[1] == {
'average_population_bucket': '17',
'countries': ['Japan', 'Indonesia', 'China', 'Philippines', 'India']
}
For more details refer to the API documentation for Search
Autocomplete¶
To access the commands from the SUGGEST group of the RediSearch module use the autocomplete property or manually instantiate the Autocomplete class with an instance of Redis or RedisCluster.
Add some terms, each with an associated score, to a group:
await client.autocomplete.sugadd("cities", "New Milton", 5.0)
await client.autocomplete.sugadd("cities", "New Port", 4.0)
await client.autocomplete.sugadd("cities", "New Richmond", 3.0)
await client.autocomplete.sugadd("cities", "New Albany", 2.0)
await client.autocomplete.sugadd("cities", "New York", 1.0)
Fetch some suggestions:
suggestions = await client.autocomplete.sugget("cities", "new")
assert 5 == len(suggestions)
assert "New Milton" == suggestions[0].string
suggestions = await client.autocomplete.sugget("cities", "new po")
assert "New Port" == suggestions[0].string
Boost the score of a term:
await client.autocomplete.sugadd("cities", "New York", 5.0, increment_score=True)
suggestions = await client.autocomplete.sugget("cities", "new")
assert "New York" == suggestions[0].string
For more details refer to the API documentation for Autocomplete
RedisGraph¶
RedisGraph is a queryable Property Graph database that uses sparse matrices to represent the adjacency matrix in graphs and linear algebra to query the graph.
To access the commands exposed by the module use the graph property or manually instantiate the Graph class with an instance of Redis or RedisCluster.
The main interface to the module is the query() method, or ro_query() for readonly queries. The response from either method is an instance of GraphQueryResult, which follows the Result Structure described in the RedisGraph documentation.
Specifically:
Scalars are converted to native Python types
Nodes are converted to instances of GraphNode
Relations are converted to instances of GraphRelation
Paths are converted to instances of GraphPath
Adding nodes & relations¶
Using the example from the RedisGraph design documentation, we can create a graph with multiple nodes and relationships in one query:
import coredis
client = coredis.Redis(decode_responses=True)
results = await client.graph.query(
"imdb",
"""
CREATE (aldis:actor {
name: "Aldis Hodge", birth_year: 1986
}),
(oshea:actor {
name: "OShea Jackson", birth_year: 1991
}),
(corey:actor {
name: "Corey Hawkins", birth_year: 1988
}),
(neil:actor {
name: "Neil Brown", birth_year: 1980
}),
(compton:movie {
title: "Straight Outta Compton", genre: "Biography", votes: 127258, rating: 7.9, year: 2015
}),
(neveregoback:movie {
title: "Never Go Back", genre: "Action", votes: 15821, rating: 6.4, year: 2016
}),
(aldis)-[:act]->(neveregoback),
(aldis)-[:act]->(compton),
(oshea)-[:act]->(compton),
(corey)-[:act]->(compton),
(neil)-[:act]->(compton)
"""
)
assert results.stats["Nodes created"] == 6
assert results.stats["Relationships created"] == 5
Querying the graph¶
Using the previous example, let’s query for a few scalar properties of the nodes:
results = await client.graph.query("imdb", """
MATCH (a:actor)-[:act]->(m:movie {
title:"Straight Outta Compton"
})
RETURN a.name, m.title
""")
assert len(results.result_set) == 4
assert ['Aldis Hodge', 'Straight Outta Compton'] == results.result_set[0]
Perform the same query but this time return the nodes:
results = await client.graph.query("imdb", """
MATCH (a:actor)-[:act]->(m:movie {
title:"Straight Outta Compton"
})
RETURN a, m
""")
assert len(results.result_set) == 4
actor = results.result_set[0][0]
movie = results.result_set[0][1]
assert actor.properties == {"name": "Aldis Hodge", "birth_year": 1986}
assert movie.properties == {
"title": "Straight Outta Compton", "genre": "Biography",
"votes": 127258, "rating": 7.9, "year": 2015
}
Fetch the entire matching path:
results = await client.graph.query("imdb", """
MATCH p=(a:actor)-[:act]->(m:movie {
title:"Straight Outta Compton"
})
RETURN p
""")
path = results.result_set[0][0].path
assert ["Aldis Hodge", "act", "Straight Outta Compton"] == [
path[0].properties["name"], # GraphNode: actor
path[1].type, # GraphRelation: act
path[2].properties["title"] # GraphNode: movie
]
Perform an aggregation using aggregate functions:
results = await client.graph.query("imdb", """
MATCH (a:actor)-[:act]->(m:movie {
title:"Straight Outta Compton"
})
RETURN m.title, SUM(2020-a.birth_year), MAX(2020-a.birth_year), MIN(2020-a.birth_year), AVG(2020-a.birth_year)
""")
assert ['Straight Outta Compton', 135.0, 40, 29, 33.75] == results.result_set[0]
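Since none of the read queries above modify the graph, they could equally be issued with ro_query(); a minimal sketch reusing the graph created earlier:
# GRAPH.RO_QUERY: the same query surface, but guaranteed not to modify the graph
results = await client.graph.ro_query("imdb", """
MATCH (a:actor) RETURN a.name
""")
assert 4 == len(results.result_set)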
For more details about supported commands in the module refer to the API documentation for Graph
RedisBloom¶
The probabilistic data structures exposed by RedisBloom can be accessed through coredis using the following properties (or explicitly by instantiating the associated module class exposed by the property):
BloomFilter¶
import coredis
client = coredis.Redis()
# create filter
await client.bf.reserve("filter", 0.1, 1000)
# add items
await client.bf.add("filter", 1)
await client.bf.madd("filter", [2,3,4])
# test for inclusion
assert await client.bf.exists("filter", 1)
assert (True, False) == await client.bf.mexists("filter", [2,5])
# or
assert await coredis.modules.BloomFilter(client).exists("filter", 1)
...
For more details refer to the API documentation for BloomFilter
CuckooFilter¶
import coredis
client = coredis.Redis()
# create filter
await client.cf.reserve("filter", 1000)
# add items
assert await client.cf.add("filter", 1)
assert not await client.cf.addnx("filter", 1)
# test for inclusion
assert await client.cf.exists("filter", 1)
assert 1 == await client.cf.count("filter", 1)
# delete an item
assert await client.cf.delete("filter", 1)
# test for inclusion
assert not await client.cf.exists("filter", 1)
assert 0 == await client.cf.count("filter", 1)
For more details refer to the API documentation for CuckooFilter
CountMinSketch¶
import coredis
client = coredis.Redis()
# create a sketch
await client.cms.initbydim("sketch", 2, 50)
# increment the counts for multiple entries
assert (1, 2) == await client.cms.incrby("sketch", {"a": 1, "b": 2})
# query the count for multiple entries
assert (1, 2, 0) == await client.cms.query("sketch", ["a", "b", "c"])
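Multiple sketches can also be combined; a short sketch, assuming merge() follows CMS.MERGE and that the destination has been initialized with the same dimensions:
# create a second sketch and a destination with matching dimensions
await client.cms.initbydim("sketch2", 2, 50)
await client.cms.initbydim("combined", 2, 50)
await client.cms.incrby("sketch2", {"a": 3})
# CMS.MERGE: sum the counts from the source sketches into the destination
await client.cms.merge("combined", ["sketch", "sketch2"])
assert (4,) == await client.cms.query("combined", ["a"])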
For more details refer to the API documentation for CountMinSketch
TopK¶
Redis.topk / RedisCluster.topk
import coredis
import string
import itertools
import random
client = coredis.Redis()
# create a top-3
await client.topk.reserve("top3", 3)
# add entries
letters = list(itertools.chain(*[count * letter for count, letter in enumerate(string.ascii_lowercase)]))
random.shuffle(letters)
await client.topk.add("top3", letters)
# get top 3 letters
assert (b'z', b'y', b'x') == await client.topk.list("top3")
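Membership can also be tested without fetching the whole list; a minimal sketch, assuming query() maps to TOPK.QUERY:
# TOPK.QUERY: check whether items are currently held in the top 3
assert (True, False) == await client.topk.query("top3", ["z", "a"])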
For more details refer to the API documentation for TopK
TDigest¶
Redis.tdigest / RedisCluster.tdigest
import coredis
client = coredis.Redis()
# create a digest
await client.tdigest.create("digest")
# add some values
await client.tdigest.add("digest", [1, 2, 3])
# add some more values
await client.tdigest.add("digest", [1, 4, 5, 6])
# get the rank & reverse ranks
assert (1.0, 1.0, 2.0) == await client.tdigest.byrank("digest", [0, 1, 2])
assert (6.0, 5.0, 4.0) == await client.tdigest.byrevrank("digest", [0, 1, 2])
# get the quantiles
assert (1.0, 3.0, 6.0) == await client.tdigest.quantile("digest", [0, 0.5, 1])
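The quantile view can also be inverted; a brief sketch, assuming cdf() maps to TDIGEST.CDF:
# TDIGEST.CDF: estimated fraction of observations <= each given value
fractions = await client.tdigest.cdf("digest", [1, 3, 6])
assert all(0.0 <= fraction <= 1.0 for fraction in fractions)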
For more details refer to the API documentation for TDigest
RedisTimeSeries¶
RedisTimeSeries adds a time series data structure to Redis that allows ingesting and querying time series data.
To access the commands exposed by the module use the timeseries property or manually instantiate the TimeSeries class with an instance of Redis or RedisCluster.
The examples below use random temperature data captured every few minutes for different rooms in a house.
Create a few timeseries with different labels (create()):
import coredis
from datetime import datetime, timedelta
rooms = {"bedroom", "lounge", "bathroom"}
client = coredis.Redis()
for room in rooms:
    assert await client.timeseries.create(f"temp:{room}", labels={"room": room})
Create compaction rules for hourly and daily averages (createrule()):
for room in rooms:
    assert await client.timeseries.create(
        f"temp:{room}:hourly:avg", labels={"room": room, "compaction": "hourly"}
    )
    assert await client.timeseries.create(
        f"temp:{room}:daily:avg", labels={"room": room, "compaction": "daily"}
    )
    assert await client.timeseries.createrule(
        f"temp:{room}", f"temp:{room}:hourly:avg",
        coredis.PureToken.AVG, timedelta(hours=1)
    )
    assert await client.timeseries.createrule(
        f"temp:{room}", f"temp:{room}:daily:avg",
        coredis.PureToken.AVG, timedelta(hours=24)
    )
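The registered compaction rules can be verified from the series metadata; a minimal sketch, assuming info() maps to TS.INFO (the exact shape of the returned mapping is an assumption):
info = await client.timeseries.info("temp:lounge")
# the source series should list both compaction rules
print(info["rules"])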
Populate a year of random sample data (add()):
import random
cur = datetime.fromtimestamp(0)
pipeline = await client.pipeline()
while cur < datetime(1971, 1, 1, 0, 0, 0):
    cur += timedelta(minutes=random.randint(1, 60))
    for room in rooms:
        await pipeline.timeseries.add(f"temp:{room}", cur, random.randint(15, 30))
await pipeline.execute()
Query for the latest temperature in each room (get()):
for room in rooms:
    print(await client.timeseries.get(f"temp:{room}"))
Query for the latest temperature in all rooms (mget()):
print(await client.timeseries.mget(
filters=[f"room=({','.join(rooms)})", "compaction="])
)
Query for daily averages by individual room (range() & mrange()):
# using individual range queries on the compacted timeseries
for room in rooms:
    print(await client.timeseries.range(
        f"temp:{room}:daily:avg", 0, datetime(1971, 1, 1),
    ))
# using individual range queries with an aggregation on the original timeseries
for room in rooms:
    print(await client.timeseries.range(
        f"temp:{room}", 0, datetime(1971, 1, 1),
        aggregator=coredis.PureToken.AVG,
        bucketduration=timedelta(hours=24),
    ))
# using a multi range query on the compacted series
print(
await client.timeseries.mrange(
0, datetime(1971, 1, 1), filters=["compaction=daily"]
)
)
# using a multi range query with aggregation on the original series
print(
await client.timeseries.mrange(
0, datetime(1971, 1, 1),
aggregator=coredis.PureToken.AVG,
bucketduration=timedelta(hours=24),
filters=[f"room=({','.join(rooms)})", "compaction="]
)
)
For more details refer to the API documentation for TimeSeries