Skip to content

Commit

Permalink
add matcher cli
Browse files Browse the repository at this point in the history
  • Loading branch information
kenoir committed Sep 5, 2024
1 parent 15ac0a1 commit 63ec5da
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import weco.messaging.sns.NotificationMessage
import weco.messaging.typesafe.{SNSBuilder, SQSBuilder}
import weco.pipeline.matcher.matcher.WorkMatcher
import weco.pipeline.matcher.models.MatcherResult
import weco.pipeline.matcher.services.MatcherWorkerService
import weco.pipeline.matcher.services.{CommandLineMatcherWorkerService, MatcherWorkerService}
import weco.pipeline.matcher.storage.elastic.ElasticWorkStubRetriever
import weco.pipeline.matcher.storage.{WorkGraphStore, WorkNodeDao}
import weco.pipeline_storage.typesafe.PipelineStorageStreamBuilder
Expand All @@ -22,6 +22,11 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds

object Main extends WellcomeTypesafeApp {

// read and print args passed from the command line
val runAsCli = args.length > 0
val idsToCheck = if(runAsCli) Some(args(0)) else None

runWithConfig {
config: Config =>
implicit val actorSystem: ActorSystem =
Expand Down Expand Up @@ -51,13 +56,20 @@ object Main extends WellcomeTypesafeApp {
index = Index(config.requireString("es.index"))
)

new MatcherWorkerService(
PipelineStorageStreamBuilder.buildPipelineStorageConfig(config),
retriever = retriever,
msgStream = SQSBuilder.buildSQSStream[NotificationMessage](config),
msgSender = SNSBuilder
.buildSNSMessageSender(config, subject = "Sent from the matcher"),
workMatcher = workMatcher
)
}
if (runAsCli) {
new CommandLineMatcherWorkerService(
retriever = retriever,
workMatcher = workMatcher
)(workId = idsToCheck)
} else {
new MatcherWorkerService(
PipelineStorageStreamBuilder.buildPipelineStorageConfig(config),
retriever = retriever,
msgStream = SQSBuilder.buildSQSStream[NotificationMessage](config),
msgSender = SNSBuilder
.buildSNSMessageSender(config, subject = "Sent from the matcher"),
workMatcher = workMatcher
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,65 @@ import weco.messaging.sqs.SQSStream
import weco.pipeline_storage.PipelineStorageStream._
import weco.pipeline.matcher.matcher.WorkMatcher
import weco.pipeline.matcher.models.MatcherResult._
import weco.pipeline.matcher.models.{VersionExpectedConflictException, WorkStub}
import weco.pipeline.matcher.models.{MatcherResult, VersionExpectedConflictException, WorkStub}
import weco.typesafe.Runnable
import weco.pipeline_storage.{PipelineStorageConfig, Retriever}

import scala.concurrent.{ExecutionContext, Future}

trait Worker[T, Output] {
def doWork(t: T): Output
}

trait MatcherWorker extends Worker[WorkStub, Future[Option[MatcherResult]]] with Logging {
implicit val ec: ExecutionContext
val workMatcher: WorkMatcher

def doWork(workStub: WorkStub): Future[Option[MatcherResult]] = {
workMatcher
.matchWork(workStub)
.map(Some(_))
.recover {
case e: VersionExpectedConflictException =>
debug(
s"Not matching work due to version conflict exception: ${e.getMessage}"
)
None
}
}
}

class CommandLineMatcherWorkerService(
retriever: Retriever[WorkStub],
val workMatcher: WorkMatcher
)(val workId: Option[String])(implicit val ec: ExecutionContext)
extends MatcherWorker
with Runnable {

def run(): Future[Unit] = workId match {
case Some(workId) => runWithId(workId)
case None => Future.failed(new RuntimeException("No work ID provided"))
}

private def runWithId(workId: String): Future[Unit] =
retriever
.apply(workId)
.flatMap(doWork)
.map { _.foreach(printResults) }

private def printResults(matcherResult: MatcherResult): Unit = {
info(s"Matcher result: ${matcherResult.works}")
}
}

class MatcherWorkerService[MsgDestination](
config: PipelineStorageConfig,
retriever: Retriever[WorkStub],
msgStream: SQSStream[NotificationMessage],
msgSender: MessageSender[MsgDestination],
workMatcher: WorkMatcher
)(implicit ec: ExecutionContext)
extends Logging
val workMatcher: WorkMatcher
)(implicit val ec: ExecutionContext)
extends MatcherWorker
with Runnable {

def run(): Future[Done] =
Expand All @@ -36,17 +81,12 @@ class MatcherWorkerService[MsgDestination](
}
)

def processMessage(workStub: WorkStub): Future[Unit] =
workMatcher
.matchWork(workStub)
.flatMap {
matcherResult =>
Future.fromTry(msgSender.sendT(matcherResult))
}
.recover {
case e: VersionExpectedConflictException =>
debug(
s"Not matching work due to version conflict exception: ${e.getMessage}"
)
}
def processMessage(workStub: WorkStub): Future[Unit] = {
doWork(workStub).flatMap {
case Some(matcherResult) =>
Future.fromTry(msgSender.sendT(matcherResult))
case None =>
Future.successful(())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ object ImagesRule extends ImageRule with ImageDataOps {
)(sierraTarget: Work.Visible[Identified]): FieldMergeResult[FieldData] = {
val metsImages =
getMetsPictureAndEphemeraImages(sierraTarget, sources).getOrElse(Nil)

val miroImages = mergeMetsLicenceIntoMiroLocation(
getPairedMiroImages(sierraTarget, sources).getOrElse(Nil),
metsImages
Expand Down

0 comments on commit 63ec5da

Please sign in to comment.