Skip to content

Commit

Permalink
Fix edge cases and add ability to clone activities
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Jan 6, 2025
1 parent 1388019 commit f315c14
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public interface Workflow {

ActivityWrapper addActivity( String activityType, RenderModel renderModel );

ActivityWrapper cloneActivity( UUID activityId, double posX, double posY );

void deleteActivity( UUID activityId, StorageManager sm );

void addEdge( EdgeModel model, StorageManager sm );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,15 @@ private void addActivity( ActivityWrapper activity ) {
}


@Override
public ActivityWrapper cloneActivity( UUID activityId, double posX, double posY ) {
ActivityModel clonedModel = getActivityOrThrow( activityId ).toModel( false ).createCopy( posX, posY );
ActivityWrapper wrapper = ActivityWrapper.fromModel( clonedModel );
addActivity( wrapper );
return wrapper;
}


@Override
public void deleteActivity( UUID activityId, StorageManager sm ) {
Set<UUID> reachable = getReachableActivities( activityId, false );
Expand All @@ -358,8 +367,20 @@ public void addEdge( EdgeModel model, StorageManager sm ) {
if ( getEdge( model ) != null ) {
throw new GenericRuntimeException( "Cannot add an edge that is already part of this workflow." );
}
if ( model.getFromId().equals( model.getToId() ) ) {
throw new GenericRuntimeException( "Cannot add an edge with same source and target activity." );
}
// We allow the workflow to temporarily have more than 1 in-edge per data input, to allow the UI to swap the source activity.
// The occupation validation is performed before execution.

Edge edge = Edge.fromModel( model, activities );
edges.computeIfAbsent( model.toPair(), k -> new ArrayList<>() ).add( edge );

if ( !(new CycleDetector<>( toDag() ).findCycles().isEmpty()) ) {
edges.get( model.toPair() ).remove( edge );
throw new GenericRuntimeException( "Cannot add an edge that would result in a cycle." );
}

reset( edge.getTo().getId(), sm );
}

Expand Down Expand Up @@ -428,6 +449,9 @@ public void validateStructure( StorageManager sm, AttributedDirectedGraph<UUID,
}

for ( ExecutionEdge execEdge : subDag.edgeSet() ) {
if ( execEdge.getSource().equals( execEdge.getTarget() ) ) {
throw new IllegalStateException( "Source activity must differ from target activity for edge: " + execEdge );
}
if ( !activities.containsKey( execEdge.getSource() ) || !activities.containsKey( execEdge.getTarget() ) ) {
throw new IllegalStateException( "Source and target activities of an edge must be part of the workflow: " + execEdge );
}
Expand Down Expand Up @@ -494,7 +518,7 @@ public void validateStructure( StorageManager sm, AttributedDirectedGraph<UUID,
}

}
if ( !requiredInPorts.isEmpty() && wrapper.getState() != ActivityState.SAVED) { // already saved activities do not need their predecessors in the subDag
if ( !requiredInPorts.isEmpty() && wrapper.getState() != ActivityState.SAVED ) { // already saved activities do not need their predecessors in the subDag
throw new IllegalStateException( "Activity is missing the required data input(s) " + requiredInPorts + ": " + wrapper );
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public static <V, E extends DefaultEdge> V findInvertedTreeRoot( AttributedDirec
* @return the subgraph of graph induced by nodes
*/
public static <V, E extends DefaultEdge> AttributedDirectedGraph<V, E> getInducedSubgraph( AttributedDirectedGraph<V, E> graph, Collection<V> nodes ) {
return getInducedSubgraph( graph, nodes, Set.of() );
}


public static <V, E extends DefaultEdge> AttributedDirectedGraph<V, E> getInducedSubgraph( AttributedDirectedGraph<V, E> graph, Collection<V> nodes, Set<E> edgesToIgnore ) {
AttributedDirectedGraph<V, E> subgraph = AttributedDirectedGraph.create( (AttributedEdgeFactory<V, E>) graph.getEdgeFactory() );
Set<V> vertexSet = graph.vertexSet();
for ( V n : nodes ) {
Expand All @@ -65,7 +70,7 @@ public static <V, E extends DefaultEdge> AttributedDirectedGraph<V, E> getInduce
}

vertexSet = subgraph.vertexSet();
Set<E> insertedEdges = new HashSet<>();
Set<E> insertedEdges = new HashSet<>( edgesToIgnore );
for ( V source : vertexSet ) {
for ( E edge : graph.getOutwardEdges( source ) ) {
if ( insertedEdges.contains( edge ) ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ private AttributedDirectedGraph<UUID, ExecutionEdge> prepareExecutionDag( List<U
throw new GenericRuntimeException( "Cannot prepare executionDag for empty targets" );
}
Set<UUID> savedActivities = new HashSet<>();
Set<ExecutionEdge> edgesToIgnore = new HashSet<>(); // edges that go from finished to successfully executed activities (see explanation of compromise solution below)
Set<UUID> finishedActivities = new HashSet<>();
Queue<UUID> open = new LinkedList<>( targets );
Set<UUID> visited = new HashSet<>();

Expand All @@ -169,6 +171,17 @@ private AttributedDirectedGraph<UUID, ExecutionEdge> prepareExecutionDag( List<U
if ( nWrapper.getState() == ActivityState.SAVED ) {
savedActivities.add( n );
continue;
} else if ( nWrapper.getState() == ActivityState.FINISHED ) {
// this can happen when a new outgoing edge to a FINISHED activity has been created between executions.
// problem: the entire fused / piped subtree that this activity belongs to and all executed successor subtrees also need to be executed again
// solution: either reset activities already when user creates edge (but this is not very robust), or handle it here, possibly recomputing large parts of the DAG
// compromise solution: only recompute this activity (and unsaved data-edge predecessors) -> assumption: activities are idempotent (not actually true, but good enough)
List<Edge> outEdges = workflow.getOutEdges( n );
finishedActivities.add( n );
edgesToIgnore.addAll( outEdges.stream()
.filter( e -> !finishedActivities.contains( e.getTo().getId() ) && e.getTo().getState().isSuccess() )
.map( e -> new ExecutionEdge( e.getFrom().getId(), e.getTo().getId(), e ) )
.toList() );
}

nWrapper.resetExecution();
Expand All @@ -183,7 +196,7 @@ private AttributedDirectedGraph<UUID, ExecutionEdge> prepareExecutionDag( List<U
}
}

AttributedDirectedGraph<UUID, ExecutionEdge> execDag = GraphUtils.getInducedSubgraph( workflow.toDag(), visited );
AttributedDirectedGraph<UUID, ExecutionEdge> execDag = GraphUtils.getInducedSubgraph( workflow.toDag(), visited, edgesToIgnore );

// handle saved activities (= simulate that they finish their execution successfully)
for ( UUID saved : savedActivities ) {
Expand Down Expand Up @@ -226,7 +239,6 @@ public List<ExecutionSubmission> handleExecutionResult( ExecutionResult result )

updateGraph( result.isSuccess(), result.getActivities(), result.getRootId(), execDag );
updatePartitions();
executionMonitor.forwardStates();

log.warn( "Remaining activities: " + remainingActivities );

Expand All @@ -244,7 +256,9 @@ public List<ExecutionSubmission> handleExecutionResult( ExecutionResult result )
}

try {
return computeNextSubmissions();
List<ExecutionSubmission> next = computeNextSubmissions();
executionMonitor.forwardStates();
return next;
} catch ( Exception e ) {
// this should never happen, but as a fallback we finish workflow execution
log.error( "An unexpected error occurred while determining the next activities to be submitted", e );
Expand Down Expand Up @@ -277,8 +291,8 @@ private void updateTransactions( ExecutionResult result ) throws TransactionExce

private List<ExecutionSubmission> computeNextSubmissions() {
List<SubmissionFactory> factories = optimizer.computeNextTrees( maxWorkers - pendingCount, activePartition.commonType );
if (pendingCount == 0 && factories.isEmpty()) {
throw new IllegalStateException("The optimizer is unable to determine the next activity to be executed");
if ( pendingCount == 0 && factories.isEmpty() ) {
throw new IllegalStateException( "The optimizer is unable to determine the next activity to be executed" );
}
pendingCount += factories.size();
List<ExecutionSubmission> submissions = factories.stream().map( f -> f.create( sm, workflow ) ).toList();
Expand Down Expand Up @@ -372,7 +386,7 @@ private void propagateResult( boolean isActive, Edge edge, AttributedDirectedGra
case INACTIVE -> {
target.setState( ActivityState.SKIPPED );
remainingActivities.remove( target.getId() );
if ( targetPartition != null) { // in case of initial propagation for saved activities, there is no targetPartition yet
if ( targetPartition != null ) { // in case of initial propagation for saved activities, there is no targetPartition yet
targetPartition.setResolved( target.getId(), false ); // no need to catch the exception, as the transaction is already rolled back
}
// a skipped activity does NOT count as failed -> onFail control edges also become INACTIVE
Expand Down Expand Up @@ -400,7 +414,7 @@ private void updatePartitions() {


private void setFinished() {
if (!remainingActivities.isEmpty()) {
if ( !remainingActivities.isEmpty() ) {
setStates( remainingActivities, ActivityState.SKIPPED );
}
workflow.setState( WorkflowState.IDLE );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.polypheny.db.workflow.models;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -71,4 +73,19 @@ public ActivityModel( String type, UUID id, Map<String, JsonNode> settings, Acti
this.invalidReason = null;
}


public ActivityModel createCopy( double posX, double posY ) {
ObjectMapper mapper = new ObjectMapper();
try {
ActivityModel trueCopy = mapper.readValue( mapper.writeValueAsString( this ), ActivityModel.class ); // but we need a copy with a different UUID...
return new ActivityModel( trueCopy.type,
UUID.randomUUID(),
trueCopy.settings,
trueCopy.config,
new RenderModel( posX, posY, trueCopy.rendering.getName(), trueCopy.rendering.getNotes() ) );
} catch ( JsonProcessingException e ) {
throw new RuntimeException( e );
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public enum RequestType {
CREATE_ACTIVITY,
DELETE_ACTIVITY,
UPDATE_ACTIVITY,
CLONE_ACTIVITY,
CREATE_EDGE,
DELETE_EDGE,
EXECUTE,
Expand Down Expand Up @@ -75,6 +76,15 @@ public static class DeleteActivityRequest extends WsRequest {
}


public static class CloneActivityRequest extends WsRequest {

public UUID targetId;
public double posX;
public double posY;

}


public static class CreateEdgeRequest extends WsRequest {

public EdgeModel edge;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.PROGRESS_UPDATE;
import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.RENDERING_UPDATE;
import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.STATE_UPDATE;
import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.WORKFLOW_UPDATE;

import java.util.List;
import java.util.Map;
Expand All @@ -35,7 +34,6 @@
import org.polypheny.db.workflow.models.ActivityModel;
import org.polypheny.db.workflow.models.EdgeModel;
import org.polypheny.db.workflow.models.RenderModel;
import org.polypheny.db.workflow.models.WorkflowModel;
import org.polypheny.db.workflow.models.requests.WsRequest.RequestType;

/**
Expand All @@ -56,7 +54,6 @@ public WsResponse( ResponseType type, @Nullable UUID parentId ) {


public enum ResponseType {
WORKFLOW_UPDATE, // entire workflow
ACTIVITY_UPDATE, // single activity
RENDERING_UPDATE, // only renderModel of an activity
STATE_UPDATE, // all edge and activity states
Expand All @@ -65,19 +62,6 @@ public enum ResponseType {
}


public static class WorkflowUpdateResponse extends WsResponse {

public final WorkflowModel workflow;


public WorkflowUpdateResponse( @Nullable UUID parentId, WorkflowModel workflow ) {
super( WORKFLOW_UPDATE, parentId );
this.workflow = workflow;
}

}


public static class ActivityUpdateResponse extends WsResponse {

public final ActivityModel activity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.polypheny.db.workflow.models.WorkflowConfigModel;
import org.polypheny.db.workflow.models.WorkflowModel;
import org.polypheny.db.workflow.models.requests.WsRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.CloneActivityRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest;
Expand Down Expand Up @@ -165,6 +166,11 @@ public void handleRequest( UpdateActivityRequest request ) {
}


public void handleRequest( CloneActivityRequest request ) {
throwUnsupported( request );
}


public void handleRequest( CreateEdgeRequest request ) {
throwUnsupported( request );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.polypheny.db.workflow.models.SessionModel;
import org.polypheny.db.workflow.models.SessionModel.SessionModelType;
import org.polypheny.db.workflow.models.WorkflowDefModel;
import org.polypheny.db.workflow.models.requests.WsRequest.CloneActivityRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest;
Expand Down Expand Up @@ -98,6 +99,14 @@ public void handleRequest( UpdateActivityRequest request ) {
}


@Override
public void handleRequest( CloneActivityRequest request ) {
throwIfNotEditable();
ActivityWrapper activity = workflow.cloneActivity( request.targetId, request.posX, request.posY );
broadcastMessage( new ActivityUpdateResponse( request.msgId, activity ) );
}


@Override
public void handleRequest( CreateEdgeRequest request ) {
throwIfNotEditable();
Expand Down Expand Up @@ -149,7 +158,7 @@ public void handleRequest( InterruptRequest request ) {

@Override
public SessionModel toModel() {
return new SessionModel( SessionModelType.USER_SESSION, sessionId, getSubscriberCount(), wId, openedVersion, workflowDef);
return new SessionModel( SessionModelType.USER_SESSION, sessionId, getSubscriberCount(), wId, openedVersion, workflowDef );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.polypheny.db.workflow.models.requests.WsRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.CloneActivityRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest;
Expand Down Expand Up @@ -77,6 +78,7 @@ public void onMessage( final WsMessageContext ctx ) {
case CREATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( CreateActivityRequest.class ) );
case UPDATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( UpdateActivityRequest.class ) );
case DELETE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( DeleteActivityRequest.class ) );
case CLONE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( CloneActivityRequest.class ) );
case CREATE_EDGE -> session.handleRequest( ctx.messageAsClass( CreateEdgeRequest.class ) );
case DELETE_EDGE -> session.handleRequest( ctx.messageAsClass( DeleteEdgeRequest.class ) );
case EXECUTE -> session.handleRequest( ctx.messageAsClass( ExecuteRequest.class ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@
import org.polypheny.db.workflow.WorkflowUtils;
import org.polypheny.db.workflow.dag.Workflow;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState;
import org.polypheny.db.workflow.dag.edges.ControlEdge;
import org.polypheny.db.workflow.engine.monitoring.ExecutionMonitor;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.engine.storage.StorageManagerImpl;
import org.polypheny.db.workflow.engine.storage.StorageUtils;
import org.polypheny.db.workflow.models.EdgeModel;

class GlobalSchedulerTest {

Expand Down Expand Up @@ -164,7 +166,7 @@ void advancedFusionTest() throws Exception {


@Test
void RelValuesFusionTest() throws Exception {
void relValuesFusionTest() throws Exception {
Workflow workflow = WorkflowUtils.getRelValuesFusion();
List<UUID> ids = WorkflowUtils.getTopologicalActivityIds( workflow );
scheduler.startExecution( workflow, sm, null );
Expand All @@ -173,6 +175,31 @@ void RelValuesFusionTest() throws Exception {
}


@Test
void executeModifiedFusionTest() throws Exception {
Pair<Workflow, List<UUID>> pair = WorkflowUtils.getAdvancedFusion();
Workflow workflow = pair.left;
List<UUID> ids = pair.right;

scheduler.startExecution( workflow, sm, ids.get( 3 ) );
scheduler.awaitResultProcessor( 5000 );

workflow.deleteEdge( workflow.getOutEdges( ids.get( 3 ) ).get( 0 ).toModel( false ), sm );
workflow.addEdge( new EdgeModel( ids.get( 0 ), ids.get( 4 ), 0, 0, false, null ), sm ); // activity 0 does not yet have a checkpoint, but we add an edge that requires one
workflow.addEdge( new EdgeModel( ids.get( 3 ), ids.get( 4 ), ControlEdge.SUCCESS_PORT, 0, true, null ), sm );

assertEquals( ActivityState.FINISHED, workflow.getActivity( ids.get( 0 ) ).getState() );
assertEquals( ActivityState.SAVED, workflow.getActivity( ids.get( 3 ) ).getState() );
assertEquals( ActivityState.IDLE, workflow.getActivity( ids.get( 4 ) ).getState() );

scheduler.startExecution( workflow, sm, null );
scheduler.awaitResultProcessor( 5000 );
assertEquals( ActivityState.FINISHED, workflow.getActivity( ids.get( 0 ) ).getState() ); // 0 is fused with 4
assertEquals( ActivityState.SAVED, workflow.getActivity( ids.get( 3 ) ).getState() );
assertEquals( ActivityState.SAVED, workflow.getActivity( ids.get( 4 ) ).getState() );
}


@Test
void relExtractTest() throws Exception {
Workflow workflow = WorkflowUtils.getExtractWorkflow();
Expand Down Expand Up @@ -402,6 +429,7 @@ void executionMonitorTest() throws Exception {

}


@Test
@Disabled
// TODO: delete when no longer required
Expand Down

0 comments on commit f315c14

Please sign in to comment.