Skip to content

Commit

Permalink
Implement more workflow and activity config settings
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Jan 8, 2025
1 parent 4bae975 commit 230aff0
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,15 @@ public interface Workflow {

/**
 * Updates the activity identified by {@code activityId} and returns an {@link ActivityWrapper}.
 * The {@code settings}, {@code config} and {@code rendering} arguments are declared {@code @Nullable}.
 * NOTE(review): presumably a {@code null} argument leaves that part of the activity unchanged — confirm against the implementation.
 */
ActivityWrapper updateActivity( UUID activityId, @Nullable Map<String, JsonNode> settings, @Nullable ActivityConfigModel config, @Nullable RenderModel rendering, StorageManager sm );

/**
 * Returns the number of milliseconds until the execution of the subtree containing the given activities times out.
 * If there is more than one activity in the set, the sum of all timeout durations is returned.
 *
 * @param activities the activities of the subtree to be executed
 * @return the timeout duration in milliseconds, or 0 if no timeout is desired
 */
int getTimeoutMillis( Set<UUID> activities );

/**
 * Returns an {@link AttributedDirectedGraph} whose vertices are {@link UUID}s and whose edges are {@link ExecutionEdge}s.
 * NOTE(review): judging by the name this is a DAG view of the workflow keyed by activity id — confirm how it is derived.
 */
AttributedDirectedGraph<UUID, ExecutionEdge> toDag();

/**
 * Validates the structure of this workflow.
 * NOTE(review): presumably throws when the structure is invalid; the concrete exception types and the role of
 * {@code sm} are not visible from this declaration — confirm against the implementation.
 */
void validateStructure( StorageManager sm ) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,22 @@ public ActivityWrapper updateActivity( UUID activityId, @Nullable Map<String, Js
}


/**
 * Computes the timeout for executing the given activities as the sum of one timeout per activity.
 * An activity whose own config specifies a positive timeout contributes that value; every other activity
 * contributes the workflow-level timeout. Negative configured values are treated as 0.
 * <p>
 * The sum is accumulated as a {@code long} and capped at {@link Integer#MAX_VALUE}. Plain {@code int}
 * addition could silently wrap around to a negative or zero value, and 0 is the "no timeout" marker, so an
 * overflow would wrongly disable the timeout.
 *
 * @param activities the ids of the activities that are executed together
 * @return the total timeout in milliseconds (never negative), or 0 if no timeout applies
 */
@Override
public int getTimeoutMillis( Set<UUID> activities ) {
    int baseTimeout = Math.max( 0, config.getTimeoutMillis() );
    long total = 0;
    for ( UUID activityId : activities ) {
        int activityTimeout = Math.max( 0, getActivity( activityId ).getConfig().getTimeoutMillis() );
        // A positive activity timeout overrides the base timeout; an activity cannot disable the base timeout.
        total += activityTimeout > 0 ? activityTimeout : baseTimeout;
    }
    return (int) Math.min( Integer.MAX_VALUE, total );
}


@Override
public AttributedDirectedGraph<UUID, ExecutionEdge> toDag() {
AttributedDirectedGraph<UUID, ExecutionEdge> dag = AttributedDirectedGraph.create( new ExecutionEdgeFactory() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.polypheny.db.workflow.dag.activities.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.polypheny.db.algebra.AlgNode;
Expand Down Expand Up @@ -69,7 +70,9 @@ public IdentityActivity() {

/**
 * Previews the output type of this activity: the single output has the type of the first input.
 * <p>
 * The result is built with an {@link ArrayList} instead of {@code List.of()} because the first input type may be
 * {@code null} and {@code List.of()} rejects {@code null} elements.
 *
 * @param inTypes the previewed input types; only the element at index 0 is read
 * @param settings the previewed settings; not used by this method
 * @return a mutable single-element list containing {@code inTypes.get( 0 )}
 * @throws ActivityException declared by the overridden method; not thrown by this implementation
 */
@Override
public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>> inTypes, SettingsPreview settings ) throws ActivityException {
    // TODO: this is a workaround since inType could be null and List.of() requires non null
    List<Optional<AlgDataType>> outTypes = new ArrayList<>();
    outTypes.add( inTypes.get( 0 ) );
    return outTypes;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ExecutionSubmission {
UUID rootId;
CommonType commonType;
UUID sessionId;
int timeoutMillis; // 0 for no timeout
ExecutionInfo info;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -54,6 +56,7 @@ public class GlobalScheduler {
private final Set<UUID> interruptedSessions = ConcurrentHashMap.newKeySet();
private final ThreadPoolExecutor executor;
private final CompletionService<ExecutionResult> completionService;
private final ScheduledExecutorService timeoutService;

private Thread resultProcessor; // When no workflow is being executed, the thread may die and be replaced by a new thread when execution starts again

Expand All @@ -63,6 +66,7 @@ private GlobalScheduler() {
60L, TimeUnit.SECONDS,
new SynchronousQueue<>() );
completionService = new ExecutorCompletionService<>( executor );
timeoutService = new ScheduledThreadPoolExecutor( 1 );
}


Expand Down Expand Up @@ -169,6 +173,15 @@ private void submit( List<ExecutionSubmission> submissions ) {

activeSubmissions.computeIfAbsent( sessionId, k -> ConcurrentHashMap.newKeySet() ).add( submission );

int timeoutMillis = submission.getTimeoutMillis();
if ( timeoutMillis > 0 ) {
timeoutService.schedule( () -> {
if ( submission.getInfo().getState() == ExecutionState.EXECUTING ) {
submission.getExecutor().interrupt();
}
}, Math.max( 50, timeoutMillis ), TimeUnit.MILLISECONDS ); // minimum timeout to ensure executor was actually called
}

ExecutionResult result;
try {
submission.getExecutor().call();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,23 @@ private void updateGraph( boolean isSuccess, Set<UUID> activities, UUID rootId,
}
}

if ( workflow.getConfig().isDropUnusedCheckpoints() ) {
for ( UUID n : dag.vertexSet() ) {
ActivityWrapper wrapper = workflow.getActivity( n );
if ( wrapper.getConfig().isEnforceCheckpoint() || wrapper.getState() != ActivityState.SAVED ) {
continue;
}
if ( dag.getOutwardEdges( n ).stream().allMatch( execEdge -> {
ActivityState state = workflow.getActivity( execEdge.getTarget() ).getState();
return state.isExecuted() || state == ActivityState.SKIPPED || workflow.getEdge( execEdge ).isIgnored();
} )
) {
sm.dropCheckpoints( n ); // all successors are already executed
wrapper.setState( ActivityState.FINISHED );
}
}
}

// TODO: update entire workflow instead of dag?
if ( !isInitialUpdate ) {
for ( UUID n : GraphUtils.getTopologicalIterable( dag, rootId, false ) ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public ExecutionSubmission create( StorageManager sm, Workflow wf ) {
case VARIABLE_WRITER -> new VariableWriterExecutor( sm, wf, getRootActivity(), info );
};

return new ExecutionSubmission( executor, activities, root, commonType, sm.getSessionId(), info );
return new ExecutionSubmission( executor, activities, root, commonType, sm.getSessionId(), wf.getTimeoutMillis( activities ), info );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class ActivityConfigModel {

boolean enforceCheckpoint;
int timeoutMillis; // 0 for no timeout
String[] preferredStores; // one entry per output

@JsonProperty(required = true)
Expand All @@ -52,7 +53,7 @@ public String getPreferredStore( int outputIdx ) {


/**
 * Creates an {@code ActivityConfigModel} with default values.
 * The second constructor argument is the timeout in milliseconds, where 0 means no timeout.
 * NOTE(review): the argument order is assumed to follow the field declaration order
 * (enforceCheckpoint, timeoutMillis, preferredStores, ...) — confirm against the constructor.
 *
 * @return a new config that uses {@code CommonType.NONE} and {@code ControlStateMerger.AND_AND}
 */
public static ActivityConfigModel of() {
    return new ActivityConfigModel( false, 0, null, CommonType.NONE, ControlStateMerger.AND_AND );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class WorkflowConfigModel {
Map<DataModel, String> preferredStores;
boolean fusionEnabled;
boolean pipelineEnabled;
int timeoutMillis; // 0 for no timeout
boolean dropUnusedCheckpoints;
int maxWorkers;
int pipelineQueueCapacity;
// TODO: config value for changing behavior of deleting created checkpoints
Expand All @@ -41,6 +43,8 @@ public static WorkflowConfigModel of() {
DataModel.GRAPH, WorkflowManager.DEFAULT_CHECKPOINT_ADAPTER ),
true,
true,
0,
false,
1,
1000
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public static Workflow getWorkflow( List<ActivityModel> activities, List<EdgeMod
Map.of( DataModel.RELATIONAL, "hsqldb", DataModel.DOCUMENT, "hsqldb", DataModel.GRAPH, "hsqldb" ),
fusionEnabled,
pipelineEnabled,
0,
false,
maxWorkers,
10 // low on purpose to observe blocking
);
Expand Down Expand Up @@ -429,7 +431,7 @@ private static Pair<Workflow, List<UUID>> getWorkflowWithActivities( List<Activi


/**
 * Builds an {@code ActivityModel} of the given type with a random id, the given settings and default rendering.
 * The activity config is created with the given common type and a timeout argument of 0.
 * NOTE(review): the second constructor argument is assumed to be {@code timeoutMillis} (0 = no timeout) —
 * confirm against the {@code ActivityConfigModel} constructor.
 *
 * @param type the activity type
 * @param settings the settings of the activity
 * @param commonType the common type to store in the activity config
 * @return the newly created activity model
 */
private static ActivityModel getCommonActivity( String type, Map<String, JsonNode> settings, CommonType commonType ) {
    ActivityConfigModel config = new ActivityConfigModel( false, 0, null, commonType, ControlStateMerger.AND_AND );
    return new ActivityModel( type, UUID.randomUUID(), settings, config, RenderModel.of() );
}

Expand Down

0 comments on commit 230aff0

Please sign in to comment.