diff --git a/amqp_client_python/domain/models/config.py b/amqp_client_python/domain/models/config.py index d384762..e383090 100644 --- a/amqp_client_python/domain/models/config.py +++ b/amqp_client_python/domain/models/config.py @@ -20,7 +20,7 @@ def build(self): } if bool(self.ssl_options): opt["ssl_options"] = { - "certfile": self.ssl_options.ca_certs_path, + "certfile": self.ssl_options.certfile_path, "keyfile": self.ssl_options.keyfile_path, "ca_certs": self.ssl_options.ca_certs_path, } diff --git a/amqp_client_python/rabbitmq/eventbus_rabbitmq.py b/amqp_client_python/rabbitmq/eventbus_rabbitmq.py index d9133ee..16039ee 100644 --- a/amqp_client_python/rabbitmq/eventbus_rabbitmq.py +++ b/amqp_client_python/rabbitmq/eventbus_rabbitmq.py @@ -165,12 +165,14 @@ def after_channel_openned(): self.event_loop.add_callback_threadsafe(rpc_server_setup) - def dispose(self): - if isinstance(self.pub_connection, ConnectionRabbitMQ): + def dispose(self, stop_event_loop=True): + if self.pub_connection.is_open(): self.pub_connection.close() - if isinstance(self.sub_connection, ConnectionRabbitMQ): + if self.sub_connection.is_open(): self.sub_connection.close() - if isinstance(self.rpc_client_connection, ConnectionRabbitMQ): + if self.rpc_client_connection.is_open(): self.rpc_client_connection.close() - if isinstance(self.rpc_server_connection, ConnectionRabbitMQ): + if self.rpc_server_connection.is_open(): self.rpc_server_connection.close() + if stop_event_loop: + self.event_loop.add_callback_threadsafe(self.event_loop.stop) diff --git a/examples/sync_case.py b/examples/sync_case.py new file mode 100644 index 0000000..64435ea --- /dev/null +++ b/examples/sync_case.py @@ -0,0 +1,52 @@ +from amqp_client_python import EventbusRabbitMQ, Config, Options +from amqp_client_python.event import IntegrationEvent, IntegrationEventHandler +from examples.default import queue, rpc_queue, rpc_exchange + + +# rpc_client call inside rpc_provider +# if __name__ == "__main__": + +try: + config = Config(Options(queue, rpc_queue, rpc_exchange)) + eventbus = EventbusRabbitMQ(config) + + def handle(*body): + print(f"body1: {body}") + return b"response" + + def handle3(*body): + print(body) + + class ExampleEventHandler(IntegrationEventHandler): + event_type = rpc_exchange + + def handle(self, *body) -> None: + print(body) + + class ExampleEvent(IntegrationEvent): + EVENT_NAME: str = "NAME" + + def __init__(self, event_type: str, message=[]) -> None: + super().__init__(self.EVENT_NAME, event_type) + self.message = message + + publish_event = ExampleEvent(rpc_exchange, ["message"]) + event_handle = ExampleEventHandler() + + eventbus.provide_resource("user.find", handle) + eventbus.subscribe(publish_event, event_handle, "user.find3") + count = 0 + running = True + while running: + try: + count += 1 + result = eventbus.rpc_client(rpc_exchange, "user.find", ["content_message"]) + print("returned:", result) + eventbus.publish(publish_event, "user.find3") + except BaseException as err: + running = False + print(f"err: {err}") +except (KeyboardInterrupt, BaseException) as err: + print("err", err) + +eventbus.dispose() diff --git a/tests/unit/config/test_uri.py b/tests/unit/config/test_uri.py new file mode 100644 index 0000000..8a89763 --- /dev/null +++ b/tests/unit/config/test_uri.py @@ -0,0 +1,37 @@ +from amqp_client_python import Config, Options, SSLOptions +from pika import URLParameters + + +def test_url(): + user, passwd, host, port = "user", "passwd", "ipdomain", 1234 + config = Config( + Options( + "example", + "rpc_queue", + "rpc_exchange", + f"amqp://{user}:{passwd}@{host}:{port}/", + ) + ) + assert config.url is None + config.build() + config.url: URLParameters + assert config.url is not None + assert isinstance(config.url, URLParameters) + assert config.url.host == host + assert config.url.port == port + assert config.url.credentials.username == user + assert config.url.credentials.password == passwd + + +def test_ssl(): + config = Config( + Options("example", "rpc_queue", "rpc_exchange"), + SSLOptions( + "./tests/unit/utils/rabbitmq_cert.pem", + "./tests/unit/utils/rabbitmq_key.pem", + "./tests/unit/utils/ca.pem", + ), + ) + assert config.ssl_options is not None + config.build() + assert config.url.ssl_options is not None diff --git a/tests/unit/eventbus/test_async_eventbus_provide_resource.py b/tests/unit/eventbus/test_async_eventbus_provide_resource.py index ad47ca6..ca8d775 100644 --- a/tests/unit/eventbus/test_async_eventbus_provide_resource.py +++ b/tests/unit/eventbus/test_async_eventbus_provide_resource.py @@ -22,7 +22,7 @@ async def handle(*body): config_mock.build().url ) # test if will try when connection and channel is open - eventbus._rpc_server_connection.add_callback.called_once() + eventbus._rpc_server_connection.add_callback.assert_called_once() assert len(eventbus._rpc_server_connection.add_callback.call_args.args) == 1 iscoroutinefunction(eventbus._rpc_server_connection.add_callback.call_args.args[0]) diff --git a/tests/unit/eventbus/test_async_eventbus_publish.py b/tests/unit/eventbus/test_async_eventbus_publish.py index f318314..56689df 100644 --- a/tests/unit/eventbus/test_async_eventbus_publish.py +++ b/tests/unit/eventbus/test_async_eventbus_publish.py @@ -23,7 +23,7 @@ async def test_async_eventbus_publish_surface(async_connection_mock, config_mock # test connection will be open eventbus._pub_connection.open.assert_called_once_with(config_mock.build().url) # test if will try when connection and channel is open - eventbus._pub_connection.add_callback.called_once() + eventbus._pub_connection.add_callback.assert_called_once() assert len(eventbus._pub_connection.add_callback.call_args.args) == 1 iscoroutinefunction(eventbus._pub_connection.add_callback.call_args.args[0]) diff --git a/tests/unit/eventbus/test_async_eventbus_rpc_client.py b/tests/unit/eventbus/test_async_eventbus_rpc_client.py index 3549622..f6f6920 100644 --- a/tests/unit/eventbus/test_async_eventbus_rpc_client.py +++ b/tests/unit/eventbus/test_async_eventbus_rpc_client.py @@ -26,7 +26,7 @@ async def test_async_eventbus_rpc_client_surface(async_connection_mock, config_m config_mock.build().url ) # test if will try when connection and channel is open - eventbus._rpc_client_connection.add_callback.called_once() + eventbus._rpc_client_connection.add_callback.assert_called_once() assert len(eventbus._rpc_client_connection.add_callback.call_args.args) == 1 iscoroutinefunction(eventbus._rpc_client_connection.add_callback.call_args.args[0]) diff --git a/tests/unit/utils/ca.pem b/tests/unit/utils/ca.pem new file mode 100644 index 0000000..b3c256a --- /dev/null +++ b/tests/unit/utils/ca.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDLTCCAhWgAwIBAgIUXi+oPVUVWLAtyJaDpl7e+0CzsXQwDQYJKoZIhvcNAQEL +BQAwHjEcMBoGA1UECgwTSEFOSW9ULENOPUhBTklvVCBDQTAeFw0yMzAyMDYxMzAy +NTlaFw0zMzAyMDMxMzAyNTlaMB4xHDAaBgNVBAoME0hBTklvVCxDTj1IQU5Jb1Qg +Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCr89iR6l0olBCBR1/j +Rk96ThvAI8REnnOM1BqJfKtqqHmUsRJK4Ok5iAcn0piPSoq5YebO+E1nPdheePTm +kdrPBiJ4Tcbghhg1HZ15fEKy6NKxLFrEwlVTxZ/VPQlUq8B5w3Sl5+ByQVSSFUkx +fjEA/3GfGNLoLq89YK4hOEVN+kCylFEKReQev0Dxt1Gwoe2jNKw0FrXURWci+cq8 +Dr82gTOR8nVG7NCcQooImJxcSzEGuPkLXiWHGawMzOo+2jfDldw0H+RDnn38W8rg +SyN5ZZmnRZVA7JYqN7Cpsces2RPLr6nxeUDpbfPLEZl1ZoirU3YvAbgCRGdbNAJQ +HThJAgMBAAGjYzBhMB0GA1UdDgQWBBTEaGzMbijWCG7BlmZuqxnpnF4GFDAfBgNV +HSMEGDAWgBTEaGzMbijWCG7BlmZuqxnpnF4GFDAPBgNVHRMBAf8EBTADAQH/MA4G +A1UdDwEB/wQEAwIBxjANBgkqhkiG9w0BAQsFAAOCAQEAaCfnj+nBTLfT4d9552g8 +V2uGKwSwKy79kaVsLBot9BsB5JuKqGWn6OVj03Ml3kqGiV4wXZAZ+QZ+4pMAq1UT +5Zdd9CZZ6YoakO12EEGqCokf+TXU4YasdZls0iTnxlV2kYSR8mVojMHl3aBeVzNW +VSZ00ExnerEwXB1XsyaTPi8YHAKVKbLKDKIarjS89TNRQETKfOCprnhhVpTQ8IwX +ZXj1oKqc8ioke8UhTOcirOmBOoJlLtzBs3fin7FOOUJe9KS6qylfAo8N2h7tgT/z +6GrICdaSokhT9quDALpzy5Dth0Eb2FHApILorHEpgXw0bGY3CZQWPXHeI6av/fzN +3w== +-----END CERTIFICATE----- diff --git a/tests/unit/utils/rabbitmq_cert.pem b/tests/unit/utils/rabbitmq_cert.pem new file mode 100644 index 0000000..0130b5a --- /dev/null +++ b/tests/unit/utils/rabbitmq_cert.pem @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDKzCCAhOgAwIBAgIUHp6PIrZ+VRJaKIKaNNJNtbeYslowDQYJKoZIhvcNAQEL +BQAwHjEcMBoGA1UECgwTSEFOSW9ULENOPUhBTklvVCBDQTAeFw0yMzAyMDYxMzAy +NTlaFw0zMzAyMDMxMzAyNTlaMCoxFzAVBgNVBAoMDmRldmljZS1tYW5hZ2VyMQ8w +DQYDVQQDDAZIQU5Jb1QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDA +1EDkW9Ljwy1DsacoGAJNf9pXFr8B4nfnaMeE0uHutuo+PetO2Jgb0kZi9EQhns4k +jliDj7lpNccwzjgI0EVhOBb36nrwBABL3L005GXPryn9vbCMiep8Tfpzl5/VABfq +aKuWGz63NV7iFX3Ey34M3cooGlhqxuIisiI8qq74bDSZZOiMLWtbdIq+uybez8qk +C2C8bo0ntgri/ynVDgpHbrpXY5tQdsne81o+UAsUc1KpbaeNRnBcUxU+wbUxjkHo +LFrapY90DsgzUT0i1RVcFxY0cJXPx1i9iJkfb3ZzEWrUakMIr+e5B0v5AeLDjByN +rTlNb9By9jsnmDBGXbQtAgMBAAGjVTBTMAkGA1UdEwQCMAAwCwYDVR0PBAQDAgWg +MBMGA1UdJQQMMAoGCCsGAQUFBwMCMCQGA1UdEQQdMBuHBH8AAAGCCWxvY2FsaG9z +dIIIcmFiYml0bXEwDQYJKoZIhvcNAQELBQADggEBACtOO6VSjLIiGYxfmodvWPfr +srceRyWESLGmOhYpifxpcAzeoV6nEY4KTg8V7DIgUF73umieJ4S5Gy1yjjoqNvWo +lKoHcl2hUgZI+Z57+44e+tywNcErpjm54T0EZO4hpGqLiEm3CsfRqOgVV32xXPYG +BqwfLKqyB+MsofQWhy72aOXkpdHZ3vHyYxdvj9Dj/2XVNmSTSQksF9Fr1M7Uv7NN +UeLEkUgKp3sYFKifFGXf6M5gLhxqq5htWoI46VhZKWqqbJlFhEUcVzu9OS0NpPTL +thBiipLw1qVroCXSUfJkvXcUtviYElEoTmjBdvZITzo9OTnFQN3r588jXqqq7pA= +-----END CERTIFICATE----- diff --git a/tests/unit/utils/rabbitmq_key.pem b/tests/unit/utils/rabbitmq_key.pem new file mode 100644 index 0000000..c44619f --- /dev/null +++ b/tests/unit/utils/rabbitmq_key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAwNRA5FvS48MtQ7GnKBgCTX/aVxa/AeJ352jHhNLh7rbqPj3r +TtiYG9JGYvREIZ7OJI5Yg4+5aTXHMM44CNBFYTgW9+p68AQAS9y9NORlz68p/b2w +jInqfE36c5ef1QAX6mirlhs+tzVe4hV9xMt+DN3KKBpYasbiIrIiPKqu+Gw0mWTo +jC1rW3SKvrsm3s/KpAtgvG6NJ7YK4v8p1Q4KR266V2ObUHbJ3vNaPlALFHNSqW2n +jUZwXFMVPsG1MY5B6Cxa2qWPdA7IM1E9ItUVXBcWNHCVz8dYvYiZH292cxFq1GpD +CK/nuQdL+QHiw4wcja05TW/QcvY7J5gwRl20LQIDAQABAoIBAQCINebY6Jwh038n +8FgZlEwcHpJHLoQieq0kQ/mcM7LxAziYWvjbK1jXXkPmJpHyBdGsNPkFhgL89MYO +Db27TBOWRx7CZVoOLWVgMT9eG3Bnsl5BJTuPKuT+hb7C5Ho85eH+7Y03bWtx0zuB +DJlY6D3ULN9eUK3gTzjyNU7F9/O8+siFVuwzMIETiV2MQv3ndaQGKUZmH/zc5MVG +JvGBMKd1eByAT3wQRpGGPMnTZdgn3iggJXt0Ln30yR6o4fO7NUmMUWCtg9UjRcB6 +jkfIjfg2ORm899PXoPo/OgfGlUJyrccf3KIjyRsa7n8873h7wViXCS4Fw74A3kiq +3BzkEhyBAoGBAOHr2WgryXi6Rj5JYU3Bc0B1slpoO6Ot13YqaVWJyEZ4SbZfkIs/ +h+gKPqC9tgShtjUHb75+r8W8bIETpuXYWaJul5NJoKLbMbojT112Ax72XqXoJXad +3Cgf7V5b3S33ih4Az99cOnbpP/v9quMffSPfFqqIsY5GNw0+kAtoHuOdAoGBANqA +g0Dhw3J9u17uJH61tkgLevEen5GOpxq0bPRPrzogK7Iczu92nOthzfOZIk5OEu2q +O8fkAkFGAEjieNNS1FrOoCHlx0BeXRKpZAnGYW+17ffS5JrEwSQ0moQkWXxR5fvT +tDa/JJEd8KRLHCF6FnImKtodBf5JY9QQzGDmQhXRAoGADPYrEv7wIC0PuqMbIgrn +Qdt+0BEK2ukuY2krgezVwLVGBWCWOmb6tSVhUneP6dQdUA2NK7C5BO86im1GNgmj +mt4ddCXVQYKx56v+8a3DoBMiewJGo/eKmgK66575oZmCIxaI1pfEAu5+7UYwd7dm +xpgWBNKy3SbKIchlG0JuA0ECgYA5BnE67bDkoIvWKrC7oREBnH1pkGR0yJY0EgKd +gG5q7Rp6UapwNLSfedcWTFD3vsiR3mvbr1YnUu2gF/sQq/1f9a01K5Lk6bvAsxS4 +uF2VmhWRKCkhe1gs6s2ozxPLrlQndQkDgL86YGX9etzMn+BbyjpE0m5N/zlej0PH +xlBZAQKBgQDe9n7OtNpo9xP4jLtk9yXBugD7CDdCSlCJJmfnaNQsCEaPBWYfexbO +OpZojRAPpr/bEWe0HVOLHbeniDUP+Pmv8ZiSHaQscITGh/HUUSbH39oRAkAjc99E +xL15Y1a9RIon6i8T0Wi6j9zJMtq4jRi1XMyStzggtSe9944qS6R47w== +-----END RSA PRIVATE KEY-----