Source code for coredis.recipes.credentials
from __future__ import annotations
from urllib.parse import ParseResult, urlencode, urlunparse
# aiobotocore, botocore, asyncache & cachetools will need to be installed in addition
# to coredis dependencies. These can also be requested by installing coredis
# as coredis[recipes]
import aiobotocore.session
from aiobotocore.signers import AioRequestSigner
from asyncache import cached
from botocore.model import ServiceId
from cachetools import TTLCache
from coredis.credentials import AbstractCredentialProvider, UserPass
[docs]
class ElastiCacheIAMProvider(AbstractCredentialProvider):
"""
Credential provider that uses IAM authentication
to connect to an Elasticache instance.
"""
def __init__(self, user: str, cluster_name: str, region: str = "us-east-1") -> None:
self.user: str = user
self.cluster_name: str = cluster_name
self.region: str = region
self.session = aiobotocore.session.get_session()
[docs]
@cached(cache=TTLCache(maxsize=128, ttl=900)) # type: ignore[untyped-decorator]
async def get_credentials(self) -> UserPass:
"""
Returns a short-lived token that can be used to connect to an
IAM enabled Elasticache instance. The token will be cached for
its lifetime (15 minutes) to avoid unnecessary requests.
"""
request_signer = AioRequestSigner(
ServiceId("elasticache"),
self.region,
"elasticache",
"v4",
await self.session.get_credentials(),
self.session.get_component("event_emitter"),
)
query_params = {"Action": "connect", "User": self.user}
url = urlunparse(
ParseResult(
scheme="https",
netloc=self.cluster_name,
path="/",
query=urlencode(query_params),
params="",
fragment="",
)
)
signed_url = await request_signer.generate_presigned_url(
{"method": "GET", "url": url, "body": {}, "headers": {}, "context": {}},
operation_name="connect",
expires_in=900,
region_name=self.region,
)
# Need to strip the protocol so that Elasticache accepts it
return UserPass(self.user, signed_url.removeprefix("https://"))