From f315c140aaf73146dd4bfe04bf64794a557c2080 Mon Sep 17 00:00:00 2001 From: Tobias Weber Date: Mon, 6 Jan 2025 19:45:08 +0100 Subject: [PATCH] Fix edge cases and add ability to clone activities --- .../polypheny/db/workflow/dag/Workflow.java | 2 ++ .../db/workflow/dag/WorkflowImpl.java | 26 +++++++++++++++- .../workflow/engine/scheduler/GraphUtils.java | 7 ++++- .../engine/scheduler/WorkflowScheduler.java | 28 ++++++++++++----- .../db/workflow/models/ActivityModel.java | 17 +++++++++++ .../workflow/models/requests/WsRequest.java | 10 +++++++ .../workflow/models/responses/WsResponse.java | 16 ---------- .../db/workflow/session/AbstractSession.java | 6 ++++ .../db/workflow/session/UserSession.java | 11 ++++++- .../workflow/session/WorkflowWebSocket.java | 2 ++ .../engine/scheduler/GlobalSchedulerTest.java | 30 ++++++++++++++++++- 11 files changed, 128 insertions(+), 27 deletions(-) diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java index c4198993ff..601c4a9282 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java @@ -167,6 +167,8 @@ public interface Workflow { ActivityWrapper addActivity( String activityType, RenderModel renderModel ); + ActivityWrapper cloneActivity( UUID activityId, double posX, double posY ); + void deleteActivity( UUID activityId, StorageManager sm ); void addEdge( EdgeModel model, StorageManager sm ); diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java index f751ff9c7c..c7d3014072 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java @@ -343,6 +343,15 @@ private void addActivity( ActivityWrapper activity ) { } + @Override + public ActivityWrapper cloneActivity( UUID activityId, double posX, double posY ) { + ActivityModel clonedModel = getActivityOrThrow( activityId ).toModel( false ).createCopy( posX, posY ); + ActivityWrapper wrapper = ActivityWrapper.fromModel( clonedModel ); + addActivity( wrapper ); + return wrapper; + } + + @Override public void deleteActivity( UUID activityId, StorageManager sm ) { Set reachable = getReachableActivities( activityId, false ); @@ -358,8 +367,20 @@ public void addEdge( EdgeModel model, StorageManager sm ) { if ( getEdge( model ) != null ) { throw new GenericRuntimeException( "Cannot add an edge that is already part of this workflow." ); } + if ( model.getFromId().equals( model.getToId() ) ) { + throw new GenericRuntimeException( "Cannot add an edge with same source and target activity." ); + } + // We allow the workflow to temporarily have more than 1 in-edge per data input, to allow the UI to swap the source activity. + // The occupation validation is performed before execution. + Edge edge = Edge.fromModel( model, activities ); edges.computeIfAbsent( model.toPair(), k -> new ArrayList<>() ).add( edge ); + + if ( !(new CycleDetector<>( toDag() ).findCycles().isEmpty()) ) { + edges.get( model.toPair() ).remove( edge ); + throw new GenericRuntimeException( "Cannot add an edge that would result in a cycle." ); + } + reset( edge.getTo().getId(), sm ); } @@ -428,6 +449,9 @@ public void validateStructure( StorageManager sm, AttributedDirectedGraph V findInvertedTreeRoot( AttributedDirec * @return the subgraph of graph induced by nodes */ public static AttributedDirectedGraph getInducedSubgraph( AttributedDirectedGraph graph, Collection nodes ) { + return getInducedSubgraph( graph, nodes, Set.of() ); + } + + + public static AttributedDirectedGraph getInducedSubgraph( AttributedDirectedGraph graph, Collection nodes, Set edgesToIgnore ) { AttributedDirectedGraph subgraph = AttributedDirectedGraph.create( (AttributedEdgeFactory) graph.getEdgeFactory() ); Set vertexSet = graph.vertexSet(); for ( V n : nodes ) { @@ -65,7 +70,7 @@ public static AttributedDirectedGraph getInduce } vertexSet = subgraph.vertexSet(); - Set insertedEdges = new HashSet<>(); + Set insertedEdges = new HashSet<>( edgesToIgnore ); for ( V source : vertexSet ) { for ( E edge : graph.getOutwardEdges( source ) ) { if ( insertedEdges.contains( edge ) ) { diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java index 8823ab7090..4d06986a0e 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java @@ -153,6 +153,8 @@ private AttributedDirectedGraph prepareExecutionDag( List savedActivities = new HashSet<>(); + Set edgesToIgnore = new HashSet<>(); // edges that go from finished to successfully executed activities (see explanation of compromise solution below) + Set finishedActivities = new HashSet<>(); Queue open = new LinkedList<>( targets ); Set visited = new HashSet<>(); @@ -169,6 +171,17 @@ private AttributedDirectedGraph prepareExecutionDag( List assumption: activities are idempotent (not actually true, but good enough) + List outEdges = workflow.getOutEdges( n ); + finishedActivities.add( n ); + edgesToIgnore.addAll( outEdges.stream() + .filter( e -> !finishedActivities.contains( e.getTo().getId() ) && e.getTo().getState().isSuccess() ) + .map( e -> new ExecutionEdge( e.getFrom().getId(), e.getTo().getId(), e ) ) + .toList() ); } nWrapper.resetExecution(); @@ -183,7 +196,7 @@ private AttributedDirectedGraph prepareExecutionDag( List execDag = GraphUtils.getInducedSubgraph( workflow.toDag(), visited ); + AttributedDirectedGraph execDag = GraphUtils.getInducedSubgraph( workflow.toDag(), visited, edgesToIgnore ); // handle saved activities (= simulate that they finish their execution successfully) for ( UUID saved : savedActivities ) { @@ -226,7 +239,6 @@ public List handleExecutionResult( ExecutionResult result ) updateGraph( result.isSuccess(), result.getActivities(), result.getRootId(), execDag ); updatePartitions(); - executionMonitor.forwardStates(); log.warn( "Remaining activities: " + remainingActivities ); @@ -244,7 +256,9 @@ public List handleExecutionResult( ExecutionResult result ) } try { - return computeNextSubmissions(); + List next = computeNextSubmissions(); + executionMonitor.forwardStates(); + return next; } catch ( Exception e ) { // this should never happen, but as a fallback we finish workflow execution log.error( "An unexpected error occurred while determining the next activities to be submitted", e ); @@ -277,8 +291,8 @@ private void updateTransactions( ExecutionResult result ) throws TransactionExce private List computeNextSubmissions() { List factories = optimizer.computeNextTrees( maxWorkers - pendingCount, activePartition.commonType ); - if (pendingCount == 0 && factories.isEmpty()) { - throw new IllegalStateException("The optimizer is unable to determine the next activity to be executed"); + if ( pendingCount == 0 && factories.isEmpty() ) { + throw new IllegalStateException( "The optimizer is unable to determine the next activity to be executed" ); } pendingCount += factories.size(); List submissions = factories.stream().map( f -> f.create( sm, workflow ) ).toList(); @@ -372,7 +386,7 @@ private void propagateResult( boolean isActive, Edge edge, AttributedDirectedGra case INACTIVE -> { target.setState( ActivityState.SKIPPED ); remainingActivities.remove( target.getId() ); - if ( targetPartition != null) { // in case of initial propagation for saved activities, there is no targetPartition yet + if ( targetPartition != null ) { // in case of initial propagation for saved activities, there is no targetPartition yet targetPartition.setResolved( target.getId(), false ); // no need to catch the exception, as the transaction is already rolled back } // a skipped activity does NOT count as failed -> onFail control edges also become INACTIVE @@ -400,7 +414,7 @@ private void updatePartitions() { private void setFinished() { - if (!remainingActivities.isEmpty()) { + if ( !remainingActivities.isEmpty() ) { setStates( remainingActivities, ActivityState.SKIPPED ); } workflow.setState( WorkflowState.IDLE ); diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java index d2482c9f7d..d89185c93f 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java @@ -17,7 +17,9 @@ package org.polypheny.db.workflow.models; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import java.util.Map; import java.util.UUID; @@ -71,4 +73,19 @@ public ActivityModel( String type, UUID id, Map settings, Acti this.invalidReason = null; } + + public ActivityModel createCopy( double posX, double posY ) { + ObjectMapper mapper = new ObjectMapper(); + try { + ActivityModel trueCopy = mapper.readValue( mapper.writeValueAsString( this ), ActivityModel.class ); // but we need a copy with a different UUID... + return new ActivityModel( trueCopy.type, + UUID.randomUUID(), + trueCopy.settings, + trueCopy.config, + new RenderModel( posX, posY, trueCopy.rendering.getName(), trueCopy.rendering.getNotes() ) ); + } catch ( JsonProcessingException e ) { + throw new RuntimeException( e ); + } + } + } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/requests/WsRequest.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/requests/WsRequest.java index ea5712b045..e681c43aa1 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/requests/WsRequest.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/requests/WsRequest.java @@ -38,6 +38,7 @@ public enum RequestType { CREATE_ACTIVITY, DELETE_ACTIVITY, UPDATE_ACTIVITY, + CLONE_ACTIVITY, CREATE_EDGE, DELETE_EDGE, EXECUTE, @@ -75,6 +76,15 @@ public static class DeleteActivityRequest extends WsRequest { } + public static class CloneActivityRequest extends WsRequest { + + public UUID targetId; + public double posX; + public double posY; + + } + + public static class CreateEdgeRequest extends WsRequest { public EdgeModel edge; diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/responses/WsResponse.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/responses/WsResponse.java index 42621f1c83..bbab3432cf 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/responses/WsResponse.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/responses/WsResponse.java @@ -21,7 +21,6 @@ import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.PROGRESS_UPDATE; import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.RENDERING_UPDATE; import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.STATE_UPDATE; -import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.WORKFLOW_UPDATE; import java.util.List; import java.util.Map; @@ -35,7 +34,6 @@ import org.polypheny.db.workflow.models.ActivityModel; import org.polypheny.db.workflow.models.EdgeModel; import org.polypheny.db.workflow.models.RenderModel; -import org.polypheny.db.workflow.models.WorkflowModel; import org.polypheny.db.workflow.models.requests.WsRequest.RequestType; /** @@ -56,7 +54,6 @@ public WsResponse( ResponseType type, @Nullable UUID parentId ) { public enum ResponseType { - WORKFLOW_UPDATE, // entire workflow ACTIVITY_UPDATE, // single activity RENDERING_UPDATE, // only renderModel of an activity STATE_UPDATE, // all edge and activity states @@ -65,19 +62,6 @@ public enum ResponseType { } - public static class WorkflowUpdateResponse extends WsResponse { - - public final WorkflowModel workflow; - - - public WorkflowUpdateResponse( @Nullable UUID parentId, WorkflowModel workflow ) { - super( WORKFLOW_UPDATE, parentId ); - this.workflow = workflow; - } - - } - - public static class ActivityUpdateResponse extends WsResponse { public final ActivityModel activity; diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/AbstractSession.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/AbstractSession.java index 25f94289b3..28f089709b 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/AbstractSession.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/AbstractSession.java @@ -37,6 +37,7 @@ import org.polypheny.db.workflow.models.WorkflowConfigModel; import org.polypheny.db.workflow.models.WorkflowModel; import org.polypheny.db.workflow.models.requests.WsRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.CloneActivityRequest; import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest; import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest; import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest; @@ -165,6 +166,11 @@ public void handleRequest( UpdateActivityRequest request ) { } + public void handleRequest( CloneActivityRequest request ) { + throwUnsupported( request ); + } + + public void handleRequest( CreateEdgeRequest request ) { throwUnsupported( request ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/UserSession.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/UserSession.java index 13412baaa3..bb8c2cf891 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/UserSession.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/UserSession.java @@ -29,6 +29,7 @@ import org.polypheny.db.workflow.models.SessionModel; import org.polypheny.db.workflow.models.SessionModel.SessionModelType; import org.polypheny.db.workflow.models.WorkflowDefModel; +import org.polypheny.db.workflow.models.requests.WsRequest.CloneActivityRequest; import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest; import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest; import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest; @@ -98,6 +99,14 @@ public void handleRequest( UpdateActivityRequest request ) { } + @Override + public void handleRequest( CloneActivityRequest request ) { + throwIfNotEditable(); + ActivityWrapper activity = workflow.cloneActivity( request.targetId, request.posX, request.posY ); + broadcastMessage( new ActivityUpdateResponse( request.msgId, activity ) ); + } + + @Override public void handleRequest( CreateEdgeRequest request ) { throwIfNotEditable(); @@ -149,7 +158,7 @@ public void handleRequest( InterruptRequest request ) { @Override public SessionModel toModel() { - return new SessionModel( SessionModelType.USER_SESSION, sessionId, getSubscriberCount(), wId, openedVersion, workflowDef); + return new SessionModel( SessionModelType.USER_SESSION, sessionId, getSubscriberCount(), wId, openedVersion, workflowDef ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/WorkflowWebSocket.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/WorkflowWebSocket.java index 07de3cbfc5..4b7f0499d7 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/WorkflowWebSocket.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/WorkflowWebSocket.java @@ -29,6 +29,7 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.polypheny.db.workflow.models.requests.WsRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.CloneActivityRequest; import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest; import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest; import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest; @@ -77,6 +78,7 @@ public void onMessage( final WsMessageContext ctx ) { case CREATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( CreateActivityRequest.class ) ); case UPDATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( UpdateActivityRequest.class ) ); case DELETE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( DeleteActivityRequest.class ) ); + case CLONE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( CloneActivityRequest.class ) ); case CREATE_EDGE -> session.handleRequest( ctx.messageAsClass( CreateEdgeRequest.class ) ); case DELETE_EDGE -> session.handleRequest( ctx.messageAsClass( DeleteEdgeRequest.class ) ); case EXECUTE -> session.handleRequest( ctx.messageAsClass( ExecuteRequest.class ) ); diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java index 3eeeec8948..a6a2739430 100644 --- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java +++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java @@ -36,10 +36,12 @@ import org.polypheny.db.workflow.WorkflowUtils; import org.polypheny.db.workflow.dag.Workflow; import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState; +import org.polypheny.db.workflow.dag.edges.ControlEdge; import org.polypheny.db.workflow.engine.monitoring.ExecutionMonitor; import org.polypheny.db.workflow.engine.storage.StorageManager; import org.polypheny.db.workflow.engine.storage.StorageManagerImpl; import org.polypheny.db.workflow.engine.storage.StorageUtils; +import org.polypheny.db.workflow.models.EdgeModel; class GlobalSchedulerTest { @@ -164,7 +166,7 @@ void advancedFusionTest() throws Exception { @Test - void RelValuesFusionTest() throws Exception { + void relValuesFusionTest() throws Exception { Workflow workflow = WorkflowUtils.getRelValuesFusion(); List ids = WorkflowUtils.getTopologicalActivityIds( workflow ); scheduler.startExecution( workflow, sm, null ); @@ -173,6 +175,31 @@ void RelValuesFusionTest() throws Exception { } + @Test + void executeModifiedFusionTest() throws Exception { + Pair> pair = WorkflowUtils.getAdvancedFusion(); + Workflow workflow = pair.left; + List ids = pair.right; + + scheduler.startExecution( workflow, sm, ids.get( 3 ) ); + scheduler.awaitResultProcessor( 5000 ); + + workflow.deleteEdge( workflow.getOutEdges( ids.get( 3 ) ).get( 0 ).toModel( false ), sm ); + workflow.addEdge( new EdgeModel( ids.get( 0 ), ids.get( 4 ), 0, 0, false, null ), sm ); // activity 0 does not yet have a checkpoint, but we add an edge that requires one + workflow.addEdge( new EdgeModel( ids.get( 3 ), ids.get( 4 ), ControlEdge.SUCCESS_PORT, 0, true, null ), sm ); + + assertEquals( ActivityState.FINISHED, workflow.getActivity( ids.get( 0 ) ).getState() ); + assertEquals( ActivityState.SAVED, workflow.getActivity( ids.get( 3 ) ).getState() ); + assertEquals( ActivityState.IDLE, workflow.getActivity( ids.get( 4 ) ).getState() ); + + scheduler.startExecution( workflow, sm, null ); + scheduler.awaitResultProcessor( 5000 ); + assertEquals( ActivityState.FINISHED, workflow.getActivity( ids.get( 0 ) ).getState() ); // 0 is fused with 4 + assertEquals( ActivityState.SAVED, workflow.getActivity( ids.get( 3 ) ).getState() ); + assertEquals( ActivityState.SAVED, workflow.getActivity( ids.get( 4 ) ).getState() ); + } + + @Test void relExtractTest() throws Exception { Workflow workflow = WorkflowUtils.getExtractWorkflow(); @@ -402,6 +429,7 @@ void executionMonitorTest() throws Exception { } + @Test @Disabled // TODO: delete when no longer required