From c3e21d055116d35d18270fbe1ed12289ed5ea4c6 Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Mon, 6 Jan 2025 15:42:07 +0000 Subject: [PATCH] Generalise lambda functionality into module (#2795) * split event from flow * push decoding message up a level * define strict config layer (keep typesafe at the edge) * Apply auto-formatting rules * update tests * add config test * add an abstraction for lambda apps * add tests for lambdaapp * add common lib * use lambda lib in relation_embedder * shift tests to lambda lib * use lambda lib in batcher * fix tests, move sqseventops test * add lambda module to ci tests * self-linting * ensure lib folders are added * fix gitignore config * Apply auto-formatting rules * fix tests * rejig lambda app to account for type erasure bug :( * Apply auto-formatting rules * use should matchers to be consistent * update after trying to run locally * Apply auto-formatting rules * remaining merge compile issues --------- Co-authored-by: Github on behalf of Wellcome Collection --- .buildkite/pipeline.yml | 1 + .gitignore | 9 +- build.sbt | 12 ++- .../main/scala/weco/lambda}/Downstream.scala | 12 ++- .../scala/weco/lambda}/ElasticBuilder.scala | 2 +- .../weco/lambda}/LambdaConfiguration.scala | 5 +- .../main/scala/weco/lambda}/SQSEventOps.scala | 24 ++++-- .../main/scala/weco/lambda/SQSLambdaApp.scala | 24 ++++-- .../scala/weco/lambda}/StdInBatches.scala | 28 +++---- .../src/test/resources/application.conf | 1 + .../weco/lambda}/ConfigurationTest.scala | 4 +- .../scala/weco/lambda/SQSEventOpsTest.scala | 68 +++++++++++++++ .../scala/weco/lambda/SQSLambdaAppTest.scala | 55 +++++++++++++ .../helpers/ConfigurationTestHelpers.scala | 7 +- .../lambda/helpers/SQSLambdaAppHelpers.scala | 55 +++++++++++++ ebsco_adapter/ebsco_indexer/.gitignore | 18 ++++ .../batcher/local.docker-compose.yml | 16 ++++ .../batcher/scripts/run_local.sh | 40 +++++++++ .../batcher/BatcherWorkerService.scala | 14 +--- .../scala/weco/pipeline/batcher/CLIMain.scala | 24 
+++--- .../weco/pipeline/batcher/Downstream.scala | 11 --- .../weco/pipeline/batcher/LambdaMain.scala | 59 +++---------- .../scala/weco/pipeline/batcher/Main.scala | 13 +-- .../pipeline/batcher/PathsProcessor.scala | 26 +++++- .../pipeline/batcher/lib/BatcherConfig.scala | 31 +++++++ .../pipeline/batcher/lib/SQSEventOps.scala | 26 ------ .../batcher/BatcherWorkerServiceTest.scala | 34 +++++--- .../pipeline/batcher/SQSEventOpsTest.scala | 24 ------ .../relation_embedder/scripts/batches.txt | 6 +- .../relation_embedder/scripts/run_local.sh | 6 +- .../relation_embedder/BatchProcessor.scala | 4 +- .../pipeline/relation_embedder/CLIMain.scala | 7 +- .../relation_embedder/LambdaMain.scala | 13 +-- .../lib/RelationEmbedderConfig.scala | 3 +- .../BatchProcessorTest.scala | 5 +- .../RelationEmbedderWorkerServiceTest.scala | 7 +- .../relation_embedder/lib/LambdaAppTest.scala | 82 ------------------- project/Dependencies.scala | 26 +++--- 38 files changed, 479 insertions(+), 323 deletions(-) rename {pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib => common/lambda/src/main/scala/weco/lambda}/Downstream.scala (70%) rename {pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib => common/lambda/src/main/scala/weco/lambda}/ElasticBuilder.scala (98%) rename {pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib => common/lambda/src/main/scala/weco/lambda}/LambdaConfiguration.scala (96%) rename {pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib => common/lambda/src/main/scala/weco/lambda}/SQSEventOps.scala (55%) rename pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaApp.scala => common/lambda/src/main/scala/weco/lambda/SQSLambdaApp.scala (51%) rename {pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib => 
common/lambda/src/main/scala/weco/lambda}/StdInBatches.scala (58%) create mode 100644 common/lambda/src/test/resources/application.conf rename {pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib => common/lambda/src/test/scala/weco/lambda}/ConfigurationTest.scala (95%) create mode 100644 common/lambda/src/test/scala/weco/lambda/SQSEventOpsTest.scala create mode 100644 common/lambda/src/test/scala/weco/lambda/SQSLambdaAppTest.scala rename {pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder => common/lambda/src/test/scala/weco/lambda}/helpers/ConfigurationTestHelpers.scala (91%) create mode 100644 common/lambda/src/test/scala/weco/lambda/helpers/SQSLambdaAppHelpers.scala create mode 100644 ebsco_adapter/ebsco_indexer/.gitignore create mode 100644 pipeline/relation_embedder/batcher/local.docker-compose.yml create mode 100755 pipeline/relation_embedder/batcher/scripts/run_local.sh delete mode 100644 pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/Downstream.scala create mode 100644 pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/BatcherConfig.scala delete mode 100644 pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/SQSEventOps.scala delete mode 100644 pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/SQSEventOpsTest.scala delete mode 100644 pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/LambdaAppTest.scala diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index e1a164aaad..9db2223cb8 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -13,6 +13,7 @@ steps: matrix: - "internal_model" + - "lambda" - "display_model" - "source_model" - "pipeline_storage" diff --git a/.gitignore b/.gitignore index c80ee354e5..2f36a5f5dd 100644 --- a/.gitignore +++ b/.gitignore @@ -177,22 +177,15 @@ __pycache__/ # Distribution / 
packaging .Python -env/ -build/ -develop-eggs/ dist/ -downloads/ -eggs/ .eggs/ -lib/ -lib64/ -parts/ sdist/ var/ wheels/ *.egg-info/ .installed.cfg *.egg +target/ # PyInstaller # Usually these files are written by a python script from a template diff --git a/build.sbt b/build.sbt index dc5fade8ea..ef1e38eb76 100644 --- a/build.sbt +++ b/build.sbt @@ -50,6 +50,12 @@ lazy val display_model = setupProject( externalDependencies = CatalogueDependencies.displayModelDependencies ) +lazy val lambda = setupProject( + project, + "common/lambda", + externalDependencies = CatalogueDependencies.lambdaDependencies +) + lazy val flows = setupProject( project, "common/flows", @@ -135,8 +141,7 @@ lazy val path_concatenator = setupProject( lazy val relation_embedder = setupProject( project, "pipeline/relation_embedder/relation_embedder", - localDependencies = Seq(internal_model, pipeline_storage_typesafe), - externalDependencies = CatalogueDependencies.relationEmbedderDependencies + localDependencies = Seq(internal_model, pipeline_storage_typesafe, lambda) ) lazy val router = setupProject( @@ -149,8 +154,7 @@ lazy val router = setupProject( lazy val batcher = setupProject( project, "pipeline/relation_embedder/batcher", - localDependencies = Nil, - externalDependencies = CatalogueDependencies.batcherDependencies + localDependencies = Seq(lambda) ) lazy val reindex_worker = setupProject( diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/Downstream.scala b/common/lambda/src/main/scala/weco/lambda/Downstream.scala similarity index 70% rename from pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/Downstream.scala rename to common/lambda/src/main/scala/weco/lambda/Downstream.scala index 564681237a..fed30504d5 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/Downstream.scala +++ 
b/common/lambda/src/main/scala/weco/lambda/Downstream.scala @@ -1,26 +1,34 @@ -package weco.pipeline.relation_embedder.lib +package weco.lambda +import io.circe.Encoder import software.amazon.awssdk.services.sns.SnsClient import weco.messaging.sns.{SNSConfig, SNSMessageSender} +import weco.json.JsonUtil.toJson import scala.util.Try trait Downstream { def notify(workId: String): Try[Unit] + def notify[T](batch: T)(implicit encoder: Encoder[T]): Try[Unit] } class SNSDownstream(snsConfig: SNSConfig) extends Downstream { - private val msgSender = new SNSMessageSender( + protected val msgSender = new SNSMessageSender( snsClient = SnsClient.builder().build(), snsConfig = snsConfig, subject = "Sent from relation_embedder" ) override def notify(workId: String): Try[Unit] = Try(msgSender.send(workId)) + override def notify[T](batch: T)(implicit encoder: Encoder[T]): Try[Unit] = + msgSender.sendT(batch) } object STDIODownstream extends Downstream { override def notify(workId: String): Try[Unit] = Try(println(workId)) + override def notify[T](t: T)(implicit encoder: Encoder[T]): Try[Unit] = Try( + println(toJson(t)) + ) } sealed trait DownstreamTarget diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala b/common/lambda/src/main/scala/weco/lambda/ElasticBuilder.scala similarity index 98% rename from pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala rename to common/lambda/src/main/scala/weco/lambda/ElasticBuilder.scala index 0403736648..3edae1b6f0 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/ElasticBuilder.scala +++ b/common/lambda/src/main/scala/weco/lambda/ElasticBuilder.scala @@ -1,4 +1,4 @@ -package weco.pipeline.relation_embedder.lib +package weco.lambda import com.sksamuel.elastic4s.ElasticClient import com.typesafe.config.Config diff --git 
a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala b/common/lambda/src/main/scala/weco/lambda/LambdaConfiguration.scala similarity index 96% rename from pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala rename to common/lambda/src/main/scala/weco/lambda/LambdaConfiguration.scala index 8d2272e7b2..3c365f0430 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala +++ b/common/lambda/src/main/scala/weco/lambda/LambdaConfiguration.scala @@ -1,8 +1,9 @@ -package weco.pipeline.relation_embedder.lib +package weco.lambda -import java.io.File import com.typesafe.config.{Config, ConfigFactory} +import java.io.File + trait ApplicationConfig {} trait ConfigurationBuilder[C, T <: ApplicationConfig] { diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/SQSEventOps.scala b/common/lambda/src/main/scala/weco/lambda/SQSEventOps.scala similarity index 55% rename from pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/SQSEventOps.scala rename to common/lambda/src/main/scala/weco/lambda/SQSEventOps.scala index 638aca9270..aa2bbe2d70 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/SQSEventOps.scala +++ b/common/lambda/src/main/scala/weco/lambda/SQSEventOps.scala @@ -1,13 +1,13 @@ -package weco.pipeline.relation_embedder.lib +package weco.lambda import com.amazonaws.services.lambda.runtime.events.SQSEvent import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage +import io.circe.Decoder import ujson.Value import weco.json.JsonUtil.fromJson -import weco.pipeline.relation_embedder.models.Batch -import weco.json.JsonUtil._ import scala.collection.JavaConverters._ +import scala.reflect.ClassTag 
object SQSEventOps { @@ -21,13 +21,21 @@ object SQSEventOps { * - an SNS notification containing * - a `Message`, which is the actual content we want */ - implicit class ExtractBatchFromSqsEvent(event: SQSEvent) { - def extractBatches: List[Batch] = - event.getRecords.asScala.toList.flatMap(extractBatchFromMessage) + implicit class ExtractTFromSqsEvent(event: SQSEvent) { + def extract[T]()(implicit decoder: Decoder[T], ct: ClassTag[T]) = + event.getRecords.asScala.toList.flatMap(extractFromMessage[T](_)) - private def extractBatchFromMessage(message: SQSMessage): Option[Batch] = + private def extractFromMessage[T]( + message: SQSMessage + )(implicit decoder: Decoder[T], ct: ClassTag[T]): Option[T] = ujson.read(message.getBody).obj.get("Message").flatMap { - value: Value => fromJson[Batch](value.str).toOption + value: Value => + { + ct.runtimeClass match { + case c if c == classOf[String] => Some(value.str.asInstanceOf[T]) + case _ => fromJson[T](value.str).toOption + } + } } } } diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaApp.scala b/common/lambda/src/main/scala/weco/lambda/SQSLambdaApp.scala similarity index 51% rename from pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaApp.scala rename to common/lambda/src/main/scala/weco/lambda/SQSLambdaApp.scala index 6f0b085383..568dfa039e 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaApp.scala +++ b/common/lambda/src/main/scala/weco/lambda/SQSLambdaApp.scala @@ -1,14 +1,22 @@ -package weco.pipeline.relation_embedder.lib +package weco.lambda +import com.amazonaws.services.lambda.runtime.events.SQSEvent import com.amazonaws.services.lambda.runtime.{Context, RequestHandler} import grizzled.slf4j.Logging +import io.circe.Decoder import org.apache.pekko.actor.ActorSystem -import scala.concurrent.{Await, ExecutionContext, Future} import 
scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.reflect.ClassTag -trait LambdaApp[In, Out, Config <: ApplicationConfig] - extends RequestHandler[In, Out] +// Unfortunately we can't have an intermediate abstraction here because of an interaction +// of the AWS SDK with the Scala compiler. +// See: https://stackoverflow.com/questions/54098144/aws-lambda-handler-throws-a-classcastexception-with-scala-generics +abstract class SQSLambdaApp[T, Out, Config <: ApplicationConfig]()( + implicit val decoder: Decoder[T], + val ct: ClassTag[T] +) extends RequestHandler[SQSEvent, Out] with LambdaConfigurable[Config] with Logging { @@ -20,13 +28,15 @@ trait LambdaApp[In, Out, Config <: ApplicationConfig] implicit val ec: ExecutionContext = actorSystem.dispatcher - def processEvent(in: In): Future[Out] + import weco.lambda.SQSEventOps._ + + def processT(t: List[T]): Future[Out] override def handleRequest( - event: In, + event: SQSEvent, context: Context ): Out = Await.result( - processEvent(event), + processT(event.extract[T]), maximumExecutionTime ) } diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/StdInBatches.scala b/common/lambda/src/main/scala/weco/lambda/StdInBatches.scala similarity index 58% rename from pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/StdInBatches.scala rename to common/lambda/src/main/scala/weco/lambda/StdInBatches.scala index dc8ed9616a..065dff6f59 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/StdInBatches.scala +++ b/common/lambda/src/main/scala/weco/lambda/StdInBatches.scala @@ -1,7 +1,8 @@ -package weco.pipeline.relation_embedder.lib +package weco.lambda + import grizzled.slf4j.Logging -import weco.json.JsonUtil._ -import weco.pipeline.relation_embedder.models.Batch +import io.circe.Decoder +import 
weco.json.JsonUtil.fromJson import scala.io.Source.stdin import scala.util.{Failure, Success, Try} @@ -14,10 +15,16 @@ import scala.util.{Failure, Success, Try} */ trait StdInNDJSON[T] extends Logging { - protected def jsonToInstance(str: String): Try[T] + private def jsonToInstance(jsonString: String)( + implicit decoder: Decoder[T] + ): Try[T] = + fromJson[T](jsonString) + private val stdInStrings: Iterator[String] = stdin.getLines() - private def toInstance(jsonString: String): Option[T] = + private def toInstance( + jsonString: String + )(implicit decoder: Decoder[T]): Option[T] = jsonToInstance(jsonString) match { case Failure(exception) => error(exception.getMessage) @@ -25,17 +32,10 @@ trait StdInNDJSON[T] extends Logging { case Success(value) => Some(value) } - protected val instances: Iterator[T] = + protected def instances(implicit decoder: Decoder[T]): Iterator[T] = stdInStrings .flatMap( - toInstance + toInstance(_) ) } - -trait StdInBatches extends StdInNDJSON[Batch] { - def jsonToInstance(jsonString: String): Try[Batch] = - fromJson[Batch](jsonString) - - protected val batches: Iterator[Batch] = instances -} diff --git a/common/lambda/src/test/resources/application.conf b/common/lambda/src/test/resources/application.conf new file mode 100644 index 0000000000..f789c33843 --- /dev/null +++ b/common/lambda/src/test/resources/application.conf @@ -0,0 +1 @@ +configString=knownConfigValue diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/ConfigurationTest.scala b/common/lambda/src/test/scala/weco/lambda/ConfigurationTest.scala similarity index 95% rename from pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/ConfigurationTest.scala rename to common/lambda/src/test/scala/weco/lambda/ConfigurationTest.scala index c723439399..064e7837de 100644 --- 
a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/ConfigurationTest.scala +++ b/common/lambda/src/test/scala/weco/lambda/ConfigurationTest.scala @@ -1,8 +1,8 @@ -package weco.pipeline.relation_embedder.lib +package weco.lambda import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers -import weco.pipeline.relation_embedder.helpers.ConfigurationTestHelpers +import weco.lambda.helpers.ConfigurationTestHelpers class ConfigurationTest extends AnyFunSpec diff --git a/common/lambda/src/test/scala/weco/lambda/SQSEventOpsTest.scala b/common/lambda/src/test/scala/weco/lambda/SQSEventOpsTest.scala new file mode 100644 index 0000000000..6805242e4f --- /dev/null +++ b/common/lambda/src/test/scala/weco/lambda/SQSEventOpsTest.scala @@ -0,0 +1,68 @@ +package weco.lambda + +import com.amazonaws.services.lambda.runtime.events.SQSEvent +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers + +import scala.collection.JavaConverters._ +import weco.json.JsonUtil._ + +class SQSEventOpsTest extends AnyFunSpec with Matchers { + + import SQSEventOps._ + + describe("Using the implicit class SQSEventOps") { + it("extracts values from an SQSEvent where the message is a String") { + val fakeMessage = new SQSMessage() + fakeMessage.setBody("{\"Message\":\"A/C\"}") + val fakeSQSEvent = new SQSEvent() + fakeSQSEvent.setRecords(List(fakeMessage).asJava) + + val paths = fakeSQSEvent.extract[String]() + + paths shouldBe List("A/C") + } + + case class TestMessage(value: String) + + it("extracts values from an SQSEvent where the message is a JSON object") { + val fakeMessage = new SQSMessage() + fakeMessage.setBody("{\"Message\":\"{\\\"value\\\": \\\"A/C\\\"}\"}") + val fakeSQSEvent = new SQSEvent() + fakeSQSEvent.setRecords(List(fakeMessage).asJava) + + val paths = fakeSQSEvent.extract[TestMessage]() + + paths shouldBe 
List(TestMessage("A/C")) + } + + it("extracts multiple values from an SQSEvent") { + val fakeMessage1 = new SQSMessage() + fakeMessage1.setBody("{\"Message\":\"A/C\"}") + val fakeMessage2 = new SQSMessage() + fakeMessage2.setBody("{\"Message\":\"A/E\"}") + val fakeSQSEvent = new SQSEvent() + fakeSQSEvent.setRecords(List(fakeMessage1, fakeMessage2).asJava) + + val paths = fakeSQSEvent.extract[String]() + + paths shouldBe List("A/C", "A/E") + } + + it( + "extracts values from an SQSEvent where the message is a JSON object with multiple fields, only taking the ones we want" + ) { + val fakeMessage = new SQSMessage() + fakeMessage.setBody( + "{\"Message\":\"{\\\"value\\\": \\\"A/C\\\", \\\"other\\\": \\\"D/E\\\"}\"}" + ) + val fakeSQSEvent = new SQSEvent() + fakeSQSEvent.setRecords(List(fakeMessage).asJava) + + val paths = fakeSQSEvent.extract[TestMessage]() + + paths shouldBe List(TestMessage("A/C")) + } + } +} diff --git a/common/lambda/src/test/scala/weco/lambda/SQSLambdaAppTest.scala b/common/lambda/src/test/scala/weco/lambda/SQSLambdaAppTest.scala new file mode 100644 index 0000000000..22f75e8f86 --- /dev/null +++ b/common/lambda/src/test/scala/weco/lambda/SQSLambdaAppTest.scala @@ -0,0 +1,55 @@ +package weco.lambda + +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.must.Matchers +import weco.fixtures.RandomGenerators +import weco.lambda.helpers.{ConfigurationTestHelpers, SQSLambdaAppHelpers} + +class SQSLambdaAppTest + extends AnyFunSpec + with ConfigurationTestHelpers + with SQSLambdaAppHelpers + with RandomGenerators + with Matchers { + + it( + "creates a lambda app with a config, and allows execution of a processEvent function" + ) { + val lambdaApp = new TestLambdaApp() + val eventString = randomAlphanumeric() + + lambdaApp.handleRequest(createSqsEvent(List(eventString)), + null + ) mustBe eventString + expectedConfigString + } + + it( + "creates a lambda app with a config, and allows execution of a processEvent function, handling 
multiple events" + ) { + val lambdaApp = new TestLambdaApp() + val eventString1 = randomAlphanumeric() + val eventString2 = randomAlphanumeric() + + lambdaApp.handleRequest(createSqsEvent(List(eventString1, eventString2)), + null + ) mustBe eventString1 + eventString2 + expectedConfigString + } + + it("fails if the processEvent function fails") { + val lambdaApp = new FailingTestLambdaApp() + val eventString = randomAlphanumeric() + + a[Throwable] shouldBe thrownBy { + lambdaApp.handleRequest(createSqsEvent(List(eventString)), null) + } + } + + it("fails if the processEvent function takes too long") { + val lambdaApp = new SleepingTestLambdaApp() + val eventString = randomAlphanumeric() + + a[Throwable] shouldBe thrownBy { + lambdaApp.handleRequest(createSqsEvent(List(eventString)), null) + } + } +} diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/helpers/ConfigurationTestHelpers.scala b/common/lambda/src/test/scala/weco/lambda/helpers/ConfigurationTestHelpers.scala similarity index 91% rename from pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/helpers/ConfigurationTestHelpers.scala rename to common/lambda/src/test/scala/weco/lambda/helpers/ConfigurationTestHelpers.scala index 1edc19eb64..ab675907ba 100644 --- a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/helpers/ConfigurationTestHelpers.scala +++ b/common/lambda/src/test/scala/weco/lambda/helpers/ConfigurationTestHelpers.scala @@ -1,11 +1,8 @@ -package weco.pipeline.relation_embedder.helpers +package weco.lambda.helpers import com.typesafe.config.{Config, ConfigFactory} import weco.fixtures.TestWith -import weco.pipeline.relation_embedder.lib.{ - ApplicationConfig, - LambdaConfigurable -} +import weco.lambda.{ApplicationConfig, LambdaConfigurable} trait ConfigurationTestHelpers { diff --git a/common/lambda/src/test/scala/weco/lambda/helpers/SQSLambdaAppHelpers.scala 
b/common/lambda/src/test/scala/weco/lambda/helpers/SQSLambdaAppHelpers.scala new file mode 100644 index 0000000000..5f2927f33d --- /dev/null +++ b/common/lambda/src/test/scala/weco/lambda/helpers/SQSLambdaAppHelpers.scala @@ -0,0 +1,55 @@ +package weco.lambda.helpers + +import com.amazonaws.services.lambda.runtime.events.SQSEvent +import com.typesafe.config.Config +import weco.lambda.{ApplicationConfig, SQSLambdaApp} +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +import scala.concurrent.Future +import collection.JavaConverters._ + +trait SQSLambdaAppHelpers { + // This value is from application.conf in test resources + val expectedConfigString = "knownConfigValue" + + case class TestLambdaAppConfiguration(configString: String) + extends ApplicationConfig + + class TestLambdaApp + extends SQSLambdaApp[String, String, TestLambdaAppConfiguration] { + override protected val maximumExecutionTime: FiniteDuration = 200.millis + + // Config is available in this scope + val configString: String = config.configString + + // Function to convert typesafe config to application config is required + override def build(rawConfig: Config): TestLambdaAppConfiguration = + TestLambdaAppConfiguration( + configString = rawConfig.getString("configString") + ) + + override def processT(t: List[String]): Future[String] = + Future.successful(t.mkString + configString) + } + + class FailingTestLambdaApp extends TestLambdaApp { + override def processT(t: List[String]): Future[String] = Future.failed(new Throwable("Failed")) + } + + class SleepingTestLambdaApp extends TestLambdaApp { + override def processT(t: List[String]): Future[String] = Future { + Thread.sleep(500) + t.head + configString + } + } + + def createSqsEvent(eventString: List[String]): SQSEvent = { + val sqsEvent = new SQSEvent() + sqsEvent.setRecords(eventString.map { record => + val sqsRecord = new SQSEvent.SQSMessage() + sqsRecord.setBody(s"""{"Message": "$record"}""") + sqsRecord + }.asJava) + sqsEvent + 
} +} diff --git a/ebsco_adapter/ebsco_indexer/.gitignore b/ebsco_adapter/ebsco_indexer/.gitignore new file mode 100644 index 0000000000..35b02315a3 --- /dev/null +++ b/ebsco_adapter/ebsco_indexer/.gitignore @@ -0,0 +1,18 @@ +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg diff --git a/pipeline/relation_embedder/batcher/local.docker-compose.yml b/pipeline/relation_embedder/batcher/local.docker-compose.yml new file mode 100644 index 0000000000..a2deae5c1f --- /dev/null +++ b/pipeline/relation_embedder/batcher/local.docker-compose.yml @@ -0,0 +1,16 @@ +# Run locally using `docker compose build lambda && docker compose run --rm --service-ports lambda` +services: + lambda: + build: + context: . + dockerfile: Dockerfile + target: lambda_rie + volumes: + - ~/.aws:/root/.aws + ports: + - "9000:8080" + environment: + - AWS_PROFILE=platform-developer + - metrics_namespace=catalogue-dev_batcher + - use_downstream=stdio + - max_batch_size=100 diff --git a/pipeline/relation_embedder/batcher/scripts/run_local.sh b/pipeline/relation_embedder/batcher/scripts/run_local.sh new file mode 100755 index 0000000000..6b4f292ff6 --- /dev/null +++ b/pipeline/relation_embedder/batcher/scripts/run_local.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +set -euo pipefail + +if [ "$#" -lt 1 ]; then + echo "Usage: $0 [--skip-build]" + exit 1 +fi + +export PIPELINE_DATE=$1 +SKIP_BUILD=false +if [ "$#" -eq 2 ] && [ "$2" == "--skip-build" ]; then + SKIP_BUILD=true +fi + +PROJECT_NAME="batcher" +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +cd "$DIR"/.. + +# Read template.env, substitute variables, and write to .env +if [ -f template.env ]; then + envsubst < template.env > .env +fi + +# Build the project, skipping if requested +if [ "$SKIP_BUILD" = true ]; then + echo "Skipping build" +else + pushd ../../.. 
+ sbt "project $PROJECT_NAME" ";stage" + popd +fi + +# Build the docker image +docker compose -f local.docker-compose.yml \ + build lambda + +# Run the docker image +docker compose -f local.docker-compose.yml \ + run --rm --service-ports lambda diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/BatcherWorkerService.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/BatcherWorkerService.scala index 480ca51ef4..475a8885dd 100644 --- a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/BatcherWorkerService.scala +++ b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/BatcherWorkerService.scala @@ -2,25 +2,21 @@ package weco.pipeline.batcher import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ -import scala.util.Try import org.apache.pekko.{Done, NotUsed} import org.apache.pekko.stream.scaladsl._ import software.amazon.awssdk.services.sqs.model.{Message => SQSMessage} import grizzled.slf4j.Logging -import weco.messaging.MessageSender import weco.messaging.sns.NotificationMessage import weco.messaging.sqs.SQSStream import weco.typesafe.Runnable -import weco.json.JsonUtil._ case class Batch(rootPath: String, selectors: List[Selector]) class BatcherWorkerService[MsgDestination]( msgStream: SQSStream[NotificationMessage], - msgSender: MessageSender[MsgDestination], + pathsProcessor: PathsProcessor, flushInterval: FiniteDuration, - maxProcessedPaths: Int, - maxBatchSize: Int + maxProcessedPaths: Int )(implicit ec: ExecutionContext) extends Runnable with Logging { @@ -51,7 +47,7 @@ class BatcherWorkerService[MsgDestination]( private def processPaths( paths: Seq[PathFromSQS] ): Future[Source[SQSMessage, NotUsed]] = - PathsProcessor(maxBatchSize, paths, SNSDownstream) + pathsProcessor(paths) .map { failedPaths => val failedPathSet = failedPaths.toSet @@ -59,8 +55,4 @@ class BatcherWorkerService[MsgDestination]( case path if 
!failedPathSet.contains(path) => path.referent }.toList) } - - private object SNSDownstream extends Downstream { - override def notify(batch: Batch): Try[Unit] = msgSender.sendT(batch) - } } diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/CLIMain.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/CLIMain.scala index f3dee7b91e..55f04a6a9d 100644 --- a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/CLIMain.scala +++ b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/CLIMain.scala @@ -10,6 +10,7 @@ import org.apache.pekko.stream.scaladsl.{ StreamConverters } import org.apache.pekko.util.ByteString +import weco.lambda.STDIODownstream import scala.concurrent.{ExecutionContext, Future} @@ -19,31 +20,27 @@ object CLIMain extends App { implicit val ec: ExecutionContext = actorSystem.dispatcher - val stdinSource: Source[ByteString, Future[IOResult]] = + private val stdinSource: Source[ByteString, Future[IOResult]] = StreamConverters.fromInputStream(() => System.in) - val lineDelimiter: Flow[ByteString, ByteString, NotUsed] = + private val lineDelimiter: Flow[ByteString, ByteString, NotUsed] = Framing.delimiter( ByteString("\n"), maximumFrameLength = 256, allowTruncation = true ) - val toStringFlow: Flow[ByteString, String, NotUsed] = + private val toStringFlow: Flow[ByteString, String, NotUsed] = Flow[ByteString].map(_.utf8String) - val toPathFlow: Flow[String, Path, NotUsed] = - Flow[String].map(PathFromString) - - val pathsProcessorFlow: Flow[Seq[Path], Future[Seq[Path]], NotUsed] = + private val pathsProcessor = new PathsProcessor(STDIODownstream, 40) + private val pathsProcessorFlow: Flow[Seq[Path], Future[Seq[Path]], NotUsed] = Flow[Seq[Path]].map { - paths: Seq[Path] => - PathsProcessor( - 40, // TODO: 40 is the number in the config used by Main, do this properly later - paths.toList, - STDIODownstream - ) + paths: Seq[Path] => pathsProcessor(paths) } + 
private val toPathFlow: Flow[String, Path, NotUsed] = + Flow[String].map(PathFromString) + stdinSource .via(lineDelimiter) .via(toStringFlow) @@ -53,5 +50,6 @@ object CLIMain extends App { .grouped(10000) .via(pathsProcessorFlow) .runWith(Sink.seq) + actorSystem.terminate() } diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/Downstream.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/Downstream.scala deleted file mode 100644 index ee116a8ce1..0000000000 --- a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/Downstream.scala +++ /dev/null @@ -1,11 +0,0 @@ -package weco.pipeline.batcher - -import scala.util.Try - -trait Downstream { - def notify(batch: Batch): Try[Unit] -} - -object STDIODownstream extends Downstream { - override def notify(batch: Batch): Try[Unit] = Try(println(batch)) -} diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/LambdaMain.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/LambdaMain.scala index ae66e0943e..33b1c17e0f 100644 --- a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/LambdaMain.scala +++ b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/LambdaMain.scala @@ -1,54 +1,19 @@ package weco.pipeline.batcher -import com.amazonaws.services.lambda.runtime.{Context, RequestHandler} -import grizzled.slf4j.Logging -import com.amazonaws.services.lambda.runtime.events.SQSEvent -import weco.messaging.typesafe.SNSBuilder -import weco.json.JsonUtil._ -import com.typesafe.config.ConfigFactory -import weco.typesafe.config.builders.EnrichConfig.RichConfig -import scala.concurrent.Await -import scala.concurrent.duration.DurationInt -import scala.concurrent.ExecutionContext -import scala.util.Try -import ExecutionContext.Implicits.global +import weco.lambda._ +import weco.pipeline.batcher.lib.{BatcherConfig, BatcherConfigurable} -object LambdaMain extends 
RequestHandler[SQSEvent, String] with Logging { - import weco.pipeline.batcher.lib.SQSEventOps._ +import scala.concurrent.Future - // Initialize anything we want to be shared across lambda invocations here +object LambdaMain + extends SQSLambdaApp[String, String, BatcherConfig] + with BatcherConfigurable { - private val config = ConfigFactory.load("application") + private val pathsProcessor = PathsProcessor( + Downstream(config.downstreamTarget), + config.maxBatchSize + ) - private object SNSDownstream extends Downstream { - private val msgSender = SNSBuilder - .buildSNSMessageSender(config, subject = "Sent from batcher") - override def notify(batch: Batch): Try[Unit] = msgSender.sendT(batch) - } - - val downstream = config.getString("batcher.use_downstream") match { - case "sns" => SNSDownstream - case "stdio" => STDIODownstream - } - - // This is the entry point for the lambda - - override def handleRequest( - event: SQSEvent, - context: Context - ): String = { - debug(s"Running batcher lambda, got event: $event") - - val f = PathsProcessor( - config.requireInt("batcher.max_batch_size"), - event.extractPaths.map(PathFromString), - downstream - ) - - // Wait here so that lambda can finish executing correctly. - // 15 minutes is the maximum time allowed for a lambda to run. 
- Await.result(f, 15.minutes) - - "Done" - } + override def processT(t: List[String]): Future[String] = + pathsProcessor(t.map(PathFromString)).map(_ => "Done") } diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/Main.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/Main.scala index 6a9603801f..6b3d90c475 100644 --- a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/Main.scala +++ b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/Main.scala @@ -4,7 +4,7 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import org.apache.pekko.actor.ActorSystem import com.typesafe.config.Config - +import weco.lambda.SNSDownstream import weco.messaging.sns.NotificationMessage import weco.messaging.typesafe.{SNSBuilder, SQSBuilder} import weco.typesafe.WellcomeTypesafeApp @@ -20,12 +20,15 @@ object Main extends WellcomeTypesafeApp { new BatcherWorkerService( msgStream = SQSBuilder.buildSQSStream[NotificationMessage](config), - msgSender = SNSBuilder - .buildSNSMessageSender(config, subject = "Sent from batcher"), flushInterval = config.requireInt("batcher.flush_interval_minutes").minutes, - maxProcessedPaths = config.requireInt("batcher.max_processed_paths"), - maxBatchSize = config.requireInt("batcher.max_batch_size") + pathsProcessor = new PathsProcessor( + downstream = new SNSDownstream( + snsConfig = SNSBuilder.buildSNSConfig(config) + ), + maxBatchSize = config.requireInt("batcher.max_batch_size") + ), + maxProcessedPaths = config.requireInt("batcher.max_processed_paths") ) } } diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/PathsProcessor.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/PathsProcessor.scala index 0977afbb77..ba658a0bcb 100644 --- a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/PathsProcessor.scala +++ 
b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/PathsProcessor.scala @@ -1,10 +1,22 @@ package weco.pipeline.batcher import grizzled.slf4j.Logging +import weco.lambda.Downstream +import weco.json.JsonUtil._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} -object PathsProcessor extends Logging { +/** Processes a list of paths by bundling them into Batches and sending them to + * a downstream service for processing. + * + * @param downstream + * The downstream target to send the Batches to + * @param maxBatchSize + * The maximum number of selectors to include in a single Batch + */ +class PathsProcessor(downstream: Downstream, maxBatchSize: Int)( + implicit ec: ExecutionContext +) extends Logging { /** Takes a list of strings, each representing a path to be processed by * _downstream_ @@ -19,9 +31,7 @@ object PathsProcessor extends Logging { * SQS/SNS-driven. Should just be the actual failed paths, and the caller * should build a map to work it out if it wants to) */ - def apply(maxBatchSize: Int, paths: Seq[Path], downstream: Downstream)( - implicit ec: ExecutionContext - ): Future[Seq[Path]] = { + def apply[T <: Path](paths: Seq[T]): Future[Seq[Path]] = { info(s"Processing ${paths.size} paths with max batch size $maxBatchSize") Future @@ -111,3 +121,11 @@ object PathsProcessor extends Logging { } } } + +object PathsProcessor { + def apply( + downstream: Downstream, + maxBatchSize: Int + )(implicit ec: ExecutionContext): PathsProcessor = + new PathsProcessor(downstream, maxBatchSize) +} diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/BatcherConfig.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/BatcherConfig.scala new file mode 100644 index 0000000000..3bbc515ec2 --- /dev/null +++ b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/BatcherConfig.scala @@ -0,0 +1,31 @@ +package 
weco.pipeline.batcher.lib + +import com.typesafe.config.Config +import weco.lambda.{ + ApplicationConfig, + DownstreamTarget, + LambdaConfigurable, + SNS, + StdOut +} +import weco.messaging.typesafe.SNSBuilder.buildSNSConfig + +case class BatcherConfig( + maxBatchSize: Int, + downstreamTarget: DownstreamTarget +) extends ApplicationConfig + +trait BatcherConfigurable extends LambdaConfigurable[BatcherConfig] { + import weco.typesafe.config.builders.EnrichConfig._ + + def build(rawConfig: Config): BatcherConfig = + BatcherConfig( + maxBatchSize = rawConfig.requireInt("batcher.max_batch_size"), + downstreamTarget = { + rawConfig.requireString("batcher.use_downstream") match { + case "sns" => SNS(buildSNSConfig(rawConfig)) + case "stdio" => StdOut + } + } + ) +} diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/SQSEventOps.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/SQSEventOps.scala deleted file mode 100644 index e27e19128a..0000000000 --- a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/SQSEventOps.scala +++ /dev/null @@ -1,26 +0,0 @@ -package weco.pipeline.batcher.lib - -import com.amazonaws.services.lambda.runtime.events.SQSEvent -import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage -import scala.collection.JavaConverters._ - -object SQSEventOps { - - /** Messages consumed by Lambda from SQS are taken from a queue populated by - * an SNS topic. The actual message we are interested in is a String - * containing the path. 
However, the matryoshka-like nature of these things - * means the lambda receives - * - an event containing - * - a `Records` list, each Record containing - * - an SQS Message with a JSON body containing - * - an SNS notification containing - * - a `Message`, which is the actual content we want - */ - implicit class ExtractPathFromSqsEvent(event: SQSEvent) { - def extractPaths: List[String] = - event.getRecords.asScala.toList.flatMap(extractPathFromMessage) - - private def extractPathFromMessage(message: SQSMessage): Option[String] = - ujson.read(message.getBody).obj.get("Message").flatMap(_.strOpt) - } -} diff --git a/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/BatcherWorkerServiceTest.scala b/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/BatcherWorkerServiceTest.scala index 0069739884..9ec01a1fd8 100644 --- a/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/BatcherWorkerServiceTest.scala +++ b/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/BatcherWorkerServiceTest.scala @@ -1,21 +1,23 @@ package weco.pipeline.batcher -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.util.{Failure, Try} +import io.circe.Encoder +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.concurrent.{Eventually, IntegrationPatience} import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers -import org.scalatest.concurrent.{Eventually, IntegrationPatience} -import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.{Seconds, Span} -import io.circe.Encoder import weco.fixtures.TestWith -import weco.pekko.fixtures.Pekko +import weco.json.JsonUtil._ +import weco.lambda.Downstream import weco.messaging.fixtures.SQS +import weco.messaging.fixtures.SQS.QueuePair import weco.messaging.memory.MemoryMessageSender import 
weco.messaging.sns.NotificationMessage -import weco.json.JsonUtil._ -import SQS.QueuePair +import weco.pekko.fixtures.Pekko + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.util.{Failure, Try} class BatcherWorkerServiceTest extends AnyFunSpec @@ -154,12 +156,16 @@ class BatcherWorkerServiceTest withSQSStream[NotificationMessage, R](queuePair.queue) { msgStream => val msgSender = new MessageSender(brokenPaths) + val memoryDownstream = new MemoryDownstream(msgSender) + val pathsProcessor = new PathsProcessor( + downstream = memoryDownstream, + maxBatchSize = maxBatchSize + ) val workerService = new BatcherWorkerService[String]( msgStream = msgStream, - msgSender = msgSender, flushInterval = flushInterval, maxProcessedPaths = 1000, - maxBatchSize = maxBatchSize + pathsProcessor = pathsProcessor ) workerService.run() testWith((queuePair, msgSender)) @@ -167,6 +173,7 @@ class BatcherWorkerServiceTest } } + class MessageSender(brokenPaths: Set[String] = Set.empty) extends MemoryMessageSender { override def sendT[T](t: T)(implicit encoder: Encoder[T]): Try[Unit] = { @@ -177,4 +184,9 @@ class BatcherWorkerServiceTest super.sendT(t) } } + + class MemoryDownstream(messageSender: MessageSender) extends Downstream { + override def notify(workId: String): Try[Unit] = ??? 
+ override def notify[T](batch: T)(implicit encoder: Encoder[T]): Try[Unit] = messageSender.sendT(batch) + } } diff --git a/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/SQSEventOpsTest.scala b/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/SQSEventOpsTest.scala deleted file mode 100644 index 01b45f7838..0000000000 --- a/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/SQSEventOpsTest.scala +++ /dev/null @@ -1,24 +0,0 @@ -package weco.pipeline.batcher - -import com.amazonaws.services.lambda.runtime.events.SQSEvent -import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage -import org.scalatest.funspec.AnyFunSpec -import org.scalatest.matchers.should.Matchers -import scala.collection.JavaConverters._ - -class SQSEventOpsTest extends AnyFunSpec with Matchers { - import lib.SQSEventOps._ - - describe("Using the implicit class SQSEventOps") { - it("extracts paths from an SQSEvent") { - val fakeMessage = new SQSMessage() - fakeMessage.setBody("{\"Message\":\"A/C\"}") - val fakeSQSEvent = new SQSEvent() - fakeSQSEvent.setRecords(List(fakeMessage).asJava) - - val paths = fakeSQSEvent.extractPaths - - paths shouldBe List("A/C") - } - } -} diff --git a/pipeline/relation_embedder/relation_embedder/scripts/batches.txt b/pipeline/relation_embedder/relation_embedder/scripts/batches.txt index d8d57038b6..ccf09a64b8 100644 --- a/pipeline/relation_embedder/relation_embedder/scripts/batches.txt +++ b/pipeline/relation_embedder/relation_embedder/scripts/batches.txt @@ -1,6 +1,2 @@ {"rootPath":"GC103","selectors":[{"path":"GC103","type":"Tree"}]} -{"rootPath":"MS1302","selectors":[{"path":"MS1302","type":"Node"},{"path":"MS1302","type":"Children"},{"path":"MS1302/5403","type":"Descendents"}]} -{"rootPath":"MS5122","selectors":[{"path":"MS5122","type":"Tree"}]} 
-{"rootPath":"MS5120","selectors":[{"path":"MS5120","type":"Children"},{"path":"MS5120","type":"Node"},{"path":"MS5120/5121","type":"Descendents"}]} -{"rootPath":"MS6037","selectors":[{"path":"MS6037","type":"Tree"}]} -{"rootPath":"SAIPA","selectors":[{"path":"SAIPA/B/1/23","type":"Node"},{"path":"SAIPA/B/1/23","type":"Children"},{"path":"SAIPA/B/1/23/22","type":"Descendents"}]} + diff --git a/pipeline/relation_embedder/relation_embedder/scripts/run_local.sh b/pipeline/relation_embedder/relation_embedder/scripts/run_local.sh index 17061f295d..83eaf83cd2 100755 --- a/pipeline/relation_embedder/relation_embedder/scripts/run_local.sh +++ b/pipeline/relation_embedder/relation_embedder/scripts/run_local.sh @@ -17,8 +17,10 @@ PROJECT_NAME="relation_embedder" DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" cd "$DIR"/.. -# Read .template.env, substitute variables, and write to .env -envsubst < template.env > .env +# Read template.env, substitute variables, and write to .env +if [ -f template.env ]; then + envsubst < template.env > .env +fi # Build the project, skipping if requested if [ "$SKIP_BUILD" = true ]; then diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala index f7effe5935..1dc269d528 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala @@ -8,7 +8,6 @@ import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.{Sink, Source} import weco.catalogue.internal_model.work.Work import weco.catalogue.internal_model.work.WorkState.Denormalised -import lib.ElasticBuilder import weco.pipeline.relation_embedder.models.{ ArchiveRelationsCache, Batch, @@ 
-19,7 +18,8 @@ import weco.pipeline_storage.elastic.ElasticIndexer import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} import weco.catalogue.internal_model.Implicits._ -import weco.pipeline.relation_embedder.lib.{Downstream, RelationEmbedderConfig} +import weco.lambda.{Downstream, ElasticBuilder} +import weco.pipeline.relation_embedder.lib.RelationEmbedderConfig class BatchProcessor( relationsService: RelationsService, diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala index 64f01dbc94..9c2c76404c 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala @@ -8,7 +8,8 @@ import com.sksamuel.elastic4s.Index import weco.typesafe.config.builders.EnrichConfig._ import weco.elasticsearch.typesafe.ElasticBuilder import weco.json.JsonUtil._ -import weco.pipeline.relation_embedder.lib.{STDIODownstream, StdInBatches} +import weco.lambda.{STDIODownstream, StdInNDJSON} +import weco.pipeline.relation_embedder.models.Batch /** A main function providing a local CLI for the relation embedder. To invoke, * provide a list of Batch objects in NDJSON on StdIn. 
@@ -16,7 +17,7 @@ import weco.pipeline.relation_embedder.lib.{STDIODownstream, StdInBatches} * This will embed relations as required and print the resulting Works and * their identifiers */ -object CLIMain extends App with StdInBatches { +object CLIMain extends App with StdInNDJSON[Batch] { implicit val actorSystem: ActorSystem = ActorSystem("main-actor-system") implicit val ec: ExecutionContext = actorSystem.dispatcher @@ -33,7 +34,7 @@ object CLIMain extends App with StdInBatches { ) Await.result( - Future.sequence(batches.map(batchProcessor.apply)), + Future.sequence(instances.map(batchProcessor.apply)), 5 minutes ) diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala index 34e2a2c0d8..2abb281aa2 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala @@ -1,22 +1,23 @@ package weco.pipeline.relation_embedder -import com.amazonaws.services.lambda.runtime.events.SQSEvent +import weco.lambda.SQSLambdaApp import weco.pipeline.relation_embedder.lib._ +import weco.json.JsonUtil._ +import weco.pipeline.relation_embedder.models.Batch import scala.concurrent.Future object LambdaMain - extends LambdaApp[SQSEvent, String, RelationEmbedderConfig] + extends SQSLambdaApp[Batch, String, RelationEmbedderConfig] with RelationEmbedderConfigurable { - import SQSEventOps._ private lazy val batchProcessor = BatchProcessor(config) - def processEvent(event: SQSEvent): Future[String] = { - info(s"running relation_embedder lambda, got event: $event") + def processT(t: List[Batch]): Future[String] = { + info(s"running relation_embedder lambda, got event: $t") Future - .sequence(event.extractBatches.map(batchProcessor(_))) + 
.sequence(t.map(batchProcessor(_))) .map(_ => "Done") } } diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala index a740b59d31..4e01b39243 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/RelationEmbedderConfig.scala @@ -1,8 +1,9 @@ package weco.pipeline.relation_embedder.lib import com.typesafe.config.Config +import weco.lambda.ElasticBuilder.buildElasticClientConfig +import weco.lambda._ import weco.messaging.typesafe.SNSBuilder.buildSNSConfig -import ElasticBuilder.buildElasticClientConfig case class RelationEmbedderConfig( mergedWorkIndex: String, diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala index 66540751a0..cb1df5319d 100644 --- a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala +++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala @@ -1,5 +1,6 @@ package weco.pipeline.relation_embedder +import io.circe.Encoder import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers import weco.pekko.fixtures.Pekko @@ -11,8 +12,8 @@ import org.apache.pekko.stream.Materializer import weco.catalogue.internal_model.work.{Availability, Relations, Work} import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged} import weco.fixtures.TestWith +import weco.lambda.Downstream import 
weco.messaging.memory.MemoryMessageSender -import weco.pipeline.relation_embedder.lib.Downstream import weco.pipeline.relation_embedder.models.Batch import weco.pipeline.relation_embedder.models.Selector.{Descendents, Node, Tree} import weco.pipeline_storage.memory.MemoryIndexer @@ -33,6 +34,8 @@ class BatchProcessorTest override def notify(workId: String): Try[Unit] = Try(msgSender.send(workId)) + override def notify[T](batch: T)(implicit encoder: Encoder[T]): Try[Unit] = + ??? } protected def withProcessedBatch[R]( diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala index 0890cb9190..1e22307599 100644 --- a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala +++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala @@ -1,6 +1,7 @@ package weco.pipeline.relation_embedder import com.sksamuel.elastic4s.Index +import io.circe.Encoder import scala.collection.mutable import scala.concurrent.ExecutionContext.Implicits.global @@ -21,8 +22,8 @@ import weco.messaging.memory.MemoryMessageSender import weco.messaging.sns.NotificationMessage import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged} import weco.catalogue.internal_model.work._ +import weco.lambda.Downstream import weco.pipeline.relation_embedder.fixtures.SampleWorkTree -import weco.pipeline.relation_embedder.lib.Downstream import weco.pipeline.relation_embedder.models._ import weco.pipeline_storage.memory.MemoryIndexer @@ -148,6 +149,10 @@ class RelationEmbedderWorkerServiceTest override def notify(workId: String): Try[Unit] = Try(msgSender.send(workId)) + + override def notify[T](batch: T)( + implicit encoder: 
Encoder[T] + ): Try[Unit] = ??? } val denormalisedIndex = diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/LambdaAppTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/LambdaAppTest.scala deleted file mode 100644 index 87c35ecc8f..0000000000 --- a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/lib/LambdaAppTest.scala +++ /dev/null @@ -1,82 +0,0 @@ -package weco.pipeline.relation_embedder.lib - -import com.typesafe.config.Config -import org.scalatest.funspec.AnyFunSpec -import org.scalatest.matchers.should.Matchers -import weco.fixtures.RandomGenerators -import weco.pipeline.relation_embedder.helpers.ConfigurationTestHelpers - -import scala.concurrent.duration.{DurationInt, FiniteDuration} -import scala.concurrent.Future - -class LambdaAppTest - extends AnyFunSpec - with ConfigurationTestHelpers - with RandomGenerators - with Matchers { - - // This value is from application.conf in test resources - val configString = "knownConfigValue" - - case class TestLambdaAppConfiguration(configString: String) - extends ApplicationConfig - class TestLambdaApp - extends LambdaApp[String, String, TestLambdaAppConfiguration] { - override protected val maximumExecutionTime: FiniteDuration = 200.millis - - // Config is available in this scope - lazy val configString: String = config.configString - - // Function to process an event is required, and should return a Future - override def processEvent(event: String): Future[String] = - Future.successful(event + configString) - - // Function to convert typesafe config to application config is required - override def build(rawConfig: Config): TestLambdaAppConfiguration = - TestLambdaAppConfiguration( - configString = rawConfig.getString("configString") - ) - } - - it( - "creates a lambda app with a config, and allows execution of a processEvent function" - ) { - val lambdaApp = 
new TestLambdaApp() - val eventString = randomAlphanumeric() - - lambdaApp.handleRequest( - eventString, - null - ) shouldBe eventString + configString - } - - class FailingTestLambdaApp extends TestLambdaApp { - override def processEvent(event: String): Future[String] = - Future.failed(new Throwable("Failed")) - } - - it("fails if the processEvent function fails") { - val lambdaApp = new FailingTestLambdaApp() - val eventString = randomAlphanumeric() - - a[Throwable] shouldBe thrownBy { - lambdaApp.handleRequest(eventString, null) - } - } - - class SleepingTestLambdaApp extends TestLambdaApp { - override def processEvent(event: String): Future[String] = Future { - Thread.sleep(500) - event + configString - } - } - - it("fails if the processEvent function takes too long") { - val lambdaApp = new SleepingTestLambdaApp() - val eventString = randomAlphanumeric() - - a[Throwable] shouldBe thrownBy { - lambdaApp.handleRequest(eventString, null) - } - } -} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e7706031ab..5dfd6cea77 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -197,6 +197,17 @@ object CatalogueDependencies { val flowDependencies: Seq[ModuleID] = WellcomeDependencies.typesafeLibrary + val lambdaDependencies: Seq[ModuleID] = + WellcomeDependencies.typesafeLibrary ++ + WellcomeDependencies.messagingTypesafeLibrary ++ + WellcomeDependencies.elasticsearchTypesafeLibrary ++ + WellcomeDependencies.jsonLibrary ++ + WellcomeDependencies.fixturesLibrary ++ + ExternalDependencies.scalatestDependencies ++ + ExternalDependencies.awsLambdaClient ++ + ExternalDependencies.awsLambdaEvents ++ + ExternalDependencies.uPickle + val sourceModelDependencies: Seq[sbt.ModuleID] = WellcomeDependencies.storageLibrary ++ WellcomeDependencies.fixturesLibrary ++ @@ -238,24 +249,9 @@ object CatalogueDependencies { val pathConcatenatorDependencies: Seq[ModuleID] = WellcomeDependencies.messagingTypesafeLibrary - val 
relationEmbedderDependencies: Seq[ModuleID] = - WellcomeDependencies.messagingTypesafeLibrary ++ - ExternalDependencies.awsLambdaClient ++ - ExternalDependencies.awsLambdaEvents ++ - ExternalDependencies.uPickle - val routerDependencies: Seq[ModuleID] = WellcomeDependencies.messagingTypesafeLibrary - val batcherDependencies: Seq[ModuleID] = - ExternalDependencies.scalatestDependencies ++ - ExternalDependencies.awsLambdaClient ++ - ExternalDependencies.awsLambdaEvents ++ - ExternalDependencies.uPickle ++ - WellcomeDependencies.typesafeLibrary ++ - WellcomeDependencies.fixturesLibrary ++ - WellcomeDependencies.messagingTypesafeLibrary - val miroTransformerDependencies: Seq[ModuleID] = ExternalDependencies.apacheCommonsDependencies ++ WellcomeDependencies.storageTypesafeLibrary