Skip to content

Commit

Permalink
Allow Redis Cluster connection
Browse files Browse the repository at this point in the history
  • Loading branch information
rad-pat committed Oct 25, 2023
1 parent e3baab4 commit 16fb9d6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
34 changes: 28 additions & 6 deletions redis_stomp/redis_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
from typing import List, Tuple, NamedTuple, Type
from urllib import parse as urlparse

from redis.asyncio import Redis, Sentinel
import redis
from redis.asyncio import Redis, Sentinel, RedisCluster
from redis.asyncio.retry import Retry
from redis.backoff import FullJitterBackoff

CLIENT_NAME = socket.gethostname().rsplit('-', 2)[0]
DEFAULT_RETRY_ERRORS = [TimeoutError, socket.timeout, redis.TimeoutError, redis.ConnectionError] # Needed for async version to add socket timeout

class ParsedRedisURL(NamedTuple):
hosts: List[Tuple[str, int]]
Expand Down Expand Up @@ -127,12 +131,30 @@ def aio_connect(redis_url: str, read_only: bool = False, socket_timeout: float =
redis.asyncio.client.Redis: Redis connection
"""
rinfo = parse_url(redis_url)
if rinfo.sentinel:
if rinfo.cluster:
host, port = rinfo.hosts[0]
cluster_retry = Retry(FullJitterBackoff(), 1)
cluster_retry.update_supported_errors(DEFAULT_RETRY_ERRORS) # can't pass retry_on_timeout to async cluster
return RedisCluster(
host=host,
port=port,
# client params
db=rinfo.database,
password=rinfo.password,
decode_responses=decode_responses,
socket_timeout=socket_timeout or rinfo.socket_timeout,
health_check_interval=30,
retry=cluster_retry,
client_name=CLIENT_NAME,
)
elif rinfo.sentinel:
# We're connecting to a sentinel cluster.
sentinel_connection = Sentinel(
rinfo.hosts, socket_timeout=socket_timeout or rinfo.socket_timeout,
db=rinfo.database, password=rinfo.password,
health_check_interval=30, retry_on_timeout=True,
health_check_interval=30,
retry_on_timeout=True, # required for retry on socket timeout for the async class
retry=Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
)
if read_only:
Expand All @@ -156,8 +178,8 @@ def aio_connect(redis_url: str, read_only: bool = False, socket_timeout: float =
db=rinfo.database,
password=rinfo.password,
decode_responses=decode_responses,
#socket_timeout=socket_timeout or rinfo.socket_timeout,
health_check_interval=30,
retry_on_timeout=True,
socket_timeout=socket_timeout or rinfo.socket_timeout,
retry_on_timeout=True, # required for retry on socket timeout for the async class
retry=Retry(FullJitterBackoff(), 1),
client_name=CLIENT_NAME,
)
11 changes: 6 additions & 5 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
anyio==3.7.1
anyio==4.0.0
coilmq@git+https://github.com/rad-pat/coilmq.git@asyncio#egg=coilmq
redis==4.5.4
starlette==0.31.0
uvicorn==0.23.1
websockets==10.4
redis==5.0.1
starlette==0.31.1
uvicorn==0.23.2
uvloop==0.19.0
websockets==12.0

0 comments on commit 16fb9d6

Please sign in to comment.