diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala index 2d60765616..abad859478 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala @@ -1,7 +1,6 @@ package weco.pipeline.relation_embedder import com.sksamuel.elastic4s.Index -import com.typesafe.config.Config import grizzled.slf4j.Logging import org.apache.pekko.NotUsed import org.apache.pekko.actor.ActorSystem @@ -9,18 +8,14 @@ import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.{Sink, Source} import weco.catalogue.internal_model.work.Work import weco.catalogue.internal_model.work.WorkState.Denormalised -import weco.elasticsearch.typesafe.ElasticBuilder -import weco.pipeline.relation_embedder.models.{ - ArchiveRelationsCache, - Batch, - RelationWork -} +import lib.ElasticBuilder +import weco.pipeline.relation_embedder.models.{ArchiveRelationsCache, Batch, RelationWork} import weco.pipeline_storage.elastic.ElasticIndexer import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} import weco.catalogue.internal_model.Implicits._ -import weco.typesafe.config.builders.EnrichConfig._ +import weco.pipeline.relation_embedder.lib.{Downstream, RelationEmbedderConfig} class BatchProcessor( relationsService: RelationsService, @@ -103,7 +98,7 @@ class BatchProcessor( object BatchProcessor { def apply( - config: Config + config: RelationEmbedderConfig )( implicit actorSystem: ActorSystem, ec: ExecutionContext, @@ -111,31 +106,30 @@ object BatchProcessor { ): BatchProcessor = { val identifiedIndex = - Index(config.requireString("es.merged-works.index")) + Index(config.mergedWorkIndex) - val esClient = ElasticBuilder.buildElasticClient(config) + val esClient = ElasticBuilder.buildElasticClient(config.elasticConfig) val workIndexer = new ElasticIndexer[Work[Denormalised]]( client = esClient, - index = Index(config.requireString(s"es.denormalised-works.index")) + index = Index(config.denormalisedWorkIndex) ) val batchWriter = new BulkIndexWriter( workIndexer = workIndexer, - maxBatchWeight = config.requireInt("es.works.batch_size") + maxBatchWeight = config.maxBatchWeight ) new BatchProcessor( relationsService = new PathQueryRelationsService( esClient, identifiedIndex, - completeTreeScroll = config.requireInt("es.works.scroll.complete_tree"), - affectedWorksScroll = - config.requireInt("es.works.scroll.affected_works") + completeTreeScroll = config.completeTreeScroll, + affectedWorksScroll = config.affectedWorksScroll ), bulkWriter = batchWriter, - downstream = Downstream(Some(config)) + downstream = Downstream(config.downstreamTarget) ) } } diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala index 3dae51b067..64f01dbc94 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/CLIMain.scala @@ -2,16 +2,13 @@ package weco.pipeline.relation_embedder import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration.DurationInt - import org.apache.pekko.actor.ActorSystem - import com.typesafe.config.ConfigFactory import com.sksamuel.elastic4s.Index - import weco.typesafe.config.builders.EnrichConfig._ import weco.elasticsearch.typesafe.ElasticBuilder import weco.json.JsonUtil._ -import weco.pipeline.relation_embedder.lib.StdInBatches +import weco.pipeline.relation_embedder.lib.{STDIODownstream, StdInBatches} /** A main function providing a local CLI for the relation embedder. To invoke, * provide a list of Batch objects in NDJSON on StdIn. diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Downstream.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Downstream.scala deleted file mode 100644 index c2d79dd9fa..0000000000 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/Downstream.scala +++ /dev/null @@ -1,34 +0,0 @@ -package weco.pipeline.relation_embedder - -import com.typesafe.config.Config -import weco.messaging.typesafe.SNSBuilder - -import scala.util.Try - -trait Downstream { - def notify(workId: String): Try[Unit] -} - -class SNSDownstream(config: Config) extends Downstream { - private val msgSender = SNSBuilder - .buildSNSMessageSender(config, subject = "Sent from relation_embedder") - - override def notify(workId: String): Try[Unit] = Try(msgSender.send(workId)) -} - -object STDIODownstream extends Downstream { - override def notify(workId: String): Try[Unit] = Try(println(workId)) -} - -object Downstream { - def apply(maybeConfig: Option[Config]): Downstream = { - maybeConfig match { - case Some(config) => - config.getString("relation_embedder.use_downstream") match { - case "sns" => new SNSDownstream(config) - case "stdio" => STDIODownstream - } - case None => STDIODownstream - } - } -} diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala index ebd4c4565d..3cc50a0a52 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/LambdaMain.scala @@ -12,7 +12,7 @@ import scala.concurrent.{Await, ExecutionContext, Future} object LambdaMain extends RequestHandler[SQSEvent, String] with Logging - with LambdaConfiguration { + with RelationEmbedderConfigurable { import SQSEventOps._ diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala index 1592c2b67f..8a8543cf75 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/lib/LambdaConfiguration.scala @@ -3,11 +3,20 @@ package weco.pipeline.relation_embedder.lib import java.io.File import com.typesafe.config.{Config, ConfigFactory} -trait Configuration { - val config: Config +trait ApplicationConfig {} + +trait ConfigurationBuilder[C, T <: ApplicationConfig] { + protected val rawConfig: C + + def build(rawConfig: C): T + def config: T = build(rawConfig) +} + +trait TypesafeConfigurable[T <: ApplicationConfig] extends ConfigurationBuilder[Config, T] { + def build(rawConfig: Config): T } -trait LambdaConfiguration extends Configuration { +trait LambdaConfigurable[T <: ApplicationConfig] extends TypesafeConfigurable[T] { private val defaultResolveFromFile: String = "/tmp/config" private val defaultApplicationConfig: String = "application.conf" @@ -26,7 +35,7 @@ trait LambdaConfiguration extends Configuration { ConfigFactory.empty() } - lazy val config = lambdaConfig + lazy val rawConfig = lambdaConfig .withFallback(applicationConfig) .withFallback(baseConfig) .resolve() diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala index ad5ea8535b..a39a23a050 100644 --- a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala +++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/BatchProcessorTest.scala @@ -3,15 +3,13 @@ package weco.pipeline.relation_embedder import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers import weco.pekko.fixtures.Pekko -import weco.pipeline.relation_embedder.fixtures.{ - BulkWriterAssertions, - SampleWorkTree -} +import weco.pipeline.relation_embedder.fixtures.{BulkWriterAssertions, SampleWorkTree} import org.apache.pekko.stream.Materializer import weco.catalogue.internal_model.work.{Availability, Relations, Work} import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged} import weco.fixtures.TestWith import weco.messaging.memory.MemoryMessageSender +import weco.pipeline.relation_embedder.lib.Downstream import weco.pipeline.relation_embedder.models.Batch import weco.pipeline.relation_embedder.models.Selector.{Descendents, Node, Tree} import weco.pipeline_storage.memory.MemoryIndexer diff --git a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala index c1cf3b06c7..0890cb9190 100644 --- a/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala +++ b/pipeline/relation_embedder/relation_embedder/src/test/scala/weco/pipeline/relation_embedder/RelationEmbedderWorkerServiceTest.scala @@ -22,7 +22,7 @@ import weco.messaging.sns.NotificationMessage import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged} import weco.catalogue.internal_model.work._ import weco.pipeline.relation_embedder.fixtures.SampleWorkTree - +import weco.pipeline.relation_embedder.lib.Downstream import weco.pipeline.relation_embedder.models._ import weco.pipeline_storage.memory.MemoryIndexer