Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): introduce an always block on flow & flowable #6686

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,15 @@ public TaskRun findTaskRunByTaskIdAndValue(String id, List<String> values)
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedErrors afters tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, null);
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors,
List<ResolvedTask> resolvedAfters
) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, resolvedAfters, null);
}

/**
Expand All @@ -349,15 +353,28 @@ public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolved
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedAlways afters tasks
* @param parentTaskRun the parent task
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors, TaskRun parentTaskRun) {
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors,
@Nullable List<ResolvedTask> resolvedAlways,
TaskRun parentTaskRun
) {
resolvedTasks = removeDisabled(resolvedTasks);
resolvedErrors = removeDisabled(resolvedErrors);
resolvedAlways = removeDisabled(resolvedAlways);


List<TaskRun> errorsFlow = this.findTaskRunByTasks(resolvedErrors, parentTaskRun);
List<TaskRun> alwaysFlow = this.findTaskRunByTasks(resolvedAlways, parentTaskRun);

// always is already started, just continue theses always
if (!alwaysFlow.isEmpty()) {
return resolvedAlways == null ? Collections.emptyList() : resolvedAlways;
}

// Check if flow has failed task
if (!errorsFlow.isEmpty() || this.hasFailed(resolvedTasks, parentTaskRun)) {
Expand All @@ -366,8 +383,15 @@ public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolved
return Collections.emptyList();
}

return resolvedErrors == null ? Collections.emptyList() : resolvedErrors;
if (resolvedAlways != null && resolvedErrors != null && !this.isTerminated(resolvedErrors, parentTaskRun)) {
return resolvedErrors;
} else if (resolvedAlways == null) {
return resolvedErrors == null ? Collections.emptyList() : resolvedErrors;
}
}

if (this.isTerminated(resolvedTasks, parentTaskRun) && resolvedAlways != null) {
return resolvedAlways;
}

return resolvedTasks;
Expand All @@ -390,8 +414,7 @@ private List<ResolvedTask> removeDisabled(List<ResolvedTask> tasks) {
.toList();
}

public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks,
TaskRun parentTaskRun) {
public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
if (resolvedTasks == null || this.taskRunList == null) {
return Collections.emptyList();
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
List<Task> errors;

@Valid
List<Task> always;

@Valid
@Deprecated
List<Listener> listeners;
Expand Down Expand Up @@ -188,6 +191,7 @@ public Stream<Task> allTasks() {
return Stream.of(
this.tasks != null ? this.tasks : new ArrayList<Task>(),
this.errors != null ? this.errors : new ArrayList<Task>(),
this.always != null ? this.always : new ArrayList<Task>(),
this.listenersTasks()
)
.flatMap(Collection::stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class FlowForExecution extends AbstractFlow {
@Valid
List<TaskForExecution> errors;

@Valid
List<TaskForExecution> always;

@Valid
List<AbstractTriggerForExecution> triggers;

Expand All @@ -36,6 +39,7 @@ public static FlowForExecution of(Flow flow) {
.inputs(flow.getInputs())
.tasks(flow.getTasks().stream().map(TaskForExecution::of).toList())
.errors(ListUtils.emptyOnNull(flow.getErrors()).stream().map(TaskForExecution::of).toList())
.always(ListUtils.emptyOnNull(flow.getAlways()).stream().map(TaskForExecution::of).toList())
.triggers(ListUtils.emptyOnNull(flow.getTriggers()).stream().map(AbstractTriggerForExecution::of).toList())
.disabled(flow.isDisabled())
.deleted(flow.isDeleted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public Flow toFlow() {
.variables(this.variables)
.tasks(this.tasks)
.errors(this.errors)
.always(this.always)
.listeners(this.listeners)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
Expand Down Expand Up @@ -69,6 +70,7 @@ public static FlowWithSource of(Flow flow, String source) {
.variables(flow.variables)
.tasks(flow.tasks)
.errors(flow.errors)
.always(flow.always)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public interface FlowableTask <T extends Output> {
@PluginProperty
List<Task> getErrors();

@Schema(
title = "List of tasks to run after any tasks failed or success on this FlowableTask."
)
@PluginProperty
List<Task> getAlways();

/**
* Create the topology representation of a flowable task.
* <p>
Expand Down Expand Up @@ -71,6 +77,7 @@ default Optional<State.Type> resolveState(RunContext runContext, Execution execu
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getAlways(), parentTaskRun),
parentTaskRun,
runContext,
isAllowFailure(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
private List<Task> errors;

@Valid
private List<Task> always;

@NotNull
@Builder.Default
private final boolean deleted = false;
Expand Down Expand Up @@ -138,6 +141,7 @@ public Template toDeleted() {
this.description,
this.tasks,
this.errors,
this.always,
true
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ private Optional<WorkerTaskResult> childWorkerTaskResult(Flow flow, Execution ex
// Then wait for completion (KILLED or whatever) on child tasks to KILLED the parent one.
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(
flowableParent.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(flowableParent.getErrors(), parentTaskRun)
FlowableUtils.resolveTasks(flowableParent.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(flowableParent.getAlways(), parentTaskRun)
);

List<TaskRun> taskRunByTasks = execution.findTaskRunByTasks(currentTasks, parentTaskRun);
Expand Down Expand Up @@ -426,7 +427,8 @@ private Executor handleNext(Executor executor) {
.resolveSequentialNexts(
executor.getExecution(),
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors())
ResolvedTask.of(executor.getFlow().getErrors()),
ResolvedTask.of(executor.getFlow().getAlways())
);

if (nextTaskRuns.isEmpty()) {
Expand Down Expand Up @@ -686,7 +688,8 @@ private Executor handleEnd(Executor executor) {

List<ResolvedTask> currentTasks = executor.getExecution().findTaskDependingFlowState(
ResolvedTask.of(executor.getFlow().getTasks()),
ResolvedTask.of(executor.getFlow().getErrors())
ResolvedTask.of(executor.getFlow().getErrors()),
ResolvedTask.of(executor.getFlow().getAlways())
);

if (!executor.getExecution().isTerminated(currentTasks)) {
Expand Down
31 changes: 23 additions & 8 deletions core/src/main/java/io/kestra/core/runners/FlowableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.Stream;

public class FlowableUtils {
@Deprecated(forRemoval = true)
public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks
Expand All @@ -31,18 +32,20 @@ public static List<NextTaskRun> resolveSequentialNexts(
public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors
List<ResolvedTask> errors,
List<ResolvedTask> always
) {
return resolveSequentialNexts(execution, tasks, errors, null);
return resolveSequentialNexts(execution, tasks, errors, always, null);
}

public static List<NextTaskRun> resolveSequentialNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, parentTaskRun);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, always, parentTaskRun);

return FlowableUtils.innerResolveSequentialNexts(execution, currentTasks, parentTaskRun);
}
Expand Down Expand Up @@ -92,9 +95,10 @@ public static List<NextTaskRun> resolveWaitForNext(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, parentTaskRun);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, always, parentTaskRun);

// nothing
if (currentTasks == null || currentTasks.isEmpty() || execution.getState().getCurrent() == State.Type.KILLING) {
Expand Down Expand Up @@ -140,12 +144,13 @@ public static Optional<State.Type> resolveState(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
RunContext runContext,
boolean allowFailure,
boolean allowWarning
) {
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, parentTaskRun);
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(tasks, errors, always, parentTaskRun);

if (currentTasks == null) {
runContext.logger().warn(
Expand Down Expand Up @@ -197,12 +202,15 @@ public static List<NextTaskRun> resolveParallelNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
Integer concurrency
) {
return resolveParallelNexts(
execution,
tasks, errors,
tasks,
errors,
always,
parentTaskRun,
concurrency,
(nextTaskRunStream, taskRuns) -> nextTaskRunStream
Expand All @@ -217,6 +225,7 @@ public static List<NextTaskRun> resolveConcurrentNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
Integer concurrency
) {
Expand All @@ -227,6 +236,7 @@ public static List<NextTaskRun> resolveConcurrentNexts(
List<ResolvedTask> allTasks = execution.findTaskDependingFlowState(
tasks,
errors,
always,
parentTaskRun
);

Expand All @@ -249,7 +259,7 @@ public static List<NextTaskRun> resolveConcurrentNexts(
if (taskRuns.isEmpty()) {
Map<String, List<ResolvedTask>> collect = allTasks
.stream()
.collect(Collectors.groupingBy(resolvedTask -> resolvedTask.getValue(), () -> new LinkedHashMap<>(), Collectors.toList()));
.collect(Collectors.groupingBy(ResolvedTask::getValue, LinkedHashMap::new, Collectors.toList()));
return collect.values().stream()
.limit(concurrencySlots)
.map(resolvedTasks -> resolvedTasks.getFirst().toNextTaskRun(execution))
Expand All @@ -260,7 +270,8 @@ public static List<NextTaskRun> resolveConcurrentNexts(
// start as many tasks as we have concurrency slots
Map<String, List<ResolvedTask>> collect = allTasks
.stream()
.collect(Collectors.groupingBy(resolvedTask -> resolvedTask.getValue(), () -> new LinkedHashMap<>(), Collectors.toList()));
.collect(Collectors.groupingBy(ResolvedTask::getValue, LinkedHashMap::new, Collectors.toList()));

return collect.values().stream()
.map(resolvedTasks -> filterCreated(resolvedTasks, taskRuns, parentTaskRun))
.filter(resolvedTasks -> !resolvedTasks.isEmpty())
Expand All @@ -281,6 +292,7 @@ public static List<NextTaskRun> resolveDagNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
Integer concurrency,
List<Dag.DagTask> taskDependencies
Expand All @@ -289,6 +301,7 @@ public static List<NextTaskRun> resolveDagNexts(
execution,
tasks,
errors,
always,
parentTaskRun,
concurrency,
(nextTaskRunStream, taskRuns) -> nextTaskRunStream
Expand Down Expand Up @@ -321,6 +334,7 @@ public static List<NextTaskRun> resolveParallelNexts(
Execution execution,
List<ResolvedTask> tasks,
List<ResolvedTask> errors,
List<ResolvedTask> always,
TaskRun parentTaskRun,
Integer concurrency,
BiFunction<Stream<NextTaskRun>, List<TaskRun>, Stream<NextTaskRun>> nextTaskRunFunction
Expand All @@ -332,6 +346,7 @@ public static List<NextTaskRun> resolveParallelNexts(
List<ResolvedTask> currentTasks = execution.findTaskDependingFlowState(
tasks,
errors,
always,
parentTaskRun
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ public class AllowFailure extends Sequential implements FlowableTask<VoidOutput>
public Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
List<ResolvedTask> resolvedTasks = this.childTasks(runContext, parentTaskRun);
List<ResolvedTask> resolvedErrors = FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun);
List<ResolvedTask> resolvedAlways = FlowableUtils.resolveTasks(this.getAlways(), parentTaskRun);

Optional<State.Type> type = FlowableUtils.resolveState(
execution,
resolvedTasks,
resolvedErrors,
resolvedAlways,
parentTaskRun,
runContext,
this.isAllowFailure(),
Expand All @@ -77,6 +79,7 @@ public Optional<State.Type> resolveState(RunContext runContext, Execution execut
execution,
resolvedTasks,
null,
resolvedAlways,
parentTaskRun,
runContext,
this.isAllowFailure(),
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/io/kestra/plugin/core/flow/Dag.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public class Dag extends Task implements FlowableTask<VoidOutput> {
@PluginProperty
protected List<Task> errors;

@Valid
protected List<Task> always;

@Override
public GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException {
GraphCluster subGraph = new GraphCluster(this, taskRun, parentValues, RelationType.DYNAMIC);
Expand Down Expand Up @@ -138,7 +141,10 @@ public List<Task> allChildTasks() {
return Stream
.concat(
this.tasks != null ? this.tasks.stream().map(DagTask::getTask) : Stream.empty(),
this.errors != null ? this.errors.stream() : Stream.empty()
Stream.concat(
this.errors != null ? this.errors.stream() : Stream.empty(),
this.always != null ? this.always.stream() : Stream.empty()
)
)
.toList();
}
Expand All @@ -156,6 +162,7 @@ public List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(this.errors, parentTaskRun),
FlowableUtils.resolveTasks(this.always, parentTaskRun),
parentTaskRun,
this.concurrent,
this.tasks
Expand Down
Loading
Loading