From 230aff02054a1c9677c0261570e381bb944baabe Mon Sep 17 00:00:00 2001 From: Tobias Weber Date: Wed, 8 Jan 2025 18:38:43 +0100 Subject: [PATCH] Implement more workflow and activity config settings --- .../org/polypheny/db/workflow/dag/Workflow.java | 9 +++++++++ .../polypheny/db/workflow/dag/WorkflowImpl.java | 16 ++++++++++++++++ .../dag/activities/impl/IdentityActivity.java | 5 ++++- .../engine/scheduler/ExecutionSubmission.java | 1 + .../engine/scheduler/GlobalScheduler.java | 13 +++++++++++++ .../engine/scheduler/WorkflowScheduler.java | 17 +++++++++++++++++ .../scheduler/optimizer/WorkflowOptimizer.java | 2 +- .../db/workflow/models/ActivityConfigModel.java | 3 ++- .../db/workflow/models/WorkflowConfigModel.java | 4 ++++ .../polypheny/db/workflow/WorkflowUtils.java | 4 +++- 10 files changed, 70 insertions(+), 4 deletions(-) diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java index 601c4a9282..d23d01214f 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java @@ -177,6 +177,15 @@ public interface Workflow { ActivityWrapper updateActivity( UUID activityId, @Nullable Map settings, @Nullable ActivityConfigModel config, @Nullable RenderModel rendering, StorageManager sm ); + /** + * Returns the number of milliseconds until the execution of the subtree containing the given activities times out. + * If there is more than one activitiy in the set, the sum of all timeout durations is returned. + * + * @param activities the activities of the subtree to be executed + * @return the timout duration in millis, or 0 if no timeout is desired. + */ + int getTimeoutMillis( Set activities ); + AttributedDirectedGraph toDag(); void validateStructure( StorageManager sm ) throws Exception; diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java index bd81b2243a..7702a1e6a5 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java @@ -432,6 +432,22 @@ public ActivityWrapper updateActivity( UUID activityId, @Nullable Map activities ) { + int baseTimeout = Math.max( 0, config.getTimeoutMillis() ); + int timeout = 0; + for ( UUID activityId : activities ) { + int activityTimeout = Math.max( 0, getActivity( activityId ).getConfig().getTimeoutMillis() ); + if ( activityTimeout > 0 ) { // override base timeout + timeout += activityTimeout; + } else { + timeout += baseTimeout; // activity cannot disable base timeout + } + } + return timeout; + } + + @Override public AttributedDirectedGraph toDag() { AttributedDirectedGraph dag = AttributedDirectedGraph.create( new ExecutionEdgeFactory() ); diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java index fd6ed14843..ce8a5b755a 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java @@ -16,6 +16,7 @@ package org.polypheny.db.workflow.dag.activities.impl; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import org.polypheny.db.algebra.AlgNode; @@ -69,7 +70,9 @@ public IdentityActivity() { @Override public List> previewOutTypes( List> inTypes, SettingsPreview settings ) throws ActivityException { - return List.of( inTypes.get( 0 ) ); + List> outTypes = new ArrayList<>(); // TODO: this is a workaround since inType could be null and List.of() requires non null + outTypes.add( inTypes.get( 0 ) ); + return outTypes; } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java index 80c3486b55..ccd5d800fd 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java @@ -31,6 +31,7 @@ public class ExecutionSubmission { UUID rootId; CommonType commonType; UUID sessionId; + int timeoutMillis; // 0 for no timeout ExecutionInfo info; diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java index fed62a88c8..6fd9c46ccf 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java @@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -54,6 +56,7 @@ public class GlobalScheduler { private final Set interruptedSessions = ConcurrentHashMap.newKeySet(); private final ThreadPoolExecutor executor; private final CompletionService completionService; + private final ScheduledExecutorService timeoutService; private Thread resultProcessor; // When no workflow is being executed, the thread may die and be replaced by a new thread when execution starts again @@ -63,6 +66,7 @@ private GlobalScheduler() { 60L, TimeUnit.SECONDS, new SynchronousQueue<>() ); completionService = new ExecutorCompletionService<>( executor ); + timeoutService = new ScheduledThreadPoolExecutor( 1 ); } @@ -169,6 +173,15 @@ private void submit( List submissions ) { activeSubmissions.computeIfAbsent( sessionId, k -> ConcurrentHashMap.newKeySet() ).add( submission ); + int timeoutMillis = submission.getTimeoutMillis(); + if ( timeoutMillis > 0 ) { + timeoutService.schedule( () -> { + if ( submission.getInfo().getState() == ExecutionState.EXECUTING ) { + submission.getExecutor().interrupt(); + } + }, Math.max( 50, timeoutMillis ), TimeUnit.MILLISECONDS ); // minimum timeout to ensure executor was actually called + } + ExecutionResult result; try { submission.getExecutor().call(); diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java index b155c3a6a1..cc76a4d5fc 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java @@ -345,6 +345,23 @@ private void updateGraph( boolean isSuccess, Set activities, UUID rootId, } } + if ( workflow.getConfig().isDropUnusedCheckpoints() ) { + for ( UUID n : dag.vertexSet() ) { + ActivityWrapper wrapper = workflow.getActivity( n ); + if ( wrapper.getConfig().isEnforceCheckpoint() || wrapper.getState() != ActivityState.SAVED ) { + continue; + } + if ( dag.getOutwardEdges( n ).stream().allMatch( execEdge -> { + ActivityState state = workflow.getActivity( execEdge.getTarget() ).getState(); + return state.isExecuted() || state == ActivityState.SKIPPED || workflow.getEdge( execEdge ).isIgnored(); + } ) + ) { + sm.dropCheckpoints( n ); // all successors are already executed + wrapper.setState( ActivityState.FINISHED ); + } + } + } + // TODO: update entire workflow instead of dag? if ( !isInitialUpdate ) { for ( UUID n : GraphUtils.getTopologicalIterable( dag, rootId, false ) ) { diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java index 1f8e84f20c..314ee2cad6 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java @@ -170,7 +170,7 @@ public ExecutionSubmission create( StorageManager sm, Workflow wf ) { case VARIABLE_WRITER -> new VariableWriterExecutor( sm, wf, getRootActivity(), info ); }; - return new ExecutionSubmission( executor, activities, root, commonType, sm.getSessionId(), info ); + return new ExecutionSubmission( executor, activities, root, commonType, sm.getSessionId(), wf.getTimeoutMillis( activities ), info ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityConfigModel.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityConfigModel.java index 455dcfca70..0db3d76b3e 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityConfigModel.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityConfigModel.java @@ -27,6 +27,7 @@ public class ActivityConfigModel { boolean enforceCheckpoint; + int timeoutMillis; // 0 for no timeout String[] preferredStores; // one entry per output @JsonProperty(required = true) @@ -52,7 +53,7 @@ public String getPreferredStore( int outputIdx ) { public static ActivityConfigModel of() { - return new ActivityConfigModel( false, null, CommonType.NONE, ControlStateMerger.AND_AND ); + return new ActivityConfigModel( false, 0, null, CommonType.NONE, ControlStateMerger.AND_AND ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/WorkflowConfigModel.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/WorkflowConfigModel.java index 8678558f48..54e8187809 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/WorkflowConfigModel.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/WorkflowConfigModel.java @@ -29,6 +29,8 @@ public class WorkflowConfigModel { Map preferredStores; boolean fusionEnabled; boolean pipelineEnabled; + int timeoutMillis; // 0 for no timeout + boolean dropUnusedCheckpoints; int maxWorkers; int pipelineQueueCapacity; // TODO: config value for changing behavior of deleting created checkpoints @@ -41,6 +43,8 @@ public static WorkflowConfigModel of() { DataModel.GRAPH, WorkflowManager.DEFAULT_CHECKPOINT_ADAPTER ), true, true, + 0, + false, 1, 1000 ); diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java index 973148a652..ac56ca321a 100644 --- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java +++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java @@ -59,6 +59,8 @@ public static Workflow getWorkflow( List activities, List> getWorkflowWithActivities( List settings, CommonType commonType ) { - ActivityConfigModel config = new ActivityConfigModel( false, null, commonType, ControlStateMerger.AND_AND ); + ActivityConfigModel config = new ActivityConfigModel( false, 0, null, commonType, ControlStateMerger.AND_AND ); return new ActivityModel( type, UUID.randomUUID(), settings, config, RenderModel.of() ); }