Move configuration of serdes to the creation effect (#334)
iravid authored Sep 30, 2021
1 parent c9bddd7 commit 1a213c5
Showing 6 changed files with 108 additions and 60 deletions.
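The core of the change: a serde's Kafka-side configure(props, isKey) call no longer runs lazily every time a consumer stream is built; it runs once, as a ZIO effect, when the serde itself is created. A minimal usage sketch of the creation-effect constructor introduced below (not part of the commit itself; Kafka's built-in StringDeserializer and the empty props map are just for illustration):

    import org.apache.kafka.common.serialization.StringDeserializer
    import zio.Task
    import zio.kafka.serde.Deserializer

    // configure() now runs inside this Task, exactly once, rather than
    // every time a consumer stream is built from the deserializer.
    val valueDeserializer: Task[Deserializer[Any, String]] =
      Deserializer.fromKafkaDeserializer(new StringDeserializer, Map.empty, isKey = false)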
28 changes: 12 additions & 16 deletions src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -197,23 +197,19 @@ object Consumer {
     keyDeserializer: Deserializer[R, K],
     valueDeserializer: Deserializer[R, V]
   ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] =
-    ZStream.fromEffect {
-      keyDeserializer.configure(settings.driverSettings, isKey = true) *>
-        valueDeserializer.configure(settings.driverSettings, isKey = false)
-    } *>
-      ZStream
-        .fromQueue(runloop.partitions)
-        .map(_.exit)
-        .flattenExitOption
-        .map {
-          _.map { case (tp, partition) =>
-            val partitionStream =
-              if (settings.perPartitionChunkPrefetch <= 0) partition
-              else partition.buffer(settings.perPartitionChunkPrefetch)
-
-            tp -> partitionStream.mapChunksM(_.mapM(_.deserializeWith(keyDeserializer, valueDeserializer)))
-          }
+    ZStream
+      .fromQueue(runloop.partitions)
+      .map(_.exit)
+      .flattenExitOption
+      .map {
+        _.map { case (tp, partition) =>
+          val partitionStream =
+            if (settings.perPartitionChunkPrefetch <= 0) partition
+            else partition.buffer(settings.perPartitionChunkPrefetch)
+
+          tp -> partitionStream.mapChunksM(_.mapM(_.deserializeWith(keyDeserializer, valueDeserializer)))
+        }
       }
 
   override def partitionedStream[R, K, V](
     keyDeserializer: Deserializer[R, K],
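One behavioral consequence of this hunk: partitionedAssignedStream no longer feeds settings.driverSettings into the deserializers at subscription time. A hedged migration sketch for callers that relied on that implicit configuration (the function name is hypothetical; driverSettings is the same ConsumerSettings value the deleted lines used):

    import org.apache.kafka.common.serialization.StringDeserializer
    import zio.Task
    import zio.kafka.consumer.ConsumerSettings
    import zio.kafka.serde.Deserializer

    // Configure the key deserializer explicitly, once, with the driver
    // settings the consumer used to pass in on every stream start.
    def configuredKeyDeserializer(settings: ConsumerSettings): Task[Deserializer[Any, String]] =
      Deserializer.fromKafkaDeserializer(new StringDeserializer, settings.driverSettings, isKey = true)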
32 changes: 22 additions & 10 deletions src/main/scala/zio/kafka/serde/Deserializer.scala
@@ -3,6 +3,7 @@ package zio.kafka.serde
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.serialization.{ Deserializer => KafkaDeserializer }
 import zio.{ RIO, Task, ZIO }
+import zio.blocking.{ blocking => zioBlocking, Blocking }
 
 import scala.util.{ Failure, Success, Try }
 import scala.jdk.CollectionConverters._
@@ -18,7 +19,12 @@ import scala.annotation.nowarn
  */
 trait Deserializer[-R, +T] {
   def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T]
-  def configure(props: Map[String, AnyRef], isKey: Boolean): Task[Unit]
+
+  /**
+   * Returns a new deserializer that executes its deserialization function on the blocking threadpool.
+   */
+  def blocking: Deserializer[R with Blocking, T] =
+    Deserializer((topic, headers, data) => zioBlocking(deserialize(topic, headers, data)))
 
   /**
    * Create a deserializer for a type U based on the deserializer for type T and a mapping function
@@ -48,6 +54,9 @@ trait Deserializer[-R, +T] {
   def asTry: Deserializer[R, Try[T]] =
     Deserializer(deserialize(_, _, _).fold(e => Failure(e), v => Success(v)))
 
+  /**
+   * Returns a new deserializer that deserializes values as Option values, mapping null data to None values.
+   */
   def asOption(implicit @nowarn ev: T <:< AnyRef): Deserializer[R, Option[T]] =
     Deserializer((topic, headers, data) => ZIO.foreach(Option(data))(deserialize(topic, headers, _)))
 }
@@ -60,18 +69,21 @@ object Deserializer extends Serdes {
   def apply[R, T](deser: (String, Headers, Array[Byte]) => RIO[R, T]): Deserializer[R, T] = new Deserializer[R, T] {
     override def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] =
       deser(topic, headers, data)
-
-    override def configure(props: Map[String, AnyRef], isKey: Boolean): Task[Unit] = Task.unit
   }
 
   /**
    * Create a Deserializer from a Kafka Deserializer
    */
-  def apply[T](deserializer: KafkaDeserializer[T]): Deserializer[Any, T] = new Deserializer[Any, T] {
-    override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Task[T] =
-      Task(deserializer.deserialize(topic, headers, data))
-
-    override def configure(props: Map[String, AnyRef], isKey: Boolean): zio.Task[Unit] =
-      Task(deserializer.configure(props.asJava, isKey))
-  }
+  def fromKafkaDeserializer[T](
+    deserializer: KafkaDeserializer[T],
+    props: Map[String, AnyRef],
+    isKey: Boolean
+  ): Task[Deserializer[Any, T]] =
+    Task(deserializer.configure(props.asJava, isKey))
+      .as(
+        new Deserializer[Any, T] {
+          override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Task[T] =
+            Task(deserializer.deserialize(topic, headers, data))
+        }
+      )
 }
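The new blocking combinator is worth a note: it shifts deserialization onto ZIO's blocking threadpool, which helps when decoding is expensive or does I/O (schema-registry lookups, say). A small sketch, assuming the built-in string serde:

    import zio.blocking.Blocking
    import zio.kafka.serde.{ Deserializer, Serde }

    // Same deserializer, but each decode runs on the blocking pool,
    // so slow work does not starve ZIO's async executor.
    val blockingStringDeserializer: Deserializer[Blocking, String] = Serde.string.blocking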
41 changes: 27 additions & 14 deletions src/main/scala/zio/kafka/serde/Serde.scala
@@ -4,6 +4,7 @@ import org.apache.kafka.common.serialization.{ Serde => KafkaSerde }
 import org.apache.kafka.common.header.Headers
 
 import zio.{ RIO, Task }
+import zio.blocking.Blocking
 
 import scala.util.Try
 import scala.jdk.CollectionConverters._
@@ -18,6 +19,18 @@ import scala.jdk.CollectionConverters._
  */
 trait Serde[-R, T] extends Deserializer[R, T] with Serializer[R, T] {
 
+  /**
+   * Creates a new Serde that uses optional values. Null data will be mapped to None values.
+   */
+  def asOption(implicit ev: T <:< AnyRef, ev2: Null <:< T): Serde[R, Option[T]] =
+    Serde(super[Deserializer].asOption)(super[Serializer].asOption)
+
+  /**
+   * Creates a new Serde that executes its serialization and deserialization functions on the blocking threadpool.
+   */
+  override def blocking: Serde[R with Blocking, T] =
+    Serde(super[Deserializer].blocking)(super[Serializer].blocking)
+
   /**
    * Converts to a Serde of type U with pure transformations
    */
@@ -29,9 +42,6 @@ trait Serde[-R, T] extends Deserializer[R, T] with Serializer[R, T] {
    */
   def inmapM[R1 <: R, U](f: T => RIO[R1, U])(g: U => RIO[R1, T]): Serde[R1, U] =
     Serde(mapM(f))(contramapM(g))
-
-  def asOption(implicit ev: T <:< AnyRef, ev2: Null <:< T): Serde[R, Option[T]] =
-    Serde(super[Deserializer].asOption)(super[Serializer].asOption)
 }
 
 object Serde extends Serdes {
@@ -49,7 +59,6 @@ object Serde extends Serdes {
       ser(topic, headers, value)
     override def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] =
       deser(topic, headers, data)
-    override def configure(props: Map[String, AnyRef], isKey: Boolean): Task[Unit] = Task.unit
   }
 
   /**
@@ -60,21 +69,25 @@ object Serde extends Serdes {
       ser.serialize(topic, headers, value)
     override def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T] =
       deser.deserialize(topic, headers, data)
-    override def configure(props: Map[String, AnyRef], isKey: Boolean): Task[Unit] =
-      deser.configure(props, isKey) *> ser.configure(props, isKey)
   }
 
   /**
    * Create a Serde from a Kafka Serde
    */
-  def apply[T](serde: KafkaSerde[T]): Serde[Any, T] = new Serde[Any, T] {
-    override def serialize(topic: String, headers: Headers, value: T): Task[Array[Byte]] =
-      Task(serde.serializer().serialize(topic, headers, value))
-    override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Task[T] =
-      Task(serde.deserializer().deserialize(topic, headers, data))
-    override def configure(props: Map[String, AnyRef], isKey: Boolean): Task[Unit] =
-      Task(serde.configure(props.asJava, isKey))
-  }
+  def fromKafkaSerde[T](serde: KafkaSerde[T], props: Map[String, AnyRef], isKey: Boolean) =
+    Task(serde.configure(props.asJava, isKey))
+      .as(
+        new Serde[Any, T] {
+          val serializer   = serde.serializer()
+          val deserializer = serde.deserializer()
+
+          override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Task[T] =
+            Task(deserializer.deserialize(topic, headers, data))
+
+          override def serialize(topic: String, headers: Headers, value: T): Task[Array[Byte]] =
+            Task(serializer.serialize(topic, headers, value))
+        }
+      )
 
   implicit def deserializerWithError[R, T](implicit deser: Deserializer[R, T]): Deserializer[R, Try[T]] =
     deser.asTry
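Usage of the new fromKafkaSerde mirrors the deserializer side; note also that serde.serializer() and serde.deserializer() are now instantiated once, in vals, instead of on every record. A sketch with Kafka's built-in string serde (the empty props map is illustrative):

    import org.apache.kafka.common.serialization.{ Serdes => KafkaSerdes }
    import zio.Task
    import zio.kafka.serde.Serde

    // The underlying Kafka serde is configured once, inside the Task.
    val configuredStringSerde: Task[Serde[Any, String]] =
      Serde.fromKafkaSerde(KafkaSerdes.String(), Map.empty, isKey = false)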
34 changes: 24 additions & 10 deletions src/main/scala/zio/kafka/serde/Serdes.scala
@@ -3,16 +3,30 @@ package zio.kafka.serde
 import java.nio.ByteBuffer
 import java.util.UUID
 
-import org.apache.kafka.common.serialization.{ Serdes => KafkaSerdes }
+import org.apache.kafka.common.serialization.{ Serde => KafkaSerde, Serdes => KafkaSerdes }
+import org.apache.kafka.common.header.Headers
+import zio.{ RIO, Task }
 
 private[zio] trait Serdes {
-  lazy val long: Serde[Any, Long]             = Serde(KafkaSerdes.Long()).inmap(Long2long)(long2Long)
-  lazy val int: Serde[Any, Int]               = Serde(KafkaSerdes.Integer()).inmap(Integer2int)(int2Integer)
-  lazy val short: Serde[Any, Short]           = Serde(KafkaSerdes.Short()).inmap(Short2short)(short2Short)
-  lazy val float: Serde[Any, Float]           = Serde(KafkaSerdes.Float()).inmap(Float2float)(float2Float)
-  lazy val double: Serde[Any, Double]         = Serde(KafkaSerdes.Double()).inmap(Double2double)(double2Double)
-  lazy val string: Serde[Any, String]         = Serde(KafkaSerdes.String())
-  lazy val byteArray: Serde[Any, Array[Byte]] = Serde(KafkaSerdes.ByteArray())
-  lazy val byteBuffer: Serde[Any, ByteBuffer] = Serde(KafkaSerdes.ByteBuffer())
-  lazy val uuid: Serde[Any, UUID]             = Serde(KafkaSerdes.UUID())
+  lazy val long: Serde[Any, Long]             = convertPrimitiveSerde(KafkaSerdes.Long()).inmap(Long2long)(long2Long)
+  lazy val int: Serde[Any, Int]               = convertPrimitiveSerde(KafkaSerdes.Integer()).inmap(Integer2int)(int2Integer)
+  lazy val short: Serde[Any, Short]           = convertPrimitiveSerde(KafkaSerdes.Short()).inmap(Short2short)(short2Short)
+  lazy val float: Serde[Any, Float]           = convertPrimitiveSerde(KafkaSerdes.Float()).inmap(Float2float)(float2Float)
+  lazy val double: Serde[Any, Double]         = convertPrimitiveSerde(KafkaSerdes.Double()).inmap(Double2double)(double2Double)
+  lazy val string: Serde[Any, String]         = convertPrimitiveSerde(KafkaSerdes.String())
+  lazy val byteArray: Serde[Any, Array[Byte]] = convertPrimitiveSerde(KafkaSerdes.ByteArray())
+  lazy val byteBuffer: Serde[Any, ByteBuffer] = convertPrimitiveSerde(KafkaSerdes.ByteBuffer())
+  lazy val uuid: Serde[Any, UUID]             = convertPrimitiveSerde(KafkaSerdes.UUID())
+
+  private[this] def convertPrimitiveSerde[T](serde: KafkaSerde[T]): Serde[Any, T] =
+    new Serde[Any, T] {
+      val serializer   = serde.serializer()
+      val deserializer = serde.deserializer()
+
+      override def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[Any, T] =
+        Task(deserializer.deserialize(topic, headers, data))
+
+      override def serialize(topic: String, headers: Headers, value: T): RIO[Any, Array[Byte]] =
+        Task(serializer.serialize(topic, headers, value))
+    }
 }
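Design note: the built-in primitive serdes stay plain values (Serde[Any, T], not Task[Serde[Any, T]]) because Kafka's primitive serdes work without configuration (strings default to UTF-8, for example), so convertPrimitiveSerde can skip the configure step entirely. Call sites are unchanged:

    import zio.kafka.serde.Serde

    // Still pure values, no creation effect needed.
    val keySerde: Serde[Any, String] = Serde.string
    val idSerde: Serde[Any, Long]    = Serde.long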
31 changes: 22 additions & 9 deletions src/main/scala/zio/kafka/serde/Serializer.scala
@@ -1,6 +1,7 @@
 package zio.kafka.serde
 
 import zio.{ RIO, Task }
+import zio.blocking.{ blocking => zioBlocking, Blocking }
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.serialization.{ Serializer => KafkaSerializer }
 
@@ -15,7 +16,6 @@ import scala.jdk.CollectionConverters._
  */
 trait Serializer[-R, -T] {
   def serialize(topic: String, headers: Headers, value: T): RIO[R, Array[Byte]]
-  def configure(props: Map[String, AnyRef], isKey: Boolean): Task[Unit]
 
   /**
    * Create a serializer for a type U based on the serializer for type T and a mapping function
@@ -29,6 +29,15 @@ trait Serializer[-R, -T] {
   def contramapM[R1 <: R, U](f: U => RIO[R1, T]): Serializer[R1, U] =
     Serializer((topic, headers, u) => f(u).flatMap(serialize(topic, headers, _)))
 
+  /**
+   * Returns a new serializer that executes its serialization function on the blocking threadpool.
+   */
+  def blocking: Serializer[R with Blocking, T] =
+    Serializer((topic, headers, t) => zioBlocking(serialize(topic, headers, t)))
+
+  /**
+   * Returns a new serializer that handles optional values and serializes them as nulls.
+   */
   def asOption[U <: T](implicit ev: Null <:< T): Serializer[R, Option[U]] =
     contramap(_.orNull)
 }
@@ -42,18 +51,22 @@ object Serializer extends Serdes {
     new Serializer[R, T] {
       override def serialize(topic: String, headers: Headers, value: T): RIO[R, Array[Byte]] =
         ser(topic, headers, value)
-
-      override def configure(props: Map[String, AnyRef], isKey: Boolean): Task[Unit] = Task.unit
     }
 
   /**
    * Create a Serializer from a Kafka Serializer
    */
-  def apply[T](serializer: KafkaSerializer[T]): Serializer[Any, T] = new Serializer[Any, T] {
-    override def serialize(topic: String, headers: Headers, value: T): Task[Array[Byte]] =
-      Task(serializer.serialize(topic, headers, value))
-
-    override def configure(props: Map[String, AnyRef], isKey: Boolean): Task[Unit] =
-      Task(serializer.configure(props.asJava, isKey))
-  }
+  def fromKafkaSerializer[T](
+    serializer: KafkaSerializer[T],
+    props: Map[String, AnyRef],
+    isKey: Boolean
+  ): Task[Serializer[Any, T]] =
+    Task(serializer.configure(props.asJava, isKey))
+      .as(
+        new Serializer[Any, T] {
+          override def serialize(topic: String, headers: Headers, value: T): Task[Array[Byte]] =
+            Task(serializer.serialize(topic, headers, value))
+        }
+      )
 }
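And the matching sketch for the serializer side (again with a built-in Kafka serializer and an illustrative empty props map):

    import org.apache.kafka.common.serialization.StringSerializer
    import zio.Task
    import zio.kafka.serde.Serializer

    // Configuration happens once, inside the creation effect.
    val valueSerializer: Task[Serializer[Any, String]] =
      Serializer.fromKafkaSerializer(new StringSerializer, Map.empty, isKey = false)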
2 changes: 1 addition & 1 deletion src/test/scala/zio/kafka/serde/DeserializerSpec.scala
@@ -12,7 +12,7 @@ object DeserializerSpec extends DefaultRunnableSpec {
       assertM(stringDeserializer.asOption.deserialize("topic1", new RecordHeaders, null))(isNone)
     },
     testM("deserialize to None when value is null also when underlying deserializer fails on null values") {
-      val deserializer = Deserializer((_, _, _) => ZIO.fail(new RuntimeException("cannot handle null")))
+      val deserializer = Deserializer[Any, Nothing]((_, _, _) => ZIO.fail(new RuntimeException("cannot handle null")))
       assertM(deserializer.asOption.deserialize("topic1", new RecordHeaders, null))(isNone)
     },
     testM("deserialize to Some when value is not null") {
