From c55ad24f07bc33e8c1a9b6940360ae18b20ff923 Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Mon, 6 Jan 2025 11:13:48 +0000 Subject: [PATCH] Extract LambdaApp interface (#2794) * split event from flow * push decoding message up a level * define strict config layer (keep typesafe at the edge) * Apply auto-formatting rules * update tests * add config test * add an abstraction for lambda apps * add tests for lambdaapp --------- Co-authored-by: Github on behalf of Wellcome Collection --- .../relation_embedder/LambdaMain.scala | 31 ++----- .../relation_embedder/lib/LambdaApp.scala | 32 ++++++++ .../src/test/resources/application.conf | 1 + .../relation_embedder/lib/LambdaAppTest.scala | 82 +++++++++++++++++++ 4 files changed, 122 insertions(+), 24 deletions(-) create mode 100644 pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaApp.scala create mode 100644 pipeline/relation_embedder/relation_embedder/src/test/resources/application.conf create mode 100644 pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/LambdaAppTest.scala diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala index 3cc50a0a52..34e2a2c0d8 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala @@ -1,39 +1,22 @@ package weco.pipeline.relation_embedder -import com.amazonaws.services.lambda.runtime.{Context, RequestHandler} -import grizzled.slf4j.Logging import com.amazonaws.services.lambda.runtime.events.SQSEvent -import org.apache.pekko.actor.ActorSystem import weco.pipeline.relation_embedder.lib._ -import scala.concurrent.duration.DurationInt -import 
scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.Future object LambdaMain - extends RequestHandler[SQSEvent, String] - with Logging + extends LambdaApp[SQSEvent, String, RelationEmbedderConfig] with RelationEmbedderConfigurable { import SQSEventOps._ + private lazy val batchProcessor = BatchProcessor(config) - override def handleRequest( - event: SQSEvent, - context: Context - ): String = { - implicit val actorSystem: ActorSystem = - ActorSystem("main-actor-system") - implicit val ec: ExecutionContext = - actorSystem.dispatcher - val batchProcessor = BatchProcessor(config) - + def processEvent(event: SQSEvent): Future[String] = { info(s"running relation_embedder lambda, got event: $event") - // Wait here so that lambda can finish executing correctly. - // 15 minutes is the maximum time allowed for a lambda to run. - Await.result( - Future.sequence(event.extractBatches.map(batchProcessor(_))), - 15.minutes - ) - "Done" + Future + .sequence(event.extractBatches.map(batchProcessor(_))) + .map(_ => "Done") } } diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaApp.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaApp.scala new file mode 100644 index 0000000000..6f0b085383 --- /dev/null +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaApp.scala @@ -0,0 +1,32 @@ +package weco.pipeline.relation_embedder.lib + +import com.amazonaws.services.lambda.runtime.{Context, RequestHandler} +import grizzled.slf4j.Logging +import org.apache.pekko.actor.ActorSystem + +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +trait LambdaApp[In, Out, Config <: ApplicationConfig] + extends RequestHandler[In, Out] + with LambdaConfigurable[Config] + with Logging { + + // 15 minutes is the maximum time allowed for a 
lambda to run, as of 2024-12-19 + protected val maximumExecutionTime: FiniteDuration = 15.minutes + + implicit val actorSystem: ActorSystem = + ActorSystem("main-actor-system") + implicit val ec: ExecutionContext = + actorSystem.dispatcher + + def processEvent(in: In): Future[Out] + + override def handleRequest( + event: In, + context: Context + ): Out = Await.result( + processEvent(event), + maximumExecutionTime + ) +} diff --git a/pipeline/relation_embedder/relation_embedder/src/test/resources/application.conf b/pipeline/relation_embedder/relation_embedder/src/test/resources/application.conf new file mode 100644 index 0000000000..f789c33843 --- /dev/null +++ b/pipeline/relation_embedder/relation_embedder/src/test/resources/application.conf @@ -0,0 +1 @@ +configString=knownConfigValue diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/LambdaAppTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/LambdaAppTest.scala new file mode 100644 index 0000000000..87c35ecc8f --- /dev/null +++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/LambdaAppTest.scala @@ -0,0 +1,82 @@ +package weco.pipeline.relation_embedder.lib + +import com.typesafe.config.Config +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers +import weco.fixtures.RandomGenerators +import weco.pipeline.relation_embedder.helpers.ConfigurationTestHelpers + +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.concurrent.Future + +class LambdaAppTest + extends AnyFunSpec + with ConfigurationTestHelpers + with RandomGenerators + with Matchers { + + // This value is from application.conf in test resources + val configString = "knownConfigValue" + + case class TestLambdaAppConfiguration(configString: String) + extends ApplicationConfig + class TestLambdaApp + extends LambdaApp[String, String, 
TestLambdaAppConfiguration] { + override protected val maximumExecutionTime: FiniteDuration = 200.millis + + // Config is available in this scope + lazy val configString: String = config.configString + + // Function to process an event is required, and should return a Future + override def processEvent(event: String): Future[String] = + Future.successful(event + configString) + + // Function to convert typesafe config to application config is required + override def build(rawConfig: Config): TestLambdaAppConfiguration = + TestLambdaAppConfiguration( + configString = rawConfig.getString("configString") + ) + } + + it( + "creates a lambda app with a config, and allows execution of a processEvent function" + ) { + val lambdaApp = new TestLambdaApp() + val eventString = randomAlphanumeric() + + lambdaApp.handleRequest( + eventString, + null + ) shouldBe eventString + configString + } + + class FailingTestLambdaApp extends TestLambdaApp { + override def processEvent(event: String): Future[String] = + Future.failed(new Throwable("Failed")) + } + + it("fails if the processEvent function fails") { + val lambdaApp = new FailingTestLambdaApp() + val eventString = randomAlphanumeric() + + a[Throwable] shouldBe thrownBy { + lambdaApp.handleRequest(eventString, null) + } + } + + class SleepingTestLambdaApp extends TestLambdaApp { + override def processEvent(event: String): Future[String] = Future { + Thread.sleep(500) + event + configString + } + } + + it("fails if the processEvent function takes too long") { + val lambdaApp = new SleepingTestLambdaApp() + val eventString = randomAlphanumeric() + + a[Throwable] shouldBe thrownBy { + lambdaApp.handleRequest(eventString, null) + } + } +}