Skip to content

Commit

Permalink
Improved API for negative acknowledgements (CleverCloud#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmethvin authored Apr 12, 2019
1 parent 5a1f9bc commit bfa21f4
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.util.Success

trait CommittableMessage[T] {
def ack(cumulative: Boolean = false): Future[Done]
def nack(): Unit
def nack(): Future[Done]
def message: ConsumerMessage[T]
}

Expand Down Expand Up @@ -63,9 +63,9 @@ class PulsarCommittableSourceGraphStage[T](create: () => Consumer[T], seek: Opti
}
ackFuture.map(_ => Done)
}
override def nack(): Unit = {
override def nack(): Future[Done] = {
logger.debug(s"Negatively acknowledging message: $msg")
consumer.negativeAcknowledge(msg.messageId)
consumer.negativeAcknowledgeAsync(msg.messageId).map(_ => Done)
}
})
case Failure(e) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ class PulsarCommittableSourceTest extends FunSuite with Matchers {
.take(4)
.mapAsync(10) { msg =>
if (msg.message.value < "c") {
msg.nack()
Future.successful(Vector.empty)
msg.nack().map(_ => Vector.empty)
} else {
msg.ack().map(_ => Vector(msg.message))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import scala.util.{Failure, Success, Try}

class CatsAsyncHandler extends AsyncHandler[IO] {

implicit def completableVoidToIO(f: CompletableFuture[Void]): IO[Unit] = completableToIO(f).map(_ => ())
implicit def completableToIO[T](f: CompletableFuture[T]): IO[T] =
implicit def completableVoidToIO(f: => CompletableFuture[Void]): IO[Unit] = completableToIO(f).map(_ => ())
implicit def completableToIO[T](f: => CompletableFuture[T]): IO[T] =
IO.async[T] { k =>
f.whenCompleteAsync(new BiConsumer[T, Throwable] {
override def accept(t: T, e: Throwable): Unit = {
Expand Down Expand Up @@ -50,6 +50,9 @@ class CatsAsyncHandler extends AsyncHandler[IO] {
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): IO[Unit] =
consumer.acknowledgeCumulativeAsync(messageId)

override def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): IO[Unit] =
IO { consumer.negativeAcknowledge(messageId) }

override def close(reader: Reader[_]): IO[Unit] = reader.closeAsync()
override def flush(producer: api.Producer[_]): IO[Unit] = producer.flushAsync()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ trait AsyncHandler[F[_]] {
def unsubscribeAsync(consumer: api.Consumer[_]): F[Unit]

def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ trait Consumer[T] extends Closeable {

def acknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def negativeAcknowledgeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeAsync(message.messageId)

def negativeAcknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

final def acknowledgeCumulativeAsync[F[_] : AsyncHandler](message: ConsumerMessage[T]): F[Unit] =
acknowledgeCumulativeAsync(message.messageId)

Expand Down Expand Up @@ -100,6 +105,8 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin
implicitly[AsyncHandler[F]].acknowledgeCumulativeAsync(consumer, messageId)

override def negativeAcknowledge(messageId: MessageId): Unit = consumer.negativeAcknowledge(messageId)
override def negativeAcknowledgeAsync[F[_]: AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].negativeAcknowledgeAsync(consumer, messageId)

override def stats: ConsumerStats = consumer.getStats
override def subscription = Subscription(consumer.getSubscription)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
override def acknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] =
consumer.acknowledgeAsync(messageId).toScala

override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] =
Future.successful(consumer.negativeAcknowledge(messageId))

override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] =
consumer.acknowledgeCumulativeAsync(messageId).toScala

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.util.concurrent.CompletableFuture
import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, MessageId}
import monix.eval.Task
import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.{Reader, TypedMessageBuilder}

import scala.compat.java8.FutureConverters
Expand All @@ -14,10 +15,10 @@ import scala.util.{Failure, Success, Try}

class MonixAsyncHandler extends AsyncHandler[Task] {

implicit def completableTToFuture[T](f: CompletableFuture[T]): Future[T] =
implicit def completableTToFuture[T](f: => CompletableFuture[T]): Future[T] =
FutureConverters.toScala(f)

implicit def completableVoidToTask(f: CompletableFuture[Void]): Task[Unit] =
implicit def completableVoidToTask(f: => CompletableFuture[Void]): Task[Unit] =
Task.deferFuture(FutureConverters.toScala(f)).map(_ => ())

override def failed(e: Throwable): Task[Nothing] = Task.raiseError(e)
Expand Down Expand Up @@ -58,6 +59,9 @@ class MonixAsyncHandler extends AsyncHandler[Task] {
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] =
consumer.acknowledgeCumulativeAsync(messageId)

override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] =
Task { consumer.negativeAcknowledge(messageId) }

override def close(reader: Reader[_]): Task[Unit] = reader.closeAsync()
override def flush(producer: api.Producer[_]): Task[Unit] = producer.flushAsync()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.util.function.BiConsumer

import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, MessageId}
import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.{Reader, TypedMessageBuilder}
import scalaz.concurrent.Task

Expand All @@ -13,10 +14,10 @@ import scala.util.{Failure, Success, Try}

class ScalazAsyncHandler extends AsyncHandler[Task] {

implicit def completableVoidToTask(f: CompletableFuture[Void]): Task[Unit] =
implicit def completableVoidToTask(f: => CompletableFuture[Void]): Task[Unit] =
completableToTask(f).map(_ => ())

implicit def completableToTask[T](f: CompletableFuture[T]): Task[T] = {
implicit def completableToTask[T](f: => CompletableFuture[T]): Task[T] = {
Task.async[T] { k =>
f.whenCompleteAsync(new BiConsumer[T, Throwable] {
override def accept(t: T, e: Throwable): Unit = {
Expand Down Expand Up @@ -56,6 +57,9 @@ class ScalazAsyncHandler extends AsyncHandler[Task] {
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] =
consumer.acknowledgeCumulativeAsync(messageId)

override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] =
Task { consumer.negativeAcknowledge(messageId) }

override def close(reader: Reader[_]): Task[Unit] = reader.closeAsync()
override def close(producer: api.Producer[_]): Task[Unit] = producer.closeAsync()
override def close(consumer: api.Consumer[_]): Task[Unit] = consumer.closeAsync()
Expand Down

0 comments on commit bfa21f4

Please sign in to comment.