Skip to content

Commit

Permalink
define strict config layer (keep typesafe at the edge)
Browse files Browse the repository at this point in the history
  • Loading branch information
kenoir committed Dec 18, 2024
1 parent 304eb76 commit d8807d7
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
package weco.pipeline.relation_embedder

import com.sksamuel.elastic4s.Index
import com.typesafe.config.Config
import grizzled.slf4j.Logging
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import weco.catalogue.internal_model.work.Work
import weco.catalogue.internal_model.work.WorkState.Denormalised
import weco.elasticsearch.typesafe.ElasticBuilder
import weco.pipeline.relation_embedder.models.{
ArchiveRelationsCache,
Batch,
RelationWork
}
import lib.ElasticBuilder
import weco.pipeline.relation_embedder.models.{ArchiveRelationsCache, Batch, RelationWork}
import weco.pipeline_storage.elastic.ElasticIndexer

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import weco.catalogue.internal_model.Implicits._
import weco.typesafe.config.builders.EnrichConfig._
import weco.pipeline.relation_embedder.lib.{Downstream, RelationEmbedderConfig}

class BatchProcessor(
relationsService: RelationsService,
Expand Down Expand Up @@ -103,39 +98,38 @@ class BatchProcessor(
object BatchProcessor {

def apply(
config: Config
config: RelationEmbedderConfig
)(
implicit actorSystem: ActorSystem,
ec: ExecutionContext,
materializer: Materializer
): BatchProcessor = {

val identifiedIndex =
Index(config.requireString("es.merged-works.index"))
Index(config.mergedWorkIndex)

val esClient = ElasticBuilder.buildElasticClient(config)
val esClient = ElasticBuilder.buildElasticClient(config.elasticConfig)

val workIndexer =
new ElasticIndexer[Work[Denormalised]](
client = esClient,
index = Index(config.requireString(s"es.denormalised-works.index"))
index = Index(config.denormalisedWorkIndex)
)

val batchWriter = new BulkIndexWriter(
workIndexer = workIndexer,
maxBatchWeight = config.requireInt("es.works.batch_size")
maxBatchWeight = config.maxBatchWeight
)

new BatchProcessor(
relationsService = new PathQueryRelationsService(
esClient,
identifiedIndex,
completeTreeScroll = config.requireInt("es.works.scroll.complete_tree"),
affectedWorksScroll =
config.requireInt("es.works.scroll.affected_works")
completeTreeScroll = config.completeTreeScroll,
affectedWorksScroll = config.affectedWorksScroll
),
bulkWriter = batchWriter,
downstream = Downstream(Some(config))
downstream = Downstream(config.downstreamTarget)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package weco.pipeline.relation_embedder

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt

import org.apache.pekko.actor.ActorSystem

import com.typesafe.config.ConfigFactory
import com.sksamuel.elastic4s.Index

import weco.typesafe.config.builders.EnrichConfig._
import weco.elasticsearch.typesafe.ElasticBuilder
import weco.json.JsonUtil._
import weco.pipeline.relation_embedder.lib.StdInBatches
import weco.pipeline.relation_embedder.lib.{STDIODownstream, StdInBatches}

/** A main function providing a local CLI for the relation embedder. To invoke,
* provide a list of Batch objects in NDJSON on StdIn.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.concurrent.{Await, ExecutionContext, Future}
object LambdaMain
extends RequestHandler[SQSEvent, String]
with Logging
with LambdaConfiguration {
with RelationEmbedderConfigurable {

import SQSEventOps._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@ package weco.pipeline.relation_embedder.lib
import java.io.File
import com.typesafe.config.{Config, ConfigFactory}

trait Configuration {
val config: Config
trait ApplicationConfig {}

trait ConfigurationBuilder[C, T <: ApplicationConfig] {
protected val rawConfig: C

def build(rawConfig: C): T
def config: T = build(rawConfig)
}

trait TypesafeConfigurable[T <: ApplicationConfig] extends ConfigurationBuilder[Config, T] {
def build(rawConfig: Config): T
}

trait LambdaConfiguration extends Configuration {
trait LambdaConfigurable[T <: ApplicationConfig] extends TypesafeConfigurable[T] {
private val defaultResolveFromFile: String = "/tmp/config"
private val defaultApplicationConfig: String = "application.conf"

Expand All @@ -26,7 +35,7 @@ trait LambdaConfiguration extends Configuration {
ConfigFactory.empty()
}

lazy val config = lambdaConfig
lazy val rawConfig = lambdaConfig
.withFallback(applicationConfig)
.withFallback(baseConfig)
.resolve()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package weco.pipeline.relation_embedder
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import weco.pekko.fixtures.Pekko
import weco.pipeline.relation_embedder.fixtures.{
BulkWriterAssertions,
SampleWorkTree
}
import weco.pipeline.relation_embedder.fixtures.{BulkWriterAssertions, SampleWorkTree}
import org.apache.pekko.stream.Materializer
import weco.catalogue.internal_model.work.{Availability, Relations, Work}
import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged}
import weco.fixtures.TestWith
import weco.messaging.memory.MemoryMessageSender
import weco.pipeline.relation_embedder.lib.Downstream
import weco.pipeline.relation_embedder.models.Batch
import weco.pipeline.relation_embedder.models.Selector.{Descendents, Node, Tree}
import weco.pipeline_storage.memory.MemoryIndexer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import weco.messaging.sns.NotificationMessage
import weco.catalogue.internal_model.work.WorkState.{Denormalised, Merged}
import weco.catalogue.internal_model.work._
import weco.pipeline.relation_embedder.fixtures.SampleWorkTree

import weco.pipeline.relation_embedder.lib.Downstream
import weco.pipeline.relation_embedder.models._
import weco.pipeline_storage.memory.MemoryIndexer

Expand Down

0 comments on commit d8807d7

Please sign in to comment.