diff --git a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala index 61b4732f77..2d60765616 100644 --- a/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala +++ b/pipeline/relation_embedder/relation_embedder/src/main/scala/weco/pipeline/relation_embedder/BatchProcessor.scala @@ -137,6 +137,5 @@ object BatchProcessor { bulkWriter = batchWriter, downstream = Downstream(Some(config)) ) - } } diff --git a/pipeline/transformer/transformer_calm/src/test/scala/weco/pipeline/transformer/calm/services/CalmTransformerEndToEndTest.scala b/pipeline/transformer/transformer_calm/src/test/scala/weco/pipeline/transformer/calm/services/CalmTransformerEndToEndTest.scala index 4ea59f3adb..9cc89a91ca 100644 --- a/pipeline/transformer/transformer_calm/src/test/scala/weco/pipeline/transformer/calm/services/CalmTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_calm/src/test/scala/weco/pipeline/transformer/calm/services/CalmTransformerEndToEndTest.scala @@ -87,7 +87,7 @@ class CalmTransformerEndToEndTest new TransformerWorker[CalmSourcePayload, CalmSourceData, String]( transformer = CalmTransformer, pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new CalmSourceDataRetriever(store) ) worker.run() diff --git a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerMain.scala b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerMain.scala index d4e8223c28..c52ccca65e 100644 --- a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerMain.scala +++ b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerMain.scala @@ -55,7 +55,7 @@ class TransformerMain[Payload <: SourcePayload, SourceData]( new TransformerWorker( transformer = transformer, pipelineStream = pipelineStream, - retriever = sourceWorkRetriever, + transformedWorkRetriever = sourceWorkRetriever, sourceDataRetriever = sourceDataRetriever ) } diff --git a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala index 0ea2007e02..716fa24aab 100644 --- a/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala +++ b/pipeline/transformer/transformer_common/src/main/scala/weco/pipeline/transformer/TransformerWorker.scala @@ -36,72 +36,29 @@ trait SourceDataRetriever[Payload, SourceData] { ): Either[ReadError, Identified[Version[String, Int], SourceData]] } -/** A TransformerWorker: - * - Takes an SQS stream that emits VHS keys - * - Gets the record of type `SourceData` - * - Runs it through a transformer and transforms the `SourceData` to - * `Work[Source]` - * - Emits the message via `MessageSender` to SNS - */ -final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( - transformer: Transformer[SourceData], - retriever: Retriever[Work[Source]], - pipelineStream: PipelineStorageStream[NotificationMessage, Work[ - Source - ], SenderDest], - sourceDataRetriever: SourceDataRetriever[Payload, SourceData] -)(implicit ec: ExecutionContext, decoder: Decoder[Payload]) - extends Logging - with Runnable { +trait TransformerEventProcessor[Payload <: SourcePayload, SourceData] + extends Logging { type Result[T] = Either[TransformerWorkerError, T] type StoreKey = Version[String, Int] - def name: String = this.getClass.getSimpleName - - def run(): Future[Done] = - pipelineStream.foreach( - name, - (notification: NotificationMessage) => - process(notification).map { - case Left(err) => - // We do some slightly nicer logging here to give context to the errors - err match { - case DecodePayloadError(_, notificationMsg) => - error(s"$name: DecodePayloadError from $notificationMsg") - case StoreReadError(_, key) => - error(s"$name: StoreReadError on $key") - case TransformerError(t, sourceData, key) => - error(s"$name: TransformerError on $sourceData with $key ($t)") - } - - throw err + implicit val ec: ExecutionContext + implicit val decoder: Decoder[Payload] - case Right(None) => - debug( - s"$name: no transformed Work returned for $notification (this means the Work is already in the pipeline)" - ) - Nil + val sourceDataRetriever: SourceDataRetriever[Payload, SourceData] + val transformer: Transformer[SourceData] + val transformedWorkRetriever: Retriever[Work[Source]] - case Right(Some((work, key))) => - info(s"$name: from $key transformed work with id ${work.id}") - List(work) - } - ) + val transformerName: String - def process( - message: NotificationMessage + def processEvent( + payload: Payload ): Future[Result[Option[(Work[Source], StoreKey)]]] = Future { - for { - payload <- decodePayload(message) - key = Version(payload.id, payload.version) - - _ = debug(s"Decoded payload $payload and key $key") + val key = Version(payload.id, payload.version) + for { getResult <- getSourceData(payload) (sourceData, version) = getResult - _ = debug(s"Retrieved sourceData version $version for key $key") - newWork <- work(sourceData, version, key) } yield (newWork, key) }.flatMap { compareToStored } @@ -123,7 +80,7 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( // Calm. The records get a new modifiedDate from Sierra, but none of the data // we care about for the pipeline is changed. case Right((transformedWork, key)) => - retriever + transformedWorkRetriever .apply(workIndexable.id(transformedWork)) .map { storedWork => @@ -131,7 +88,7 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( Right(Some((transformedWork, key))) } else { info( - s"$name: from $key transformed work with id ${transformedWork.id}; already in pipeline so not re-sending" + s"$transformerName: from $key transformed work with id ${transformedWork.id}; already in pipeline so not re-sending" ) Right(None) } @@ -156,12 +113,6 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( Left(TransformerError(err, sourceData, key)) } - private def decodePayload(message: NotificationMessage): Result[Payload] = - fromJson[Payload](message.body) match { - case Success(storeKey) => Right(storeKey) - case Failure(err) => Left(DecodePayloadError(err, message)) - } - private def getSourceData(p: Payload): Result[(SourceData, Int)] = sourceDataRetriever .lookupSourceData(p) @@ -172,7 +123,9 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( s"Stored ID ($storedId) does not match ID from message (${p.id})" ) } - + debug( + s"Retrieved sourceData version $storedVersion for key $storedId" + ) (sourceData, storedVersion) } .left @@ -247,3 +200,77 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( modifiedTransformedWork == modifiedSourceWork } } + +/** A TransformerWorker: + * - Takes an SQS stream that emits VHS keys + * - Gets the record of type `SourceData` + * - Runs it through a transformer and transforms the `SourceData` to + * `Work[Source]` + * - Emits the message via `MessageSender` to SNS + */ +final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest]( + val transformer: Transformer[SourceData], + val transformedWorkRetriever: Retriever[Work[Source]], + pipelineStream: PipelineStorageStream[NotificationMessage, Work[ + Source + ], SenderDest], + val sourceDataRetriever: SourceDataRetriever[Payload, SourceData] +)(implicit val ec: ExecutionContext, val decoder: Decoder[Payload]) + extends Logging + with TransformerEventProcessor[Payload, SourceData] + with Runnable { + + lazy val transformerName: String = this.getClass.getSimpleName + + def run(): Future[Done] = + pipelineStream.foreach( + transformerName, + (notification: NotificationMessage) => + process(notification).map { + case Left(err) => + // We do some slightly nicer logging here to give context to the errors + err match { + case DecodePayloadError(_, notificationMsg) => + error( + s"$transformerName: DecodePayloadError from $notificationMsg" + ) + case StoreReadError(_, key) => + error(s"$transformerName: StoreReadError on $key") + case TransformerError(t, sourceData, key) => + error( + s"$transformerName: TransformerError on $sourceData with $key ($t)" + ) + } + + throw err + + case Right(None) => + debug( + s"$transformerName: no transformed Work returned for $notification (this means the Work is already in the pipeline)" + ) + Nil + + case Right(Some((work, key))) => + info( + s"$transformerName: from $key transformed work with id ${work.id}" + ) + List(work) + } + ) + + def process( + message: NotificationMessage + ): Future[Result[Option[(Work[Source], StoreKey)]]] = { + decodePayload(message).flatMap { + payload => + debug(s"Decoded payload $payload, successfully") + processEvent(payload) + } + } + + private def decodePayload(message: NotificationMessage): Future[Payload] = + fromJson[Payload](message.body) match { + case Success(storeKey) => Future.successful(storeKey) + case Failure(err) => Future.failed(DecodePayloadError(err, message)) + } +} diff --git a/pipeline/transformer/transformer_common/src/test/scala/weco/pipeline/transformer/TransformerWorkerTest.scala b/pipeline/transformer/transformer_common/src/test/scala/weco/pipeline/transformer/TransformerWorkerTest.scala index e42184c126..fea6c93859 100644 --- a/pipeline/transformer/transformer_common/src/test/scala/weco/pipeline/transformer/TransformerWorkerTest.scala +++ b/pipeline/transformer/transformer_common/src/test/scala/weco/pipeline/transformer/TransformerWorkerTest.scala @@ -451,7 +451,7 @@ class TransformerWorkerTest val worker = new TransformerWorker( transformer = transformer, pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new ExampleSourcePayloadLookup(sourceStore = store) ) diff --git a/pipeline/transformer/transformer_mets/src/test/scala/weco/pipeline/transformer/mets/services/MetsTransformerEndToEndTest.scala b/pipeline/transformer/transformer_mets/src/test/scala/weco/pipeline/transformer/mets/services/MetsTransformerEndToEndTest.scala index dfa4d5285a..6df06b87ea 100644 --- a/pipeline/transformer/transformer_mets/src/test/scala/weco/pipeline/transformer/mets/services/MetsTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_mets/src/test/scala/weco/pipeline/transformer/mets/services/MetsTransformerEndToEndTest.scala @@ -94,7 +94,7 @@ class MetsTransformerEndToEndTest new TransformerWorker[MetsSourcePayload, MetsSourceData, String]( transformer = new MetsXmlTransformer(store), pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new MetsSourceDataRetriever ) worker.run() diff --git a/pipeline/transformer/transformer_miro/src/test/scala/weco/pipeline/transformer/miro/services/MiroTransformerEndToEndTest.scala b/pipeline/transformer/transformer_miro/src/test/scala/weco/pipeline/transformer/miro/services/MiroTransformerEndToEndTest.scala index 9171a74b32..5d39379858 100644 --- a/pipeline/transformer/transformer_miro/src/test/scala/weco/pipeline/transformer/miro/services/MiroTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_miro/src/test/scala/weco/pipeline/transformer/miro/services/MiroTransformerEndToEndTest.scala @@ -98,7 +98,7 @@ class MiroTransformerEndToEndTest ]( transformer = new MiroRecordTransformer, pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new MiroSourceDataRetriever(store) ) worker.run() diff --git a/pipeline/transformer/transformer_sierra/src/test/scala/weco/pipeline/transformer/sierra/services/SierraTransformerEndToEndTest.scala b/pipeline/transformer/transformer_sierra/src/test/scala/weco/pipeline/transformer/sierra/services/SierraTransformerEndToEndTest.scala index 16a88d0976..b623092b00 100644 --- a/pipeline/transformer/transformer_sierra/src/test/scala/weco/pipeline/transformer/sierra/services/SierraTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_sierra/src/test/scala/weco/pipeline/transformer/sierra/services/SierraTransformerEndToEndTest.scala @@ -102,7 +102,7 @@ class SierraTransformerEndToEndTest version: Int ) => SierraTransformer(transformable, version).toEither, pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new SierraSourceDataRetriever(store) ) worker.run() diff --git a/pipeline/transformer/transformer_tei/src/test/scala/weco/pipeline/transformer/tei/service/TeiTransformerEndToEndTest.scala b/pipeline/transformer/transformer_tei/src/test/scala/weco/pipeline/transformer/tei/service/TeiTransformerEndToEndTest.scala index f18cd06446..262e22a11f 100644 --- a/pipeline/transformer/transformer_tei/src/test/scala/weco/pipeline/transformer/tei/service/TeiTransformerEndToEndTest.scala +++ b/pipeline/transformer/transformer_tei/src/test/scala/weco/pipeline/transformer/tei/service/TeiTransformerEndToEndTest.scala @@ -94,7 +94,7 @@ class TeiTransformerEndToEndTest new TransformerWorker[TeiSourcePayload, TeiMetadata, String]( transformer = new TeiTransformer(store), pipelineStream = pipelineStream, - retriever = retriever, + transformedWorkRetriever = retriever, sourceDataRetriever = new TeiSourceDataRetriever ) worker.run()