From bfa21f45e609723f98aab0c01d8ce333021cf38f Mon Sep 17 00:00:00 2001 From: Greg Methvin Date: Thu, 11 Apr 2019 18:52:51 -0700 Subject: [PATCH] Improved API for negative acknowledgements (#2) --- .../akka/streams/PulsarCommittableSourceGraphStage.scala | 6 +++--- .../akka/streams/PulsarCommittableSourceTest.scala | 3 +-- .../com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala | 7 +++++-- .../main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala | 1 + .../src/main/scala/com/sksamuel/pulsar4s/Consumer.scala | 7 +++++++ .../scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala | 3 +++ .../com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala | 8 ++++++-- .../com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala | 8 ++++++-- 8 files changed, 32 insertions(+), 11 deletions(-) diff --git a/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala b/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala index 93bd2610..bbc43aee 100644 --- a/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala +++ b/pulsar4s-akka-streams/src/main/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.scala @@ -21,7 +21,7 @@ import scala.util.Success trait CommittableMessage[T] { def ack(cumulative: Boolean = false): Future[Done] - def nack(): Unit + def nack(): Future[Done] def message: ConsumerMessage[T] } @@ -63,9 +63,9 @@ class PulsarCommittableSourceGraphStage[T](create: () => Consumer[T], seek: Opti } ackFuture.map(_ => Done) } - override def nack(): Unit = { + override def nack(): Future[Done] = { logger.debug(s"Negatively acknowledging message: $msg") - consumer.negativeAcknowledge(msg.messageId) + consumer.negativeAcknowledgeAsync(msg.messageId).map(_ => Done) } }) case Failure(e) => diff --git a/pulsar4s-akka-streams/src/test/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceTest.scala b/pulsar4s-akka-streams/src/test/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceTest.scala index 15581853..588cc104 100644 --- a/pulsar4s-akka-streams/src/test/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceTest.scala +++ b/pulsar4s-akka-streams/src/test/scala/com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceTest.scala @@ -93,8 +93,7 @@ class PulsarCommittableSourceTest extends FunSuite with Matchers { .take(4) .mapAsync(10) { msg => if (msg.message.value < "c") { - msg.nack() - Future.successful(Vector.empty) + msg.nack().map(_ => Vector.empty) } else { msg.ack().map(_ => Vector(msg.message)) } diff --git a/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala b/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala index ef2e5eb2..ec97eb1d 100644 --- a/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala +++ b/pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala @@ -13,8 +13,8 @@ import scala.util.{Failure, Success, Try} class CatsAsyncHandler extends AsyncHandler[IO] { - implicit def completableVoidToIO(f: CompletableFuture[Void]): IO[Unit] = completableToIO(f).map(_ => ()) - implicit def completableToIO[T](f: CompletableFuture[T]): IO[T] = + implicit def completableVoidToIO(f: => CompletableFuture[Void]): IO[Unit] = completableToIO(f).map(_ => ()) + implicit def completableToIO[T](f: => CompletableFuture[T]): IO[T] = IO.async[T] { k => f.whenCompleteAsync(new BiConsumer[T, Throwable] { override def accept(t: T, e: Throwable): Unit = { @@ -50,6 +50,9 @@ class CatsAsyncHandler extends AsyncHandler[IO] { override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): IO[Unit] = consumer.acknowledgeCumulativeAsync(messageId) + override def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): IO[Unit] = + IO { consumer.negativeAcknowledge(messageId) } + override def close(reader: Reader[_]): IO[Unit] = reader.closeAsync() override def flush(producer: api.Producer[_]): IO[Unit] = producer.flushAsync() diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala index c40f88e8..5671ec7b 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala @@ -28,6 +28,7 @@ trait AsyncHandler[F[_]] { def unsubscribeAsync(consumer: api.Consumer[_]): F[Unit] def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] + def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala index 234ddb15..b6f2d232 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala @@ -65,6 +65,11 @@ trait Consumer[T] extends Closeable { def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] + final def negativeAcknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = + acknowledgeAsync(message.messageId) + + def negativeAcknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] + final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] = acknowledgeCumulativeAsync(message.messageId) @@ -100,6 +105,8 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId) override def negativeAcknowledge(messageId: MessageId): Unit = consumer.negativeAcknowledge(messageId) + override def negativeAcknowledgeAsync[F[_]: AsyncHandler](messageId: MessageId): F[Unit] = + implicitly[AsyncHandler[F]].negativeAcknowledgeAsync(consumer, messageId) override def stats: ConsumerStats = consumer.getStats override def subscription = Subscription(consumer.getSubscription) diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala index ca293b87..71065da4 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala @@ -49,6 +49,9 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] = consumer.acknowledgeAsync(messageId).toScala + override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] = + Future.successful(consumer.negativeAcknowledge(messageId)) + override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] = consumer.acknowledgeCumulativeAsync(messageId).toScala diff --git a/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala b/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala index a18c0d15..ccc79931 100644 --- a/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala +++ b/pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala @@ -5,6 +5,7 @@ import java.util.concurrent.CompletableFuture import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, MessageId} import monix.eval.Task import org.apache.pulsar.client.api +import org.apache.pulsar.client.api.Consumer import org.apache.pulsar.client.api.{Reader, TypedMessageBuilder} import scala.compat.java8.FutureConverters @@ -14,10 +15,10 @@ import scala.util.{Failure, Success, Try} class MonixAsyncHandler extends AsyncHandler[Task] { - implicit def completableTToFuture[T](f: CompletableFuture[T]): Future[T] = + implicit def completableTToFuture[T](f: => CompletableFuture[T]): Future[T] = FutureConverters.toScala(f) - implicit def completableVoidToTask(f: CompletableFuture[Void]): Task[Unit] = + implicit def completableVoidToTask(f: => CompletableFuture[Void]): Task[Unit] = Task.deferFuture(FutureConverters.toScala(f)).map(_ => ()) override def failed(e: Throwable): Task[Nothing] = Task.raiseError(e) @@ -58,6 +59,9 @@ class MonixAsyncHandler extends AsyncHandler[Task] { override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeCumulativeAsync(messageId) + override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = + Task { consumer.negativeAcknowledge(messageId) } + override def close(reader: Reader[_]): Task[Unit] = reader.closeAsync() override def flush(producer: api.Producer[_]): Task[Unit] = producer.flushAsync() diff --git a/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala b/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala index 44217ab1..59ba1fe7 100644 --- a/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala +++ b/pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala @@ -5,6 +5,7 @@ import java.util.function.BiConsumer import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, MessageId} import org.apache.pulsar.client.api +import org.apache.pulsar.client.api.Consumer import org.apache.pulsar.client.api.{Reader, TypedMessageBuilder} import scalaz.concurrent.Task @@ -13,10 +14,10 @@ import scala.util.{Failure, Success, Try} class ScalazAsyncHandler extends AsyncHandler[Task] { - implicit def completableVoidToTask(f: CompletableFuture[Void]): Task[Unit] = + implicit def completableVoidToTask(f: => CompletableFuture[Void]): Task[Unit] = completableToTask(f).map(_ => ()) - implicit def completableToTask[T](f: CompletableFuture[T]): Task[T] = { + implicit def completableToTask[T](f: => CompletableFuture[T]): Task[T] = { Task.async[T] { k => f.whenCompleteAsync(new BiConsumer[T, Throwable] { override def accept(t: T, e: Throwable): Unit = { @@ -56,6 +57,9 @@ class ScalazAsyncHandler extends AsyncHandler[Task] { override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] = consumer.acknowledgeCumulativeAsync(messageId) + override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] = + Task { consumer.negativeAcknowledge(messageId) } + override def close(reader: Reader[_]): Task[Unit] = reader.closeAsync() override def close(producer: api.Producer[_]): Task[Unit] = producer.closeAsync() override def close(consumer: api.Consumer[_]): Task[Unit] = consumer.closeAsync()