From ea7a209babc68c92b54584e99bd62a7887330473 Mon Sep 17 00:00:00 2001 From: Arjen Kroezen Date: Wed, 8 Nov 2023 18:50:43 +0100 Subject: [PATCH] feat: testframework pipeline evaluation logic without iteration activities --- .../models/activities}/__init__.py | 0 .../models/activities/base/activity.py | 31 ++- .../activities/control_activities/__init__.py | 0 .../control_activities/control_activity.py | 13 ++ .../execute_pipeline_activity.py | 21 ++ .../models/base/parameter_type.py | 9 + .../models/base/run_parameter.py | 12 + .../models/base/run_variable.py | 9 + .../models/repositories/models_repository.py | 7 +- .../models/state/__init__.py | 0 .../models/state/pipeline_run_state.py | 22 ++ .../models/state/run_state.py | 8 + .../models/test_framework.py | 58 ++++- src/python/tests/batch_job/batchjob_test.py | 213 ++++++++++++++++++ .../pipelines/batch_job.json | 0 src/python/tests/example/batchjob_tests.py | 16 -- src/python/tests/playground/example_test.py | 1 - 17 files changed, 390 insertions(+), 30 deletions(-) rename src/python/{tests/example => data_factory_testing_framework/models/activities}/__init__.py (100%) create mode 100644 src/python/data_factory_testing_framework/models/activities/control_activities/__init__.py create mode 100644 src/python/data_factory_testing_framework/models/activities/control_activities/control_activity.py create mode 100644 src/python/data_factory_testing_framework/models/activities/control_activities/execute_pipeline_activity.py create mode 100644 src/python/data_factory_testing_framework/models/base/parameter_type.py create mode 100644 src/python/data_factory_testing_framework/models/base/run_parameter.py create mode 100644 src/python/data_factory_testing_framework/models/base/run_variable.py create mode 100644 src/python/data_factory_testing_framework/models/state/__init__.py create mode 100644 src/python/data_factory_testing_framework/models/state/pipeline_run_state.py create mode 100644 src/python/data_factory_testing_framework/models/state/run_state.py create mode 100644 src/python/tests/batch_job/batchjob_test.py rename src/python/tests/{example => batch_job}/pipelines/batch_job.json (100%) delete mode 100644 src/python/tests/example/batchjob_tests.py diff --git a/src/python/tests/example/__init__.py b/src/python/data_factory_testing_framework/models/activities/__init__.py similarity index 100% rename from src/python/tests/example/__init__.py rename to src/python/data_factory_testing_framework/models/activities/__init__.py diff --git a/src/python/data_factory_testing_framework/models/activities/base/activity.py b/src/python/data_factory_testing_framework/models/activities/base/activity.py index 9d5ca05f..c4af37e1 100644 --- a/src/python/data_factory_testing_framework/models/activities/base/activity.py +++ b/src/python/data_factory_testing_framework/models/activities/base/activity.py @@ -1,20 +1,23 @@ from typing import Any, List from data_factory_testing_framework.generated.models import Activity, DependencyCondition, DataFactoryElement +from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState class Activity: + status: DependencyCondition @staticmethod def patch_generated_models(models): models.Activity._evaluate_expressions = Activity.evaluate_expressions models.Activity.evaluate = Activity.evaluate + models.Activity.are_dependency_condition_met = Activity.are_dependency_condition_met + models.Activity.get_scoped_activity_result_by_name = Activity.get_scoped_activity_result_by_name models.Activity.status = None - def evaluate(self: Activity): + def evaluate(self, state: PipelineRunState) -> Activity: self._evaluate_expressions(self) self.status = DependencyCondition.Succeeded - return self.status - + return self def evaluate_expressions(self, obj: Any, visited: List[Any] = None): if visited is None: @@ -26,9 +29,27 @@ def evaluate_expressions(self, obj: Any, visited: List[Any] = None): visited.append(obj) attribute_names = [attribute for attribute in dir(obj) if not attribute.startswith('__') and not callable(getattr(obj, attribute))] - for attributeName in attribute_names: - attribute = getattr(obj, attributeName) + for attribute_name in attribute_names: + attribute = getattr(obj, attribute_name) if data_factory_element := isinstance(attribute, DataFactoryElement) and attribute: data_factory_element.evaluate() else: self._evaluate_expressions(attribute, visited) + + def get_scoped_activity_result_by_name(self, name: str, state: PipelineRunState): + return next((activity_result for activity_result in state.scoped_pipeline_activity_results if activity_result.name == name), None) + + def are_dependency_condition_met(self, state: PipelineRunState): + for dependency in self.depends_on: + dependency_activity = self.get_scoped_activity_result_by_name(dependency.activity, state) + + if dependency_activity is None: + return False + + for dependency_condition in dependency.dependency_conditions: + if dependency_activity.status != dependency_condition: + return False + + return True + + diff --git a/src/python/data_factory_testing_framework/models/activities/control_activities/__init__.py b/src/python/data_factory_testing_framework/models/activities/control_activities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/python/data_factory_testing_framework/models/activities/control_activities/control_activity.py b/src/python/data_factory_testing_framework/models/activities/control_activities/control_activity.py new file mode 100644 index 00000000..c3c172b4 --- /dev/null +++ b/src/python/data_factory_testing_framework/models/activities/control_activities/control_activity.py @@ -0,0 +1,13 @@ +from typing import Callable + +from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState + + +class ControlActivity: + + @staticmethod + def patch_generated_models(models): + models.ControlActivity.evaluate_control_activity_iterations = ControlActivity.evaluate_control_activity_iterations + + def evaluate_control_activity_iterations(self, state: PipelineRunState, evaluate: Callable): + return [] diff --git a/src/python/data_factory_testing_framework/models/activities/control_activities/execute_pipeline_activity.py b/src/python/data_factory_testing_framework/models/activities/control_activities/execute_pipeline_activity.py new file mode 100644 index 00000000..cae96a3c --- /dev/null +++ b/src/python/data_factory_testing_framework/models/activities/control_activities/execute_pipeline_activity.py @@ -0,0 +1,21 @@ +from typing import List + +from data_factory_testing_framework.generated.models import ExecutePipelineActivity +from data_factory_testing_framework.models.base.parameter_type import ParameterType +from data_factory_testing_framework.models.base.run_parameter import RunParameter +from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState + + +class ExecutePipelineActivity: + + @staticmethod + def patch_generated_models(models): + models.ExecutePipelineActivity.get_child_run_parameters = ExecutePipelineActivity.get_child_run_parameters + + def get_child_run_parameters(self, state: PipelineRunState) -> List[RunParameter]: + child_parameters = [] + for parameter in state.parameters: + if parameter.type == ParameterType.Global: + child_parameters.append(RunParameter(ParameterType.Global, parameter.name, parameter.value)) + + return child_parameters diff --git a/src/python/data_factory_testing_framework/models/base/parameter_type.py b/src/python/data_factory_testing_framework/models/base/parameter_type.py new file mode 100644 index 00000000..a1c5dc40 --- /dev/null +++ b/src/python/data_factory_testing_framework/models/base/parameter_type.py @@ -0,0 +1,9 @@ +from enum import Enum +from azure.core import CaseInsensitiveEnumMeta + + +class ParameterType(str, Enum, metaclass=CaseInsensitiveEnumMeta): + Pipeline = "Pipeline" + Global = "Global" + Dataset = "Dataset" + LinkedService = "LinkedService" diff --git a/src/python/data_factory_testing_framework/models/base/run_parameter.py b/src/python/data_factory_testing_framework/models/base/run_parameter.py new file mode 100644 index 00000000..3f1bc829 --- /dev/null +++ b/src/python/data_factory_testing_framework/models/base/run_parameter.py @@ -0,0 +1,12 @@ +from typing import TypeVar, Generic + +from data_factory_testing_framework.models.base.parameter_type import ParameterType + +T = TypeVar("T") + + +class RunParameter(Generic[T]): + def __init__(self, type: ParameterType, name: str, value: T): + self.type = type + self.name = name + self.value = value diff --git a/src/python/data_factory_testing_framework/models/base/run_variable.py b/src/python/data_factory_testing_framework/models/base/run_variable.py new file mode 100644 index 00000000..10c5cb8b --- /dev/null +++ b/src/python/data_factory_testing_framework/models/base/run_variable.py @@ -0,0 +1,9 @@ +from typing import TypeVar, Generic + +T = TypeVar("T") + + +class RunVariable(Generic[T]): + def __init__(self, name: str, default_value: T = None): + self.name = name + self.value = default_value diff --git a/src/python/data_factory_testing_framework/models/repositories/models_repository.py b/src/python/data_factory_testing_framework/models/repositories/models_repository.py index 8322ec62..ac9ad1c9 100644 --- a/src/python/data_factory_testing_framework/models/repositories/models_repository.py +++ b/src/python/data_factory_testing_framework/models/repositories/models_repository.py @@ -1,6 +1,9 @@ from data_factory_testing_framework.generated import models as _models from data_factory_testing_framework.models.activities.base import Activity +from data_factory_testing_framework.models.activities.control_activities.control_activity import ControlActivity +from data_factory_testing_framework.models.activities.control_activities.execute_pipeline_activity import \ + ExecutePipelineActivity class ModelsRepository: @@ -8,8 +11,10 @@ class ModelsRepository: def __init__(self): # Patch models with our custom classes Activity.patch_generated_models(_models) + ControlActivity.patch_generated_models(_models) + ExecutePipelineActivity.patch_generated_models(_models) self.models = {k: v for k, v in _models.__dict__.items() if isinstance(v, type)} def get_models(self): - return self.models \ No newline at end of file + return self.models diff --git a/src/python/data_factory_testing_framework/models/state/__init__.py b/src/python/data_factory_testing_framework/models/state/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/python/data_factory_testing_framework/models/state/pipeline_run_state.py b/src/python/data_factory_testing_framework/models/state/pipeline_run_state.py new file mode 100644 index 00000000..391b6d3e --- /dev/null +++ b/src/python/data_factory_testing_framework/models/state/pipeline_run_state.py @@ -0,0 +1,22 @@ +from typing import List, Dict + +from data_factory_testing_framework.generated.models import VariableSpecification +from data_factory_testing_framework.models.base.run_parameter import RunParameter +from data_factory_testing_framework.models.base.run_variable import RunVariable +from data_factory_testing_framework.models.state.run_state import RunState + + +class PipelineRunState(RunState): + def __init__(self, parameters: List[RunParameter] = [], variables: Dict[str, VariableSpecification] = []): + super().__init__(parameters) + self.variables = [] + for variable in variables: + self.variables.append(RunVariable(variable.name, variable.default_value)) + + self.pipeline_activity_results = [] + self.scoped_pipeline_activity_results = [] + self.iteration_item = None + + def add_activity_result(self, activity): + self.pipeline_activity_results.append(activity) + self.scoped_pipeline_activity_results.append(activity) diff --git a/src/python/data_factory_testing_framework/models/state/run_state.py b/src/python/data_factory_testing_framework/models/state/run_state.py new file mode 100644 index 00000000..d3086dce --- /dev/null +++ b/src/python/data_factory_testing_framework/models/state/run_state.py @@ -0,0 +1,8 @@ +from typing import List + +from data_factory_testing_framework.models.base.run_parameter import RunParameter + + +class RunState: + def __init__(self, parameters: List[RunParameter]): + self.parameters = parameters diff --git a/src/python/data_factory_testing_framework/models/test_framework.py b/src/python/data_factory_testing_framework/models/test_framework.py index 1935b706..ee38924b 100644 --- a/src/python/data_factory_testing_framework/models/test_framework.py +++ b/src/python/data_factory_testing_framework/models/test_framework.py @@ -1,17 +1,61 @@ from typing import Any, List -from data_factory_testing_framework.generated.models import Activity, DependencyCondition +from data_factory_testing_framework.generated.models import Activity, DependencyCondition, PipelineResource, \ + UntilActivity, ForEachActivity, IfConditionActivity, ExecutePipelineActivity, ControlActivity from data_factory_testing_framework.models.repositories.data_factory_repository_factory import \ DataFactoryRepositoryFactory +from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState class TestFramework: - def __init__(self, data_factory_folder_path: str): + def __init__(self, data_factory_folder_path: str, should_evaluate_child_pipelines: bool = False): self.repository = DataFactoryRepositoryFactory.parse_from_folder(data_factory_folder_path) - - def evaluate(self, activity: Activity) -> DependencyCondition: - return activity.evaluate() - - + self.should_evaluate_child_pipelines = should_evaluate_child_pipelines + + def evaluate_activity(self, activity: Activity) -> DependencyCondition: + self.evaluate_activities([activity]) + return activity.status + + def evaluate_pipeline(self, pipeline: PipelineResource, state: PipelineRunState): + return self.evaluate_activities(pipeline.activities, state) + + def evaluate_activities(self, activities: List[Activity], state: PipelineRunState) -> DependencyCondition: + print("hi") + while len(state.scoped_pipeline_activity_results) != len(activities): + any_activity_evaluated = False + for activity in filter( + lambda a: a not in state.scoped_pipeline_activity_results and a.are_dependency_condition_met(state), + activities): + evaluated_activity = activity.evaluate(state) + if not self._is_iteration_activity(evaluated_activity) or (isinstance(evaluated_activity, ExecutePipelineActivity) and not self.should_evaluate_child_pipelines): + yield evaluated_activity + + any_activity_evaluated = True + state.add_activity_result(activity) + + if self._is_iteration_activity(activity): + if isinstance(activity, ExecutePipelineActivity) and self.should_evaluate_child_pipelines: + execute_pipeline_activity: ExecutePipelineActivity = activity + pipeline = self.repository.get_pipeline_by_name(execute_pipeline_activity.pipeline.reference_name) + + # Evaluate the pipeline with its own scope + for childActivity in self.evaluate_pipeline(pipeline, activity.get_child_run_parameters(state)): + yield childActivity + + if isinstance(activity, ControlActivity): + control_activity: ControlActivity = activity + for childActivity in control_activity.evaluate_control_activity_iterations(state, self.evaluate_activities): + yield childActivity + + if not any_activity_evaluated: + raise Exception( + "Validate that there are no circular dependencies or whether activity results were not set correctly.") + + @staticmethod + def _is_iteration_activity(activity: Activity) -> bool: + return (isinstance(activity, UntilActivity) or + isinstance(activity, ForEachActivity) or + isinstance(activity, IfConditionActivity) or + isinstance(activity, ExecutePipelineActivity)) diff --git a/src/python/tests/batch_job/batchjob_test.py b/src/python/tests/batch_job/batchjob_test.py new file mode 100644 index 00000000..049914ed --- /dev/null +++ b/src/python/tests/batch_job/batchjob_test.py @@ -0,0 +1,213 @@ +from data_factory_testing_framework.generated.models import WebActivity, DependencyCondition, SetVariableActivity +from data_factory_testing_framework.models.base.parameter_type import ParameterType +from data_factory_testing_framework.models.base.run_parameter import RunParameter +from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState +from data_factory_testing_framework.models.test_framework import TestFramework + + +class TestBatchJob: + + def test_batch_job_pipeline(self): + # Arrange + test_framework = TestFramework("pipelines") + pipeline = test_framework.repository.get_pipeline_by_name("batch_job") + state = PipelineRunState(parameters=[ + RunParameter(ParameterType.Pipeline, "BatchPoolId", "batch-pool-id"), + RunParameter(ParameterType.Pipeline, "WorkloadApplicationPackageName", "test-application"), + RunParameter(ParameterType.Pipeline, "WorkloadApplicationPackageVersion", "1.5.0"), + RunParameter(ParameterType.Pipeline, "ManagerApplicationPackageName", "batchmanager"), + RunParameter(ParameterType.Pipeline, "ManagerApplicationPackageVersion", "2.0.0"), + RunParameter(ParameterType.Pipeline, "ManagerTaskParameters", "--parameter1 dummy --parameter2 another-dummy"), + RunParameter(ParameterType.Pipeline, "JobId", "802100a5-ec79-4a52-be62-8d6109f3ff9a"), + RunParameter(ParameterType.Pipeline, "TaskOutputFolderPrefix", "TASKOUTPUT_"), + RunParameter(ParameterType.Pipeline, "WorkloadUserAssignedIdentityName", "test-application-identity-name"), + RunParameter(ParameterType.Pipeline, "WorkloadUserAssignedIdentityClientId", "/subscriptions/SUBSCRIPTION_ID/resourcegroups/RESOURCE_GROUP/providers/Microsoft.ManagedIdentity/userAssignedIdentities/test-application-identity-name"), + RunParameter(ParameterType.Pipeline, "JobAdditionalEnvironmentSettings", "[]"), + RunParameter(ParameterType.Pipeline, "OutputStorageAccountName", "test-application-output-storage-account-name"), + RunParameter(ParameterType.Pipeline, "OutputContainerName", "test-application-output-container-name"), + RunParameter(ParameterType.Pipeline, "OutputFolderName", "TEMP"), + RunParameter(ParameterType.Pipeline, "BatchJobTimeout", "PT4H"), + RunParameter(ParameterType.Global, "BatchStorageAccountName", "batch-account-name"), + RunParameter(ParameterType.Global, "BatchAccountSubscription", "SUBSCRIPTION_ID"), + RunParameter(ParameterType.Global, "BatchAccountResourceGroup", "RESOURCE_GROUP"), + RunParameter(ParameterType.Global, "BatchURI", "https://batch-account-name.westeurope.batch.azure.com"), + RunParameter(ParameterType.Global, "ADFSubscription", "bd19dba4-89ad-4976-b862-848bf43a4340"), + RunParameter(ParameterType.Global, "ADFResourceGroup", "adf-rg"), + RunParameter(ParameterType.Global, "ADFName", "adf-name"), + ]) + + # Act + activities = test_framework.evaluate_pipeline(pipeline, state) + + # Assert + activity: SetVariableActivity = next(activities) + assert activity.name == "Set UserAssignedIdentityReference" + assert activity.variable_name == "UserAssignedIdentityReference" + # assert activity.value.value == "/subscriptions/SUBSCRIPTION_ID/resourcegroups/RESOURCE_GROUP/providers/Microsoft.ManagedIdentity/userAssignedIdentities/test-application-identity-name" + + activity = next(activities) + assert activity.name == "Set ManagerApplicationPackagePath" + assert activity.variable_name == "ManagerApplicationPackagePath" + # assert activity.value == "$AZ_BATCH_APP_PACKAGE_batchmanager_2_0_0/batchmanager.tar.gz" + + activity = next(activities) + assert activity.name == "Set WorkloadApplicationPackagePath" + assert activity.variable_name == "WorkloadApplicationPackagePath" + # assert activity.value == "$AZ_BATCH_APP_PACKAGE_test-application_1_5_0/test-application.tar.gz" + + activity = next(activities) + assert activity.name == "Set CommonEnvironmentSettings" + assert activity.variable_name == "CommonEnvironmentSettings" + # assert activity.value == """ + # [ + # { + # "name": "WORKLOAD_APP_PACKAGE", + # "value": "test-application" + # }, + # { + # "name": "WORKLOAD_APP_PACKAGE_VERSION", + # "value": "1.5.0" + # }, + # { + # "name": "MANAGER_APP_PACKAGE", + # "value": "batchmanager" + # }, + # { + # "name": "MANAGER_APP_PACKAGE_VERSION", + # "value": "2.0.0" + # }, + # { + # "name": "BATCH_JOB_TIMEOUT", + # "value": "PT4H" + # }, + # { + # "name": "WORKLOAD_AUTO_STORAGE_ACCOUNT_NAME", + # "value": "batch-account-name" + # }, + # { + # "name": "WORKLOAD_USER_ASSIGNED_IDENTITY_RESOURCE_ID", + # "value": "/subscriptions/SUBSCRIPTION_ID/resourcegroups/RESOURCE_GROUP/providers/Microsoft.ManagedIdentity/userAssignedIdentities/test-application-identity-name" + # }, + # { + # "name": "WORKLOAD_USER_ASSIGNED_IDENTITY_CLIENT_ID", + # "value": "/subscriptions/SUBSCRIPTION_ID/resourcegroups/RESOURCE_GROUP/providers/Microsoft.ManagedIdentity/userAssignedIdentities/test-application-identity-name" + # } + # ] + # """ + + activity = next(activities) + assert activity.name == "Set JobContainerName" + assert activity.variable_name == "JobContainerName" + #assert activity.value == "job-802100a5-ec79-4a52-be62-8d6109f3ff9a" + + activity = next(activities) + assert activity.name == "Set Job Container URL" + assert activity.variable_name == "JobContainerURL" + #assert activity.value == "https://batch-account-name.blob.core.windows.net/job-802100a5-ec79-4a52-be62-8d6109f3ff9a" + + activity = next(activities) + assert activity.name == "Create Job Storage Container" + #assert activity.url == "https://batch-account-name.blob.core.windows.net/job-802100a5-ec79-4a52-be62-8d6109f3ff9a?restype=container" + assert activity.method == "PUT" + #assert activity.body == "{}" + + activity = next(activities) + assert activity.name == "Start Job" + #assert activity.uri == "https://batch-account-name.westeurope.batch.azure.com/jobs?api-version=2022-10-01.16.0" + assert activity.method == "POST" + # assert activity.body == """ + # { + # "id": "802100a5-ec79-4a52-be62-8d6109f3ff9a", + # "priority": 100, + # "constraints": { + # "maxWallClockTime": "PT4H", + # "maxTaskRetryCount": 0 + # }, + # "jobManagerTask": { + # "id": "Manager", + # "displayName": "Manager", + # "authenticationTokenSettings": { + # "access": [ + # "job" + # ] + # }, + # "commandLine": "/bin/bash -c \"python3 -m ensurepip --upgrade && python3 -m pip install --user $AZ_BATCH_APP_PACKAGE_batchmanager_2_0_0/batchmanager.tar.gz && python3 -m pip install --user $AZ_BATCH_APP_PACKAGE_test-application_1_5_0/test-application.tar.gz && python3 -m test-application job --parameter1 dummy --parameter2 another-dummy\"", + # "applicationPackageReferences": [ + # { + # "applicationId": "batchmanager", + # "version": "2.0.0" + # }, + # { + # "applicationId": "test-application", + # "version": "1.5.0" + # } + # ], + # "outputFiles": [ + # { + # "destination": { + # "container": { + # "containerUrl": "https://batch-account-name.blob.core.windows.net/job-802100a5-ec79-4a52-be62-8d6109f3ff9a", + # "identityReference": { + # "resourceId": "/subscriptions/SUBSCRIPTION_ID/resourcegroups/RESOURCE_GROUP/providers/Microsoft.ManagedIdentity/userAssignedIdentities/test-application-identity-name" + # }, + # "path": "Manager/$TaskLog" + # } + # }, + # "filePattern": "../*.txt", + # "uploadOptions": { + # "uploadCondition": "taskcompletion" + # } + # } + # ], + # "environmentSettings": [], + # "requiredSlots": 1, + # "killJobOnCompletion": false, + # "userIdentity": { + # "username": null, + # "autoUser": { + # "scope": "pool", + # "elevationLevel": "nonadmin" + # } + # }, + # "runExclusive": true, + # "allowLowPriorityNode": true + # }, + # "poolInfo": { + # "poolId": "batch-pool-id" + # }, + # "onAllTasksComplete": "terminatejob", + # "onTaskFailure": "noaction", + # "usesTaskDependencies": true, + # "commonEnvironmentSettings": [{"name": "WORKLOAD_APP_PACKAGE", "value": "test-application"}, {"name": "WORKLOAD_APP_PACKAGE_VERSION", "value": "1.5.0"}, {"name": "MANAGER_APP_PACKAGE", "value": "batchmanager"}, {"name": "MANAGER_APP_PACKAGE_VERSION", "value": "2.0.0"}, {"name": "BATCH_JOB_TIMEOUT", "value": "PT4H"}, {"name": "WORKLOAD_AUTO_STORAGE_ACCOUNT_NAME", "value": "batch-account-name"}, {"name": "WORKLOAD_USER_ASSIGNED_IDENTITY_RESOURCE_ID", "value": "/subscriptions/SUBSCRIPTION_ID/resourcegroups/RESOURCE_GROUP/providers/Microsoft.ManagedIdentity/userAssignedIdentities/test-application-identity-name"}, {"name": "WORKLOAD_USER_ASSIGNED_IDENTITY_CLIENT_ID", "value": "/subscriptions/SUBSCRIPTION_ID/resourcegroups/RESOURCE_GROUP/providers/Microsoft.ManagedIdentity/userAssignedIdentities/test-application-identity-name"}] + # }""" + + activity = next(activities) + assert activity.name == "Monitor Batch Job" + assert activity.pipeline.reference_name == "monitor_batch_job" + assert len(activity.parameters) == 1 + # assert activity.parameters["JobId"] == "802100a5-ec79-4a52-be62-8d6109f3ff9a" + + activity = next(activities) + assert activity.name == "Copy Output Files" + assert activity.pipeline.reference_name == "copy_output_files" + assert len(activity.parameters) == 5 + # assert activity.parameters["JobContainerName"] == "job-802100a5-ec79-4a52-be62-8d6109f3ff9a" + # assert activity.parameters["TaskOutputFolderPrefix"] == "TASKOUTPUT_" + # assert activity.parameters["OutputStorageAccountName"] == "test-application-output-storage-account-name" + # assert activity.parameters["OutputContainerName"] == "test-application-output-container-name" + # assert activity.parameters["OutputFolderName"] == "TEMP" + + activity = next(activities) + assert activity.name == "Delete Job Storage Container" + # assert activity.uri == "https://batch-account-name.blob.core.windows.net/job-802100a5-ec79-4a52-be62-8d6109f3ff9a?restype=container" + assert activity.method == "DELETE" + # assert activity.body is None + + # Assert that there are no more activities + try: + next(activities) + assert False # This line should not be reached, an exception should be raised + except StopIteration: + pass + + diff --git a/src/python/tests/example/pipelines/batch_job.json b/src/python/tests/batch_job/pipelines/batch_job.json similarity index 100% rename from src/python/tests/example/pipelines/batch_job.json rename to src/python/tests/batch_job/pipelines/batch_job.json diff --git a/src/python/tests/example/batchjob_tests.py b/src/python/tests/example/batchjob_tests.py deleted file mode 100644 index 95d862c9..00000000 --- a/src/python/tests/example/batchjob_tests.py +++ /dev/null @@ -1,16 +0,0 @@ -from data_factory_testing_framework.generated.models import WebActivity, DependencyCondition -from data_factory_testing_framework.models.test_framework import TestFramework - - -def test_batch_job_pipeline(): - # Arrange - test_framework = TestFramework("pipelines") - pipeline = test_framework.repository.get_pipeline_by_name("batch_job") - web_activity: WebActivity = pipeline.activities[0] - - # Act - result = test_framework.evaluate(web_activity) - - # Assert - assert web_activity.body.value == "test2" - assert result == DependencyCondition.Succeeded diff --git a/src/python/tests/playground/example_test.py b/src/python/tests/playground/example_test.py index 69453bc9..39d1fd4a 100644 --- a/src/python/tests/playground/example_test.py +++ b/src/python/tests/playground/example_test.py @@ -3,7 +3,6 @@ from data_factory_testing_framework.generated import Deserializer from data_factory_testing_framework.generated.models import WebActivity, PipelineResource -import data_factory_testing_framework.models.activities.base.activity def get_pipeline():