Skip to content

Commit

Permalink
Merge pull request #1066 from finagle/vk/stream-body-endpoints
Browse files Browse the repository at this point in the history
New names for streaming bodies
  • Loading branch information
vkostyukov authored Jan 13, 2019
2 parents 65fd6f6 + f70c175 commit e618b8e
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 108 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Finch uses multi-project structure and contains of the following _modules_:
* [`finch-argonaut`](argonaut) - [Argonaut][argonaut] + Finch
* [`finch-circe`](circe) - [Circe][circe] + Finch
* [`finch-iteratee`](iteratee) - [Iteratee][iteratee] + Finch
* [`finch-fs2`](fs2) - [FS2][fs] + Finch
* [`finch-refined`](refined) - [Refined][refined] + Finch
* [`finch-test`](test) - the test support classes/functions
* [`finch-sse`](sse) - SSE ([Server Sent Events][server-sent-events]) support in Finch
Expand Down Expand Up @@ -197,6 +198,7 @@ limitations under the License.
[finch-sprayjson]: https://github.com/finch/finch-sprayjson
[finch-playjson]: https://github.com/finch/finch-playjson
[finch-oauth2]: https://github.com/finch/finch-ouath2
[fs2]: https://github.com/functional-streams-for-scala/fs2
[server-sent-events]: https://en.wikipedia.org/wiki/Server-sent_events
[vkostyukov]: https://twitter.com/vkostyukov
[travisbrown]: https://twitter.com/travisbrown
Expand Down
100 changes: 66 additions & 34 deletions core/src/main/scala/io/finch/Endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -890,43 +890,75 @@ object Endpoint {
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as
* an `S[F, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def streamBinaryBody[F[_], S[_[_], _]](implicit
liftReader: LiftReader[S, F],
F: Effect[F]
): Endpoint[F, S[F, Buf]] = {
new Endpoint[F, S[F, Buf]] {
final def apply(input: Input): Endpoint.Result[F, S[F, Buf]] = {
if (!input.request.isChunked) EndpointResult.NotMatched[F]
else {
val req = input.request
EndpointResult.Matched(
input,
Trace.empty,
F.pure[Output[S[F, Buf]]](Output.payload(liftReader(req.reader)))
)
}
}
* An [[Endpoint]] that matches chunked requests and lifts their content into a generic
* **binary** stream passed as a type parameter. This method, along with other `bodyStream`
* endpoints, are integration points with streaming libraries such as fs2 and iteratee.
*
* {{{
* scala> import io.finch._, io.finch.iteratee._, cats.effect.IO, io.iteratee.Enumerator
*
* scala> val bin = Endpoint[IO].binaryBodyStream[Enumerator]
* bin: Endpoint[IO, Enumerator[IO, Array[Byte]]] = binaryBodyStream
* }}}
*/
def binaryBodyStream[F[_]: Effect, S[_[_], _]](implicit
LR: LiftReader[S, F]
): Endpoint[F, S[F, Array[Byte]]] = new BinaryBodyStream[F, S]

final override def item: RequestItem = items.BodyItem
final override def toString: String = "streamBinaryBody"
}
}
/**
* An [[Endpoint]] that matches chunked requests and lifts their content into a generic
* **string** stream passed as a type parameter. This method, along with other `bodyStream`
* endpoints, are integration points with streaming libraries such as fs2 and iteratee.
*
* {{{
* scala> import io.finch._, io.finch.iteratee._, cats.effect.IO, io.iteratee.Enumerator
*
* scala> val bin = Endpoint[IO].stringBodyStream[Enumerator]
* bin: Endpoint[IO, Enumerator[IO, String]] = stringBodyStream
* }}}
*/
def stringBodyStream[F[_]: Effect, S[_[_], _]](implicit
LR: LiftReader[S, F]
): Endpoint[F, S[F, String]] = new StringBodyStream[F, S]

def streamJsonBody[F[_], S[_[_], _], A](implicit
streamDecoder: DecodeStream.Aux[S, F, A, Application.Json],
liftReader: LiftReader[S, F],
F: Effect[F]
): Endpoint[F, S[F, A]] = new Endpoint[F, S[F, A]] {
final def apply(input: Input): Result[F, S[F, A]] = {
streamBinaryBody.apply(input).map(streamDecoder(_, input.request.charsetOrUtf8))
}
/**
* An [[Endpoint]] that matches chunked requests and lifts their content into a generic
* stream passed as a type parameter. This method, along with other `bodyStream`
* endpoints, are integration points with streaming libraries such as fs2 and iteratee.
*
* When, for example, JSON library is import, this endpoint can parse an inbound JSON stream.
*
* {{{
* scala> import io.finch._, io.finch.iteratee._, cats.effect.IO, io.iteratee.Enumerator
*
* scala> import io.finch.circe._, io.circe.generic.auto._
*
* scala> case class Foo(s: String)
final override def item: RequestItem = items.BodyItem
final override def toString: String = "streamJsonBody"
}
* scala> val json = Endpoint[IO].bodyStream[Enumerator, Foo, Application.Json]
* bin: Endpoint[IO, Enumerator[IO, Foo]] = bodyStream
* }}}
*/
def bodyStream[F[_]: Effect, S[_[_], _], A, CT <: String](implicit
LR: LiftReader[S, F],
A: DecodeStream.Aux[S, F, A, CT]
): Endpoint[F, S[F, A]] = new BodyStream[F, S, A, CT]

/**
* See [[bodyStream]]. This is just an alias for `bodyStream[?, ?, Application.Json]`.
*/
def jsonBodyStream[F[_]: Effect, S[_[_], _], A](implicit
LR: LiftReader[S, F],
A: DecodeStream.Aux[S, F, A, Application.Json]
) : Endpoint[F, S[F, A]] = bodyStream[F, S, A, Application.Json]

/**
* See [[bodyStream]]. This is just an alias for `bodyStream[?, ?, Text.Plain]`.
*/
def textBodyStream[F[_]: Effect, S[_[_], _], A](implicit
LR: LiftReader[S, F],
A: DecodeStream.Aux[S, F, A, Text.Plain]
): Endpoint[F, S[F, A]] = bodyStream[F, S, A, Text.Plain]

/**
* An evaluating [[Endpoint]] that reads an optional HTTP cookie from the request into an
Expand Down
56 changes: 41 additions & 15 deletions core/src/main/scala/io/finch/EndpointModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -290,21 +290,47 @@ trait EndpointModule[F[_]] {
Endpoint.asyncBody[F]

/**
* An alias for [[Endpoint.streamBinaryBody]]
*/
def streamBinaryBody[S[_[_], _], A, CT <: String](implicit
liftReader: LiftReader[S, F],
F: Effect[F]
): Endpoint[F, S[F, Buf]] = Endpoint.streamBinaryBody[F, S]

/**
* An alias for [[Endpoint.streamJsonBody]]
*/
def streamJsonBody[S[_[_], _], A](implicit
decoder: DecodeStream.Aux[S, F, A, Application.Json],
liftReader: LiftReader[S, F],
F: Effect[F]
): Endpoint[F, S[F, A]] = Endpoint.streamJsonBody[F, S, A]
* An alias for [[Endpoint.binaryBodyStream]].
*/
def binaryBodyStream[S[_[_], _]](implicit
F: Effect[F],
LR: LiftReader[S, F]
): Endpoint[F, S[F, Array[Byte]]] = Endpoint.binaryBodyStream[F, S]

/**
* An alias for [[Endpoint.stringBodyStream]].
*/
def stringBodyStream[S[_[_], _]](implicit
F: Effect[F],
LR: LiftReader[S, F]
): Endpoint[F, S[F, String]] = Endpoint.stringBodyStream[F, S]

/**
* An alias for [[Endpoint.bodyStream]].
*/
def bodyStream[S[_[_], _], A, CT <: String](implicit
F: Effect[F],
LR: LiftReader[S, F],
A: DecodeStream.Aux[S, F, A, CT]
): Endpoint[F, S[F, A]] = Endpoint.bodyStream[F, S, A, CT]

/**
* An alias for [[Endpoint.jsonBodyStream]].
*/
def jsonBodyStream[S[_[_], _], A](implicit
F: Effect[F],
LR: LiftReader[S, F],
A: DecodeStream.Aux[S, F, A, Application.Json]
): Endpoint[F, S[F, A]] = Endpoint.jsonBodyStream[F, S, A]

/**
* An alias for [[Endpoint.textBodyStream]].
*/
def textBodyStream[S[_[_], _], A](implicit
F: Effect[F],
LR: LiftReader[S, F],
A: DecodeStream.Aux[S, F, A, Text.Plain]
): Endpoint[F, S[F, A]] = Endpoint.textBodyStream[F, S, A]

/**
* An alias for [[Endpoint.cookieOption]].
Expand Down
12 changes: 1 addition & 11 deletions core/src/main/scala/io/finch/EndpointResult.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.finch

import cats.{Functor, Id}
import cats.Id
import cats.effect.Effect
import com.twitter.finagle.http.Method
import com.twitter.util._
Expand Down Expand Up @@ -45,11 +45,6 @@ sealed abstract class EndpointResult[F[_], +A] {
case _ => None
}

final def map[B](fn: A => B)(implicit F: Effect[F]): EndpointResult[F, B] = this match {
case EndpointResult.Matched(rem, trc, o) => EndpointResult.Matched(rem, trc, F.map(o)(_.map(fn)))
case n: EndpointResult.NotMatched[F] => n
}

def awaitOutput(d: Duration = Duration.Inf)(implicit F: Effect[F]): Option[Either[Throwable, Output[A]]] = this match {
case EndpointResult.Matched(_, _, out) =>
try {
Expand Down Expand Up @@ -105,9 +100,4 @@ object EndpointResult {
case _ => None
}
}

implicit def endpointResultInstances[F[_] : Effect]: Functor[EndpointResult[F, ?]] =
new Functor[EndpointResult[F, ?]] {
def map[A, B](fa: EndpointResult[F, A])(f: A => B): EndpointResult[F, B] = fa.map(f)
}
}
10 changes: 3 additions & 7 deletions core/src/main/scala/io/finch/LiftReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ package io.finch
import com.twitter.io.{Buf, Reader}

/**
* Create stream S[F, Buf] from com.twitter.io.Reader[Buf]
* Create stream `S[F, A]` from [[Reader]].
*/
trait LiftReader[S[_[_], _], F[_]] {

def apply(reader: Reader[Buf]): S[F, Buf]
}
final def apply(reader: Reader[Buf]): S[F, Buf] = apply(reader, identity)

object LiftReader {
def instance[S[_[_], _], F[_]](fn: Reader[Buf] => S[F, Buf]): LiftReader[S, F] = new LiftReader[S, F] {
def apply(reader: Reader[Buf]): S[F, Buf] = fn(reader)
}
def apply[A](reader: Reader[Buf], process: Buf => A): S[F, A]
}
60 changes: 58 additions & 2 deletions core/src/main/scala/io/finch/endpoint/body.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.finch.endpoint

import cats.effect.Effect
import com.twitter.io.Buf
import com.twitter.io.{Buf, Reader}
import io.finch._
import io.finch.internal._
import io.finch.items._
import java.nio.charset.Charset
import java.nio.charset.{Charset, StandardCharsets}
import scala.reflect.ClassTag

private[finch] abstract class FullBody[F[_], A] extends Endpoint[F, A] {
Expand Down Expand Up @@ -83,3 +83,59 @@ private[finch] abstract class StringBody[F[_], A](implicit protected val F: Effe

final override def toString: String = "stringBody"
}

private[finch] abstract class ChunkedBody[F[_], S[_[_], _], A] extends Endpoint[F, S[F, A]] {

protected def F: Effect[F]
protected def prepare(r: Reader[Buf], cs: Charset): Output[S[F, A]]

final def apply(input: Input): EndpointResult[F, S[F, A]] =
if (!input.request.isChunked) EndpointResult.NotMatched[F]
else EndpointResult.Matched(
input,
Trace.empty,
F.delay(prepare(input.request.reader, input.request.charsetOrUtf8))
)

final override def item: RequestItem = items.BodyItem
}

private[finch] final class BinaryBodyStream[F[_], S[_[_], _]](implicit
LR: LiftReader[S, F],
protected val F: Effect[F]
) extends ChunkedBody[F, S, Array[Byte]] with (Buf => Array[Byte]) {

def apply(buf: Buf): Array[Byte] = buf.asByteArray

protected def prepare(r: Reader[Buf], cs: Charset): Output[S[F, Array[Byte]]] =
Output.payload(LR(r, this))

override def toString: String = "binaryBodyStream"
}

private[finch] final class StringBodyStream[F[_], S[_[_], _]](implicit
LR: LiftReader[S, F],
protected val F: Effect[F]
) extends ChunkedBody[F, S, String] with (Buf => String) {

def apply(buf: Buf): String = buf.asString(StandardCharsets.UTF_8)

protected def prepare(r: Reader[Buf], cs: Charset): Output[S[F, String]] = cs match {
case StandardCharsets.UTF_8 => Output.payload(LR(r, this))
case _ => Output.payload(LR(r, _.asString(cs)))
}

override def toString: String = "stringBodyStream"
}

private[finch] final class BodyStream[F[_], S[_[_], _], A, CT <: String](implicit
protected val F: Effect[F],
LR: LiftReader[S, F],
A: DecodeStream.Aux[S, F, A, CT]
) extends ChunkedBody[F, S, A] {

protected def prepare(r: Reader[Buf], cs: Charset): Output[S[F, A]] =
Output.payload(A(LR(r), cs))

override def toString: String = "bodyStream"
}
13 changes: 0 additions & 13 deletions core/src/test/scala/io/finch/EndpointResultSpec.scala

This file was deleted.

12 changes: 6 additions & 6 deletions core/src/test/scala/io/finch/StreamingLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ abstract class StreamingLaws[S[_[_], _], F[_] : Effect] extends Laws with AllIns

def toResponse: ToResponse[S[F, Buf]]
def fromList: List[Buf] => S[F, Buf]
def toList: S[F, Buf] => List[Buf]
def toList: S[F, Array[Byte]] => List[Buf]

def roundTrip(a: List[Buf], cs: Charset): IsEq[List[Buf]] = {
val req = Request()
Expand All @@ -26,16 +26,16 @@ abstract class StreamingLaws[S[_[_], _], F[_] : Effect] extends Laws with AllIns
Reader.copy(toResponse(fromList(a), cs).reader, req.writer).ensure(req.writer.close())

Endpoint
.streamBinaryBody[F, S]
.binaryBodyStream[F, S]
.apply(Input.fromRequest(req))
.awaitValueUnsafe()
.map(toList)
.get <-> a
}

def onlyChunked: EndpointResult[F, S[F, Buf]] = {
def onlyChunked: EndpointResult[F, S[F, Array[Byte]]] = {
Endpoint
.streamBinaryBody[F, S]
.binaryBodyStream[F, S]
.apply(Input.fromRequest(Request()))
}

Expand All @@ -55,15 +55,15 @@ object StreamingLaws {

def apply[S[_[_], _], F[_] : Effect](
streamFromList: List[Buf] => S[F, Buf],
listFromStream: S[F, Buf] => List[Buf]
listFromStream: S[F, Array[Byte]] => List[Buf]
)(implicit
tr: ToResponse.Aux[S[F, Buf], Text.Plain],
reader: LiftReader[S, F]
): StreamingLaws[S, F] = new StreamingLaws[S, F] {
implicit val streamReader: LiftReader[S, F] = reader
val toResponse: ToResponse[S[F, Buf]] = tr
val fromList: List[Buf] => S[F, Buf] = streamFromList
val toList: S[F, Buf] => List[Buf] = listFromStream
val toList: S[F, Array[Byte]] => List[Buf] = listFromStream
}

}
4 changes: 2 additions & 2 deletions examples/src/main/scala/io/finch/iteratee/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object Main extends EndpointModule[IO] {

private def stream: Stream[Int] = Stream.continually(Random.nextInt())

val sumJson: Endpoint[IO, Result] = post("sumJson" :: streamJsonBody[Enumerator, Number]) {
val sumJson: Endpoint[IO, Result] = post("sumJson" :: jsonBodyStream[Enumerator, Number]) {
enum: Enumerator[IO, Number] =>
enum.into(Iteratee.fold[IO, Number, Result](Result(0))(_ add _)).map(Ok)
}
Expand All @@ -59,7 +59,7 @@ object Main extends EndpointModule[IO] {
}

val isPrime: Endpoint[IO, Enumerator[IO, IsPrime]] =
post("streamPrime" :: streamJsonBody[Enumerator, Number]) { enum: Enumerator[IO, Number] =>
post("streamPrime" :: jsonBodyStream[Enumerator, Number]) { enum: Enumerator[IO, Number] =>
Ok(enum.map(_.isPrime))
}

Expand Down
Loading

0 comments on commit e618b8e

Please sign in to comment.