Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract LambdaApp interface #2794

Merged
merged 9 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,39 +1,22 @@
package weco.pipeline.relation_embedder

import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import grizzled.slf4j.Logging
import com.amazonaws.services.lambda.runtime.events.SQSEvent
import org.apache.pekko.actor.ActorSystem
import weco.pipeline.relation_embedder.lib._

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.Future

object LambdaMain
extends RequestHandler[SQSEvent, String]
with Logging
extends LambdaApp[SQSEvent, String, RelationEmbedderConfig]
with RelationEmbedderConfigurable {

import SQSEventOps._
private lazy val batchProcessor = BatchProcessor(config)

override def handleRequest(
event: SQSEvent,
context: Context
): String = {
implicit val actorSystem: ActorSystem =
ActorSystem("main-actor-system")
implicit val ec: ExecutionContext =
actorSystem.dispatcher
val batchProcessor = BatchProcessor(config)

def processEvent(event: SQSEvent): Future[String] = {
info(s"running relation_embedder lambda, got event: $event")

// Wait here so that lambda can finish executing correctly.
// 15 minutes is the maximum time allowed for a lambda to run.
Await.result(
Future.sequence(event.extractBatches.map(batchProcessor(_))),
15.minutes
)
"Done"
Future
.sequence(event.extractBatches.map(batchProcessor(_)))
.map(_ => "Done")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package weco.pipeline.relation_embedder.lib

import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import grizzled.slf4j.Logging
import org.apache.pekko.actor.ActorSystem

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{DurationInt, FiniteDuration}

/** Common scaffolding for an AWS Lambda entry point whose work is expressed
  * as a [[scala.concurrent.Future]].
  *
  * Implementations supply [[processEvent]]; this trait provides the
  * `RequestHandler.handleRequest` implementation, which blocks on the Future
  * returned by `processEvent` so that the Lambda invocation does not return
  * before the asynchronous work has finished.
  *
  * @tparam In     the type of the incoming Lambda event
  * @tparam Out    the type returned to the Lambda runtime
  * @tparam Config the application configuration type, as required by
  *                `LambdaConfigurable`
  */
trait LambdaApp[In, Out, Config <: ApplicationConfig]
    extends RequestHandler[In, Out]
    with LambdaConfigurable[Config]
    with Logging {

  // 15 minutes is the maximum time allowed for a lambda to run, as of 2024-12-19
  protected val maximumExecutionTime: FiniteDuration = 15.minutes

  // Both implicits are members of the trait, so they are in scope for
  // anything an implementation does inside processEvent.
  implicit val actorSystem: ActorSystem = ActorSystem("main-actor-system")
  implicit val ec: ExecutionContext = actorSystem.dispatcher

  /** Perform the work for a single event.
    *
    * @param in the event passed to the Lambda
    * @return a Future that completes with the value to hand back to the runtime
    */
  def processEvent(in: In): Future[Out]

  /** Entry point called by the Lambda runtime.
    *
    * Blocks for at most `maximumExecutionTime` waiting for `processEvent` to
    * complete. The `context` argument is not used.
    *
    * @return the value the Future completed with; if the Future failed or did
    *         not complete in time, `Await.result` throws instead
    */
  override def handleRequest(event: In, context: Context): Out = {
    val pendingResult: Future[Out] = processEvent(event)
    Await.result(pendingResult, maximumExecutionTime)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
configString=knownConfigValue
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package weco.pipeline.relation_embedder.lib

import com.typesafe.config.Config
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import weco.fixtures.RandomGenerators
import weco.pipeline.relation_embedder.helpers.ConfigurationTestHelpers

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.Future

/** Exercises `LambdaApp` through small in-test implementations, covering the
  * successful path, a failed Future, and a Future that outlives the
  * configured maximum execution time.
  */
class LambdaAppTest
    extends AnyFunSpec
    with ConfigurationTestHelpers
    with RandomGenerators
    with Matchers {

  // This value is from application.conf in test resources
  val configString = "knownConfigValue"

  case class TestLambdaAppConfiguration(configString: String)
      extends ApplicationConfig

  class TestLambdaApp
      extends LambdaApp[String, String, TestLambdaAppConfiguration] {
    // Keep the timeout short so the "takes too long" test stays fast
    override protected val maximumExecutionTime: FiniteDuration = 200.millis

    // Config is available in this scope
    lazy val configString: String = config.configString

    // Function to process an event is required, and should return a Future
    override def processEvent(event: String): Future[String] =
      Future.successful(event + configString)

    // Function to convert typesafe config to application config is required
    override def build(rawConfig: Config): TestLambdaAppConfiguration =
      TestLambdaAppConfiguration(
        configString = rawConfig.getString("configString")
      )
  }

  /** Runs `testWith` against `app`, then shuts down the ActorSystem that the
    * app created on construction, so that tests do not leave dispatcher
    * threads running for the remainder of the suite.
    */
  private def withLambdaApp[R](app: TestLambdaApp)(
    testWith: TestLambdaApp => R
  ): R =
    try testWith(app)
    finally {
      // Fire-and-forget: the test does not need to wait for termination
      app.actorSystem.terminate()
      ()
    }

  it(
    "creates a lambda app with a config, and allows execution of a processEvent function"
  ) {
    withLambdaApp(new TestLambdaApp()) { lambdaApp =>
      val eventString = randomAlphanumeric()

      // The Lambda Context is a Java interface that LambdaApp never reads,
      // so null is passed rather than building a stub.
      lambdaApp.handleRequest(
        eventString,
        null
      ) shouldBe eventString + configString
    }
  }

  class FailingTestLambdaApp extends TestLambdaApp {
    override def processEvent(event: String): Future[String] =
      Future.failed(new RuntimeException("Failed"))
  }

  it("fails if the processEvent function fails") {
    withLambdaApp(new FailingTestLambdaApp()) { lambdaApp =>
      val eventString = randomAlphanumeric()

      // Check for the specific failure raised by processEvent, so that an
      // unrelated error (e.g. broken config) cannot make this test pass.
      the[RuntimeException] thrownBy {
        lambdaApp.handleRequest(eventString, null)
      } should have message "Failed"
    }
  }

  class SleepingTestLambdaApp extends TestLambdaApp {
    // Sleeps for longer than maximumExecutionTime (200ms) to force a timeout
    override def processEvent(event: String): Future[String] = Future {
      Thread.sleep(500)
      event + configString
    }
  }

  it("fails if the processEvent function takes too long") {
    withLambdaApp(new SleepingTestLambdaApp()) { lambdaApp =>
      val eventString = randomAlphanumeric()

      // Await.result signals an elapsed timeout with a TimeoutException;
      // asserting on that type proves the failure really was the timeout.
      a[scala.concurrent.TimeoutException] shouldBe thrownBy {
        lambdaApp.handleRequest(eventString, null)
      }
    }
  }
}
Loading