Skip to content

Commit

Permalink
Improve Chronos job handling with chunking
Browse files Browse the repository at this point in the history
Chronos jobs are created/polled/deleted in chunks instead of one per
WF task invocation, to improve performance (by avoiding
serialization/deserialization overhead).

The `orchestrator.chronos.jobChunkSize` property allows customization of
this behavior.

See #48
  • Loading branch information
lorenzo-biava committed Jun 9, 2016
1 parent 6f04479 commit addcfa9
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class ChronosServiceImpl extends AbstractDeploymentProviderService
@Value("${onedata.provider}")
private String provider;

@Value("${orchestrator.chronos.jobChunkSize}")
private int jobChunkSize;

/**
* Temporary method to instantiate a default Chronos client (<b>just for experimental purpose</b>
* ).
Expand Down Expand Up @@ -150,16 +153,24 @@ public boolean doDeploy(DeploymentMessage deploymentMessage) {
}

// Create Jobs in the required order on Chronos (but 1 at each invocation)
LOG.debug("Launching jobs for deployment <{}> on Chronos", deployment.getId());
if (!createJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(),
deploymentMessage.getTemplateTopologicalOrderIterator(), getChronosClient())) {
LOG.debug("Launching <{}> jobs for deployment <{}> on Chronos", jobChunkSize,
deployment.getId());
boolean noMoreJob = false;
Chronos client = getChronosClient();
for (int i = 0; i < jobChunkSize && !noMoreJob; i++) {
noMoreJob =
!createJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(),
deploymentMessage.getTemplateTopologicalOrderIterator(), client);
}

if (noMoreJob) {
// No more jobs to create
deploymentMessage.setCreateComplete(true);
// Start over with the polling check
deploymentMessage.getTemplateTopologicalOrderIterator().reset();
return true;
}

// No error occurred
return true;
} catch (RuntimeException exception) { // Chronos job launch error
// TODO use a custom exception ?
Expand Down Expand Up @@ -288,65 +299,35 @@ public boolean isDeployed(DeploymentMessage deploymentMessage) throws Deployment
TemplateTopologicalOrderIterator templateTopologicalOrderIterator =
deploymentMessage.getTemplateTopologicalOrderIterator();

Resource currentNode = templateTopologicalOrderIterator.getCurrent();
// No nodes left -> all task succeeded -> deployment is ready
if (currentNode == null) {
return true;
}

IndigoJob job = deploymentMessage.getChronosJobGraph().get(currentNode.getToscaNodeName());

String jobName = job.getChronosJob().getName();
Job updatedJob = getJobStatus(client, jobName);
if (updatedJob == null) {
String errorMsg = String.format(
"Failed to deploy deployment <%s>. Chronos job <%s> (id: <%s>) does not exist",
deployment.getId(), job.getToscaNodeName(), jobName);
LOG.error(errorMsg);
// Update job status
updateResource(deployment, job, NodeStates.ERROR);
throw new DeploymentException(errorMsg);
}

JobState jobState = getLastState(updatedJob);
LOG.debug("Status for Chronos job <{}> is <{}>", jobName, jobState);

// Go ahead only if the job succeeded
if (jobState != JobState.SUCCESS) {
if (jobState != JobState.FAILURE) {
LOG.debug("Polling <{}> jobs for deployment <{}> on Chronos", jobChunkSize,
deployment.getId());
boolean noMoreJob = templateTopologicalOrderIterator.getCurrent() == null;
for (int i = 0; i < jobChunkSize && !noMoreJob; i++) {
boolean jobCompleted =
checkJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(),
deploymentMessage.getTemplateTopologicalOrderIterator(), client);
if (!jobCompleted) {
// Job still in progress
LOG.debug("Polling again job <{}> for deployment <{}>", jobName, deployment.getId());
// Wait before retrying to poll on the same node
deploymentMessage.setSkipPollInterval(false);
return false;
} else {
// Job failed -> Deployment failed!
String errorMsg = String.format(
"Failed to deploy deployment <%s>. Chronos job <%s> (id: <%s>) status is <%s>",
deployment.getId(), job.getToscaNodeName(), jobName, jobState);
LOG.error(errorMsg);
// Update job status
updateResource(deployment, job, NodeStates.ERROR);
currentNode.setState(NodeStates.ERROR);
throw new DeploymentException(errorMsg);
}
} else {
// Update job status
updateResource(deployment, job, NodeStates.STARTED);
currentNode.setState(NodeStates.STARTED);

if (templateTopologicalOrderIterator.getNext() != null) {
// Poll the following node next time - Disable poll interval
LOG.debug("Polling next job for deployment <{}>", deployment.getId());
deploymentMessage.setSkipPollInterval(true);
return false;
} else {
// No more jobs
LOG.debug("Polling complete for deployment <{}>", deployment.getId());
deploymentMessage.setPollComplete(true);
return true;
}
noMoreJob = templateTopologicalOrderIterator.getNext() == null;
}

if (noMoreJob) {
// No more jobs
LOG.debug("Polling complete for deployment <{}>", deployment.getId());
deploymentMessage.setPollComplete(true);
return true;
} else {
// Poll the following node next time - Disable poll interval
LOG.debug("Polling next job for deployment <{}>", deployment.getId());
deploymentMessage.setSkipPollInterval(true);
return false;
}

} catch (DeploymentException dex) {
// Deploy failed; let caller know (as for the method definition)
updateOnError(deployment.getId(), dex);
Expand All @@ -362,6 +343,68 @@ public boolean isDeployed(DeploymentMessage deploymentMessage) throws Deployment

}

/**
 * Checks on Chronos the status of the job bound to the iterator's current node and updates the
 * state of the related resource accordingly. This method only reads from the iterator (current
 * node, position, size); moving to the next node is left to the caller.
 *
 * @param deployment
 *          the deployment the checked job belongs to.
 * @param jobgraph
 *          the jobs of the deployment, looked up by TOSCA node name.
 * @param templateTopologicalOrderIterator
 *          the iterator whose current node is checked; the caller must ensure the current node
 *          is not {@code null}.
 * @param client
 *          the Chronos client used to retrieve the job status.
 * @return {@code true} if the currently checked node is ready, {@code false} if it is still in
 *         progress.
 * @throws DeploymentException
 *           if the job of the current node does not exist on Chronos or has failed.
 */
protected boolean checkJobsOnChronosIteratively(Deployment deployment,
    Map<String, IndigoJob> jobgraph,
    TemplateTopologicalOrderIterator templateTopologicalOrderIterator, Chronos client)
    throws DeploymentException {

  // Get current job
  Resource currentNode = templateTopologicalOrderIterator.getCurrent();
  IndigoJob job = jobgraph.get(currentNode.getToscaNodeName());

  String jobName = job.getChronosJob().getName();
  Job updatedJob = getJobStatus(client, jobName);
  if (updatedJob == null) {
    String errorMsg = String.format(
        "Failed to deploy deployment <%s>. Chronos job <%s> (id: <%s>) does not exist",
        deployment.getId(), job.getToscaNodeName(), jobName);
    LOG.error(errorMsg);
    // Update job status: mark both the stored resource and the in-memory node as failed,
    // exactly as done below for a job whose last state is FAILURE
    updateResource(deployment, job, NodeStates.ERROR);
    currentNode.setState(NodeStates.ERROR);
    throw new DeploymentException(errorMsg);
  }

  JobState jobState = getLastState(updatedJob);
  LOG.debug("Status for Chronos job <{}> is <{}> ({}/{})", jobName, jobState,
      templateTopologicalOrderIterator.getPosition() + 1,
      templateTopologicalOrderIterator.getNodeSize());

  // Go ahead only if the job succeeded
  if (jobState == JobState.SUCCESS) {
    // Job finished -> Update job status
    updateResource(deployment, job, NodeStates.STARTED);
    currentNode.setState(NodeStates.STARTED);
    return true;
  }

  if (jobState == JobState.FAILURE) {
    // Job failed -> Deployment failed!
    String errorMsg = String.format(
        "Failed to deploy deployment <%s>. Chronos job <%s> (id: <%s>) status is <%s>",
        deployment.getId(), job.getToscaNodeName(), jobName, jobState);
    LOG.error(errorMsg);
    // Update job status
    updateResource(deployment, job, NodeStates.ERROR);
    currentNode.setState(NodeStates.ERROR);
    throw new DeploymentException(errorMsg);
  }

  // Any other state -> Job still in progress
  LOG.debug("Polling again job <{}> for deployment <{}>", jobName, deployment.getId());
  return false;
}

private void updateResource(Deployment deployment, IndigoJob job, NodeStates state) {

// Find the Resource from DB
Expand Down Expand Up @@ -484,21 +527,28 @@ public boolean doUndeploy(DeploymentMessage deploymentMessage) {
TemplateTopologicalOrderIterator templateTopologicalOrderIterator =
deploymentMessage.getTemplateTopologicalOrderIterator();

// Create Jobs in the required order on Chronos
LOG.debug("Deleting jobs for deployment <{}> on Chronos", deployment.getId());
// Delete Jobs
LOG.debug("Deleting <{}> jobs for deployment <{}> on Chronos", jobChunkSize,
deployment.getId());
Chronos client = getChronosClient();
boolean noMoreJob = templateTopologicalOrderIterator.getCurrent() == null;
for (int i = 0; i < jobChunkSize && !noMoreJob; i++) {
deleteJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(),
templateTopologicalOrderIterator, client, true);

deleteJobsOnChronosIteratively(deployment, deploymentMessage.getChronosJobGraph(),
templateTopologicalOrderIterator, getChronosClient(), true);
noMoreJob = templateTopologicalOrderIterator.getNext() == null;
}

if (templateTopologicalOrderIterator.getNext() != null) {
// Delete the following node
LOG.debug("Deleting next node for deployment <{}>", deployment.getId());
} else {
if (noMoreJob) {
// No more nodes
LOG.debug("All nodes deleted for deployment <{}>", deployment.getId());
deploymentMessage.setDeleteComplete(true);
} else {
// Delete the following node
LOG.debug("Deleting next node for deployment <{}>", deployment.getId());
}

// No error occurred
return true;
} catch (RuntimeException exception) { // Chronos job launch error
// TODO use a custom exception ?
Expand Down Expand Up @@ -548,10 +598,14 @@ protected boolean deleteJobsOnChronosIteratively(Deployment deployment,

// Chronos API hack (to avoid error 400 if the job to delete does not exist)
if (getJobStatus(client, jobName) == null) {
LOG.debug("Job on Chronos does not exist: name <{}>", jobName);
LOG.debug("Deleted Chronos job <{}> - did not exist ({}/{})", jobName,
templateTopologicalOrderIterator.getPosition() + 1,
templateTopologicalOrderIterator.getNodeSize());
} else {
client.deleteJob(jobName);
LOG.debug("Deleted job on Chronos: name <{}>", jobName);
LOG.debug("Deleted Chronos job <{}> ({}/{})", jobName,
templateTopologicalOrderIterator.getPosition() + 1,
templateTopologicalOrderIterator.getNodeSize());
}
} catch (ChronosException ce) {
// Just log the error
Expand Down Expand Up @@ -650,12 +704,16 @@ protected Map<String, IndigoJob> generateJobGraph(Deployment deployment, OneData
// TODO Iterate on Chronos nodes and related dependencies (just ignore others - also if invalid
// - for now)

// Populate resources (nodes) hashmap to speed up job creation (id-name mapping is needed)
Map<String, Resource> resources = deployment.getResources().stream()
.collect(Collectors.toMap(e -> e.getToscaNodeName(), e -> e));

// Only create Indigo Jobs
for (Map.Entry<String, NodeTemplate> node : nodes.entrySet()) {
NodeTemplate nodeTemplate = node.getValue();
String nodeName = node.getKey();
if (isChronosNode(nodeTemplate)) {
Job chronosJob = createJob(nodes, deploymentId, nodeName, nodeTemplate);
Job chronosJob = createJob(nodes, deploymentId, nodeName, nodeTemplate, resources);
IndigoJob job = new IndigoJob(nodeName, chronosJob);
jobs.put(nodeName, job);
}
Expand Down Expand Up @@ -777,15 +835,14 @@ protected List<String> getJobParents(NodeTemplate nodeTemplate, String nodeName,
}

protected Job createJob(Map<String, NodeTemplate> nodes, String deploymentId, String nodeName,
NodeTemplate nodeTemplate) {
NodeTemplate nodeTemplate, Map<String, Resource> resources) {
try {
Job chronosJob = new Job();
// Init job infos

// Get the generated UUID for the node (in DB resource ?)
// FIXME This is just for prototyping... Otherwise is madness!!
Resource resourceJob =
resourceRepository.findByToscaNodeNameAndDeployment_id(nodeName, deploymentId);
Resource resourceJob = resources.get(nodeName);

chronosJob.setName(resourceJob.getId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public void updateOnError(String deploymentUuid, String message) {
break;
}
deployment.setTask(Task.NONE);
deployment.setStatusReason(message);
if (message != null) {
deployment.setStatusReason(message);
}
deploymentRepository.save(deployment);
}

Expand Down
4 changes: 3 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ tosca.definitions.normative=normative-types.yml
tosca.definitions.indigo=custom_types.yaml

# TEMPORARY - Chronos auth file path (default value to read the files as classpath resource)
chronos.auth.file.path=classpath:chronos/chronos.properties
chronos.auth.file.path=classpath:chronos/chronos.properties

orchestrator.chronos.jobChunkSize=100
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

public class TemplateGeneratorHelper {
public static void main(String args[]) throws FileException {
System.out.println(generateChronosTemplate(1000));
System.out.println(
generateChronosTemplate(10000).replace(System.getProperty("line.separator"), "\\n"));
}

public static String generateChronosTemplate(int nodeNumber) throws FileException {
Expand Down

0 comments on commit addcfa9

Please sign in to comment.