
Improvements (at-least-once delivery, batch deletions, message visibility extensions) #770

Closed
wants to merge 10 commits from the feature/consume-chunk-api branch

Conversation

@calvinlfer (Member) commented on Jan 8, 2025

Changes

  • Implement a high-level at-least-once-delivery API (consumeChunkAtLeastOnce) that lets users focus on processing messages and, once processing succeeds, deletes the messages from the queue.
    • In addition, if processing takes a long time, automatic message visibility extension requests are sent to SQS (configurable) so other consumers won't see the message until processing finishes
    • Inspiration for this API was taken from FS2 Kafka's consumeChunk API
  • Improve SqsStream so deletes are batched, reducing API calls (when autoDelete = true)
    • Improve performance by preserving the internal chunking structure (replaced mapZIO)
  • Implement batch deletes
  • Implement message visibility extension (batch and single)
  • Tests for consumeChunkAtLeastOnce that ensure failed processing causes re-delivery and that the automatic extension process functions correctly
  • Implement a ZSink, deleteMessageBatchSink, which can be used with SqsStream (with autoDelete = false) to provide at-least-once delivery semantics: users process their messages and then delete them (see the sketch below)
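
For illustration, here is a minimal sketch of how that sink could be wired up. This is hedged: the exact location and signature of deleteMessageBatchSink are assumed, and processMessage is a hypothetical user-supplied function.

  import zio._
  import zio.aws.sqs.Sqs
  import zio.aws.sqs.model.Message
  import zio.sqs.{SqsStream, SqsStreamSettings}

  // Hypothetical user-supplied processing effect.
  def processMessage(message: Message.ReadOnly): Task[Unit] = ???

  val queueUrl = "https://sqs.us-east-1.amazonaws.com/000/myqueue" // hypothetical queue

  val atLeastOnce: ZIO[Sqs, Throwable, Unit] =
    SqsStream(queueUrl, SqsStreamSettings(autoDelete = false))
      .tap(processMessage)                             // 1. process each message first
      .run(SqsStream.deleteMessageBatchSink(queueUrl)) // 2. then delete in batches (assumed location/signature)
      .unit

The ordering is the point: deletion only happens after processing succeeds, which is what gives the at-least-once guarantee.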

Addresses #587 and #259, including handling retries during batched deletes (sketched below).
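
For context on why retries matter here: DeleteMessageBatch can succeed as a whole while individual entries fail, so failed entries have to be re-submitted. A rough sketch of that idea, using zio-aws style accessors (assumed shapes; the PR's actual policy, e.g. attempt caps and backoff, may differ):

  import zio._
  import zio.aws.sqs.Sqs
  import zio.aws.sqs.model.{DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry}

  def deleteBatchWithRetry(
    queueUrl: String,
    entries: Chunk[DeleteMessageBatchRequestEntry]
  ): ZIO[Sqs, Throwable, Unit] =
    ZIO
      .unless(entries.isEmpty) {
        ZIO
          .serviceWithZIO[Sqs](_.deleteMessageBatch(DeleteMessageBatchRequest(queueUrl, entries)))
          .mapError(_.toThrowable)
          .flatMap { response =>
            // SQS reports per-entry failures instead of failing the whole call.
            val failedIds = response.failed.map(_.id).toSet
            // Retry only the entries that failed; real code would cap attempts and back off.
            deleteBatchWithRetry(queueUrl, entries.filter(e => failedIds.contains(e.id)))
          }
      }
      .unit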

Note: there are breaking changes: SqsSettings now adheres to the AWS defaults, and an unneeded type parameter was removed from Producer.make.

@calvinlfer requested a review from ghostdogpr as a code owner on January 8, 2025
@calvinlfer (Member, Author) commented:

I've done testing against ElasticMQ; next up, I'm going to run some tests against AWS SQS and ensure everything works as expected (especially around the message visibility extension timeouts and batch deletes).

@calvinlfer (Member, Author) commented:

AWS SQS testing (note: account ID redacted):

  // Imports added for completeness (the SqsMessageLifetimeExtensionSettings location is assumed).
  import zio._
  import zio.aws.sqs.Sqs
  import zio.sqs.producer.{Producer, ProducerEvent, ProducerSettings}
  import zio.sqs.serialization.Serializer
  import zio.sqs.{SqsMessageLifetimeExtensionSettings, SqsStream, SqsStreamSettings}

  val producerLayer: RLayer[Sqs, Producer[String]] =
    ZLayer.scoped(
      Producer.make(
        queueUrl = "https://sqs.us-east-1.amazonaws.com/000/calqstandard",
        serializer = Serializer.serializeString,
        settings = ProducerSettings(parallelism = 1)
      )
    )

  val producerExample =
    ZIO.serviceWithZIO[Producer[String]] { producer =>
      producer.produceBatch(
        (1 to 1_000).map(i => ProducerEvent(s"Message $i"))
      )
    }

  val consumerExample =
    SqsStream.consumeChunkAtLeastOnce(
      queueUrl = "https://sqs.us-east-1.amazonaws.com/000/calqstandard",
      settings = SqsStreamSettings(
        maxNumberOfMessages = 10,
        visibilityTimeout = Some(5),
        waitTimeSeconds = Some(20)
      ),
      extensionSettings = SqsMessageLifetimeExtensionSettings.default,
      consumerParallelism = 2
    ) { messages =>
      ZIO.debug(messages.map(_.body.getOrElse(""))) *> ZIO.sleep(14.seconds)
    }

The queue settings are:
[screenshot: queue settings]

Notice that I'm purposely taking 14 seconds when the visibility timeout is 5 seconds, so that the automatic extension process renews the lease on the message and keeps it invisible to other queue consumers.

[screenshot]

Also did some checks to ensure that messages pulled from the queue won't reappear on the queue, thanks to the extension process that runs in the background to minimize the chance of duplicate processing.
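
Conceptually, the background extension amounts to racing the user's processing against a fiber that periodically renews the visibility lease. This is a sketch only: extendVisibilityBatch is a stand-in for the batch visibility-extension call this PR adds, and the real renewal cadence is driven by SqsMessageLifetimeExtensionSettings.

  import zio._
  import zio.aws.sqs.Sqs
  import zio.aws.sqs.model.Message

  // Stand-in for the batch visibility-extension call introduced in this PR.
  def extendVisibilityBatch(
    queueUrl: String,
    messages: Chunk[Message.ReadOnly],
    visibilityTimeout: Duration
  ): ZIO[Sqs, Throwable, Unit] = ???

  def processWithKeepAlive[R](
    queueUrl: String,
    messages: Chunk[Message.ReadOnly],
    visibilityTimeout: Duration
  )(process: Chunk[Message.ReadOnly] => ZIO[R, Throwable, Unit]): ZIO[R with Sqs, Throwable, Unit] = {
    // Renew the lease at half the visibility timeout so it never lapses mid-processing.
    val keepAlive =
      extendVisibilityBatch(queueUrl, messages, visibilityTimeout)
        .repeat(Schedule.fixed(visibilityTimeout.dividedBy(2)))

    // raceFirst interrupts the keep-alive fiber as soon as processing completes.
    process(messages).raceFirst(keepAlive.unit)
  }

Once processing wins the race, the renewal loop is interrupted and the messages can be deleted.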

@calvinlfer (Member, Author) commented:

For FIFO testing: if consumer parallelism is greater than the number of unique message group IDs, effective parallelism is capped at the number of unique message group IDs.

[screenshot]

  // (same imports as in the previous example)
  val producerLayer: RLayer[Sqs, Producer[String]] =
    ZLayer.scoped(
      Producer.make(
        queueUrl = "https://sqs.us-east-1.amazonaws.com/000/calq.fifo",
        serializer = Serializer.serializeString,
        settings = ProducerSettings(parallelism = 1)
      )
    )

  val producerExample =
    ZIO.serviceWithZIO[Producer[String]] { producer =>
      producer.produceBatch(
        (200 to 300).map(i =>
          ProducerEvent(
            data = s"Message $i",
            attributes = Map.empty,
            groupId = Some(
              if (i % 2 == 0) "even"
              else "odd"
            ),
            deduplicationId = None
          )
        )
      )
    }

  val consumerExample =
    SqsStream.consumeChunkAtLeastOnce(
      queueUrl = "https://sqs.us-east-1.amazonaws.com/000/calq.fifo",
      settings = SqsStreamSettings.default.copy(
        maxNumberOfMessages = Option(10),
        visibilityTimeout = Some(5),
        waitTimeSeconds = Some(20)
      ),
      extensionSettings = SqsMessageLifetimeExtensionSettings.default,
      consumerParallelism = 10
    ) { messages: Chunk[Message.ReadOnly] =>
      ZIO.debug(messages.map(_.body.getOrElse(""))) *> ZIO.sleep(14.seconds)
    }

Note how you only see at most 2 chunks being printed at a time. This is because of the message group ID (even/odd, i.e. 2 unique values). Even though consumer parallelism is set to 10, we are capped at 2 due to the number of unique values.
[screenshot: FIFO consumer stdout]

Also note that the extension requests are working in the background: the queue's visibility timeout is 5 seconds, but the consumer takes 14 seconds to process each chunk. Notice in the logs that there are no repeats printed.
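
For contrast, a hypothetical tweak to the producer above that spreads messages across 10 group IDs would let all 10 consumer fibers run concurrently:

  // Hypothetical variation of the producer above: 10 distinct group IDs mean up to
  // 10 message groups can be in flight at once, so consumerParallelism = 10 is
  // fully utilized instead of being capped at 2.
  ProducerEvent(
    data = s"Message $i",
    attributes = Map.empty,
    groupId = Some(s"group-${i % 10}"),
    deduplicationId = None
  )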

@calvinlfer force-pushed the feature/consume-chunk-api branch from f012e24 to c36c62b on January 9, 2025
Co-authored-by: ZIO Assistant <zio-assistant[bot]@users.noreply.github.com>
@calvinlfer closed this on Jan 9, 2025
@calvinlfer deleted the feature/consume-chunk-api branch on January 9, 2025