Skip to content

Commit

Permalink
feat: testframework pipeline evaluation logic without iteration activ…
Browse files Browse the repository at this point in the history
…ities
  • Loading branch information
arjendev committed Nov 8, 2023
1 parent 149d995 commit ea7a209
Show file tree
Hide file tree
Showing 17 changed files with 390 additions and 30 deletions.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
from typing import Any, List
from data_factory_testing_framework.generated.models import Activity, DependencyCondition, DataFactoryElement
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState


class Activity:
status: DependencyCondition

@staticmethod
def patch_generated_models(models):
models.Activity._evaluate_expressions = Activity.evaluate_expressions
models.Activity.evaluate = Activity.evaluate
models.Activity.are_dependency_condition_met = Activity.are_dependency_condition_met
models.Activity.get_scoped_activity_result_by_name = Activity.get_scoped_activity_result_by_name
models.Activity.status = None

def evaluate(self: Activity):
def evaluate(self, state: PipelineRunState) -> Activity:
self._evaluate_expressions(self)
self.status = DependencyCondition.Succeeded
return self.status

return self

def evaluate_expressions(self, obj: Any, visited: List[Any] = None):
if visited is None:
Expand All @@ -26,9 +29,27 @@ def evaluate_expressions(self, obj: Any, visited: List[Any] = None):
visited.append(obj)

attribute_names = [attribute for attribute in dir(obj) if not attribute.startswith('__') and not callable(getattr(obj, attribute))]
for attributeName in attribute_names:
attribute = getattr(obj, attributeName)
for attribute_name in attribute_names:
attribute = getattr(obj, attribute_name)
if data_factory_element := isinstance(attribute, DataFactoryElement) and attribute:
data_factory_element.evaluate()
else:
self._evaluate_expressions(attribute, visited)

def get_scoped_activity_result_by_name(self, name: str, state: PipelineRunState):
return next((activity_result for activity_result in state.scoped_pipeline_activity_results if activity_result.name == name), None)

def are_dependency_condition_met(self, state: PipelineRunState):
for dependency in self.depends_on:
dependency_activity = self.get_scoped_activity_result_by_name(dependency.activity, state)

if dependency_activity is None:
return False

for dependency_condition in dependency.dependency_conditions:
if dependency_activity.status != dependency_condition:
return False

return True


Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Callable

from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState


class ControlActivity:

@staticmethod
def patch_generated_models(models):
models.ControlActivity.evaluate_control_activity_iterations = ControlActivity.evaluate_control_activity_iterations

def evaluate_control_activity_iterations(self, state: PipelineRunState, evaluate: Callable):
return []
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import List

from data_factory_testing_framework.generated.models import ExecutePipelineActivity
from data_factory_testing_framework.models.base.parameter_type import ParameterType
from data_factory_testing_framework.models.base.run_parameter import RunParameter
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState


class ExecutePipelineActivity:

@staticmethod
def patch_generated_models(models):
models.ExecutePipelineActivity.get_child_run_parameters = ExecutePipelineActivity.get_child_run_parameters

def get_child_run_parameters(self, state: PipelineRunState) -> List[RunParameter]:
child_parameters = []
for parameter in state.parameters:
if parameter.type == ParameterType.Global:
child_parameters.append(RunParameter(ParameterType.Global, parameter.name, parameter.value))

return child_parameters
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from enum import Enum
from azure.core import CaseInsensitiveEnumMeta


class ParameterType(str, Enum, metaclass=CaseInsensitiveEnumMeta):
Pipeline = "Pipeline"
Global = "Global"
Dataset = "Dataset"
LinkedService = "LinkedService"
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import TypeVar, Generic

from data_factory_testing_framework.models.base.parameter_type import ParameterType

T = TypeVar("T")


class RunParameter(Generic[T]):
def __init__(self, type: ParameterType, name: str, value: T):
self.type = type
self.name = name
self.value = value
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import TypeVar, Generic

T = TypeVar("T")


class RunVariable(Generic[T]):
def __init__(self, name: str, default_value: T = None):
self.name = name
self.value = default_value
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from data_factory_testing_framework.generated import models as _models

from data_factory_testing_framework.models.activities.base import Activity
from data_factory_testing_framework.models.activities.control_activities.control_activity import ControlActivity
from data_factory_testing_framework.models.activities.control_activities.execute_pipeline_activity import \
ExecutePipelineActivity


class ModelsRepository:

def __init__(self):
# Patch models with our custom classes
Activity.patch_generated_models(_models)
ControlActivity.patch_generated_models(_models)
ExecutePipelineActivity.patch_generated_models(_models)

self.models = {k: v for k, v in _models.__dict__.items() if isinstance(v, type)}

def get_models(self):
return self.models
return self.models
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import List, Dict

from data_factory_testing_framework.generated.models import VariableSpecification
from data_factory_testing_framework.models.base.run_parameter import RunParameter
from data_factory_testing_framework.models.base.run_variable import RunVariable
from data_factory_testing_framework.models.state.run_state import RunState


class PipelineRunState(RunState):
def __init__(self, parameters: List[RunParameter] = [], variables: Dict[str, VariableSpecification] = []):
super().__init__(parameters)
self.variables = []
for variable in variables:
self.variables.append(RunVariable(variable.name, variable.default_value))

self.pipeline_activity_results = []
self.scoped_pipeline_activity_results = []
self.iteration_item = None

def add_activity_result(self, activity):
self.pipeline_activity_results.append(activity)
self.scoped_pipeline_activity_results.append(activity)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import List

from data_factory_testing_framework.models.base.run_parameter import RunParameter


class RunState:
def __init__(self, parameters: List[RunParameter]):
self.parameters = parameters
58 changes: 51 additions & 7 deletions src/python/data_factory_testing_framework/models/test_framework.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,61 @@
from typing import Any, List

from data_factory_testing_framework.generated.models import Activity, DependencyCondition
from data_factory_testing_framework.generated.models import Activity, DependencyCondition, PipelineResource, \
UntilActivity, ForEachActivity, IfConditionActivity, ExecutePipelineActivity, ControlActivity

from data_factory_testing_framework.models.repositories.data_factory_repository_factory import \
DataFactoryRepositoryFactory
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState


class TestFramework:

def __init__(self, data_factory_folder_path: str):
def __init__(self, data_factory_folder_path: str, should_evaluate_child_pipelines: bool = False):
self.repository = DataFactoryRepositoryFactory.parse_from_folder(data_factory_folder_path)

def evaluate(self, activity: Activity) -> DependencyCondition:
return activity.evaluate()


self.should_evaluate_child_pipelines = should_evaluate_child_pipelines

def evaluate_activity(self, activity: Activity) -> DependencyCondition:
self.evaluate_activities([activity])
return activity.status

def evaluate_pipeline(self, pipeline: PipelineResource, state: PipelineRunState):
return self.evaluate_activities(pipeline.activities, state)

def evaluate_activities(self, activities: List[Activity], state: PipelineRunState) -> DependencyCondition:
print("hi")
while len(state.scoped_pipeline_activity_results) != len(activities):
any_activity_evaluated = False
for activity in filter(
lambda a: a not in state.scoped_pipeline_activity_results and a.are_dependency_condition_met(state),
activities):
evaluated_activity = activity.evaluate(state)
if not self._is_iteration_activity(evaluated_activity) or (isinstance(evaluated_activity, ExecutePipelineActivity) and not self.should_evaluate_child_pipelines):
yield evaluated_activity

any_activity_evaluated = True
state.add_activity_result(activity)

if self._is_iteration_activity(activity):
if isinstance(activity, ExecutePipelineActivity) and self.should_evaluate_child_pipelines:
execute_pipeline_activity: ExecutePipelineActivity = activity
pipeline = self.repository.get_pipeline_by_name(execute_pipeline_activity.pipeline.reference_name)

# Evaluate the pipeline with its own scope
for childActivity in self.evaluate_pipeline(pipeline, activity.get_child_run_parameters(state)):
yield childActivity

if isinstance(activity, ControlActivity):
control_activity: ControlActivity = activity
for childActivity in control_activity.evaluate_control_activity_iterations(state, self.evaluate_activities):
yield childActivity

if not any_activity_evaluated:
raise Exception(
"Validate that there are no circular dependencies or whether activity results were not set correctly.")

@staticmethod
def _is_iteration_activity(activity: Activity) -> bool:
return (isinstance(activity, UntilActivity) or
isinstance(activity, ForEachActivity) or
isinstance(activity, IfConditionActivity) or
isinstance(activity, ExecutePipelineActivity))
Loading

0 comments on commit ea7a209

Please sign in to comment.