From 6ec7b4f7ce75d6b753570deb15d2d18d773b4e12 Mon Sep 17 00:00:00 2001
From: Hannes Schmidt
Date: Sat, 14 Dec 2024 21:56:52 -0800
Subject: [PATCH] Restart manifest generations when manifest has expired (#6441)

---
 src/azul/service/async_manifest_service.py |  59 +++++--
 src/azul/service/manifest_controller.py    |  84 +++++++---
 test/integration_test.py                   |  84 ++++++----
 test/service/test_manifest_async.py        | 182 ++++++++++++++++-----
 4 files changed, 302 insertions(+), 107 deletions(-)

diff --git a/src/azul/service/async_manifest_service.py b/src/azul/service/async_manifest_service.py
index c85a62e15..6d4dbac3e 100644
--- a/src/azul/service/async_manifest_service.py
+++ b/src/azul/service/async_manifest_service.py
@@ -3,6 +3,7 @@
 import logging
 from typing import (
     Self,
+    TypedDict,
 )
 from uuid import (
     UUID,
@@ -35,15 +36,30 @@ class InvalidTokenError(Exception):
 @attrs.frozen(kw_only=True)
 class Token:
     """
-    Represents an ongoing manifest generation
+    Represents a Step Function execution to generate a manifest
     """
+    #: A hash of the inputs
     generation_id: UUID = strict_auto()
+
+    #: Number of prior executions for the generation represented by this token
+    iteration: int = strict_auto()
+
+    #: The number of times the service received a request to inspect the
+    #: status of the execution represented by this token
     request_index: int = strict_auto()
+
+    #: How long clients should wait before requesting a status update about the
+    #: execution represented by the token
     retry_after: int = strict_auto()
 
+    @property
+    def execution_id(self) -> tuple[UUID, int]:
+        return self.generation_id, self.iteration
+
     def pack(self) -> bytes:
         return msgpack.packb([
             self.generation_id.bytes,
+            self.iteration,
             self.request_index,
             self.retry_after
         ])
@@ -52,6 +68,7 @@ def pack(self) -> bytes:
     def unpack(cls, pack: bytes) -> Self:
         i = iter(msgpack.unpackb(pack))
         return cls(generation_id=UUID(bytes=next(i)),
+                   iteration=next(i),
                    request_index=next(i),
                    retry_after=next(i))
 
@@ -66,8 +83,9 @@ def decode(cls, token: str) -> Self:
             raise InvalidTokenError(token) from e
 
     @classmethod
-    def first(cls, generation_id: UUID) -> Self:
+    def first(cls, generation_id: UUID, iteration: int) -> Self:
         return cls(generation_id=generation_id,
+                   iteration=iteration,
                    request_index=0,
                    retry_after=cls._next_retry_after(0))
 
@@ -87,11 +105,21 @@ def _next_retry_after(cls, request_index: int) -> int:
             return delays[-1]
 
 
+class ExecutionResult(TypedDict):
+    input: JSON
+    output: JSON
+
+
 @attrs.frozen
 class NoSuchGeneration(Exception):
     token: Token = strict_auto()
 
 
+@attrs.frozen
+class GenerationFinished(Exception):
+    token: Token = strict_auto()
+
+
 @attrs.frozen(kw_only=True)
 class GenerationFailed(Exception):
     status: str = strict_auto()
@@ -112,14 +140,18 @@ class AsyncManifestService:
     def machine_name(self):
         return config.qualified_resource_name(config.manifest_sfn)
 
-    def start_generation(self, generation_id: UUID, input: JSON) -> Token:
-        execution_name = self.execution_name(generation_id)
+    def start_generation(self,
+                         generation_id: UUID,
+                         input: JSON,
+                         iteration: int
+                         ) -> Token:
+        execution_name = self.execution_name(generation_id, iteration)
         execution_arn = self.execution_arn(execution_name)
         # The input contains the verbatim manifest key as JSON while the ARN
         # contains the encoded hash of the key so this log line is useful for
         # associating the hash with the key for diagnostic purposes.
         log.info('Starting execution %r for input %r', execution_arn, input)
-        token = Token.first(generation_id)
+        token = Token.first(generation_id, iteration)
         try:
             # If there already is an execution of the given name, and if that
             # execution is still ongoing and was given the same input as what we
@@ -145,16 +177,16 @@ def start_generation(self, generation_id: UUID, input: JSON) -> Token:
             execution = self._sfn.describe_execution(executionArn=execution_arn)
             if input == json.loads(execution['input']):
                 log.info('A completed execution %r already exists', execution_arn)
-                return token
+                raise GenerationFinished(token)
             else:
                 raise InvalidGeneration(token)
         else:
-            assert execution_arn == execution['executionArn'], execution
+            assert execution_arn == execution['executionArn'], (execution_arn, execution)
             log.info('Started execution %r or it was already running', execution_arn)
             return token
 
-    def inspect_generation(self, token: Token) -> Token | JSON:
-        execution_name = self.execution_name(token.generation_id)
+    def inspect_generation(self, token: Token) -> Token | ExecutionResult:
+        execution_name = self.execution_name(token.generation_id, token.iteration)
         execution_arn = self.execution_arn(execution_name)
         try:
             execution = self._sfn.describe_execution(executionArn=execution_arn)
@@ -169,7 +201,7 @@ def inspect_generation(self, token: Token) -> Token | JSON:
                 return token.next(retry_after=1)
             else:
                 log.info('Execution %r succeeded with output %r', execution_arn, output)
-                return json.loads(output)
+                return {k: json.loads(execution[k]) for k in ['input', 'output']}
         elif status == 'RUNNING':
             log.info('Execution %r is still running', execution_arn)
             return token.next()
@@ -186,10 +218,11 @@ def machine_arn(self):
     def execution_arn(self, execution_name):
         return self.arn(f'execution:{self.machine_name}:{execution_name}')
 
-    def execution_name(self, generation_id: UUID) -> str:
+    def execution_name(self, generation_id: UUID, iteration: int) -> str:
         assert isinstance(generation_id, UUID), generation_id
-        execution_name = str(generation_id)
-        assert 0 < len(execution_name) <= 80, (generation_id, execution_name)
+        assert isinstance(iteration, int), iteration
+        execution_name = f'{generation_id}_{iteration}'
+        assert 0 < len(execution_name) <= 80, execution_name
         return execution_name
 
     @property
diff --git a/src/azul/service/manifest_controller.py b/src/azul/service/manifest_controller.py
index 1f86312fb..d94cff1ff 100644
--- a/src/azul/service/manifest_controller.py
+++ b/src/azul/service/manifest_controller.py
@@ -37,6 +37,7 @@
 from azul.service.async_manifest_service import (
     AsyncManifestService,
     GenerationFailed,
+    GenerationFinished,
     InvalidTokenError,
     NoSuchGeneration,
     Token,
@@ -127,6 +128,33 @@ def _unpack_token_or_key(self,
             # The OpenAPI spec doesn't distinguish key and token
             raise BadRequestError('Invalid token')
 
+    def _start_execution(self,
+                         filters: Filters,
+                         manifest_key: ManifestKey,
+                         previous_token: Token | None = None,
+                         ) -> Token:
+        partition = ManifestPartition.first()
+        state: ManifestGenerationState = {
+            'filters': filters.to_json(),
+            'manifest_key': manifest_key.to_json(),
+            'partition': partition.to_json()
+        }
+        # Manifest keys for catalogs with long names would be too long to be
+        # used directly as state machine execution names.
+        generation_id = manifest_key.uuid
+        # ManifestGenerationState is also JSON but there is no way to express
+        # that since TypedDict rejects a co-parent class.
+        input = cast(JSON, state)
+        next_iteration = 0 if previous_token is None else previous_token.iteration + 1
+        for i in range(10):
+            try:
+                return self.async_service.start_generation(generation_id,
+                                                           input,
+                                                           iteration=next_iteration + i)
+            except GenerationFinished:
+                pass
+        raise ChaliceViewError('Too many executions of this manifest generation')
+
     def get_manifest_async(self,
                            *,
                            token_or_key: str,
@@ -152,19 +180,8 @@ def get_manifest_async(self,
                 # A cache miss, but the exception tells us the cache key
                 manifest, manifest_key = None, e.manifest_key
                 # Prepare the execution that will generate the manifest
-                partition = ManifestPartition.first()
-                state: ManifestGenerationState = {
-                    'filters': filters.to_json(),
-                    'manifest_key': manifest_key.to_json(),
-                    'partition': partition.to_json()
-                }
-                # Manifest keys for catalogs with long names would be too
-                # long to be used directly in state machine execution names.
-                generation_id = manifest_key.uuid
-                # ManifestGenerationState is also JSON but there is no way
-                # to express that since TypedDict rejects a co-parent class.
-                input = cast(JSON, state)
-                token = self.async_service.start_generation(generation_id, input)
+                token = self._start_execution(filters=filters,
+                                              manifest_key=manifest_key)
             else:
                 # A cache hit
                 manifest_key = manifest.manifest_key
@@ -180,31 +197,48 @@ def get_manifest_async(self,
                 manifest_key = self.service.verify_manifest_key(manifest_key)
                 manifest = self.service.get_cached_manifest_with_key(manifest_key)
             except CachedManifestNotFound:
-                # We can't restart the execution to regenerate the manifest
-                # because we don't know the parameters that were used to
-                # create it. So the client will have to do it.
-                raise GoneError('The requested manifest has expired, please request a new one')
+                # We could start another execution but that would require
+                # the client to follow more redirects. We've already sent
+                # the final 302 so we shouldn't do that.
+                raise GoneError('The manifest has expired, please request a new one')
             except InvalidManifestKeySignature:
                 raise BadRequestError('Invalid token')
         else:
             # A token for an ongoing execution was given
             assert manifest_key is None, manifest_key
             try:
-                token_or_state = self.async_service.inspect_generation(token)
+                token_or_result = self.async_service.inspect_generation(token)
             except NoSuchGeneration:
                 raise BadRequestError('Invalid token')
             except GenerationFailed as e:
                 raise ChaliceViewError('Failed to generate manifest', e.status, e.output)
-            if isinstance(token_or_state, Token):
+            if isinstance(token_or_result, Token):
                 # Execution is still ongoing, we got an updated token
-                token, manifest, manifest_key = token_or_state, None, None
-            elif isinstance(token_or_state, dict):
-                # Eecution is done, a cached manifest should be ready
-                state = token_or_state
-                manifest = Manifest.from_json(state['manifest'])
+                token, manifest, manifest_key = token_or_result, None, None
+            elif isinstance(token_or_result, dict):
+                # The execution is done, the resulting manifest should be ready
+                result = token_or_result
+                manifest = Manifest.from_json(result['output']['manifest'])
                 manifest_key = manifest.manifest_key
+                try:
+                    manifest = self.service.get_cached_manifest_with_key(manifest_key)
+                except CachedManifestNotFound as e:
+                    assert manifest_key == e.manifest_key
+                    # There are two possible causes for the missing manifest: it
+                    # may have expired, in which case the supplied token must be
+                    # really stale, or the manifest was deleted immediately
+                    # after it was created. We haven't sent a 302 redirect yet,
+                    # so we'll just restart the generation by starting another
+                    # execution for it.
+                    manifest = None
+                    filters = Filters.from_json(result['input']['filters'])
+                    token = self._start_execution(filters=filters,
+                                                  manifest_key=manifest_key,
+                                                  previous_token=token)
+                else:
+                    assert manifest_key == manifest.manifest_key
             else:
-                assert False, token_or_state
+                assert False, token_or_result
 
         body: dict[str, int | str | FlatJSON]
 
diff --git a/test/integration_test.py b/test/integration_test.py
index dc40995d8..9fc5e7965 100644
--- a/test/integration_test.py
+++ b/test/integration_test.py
@@ -109,6 +109,9 @@
 from azul.csp import (
     CSP,
 )
+from azul.deployment import (
+    aws,
+)
 from azul.drs import (
     AccessMethod,
 )
@@ -532,9 +535,10 @@ def _test_manifest(self, catalog: CatalogName):
         assert supported_formats
         for format in [None, *supported_formats]:
             filters = self._manifest_filters(catalog)
-            first_fetch = bool(self.random.getrandbits(1))
-            for fetch in [first_fetch, not first_fetch]:
-                with self.subTest('manifest', catalog=catalog, format=format, fetch=fetch):
+            execution_ids = set()
+            coin_flip = bool(self.random.getrandbits(1))
+            for i, fetch in enumerate([coin_flip, coin_flip, not coin_flip]):
+                with self.subTest('manifest', catalog=catalog, format=format, i=i, fetch=fetch):
                     args = dict(catalog=catalog, filters=json.dumps(filters))
                     if format is None:
                         format = first(supported_formats)
@@ -565,13 +569,22 @@ def worker(_):
                     results = list(tpe.map(worker, range(num_workers)))
                     self.assertEqual([None] * num_workers, results)
-                    execution_ids = self._manifest_execution_ids(responses, fetch=fetch)
-                    # The second iteration of the inner-most loop re-requests
-                    # the manifest with only `fetch` being different. In that
-                    # case, the manifest will already be cached and no step
-                    # function execution is expected to have been started.
-                    expect_execution = fetch == first_fetch
-                    self.assertEqual(1 if expect_execution else 0, len(execution_ids))
+
+                    execution_ids.update(self._manifest_execution_ids(responses))
+                    bucket, key = one(self._manifest_objects(responses))
+                    if i == 0:
+                        aws.s3.delete_object(Bucket=bucket, Key=key)
+                        # One execution to generate the manifest
+                        self.assertEqual(1, len(execution_ids))
+                    elif i == 1:
+                        # One more execution to re-generate the manifest
+                        self.assertEqual(2, len(execution_ids))
+                    elif i == 2:
+                        # Only fetch mode changed, cached manifest will be used,
+                        # and no additional executions are expected
+                        self.assertEqual(2, len(execution_ids))
+                    else:
+                        assert False
 
     def _manifest_filters(self, catalog: CatalogName) -> JSON:
         # IT catalogs with just one public source are always indexed completely
@@ -642,28 +655,43 @@ def _test_manifest_tagging_race(self, catalog: CatalogName):
                     self._manifest_validators[format](catalog, response.data)
                     break
 
-        execution_ids = self._manifest_execution_ids(responses, fetch=False)
+        execution_ids = self._manifest_execution_ids(responses)
         self.assertEqual(1, len(execution_ids))
 
     def _manifest_execution_ids(self,
-                                responses: list[urllib3.HTTPResponse],
-                                *,
-                                fetch: bool
-                                ) -> set[uuid.UUID]:
-        urls: list[furl]
-        if fetch:
-            responses = [
-                json.loads(r.data)
-                for r in responses
-                if r.status == 200 and r.headers['Content-Type'] == 'application/json'
-            ]
-            urls = [furl(r['Location']) for r in responses if r['Status'] == 301]
-        else:
-            urls = [furl(r.headers['Location']) for r in responses if r.status == 301]
+                                responses: list[urllib3.HTTPResponse]
+                                ) -> set[tuple[uuid.UUID, int]]:
+        urls = self._manifest_urls(responses, status=301)
         tokens = {Token.decode(url.path.segments[-1]) for url in urls}
-        execution_ids = {token.generation_id for token in tokens}
+        execution_ids = {token.execution_id for token in tokens}
         return execution_ids
 
+    def _manifest_objects(self,
+                          responses: list[urllib3.HTTPResponse]
+                          ) -> set[tuple[str, str]]:
+        urls = self._manifest_urls(responses, status=302)
+        return {
+            (url.path.segments[0], '/'.join(url.path.segments[1:]))
+            for url in urls
+            if url.netloc == 's3.amazonaws.com' and url.scheme == 'https'
+        }
+
+    def _manifest_urls(self,
+                       responses: list[urllib3.HTTPResponse],
+                       *,
+                       status: int
+                       ) -> list[furl]:
+        urls: list[furl] = []
+        for response in responses:
+            if response.status == 200:
+                if response.headers['Content-Type'] == 'application/json':
+                    body = json.loads(response.data)
+                    if body['Status'] == status:
+                        urls.append(furl(body['Location']))
+            elif response.status == status:
+                urls.append(furl(response.headers['Location']))
+        return urls
+
     def _get_one_inner_file(self, catalog: CatalogName) -> tuple[JSON, FileInnerEntity]:
         outer_file = self._get_one_outer_file(catalog)
         inner_files: JSONs = outer_file['files']
@@ -1065,8 +1093,8 @@ def _check_curl_manifest(self, _catalog: CatalogName, response: bytes):
         num_files = 0
         for url, output in grouper(lines, 2):
             num_files += 1
-            self.assertTrue(url.startswith('url='))
-            self.assertTrue(output.startswith('output='))
+            self.assertTrue(url.startswith('url='), url)
+            self.assertTrue(output.startswith('output='), output)
         log.info(f'Manifest contains {num_files} files.')
         self.assertGreater(num_files, 0)
 
diff --git a/test/service/test_manifest_async.py b/test/service/test_manifest_async.py
index bcdcd25b7..ddde41283 100644
--- a/test/service/test_manifest_async.py
+++ b/test/service/test_manifest_async.py
@@ -36,6 +36,12 @@
 from app_test_case import (
     LocalAppTestCase,
 )
+from azul import (
+    JSON,
+)
+from azul.collections import (
+    deep_dict_merge,
+)
 from azul.logging import (
     configure_test_logging,
 )
@@ -44,6 +50,7 @@
 )
 from azul.service import (
     Filters,
+    FiltersJSON,
 )
 from azul.service.async_manifest_service import (
     AsyncManifestService,
@@ -79,11 +86,17 @@ class TestAsyncManifestService(AzulUnitTestCase):
     generation_id = UUID('1ea94a54-a64d-54f1-8b41-15455fb958db')
 
     def test_token_encoding(self, _sfn):
-        token = Token(generation_id=self.generation_id, request_index=42, retry_after=123)
+        token = Token(generation_id=self.generation_id,
+                      iteration=3,
+                      request_index=42,
+                      retry_after=123)
         self.assertEqual(token, Token.decode(token.encode()))
 
     def test_token_validation(self, _sfn):
-        token = Token(generation_id=self.generation_id, request_index=42, retry_after=123)
+        token = Token(generation_id=self.generation_id,
+                      iteration=3,
+                      request_index=42,
+                      retry_after=123)
         self.assertRaises(InvalidTokenError, token.decode, token.encode()[:-1])
 
     def test_status_success(self, _sfn):
         """
         A successful manifest job should return a 302 redirect to the
         manifest
         """
         service = AsyncManifestService()
-        execution_name = service.execution_name(self.generation_id)
-        output = {'foo': 'bar'}
+        execution_name = service.execution_name(self.generation_id, iteration=0)
+        input, output = {'filters': {}}, {'foo': 'bar'}
         _sfn.describe_execution.return_value = {
             'executionArn': service.execution_arn(execution_name),
             'stateMachineArn': service.machine_arn,
             'name': execution_name,
             'status': 'SUCCEEDED',
             'startDate': datetime.datetime(2018, 11, 15, 18, 30, 44, 896000),
             'stopDate': datetime.datetime(2018, 11, 15, 18, 30, 59, 295000),
-            'input': '{"filters": {}}',
+            'input': json.dumps(input),
             'output': json.dumps(output)
         }
-        token = Token(generation_id=self.generation_id, request_index=0, retry_after=0)
-        actual_output = service.inspect_generation(token)
-        self.assertEqual(output, actual_output)
+        token = Token(generation_id=self.generation_id,
+                      iteration=0,
+                      request_index=0,
+                      retry_after=0)
+        actual_result = service.inspect_generation(token)
+        self.assertEqual({'input': input, 'output': output}, actual_result)
 
     def test_status_running(self, _sfn):
         """
         A running manifest job should return a 301 redirect and ask the client
         to wait before checking the job status
         """
         service = AsyncManifestService()
-        execution_name = service.execution_name(self.generation_id)
+        execution_name = service.execution_name(self.generation_id, iteration=0)
         _sfn.describe_execution.return_value = {
             'executionArn': service.execution_arn(execution_name),
             'stateMachineArn': service.machine_arn,
             'name': execution_name,
             'status': 'RUNNING',
             'startDate': datetime.datetime(2018, 11, 15, 18, 30, 44, 896000),
             'input': '{"filters": {}}'
         }
-        token = Token(generation_id=self.generation_id, request_index=0, retry_after=0)
+        token = Token(generation_id=self.generation_id,
+                      iteration=0,
+                      request_index=0,
+                      retry_after=0)
         token = service.inspect_generation(token)
-        expected = Token(generation_id=self.generation_id, request_index=1, retry_after=1)
+        expected = Token(generation_id=self.generation_id,
+                         iteration=0,
+                         request_index=1,
+                         retry_after=1)
         self.assertEqual(expected, token)
 
     def test_status_failed(self, _sfn):
         """
         A failed manifest job should raise a GenerationFailed
         """
         service = AsyncManifestService()
-        execution_name = service.execution_name(self.generation_id)
+        execution_name = service.execution_name(self.generation_id, iteration=0)
         _sfn.describe_execution.return_value = {
             'executionArn': service.execution_arn(execution_name),
             'stateMachineArn': service.machine_arn,
             'name': execution_name,
             'status': 'FAILED',
             'startDate': datetime.datetime(2018, 11, 14, 16, 6, 53, 382000),
             'stopDate': datetime.datetime(2018, 11, 14, 16, 6, 55, 860000),
             'input': '{"filters": {"organ": {"is": ["lymph node"]}}}',
         }
-        token = Token(generation_id=self.generation_id, request_index=0, retry_after=0)
+        token = Token(generation_id=self.generation_id,
+                      iteration=0,
+                      request_index=0,
+                      retry_after=0)
         with self.assertRaises(GenerationFailed):
             service.inspect_generation(token)
@@ -246,9 +271,19 @@ def wrapper(*args, **kwargs):
         service: AsyncManifestService
         service = self.app_module.app.manifest_controller.async_service
         generation_id = manifest_key.uuid
-        execution_name = service.execution_name(generation_id)
+        execution_names = [
+            service.execution_name(generation_id, iteration=i)
+            for i in range(3)
+        ]
         machine_arn = service.machine_arn
-        execution_arn = service.execution_arn(execution_name)
+        execution_arns = list(map(service.execution_arn, execution_names))
+
+        not_found = CachedManifestNotFound(manifest_key)
+        execution_exists = self._mock_sfn_exception(
+            _sfn,
+            operation_name='StartExecution',
+            error_code='ExecutionAlreadyExists'
+        )
         not_found = CachedManifestNotFound(manifest_key)
         execution_exists = self._mock_sfn_exception(_sfn,
@@ -277,6 +312,45 @@ def assert_get_manifest(partition):
         key_url: furl
         final_url: furl
         equivalent_url: furl
+        equivalent_filters: FiltersJSON
+        equivalent_input: ManifestGenerationState
+
+        iterations: list[JSON] = []
+
+        def mock_start_generation(*, start: int = 0, describe: int = 0):
+            *rest, last = range(start, len(iterations))
+            _sfn.start_execution.side_effect = [
+                *(execution_exists for _ in rest),
+                {
+                    'executionArn': execution_arns[last],
+                    'startDate': 1234
+                }
+            ]
+            *rest, last = range(describe, len(iterations))
+            _sfn.describe_execution.side_effect = [
+                {
+                    'status': 'SUCCEEDED',
+                    'input': json.dumps(iterations[i]),
+                    'output': json.dumps(state)
+                }
+                for i in rest
+            ]
+
+        def assert_start_generation(*, start: int = 0, describe: int = 0):
+            indices = range(start, len(iterations))
+            expected_calls = [
+                mock.call(stateMachineArn=machine_arn,
+                          name=execution_names[i],
+                          input=json.dumps(iterations[-1]))
+                for i in indices
+            ]
+            self.assertEqual(expected_calls, _sfn.start_execution.mock_calls)
+            indices = range(describe, len(iterations))
+            expected_calls = [
+                mock.call(executionArn=execution_arns[i])
+                for i in indices[:-1]
+            ]
+            self.assertEqual(expected_calls, _sfn.describe_execution.mock_calls)
 
         # Request the manifest. The cached manifest does not exist
         # so we expect a StepFunction execution to be started and a
@@ -287,19 +361,13 @@ def put():
             nonlocal url, state, token_url
             get_cached_manifest.side_effect = not_found
-            _sfn.start_execution.return_value = {
-                'executionArn': execution_arn,
-                'startDate': 123
-            }
+            iterations.append(input)
+            mock_start_generation()
             url = self._request('PUT', initial_url, expect=301)
             assert_get_cached_manifest()
+            assert_start_generation()
             state = input
             token_url = url
-            _sfn.start_execution.assert_called_once_with(
-                stateMachineArn=machine_arn,
-                name=execution_name,
-                input=json.dumps(input)
-            )
 
         put()
 
@@ -361,10 +429,12 @@ def get_token_when_done():
             key_url = None
             final_url = object_url
             get_manifest_url.return_value = str(object_url)
+            get_cached_manifest_with_key.return_value = manifest
             url = self._request('GET', url, expect=302)
             self.assertEqual(final_url, url)
             assert_get_manifest(partition=1)
             _sfn.describe_execution.assert_called_once()
+            get_cached_manifest_with_key.assert_called_once_with(manifest_key)
 
         get_token_when_done()
 
@@ -394,7 +464,7 @@ def repeat_put():
         #
         @reset
         def modified_put():
-            nonlocal url, equivalent_url
+            nonlocal url, equivalent_url, equivalent_filters
             get_cached_manifest.return_value = manifest
             get_manifest_url.return_value = str(object_url)
             if key_url is not None:
@@ -412,28 +482,58 @@ def modified_put():
         # Expire the cached manifest and repeat the initial request
         # with the insignificant difference. The repeated request
         # should be considered valid and matching the completed step
-        # function execution.
+        # function execution. However, because the manifest is missing,
+        # the generation should be restarted with a new execution.
         #
         @reset
         def modified_put_after_expiration():
-            nonlocal url
+            nonlocal url, state, token_url, equivalent_input
             get_cached_manifest.side_effect = not_found
-            _sfn.start_execution.side_effect = execution_exists
-            _sfn.describe_execution.return_value = {
-                'status': 'SUCCEEDED',
-                'input': json.dumps(input),
-                'output': json.dumps(state)
-            }
+            equivalent_input = deep_dict_merge(
+                {'filters': {'explicit': equivalent_filters}},
+                input
+            )
+            iterations.append(equivalent_input)
+            mock_start_generation()
             url = self._request('PUT', equivalent_url, expect=301)
-            # FIXME: 404 from S3 when re-requesting manifest after it expired
-            #        https://github.com/DataBiosphere/azul/issues/6441
-            if True:
-                self.assertEqual(token_url, url)
-            else:
-                self.assertNotEqual(token_url, url)
             assert_get_cached_manifest()
+            self.assertNotEqual(token_url, url)
+            assert_get_cached_manifest()
+            assert_start_generation()
+            token_url = url
+            state = equivalent_input
 
         modified_put_after_expiration()
+        get_token_while_running()
+        get_token_when_almost_done()
+
+        # The StepFunction has finished but the output has expired
+        # or was deleted. We expect yet another execution to restart
+        # the generation.
+        #
+        @reset
+        def get_stale_token_when_done():
+            nonlocal url, state, key_url, token_url
+            get_manifest.return_value = manifest
+            state = self.app_module.generate_manifest(state, None)
+            get_cached_manifest_with_key.side_effect = not_found
+            previous_iteration = len(iterations)
+            iterations.append(equivalent_input)
+            mock_start_generation(start=previous_iteration,
+                                  describe=previous_iteration - 1)
+            url = self._request('GET', url, expect=301)
+            self.assertNotEqual(token_url, url)
+            assert_get_manifest(partition=1)
+            get_cached_manifest_with_key.assert_called_once_with(manifest_key)
+            assert_start_generation(start=previous_iteration,
+                                    describe=previous_iteration - 1)
+            token_url = url
+            state = equivalent_input
+
+        get_stale_token_when_done()
+        get_token_while_running()
+        get_token_when_almost_done()
+        get_token_when_done()
 
         # Request the manifest by its key if a URL with that key
         # was the result of the final 302 redirect above.
@@ -467,7 +567,7 @@ def get_key_after_expiration():
             self.assertEqual(410, response.status_code)
             expected_response = {
                 'Code': 'GoneError',
-                'Message': 'The requested manifest has expired, please request a new one'
+                'Message': 'The manifest has expired, please request a new one'
             }
             self.assertEqual(expected_response, response.json())
             get_cached_manifest_with_key.assert_called_once_with(manifest_key)
@@ -491,7 +591,7 @@ def _request(self, method: str, url: furl, *, expect: int) -> furl:
         self.assertGreaterEqual(int(headers['Retry-After']), 0)
         return furl(headers['Location'])
 
-    token = Token.first(generation_id).encode()
+    token = Token.first(generation_id, iteration=0).encode()
 
     def _test(self, *, expected_status, token=token):
         url = self.base_url.set(path=['fetch', 'manifest', 'files', token])
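
Reviewer note: below is a minimal, self-contained sketch of the restart scheme
this patch introduces, runnable outside of Azul. Only the name format
f'{generation_id}_{iteration}', the 80-character limit on execution names, and
the bounded loop of up to ten attempts are taken from the patch itself; the
helper names, the start() callable, and the RuntimeError are illustrative
assumptions, not Azul code.

    from uuid import UUID

    class GenerationFinished(Exception):
        # Stands in for the patch's GenerationFinished: an execution by the
        # requested name has already run to completion.
        pass

    def execution_name(generation_id: UUID, iteration: int) -> str:
        # Step Functions limits execution names to 80 characters; a UUID
        # (36 characters), an underscore, and a small integer fit easily.
        name = f'{generation_id}_{iteration}'
        assert 0 < len(name) <= 80, name
        return name

    def start_with_restarts(start, generation_id: UUID, first_iteration: int = 0):
        # Mirrors the loop in ManifestController._start_execution: when the
        # name is taken by a finished execution, bump the iteration suffix
        # and try again, giving up after ten attempts.
        for i in range(10):
            try:
                return start(execution_name(generation_id, first_iteration + i))
            except GenerationFinished:
                pass
        raise RuntimeError('Too many executions of this manifest generation')

    print(execution_name(UUID(int=0), 2))  # 00000000-0000-0000-0000-000000000000_2

The iteration suffix matters because Step Functions keeps execution names of
standard workflows unique for 90 days: reusing the bare generation ID after a
manifest expires would collide with the already-completed execution, while the
suffix yields a fresh, deterministic name for each restart.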