Skip to content

Commit

Permalink
feat: if condition activity
Browse files Browse the repository at this point in the history
  • Loading branch information
arjendev committed Nov 9, 2023
1 parent bbc159a commit d731665
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def evaluate(self: ForEachActivity, state: PipelineRunState):
return super(ControlActivity, self).evaluate(state)

def evaluate_control_activity_iterations(self: ForEachActivity, state: PipelineRunState, evaluate_activities: Callable[[PipelineRunState], Generator[Activity, None, None]]):
for item in self.items.evaluated_items:
for item in self.items.evaluated:
scoped_state = state.create_iteration_scope(item)
for activity in evaluate_activities(self.activities, scoped_state):
yield activity
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import Callable, Generator

from data_factory_testing_framework.generated.models import ForEachActivity, Activity, ControlActivity, \
IfConditionActivity
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState


class IfConditionActivity:

def evaluate(self: IfConditionActivity, state: PipelineRunState):
self.expression.evaluate(state)

return super(ControlActivity, self).evaluate(state)

def evaluate_control_activity_iterations(self: IfConditionActivity, state: PipelineRunState, evaluate_activities: Callable[[PipelineRunState], Generator[Activity, None, None]]):
scoped_state = state.create_iteration_scope(None)
activities = self.if_true_activities if self.expression.value else self.if_false_activities
for activity in evaluate_activities(activities, scoped_state):
yield activity

state.add_scoped_activity_results_from_scoped_state(scoped_state)
11 changes: 7 additions & 4 deletions src/python/data_factory_testing_framework/models/expression.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import List
from typing import List, TypeVar

from data_factory_testing_framework.generated.models import Expression
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState


class Expression:
TResult = TypeVar("TResult")

evaluated_items: List[str] = []

class Expression[TResult]:

evaluated: TResult = []

def evaluate(self: Expression, state: PipelineRunState):
self.evaluated_items = [
self.evaluated = [
"item1",
"item2",
"item3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from data_factory_testing_framework.models.activities.control_activities.execute_pipeline_activity import \
ExecutePipelineActivity
from data_factory_testing_framework.models.activities.control_activities.for_each_activity import ForEachActivity
from data_factory_testing_framework.models.activities.control_activities.if_condition_activity import \
IfConditionActivity
from data_factory_testing_framework.models.expression import Expression


Expand All @@ -15,6 +17,7 @@ def patch_models():
patch_model(_models.ControlActivity, ControlActivity)
patch_model(_models.ForEachActivity, ForEachActivity)
patch_model(_models.Expression, Expression)
patch_model(_models.IfConditionActivity, IfConditionActivity)


def patch_model(main_class, partial_class):
Expand Down
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest

from data_factory_testing_framework.generated.models import ForEachActivity, Expression, ExpressionType, \
SetVariableActivity, DataFactoryElement, IfConditionActivity
from data_factory_testing_framework.models.base.pipeline_run_variable import PipelineRunVariable
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState
from data_factory_testing_framework.models.test_framework import TestFramework


class TestIfConditionActivity:

def test_when_evaluated_should_evaluate_expression(self):
# Arrange
activity = IfConditionActivity(
name="IfConditionActivity",
expression=Expression(type=ExpressionType.EXPRESSION, value="@equals(1, 1)"))

# Act
activity.evaluate(PipelineRunState())

# Assert
# assert activity.expression.evaluated == True

@pytest.mark.parametrize("expression_outcome,expected_activity_name", [(True, "setVariableActivity1"),(False, "setVariableActivity2")])
def test_when_evaluated_should_evaluate_correct_child_activities(self, expression_outcome, expected_activity_name):
# Arrange
test_framework = TestFramework()
expression = "@equals(1, 1)" if expression_outcome else "@equals(1, 2)"
activity = IfConditionActivity(
name="IfConditionActivity",
expression=Expression(type=ExpressionType.EXPRESSION, value=expression),
if_true_activities=[
SetVariableActivity(
name="setVariableActivity1",
variable_name="variable",
value="dummy")
],
if_false_activities=[
SetVariableActivity(
name="setVariableActivity2",
variable_name="variable",
value="dummy")
]
)

state = PipelineRunState()
state.variables.append(PipelineRunVariable("variable", ""))

# Act
child_activities = list(test_framework.evaluate_activity(activity, state))

# Assert
assert len(child_activities) == 1
assert child_activities[0].name == expected_activity_name

0 comments on commit d731665

Please sign in to comment.