Skip to content

Commit

Permalink
Merge branch 'release/0.1.10'
Browse files Browse the repository at this point in the history
  • Loading branch information
berrytern committed Feb 17, 2023
2 parents d7f14c5 + 5386d90 commit 304bf7e
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 84 deletions.
1 change: 1 addition & 0 deletions amqp_client_python/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .rabbitmq import EventbusRabbitMQ, AsyncEventbusRabbitMQ, EventbusWrapperRabbitMQ
from .domain.models import Config, Options, SSLOptions
from pika import DeliveryMode
2 changes: 2 additions & 0 deletions amqp_client_python/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
from .response_timeout_exception import ResponseTimeoutException
from .publish_timeout_exception import PublishTimeoutException
from .auto_reconnect_exception import AutoReconnectException
from .blocking_exception import BlockingException
from .thread_unsafe_exception import ThreadUnsafeException
10 changes: 10 additions & 0 deletions amqp_client_python/exceptions/blocking_exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .eventbus_exception import EventBusException


class BlockingException(EventBusException):
message: str = None
description: str = None

def __init__(self, message, description="") -> None:
self.message = message
self.description = description
10 changes: 10 additions & 0 deletions amqp_client_python/exceptions/thread_unsafe_exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .eventbus_exception import EventBusException


class ThreadUnsafeException(EventBusException):
message: str = None
description: str = None

def __init__(self, message, description="") -> None:
self.message = message
self.description = description
21 changes: 17 additions & 4 deletions amqp_client_python/rabbitmq/async_channel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import MutableMapping, Mapping
from typing import MutableMapping, Mapping, Optional, Union
from .async_channel_factory import AsyncChannelFactoryRabbitMQ
from amqp_client_python.exceptions import (
NackException,
Expand All @@ -9,7 +9,7 @@
)
from pika.adapters.asyncio_connection import AsyncioConnection
from pika.channel import Channel
from pika import BasicProperties
from pika import BasicProperties, DeliveryMode
from asyncio import AbstractEventLoop, Future, wait, wait_for, FIRST_COMPLETED
from functools import partial
from json import dumps, loads
Expand Down Expand Up @@ -314,6 +314,9 @@ async def rpc_client(
body,
content_type,
timeout,
delivery_mode=DeliveryMode.Transient,
expiration: Optional[Union[str, None]] = None,
**key_args,
):
future = self.ioloop.create_future()
message = dumps({"resource_name": routing_key, "handle": body})
Expand All @@ -332,7 +335,11 @@ async def rpc_client(
reply_to=self._callback_queue,
correlation_id=corr_id,
content_type=content_type,
delivery_mode=delivery_mode,
expiration=expiration,
**key_args,
),
mandatory=False,
)

def not_arrived(id):
Expand Down Expand Up @@ -377,7 +384,9 @@ async def publish(
body,
content_type: str,
timeout=5,
loop: AbstractEventLoop = None,
delivery_mode=DeliveryMode.Transient,
expiration: Optional[Union[str, None]] = None,
**key_args,
):
message = dumps({"handle": body})
self._channel.basic_publish(
Expand All @@ -387,7 +396,11 @@ async def publish(
properties=BasicProperties(
reply_to=self._callback_queue,
content_type=content_type,
delivery_mode=delivery_mode,
expiration=expiration,
**key_args,
),
mandatory=False,
)
if self.publisher_confirms:
publish_future = self.ioloop.create_future()
Expand All @@ -400,7 +413,7 @@ def not_arrived(id):
)

func = partial(not_arrived, self._message_number)
loop.call_later(timeout, func)
self.ioloop.call_later(timeout, func)
return await publish_future

def publish_confirmation(self, future: Future):
Expand Down
35 changes: 31 additions & 4 deletions amqp_client_python/rabbitmq/async_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def open(self, uri):

async def close(self):
if self.is_open:
self._closing = True
self._connection.close()

def on_connection_open(self, _unused_connection):
Expand All @@ -77,7 +78,7 @@ def on_connection_closed(self, _unused_connection, reason):
self._channel = None
if self._closing:
LOGGER.warn("connection closed intentionally")
self._connection.ioloop.stop()
# self._connection.ioloop.stop()
else:
LOGGER.warn(
f"Connection closed, reason: {reason}, will attempt a connection"
Expand Down Expand Up @@ -168,21 +169,47 @@ def run(self) -> None:
self._connection.ioloop.run_forever()

async def rpc_client(
self, exchange_name: str, routing_key: str, body, content_type, timeout
self,
exchange_name: str,
routing_key: str,
body,
content_type,
timeout,
delivery_mode,
expiration,
**kwargs,
):
return await self._channel.rpc_client(
exchange_name,
routing_key,
body,
content_type,
timeout,
delivery_mode,
expiration,
**kwargs,
)

async def publish(
self, exchange_name: str, routing_key: str, body, content_type, timeout
self,
exchange_name: str,
routing_key: str,
body,
content_type,
timeout,
delivery_mode,
expiration,
**kwargs,
):
return await self._channel.publish(
exchange_name, routing_key, body, content_type, timeout, loop=self.ioloop
exchange_name,
routing_key,
body,
content_type,
timeout,
delivery_mode,
expiration,
**kwargs,
)

async def rpc_subscribe(
Expand Down
41 changes: 29 additions & 12 deletions amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Any, List, Optional, Union
from .async_connection import AsyncConnection
from ..event import IntegrationEvent, IntegrationEventHandler
from amqp_client_python.domain.models import Config
from typing import Any, List
from asyncio import AbstractEventLoop
from pika import DeliveryMode


class AsyncEventbusRabbitMQ:
Expand All @@ -20,19 +21,19 @@ def __init__(
rpc_client_auto_ack=False,
rpc_server_auto_ack=False,
) -> None:
self.loop: AbstractEventLoop = loop
self._pub_connection = AsyncConnection(self.loop, pub_publisher_confirms)
self._loop: AbstractEventLoop = loop
self._pub_connection = AsyncConnection(self._loop, pub_publisher_confirms)
self._sub_connection = AsyncConnection(
self.loop, False, sub_prefetch_count, sub_auto_ack
self._loop, False, sub_prefetch_count, sub_auto_ack
)
self._rpc_client_connection = AsyncConnection(
self.loop,
self._loop,
rpc_client_publisher_confirms,
rpc_client_prefetch_count,
rpc_client_auto_ack,
)
self._rpc_server_connection = AsyncConnection(
self.loop,
self._loop,
rpc_server_publisher_confirms,
rpc_server_prefetch_count,
rpc_server_auto_ack,
Expand All @@ -47,10 +48,20 @@ async def rpc_client(
body: List[Any],
content_type="application/json",
timeout=5,
delivery_mode=DeliveryMode.Transient,
expiration: Optional[Union[str, None]] = None,
**kwargs
):
async def add_rpc_client():
return await self._rpc_client_connection.rpc_client(
exchange, routing_key, body, content_type=content_type, timeout=timeout
exchange,
routing_key,
body,
content_type,
timeout,
delivery_mode,
expiration,
**kwargs
)

self._rpc_client_connection.open(self.config.url)
Expand All @@ -62,17 +73,21 @@ async def publish(
routing_key: str,
body: List[Any],
content_type="application/json",
exchange_type: str = "direct",
exchange_durable=True,
timeout=5,
delivery_mode=DeliveryMode.Transient,
expiration: Optional[Union[str, None]] = None, # example: '60000' -> 60s
**kwargs
):
async def add_publish():
return await self._pub_connection.publish(
event.event_type,
routing_key,
body,
content_type=content_type,
timeout=timeout,
content_type,
timeout,
delivery_mode,
expiration,
**kwargs
)

self._pub_connection.open(self.config.url)
Expand Down Expand Up @@ -110,8 +125,10 @@ async def add_subscribe():
self._sub_connection.open(self.config.url)
await self._sub_connection.add_callback(add_subscribe)

async def dispose(self):
async def dispose(self, stop_event_loop=True):
await self._pub_connection.close()
await self._sub_connection.close()
await self._rpc_client_connection.close()
await self._rpc_server_connection.close()
if stop_event_loop:
self._loop.stop()
Loading

0 comments on commit 304bf7e

Please sign in to comment.