Skip to content

Commit

Permalink
Restart manifest generations when manifest has expired (#6441)
Browse files Browse the repository at this point in the history
  • Loading branch information
hannes-ucsc committed Dec 12, 2024
1 parent 50efa34 commit f2b6ac4
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 79 deletions.
54 changes: 41 additions & 13 deletions src/azul/service/async_manifest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from typing import (
Self,
TypedDict,
)
from uuid import (
UUID,
Expand Down Expand Up @@ -35,15 +36,26 @@ class InvalidTokenError(Exception):
@attrs.frozen(kw_only=True)
class Token:
"""
Represents an ongoing manifest generation
Represents a Step Function execution to generate a manifest
"""
#: A hash of the inputs
generation_id: UUID = strict_auto()

#: Number of prior executions for the generation represented by this token
iteration: int = strict_auto()

#: The number of times the service received a request to inspect the
#: status of the execution represented by this token
request_index: int = strict_auto()

#: How long clients should wait before requesting a status update about the
#: execution represented by the token
retry_after: int = strict_auto()

def pack(self) -> bytes:
    """
    Serialize this token with msgpack as a flat array holding, in this
    order, the raw bytes of the generation ID, the iteration, the request
    index and the retry delay.
    """
    fields = [
        self.generation_id.bytes,
        self.iteration,
        self.request_index,
        self.retry_after,
    ]
    return msgpack.packb(fields)
Expand All @@ -52,6 +64,7 @@ def pack(self) -> bytes:
def unpack(cls, pack: bytes) -> Self:
    """
    Reconstruct a token from its msgpack serialization. The elements are
    consumed positionally: generation ID bytes, iteration, request index
    and retry delay.
    """
    fields = iter(msgpack.unpackb(pack))
    # The fields are pulled one at a time, in the order they were packed
    generation_id = UUID(bytes=next(fields))
    iteration = next(fields)
    request_index = next(fields)
    retry_after = next(fields)
    return cls(generation_id=generation_id,
               iteration=iteration,
               request_index=request_index,
               retry_after=retry_after)

Expand All @@ -66,8 +79,9 @@ def decode(cls, token: str) -> Self:
raise InvalidTokenError(token) from e

@classmethod
def first(cls, generation_id: UUID) -> Self:
def first(cls, generation_id: UUID, iteration: int) -> Self:
return cls(generation_id=generation_id,
iteration=iteration,
request_index=0,
retry_after=cls._next_retry_after(0))

Expand All @@ -87,11 +101,21 @@ def _next_retry_after(cls, request_index: int) -> int:
return delays[-1]


class ExecutionResult(TypedDict):
    """
    The parsed input and output of a state machine execution that succeeded
    """
    #: The JSON the execution was given as input
    input: JSON

    #: The JSON the execution produced as output
    output: JSON


@attrs.frozen
class NoSuchGeneration(Exception):
    """
    Signals that no generation could be found for a token.

    NOTE(review): the site raising this exception is not visible here,
    confirm the exact condition against `inspect_generation`.
    """
    #: The token for which no generation was found
    token: Token = strict_auto()


@attrs.frozen
class GenerationFinished(Exception):
    """
    Raised when starting a generation for which a completed execution with
    the same name and the same input already exists
    """
    #: The token representing the execution that had already completed
    token: Token = strict_auto()


@attrs.frozen(kw_only=True)
class GenerationFailed(Exception):
status: str = strict_auto()
Expand All @@ -112,14 +136,18 @@ class AsyncManifestService:
def machine_name(self):
    """
    The qualified resource name derived from the configured name of the
    manifest state machine, `config.manifest_sfn`
    """
    return config.qualified_resource_name(config.manifest_sfn)

def start_generation(self, generation_id: UUID, input: JSON) -> Token:
execution_name = self.execution_name(generation_id)
def start_generation(self,
generation_id: UUID,
input: JSON,
iteration: int
) -> Token:
execution_name = self.execution_name(generation_id, iteration)
execution_arn = self.execution_arn(execution_name)
# The input contains the verbatim manifest key as JSON while the ARN
# contains the encoded hash of the key so this log line is useful for
# associating the hash with the key for diagnostic purposes.
log.info('Starting execution %r for input %r', execution_arn, input)
token = Token.first(generation_id)
token = Token.first(generation_id, iteration)
try:
# If there already is an execution of the given name, and if that
# execution is still ongoing and was given the same input as what we
Expand All @@ -145,16 +173,16 @@ def start_generation(self, generation_id: UUID, input: JSON) -> Token:
execution = self._sfn.describe_execution(executionArn=execution_arn)
if input == json.loads(execution['input']):
log.info('A completed execution %r already exists', execution_arn)
return token
raise GenerationFinished(token)
else:
raise InvalidGeneration(token)
else:
assert execution_arn == execution['executionArn'], execution
assert execution_arn == execution['executionArn'], (execution_arn, execution)
log.info('Started execution %r or it was already running', execution_arn)
return token

def inspect_generation(self, token: Token) -> Token | JSON:
execution_name = self.execution_name(token.generation_id)
def inspect_generation(self, token: Token) -> Token | ExecutionResult:
execution_name = self.execution_name(token.generation_id, token.iteration)
execution_arn = self.execution_arn(execution_name)
try:
execution = self._sfn.describe_execution(executionArn=execution_arn)
Expand All @@ -169,7 +197,7 @@ def inspect_generation(self, token: Token) -> Token | JSON:
return token.next(retry_after=1)
else:
log.info('Execution %r succeeded with output %r', execution_arn, output)
return json.loads(output)
return {k: json.loads(execution[k]) for k in ['input', 'output']}
elif status == 'RUNNING':
log.info('Execution %r is still running', execution_arn)
return token.next()
Expand All @@ -186,10 +214,10 @@ def machine_arn(self):
def execution_arn(self, execution_name):
    """
    Return the ARN of the execution with the given name. The ARN is
    obtained by passing `execution:<machine name>:<execution name>` to
    `self.arn`.
    """
    resource = f'execution:{self.machine_name}:{execution_name}'
    return self.arn(resource)

def execution_name(self, generation_id: UUID) -> str:
def execution_name(self, generation_id: UUID, iteration: int) -> str:
assert isinstance(generation_id, UUID)
execution_name = str(generation_id)
assert 0 < len(execution_name) <= 80, (generation_id, execution_name)
execution_name = f'{generation_id}_{iteration}'
assert 0 < len(execution_name) <= 80, execution_name
return execution_name

@property
Expand Down
84 changes: 59 additions & 25 deletions src/azul/service/manifest_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
)
from azul.service.async_manifest_service import (
AsyncManifestService,
GenerationFinished,
GenerationFailed,
InvalidTokenError,
NoSuchGeneration,
Expand Down Expand Up @@ -127,6 +128,33 @@ def _unpack_token_or_key(self,
# The OpenAPI spec doesn't distinguish key and token
raise BadRequestError('Invalid token')

def _start_execution(self,
                     filters: Filters,
                     manifest_key: ManifestKey,
                     previous_token: Token | None = None,
                     ) -> Token:
    """
    Start an execution that generates the manifest with the given key using
    the given filters, and return the token representing that execution.

    Without a previous token, the first iteration attempted is zero,
    otherwise it is the one following the iteration of the previous token.
    An iteration for which the service reports an already finished
    execution is skipped in favor of the next one. If ten consecutive
    iterations are skipped this way, a ChaliceViewError is raised.
    """
    first_partition = ManifestPartition.first()
    state: ManifestGenerationState = {
        'filters': filters.to_json(),
        'manifest_key': manifest_key.to_json(),
        'partition': first_partition.to_json()
    }
    # The UUID of the key stands in for the key itself: for catalogs with
    # long names, the key would be too long to be used directly as the name
    # of a state machine execution.
    generation_id = manifest_key.uuid
    # The state is valid JSON, too, but that can't be expressed in its type
    # because TypedDict rejects a co-parent class. Hence the cast.
    execution_input = cast(JSON, state)
    if previous_token is None:
        first_iteration = 0
    else:
        first_iteration = previous_token.iteration + 1
    for iteration in range(first_iteration, first_iteration + 10):
        try:
            token = self.async_service.start_generation(generation_id,
                                                        execution_input,
                                                        iteration=iteration)
        except GenerationFinished:
            # This iteration has already run to completion, try the next one
            continue
        else:
            return token
    raise ChaliceViewError('Too many executions of this manifest generation')

def get_manifest_async(self,
*,
token_or_key: str,
Expand All @@ -152,19 +180,8 @@ def get_manifest_async(self,
# A cache miss, but the exception tells us the cache key
manifest, manifest_key = None, e.manifest_key
# Prepare the execution that will generate the manifest
partition = ManifestPartition.first()
state: ManifestGenerationState = {
'filters': filters.to_json(),
'manifest_key': manifest_key.to_json(),
'partition': partition.to_json()
}
# Manifest keys for catalogs with long names would be too
# long to be used directly in state machine execution names.
generation_id = manifest_key.uuid
# ManifestGenerationState is also JSON but there is no way
# to express that since TypedDict rejects a co-parent class.
input = cast(JSON, state)
token = self.async_service.start_generation(generation_id, input)
token = self._start_execution(filters=filters,
manifest_key=manifest_key)
else:
# A cache hit
manifest_key = manifest.manifest_key
Expand All @@ -180,31 +197,48 @@ def get_manifest_async(self,
manifest_key = self.service.verify_manifest_key(manifest_key)
manifest = self.service.get_cached_manifest_with_key(manifest_key)
except CachedManifestNotFound:
# We can't restart the execution to regenerate the manifest
# because we don't know the parameters that were used to
# create it. So the client will have to do it.
raise GoneError('The requested manifest has expired, please request a new one')
# We could start another execution but that would require
# the client to follow more redirects. We've already sent
# the final 302 so we shouldn't do that.
raise GoneError('The manifest has expired, please request a new one')
except InvalidManifestKeySignature:
raise BadRequestError('Invalid token')
else:
# A token for an ongoing execution was given
assert manifest_key is None, manifest_key
try:
token_or_state = self.async_service.inspect_generation(token)
token_or_result = self.async_service.inspect_generation(token)
except NoSuchGeneration:
raise BadRequestError('Invalid token')
except GenerationFailed as e:
raise ChaliceViewError('Failed to generate manifest', e.status, e.output)
if isinstance(token_or_state, Token):
if isinstance(token_or_result, Token):
# Execution is still ongoing, we got an updated token
token, manifest, manifest_key = token_or_state, None, None
elif isinstance(token_or_state, dict):
# Eecution is done, a cached manifest should be ready
state = token_or_state
manifest = Manifest.from_json(state['manifest'])
token, manifest, manifest_key = token_or_result, None, None
elif isinstance(token_or_result, dict):
# The execution is done, the resulting manifest should be ready
result = token_or_result
manifest = Manifest.from_json(result['output']['manifest'])
manifest_key = manifest.manifest_key
try:
manifest = self.service.get_cached_manifest_with_key(manifest_key)
except CachedManifestNotFound as e:
assert manifest_key == e.manifest_key
# There are two possible causes for the missing manifest: it
# may have expired, in which case the supplied token must be
# really stale, or the manifest was deleted immediately
# after it was created. We haven't sent a 302 redirect yet,
# so we'll just restart the generation by starting another
# execution for it.
manifest = None
filters = Filters.from_json(result['input']['filters'])
token = self._start_execution(filters=filters,
manifest_key=manifest_key,
previous_token=token)
else:
assert manifest_key == manifest.manifest_key
else:
assert False, token_or_state
assert False, token_or_result

body: dict[str, int | str | FlatJSON]

Expand Down
Loading

0 comments on commit f2b6ac4

Please sign in to comment.