Skip to content

Commit

Permalink
Fix up docs and examples
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinlfer committed Jan 9, 2025
1 parent b3dafd7 commit 0fd8274
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
4 changes: 3 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object ProducerConsumerExample extends ZIOAppDefault {
_ <- ZIO.scoped(producer.flatMap(_.sendStream(stream).runDrain))
_ <- SqsStream(
queueUrl,
SqsStreamSettings(stopWhenQueueEmpty = true, waitTimeSeconds = Some(3))
SqsStreamSettings.default.withStopWhenQueueEmpty(true).withWaitTimeSeconds(3)
).foreach(msg => Console.printLine(msg.body))
} yield ()

Expand All @@ -59,3 +59,5 @@ object ProducerConsumerExample extends ZIOAppDefault {
)
}
```

Check out the [examples](../zio-sqs/src/test/scala/examples) folder for more examples.
23 changes: 15 additions & 8 deletions zio-sqs/src/test/scala/examples/AtLeastOnceExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import zio.sqs._
import zio.aws.sqs.model.Message

object AtLeastOnceExample extends ZIOAppDefault {
val queueUrl = "https://sqs.us-east-1.amazonaws.com/00000/calq.fifo"

val producerLayer: RLayer[Sqs, Producer[String]] =
ZLayer.scoped(
Producer.make(
queueUrl = "https://sqs.us-east-1.amazonaws.com/000/calq.fifo",
queueUrl = queueUrl,
serializer = Serializer.serializeString,
settings = ProducerSettings(parallelism = 1)
)
Expand All @@ -25,7 +26,7 @@ object AtLeastOnceExample extends ZIOAppDefault {
val producerExample =
ZIO.serviceWithZIO[Producer[String]] { producer =>
producer.produceBatch(
(200 to 300).map(i =>
(0 to 200).map(i =>
ProducerEvent(
data = s"Message $i",
attributes = Map.empty,
Expand All @@ -41,18 +42,24 @@ object AtLeastOnceExample extends ZIOAppDefault {

val consumerExample =
SqsStream.consumeChunkAtLeastOnce(
queueUrl = "https://sqs.us-east-1.amazonaws.com/000/calq.fifo",
settings = SqsStreamSettings.default.copy(
maxNumberOfMessages = Option(10),
visibilityTimeout = Some(5),
waitTimeSeconds = Some(20)
),
queueUrl = queueUrl,
settings = SqsStreamSettings.default
.withMaxNumberOfMessages(10)
.withVisibilityTimeout(5)
.withWaitTimeSeconds(20),
extensionSettings = SqsMessageLifetimeExtensionSettings.default,
consumerParallelism = 10
) { (messages: Chunk[Message.ReadOnly]) =>
ZIO.debug(messages.map(_.body.getOrElse(""))) *> ZIO.sleep(14.seconds)
}

val consumerStreamExample =
SqsStream(
queueUrl = queueUrl,
settings = SqsStreamSettings.default.withAutoDelete(false)
).tap(message => ZIO.debug(message.body.getOrElse("")))
.run(SqsStream.deleteMessageBatchSink(queueUrl))

override val run: ZIO[Environment with ZIOAppArgs with Scope, Any, Any] =
(producerExample *> consumerExample)
.provide(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import zio.sqs.serialization.Serializer
import zio.sqs.{ SqsStream, SqsStreamSettings, Utils }
import zio._

object TestApp extends zio.ZIOAppDefault {
object AtMostOnceExample extends zio.ZIOAppDefault {
val queueName = "TestQueue"

val client: ZLayer[Any, Throwable, Sqs] =
Expand All @@ -36,7 +36,7 @@ object TestApp extends zio.ZIOAppDefault {
}
_ <- SqsStream(
queueUrl,
SqsStreamSettings.default.withStopWhenQueueEmpty(true).withWaitTimeSeconds(3)
SqsStreamSettings.default.withStopWhenQueueEmpty(true).withWaitTimeSeconds(3).withAutoDelete(true)
).foreach(msg => ZIO.succeed(println(msg.body)))
} yield ()

Expand Down

0 comments on commit 0fd8274

Please sign in to comment.