Skip to content

Commit

Permalink
Generalise lambda functionality into module (#2795)
Browse files Browse the repository at this point in the history
* split event from flow

* push decoding message up a level

* define strict config layer (keep typesafe at the edge)

* Apply auto-formatting rules

* update tests

* add config test

* add an abstraction for lambda apps

* add tests for lambdaapp

* add common lib

* use lambda lib in relation_embedder

* shift tests to lambda lib

* use lambda lib in batcher

* fix tests, move sqseventops test

* add lambda module to ci tests

* self-linting

* ensure lib folders are added

* fix gitignore config

* Apply auto-formatting rules

* fix tests

* rejig lambda app to account for type erasure bug :(

* Apply auto-formatting rules

* use should matchers to be consistent

* update after trying to run locally

* Apply auto-formatting rules

* remaining merge compile issues

---------

Co-authored-by: Github on behalf of Wellcome Collection <[email protected]>
  • Loading branch information
kenoir and weco-bot authored Jan 6, 2025
1 parent 5a4d47c commit c3e21d0
Show file tree
Hide file tree
Showing 38 changed files with 479 additions and 323 deletions.
1 change: 1 addition & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ steps:

matrix:
- "internal_model"
- "lambda"
- "display_model"
- "source_model"
- "pipeline_storage"
Expand Down
9 changes: 1 addition & 8 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -177,22 +177,15 @@ __pycache__/

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
target/

# PyInstaller
# Usually these files are written by a python script from a template
Expand Down
12 changes: 8 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ lazy val display_model = setupProject(
externalDependencies = CatalogueDependencies.displayModelDependencies
)

lazy val lambda = setupProject(
project,
"common/lambda",
externalDependencies = CatalogueDependencies.lambdaDependencies
)

lazy val flows = setupProject(
project,
"common/flows",
Expand Down Expand Up @@ -135,8 +141,7 @@ lazy val path_concatenator = setupProject(
lazy val relation_embedder = setupProject(
project,
"pipeline/relation_embedder/relation_embedder",
localDependencies = Seq(internal_model, pipeline_storage_typesafe),
externalDependencies = CatalogueDependencies.relationEmbedderDependencies
localDependencies = Seq(internal_model, pipeline_storage_typesafe, lambda)
)

lazy val router = setupProject(
Expand All @@ -149,8 +154,7 @@ lazy val router = setupProject(
lazy val batcher = setupProject(
project,
"pipeline/relation_embedder/batcher",
localDependencies = Nil,
externalDependencies = CatalogueDependencies.batcherDependencies
localDependencies = Seq(lambda)
)

lazy val reindex_worker = setupProject(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
package weco.pipeline.relation_embedder.lib
package weco.lambda

import io.circe.Encoder
import software.amazon.awssdk.services.sns.SnsClient
import weco.messaging.sns.{SNSConfig, SNSMessageSender}
import weco.json.JsonUtil.toJson

import scala.util.Try

trait Downstream {
def notify(workId: String): Try[Unit]
def notify[T](batch: T)(implicit encoder: Encoder[T]): Try[Unit]
}

class SNSDownstream(snsConfig: SNSConfig) extends Downstream {
private val msgSender = new SNSMessageSender(
protected val msgSender = new SNSMessageSender(
snsClient = SnsClient.builder().build(),
snsConfig = snsConfig,
subject = "Sent from relation_embedder"
)

override def notify(workId: String): Try[Unit] = Try(msgSender.send(workId))
override def notify[T](batch: T)(implicit encoder: Encoder[T]): Try[Unit] =
msgSender.sendT(batch)
}

object STDIODownstream extends Downstream {
override def notify(workId: String): Try[Unit] = Try(println(workId))
override def notify[T](t: T)(implicit encoder: Encoder[T]): Try[Unit] = Try(
println(toJson(t))
)
}

sealed trait DownstreamTarget
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package weco.pipeline.relation_embedder.lib
package weco.lambda

import com.sksamuel.elastic4s.ElasticClient
import com.typesafe.config.Config
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package weco.pipeline.relation_embedder.lib
package weco.lambda

import java.io.File
import com.typesafe.config.{Config, ConfigFactory}

import java.io.File

trait ApplicationConfig {}

trait ConfigurationBuilder[C, T <: ApplicationConfig] {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package weco.pipeline.relation_embedder.lib
package weco.lambda

import com.amazonaws.services.lambda.runtime.events.SQSEvent
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage
import io.circe.Decoder
import ujson.Value
import weco.json.JsonUtil.fromJson
import weco.pipeline.relation_embedder.models.Batch
import weco.json.JsonUtil._

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

object SQSEventOps {

Expand All @@ -21,13 +21,21 @@ object SQSEventOps {
* - an SNS notification containing
* - a `Message`, which is the actual content we want
*/
implicit class ExtractBatchFromSqsEvent(event: SQSEvent) {
def extractBatches: List[Batch] =
event.getRecords.asScala.toList.flatMap(extractBatchFromMessage)
implicit class ExtractTFromSqsEvent(event: SQSEvent) {
def extract[T]()(implicit decoder: Decoder[T], ct: ClassTag[T]) =
event.getRecords.asScala.toList.flatMap(extractFromMessage[T](_))

private def extractBatchFromMessage(message: SQSMessage): Option[Batch] =
private def extractFromMessage[T](
message: SQSMessage
)(implicit decoder: Decoder[T], ct: ClassTag[T]): Option[T] =
ujson.read(message.getBody).obj.get("Message").flatMap {
value: Value => fromJson[Batch](value.str).toOption
value: Value =>
{
ct.runtimeClass match {
case c if c == classOf[String] => Some(value.str.asInstanceOf[T])
case _ => fromJson[T](value.str).toOption
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package weco.pipeline.relation_embedder.lib
package weco.lambda

import com.amazonaws.services.lambda.runtime.events.SQSEvent
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import grizzled.slf4j.Logging
import io.circe.Decoder
import org.apache.pekko.actor.ActorSystem

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.reflect.ClassTag

trait LambdaApp[In, Out, Config <: ApplicationConfig]
extends RequestHandler[In, Out]
// Unfortunately we can't have an intermediate abstraction here because of an interaction
// of the AWS SDK with the Scala compiler.
// See: https://stackoverflow.com/questions/54098144/aws-lambda-handler-throws-a-classcastexception-with-scala-generics
abstract class SQSLambdaApp[T, Out, Config <: ApplicationConfig]()(
implicit val decoder: Decoder[T],
val ct: ClassTag[T]
) extends RequestHandler[SQSEvent, Out]
with LambdaConfigurable[Config]
with Logging {

Expand All @@ -20,13 +28,15 @@ trait LambdaApp[In, Out, Config <: ApplicationConfig]
implicit val ec: ExecutionContext =
actorSystem.dispatcher

def processEvent(in: In): Future[Out]
import weco.lambda.SQSEventOps._

def processT(t: List[T]): Future[Out]

override def handleRequest(
event: In,
event: SQSEvent,
context: Context
): Out = Await.result(
processEvent(event),
processT(event.extract[T]),
maximumExecutionTime
)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package weco.pipeline.relation_embedder.lib
package weco.lambda

import grizzled.slf4j.Logging
import weco.json.JsonUtil._
import weco.pipeline.relation_embedder.models.Batch
import io.circe.Decoder
import weco.json.JsonUtil.fromJson

import scala.io.Source.stdin
import scala.util.{Failure, Success, Try}
Expand All @@ -14,28 +15,27 @@ import scala.util.{Failure, Success, Try}
*/

trait StdInNDJSON[T] extends Logging {
protected def jsonToInstance(str: String): Try[T]
private def jsonToInstance(jsonString: String)(
implicit decoder: Decoder[T]
): Try[T] =
fromJson[T](jsonString)

private val stdInStrings: Iterator[String] = stdin.getLines()

private def toInstance(jsonString: String): Option[T] =
private def toInstance(
jsonString: String
)(implicit decoder: Decoder[T]): Option[T] =
jsonToInstance(jsonString) match {
case Failure(exception) =>
error(exception.getMessage)
None
case Success(value) => Some(value)
}

protected val instances: Iterator[T] =
protected def instances(implicit decoder: Decoder[T]): Iterator[T] =
stdInStrings
.flatMap(
toInstance
toInstance(_)
)

}

trait StdInBatches extends StdInNDJSON[Batch] {
def jsonToInstance(jsonString: String): Try[Batch] =
fromJson[Batch](jsonString)

protected val batches: Iterator[Batch] = instances
}
1 change: 1 addition & 0 deletions common/lambda/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
configString=knownConfigValue
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package weco.pipeline.relation_embedder.lib
package weco.lambda

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import weco.pipeline.relation_embedder.helpers.ConfigurationTestHelpers
import weco.lambda.helpers.ConfigurationTestHelpers

class ConfigurationTest
extends AnyFunSpec
Expand Down
68 changes: 68 additions & 0 deletions common/lambda/src/test/scala/weco/lambda/SQSEventOpsTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package weco.lambda

import com.amazonaws.services.lambda.runtime.events.SQSEvent
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

import scala.collection.JavaConverters._
import weco.json.JsonUtil._

/** Checks the `extract` extension method that `SQSEventOps` adds to
  * `SQSEvent`, covering plain-string payloads and JSON-object payloads
  * carried in the `Message` field of each record body.
  */
class SQSEventOpsTest extends AnyFunSpec with Matchers {

  import SQSEventOps._

  /** Builds an `SQSEvent` containing one record per supplied body, in the
    * order given.
    */
  private def sqsEventOf(bodies: String*): SQSEvent = {
    val records = bodies.toList.map {
      body =>
        val record = new SQSMessage()
        record.setBody(body)
        record
    }
    val event = new SQSEvent()
    event.setRecords(records.asJava)
    event
  }

  describe("Using the implicit class SQSEventOps") {
    it("extracts values from an SQSEvent where the message is a String") {
      val event = sqsEventOf("{\"Message\":\"A/C\"}")

      val extracted = event.extract[String]()

      extracted shouldBe List("A/C")
    }

    // Minimal payload type used to exercise JSON-object decoding.
    case class TestMessage(value: String)

    it("extracts values from an SQSEvent where the message is a JSON object") {
      val event =
        sqsEventOf("{\"Message\":\"{\\\"value\\\": \\\"A/C\\\"}\"}")

      val extracted = event.extract[TestMessage]()

      extracted shouldBe List(TestMessage("A/C"))
    }

    it("extracts multiple values from an SQSEvent") {
      val event = sqsEventOf(
        "{\"Message\":\"A/C\"}",
        "{\"Message\":\"A/E\"}"
      )

      val extracted = event.extract[String]()

      // Results come back in record order.
      extracted shouldBe List("A/C", "A/E")
    }

    it(
      "extracts values from an SQSEvent where the message is a JSON object with multiple fields, only taking the ones we want"
    ) {
      val event = sqsEventOf(
        "{\"Message\":\"{\\\"value\\\": \\\"A/C\\\", \\\"other\\\": \\\"D/E\\\"}\"}"
      )

      val extracted = event.extract[TestMessage]()

      // The extra "other" field is not part of TestMessage and is dropped.
      extracted shouldBe List(TestMessage("A/C"))
    }
  }
}
55 changes: 55 additions & 0 deletions common/lambda/src/test/scala/weco/lambda/SQSLambdaAppTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package weco.lambda

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.must.Matchers
import weco.fixtures.RandomGenerators
import weco.lambda.helpers.{ConfigurationTestHelpers, SQSLambdaAppHelpers}

/** Exercises `handleRequest` on lambda apps built for testing: the happy path
  * with one and with several messages, a failing processor, and a processor
  * that runs too long.
  *
  * `TestLambdaApp`, `FailingTestLambdaApp`, `SleepingTestLambdaApp`,
  * `createSqsEvent` and `expectedConfigString` are not defined in this file;
  * presumably they come from the mixed-in helper traits (TODO confirm).
  *
  * All assertions use the `must` dialect, matching the
  * `org.scalatest.matchers.must.Matchers` trait this suite mixes in.
  */
class SQSLambdaAppTest
    extends AnyFunSpec
    with ConfigurationTestHelpers
    with SQSLambdaAppHelpers
    with RandomGenerators
    with Matchers {

  it(
    "creates a lambda app with a config, and allows execution of a processEvent function"
  ) {
    val lambdaApp = new TestLambdaApp()
    val eventString = randomAlphanumeric()

    // No Lambda Context is supplied; the test passes null for it.
    lambdaApp.handleRequest(
      createSqsEvent(List(eventString)),
      null
    ) mustBe eventString + expectedConfigString
  }

  it(
    "creates a lambda app with a config, and allows execution of a processEvent function, handling multiple events"
  ) {
    val lambdaApp = new TestLambdaApp()
    val eventString1 = randomAlphanumeric()
    val eventString2 = randomAlphanumeric()

    lambdaApp.handleRequest(
      createSqsEvent(List(eventString1, eventString2)),
      null
    ) mustBe eventString1 + eventString2 + expectedConfigString
  }

  it("fails if the processEvent function fails") {
    val lambdaApp = new FailingTestLambdaApp()
    val eventString = randomAlphanumeric()

    a[Throwable] mustBe thrownBy {
      lambdaApp.handleRequest(createSqsEvent(List(eventString)), null)
    }
  }

  it("fails if the processEvent function takes too long") {
    val lambdaApp = new SleepingTestLambdaApp()
    val eventString = randomAlphanumeric()

    a[Throwable] mustBe thrownBy {
      lambdaApp.handleRequest(createSqsEvent(List(eventString)), null)
    }
  }
}
Loading

0 comments on commit c3e21d0

Please sign in to comment.