Skip to content

Commit

Permalink
Extract LambdaApp interface (#2794)
Browse files Browse the repository at this point in the history
* split event from flow

* push decoding message up a level

* define strict config layer (keep typesafe at the edge)

* Apply auto-formatting rules

* update tests

* add config test

* add an abstraction for lamdba apps

* add tests for lambdaapp

---------

Co-authored-by: Github on behalf of Wellcome Collection <[email protected]>
  • Loading branch information
kenoir and weco-bot authored Jan 6, 2025
1 parent 8a0c5df commit c55ad24
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,39 +1,22 @@
package weco.pipeline.relation_embedder

import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import grizzled.slf4j.Logging
import com.amazonaws.services.lambda.runtime.events.SQSEvent
import org.apache.pekko.actor.ActorSystem
import weco.pipeline.relation_embedder.lib._

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.Future

object LambdaMain
extends RequestHandler[SQSEvent, String]
with Logging
extends LambdaApp[SQSEvent, String, RelationEmbedderConfig]
with RelationEmbedderConfigurable {

import SQSEventOps._
private lazy val batchProcessor = BatchProcessor(config)

override def handleRequest(
event: SQSEvent,
context: Context
): String = {
implicit val actorSystem: ActorSystem =
ActorSystem("main-actor-system")
implicit val ec: ExecutionContext =
actorSystem.dispatcher
val batchProcessor = BatchProcessor(config)

def processEvent(event: SQSEvent): Future[String] = {
info(s"running relation_embedder lambda, got event: $event")

// Wait here so that lambda can finish executing correctly.
// 15 minutes is the maximum time allowed for a lambda to run.
Await.result(
Future.sequence(event.extractBatches.map(batchProcessor(_))),
15.minutes
)
"Done"
Future
.sequence(event.extractBatches.map(batchProcessor(_)))
.map(_ => "Done")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package weco.pipeline.relation_embedder.lib

import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import grizzled.slf4j.Logging
import org.apache.pekko.actor.ActorSystem

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{DurationInt, FiniteDuration}

trait LambdaApp[In, Out, Config <: ApplicationConfig]
extends RequestHandler[In, Out]
with LambdaConfigurable[Config]
with Logging {

// 15 minutes is the maximum time allowed for a lambda to run, as of 2024-12-19
protected val maximumExecutionTime: FiniteDuration = 15.minutes

implicit val actorSystem: ActorSystem =
ActorSystem("main-actor-system")
implicit val ec: ExecutionContext =
actorSystem.dispatcher

def processEvent(in: In): Future[Out]

override def handleRequest(
event: In,
context: Context
): Out = Await.result(
processEvent(event),
maximumExecutionTime
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
configString=knownConfigValue
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package weco.pipeline.relation_embedder.lib

import com.typesafe.config.Config
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import weco.fixtures.RandomGenerators
import weco.pipeline.relation_embedder.helpers.ConfigurationTestHelpers

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.Future

class LambdaAppTest
extends AnyFunSpec
with ConfigurationTestHelpers
with RandomGenerators
with Matchers {

// This value is from application.conf in test resources
val configString = "knownConfigValue"

case class TestLambdaAppConfiguration(configString: String)
extends ApplicationConfig
class TestLambdaApp
extends LambdaApp[String, String, TestLambdaAppConfiguration] {
override protected val maximumExecutionTime: FiniteDuration = 200.millis

// Config is available in this scope
lazy val configString: String = config.configString

// Function to process an event is required, and should return a Future
override def processEvent(event: String): Future[String] =
Future.successful(event + configString)

// Function to convert typesafe config to application config is required
override def build(rawConfig: Config): TestLambdaAppConfiguration =
TestLambdaAppConfiguration(
configString = rawConfig.getString("configString")
)
}

it(
"creates a lambda app with a config, and allows execution of a processEvent function"
) {
val lambdaApp = new TestLambdaApp()
val eventString = randomAlphanumeric()

lambdaApp.handleRequest(
eventString,
null
) shouldBe eventString + configString
}

class FailingTestLambdaApp extends TestLambdaApp {
override def processEvent(event: String): Future[String] =
Future.failed(new Throwable("Failed"))
}

it("fails if the processEvent function fails") {
val lambdaApp = new FailingTestLambdaApp()
val eventString = randomAlphanumeric()

a[Throwable] shouldBe thrownBy {
lambdaApp.handleRequest(eventString, null)
}
}

class SleepingTestLambdaApp extends TestLambdaApp {
override def processEvent(event: String): Future[String] = Future {
Thread.sleep(500)
event + configString
}
}

it("fails if the processEvent function takes too long") {
val lambdaApp = new SleepingTestLambdaApp()
val eventString = randomAlphanumeric()

a[Throwable] shouldBe thrownBy {
lambdaApp.handleRequest(eventString, null)
}
}
}

0 comments on commit c55ad24

Please sign in to comment.