From b32b3cec21a0c1ffb829b4473e1f6eceb5157d2d Mon Sep 17 00:00:00 2001 From: Abraham Chavez Date: Fri, 3 Nov 2023 21:31:59 -0700 Subject: [PATCH] Promote Queues to QueueService and enforce its use in Health --- scripts/manage_queues.py | 10 ++++---- src/azul/azulclient.py | 6 ++--- src/azul/health.py | 23 ++++++++++--------- .../{queues.py => service/queue_service.py} | 2 +- terraform/cloudwatch.tf.json.template.py | 6 ++--- test/indexer/test_indexer_controller.py | 6 +++-- 6 files changed, 28 insertions(+), 25 deletions(-) rename src/azul/{queues.py => service/queue_service.py} (99%) diff --git a/scripts/manage_queues.py b/scripts/manage_queues.py index 03393abfec..ef222e2f55 100644 --- a/scripts/manage_queues.py +++ b/scripts/manage_queues.py @@ -5,8 +5,8 @@ from azul.logging import ( configure_script_logging, ) -from azul.queues import ( - Queues, +from azul.service.queue_service import ( + QueueService, ) logger = logging.getLogger(__name__) @@ -65,7 +65,7 @@ def main(argv): args = p.parse_args(argv) if args.command in ('list', 'purge', 'purge_all'): - queues = Queues() + queues = QueueService() if args.command == 'list': queues.list() elif args.command == 'purge': @@ -75,7 +75,7 @@ def main(argv): else: assert False, args.command elif args.command in ('dump', 'dump_all'): - queues = Queues(delete=args.delete, json_body=args.json_body) + queues = QueueService(delete=args.delete, json_body=args.json_body) if args.command == 'dump': queues.dump(args.queue, args.path) elif args.command == 'dump_all': @@ -83,7 +83,7 @@ def main(argv): else: assert False, args.command elif args.command == 'feed': - queues = Queues(delete=args.delete) + queues = QueueService(delete=args.delete) queues.feed(args.path, args.queue, force=args.force) else: p.print_usage() diff --git a/src/azul/azulclient.py b/src/azul/azulclient.py index 61e8963132..760e048ff8 100644 --- a/src/azul/azulclient.py +++ b/src/azul/azulclient.py @@ -58,8 +58,8 @@ from azul.plugins import ( RepositoryPlugin, ) -from azul.queues import ( - Queues, +from azul.service.queue_service import ( + QueueService, ) from azul.types import ( JSON, @@ -434,7 +434,7 @@ def deindex(self, catalog: CatalogName, sources: Iterable[str]): @cached_property def queues(self): - return Queues() + return QueueService() def reset_indexer(self, catalogs: Iterable[CatalogName], diff --git a/src/azul/health.py b/src/azul/health.py index 59c70746b1..49e6c177a4 100644 --- a/src/azul/health.py +++ b/src/azul/health.py @@ -41,15 +41,15 @@ from azul.chalice import ( AppController, ) -from azul.deployment import ( - aws, -) from azul.es import ( ESClientFactory, ) from azul.plugins import ( MetadataPlugin, ) +from azul.service.queue_service import ( + QueueService, +) from azul.service.storage_service import ( StorageService, ) @@ -200,24 +200,25 @@ def queues(self): """ Returns information about the SQS queues used by the indexer. """ - sqs = aws.resource('sqs') + sqs = QueueService() response = {'up': True} - for queue in config.all_queue_names: + for queue_name in config.all_queue_names: try: - queue_instance = sqs.get_queue_by_name(QueueName=queue).attributes + queue = sqs.get_queues([queue_name]) except ClientError as ex: - response[queue] = { + response[queue_name] = { 'up': False, 'error': ex.response['Error']['Message'] } response['up'] = False else: - response[queue] = { + queue = queue[queue_name].attributes + response[queue_name] = { 'up': True, 'messages': { - 'delayed': int(queue_instance['ApproximateNumberOfMessagesDelayed']), - 'invisible': int(queue_instance['ApproximateNumberOfMessagesNotVisible']), - 'queued': int(queue_instance['ApproximateNumberOfMessages']) + 'delayed': int(queue['ApproximateNumberOfMessagesDelayed']), + 'invisible': int(queue['ApproximateNumberOfMessagesNotVisible']), + 'queued': int(queue['ApproximateNumberOfMessages']) } } return response diff --git a/src/azul/queues.py b/src/azul/service/queue_service.py similarity index 99% rename from src/azul/queues.py rename to src/azul/service/queue_service.py index d156f807ff..689cbafa72 100644 --- a/src/azul/queues.py +++ b/src/azul/service/queue_service.py @@ -54,7 +54,7 @@ Queue = Any # place-holder for boto3's SQS queue resource -class Queues: +class QueueService: def __init__(self, delete=False, json_body=True): self._delete = delete diff --git a/terraform/cloudwatch.tf.json.template.py b/terraform/cloudwatch.tf.json.template.py index 9c9356cc84..63ec440404 100644 --- a/terraform/cloudwatch.tf.json.template.py +++ b/terraform/cloudwatch.tf.json.template.py @@ -13,8 +13,8 @@ from azul.modules import ( load_app_module, ) -from azul.queues import ( - Queues, +from azul.service.queue_service import ( + QueueService, ) from azul.terraform import ( emit_tf, @@ -45,7 +45,7 @@ def prod_qualified_resource_name(name: str) -> str: resource, _, suffix = config.unqualified_resource_name_and_suffix(name) return config.qualified_resource_name(resource, suffix=suffix, stage='prod') - queues = Queues() + queues = QueueService() qualified_resource_names = [ *config.all_queue_names, *queues.functions_by_queue().values() diff --git a/test/indexer/test_indexer_controller.py b/test/indexer/test_indexer_controller.py index 96dc227056..9fc0a12fa6 100644 --- a/test/indexer/test_indexer_controller.py +++ b/test/indexer/test_indexer_controller.py @@ -31,7 +31,6 @@ from azul import ( config, - queues, ) from azul.azulclient import ( AzulClient, @@ -58,6 +57,9 @@ DSSSourceRef, Plugin, ) +from azul.service.queue_service import ( + QueueService, +) from azul.types import ( JSONs, ) @@ -92,7 +94,7 @@ def setUp(self) -> None: self.controller = IndexController(app=app) app.catalog = self.catalog IndexController.index_service.fset(self.controller, self.index_service) - self.queue_manager = queues.Queues(delete=True) + self.queue_manager = QueueService(delete=True) def tearDown(self): self.index_service.delete_indices(self.catalog)