From addcfa93a9badaf9e19de77e5131a6565bc19f33 Mon Sep 17 00:00:00 2001 From: Lorenzo Biava Date: Thu, 9 Jun 2016 11:49:10 +0200 Subject: [PATCH] Improve Chronos job handling with chunking Chronos jobs are created/polled/deleted in chunks instead of one for each WF task invocation to improve performance (to avoid serialization/deserialization overhead). The `orchestrator.chronos.jobChunkSize` property allows customization of this behavior. See #48 --- .../providers/ChronosServiceImpl.java | 197 +++++++++++------- .../providers/DeploymentStatusHelperImpl.java | 4 +- src/main/resources/application.properties | 4 +- .../extra/TemplateGeneratorHelper.java | 3 +- 4 files changed, 135 insertions(+), 73 deletions(-) diff --git a/src/main/java/it/reply/orchestrator/service/deployment/providers/ChronosServiceImpl.java b/src/main/java/it/reply/orchestrator/service/deployment/providers/ChronosServiceImpl.java index ab22e3b5aa..ff93051799 100644 --- a/src/main/java/it/reply/orchestrator/service/deployment/providers/ChronosServiceImpl.java +++ b/src/main/java/it/reply/orchestrator/service/deployment/providers/ChronosServiceImpl.java @@ -85,6 +85,9 @@ public class ChronosServiceImpl extends AbstractDeploymentProviderService @Value("${onedata.provider}") private String provider; + @Value("${orchestrator.chronos.jobChunkSize}") + private int jobChunkSize; + /** * Temporary method to instantiate a default Chronos client (just for experimental purpose * ). 
@@ -150,16 +153,24 @@ public boolean doDeploy(DeploymentMessage deploymentMessage) { } // Create Jobs in the required order on Chronos (but 1 at each invocation) - LOG.debug("Launching jobs for deployment <{}> on Chronos", deployment.getId()); - if (!createJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(), - deploymentMessage.getTemplateTopologicalOrderIterator(), getChronosClient())) { + LOG.debug("Launching <{}> jobs for deployment <{}> on Chronos", jobChunkSize, + deployment.getId()); + boolean noMoreJob = false; + Chronos client = getChronosClient(); + for (int i = 0; i < jobChunkSize && !noMoreJob; i++) { + noMoreJob = + !createJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(), + deploymentMessage.getTemplateTopologicalOrderIterator(), client); + } + + if (noMoreJob) { // No more jobs to create deploymentMessage.setCreateComplete(true); // Start over with the polling check deploymentMessage.getTemplateTopologicalOrderIterator().reset(); - return true; } + // No error occurred return true; } catch (RuntimeException exception) { // Chronos job launch error // TODO use a custom exception ? @@ -288,65 +299,35 @@ public boolean isDeployed(DeploymentMessage deploymentMessage) throws Deployment TemplateTopologicalOrderIterator templateTopologicalOrderIterator = deploymentMessage.getTemplateTopologicalOrderIterator(); - Resource currentNode = templateTopologicalOrderIterator.getCurrent(); - // No nodes left -> all task succeeded -> deployment is ready - if (currentNode == null) { - return true; - } - - IndigoJob job = deploymentMessage.getChronosJobGraph().get(currentNode.getToscaNodeName()); - - String jobName = job.getChronosJob().getName(); - Job updatedJob = getJobStatus(client, jobName); - if (updatedJob == null) { - String errorMsg = String.format( - "Failed to deploy deployment <%s>. 
Chronos job <%s> (id: <%s>) does not exist", - deployment.getId(), job.getToscaNodeName(), jobName); - LOG.error(errorMsg); - // Update job status - updateResource(deployment, job, NodeStates.ERROR); - throw new DeploymentException(errorMsg); - } - - JobState jobState = getLastState(updatedJob); - LOG.debug("Status for Chronos job <{}> is <{}>", jobName, jobState); - - // Go ahead only if the job succeeded - if (jobState != JobState.SUCCESS) { - if (jobState != JobState.FAILURE) { + LOG.debug("Polling <{}> jobs for deployment <{}> on Chronos", jobChunkSize, + deployment.getId()); + boolean noMoreJob = templateTopologicalOrderIterator.getCurrent() == null; + for (int i = 0; i < jobChunkSize && !noMoreJob; i++) { + boolean jobCompleted = + checkJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(), + deploymentMessage.getTemplateTopologicalOrderIterator(), client); + if (!jobCompleted) { // Job still in progress - LOG.debug("Polling again job <{}> for deployment <{}>", jobName, deployment.getId()); // Wait before retrying to poll on the same node deploymentMessage.setSkipPollInterval(false); return false; - } else { - // Job failed -> Deployment failed! - String errorMsg = String.format( - "Failed to deploy deployment <%s>. 
Chronos job <%s> (id: <%s>) status is <%s>", - deployment.getId(), job.getToscaNodeName(), jobName, jobState); - LOG.error(errorMsg); - // Update job status - updateResource(deployment, job, NodeStates.ERROR); - currentNode.setState(NodeStates.ERROR); - throw new DeploymentException(errorMsg); } - } else { - // Update job status - updateResource(deployment, job, NodeStates.STARTED); - currentNode.setState(NodeStates.STARTED); - if (templateTopologicalOrderIterator.getNext() != null) { - // Poll the following node next time - Disable poll interval - LOG.debug("Polling next job for deployment <{}>", deployment.getId()); - deploymentMessage.setSkipPollInterval(true); - return false; - } else { - // No more jobs - LOG.debug("Polling complete for deployment <{}>", deployment.getId()); - deploymentMessage.setPollComplete(true); - return true; - } + noMoreJob = templateTopologicalOrderIterator.getNext() == null; } + + if (noMoreJob) { + // No more jobs + LOG.debug("Polling complete for deployment <{}>", deployment.getId()); + deploymentMessage.setPollComplete(true); + return true; + } else { + // Poll the following node next time - Disable poll interval + LOG.debug("Polling next job for deployment <{}>", deployment.getId()); + deploymentMessage.setSkipPollInterval(true); + return false; + } + } catch (DeploymentException dex) { // Deploy failed; let caller know (as for the method definition) updateOnError(deployment.getId(), dex); @@ -362,6 +343,68 @@ public boolean isDeployed(DeploymentMessage deploymentMessage) throws Deployment } + /** + * + * @param deployment + * @param jobgraph + * @param templateTopologicalOrderIterator + * @param client + * @return true if the currently checked node is ready, false if still in + * progress. + * @throws DeploymentException + * if the currently node failed. 
+ */ + protected boolean checkJobsOnChronosIteratively(Deployment deployment, + Map jobgraph, + TemplateTopologicalOrderIterator templateTopologicalOrderIterator, Chronos client) + throws DeploymentException { + + // Get current job + Resource currentNode = templateTopologicalOrderIterator.getCurrent(); + IndigoJob job = jobgraph.get(currentNode.getToscaNodeName()); + + String jobName = job.getChronosJob().getName(); + Job updatedJob = getJobStatus(client, jobName); + if (updatedJob == null) { + String errorMsg = String.format( + "Failed to deploy deployment <%s>. Chronos job <%s> (id: <%s>) does not exist", + deployment.getId(), job.getToscaNodeName(), jobName); + LOG.error(errorMsg); + // Update job status + updateResource(deployment, job, NodeStates.ERROR); + throw new DeploymentException(errorMsg); + } + + JobState jobState = getLastState(updatedJob); + LOG.debug("Status for Chronos job <{}> is <{}> ({}/{})", jobName, jobState, + templateTopologicalOrderIterator.getPosition() + 1, + templateTopologicalOrderIterator.getNodeSize()); + + // Go ahead only if the job succeeded + if (jobState != JobState.SUCCESS) { + if (jobState != JobState.FAILURE) { + // Job still in progress + LOG.debug("Polling again job <{}> for deployment <{}>", jobName, deployment.getId()); + return false; + } else { + // Job failed -> Deployment failed! + String errorMsg = String.format( + "Failed to deploy deployment <%s>. 
Chronos job <%s> (id: <%s>) status is <%s>", + deployment.getId(), job.getToscaNodeName(), jobName, jobState); + LOG.error(errorMsg); + // Update job status + updateResource(deployment, job, NodeStates.ERROR); + currentNode.setState(NodeStates.ERROR); + throw new DeploymentException(errorMsg); + } + } else { + // Job finished -> Update job status + updateResource(deployment, job, NodeStates.STARTED); + currentNode.setState(NodeStates.STARTED); + return true; + } + } + private void updateResource(Deployment deployment, IndigoJob job, NodeStates state) { // Find the Resource from DB @@ -484,21 +527,28 @@ public boolean doUndeploy(DeploymentMessage deploymentMessage) { TemplateTopologicalOrderIterator templateTopologicalOrderIterator = deploymentMessage.getTemplateTopologicalOrderIterator(); - // Create Jobs in the required order on Chronos - LOG.debug("Deleting jobs for deployment <{}> on Chronos", deployment.getId()); + // Delete Jobs + LOG.debug("Deleting <{}> jobs for deployment <{}> on Chronos", jobChunkSize, + deployment.getId()); + Chronos client = getChronosClient(); + boolean noMoreJob = templateTopologicalOrderIterator.getCurrent() == null; + for (int i = 0; i < jobChunkSize && !noMoreJob; i++) { + deleteJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(), + templateTopologicalOrderIterator, client, true); - deleteJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(), - templateTopologicalOrderIterator, getChronosClient(), true); + noMoreJob = templateTopologicalOrderIterator.getNext() == null; + } - if (templateTopologicalOrderIterator.getNext() != null) { - // Delete the following node - LOG.debug("Deleting next node for deployment <{}>", deployment.getId()); - } else { + if (noMoreJob) { // No more nodes LOG.debug("All nodes deleted for deployment <{}>", deployment.getId()); deploymentMessage.setDeleteComplete(true); + } else { + // Delete the following node + LOG.debug("Deleting next node for deployment 
<{}>", deployment.getId()); } + // No error occurred return true; } catch (RuntimeException exception) { // Chronos job launch error // TODO use a custom exception ? @@ -548,10 +598,14 @@ protected boolean deleteJobsOnChronosIteratively(Deployment deployment, // Chronos API hack (to avoid error 400 if the job to delete does not exist) if (getJobStatus(client, jobName) == null) { - LOG.debug("Job on Chronos does not exist: name <{}>", jobName); + LOG.debug("Deleted Chronos job <{}> - did not exist ({}/{})", jobName, + templateTopologicalOrderIterator.getPosition() + 1, + templateTopologicalOrderIterator.getNodeSize()); } else { client.deleteJob(jobName); - LOG.debug("Deleted job on Chronos: name <{}>", jobName); + LOG.debug("Deleted Chronos job <{}> ({}/{})", jobName, + templateTopologicalOrderIterator.getPosition() + 1, + templateTopologicalOrderIterator.getNodeSize()); } } catch (ChronosException ce) { // Just log the error @@ -650,12 +704,16 @@ protected Map generateJobGraph(Deployment deployment, OneData // TODO Iterate on Chronos nodes and related dependencies (just ignore others - also if invalid // - for now) + // Populate resources (nodes) hashmap to speed up job creation (id-name mapping is needed) + Map resources = deployment.getResources().stream() + .collect(Collectors.toMap(e -> e.getToscaNodeName(), e -> e)); + // Only create Indigo Jobs for (Map.Entry node : nodes.entrySet()) { NodeTemplate nodeTemplate = node.getValue(); String nodeName = node.getKey(); if (isChronosNode(nodeTemplate)) { - Job chronosJob = createJob(nodes, deploymentId, nodeName, nodeTemplate); + Job chronosJob = createJob(nodes, deploymentId, nodeName, nodeTemplate, resources); IndigoJob job = new IndigoJob(nodeName, chronosJob); jobs.put(nodeName, job); } @@ -777,15 +835,14 @@ protected List getJobParents(NodeTemplate nodeTemplate, String nodeName, } protected Job createJob(Map nodes, String deploymentId, String nodeName, - NodeTemplate nodeTemplate) { + NodeTemplate nodeTemplate, 
Map resources) { try { Job chronosJob = new Job(); // Init job infos // Get the generated UUID for the node (in DB resource ?) // FIXME This is just for prototyping... Otherwise is madness!! - Resource resourceJob = - resourceRepository.findByToscaNodeNameAndDeployment_id(nodeName, deploymentId); + Resource resourceJob = resources.get(nodeName); chronosJob.setName(resourceJob.getId()); diff --git a/src/main/java/it/reply/orchestrator/service/deployment/providers/DeploymentStatusHelperImpl.java b/src/main/java/it/reply/orchestrator/service/deployment/providers/DeploymentStatusHelperImpl.java index cb9a606636..bc74feec57 100644 --- a/src/main/java/it/reply/orchestrator/service/deployment/providers/DeploymentStatusHelperImpl.java +++ b/src/main/java/it/reply/orchestrator/service/deployment/providers/DeploymentStatusHelperImpl.java @@ -50,7 +50,9 @@ public void updateOnError(String deploymentUuid, String message) { break; } deployment.setTask(Task.NONE); - deployment.setStatusReason(message); + if (message != null) { + deployment.setStatusReason(message); + } deploymentRepository.save(deployment); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index b27a305cbc..125a309b58 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -10,4 +10,6 @@ tosca.definitions.normative=normative-types.yml tosca.definitions.indigo=custom_types.yaml # TEMPORARY - Chronos auth file path (default value to read the files as classpath resource) -chronos.auth.file.path=classpath:chronos/chronos.properties \ No newline at end of file +chronos.auth.file.path=classpath:chronos/chronos.properties + +orchestrator.chronos.jobChunkSize=100 \ No newline at end of file diff --git a/src/test/java/it/reply/orchestrator/extra/TemplateGeneratorHelper.java b/src/test/java/it/reply/orchestrator/extra/TemplateGeneratorHelper.java index d6ce7ccfde..f1f46a2fb9 100644 --- 
a/src/test/java/it/reply/orchestrator/extra/TemplateGeneratorHelper.java +++ b/src/test/java/it/reply/orchestrator/extra/TemplateGeneratorHelper.java @@ -12,7 +12,8 @@ public class TemplateGeneratorHelper { public static void main(String args[]) throws FileException { - System.out.println(generateChronosTemplate(1000)); + System.out.println( + generateChronosTemplate(10000).replace(System.getProperty("line.separator"), "\\n")); } public static String generateChronosTemplate(int nodeNumber) throws FileException {