diff --git a/README.md b/README.md index 88c236b2b..38b8e506c 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Finch uses multi-project structure and contains of the following _modules_: * [`finch-argonaut`](argonaut) - [Argonaut][argonaut] + Finch * [`finch-circe`](circe) - [Circe][circe] + Finch * [`finch-iteratee`](iteratee) - [Iteratee][iteratee] + Finch +* [`finch-fs2`](fs2) - [FS2][fs] + Finch * [`finch-refined`](refined) - [Refined][refined] + Finch * [`finch-test`](test) - the test support classes/functions * [`finch-sse`](sse) - SSE ([Server Sent Events][server-sent-events]) support in Finch @@ -197,6 +198,7 @@ limitations under the License. [finch-sprayjson]: https://github.com/finch/finch-sprayjson [finch-playjson]: https://github.com/finch/finch-playjson [finch-oauth2]: https://github.com/finch/finch-ouath2 +[fs2]: https://github.com/functional-streams-for-scala/fs2 [server-sent-events]: https://en.wikipedia.org/wiki/Server-sent_events [vkostyukov]: https://twitter.com/vkostyukov [travisbrown]: https://twitter.com/travisbrown diff --git a/core/src/main/scala/io/finch/Endpoint.scala b/core/src/main/scala/io/finch/Endpoint.scala index 3095b14c4..c36dc4547 100644 --- a/core/src/main/scala/io/finch/Endpoint.scala +++ b/core/src/main/scala/io/finch/Endpoint.scala @@ -890,43 +890,75 @@ object Endpoint { } /** - * An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as - * an `S[F, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. - */ - def streamBinaryBody[F[_], S[_[_], _]](implicit - liftReader: LiftReader[S, F], - F: Effect[F] - ): Endpoint[F, S[F, Buf]] = { - new Endpoint[F, S[F, Buf]] { - final def apply(input: Input): Endpoint.Result[F, S[F, Buf]] = { - if (!input.request.isChunked) EndpointResult.NotMatched[F] - else { - val req = input.request - EndpointResult.Matched( - input, - Trace.empty, - F.pure[Output[S[F, Buf]]](Output.payload(liftReader(req.reader))) - ) - } - } + * An [[Endpoint]] that matches chunked requests and lifts their content into a generic + * **binary** stream passed as a type parameter. This method, along with other `bodyStream` + * endpoints, are integration points with streaming libraries such as fs2 and iteratee. + * + * {{{ + * scala> import io.finch._, io.finch.iteratee._, cats.effect.IO, io.iteratee.Enumerator + * + * scala> val bin = Endpoint[IO].binaryBodyStream[Enumerator] + * bin: Endpoint[IO, Enumerator[IO, Array[Byte]]] = binaryBodyStream + * }}} + */ + def binaryBodyStream[F[_]: Effect, S[_[_], _]](implicit + LR: LiftReader[S, F] + ): Endpoint[F, S[F, Array[Byte]]] = new BinaryBodyStream[F, S] - final override def item: RequestItem = items.BodyItem - final override def toString: String = "streamBinaryBody" - } - } + /** + * An [[Endpoint]] that matches chunked requests and lifts their content into a generic + * **string** stream passed as a type parameter. This method, along with other `bodyStream` + * endpoints, are integration points with streaming libraries such as fs2 and iteratee. + * + * {{{ + * scala> import io.finch._, io.finch.iteratee._, cats.effect.IO, io.iteratee.Enumerator + * + * scala> val bin = Endpoint[IO].stringBodyStream[Enumerator] + * bin: Endpoint[IO, Enumerator[IO, String]] = stringBodyStream + * }}} + */ + def stringBodyStream[F[_]: Effect, S[_[_], _]](implicit + LR: LiftReader[S, F] + ): Endpoint[F, S[F, String]] = new StringBodyStream[F, S] - def streamJsonBody[F[_], S[_[_], _], A](implicit - streamDecoder: DecodeStream.Aux[S, F, A, Application.Json], - liftReader: LiftReader[S, F], - F: Effect[F] - ): Endpoint[F, S[F, A]] = new Endpoint[F, S[F, A]] { - final def apply(input: Input): Result[F, S[F, A]] = { - streamBinaryBody.apply(input).map(streamDecoder(_, input.request.charsetOrUtf8)) - } + /** + * An [[Endpoint]] that matches chunked requests and lifts their content into a generic + * stream passed as a type parameter. This method, along with other `bodyStream` + * endpoints, are integration points with streaming libraries such as fs2 and iteratee. + * + * When, for example, JSON library is import, this endpoint can parse an inbound JSON stream. + * + * {{{ + * scala> import io.finch._, io.finch.iteratee._, cats.effect.IO, io.iteratee.Enumerator + * + * scala> import io.finch.circe._, io.circe.generic.auto._ + * + * scala> case class Foo(s: String) - final override def item: RequestItem = items.BodyItem - final override def toString: String = "streamJsonBody" - } + * scala> val json = Endpoint[IO].bodyStream[Enumerator, Foo, Application.Json] + * bin: Endpoint[IO, Enumerator[IO, Foo]] = bodyStream + * }}} + */ + def bodyStream[F[_]: Effect, S[_[_], _], A, CT <: String](implicit + LR: LiftReader[S, F], + A: DecodeStream.Aux[S, F, A, CT] + ): Endpoint[F, S[F, A]] = new BodyStream[F, S, A, CT] + + /** + * See [[bodyStream]]. This is just an alias for `bodyStream[?, ?, Application.Json]`. + */ + def jsonBodyStream[F[_]: Effect, S[_[_], _], A](implicit + LR: LiftReader[S, F], + A: DecodeStream.Aux[S, F, A, Application.Json] + ) : Endpoint[F, S[F, A]] = bodyStream[F, S, A, Application.Json] + + /** + * See [[bodyStream]]. This is just an alias for `bodyStream[?, ?, Text.Plain]`. + */ + def textBodyStream[F[_]: Effect, S[_[_], _], A](implicit + LR: LiftReader[S, F], + A: DecodeStream.Aux[S, F, A, Text.Plain] + ): Endpoint[F, S[F, A]] = bodyStream[F, S, A, Text.Plain] /** * An evaluating [[Endpoint]] that reads an optional HTTP cookie from the request into an diff --git a/core/src/main/scala/io/finch/EndpointModule.scala b/core/src/main/scala/io/finch/EndpointModule.scala index d14075f17..3d5f7451d 100644 --- a/core/src/main/scala/io/finch/EndpointModule.scala +++ b/core/src/main/scala/io/finch/EndpointModule.scala @@ -290,21 +290,47 @@ trait EndpointModule[F[_]] { Endpoint.asyncBody[F] /** - * An alias for [[Endpoint.streamBinaryBody]] - */ - def streamBinaryBody[S[_[_], _], A, CT <: String](implicit - liftReader: LiftReader[S, F], - F: Effect[F] - ): Endpoint[F, S[F, Buf]] = Endpoint.streamBinaryBody[F, S] - - /** - * An alias for [[Endpoint.streamJsonBody]] - */ - def streamJsonBody[S[_[_], _], A](implicit - decoder: DecodeStream.Aux[S, F, A, Application.Json], - liftReader: LiftReader[S, F], - F: Effect[F] - ): Endpoint[F, S[F, A]] = Endpoint.streamJsonBody[F, S, A] + * An alias for [[Endpoint.binaryBodyStream]]. + */ + def binaryBodyStream[S[_[_], _]](implicit + F: Effect[F], + LR: LiftReader[S, F] + ): Endpoint[F, S[F, Array[Byte]]] = Endpoint.binaryBodyStream[F, S] + + /** + * An alias for [[Endpoint.stringBodyStream]]. + */ + def stringBodyStream[S[_[_], _]](implicit + F: Effect[F], + LR: LiftReader[S, F] + ): Endpoint[F, S[F, String]] = Endpoint.stringBodyStream[F, S] + + /** + * An alias for [[Endpoint.bodyStream]]. + */ + def bodyStream[S[_[_], _], A, CT <: String](implicit + F: Effect[F], + LR: LiftReader[S, F], + A: DecodeStream.Aux[S, F, A, CT] + ): Endpoint[F, S[F, A]] = Endpoint.bodyStream[F, S, A, CT] + + /** + * An alias for [[Endpoint.jsonBodyStream]]. + */ + def jsonBodyStream[S[_[_], _], A](implicit + F: Effect[F], + LR: LiftReader[S, F], + A: DecodeStream.Aux[S, F, A, Application.Json] + ): Endpoint[F, S[F, A]] = Endpoint.jsonBodyStream[F, S, A] + + /** + * An alias for [[Endpoint.textBodyStream]]. + */ + def textBodyStream[S[_[_], _], A](implicit + F: Effect[F], + LR: LiftReader[S, F], + A: DecodeStream.Aux[S, F, A, Text.Plain] + ): Endpoint[F, S[F, A]] = Endpoint.textBodyStream[F, S, A] /** * An alias for [[Endpoint.cookieOption]]. diff --git a/core/src/main/scala/io/finch/EndpointResult.scala b/core/src/main/scala/io/finch/EndpointResult.scala index 4b8072fcd..093afbb63 100644 --- a/core/src/main/scala/io/finch/EndpointResult.scala +++ b/core/src/main/scala/io/finch/EndpointResult.scala @@ -1,6 +1,6 @@ package io.finch -import cats.{Functor, Id} +import cats.Id import cats.effect.Effect import com.twitter.finagle.http.Method import com.twitter.util._ @@ -45,11 +45,6 @@ sealed abstract class EndpointResult[F[_], +A] { case _ => None } - final def map[B](fn: A => B)(implicit F: Effect[F]): EndpointResult[F, B] = this match { - case EndpointResult.Matched(rem, trc, o) => EndpointResult.Matched(rem, trc, F.map(o)(_.map(fn))) - case n: EndpointResult.NotMatched[F] => n - } - def awaitOutput(d: Duration = Duration.Inf)(implicit F: Effect[F]): Option[Either[Throwable, Output[A]]] = this match { case EndpointResult.Matched(_, _, out) => try { @@ -105,9 +100,4 @@ object EndpointResult { case _ => None } } - - implicit def endpointResultInstances[F[_] : Effect]: Functor[EndpointResult[F, ?]] = - new Functor[EndpointResult[F, ?]] { - def map[A, B](fa: EndpointResult[F, A])(f: A => B): EndpointResult[F, B] = fa.map(f) - } } diff --git a/core/src/main/scala/io/finch/LiftReader.scala b/core/src/main/scala/io/finch/LiftReader.scala index a29cb3465..dae402a60 100644 --- a/core/src/main/scala/io/finch/LiftReader.scala +++ b/core/src/main/scala/io/finch/LiftReader.scala @@ -3,15 +3,11 @@ package io.finch import com.twitter.io.{Buf, Reader} /** - * Create stream S[F, Buf] from com.twitter.io.Reader[Buf] + * Create stream `S[F, A]` from [[Reader]]. */ trait LiftReader[S[_[_], _], F[_]] { - def apply(reader: Reader[Buf]): S[F, Buf] -} + final def apply(reader: Reader[Buf]): S[F, Buf] = apply(reader, identity) -object LiftReader { - def instance[S[_[_], _], F[_]](fn: Reader[Buf] => S[F, Buf]): LiftReader[S, F] = new LiftReader[S, F] { - def apply(reader: Reader[Buf]): S[F, Buf] = fn(reader) - } + def apply[A](reader: Reader[Buf], process: Buf => A): S[F, A] } diff --git a/core/src/main/scala/io/finch/endpoint/body.scala b/core/src/main/scala/io/finch/endpoint/body.scala index 4c00e190f..cfac6a3d4 100644 --- a/core/src/main/scala/io/finch/endpoint/body.scala +++ b/core/src/main/scala/io/finch/endpoint/body.scala @@ -1,11 +1,11 @@ package io.finch.endpoint import cats.effect.Effect -import com.twitter.io.Buf +import com.twitter.io.{Buf, Reader} import io.finch._ import io.finch.internal._ import io.finch.items._ -import java.nio.charset.Charset +import java.nio.charset.{Charset, StandardCharsets} import scala.reflect.ClassTag private[finch] abstract class FullBody[F[_], A] extends Endpoint[F, A] { @@ -83,3 +83,59 @@ private[finch] abstract class StringBody[F[_], A](implicit protected val F: Effe final override def toString: String = "stringBody" } + +private[finch] abstract class ChunkedBody[F[_], S[_[_], _], A] extends Endpoint[F, S[F, A]] { + + protected def F: Effect[F] + protected def prepare(r: Reader[Buf], cs: Charset): Output[S[F, A]] + + final def apply(input: Input): EndpointResult[F, S[F, A]] = + if (!input.request.isChunked) EndpointResult.NotMatched[F] + else EndpointResult.Matched( + input, + Trace.empty, + F.delay(prepare(input.request.reader, input.request.charsetOrUtf8)) + ) + + final override def item: RequestItem = items.BodyItem +} + +private[finch] final class BinaryBodyStream[F[_], S[_[_], _]](implicit + LR: LiftReader[S, F], + protected val F: Effect[F] +) extends ChunkedBody[F, S, Array[Byte]] with (Buf => Array[Byte]) { + + def apply(buf: Buf): Array[Byte] = buf.asByteArray + + protected def prepare(r: Reader[Buf], cs: Charset): Output[S[F, Array[Byte]]] = + Output.payload(LR(r, this)) + + override def toString: String = "binaryBodyStream" +} + +private[finch] final class StringBodyStream[F[_], S[_[_], _]](implicit + LR: LiftReader[S, F], + protected val F: Effect[F] +) extends ChunkedBody[F, S, String] with (Buf => String) { + + def apply(buf: Buf): String = buf.asString(StandardCharsets.UTF_8) + + protected def prepare(r: Reader[Buf], cs: Charset): Output[S[F, String]] = cs match { + case StandardCharsets.UTF_8 => Output.payload(LR(r, this)) + case _ => Output.payload(LR(r, _.asString(cs))) + } + + override def toString: String = "stringBodyStream" +} + +private[finch] final class BodyStream[F[_], S[_[_], _], A, CT <: String](implicit + protected val F: Effect[F], + LR: LiftReader[S, F], + A: DecodeStream.Aux[S, F, A, CT] +) extends ChunkedBody[F, S, A] { + + protected def prepare(r: Reader[Buf], cs: Charset): Output[S[F, A]] = + Output.payload(A(LR(r), cs)) + + override def toString: String = "bodyStream" +} diff --git a/core/src/test/scala/io/finch/EndpointResultSpec.scala b/core/src/test/scala/io/finch/EndpointResultSpec.scala deleted file mode 100644 index 4afae365c..000000000 --- a/core/src/test/scala/io/finch/EndpointResultSpec.scala +++ /dev/null @@ -1,13 +0,0 @@ -package io.finch - -import cats.effect.IO -import cats.laws.discipline.FunctorTests - -class EndpointResultSpec extends FinchSpec { - - checkAll( - "Functor[EndpointResult]", - FunctorTests[EndpointResult[IO, ?]].functor[String, String, String] - ) - -} diff --git a/core/src/test/scala/io/finch/StreamingLaws.scala b/core/src/test/scala/io/finch/StreamingLaws.scala index 58e866111..5b537e784 100644 --- a/core/src/test/scala/io/finch/StreamingLaws.scala +++ b/core/src/test/scala/io/finch/StreamingLaws.scala @@ -17,7 +17,7 @@ abstract class StreamingLaws[S[_[_], _], F[_] : Effect] extends Laws with AllIns def toResponse: ToResponse[S[F, Buf]] def fromList: List[Buf] => S[F, Buf] - def toList: S[F, Buf] => List[Buf] + def toList: S[F, Array[Byte]] => List[Buf] def roundTrip(a: List[Buf], cs: Charset): IsEq[List[Buf]] = { val req = Request() @@ -26,16 +26,16 @@ abstract class StreamingLaws[S[_[_], _], F[_] : Effect] extends Laws with AllIns Reader.copy(toResponse(fromList(a), cs).reader, req.writer).ensure(req.writer.close()) Endpoint - .streamBinaryBody[F, S] + .binaryBodyStream[F, S] .apply(Input.fromRequest(req)) .awaitValueUnsafe() .map(toList) .get <-> a } - def onlyChunked: EndpointResult[F, S[F, Buf]] = { + def onlyChunked: EndpointResult[F, S[F, Array[Byte]]] = { Endpoint - .streamBinaryBody[F, S] + .binaryBodyStream[F, S] .apply(Input.fromRequest(Request())) } @@ -55,7 +55,7 @@ object StreamingLaws { def apply[S[_[_], _], F[_] : Effect]( streamFromList: List[Buf] => S[F, Buf], - listFromStream: S[F, Buf] => List[Buf] + listFromStream: S[F, Array[Byte]] => List[Buf] )(implicit tr: ToResponse.Aux[S[F, Buf], Text.Plain], reader: LiftReader[S, F] @@ -63,7 +63,7 @@ object StreamingLaws { implicit val streamReader: LiftReader[S, F] = reader val toResponse: ToResponse[S[F, Buf]] = tr val fromList: List[Buf] => S[F, Buf] = streamFromList - val toList: S[F, Buf] => List[Buf] = listFromStream + val toList: S[F, Array[Byte]] => List[Buf] = listFromStream } } diff --git a/examples/src/main/scala/io/finch/iteratee/Main.scala b/examples/src/main/scala/io/finch/iteratee/Main.scala index 8c93be66e..1900df98f 100644 --- a/examples/src/main/scala/io/finch/iteratee/Main.scala +++ b/examples/src/main/scala/io/finch/iteratee/Main.scala @@ -49,7 +49,7 @@ object Main extends EndpointModule[IO] { private def stream: Stream[Int] = Stream.continually(Random.nextInt()) - val sumJson: Endpoint[IO, Result] = post("sumJson" :: streamJsonBody[Enumerator, Number]) { + val sumJson: Endpoint[IO, Result] = post("sumJson" :: jsonBodyStream[Enumerator, Number]) { enum: Enumerator[IO, Number] => enum.into(Iteratee.fold[IO, Number, Result](Result(0))(_ add _)).map(Ok) } @@ -59,7 +59,7 @@ object Main extends EndpointModule[IO] { } val isPrime: Endpoint[IO, Enumerator[IO, IsPrime]] = - post("streamPrime" :: streamJsonBody[Enumerator, Number]) { enum: Enumerator[IO, Number] => + post("streamPrime" :: jsonBodyStream[Enumerator, Number]) { enum: Enumerator[IO, Number] => Ok(enum.map(_.isPrime)) } diff --git a/fs2/src/main/scala/io/finch/fs2/package.scala b/fs2/src/main/scala/io/finch/fs2/package.scala index 31e16f1f6..08f6cdd2d 100644 --- a/fs2/src/main/scala/io/finch/fs2/package.scala +++ b/fs2/src/main/scala/io/finch/fs2/package.scala @@ -11,12 +11,16 @@ package object fs2 extends StreamInstances { implicit def streamLiftReader[F[_]](implicit F: Effect[F], TE: ToEffect[Future, F] - ): LiftReader[Stream, F] = LiftReader.instance { reader => - Stream - .repeatEval(F.defer(TE(reader.read))) - .unNoneTerminate - .onFinalize(F.delay(reader.discard())) - } + ): LiftReader[Stream, F] = + new LiftReader[Stream, F] { + final def apply[A](reader: Reader[Buf], process: Buf => A): Stream[F, A] = { + Stream + .repeatEval(F.suspend(TE(reader.read()))) + .unNoneTerminate + .map(process) + .onFinalize(F.delay(reader.discard())) + } + } implicit def encodeJsonFs2Stream[F[_]: Effect, A](implicit A: Encode.Json[A] diff --git a/fs2/src/test/scala/io/finch/fs2/Fs2StreamingSpec.scala b/fs2/src/test/scala/io/finch/fs2/Fs2StreamingSpec.scala index e0b51c08d..9fc0dd8a4 100644 --- a/fs2/src/test/scala/io/finch/fs2/Fs2StreamingSpec.scala +++ b/fs2/src/test/scala/io/finch/fs2/Fs2StreamingSpec.scala @@ -2,6 +2,7 @@ package io.finch.fs2 import _root_.fs2.Stream import cats.effect.IO +import com.twitter.io.Buf import io.finch._ import org.scalatest.prop.GeneratorDrivenPropertyChecks @@ -9,7 +10,7 @@ class Fs2StreamingSpec extends FinchSpec with GeneratorDrivenPropertyChecks { checkAll("fs2.streamBody", StreamingLaws[Stream, IO]( list => Stream(list:_*), - _.compile.toList.unsafeRunSync() + _.map(array => Buf.ByteArray.Owned(array)).compile.toList.unsafeRunSync() ).all) } diff --git a/iteratee/src/main/scala/io/finch/iteratee/package.scala b/iteratee/src/main/scala/io/finch/iteratee/package.scala index 21948c991..2750e1dec 100644 --- a/iteratee/src/main/scala/io/finch/iteratee/package.scala +++ b/iteratee/src/main/scala/io/finch/iteratee/package.scala @@ -51,19 +51,22 @@ trait IterateeInstances extends LowPriorityIterateeInstances { implicit def enumeratorLiftReader[F[_]](implicit F: Effect[F], TE: ToEffect[Future, F] - ): LiftReader[Enumerator, F] = LiftReader.instance { reader => - def loop(): Enumerator[F, Buf] = { - Enumerator.liftM[F, Option[Buf]] { - F.defer(TE(reader.read())) - }.flatMap { - case None => Enumerator.empty[F, Buf] - case Some(buf) => Enumerator.enumOne[F, Buf](buf).append(loop()) + ): LiftReader[Enumerator, F] = + new LiftReader[Enumerator, F] { + final def apply[A](reader: Reader[Buf], process: Buf => A): Enumerator[F, A] = { + def loop(): Enumerator[F, A] = { + Enumerator + .liftM[F, Option[Buf]](F.suspend(TE(reader.read()))) + .flatMap { + case None => Enumerator.empty[F, A] + case Some(buf) => Enumerator.enumOne[F, A](process(buf)).append(loop()) + } + } + + loop().ensure(F.delay(reader.discard())) } } - loop().ensure(F.delay(reader.discard())) - } - implicit def encodeJsonEnumerator[F[_]: Effect, A](implicit A: Encode.Json[A] ): EncodeStream.Json[Enumerator, F, A] = diff --git a/iteratee/src/test/scala/io/finch/iteratee/IterateeStreamingSpec.scala b/iteratee/src/test/scala/io/finch/iteratee/IterateeStreamingSpec.scala index d89e89d95..a274c6c18 100644 --- a/iteratee/src/test/scala/io/finch/iteratee/IterateeStreamingSpec.scala +++ b/iteratee/src/test/scala/io/finch/iteratee/IterateeStreamingSpec.scala @@ -1,6 +1,7 @@ package io.finch.iteratee import cats.effect.IO +import com.twitter.io.Buf import io.finch._ import io.iteratee.Enumerator import org.scalatest.prop.GeneratorDrivenPropertyChecks @@ -9,7 +10,7 @@ class IterateeStreamingSpec extends FinchSpec with GeneratorDrivenPropertyChecks checkAll("Iteratee.streamBody", StreamingLaws[Enumerator, IO]( Enumerator.enumList, - _.toVector.unsafeRunSync().toList + _.map(array => Buf.ByteArray.Owned(array)).toVector.unsafeRunSync().toList ).all) }