Skip to content

Commit

Permalink
Various small improvements / fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Jan 1, 2025
1 parent 6470060 commit 260089c
Show file tree
Hide file tree
Showing 28 changed files with 182 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public class WorkflowManager {

private final SessionManager sessionManager;
private final WorkflowRepo repo;
public final String PATH = "/workflows";
public static final String PATH = "/workflows";
public static final String DEFAULT_CHECKPOINT_ADAPTER = "hsqldb_disk";
private final ObjectMapper mapper = new ObjectMapper();


Expand Down Expand Up @@ -154,19 +155,20 @@ private void createExecuteDummyWorkflowTest() {


private void addSampleWorkflows() {
if ( PolyphenyDb.mode == RunMode.TEST) {
if ( PolyphenyDb.mode == RunMode.TEST ) {
return;
}
URL workflowDir = this.getClass().getClassLoader().getResource( "workflows/" );
File[] files = Sources.of( workflowDir )
.file()
.listFiles( ( d, name ) -> name.endsWith( ".json" ) );
if (files == null) {
if ( files == null ) {
return;
}

for (File file : files) {
for ( File file : files ) {
String fileName = file.getName();
fileName = fileName.substring( 0, fileName.length() - ".json".length() );
try {
WorkflowModel workflow = mapper.readValue( file, WorkflowModel.class );
UUID wId = repo.createWorkflow( fileName );
Expand All @@ -179,7 +181,7 @@ private void addSampleWorkflows() {


private void registerAdapter() {
if ( PolyphenyDb.mode == RunMode.TEST || Catalog.getInstance().getAdapters().values().stream().anyMatch( a -> a.adapterName.equals( "hsqldb_disk" ) ) ) {
if ( PolyphenyDb.mode == RunMode.TEST || Catalog.getInstance().getAdapters().values().stream().anyMatch( a -> a.adapterName.equals( DEFAULT_CHECKPOINT_ADAPTER ) ) ) {
return;
}

Expand All @@ -189,7 +191,7 @@ private void registerAdapter() {
settings.put( "type", "File" );
settings.put( "tableType", "Cached" );

DdlManager.getInstance().createStore( "hsqldb_disk", storeTemplate.getAdapterName(), AdapterType.STORE, settings, storeTemplate.getDefaultMode() );
DdlManager.getInstance().createStore( DEFAULT_CHECKPOINT_ADAPTER, storeTemplate.getAdapterName(), AdapterType.STORE, settings, storeTemplate.getDefaultMode() );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,21 @@

public interface ReadableVariableStore {

String VARIABLE_REF_FIELD = "_variableRef";
/**
* A variable reference is given as an object with a VARIABLE_REF_FIELD key and the reference as a string value.
* A reference corresponds to a JsonPointer (https://datatracker.ietf.org/doc/html/rfc6901).
* Note that "/" and "~" in variable names need to be escaped as ~1 and ~0 respectively.
* The leading "/" is optional, as the first referenced object must always correspond to a variable name.
* Examples of valid references:
* <ul>
* <li>{@code "myVariable"}</li>
* <li>{@code "/myVariable"}</li>
* <li>{@code "myVariable/names"}</li>
* <li>{@code "myVariable/names/0"}</li>
* <li>{@code "my~1escaped~0variable~1example"}</li>
* </ul>
*/
String VARIABLE_REF_FIELD = "$ref";

boolean contains( String key );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.polypheny.db.workflow.dag.variables;

import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
Expand Down Expand Up @@ -150,8 +151,16 @@ public JsonNode resolveVariables( JsonNode node ) {
if ( node.isObject() ) {
ObjectNode objectNode = (ObjectNode) node;
if ( objectNode.size() == 1 && objectNode.has( VARIABLE_REF_FIELD ) ) {
String variableRef = objectNode.get( VARIABLE_REF_FIELD ).asText();
String refString = objectNode.get( VARIABLE_REF_FIELD ).asText();
if ( refString.startsWith( "/" ) ) {
refString = refString.substring( 1 );
}
String[] refSplit = refString.split( "/", 2 );
String variableRef = refSplit[0].replace( JsonPointer.ESC_SLASH, "/" ).replace( JsonPointer.ESC_TILDE, "~" );
JsonNode replacement = variables.get( variableRef );
if ( refSplit.length == 2 && !refSplit[1].isEmpty() ) {
replacement = replacement.at( "/" + refSplit[1] ); // resolve JsonPointer
}

// Replace the entire object with the value from the map, if it exists
if ( replacement == null ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
import java.util.function.Consumer;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.polypheny.db.workflow.dag.Workflow;
import org.polypheny.db.workflow.engine.execution.Executor.ExecutorType;
import org.polypheny.db.workflow.models.responses.WsResponse;
import org.polypheny.db.workflow.models.responses.WsResponse.ProgressUpdateResponse;
import org.polypheny.db.workflow.models.responses.WsResponse.StateUpdateResponse;

@Slf4j
public class ExecutionMonitor {

private static final int UPDATE_PROGRESS_DELAY = 2000;
Expand Down Expand Up @@ -69,9 +71,9 @@ private void startPeriodicUpdates() {
try {
callback.accept( new ProgressUpdateResponse( null, getAllProgress() ) );
} catch ( Exception e ) {
e.printStackTrace(); // Log any exceptions
log.error( "An error occurred while sending a workflow progress update", e );
}
}, 500, UPDATE_PROGRESS_DELAY, TimeUnit.MILLISECONDS );
}, 0, UPDATE_PROGRESS_DELAY, TimeUnit.MILLISECONDS );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import lombok.extern.slf4j.Slf4j;
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
import org.polypheny.db.workflow.dag.Workflow;
import org.polypheny.db.workflow.dag.Workflow.WorkflowState;
import org.polypheny.db.workflow.engine.execution.Executor.ExecutorException;
import org.polypheny.db.workflow.engine.monitoring.ExecutionInfo;
import org.polypheny.db.workflow.engine.monitoring.ExecutionInfo.ExecutionState;
Expand Down Expand Up @@ -85,7 +86,14 @@ public synchronized ExecutionMonitor startExecution( Workflow workflow, StorageM
}
interruptedSessions.remove( sessionId );
ExecutionMonitor monitor = new ExecutionMonitor( workflow, targetActivity, monitoringCallback );
WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, monitor, GLOBAL_WORKERS, targetActivity );
WorkflowScheduler scheduler;
try {
scheduler = new WorkflowScheduler( workflow, sm, monitor, GLOBAL_WORKERS, targetActivity );
} catch ( Exception e ) {
monitor.stop();
workflow.setState( WorkflowState.IDLE );
throw e;
}
List<ExecutionSubmission> submissions = scheduler.startExecution();
if ( submissions.isEmpty() ) {
throw new GenericRuntimeException( "At least one activity needs to be executable when submitting a workflow for execution" );
Expand Down Expand Up @@ -145,6 +153,7 @@ private void submit( List<ExecutionSubmission> submissions ) {
UUID sessionId = submission.getSessionId();

completionService.submit( () -> {
log.info( "Begin actual execution {}", submission );
submission.getInfo().setState( ExecutionState.EXECUTING );
if ( interruptedSessions.contains( sessionId ) ) {
return new ExecutionResult( submission, new ExecutorException( "Execution was interrupted before it started" ) );
Expand Down Expand Up @@ -179,6 +188,7 @@ private Thread startResultProcessor() {
ExecutionInfo info = null;
try {
ExecutionResult result = completionService.take().get();
log.info( "processing next result: " + result );
info = result.getInfo();
info.setState( ExecutionState.PROCESSING_RESULT );
WorkflowScheduler scheduler = schedulers.get( result.getSessionId() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public class WorkflowScheduler {

public WorkflowScheduler( Workflow workflow, StorageManager sm, ExecutionMonitor monitor, int globalWorkers, @Nullable UUID targetActivity ) throws Exception {
log.info( "Instantiating WorkflowScheduler with target: {}", targetActivity );
workflow.setState( WorkflowState.EXECUTING );
this.workflow = workflow;
this.sm = sm;
this.executionMonitor = monitor;
Expand All @@ -96,14 +95,16 @@ public WorkflowScheduler( Workflow workflow, StorageManager sm, ExecutionMonitor
workflow.validateStructure( sm, this.execDag );
log.info( "Structure is valid" );

workflow.setState( WorkflowState.EXECUTING );
this.optimizer = new WorkflowOptimizerImpl( workflow, execDag );

}


public List<ExecutionSubmission> startExecution() {
List<ExecutionSubmission> submissions = computeNextSubmissions();
executionMonitor.forwardStates();
return computeNextSubmissions();
return submissions;
}


Expand Down Expand Up @@ -147,6 +148,7 @@ private AttributedDirectedGraph<UUID, ExecutionEdge> prepareExecutionDag() throw


private AttributedDirectedGraph<UUID, ExecutionEdge> prepareExecutionDag( List<UUID> targets ) throws Exception {
System.out.println( "targets: " + targets );
if ( targets.isEmpty() ) {
throw new GenericRuntimeException( "Cannot prepare executionDag for empty targets" );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
@Value
public class RenderModel {

int posX;
int posY;
double posX;
double posY;
String name; // the display name of an element
String notes; // add additional notes to an element

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import lombok.Value;
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.workflow.WorkflowManager;


@Value
Expand All @@ -34,7 +35,9 @@ public class WorkflowConfigModel {

public static WorkflowConfigModel of() {
return new WorkflowConfigModel(
Map.of( DataModel.RELATIONAL, "hsqldb", DataModel.DOCUMENT, "hsqldb", DataModel.GRAPH, "hsqldb" ),
Map.of( DataModel.RELATIONAL, WorkflowManager.DEFAULT_CHECKPOINT_ADAPTER,
DataModel.DOCUMENT, WorkflowManager.DEFAULT_CHECKPOINT_ADAPTER,
DataModel.GRAPH, WorkflowManager.DEFAULT_CHECKPOINT_ADAPTER ),
true,
true,
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public enum RequestType {
EXECUTE,
INTERRUPT,
RESET,
UPDATE_CONFIG // workflow config
UPDATE_CONFIG, // workflow config
KEEPALIVE
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.polypheny.db.workflow.models.responses;

import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.ACTIVITY_UPDATE;
import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.ERROR;
import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.PROGRESS_UPDATE;
import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.RENDERING_UPDATE;
import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.STATE_UPDATE;
Expand All @@ -35,6 +36,7 @@
import org.polypheny.db.workflow.models.EdgeModel;
import org.polypheny.db.workflow.models.RenderModel;
import org.polypheny.db.workflow.models.WorkflowModel;
import org.polypheny.db.workflow.models.requests.WsRequest.RequestType;

/**
* The structure of workflows is modified with requests sent over the websocket.
Expand All @@ -59,6 +61,7 @@ public enum ResponseType {
RENDERING_UPDATE, // only renderModel of an activity
STATE_UPDATE, // all edge and activity states
PROGRESS_UPDATE,
ERROR
}


Expand Down Expand Up @@ -135,4 +138,22 @@ public ProgressUpdateResponse( @Nullable UUID parentId, Map<UUID, Double> progre

}


public static class ErrorResponse extends WsResponse {

public final String reason;
public final String cause; // null if there is no cause
public final RequestType parentType;


public ErrorResponse( @Nullable UUID parentId, Throwable error, RequestType parentType ) {
super( ERROR, parentId );
this.reason = error.getMessage();
Throwable cause = error.getCause();
this.cause = cause == null ? null : cause.getMessage();
this.parentType = parentType;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.polypheny.db.workflow.models.requests.WsRequest.UpdateActivityRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.UpdateConfigRequest;
import org.polypheny.db.workflow.models.responses.WsResponse;
import org.polypheny.db.workflow.models.responses.WsResponse.ResponseType;

@Slf4j
public abstract class AbstractSession {
Expand Down Expand Up @@ -88,6 +89,7 @@ public void unsubscribe( Session session ) {
subscribers.remove( session );
}


public int getSubscriberCount() {
return subscribers.size();
}
Expand All @@ -114,7 +116,9 @@ public WorkflowConfigModel getWorkflowConfig() {
void broadcastMessage( WsResponse msg ) {
try {
String json = mapper.writeValueAsString( msg );
log.info( "Broadcasting message: " + json );
if ( msg.type != ResponseType.PROGRESS_UPDATE ) {
log.info( "Broadcasting message: " + json );
}
for ( Session subscriber : subscribers ) {
try {
subscriber.getRemote().sendString( json );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
Expand All @@ -34,10 +35,13 @@
import org.polypheny.db.workflow.models.requests.WsRequest.DeleteEdgeRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.ExecuteRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.InterruptRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.RequestType;
import org.polypheny.db.workflow.models.requests.WsRequest.ResetRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.UpdateActivityRequest;
import org.polypheny.db.workflow.models.requests.WsRequest.UpdateConfigRequest;
import org.polypheny.db.workflow.models.responses.WsResponse.ErrorResponse;

@Slf4j
@WebSocket
public class WorkflowWebSocket implements Consumer<WsConfig> {

Expand All @@ -62,24 +66,34 @@ public void connected( final WsConnectContext ctx ) {
public void onMessage( final WsMessageContext ctx ) {
AbstractSession session = sessions.get( ctx.session );
WsRequest baseRequest = ctx.messageAsClass( WsRequest.class );
System.out.println( "Received message with id: " + baseRequest.msgId );
try {
if ( baseRequest.type != RequestType.KEEPALIVE ) {
System.out.println( "Received message with id: " + baseRequest.msgId );
}

switch ( baseRequest.type ) {
case CREATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( CreateActivityRequest.class ) );
case UPDATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( UpdateActivityRequest.class ) );
case DELETE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( DeleteActivityRequest.class ) );
case CREATE_EDGE -> session.handleRequest( ctx.messageAsClass( CreateEdgeRequest.class ) );
case DELETE_EDGE -> session.handleRequest( ctx.messageAsClass( DeleteEdgeRequest.class ) );
case EXECUTE -> session.handleRequest( ctx.messageAsClass( ExecuteRequest.class ) );
case INTERRUPT -> session.handleRequest( ctx.messageAsClass( InterruptRequest.class ) );
case RESET -> session.handleRequest( ctx.messageAsClass( ResetRequest.class ) );
case UPDATE_CONFIG -> session.handleRequest( ctx.messageAsClass( UpdateConfigRequest.class ) );
default -> throw new IllegalArgumentException( "Received request with unknown type!" );
switch ( baseRequest.type ) {
case KEEPALIVE -> {
}
case CREATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( CreateActivityRequest.class ) );
case UPDATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( UpdateActivityRequest.class ) );
case DELETE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( DeleteActivityRequest.class ) );
case CREATE_EDGE -> session.handleRequest( ctx.messageAsClass( CreateEdgeRequest.class ) );
case DELETE_EDGE -> session.handleRequest( ctx.messageAsClass( DeleteEdgeRequest.class ) );
case EXECUTE -> session.handleRequest( ctx.messageAsClass( ExecuteRequest.class ) );
case INTERRUPT -> session.handleRequest( ctx.messageAsClass( InterruptRequest.class ) );
case RESET -> session.handleRequest( ctx.messageAsClass( ResetRequest.class ) );
case UPDATE_CONFIG -> session.handleRequest( ctx.messageAsClass( UpdateConfigRequest.class ) );
default -> throw new IllegalArgumentException( "Received request with unknown type!" );
}
} catch ( Exception e ) {
// catch any exception that has no specific error handling already
session.broadcastMessage( new ErrorResponse( baseRequest.msgId, e, baseRequest.type ) );
}
}


public void closed( WsCloseContext ctx ) {
System.out.println( "closed websocket: " + ctx.reason() );
sessions.get( ctx.session ).unsubscribe( ctx.session );
sessions.remove( ctx.session );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@
],
"config": {
"preferredStores": {
"DOCUMENT": "hsqldb",
"GRAPH": "hsqldb",
"RELATIONAL": "hsqldb"
"DOCUMENT": "hsqldb_disk",
"GRAPH": "hsqldb_disk",
"RELATIONAL": "hsqldb_disk"
},
"fusionEnabled": true,
"pipelineEnabled": false,
Expand Down
Loading

0 comments on commit 260089c

Please sign in to comment.