diff --git a/amqp_client_python/__init__.py b/amqp_client_python/__init__.py index 0e581d5..521ba43 100644 --- a/amqp_client_python/__init__.py +++ b/amqp_client_python/__init__.py @@ -1,2 +1,3 @@ from .rabbitmq import EventbusRabbitMQ, AsyncEventbusRabbitMQ, EventbusWrapperRabbitMQ from .domain.models import Config, Options, SSLOptions +from pika import DeliveryMode diff --git a/amqp_client_python/exceptions/__init__.py b/amqp_client_python/exceptions/__init__.py index 96d36bb..999c07c 100644 --- a/amqp_client_python/exceptions/__init__.py +++ b/amqp_client_python/exceptions/__init__.py @@ -5,3 +5,5 @@ from .response_timeout_exception import ResponseTimeoutException from .publish_timeout_exception import PublishTimeoutException from .auto_reconnect_exception import AutoReconnectException +from .blocking_exception import BlockingException +from .thread_unsafe_exception import ThreadUnsafeException diff --git a/amqp_client_python/exceptions/blocking_exception.py b/amqp_client_python/exceptions/blocking_exception.py new file mode 100644 index 0000000..71473f7 --- /dev/null +++ b/amqp_client_python/exceptions/blocking_exception.py @@ -0,0 +1,10 @@ +from .eventbus_exception import EventBusException + + +class BlockingException(EventBusException): + message: str = None + description: str = None + + def __init__(self, message, description="") -> None: + self.message = message + self.description = description diff --git a/amqp_client_python/exceptions/thread_unsafe_exception.py b/amqp_client_python/exceptions/thread_unsafe_exception.py new file mode 100644 index 0000000..e4cbddf --- /dev/null +++ b/amqp_client_python/exceptions/thread_unsafe_exception.py @@ -0,0 +1,10 @@ +from .eventbus_exception import EventBusException + + +class ThreadUnsafeException(EventBusException): + message: str = None + description: str = None + + def __init__(self, message, description="") -> None: + self.message = message + self.description = description diff --git a/amqp_client_python/rabbitmq/async_channel.py b/amqp_client_python/rabbitmq/async_channel.py index 6fe8808..a74d9bd 100644 --- a/amqp_client_python/rabbitmq/async_channel.py +++ b/amqp_client_python/rabbitmq/async_channel.py @@ -1,4 +1,4 @@ -from typing import MutableMapping, Mapping +from typing import MutableMapping, Mapping, Optional, Union from .async_channel_factory import AsyncChannelFactoryRabbitMQ from amqp_client_python.exceptions import ( NackException, @@ -9,7 +9,7 @@ ) from pika.adapters.asyncio_connection import AsyncioConnection from pika.channel import Channel -from pika import BasicProperties +from pika import BasicProperties, DeliveryMode from asyncio import AbstractEventLoop, Future, wait, wait_for, FIRST_COMPLETED from functools import partial from json import dumps, loads @@ -314,6 +314,9 @@ async def rpc_client( body, content_type, timeout, + delivery_mode=DeliveryMode.Transient, + expiration: Optional[Union[str, None]] = None, + **key_args, ): future = self.ioloop.create_future() message = dumps({"resource_name": routing_key, "handle": body}) @@ -332,7 +335,11 @@ async def rpc_client( reply_to=self._callback_queue, correlation_id=corr_id, content_type=content_type, + delivery_mode=delivery_mode, + expiration=expiration, + **key_args, ), + mandatory=False, ) def not_arrived(id): @@ -377,7 +384,9 @@ async def publish( body, content_type: str, timeout=5, - loop: AbstractEventLoop = None, + delivery_mode=DeliveryMode.Transient, + expiration: Optional[Union[str, None]] = None, + **key_args, ): message = dumps({"handle": body}) self._channel.basic_publish( @@ -387,7 +396,11 @@ async def publish( properties=BasicProperties( reply_to=self._callback_queue, content_type=content_type, + delivery_mode=delivery_mode, + expiration=expiration, + **key_args, ), + mandatory=False, ) if self.publisher_confirms: publish_future = self.ioloop.create_future() @@ -400,7 +413,7 @@ def not_arrived(id): ) func = partial(not_arrived, self._message_number) - loop.call_later(timeout, func) + self.ioloop.call_later(timeout, func) return await publish_future def publish_confirmation(self, future: Future): diff --git a/amqp_client_python/rabbitmq/async_connection.py b/amqp_client_python/rabbitmq/async_connection.py index a261986..00c55f3 100644 --- a/amqp_client_python/rabbitmq/async_connection.py +++ b/amqp_client_python/rabbitmq/async_connection.py @@ -52,6 +52,7 @@ def open(self, uri): async def close(self): if self.is_open: + self._closing = True self._connection.close() def on_connection_open(self, _unused_connection): @@ -77,7 +78,7 @@ def on_connection_closed(self, _unused_connection, reason): self._channel = None if self._closing: LOGGER.warn("connection closed intentionally") - self._connection.ioloop.stop() + # self._connection.ioloop.stop() else: LOGGER.warn( f"Connection closed, reason: {reason}, will attempt a connection" @@ -168,7 +169,15 @@ def run(self) -> None: self._connection.ioloop.run_forever() async def rpc_client( - self, exchange_name: str, routing_key: str, body, content_type, timeout + self, + exchange_name: str, + routing_key: str, + body, + content_type, + timeout, + delivery_mode, + expiration, + **kwargs, ): return await self._channel.rpc_client( exchange_name, @@ -176,13 +185,31 @@ async def rpc_client( body, content_type, timeout, + delivery_mode, + expiration, + **kwargs, ) async def publish( - self, exchange_name: str, routing_key: str, body, content_type, timeout + self, + exchange_name: str, + routing_key: str, + body, + content_type, + timeout, + delivery_mode, + expiration, + **kwargs, ): return await self._channel.publish( - exchange_name, routing_key, body, content_type, timeout, loop=self.ioloop + exchange_name, + routing_key, + body, + content_type, + timeout, + delivery_mode, + expiration, + **kwargs, ) async def rpc_subscribe( diff --git a/amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py b/amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py index 7c6e21a..b5cec99 100644 --- a/amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py +++ b/amqp_client_python/rabbitmq/async_eventbus_rabbitmq.py @@ -1,8 +1,9 @@ +from typing import Any, List, Optional, Union from .async_connection import AsyncConnection from ..event import IntegrationEvent, IntegrationEventHandler from amqp_client_python.domain.models import Config -from typing import Any, List from asyncio import AbstractEventLoop +from pika import DeliveryMode class AsyncEventbusRabbitMQ: @@ -20,19 +21,19 @@ def __init__( rpc_client_auto_ack=False, rpc_server_auto_ack=False, ) -> None: - self.loop: AbstractEventLoop = loop - self._pub_connection = AsyncConnection(self.loop, pub_publisher_confirms) + self._loop: AbstractEventLoop = loop + self._pub_connection = AsyncConnection(self._loop, pub_publisher_confirms) self._sub_connection = AsyncConnection( - self.loop, False, sub_prefetch_count, sub_auto_ack + self._loop, False, sub_prefetch_count, sub_auto_ack ) self._rpc_client_connection = AsyncConnection( - self.loop, + self._loop, rpc_client_publisher_confirms, rpc_client_prefetch_count, rpc_client_auto_ack, ) self._rpc_server_connection = AsyncConnection( - self.loop, + self._loop, rpc_server_publisher_confirms, rpc_server_prefetch_count, rpc_server_auto_ack, @@ -47,10 +48,20 @@ async def rpc_client( body: List[Any], content_type="application/json", timeout=5, + delivery_mode=DeliveryMode.Transient, + expiration: Optional[Union[str, None]] = None, + **kwargs ): async def add_rpc_client(): return await self._rpc_client_connection.rpc_client( - exchange, routing_key, body, content_type=content_type, timeout=timeout + exchange, + routing_key, + body, + content_type, + timeout, + delivery_mode, + expiration, + **kwargs ) self._rpc_client_connection.open(self.config.url) @@ -62,17 +73,21 @@ async def publish( routing_key: str, body: List[Any], content_type="application/json", - exchange_type: str = "direct", - exchange_durable=True, timeout=5, + delivery_mode=DeliveryMode.Transient, + expiration: Optional[Union[str, None]] = None, # example: '60000' -> 60s + **kwargs ): async def add_publish(): return await self._pub_connection.publish( event.event_type, routing_key, body, - content_type=content_type, - timeout=timeout, + content_type, + timeout, + delivery_mode, + expiration, + **kwargs ) self._pub_connection.open(self.config.url) @@ -110,8 +125,10 @@ async def add_subscribe(): self._sub_connection.open(self.config.url) await self._sub_connection.add_callback(add_subscribe) - async def dispose(self): + async def dispose(self, stop_event_loop=True): await self._pub_connection.close() await self._sub_connection.close() await self._rpc_client_connection.close() await self._rpc_server_connection.close() + if stop_event_loop: + self._loop.stop() diff --git a/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py b/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py index 5f7d67b..961ed83 100644 --- a/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py +++ b/amqp_client_python/rabbitmq/eventbus_wrapper_rabbitmq.py @@ -1,9 +1,10 @@ from .async_eventbus_rabbitmq import AsyncEventbusRabbitMQ from ..event import IntegrationEvent, IntegrationEventHandler +from ..exceptions import BlockingException, ThreadUnsafeException from amqp_client_python.domain.models import Config from typing import Any, List -from threading import Thread -import asyncio +from threading import Thread, current_thread +from asyncio import new_event_loop, run_coroutine_threadsafe from concurrent.futures import Future @@ -22,10 +23,10 @@ def __init__( rpc_client_auto_ack=False, rpc_server_auto_ack=False, ) -> None: - self.loop = loop or asyncio.new_event_loop() - self.async_eventbus = AsyncEventbusRabbitMQ( + self._loop = loop or new_event_loop() + self._async_eventbus = AsyncEventbusRabbitMQ( config, - self.loop, + self._loop, pub_publisher_confirms, rpc_client_publisher_confirms, rpc_server_publisher_confirms, @@ -36,8 +37,8 @@ def __init__( rpc_client_auto_ack, rpc_server_auto_ack, ) - self.thread = Thread(target=self.loop.run_forever) - self.thread.start() + self._thread = Thread(target=self._loop.run_forever) + self._thread.start() def rpc_client( self, @@ -47,11 +48,15 @@ def rpc_client( content_type="application/json", timeout=5, ) -> Future: - return asyncio.run_coroutine_threadsafe( - self.async_eventbus.rpc_client( + if self._thread.ident == current_thread().ident: + raise BlockingException( + "Cannot run sync blocking call on async thread, try to use async methods with an await expression" + ) + return run_coroutine_threadsafe( + self._async_eventbus.rpc_client( exchange, routing_key, body, content_type, timeout ), - self.loop, + self._loop, ) async def async_rpc_client( @@ -62,7 +67,11 @@ async def async_rpc_client( content_type="application/json", timeout=5, ): - await self.async_eventbus.rpc_client( + if self._thread.ident != current_thread().ident: + raise ThreadUnsafeException( + "Cannot run async call on this thread, try to use sync thread safe methods" + ) + return await self._async_eventbus.rpc_client( exchange, routing_key, body, content_type, timeout ) @@ -76,7 +85,11 @@ async def async_publish( exchange_durable=True, timeout=5, ): - await self.async_eventbus.publish( + if self._thread.ident != current_thread().ident: + raise ThreadUnsafeException( + "Cannot run async call on this thread, try to use sync thread safe methods" + ) + await self._async_eventbus.publish( event, routing_key, body, @@ -95,11 +108,15 @@ def publish( exchange_type: str = "direct", exchange_durable=True, ) -> Future: - return asyncio.run_coroutine_threadsafe( - self.async_eventbus.publish( + if self._thread.ident == current_thread().ident: + raise BlockingException( + "Cannot run sync blocking call on async thread, try to use async methods with an await expression" + ) + return run_coroutine_threadsafe( + self._async_eventbus.publish( event, routing_key, body, content_type, exchange_type, exchange_durable ), - self.loop, + self._loop, ) def subscribe( @@ -109,21 +126,59 @@ def subscribe( routing_key: str, response_timeout: int = None, ) -> Future: - return asyncio.run_coroutine_threadsafe( - self.async_eventbus.subscribe( + if self._thread.ident == current_thread().ident: + raise BlockingException( + "Cannot run sync blocking call on async thread, try to use async methods with an await expression" + ) + return run_coroutine_threadsafe( + self._async_eventbus.subscribe( event, handler, routing_key, response_timeout ), - self.loop, + self._loop, + ) + + async def async_subscribe( + self, + event: IntegrationEvent, + handler: IntegrationEventHandler, + routing_key: str, + response_timeout: int = None, + ): + if self._thread.ident != current_thread().ident: + raise ThreadUnsafeException( + "Cannot run async call on this thread, try to use sync thread safe methods" + ) + await self._async_eventbus.subscribe( + event, handler, routing_key, response_timeout ) def provide_resource( self, name: str, callback, response_timeout: int = None ) -> Future: - return asyncio.run_coroutine_threadsafe( - self.async_eventbus.provide_resource(name, callback, response_timeout), - self.loop, + if self._thread.ident == current_thread().ident: + raise BlockingException( + "Cannot run sync blocking call on async thread, try to use async methods with an await expression" + ) + return run_coroutine_threadsafe( + self._async_eventbus.provide_resource(name, callback, response_timeout), + self._loop, ) + async def async_provide_resource( + self, name: str, callback, response_timeout: int = None + ): + if self._thread.ident != current_thread().ident: + raise ThreadUnsafeException( + "Cannot run async call on this thread, try to use sync thread safe methods" + ) + await self._async_eventbus.provide_resource(name, callback, response_timeout) + def dispose(self): - asyncio.run_coroutine_threadsafe(self.async_eventbus.dispose(), self.loop) - self.loop.stop() + run_coroutine_threadsafe(self._async_eventbus.dispose(), self._loop) + + async def async_dispose(self, stop_event_loop=True): + if self._thread.ident != current_thread().ident: + raise ThreadUnsafeException( + "Cannot run async call on this thread, try to use sync thread safe methods" + ) + await self._async_eventbus.dispose(stop_event_loop=stop_event_loop) diff --git a/amqp_client_python/rabbitmq/ioloop_factory.py b/amqp_client_python/rabbitmq/ioloop_factory.py index 04897d2..74d4641 100644 --- a/amqp_client_python/rabbitmq/ioloop_factory.py +++ b/amqp_client_python/rabbitmq/ioloop_factory.py @@ -1,5 +1,7 @@ from pika.adapters.select_connection import IOLoop -from time import time, sleep +from time import time + + class IOLoopFactory: ioloop = IOLoop() last_creation = time() @@ -9,7 +11,7 @@ class IOLoopFactory: @classmethod def get_ioloop(cls): return cls.ioloop - + @classmethod def reset(cls): cls.last_creation = time() @@ -18,18 +20,17 @@ def reset(cls): cls.ioloop = IOLoop() cls._reconnect() cls.start() - + @classmethod def start(cls): if not cls.running: - cls.running=True + cls.running = True cls.ioloop.start() @classmethod def add_reconnection(cls, call): cls._reconnection_list.append(call) - + @classmethod def _reconnect(cls): [reconnect() for reconnect in cls._reconnection_list] - \ No newline at end of file diff --git a/examples/sync_wrapper_case.py b/examples/sync_wrapper_case.py index 44f6d74..fd89f3d 100644 --- a/examples/sync_wrapper_case.py +++ b/examples/sync_wrapper_case.py @@ -1,52 +1,60 @@ from amqp_client_python import EventbusWrapperRabbitMQ, Config, Options from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler -from examples.default import queue, rpc_queue, rpc_exchange, rpc_routing_key -from random import randint - - -class ExampleEvent(IntegrationEvent): - EVENT_NAME: str = "ExampleEvent" - ROUTING_KEY: str = rpc_routing_key - - def __init__(self, event_type: str, message = []) -> None: - super().__init__(self.EVENT_NAME, event_type) - self.message = message - self.routing_key = self.ROUTING_KEY - - +from default import queue, rpc_queue, rpc_exchange, rpc_routing_key config = Config(Options(queue, rpc_queue, rpc_exchange)) eventbus = EventbusWrapperRabbitMQ(config=config) + class ExampleEvent(IntegrationEvent): EVENT_NAME: str = "ExampleEvent" - def __init__(self, event_type: str, message = []) -> None: + + def __init__(self, event_type: str, message=[]) -> None: super().__init__(self.EVENT_NAME, event_type) self.message = message + + class ExampleEventHandler(IntegrationEventHandler): async def handle(self, body) -> None: - print(body,"subscribe") + print(body, "subscribe") + async def handle(*body): print(body[0], "rpc_provider") + result: bytes = await eventbus.async_rpc_client( + rpc_exchange, rpc_routing_key + "3", [body[0]] + ) + print("...") + return result + + +async def handle2(*body): + print(body[0], "rpc_provider2") return f"{body[0]}".encode("utf-8") + subscribe_event = ExampleEvent(rpc_exchange) publish_event = ExampleEvent(rpc_exchange, ["message"]) subscribe_event_handle = ExampleEventHandler() -eventbus.provide_resource(rpc_routing_key+"2", handle).result() eventbus.subscribe(subscribe_event, subscribe_event_handle, rpc_routing_key).result() +eventbus.provide_resource(rpc_routing_key + "2", handle).result() +eventbus.provide_resource(rpc_routing_key + "3", handle2).result() count = 0 running = True while running: try: count += 1 - if str(count) != eventbus.rpc_client(rpc_exchange, rpc_routing_key+"2", [f"{count}"]).result().decode("utf-8"): + if str(count) != eventbus.rpc_client( + rpc_exchange, rpc_routing_key + "2", [f"{count}"] + ).result().decode("utf-8"): running = False eventbus.publish(publish_event, rpc_routing_key, "direct").result() - #running = False + # running = False except KeyboardInterrupt: - running=False + running = False except BaseException as err: - print("Err:", err) \ No newline at end of file + running = False + print("Err:", err) + +eventbus.dispose() diff --git a/pyproject.toml b/pyproject.toml index 93186da..f40c2b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,12 +1,20 @@ [tool.poetry] name = "amqp-client-python" -version = "0.1.9" +version = "0.1.10" description = "Python AMQP Client Library" license = "Apache-2.0" authors = ["NUTES UEPB "] readme = "README.md" packages = [{include = "amqp_client_python"}] +homepage = "https://github.com/nutes-uepb/amqp-client-python" +repository = "https://github.com/nutes-uepb/amqp-client-python" + +keywords = ["packaging", "dependency", "amqp-client-python"] + +[tool.poetry.urls] +"Bug Tracker" = "https://github.com/nutes-uepb/amqp-client-python/issues" + [project] maintainers = [ {name = "João Pedro M. Cariry", email = "berrytern@gmail.com"} diff --git a/tests/unit/eventbus/test_async_eventbus.py b/tests/unit/eventbus/test_async_eventbus.py index 1aad293..7c271cf 100644 --- a/tests/unit/eventbus/test_async_eventbus.py +++ b/tests/unit/eventbus/test_async_eventbus.py @@ -1,7 +1,7 @@ from amqp_client_python import AsyncEventbusRabbitMQ from asyncio import iscoroutinefunction import pytest -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, Mock @pytest.mark.asyncio_cooperative @@ -10,16 +10,22 @@ async def test_async_eventbus_build_config(config_mock): config_mock.build.assert_called_once() +@pytest.mark.parametrize("stop_ioloop", [True, False]) @pytest.mark.asyncio_cooperative -async def test_async_eventbus_dispose(config_mock): +async def test_async_eventbus_dispose(config_mock, stop_ioloop): eventbus = AsyncEventbusRabbitMQ(config_mock) + eventbus._loop = Mock() eventbus._pub_connection = AsyncMock() eventbus._sub_connection = AsyncMock() eventbus._rpc_client_connection = AsyncMock() eventbus._rpc_server_connection = AsyncMock() assert iscoroutinefunction(eventbus.dispose) - assert await eventbus.dispose() is None + assert await eventbus.dispose(stop_ioloop) is None eventbus._pub_connection.close.assert_called_once() eventbus._sub_connection.close.assert_called_once() eventbus._rpc_client_connection.close.assert_called_once() eventbus._rpc_server_connection.close.assert_called_once() + if stop_ioloop: + eventbus._loop.stop.assert_called() + else: + eventbus._loop.stop.assert_not_called() diff --git a/tests/unit/eventbus/test_async_eventbus_publish.py b/tests/unit/eventbus/test_async_eventbus_publish.py index 83a0261..f318314 100644 --- a/tests/unit/eventbus/test_async_eventbus_publish.py +++ b/tests/unit/eventbus/test_async_eventbus_publish.py @@ -1,4 +1,4 @@ -from amqp_client_python import AsyncEventbusRabbitMQ +from amqp_client_python import AsyncEventbusRabbitMQ, DeliveryMode from amqp_client_python.event import IntegrationEvent from asyncio import iscoroutinefunction from tests.unit.eventbus.default import async_add_callback @@ -35,8 +35,6 @@ async def test_async_eventbus_publish_deep(async_connection_mock, config_mock): exchange, routing_key, body, - exchange_type, - exchange_durable, content_type, timeout, ) = ( @@ -44,8 +42,6 @@ async def test_async_eventbus_publish_deep(async_connection_mock, config_mock): "ex_example", "rk_example", ["content"], - "ex_type", - "ex_durable", "text", 4, ) @@ -62,8 +58,6 @@ async def test_async_eventbus_publish_deep(async_connection_mock, config_mock): routing_key, body, content_type, - exchange_type, - exchange_durable, timeout, ) is not None @@ -72,5 +66,11 @@ async def test_async_eventbus_publish_deep(async_connection_mock, config_mock): eventbus._pub_connection.open.assert_called_once_with(config_mock.build().url) # test if will try when connection and channel is open eventbus._pub_connection.publish.assert_called_once_with( - event.event_type, routing_key, body, content_type=content_type, timeout=timeout + event.event_type, + routing_key, + body, + content_type, + timeout, + DeliveryMode.Transient, + None, ) diff --git a/tests/unit/eventbus/test_async_eventbus_rpc_client.py b/tests/unit/eventbus/test_async_eventbus_rpc_client.py index d5933bc..3549622 100644 --- a/tests/unit/eventbus/test_async_eventbus_rpc_client.py +++ b/tests/unit/eventbus/test_async_eventbus_rpc_client.py @@ -1,4 +1,4 @@ -from amqp_client_python import AsyncEventbusRabbitMQ +from amqp_client_python import AsyncEventbusRabbitMQ, DeliveryMode from asyncio import iscoroutinefunction from tests.unit.eventbus.default import async_add_callback import pytest @@ -55,5 +55,5 @@ async def test_async_eventbus_rpc_client_deep(async_connection_mock, config_mock ) # test if will try when connection and channel is open eventbus._rpc_client_connection.rpc_client.assert_called_once_with( - exchange, routing_key, body, content_type=content_type, timeout=timeout + exchange, routing_key, body, content_type, timeout, DeliveryMode.Transient, None )