From 16fb9d6be2d1fe28de09605506b4e9f850307ea9 Mon Sep 17 00:00:00 2001 From: Pat Buxton Date: Wed, 25 Oct 2023 17:57:23 +0100 Subject: [PATCH] Allow Redis Cluster connection --- redis_stomp/redis_connector.py | 34 ++++++++++++++++++++++++++++------ requirements.txt | 11 ++++++----- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/redis_stomp/redis_connector.py b/redis_stomp/redis_connector.py index b70a636..59584c0 100644 --- a/redis_stomp/redis_connector.py +++ b/redis_stomp/redis_connector.py @@ -3,9 +3,13 @@ from typing import List, Tuple, NamedTuple, Type from urllib import parse as urlparse -from redis.asyncio import Redis, Sentinel +import redis +from redis.asyncio import Redis, Sentinel, RedisCluster +from redis.asyncio.retry import Retry +from redis.backoff import FullJitterBackoff CLIENT_NAME = socket.gethostname().rsplit('-', 2)[0] +DEFAULT_RETRY_ERRORS = [TimeoutError, socket.timeout, redis.TimeoutError, redis.ConnectionError] # Needed for async version to add socket timeout class ParsedRedisURL(NamedTuple): hosts: List[Tuple[str, int]] @@ -127,12 +131,30 @@ def aio_connect(redis_url: str, read_only: bool = False, socket_timeout: float = redis.asyncio.client.Redis: Redis connection """ rinfo = parse_url(redis_url) - if rinfo.sentinel: + if rinfo.cluster: + host, port = rinfo.hosts[0] + cluster_retry = Retry(FullJitterBackoff(), 1) + cluster_retry.update_supported_errors(DEFAULT_RETRY_ERRORS) # can't pass retry_on_timeout to async cluster + return RedisCluster( + host=host, + port=port, + # client params + db=rinfo.database, + password=rinfo.password, + decode_responses=decode_responses, + socket_timeout=socket_timeout or rinfo.socket_timeout, + health_check_interval=30, + retry=cluster_retry, + client_name=CLIENT_NAME, + ) + elif rinfo.sentinel: # We're connecting to a sentinel cluster. sentinel_connection = Sentinel( rinfo.hosts, socket_timeout=socket_timeout or rinfo.socket_timeout, db=rinfo.database, password=rinfo.password, - health_check_interval=30, retry_on_timeout=True, + health_check_interval=30, + retry_on_timeout=True, # required for retry on socket timeout for the async class + retry=Retry(FullJitterBackoff(), 1), client_name=CLIENT_NAME, ) if read_only: @@ -156,8 +178,8 @@ def aio_connect(redis_url: str, read_only: bool = False, socket_timeout: float = db=rinfo.database, password=rinfo.password, decode_responses=decode_responses, - #socket_timeout=socket_timeout or rinfo.socket_timeout, - health_check_interval=30, - retry_on_timeout=True, + socket_timeout=socket_timeout or rinfo.socket_timeout, + retry_on_timeout=True, # required for retry on socket timeout for the async class + retry=Retry(FullJitterBackoff(), 1), client_name=CLIENT_NAME, ) diff --git a/requirements.txt b/requirements.txt index 57f51ce..1091f1c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ -anyio==3.7.1 +anyio==4.0.0 coilmq@git+https://github.com/rad-pat/coilmq.git@asyncio#egg=coilmq -redis==4.5.4 -starlette==0.31.0 -uvicorn==0.23.1 -websockets==10.4 +redis==5.0.1 +starlette==0.31.1 +uvicorn==0.23.2 +uvloop==0.19.0 +websockets==12.0