Skip to content

Commit

Permalink
Extract processEvent from TransformerMain (#2792)
Browse files Browse the repository at this point in the history
* split event from flow

* push decoding message up a level
  • Loading branch information
kenoir authored Jan 6, 2025
1 parent 9272771 commit 9421f10
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,5 @@ object BatchProcessor {
bulkWriter = batchWriter,
downstream = Downstream(Some(config))
)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class CalmTransformerEndToEndTest
new TransformerWorker[CalmSourcePayload, CalmSourceData, String](
transformer = CalmTransformer,
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new CalmSourceDataRetriever(store)
)
worker.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TransformerMain[Payload <: SourcePayload, SourceData](
new TransformerWorker(
transformer = transformer,
pipelineStream = pipelineStream,
retriever = sourceWorkRetriever,
transformedWorkRetriever = sourceWorkRetriever,
sourceDataRetriever = sourceDataRetriever
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,72 +36,29 @@ trait SourceDataRetriever[Payload, SourceData] {
): Either[ReadError, Identified[Version[String, Int], SourceData]]
}

/** A TransformerWorker:
* - Takes an SQS stream that emits VHS keys
* - Gets the record of type `SourceData`
* - Runs it through a transformer and transforms the `SourceData` to
* `Work[Source]`
* - Emits the message via `MessageSender` to SNS
*/
final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
transformer: Transformer[SourceData],
retriever: Retriever[Work[Source]],
pipelineStream: PipelineStorageStream[NotificationMessage, Work[
Source
], SenderDest],
sourceDataRetriever: SourceDataRetriever[Payload, SourceData]
)(implicit ec: ExecutionContext, decoder: Decoder[Payload])
extends Logging
with Runnable {
trait TransformerEventProcessor[Payload <: SourcePayload, SourceData]
extends Logging {
type Result[T] = Either[TransformerWorkerError, T]
type StoreKey = Version[String, Int]

def name: String = this.getClass.getSimpleName

def run(): Future[Done] =
pipelineStream.foreach(
name,
(notification: NotificationMessage) =>
process(notification).map {
case Left(err) =>
// We do some slightly nicer logging here to give context to the errors
err match {
case DecodePayloadError(_, notificationMsg) =>
error(s"$name: DecodePayloadError from $notificationMsg")
case StoreReadError(_, key) =>
error(s"$name: StoreReadError on $key")
case TransformerError(t, sourceData, key) =>
error(s"$name: TransformerError on $sourceData with $key ($t)")
}

throw err
implicit val ec: ExecutionContext
implicit val decoder: Decoder[Payload]

case Right(None) =>
debug(
s"$name: no transformed Work returned for $notification (this means the Work is already in the pipeline)"
)
Nil
val sourceDataRetriever: SourceDataRetriever[Payload, SourceData]
val transformer: Transformer[SourceData]
val transformedWorkRetriever: Retriever[Work[Source]]

case Right(Some((work, key))) =>
info(s"$name: from $key transformed work with id ${work.id}")
List(work)
}
)
val transformerName: String

def process(
message: NotificationMessage
def processEvent(
payload: Payload
): Future[Result[Option[(Work[Source], StoreKey)]]] =
Future {
for {
payload <- decodePayload(message)
key = Version(payload.id, payload.version)

_ = debug(s"Decoded payload $payload and key $key")
val key = Version(payload.id, payload.version)

for {
getResult <- getSourceData(payload)
(sourceData, version) = getResult
_ = debug(s"Retrieved sourceData version $version for key $key")

newWork <- work(sourceData, version, key)
} yield (newWork, key)
}.flatMap { compareToStored }
Expand All @@ -123,15 +80,15 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
// Calm. The records get a new modifiedDate from Sierra, but none of the data
// we care about for the pipeline is changed.
case Right((transformedWork, key)) =>
retriever
transformedWorkRetriever
.apply(workIndexable.id(transformedWork))
.map {
storedWork =>
if (shouldSend(transformedWork, storedWork)) {
Right(Some((transformedWork, key)))
} else {
info(
s"$name: from $key transformed work with id ${transformedWork.id}; already in pipeline so not re-sending"
s"$transformerName: from $key transformed work with id ${transformedWork.id}; already in pipeline so not re-sending"
)
Right(None)
}
Expand All @@ -156,12 +113,6 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
Left(TransformerError(err, sourceData, key))
}

private def decodePayload(message: NotificationMessage): Result[Payload] =
fromJson[Payload](message.body) match {
case Success(storeKey) => Right(storeKey)
case Failure(err) => Left(DecodePayloadError(err, message))
}

private def getSourceData(p: Payload): Result[(SourceData, Int)] =
sourceDataRetriever
.lookupSourceData(p)
Expand All @@ -172,7 +123,9 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
s"Stored ID ($storedId) does not match ID from message (${p.id})"
)
}

debug(
s"Retrieved sourceData version $storedVersion for key $storedId"
)
(sourceData, storedVersion)
}
.left
Expand Down Expand Up @@ -247,3 +200,77 @@ final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
modifiedTransformedWork == modifiedSourceWork
}
}

/** A TransformerWorker:
  *   - Takes an SQS stream that emits VHS keys
  *   - Gets the record of type `SourceData`
  *   - Runs it through a transformer and transforms the `SourceData` to
  *     `Work[Source]`
  *   - Emits the message via `MessageSender` to SNS
  *
  * This class owns the stream wiring and the decoding of the incoming
  * notification into a `Payload`; once decoded, the payload is handed to
  * `processEvent`, which this class gets by mixing in
  * `TransformerEventProcessor`.
  */
final class TransformerWorker[Payload <: SourcePayload, SourceData, SenderDest](
  val transformer: Transformer[SourceData],
  val transformedWorkRetriever: Retriever[Work[Source]],
  pipelineStream: PipelineStorageStream[NotificationMessage, Work[
    Source
  ], SenderDest],
  val sourceDataRetriever: SourceDataRetriever[Payload, SourceData]
)(implicit val ec: ExecutionContext, val decoder: Decoder[Payload])
    extends Logging
    with TransformerEventProcessor[Payload, SourceData]
    with Runnable {

  // Used as the stream name and as the prefix of every log line below.
  lazy val transformerName: String = this.getClass.getSimpleName

  /** Runs `process` for every notification on `pipelineStream`.
    *
    * For each notification:
    *   - a `Left` is logged with some context and then rethrown, which fails
    *     the Future returned for that notification;
    *   - `Right(None)` yields no Works;
    *   - `Right(Some((work, key)))` yields the single transformed Work.
    */
  def run(): Future[Done] =
    pipelineStream.foreach(
      transformerName,
      (notification: NotificationMessage) =>
        process(notification).map {
          case Left(err) =>
            // We do some slightly nicer logging here to give context to the errors
            err match {
              case DecodePayloadError(_, notificationMsg) =>
                error(
                  s"$transformerName: DecodePayloadError from $notificationMsg"
                )
              case StoreReadError(_, key) =>
                error(s"$transformerName: StoreReadError on $key")
              case TransformerError(t, sourceData, key) =>
                error(
                  s"$transformerName: TransformerError on $sourceData with $key ($t)"
                )
            }

            throw err

          case Right(None) =>
            debug(
              s"$transformerName: no transformed Work returned for $notification (this means the Work is already in the pipeline)"
            )
            Nil

          case Right(Some((work, key))) =>
            info(
              s"$transformerName: from $key transformed work with id ${work.id}"
            )
            List(work)
        }
    )

  /** Decodes `message` into a `Payload` and passes it to `processEvent`.
    *
    * A decoding failure is reported as `Left(DecodePayloadError)` inside a
    * successful Future rather than as a failed Future. That keeps every
    * `TransformerWorkerError` on the `Result` channel, so the error-specific
    * logging in `run` (including its `DecodePayloadError` case) is reached
    * before the error is rethrown.
    *
    * @param message
    *   the notification whose `body` is parsed as JSON into a `Payload`
    * @return
    *   the outcome of `processEvent` for the decoded payload, or
    *   `Left(DecodePayloadError)` if the body could not be decoded
    */
  def process(
    message: NotificationMessage
  ): Future[Result[Option[(Work[Source], StoreKey)]]] =
    decodePayload(message) match {
      case Right(payload) =>
        debug(s"Decoded payload $payload, successfully")
        processEvent(payload)

      case Left(decodeError) =>
        Future.successful(Left(decodeError))
    }

  /** Parses the notification body as a `Payload`, wrapping any parse failure
    * (together with the offending message) in a `DecodePayloadError`.
    */
  private def decodePayload(message: NotificationMessage): Result[Payload] =
    fromJson[Payload](message.body) match {
      case Success(payload) => Right(payload)
      case Failure(err)     => Left(DecodePayloadError(err, message))
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class TransformerWorkerTest
val worker = new TransformerWorker(
transformer = transformer,
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever =
new ExampleSourcePayloadLookup(sourceStore = store)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class MetsTransformerEndToEndTest
new TransformerWorker[MetsSourcePayload, MetsSourceData, String](
transformer = new MetsXmlTransformer(store),
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new MetsSourceDataRetriever
)
worker.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class MiroTransformerEndToEndTest
](
transformer = new MiroRecordTransformer,
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new MiroSourceDataRetriever(store)
)
worker.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class SierraTransformerEndToEndTest
version: Int
) => SierraTransformer(transformable, version).toEither,
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new SierraSourceDataRetriever(store)
)
worker.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class TeiTransformerEndToEndTest
new TransformerWorker[TeiSourcePayload, TeiMetadata, String](
transformer = new TeiTransformer(store),
pipelineStream = pipelineStream,
retriever = retriever,
transformedWorkRetriever = retriever,
sourceDataRetriever = new TeiSourceDataRetriever
)
worker.run()
Expand Down

0 comments on commit 9421f10

Please sign in to comment.