Skip to content

Commit

Permalink
Merge branch 'release/0.1.6'
Browse files Browse the repository at this point in the history
  • Loading branch information
berrytern committed Feb 3, 2023
2 parents daa0f63 + 9c2fa40 commit bffdfa5
Show file tree
Hide file tree
Showing 16 changed files with 195 additions and 133 deletions.
2 changes: 1 addition & 1 deletion amqp_client_python/domain/models/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(
domain: str = "localhost",
port: int = 5672,
vhost: str = "/",
heartbeat: Optional[int] = 0,
heartbeat: Optional[int] = 60,
publisher_confirms = False,
**kwargs: Dict[str, Any]
) -> None:
Expand Down
2 changes: 2 additions & 0 deletions amqp_client_python/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
from .nack_exception import NackException
from .rpc_provider_exception import RpcProviderException
from .timeout_exception import TimeoutException
from .response_timeout_exception import ResponseTimeoutException
from .publish_timeout_exception import PublishTimeoutException
10 changes: 10 additions & 0 deletions amqp_client_python/exceptions/publish_timeout_exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .timeout_exception import TimeoutException


class PublishTimeoutException(TimeoutException):
message: str = None
description: str = None

def __init__(self, message, description="") -> None:
self.message = message
self.description = description
9 changes: 9 additions & 0 deletions amqp_client_python/exceptions/response_timeout_exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .timeout_exception import TimeoutException

class ResponseTimeoutException(TimeoutException):
message: str = None
description: str = None

def __init__(self, message, description="") -> None:
self.message = message
self.description = description
8 changes: 6 additions & 2 deletions amqp_client_python/rabbitmq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from .async_eventbus_rabbitmq import AsyncEventbusRabbitMQ
from .async_connection_factory import AsyncConnectionFactoryRabbitMQ
from .async_connection import AsyncConnection
from .async_channel_factory import AsyncChannelFactoryRabbitMQ
from .async_channel import AsyncChannel
from .eventbus_rabbitmq import EventbusRabbitMQ
from .connection_factory_rabbitmq import ConnectionFactoryRabbitMQ
from .connection_rabbitmq import ConnectionRabbitMQ
from .channel_factory_rabbitmq import ChannelFactoryRabbitMQ
from .channel_rabbitmq import ChannelRabbitMQ
from .eventbus_rabbitmq import EventbusRabbitMQ
from .async_eventbus_rabbitmq import AsyncEventbusRabbitMQ
123 changes: 75 additions & 48 deletions amqp_client_python/rabbitmq/async_channel.py

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion amqp_client_python/rabbitmq/async_channel_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@
class AsyncChannelFactoryRabbitMQ:

def create_channel(self, connection:AsyncioConnection, on_channel_open:callable):
print(connection, "channel")
return connection.channel(on_open_callback=on_channel_open)

39 changes: 21 additions & 18 deletions amqp_client_python/rabbitmq/async_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@
from .async_channel import AsyncChannel
from asyncio import AbstractEventLoop
from asyncio import sleep, get_event_loop
from amqp_client_python.utils import Logger
import logging


LOGGER = logging.getLogger(__name__)


class AsyncConnection:

def __init__(self, ioloop: AbstractEventLoop, publisher_confirms=False, prefetch_count=0) -> None:
def __init__(self, ioloop: AbstractEventLoop, publisher_confirms=False, prefetch_count=0, auto_ack=True) -> None:
self.ioloop = ioloop
self.publisher_confirms = publisher_confirms
self.logger = Logger.lib_logger
self.connection_factory = AsyncConnectionFactoryRabbitMQ()
self._connection: AsyncioConnection = None
self._prefetch_count = prefetch_count
self._channel = AsyncChannel(self.logger, self._prefetch_count)
self._auto_ack = auto_ack
self._channel = AsyncChannel(self._prefetch_count, self._auto_ack)
self._closing = False
self._consuming = False
self.reconnecting = False
Expand Down Expand Up @@ -44,13 +47,13 @@ async def close(self):
self._connection.close()

def on_connection_open(self, _unused_connection):
self.logger.debug(f"connection openned {self._channel}")
self._channel = AsyncChannel(self.logger, self._prefetch_count)
LOGGER.debug(f"connection openned {self._channel}")
self._channel = AsyncChannel(self._prefetch_count, self._auto_ack)
self._channel.publisher_confirms = self.publisher_confirms
self._channel.open(self._connection)

def on_connection_open_error(self, _unused_connection, err):
self.logger.warn(f"connection open error: {err}, will attempt a connection")
LOGGER.warn(f"connection open error: {err}, will attempt a connection")
self.reconnect()

def on_connection_closed(self, _unused_connection, reason):
Expand All @@ -63,10 +66,10 @@ def on_connection_closed(self, _unused_connection, reason):
"""
self._channel = None
if self._closing:
self.logger.warn("connection closed intentionally")
LOGGER.warn("connection closed intentionally")
self._connection.ioloop.stop()
else:
self.logger.warn(f"Connection closed, reason: {reason}, will attempt a connection")
LOGGER.warn(f"Connection closed, reason: {reason}, will attempt a connection")
self.reconnect()

def reconnect(self):
Expand Down Expand Up @@ -118,13 +121,13 @@ def stop(self):
"""
if not self._closing:
self._closing = True
self.logger.warn('Stopping intentionally')
LOGGER.warn('Stopping intentionally')
if self._consuming:
self.stop_consuming()
self._connection.ioloop.run_forever()
else:
self.ioloop.stop()
self.logger.warn('Stopped')
LOGGER.warn('Stopped')


def set_qos(self):
Expand All @@ -150,21 +153,21 @@ async def rpc_client(self, exchange_name: str, routing_key: str, body, content_t
async def publish(self, exchange_name: str, routing_key: str, body, content_type):
return await self._channel.publish(exchange_name, routing_key, body, content_type, loop=self.ioloop)

async def rpc_subscribe(self, queue_name, exchange_name, routing_key, callback, auto_ack):
async def rpc_subscribe(self, queue_name, exchange_name, routing_key, callback):
self.backup["rpc_subscribe"][routing_key] = {
"queue_name": queue_name, "exchange_name": exchange_name,
"routing_key": routing_key, "callback": callback, "auto_ack": auto_ack
"routing_key": routing_key, "callback": callback
}
await self._channel.rpc_subscribe(queue_name=queue_name, exchange_name=exchange_name,
routing_key=routing_key, callback=callback, auto_ack=auto_ack)
routing_key=routing_key, callback=callback)

async def subscribe(self, queue_name, exchange_name, routing_key, callback, auto_ack):
async def subscribe(self, queue_name, exchange_name, routing_key, callback):
self.backup["subscribe"][routing_key] = {
"queue_name": queue_name, "exchange_name": exchange_name,
"routing_key": routing_key, "callback": callback, "auto_ack": auto_ack
"routing_key": routing_key, "callback": callback
}
await self._channel.subscribe(queue_name=queue_name, exchange_name=exchange_name,
routing_key=routing_key, callback=callback, auto_ack=auto_ack)
await self._channel.subscribe(exchange_name=exchange_name, queue_name=queue_name,
routing_key=routing_key, callback=callback)

async def add_callback(self, callback, retry=3, delay=2):
while retry:
Expand Down
19 changes: 11 additions & 8 deletions amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

class AsyncEventbusRabbitMQ:

def __init__(self, config: Config, loop=None, pub_publisher_confirms=True, rpc_client_publisher_confirms=True, rpc_server_publisher_confirms=False, sub_prefetch_count=0, rpc_prefetch_count=0) -> None:
def __init__(self, config: Config, loop=None, pub_publisher_confirms=True, rpc_client_publisher_confirms=True, rpc_server_publisher_confirms=False,
sub_prefetch_count=0, rpc_client_prefetch_count=0,rpc_server_prefetch_count=0,
sub_auto_ack=False, rpc_client_auto_ack=False, rpc_server_auto_ack=False,
) -> None:
self.loop: AbstractEventLoop = loop
self._pub_connection = AsyncConnection(self.loop, pub_publisher_confirms)
self._sub_connection = AsyncConnection(self.loop, False, sub_prefetch_count)
self._rpc_client_connection = AsyncConnection(self.loop, rpc_client_publisher_confirms)
self._rpc_server_connection = AsyncConnection(self.loop, rpc_server_publisher_confirms, rpc_prefetch_count)
self._sub_connection = AsyncConnection(self.loop, False, sub_prefetch_count, sub_auto_ack)
self._rpc_client_connection = AsyncConnection(self.loop, rpc_client_publisher_confirms, rpc_client_prefetch_count, rpc_client_auto_ack)
self._rpc_server_connection = AsyncConnection(self.loop, rpc_server_publisher_confirms, rpc_server_prefetch_count, rpc_server_auto_ack)
self.config = config.build()
self._rpc_server_initialized = False

Expand All @@ -28,17 +31,17 @@ async def add_publish():
self._pub_connection.open(self.config.url)
return await self._pub_connection.add_callback(add_publish)

async def provide_resource(self, name: str, callback, auto_ack=False):
async def provide_resource(self, name: str, callback):
async def add_resource():
await self._rpc_server_connection.rpc_subscribe(self.config.options.rpc_queue_name, self.config.options.rpc_exchange_name,
name, callback, auto_ack)
name, callback)
self._rpc_server_connection.open(self.config.url)
await self._rpc_server_connection.add_callback(add_resource)

async def subscribe(self, event: IntegrationEvent, handler: IntegrationEventHandler, routing_key: str, auto_ack=False):
async def subscribe(self, event: IntegrationEvent, handler: IntegrationEventHandler, routing_key: str):
async def add_subscribe():
await self._sub_connection.subscribe(self.config.options.queue_name, event.event_type,
routing_key, handler.handle, auto_ack)
routing_key, handler.handle)
self._sub_connection.open(self.config.url)
await self._sub_connection.add_callback(add_subscribe)

Expand Down
25 changes: 14 additions & 11 deletions amqp_client_python/rabbitmq/channel_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
from uuid import uuid4
from json import loads, dumps
from concurrent.futures import Future, TimeoutError
import logging


LOGGER = logging.getLogger(__name__)


class ChannelRabbitMQ:
def __init__(self, logger) -> None:
def __init__(self) -> None:
self.channel_factory = ChannelFactoryRabbitMQ()
self.consumer_tag = None
self.logger = logger
self._channel = None
self._callback_queue = None
self.consumers = {}
Expand All @@ -38,7 +41,7 @@ def on_channel_open(self,channel, callback=None):
Since the channel is now open, we'll declare the exchange to use.
:param pika.channel.Channel channel: The channel object
"""
self.logger.debug('Channel opened')
LOGGER.debug('Channel opened')
self._channel:Channel = channel
self.add_on_channel_close_callback()
if callback:
Expand All @@ -50,7 +53,7 @@ def declare_exchange(self, exchange, exchange_type, durable=True, callback=None)
be invoked by pika.
:param str|unicode exchange_name: The name of the exchange to declare
"""
self.logger.debug('Declaring exchange %s', exchange)
LOGGER.debug('Declaring exchange %s', exchange)
# Note: using functools.partial is not required, it is demonstrating
# how arbitrary data can be passed to the callback when it is called
cb = partial(
Expand All @@ -66,7 +69,7 @@ def on_exchange_declareok(self, _unused_frame, userdata, callback=None):
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
:param str|unicode userdata: Extra user data (exchange name)
"""
self.logger.debug('Exchange declared: %s', userdata)
LOGGER.debug('Exchange declared: %s', userdata)
if callback:
callback()

Expand All @@ -76,20 +79,20 @@ def queue_declare(self, queue_name: str, durable=False, auto_delete=False, callb
be invoked by pika.
:param str|unicode queue_name: The name of the queue to declare.
"""
self.logger.debug('Declaring queue %s', queue_name)
LOGGER.debug('Declaring queue %s', queue_name)
self._channel.queue_declare(
queue=queue_name, durable=durable, auto_delete=auto_delete, callback=callback)

def add_on_return_callback(self, callback):
self.logger.debug('Adding channel return callback')
LOGGER.debug('Adding channel return callback')
self._channel.add_on_return_callback(callback)


def add_on_channel_close_callback(self):
"""This method tells pika to call the on_channel_closed method if
RabbitMQ unexpectedly closes the channel.
"""
self.logger.debug('Adding channel close callback')
LOGGER.debug('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)

def on_channel_closed(self, channel, reason):
Expand All @@ -101,7 +104,7 @@ def on_channel_closed(self, channel, reason):
:param pika.channel.Channel channel: The closed channel
:param Exception reason: why the channel was closed
"""
self.logger.warning('Channel %i was closed: %s', channel, reason)
LOGGER.warning('Channel %i was closed: %s', channel, reason)
if(isinstance(reason, tuple) and reason[0]==406):
self.open(self._connection, False)

Expand All @@ -122,7 +125,7 @@ def close_channel(self):
the Channel.Close RPC command.
"""
if self._channel is not None:
self.logger.debug('Closing the channel')
LOGGER.debug('Closing the channel')
self._channel.close()

def rpc_client(self, exchange, routing_key, message, content_type, future, timeout):
Expand Down Expand Up @@ -226,7 +229,7 @@ def serve_subscribe(self, ch:Channel, method, props, body):
body = loads(body)
self.consumers[method.routing_key]["handle"](*body["handle"])
except BaseException as err:
self.logger.error(err)
LOGGER.error(err)

def __on_response(self, ch:Channel, method, props, body, future=None):
if props.correlation_id in self.futures:
Expand Down
18 changes: 10 additions & 8 deletions amqp_client_python/rabbitmq/connection_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from .connection_factory_rabbitmq import ConnectionFactoryRabbitMQ
from .channel_rabbitmq import ChannelRabbitMQ
from amqp_client_python.exceptions import EventBusException
from .channel_rabbitmq import ChannelRabbitMQ
from pika import SelectConnection, URLParameters
from amqp_client_python.utils import Logger
from .ioloop_factory import IOLoopFactory
import logging


LOGGER = logging.getLogger(__name__)


class ConnectionRabbitMQ:
_connection:SelectConnection
Expand All @@ -16,9 +19,8 @@ def __init__(self) -> None:
self.ioloop_factory = IOLoopFactory
self.ioloop_factory.add_reconnection(self.reconnect)
self._stopping = False
self._channel = ChannelRabbitMQ(Logger.lib_logger)
self._channel = ChannelRabbitMQ()
self._uri = None
self.logger = Logger.lib_logger
self.backup = {
"exchange": {},
"queue": {},
Expand All @@ -43,7 +45,7 @@ def reset(self):
self.ioloop.call_later(2,self.ioloop_factory.reset)

def reconnect(self):
self.logger.debug('reconnect %s', self.ioloop_is_open)
LOGGER.debug('reconnect %s', self.ioloop_is_open)
if not self.is_open():
self._connection = self._connectionFactory.create_connection(self._uri, self.on_connection_open, self.on_connection_open_error, self.on_connection_closed, custum_ioloop=self.ioloop)
self.add_callback(self.restore)
Expand Down Expand Up @@ -104,7 +106,7 @@ def on_connection_open(self, _unused_connection):
case we need it, but in this case, we'll just mark it unused.
:param pika.SelectConnection _unused_connection: The connection
"""
self.logger.debug('Connection opened')
LOGGER.debug('Connection opened')
if not self.channel_is_open():
self.channel_open()
#self.pause()
Expand All @@ -115,7 +117,7 @@ def on_connection_open_error(self, _unused_connection, err):
:param pika.SelectConnection _unused_connection: The connection
:param Exception err: The error
"""
self.logger.error('Connection open failed, reopening in 5 seconds: %s', err)
LOGGER.error('Connection open failed, reopening in 5 seconds: %s', err)
if self.ioloop_factory.running:
self.reset()
#else:
Expand All @@ -130,7 +132,7 @@ def on_connection_closed(self, _unused_connection, reason):
connection.
"""
if self._connection.is_closed:
self.logger.warning('Connection closed: %s', reason)
LOGGER.warning('Connection closed: %s', reason)
self.reset()

def add_callback(self, callback, event: str = "channel_open"):
Expand Down
1 change: 1 addition & 0 deletions amqp_client_python/rabbitmq/eventbus_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from asyncio import AbstractEventLoop
import asyncio


class EventbusRabbitMQ:

def __init__(self, config: Config) -> None:
Expand Down
1 change: 0 additions & 1 deletion amqp_client_python/utils/__init__.py

This file was deleted.

5 changes: 0 additions & 5 deletions amqp_client_python/utils/logger.py

This file was deleted.

Loading

0 comments on commit bffdfa5

Please sign in to comment.