-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLocalApplication.java
147 lines (128 loc) · 5.56 KB
/
LocalApplication.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import Task.DoneTask;
import Task.NewTask;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
import com.amazonaws.services.ec2.model.IamInstanceProfileSpecification;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import org.apache.commons.codec.binary.Base64;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
/**
 * Command-line client that submits one input file for processing and waits for the result.
 *
 * <p>Flow, as driven by {@link #main(String[])}: connect to the new-tasks queue (launching a
 * manager EC2 instance if that queue does not exist yet), upload the input file to S3, enqueue a
 * {@link NewTask}, wait for the {@link DoneTask} carrying this run's id, and download the result
 * to {@code <outputPath>.html}.
 */
public class LocalApplication {
    /** Pause between attempts to reach the new-tasks queue while the manager instance starts. */
    private static final long QUEUE_POLL_INTERVAL_MS = 5_000L;

    /** Buffer size used when streaming the result object from S3 to disk. */
    private static final int COPY_BUFFER_SIZE = 8192;

    private static String appId;
    private static String inputPath;
    private static String outputPath;
    private static Integer n;
    private static QueueHandler newTasksQueue;
    private static QueueHandler doneTasksQueue = null;
    private static final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withRegion("us-east-1").build();
    private static DoneTask doneTask = null;
    private static final AmazonEC2 ec2 = AmazonEC2ClientBuilder.standard().withRegion("us-east-1").build();

    /**
     * Entry point.
     *
     * @param args exactly three values: input file path, output path (".html" is appended),
     *             and the integer {@code n} forwarded in the task message
     */
    public static void main(String[] args) {
        if (args.length != 3) {
            System.err.println("Usage: LocalApplication <inputPath> <outputPath> <n>");
            return;
        }
        try {
            inputPath = args[0];
            outputPath = args[1];
            n = Integer.parseInt(args[2]);
            // Fail before any cloud resources are touched if there is nothing to upload.
            if (!new File(inputPath).isFile()) {
                System.err.println("Input file not found: " + inputPath);
                return;
            }
            appId = UUID.randomUUID().toString();
            System.out.println("Running application with id: " + appId + " on: " + inputPath + " with n = " + n +
                    " your output can be found here: " + outputPath);
            turnOnManager();
            createTask();
            receiveDoneTask();
            saveFile();
            System.out.println("Finish");
        } catch (NumberFormatException e) {
            System.err.println("n must be an integer, got: " + args[2]);
        } catch (Exception e) {
            // Top-level boundary: report the failure instead of silently exiting.
            System.err.println("Application " + appId + " failed: " + e);
            e.printStackTrace();
        }
    }

    /**
     * Connects to the new-tasks queue; if the queue does not exist, launches a manager instance
     * and waits for the queue to appear.
     */
    private static void turnOnManager() {
        try {
            newTasksQueue = new QueueHandler(Manager.NEW_TASKS_QUEUE_NAME);
        } catch (QueueDoesNotExistException e) {
            // A missing queue is treated as "no manager running yet".
            turnOn();
        }
    }

    /**
     * Launches one EC2 instance configured to run {@code manager.jar}, then polls until the
     * new-tasks queue can be opened.
     *
     * @throws IllegalStateException if the thread is interrupted while waiting
     */
    private static void turnOn() {
        String userData = getUserData("manager.jar");
        RunInstancesRequest instancesRequest = new RunInstancesRequest("ami-08fe4614a9f89c0ec", 1, 1);
        instancesRequest.withInstanceType(InstanceType.T2Micro)
                .withIamInstanceProfile(new IamInstanceProfileSpecification().withArn(Manager.ARN_NAME))
                .withUserData(userData)
                .withKeyName("aws-key1");
        ec2.runInstances(instancesRequest);

        // The queue is presumably created by the manager once it is up; retry with a pause
        // rather than spinning while the instance boots.
        while (true) {
            try {
                newTasksQueue = new QueueHandler(Manager.NEW_TASKS_QUEUE_NAME);
                return;
            } catch (QueueDoesNotExistException e) {
                pauseBeforeRetry();
            }
        }
    }

    /** Sleeps for one poll interval, converting an interrupt into an unchecked failure. */
    private static void pauseBeforeRetry() {
        try {
            Thread.sleep(QUEUE_POLL_INTERVAL_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for queue "
                    + Manager.NEW_TASKS_QUEUE_NAME, e);
        }
    }

    /** Uploads the input file to S3 (keyed by its path) and enqueues the task description. */
    private static void createTask() {
        File file = new File(inputPath);
        s3.putObject(Manager.BUCKET_NAME, inputPath, file);
        NewTask task = new NewTask(appId, inputPath, n);
        String body = task.toJson();
        newTasksQueue.sendMessage(body);
    }

    /**
     * Polls the done-tasks queue until a message whose app id matches this run arrives, stores it
     * in {@code doneTask}, and deletes it from the queue. Messages for other ids are left alone.
     */
    private static void receiveDoneTask() {
        doneTasksQueue = new QueueHandler(Manager.DONE_TASKS_QUEUE_NAME);
        // NOTE(review): no pause between polls; assumes receiveMessages() blocks or long-polls - confirm.
        while (true) {
            List<Message> messages = doneTasksQueue.receiveMessages();
            for (Message message : messages) {
                DoneTask candidate = DoneTask.fromJson(message.getBody());
                System.out.println(message.getBody());
                // Only publish to the shared field once the message is known to be ours.
                if (candidate != null && appId.equals(candidate.getAppId())) {
                    doneTask = candidate;
                    doneTasksQueue.deleteMessage(message);
                    return;
                }
            }
        }
    }

    /**
     * Downloads the result object to {@code <outputPath>.html} and, once the local file has been
     * written and closed, deletes both the result object and the uploaded input from S3.
     * I/O failures are reported on stderr and leave the S3 objects in place.
     */
    private static void saveFile() {
        String resultKey = doneTask.getOutputKeyName();
        try {
            // Byte-for-byte copy: keeps line terminators and avoids any charset conversion.
            try (S3Object object = s3.getObject(Manager.BUCKET_NAME, resultKey);
                 S3ObjectInputStream in = object.getObjectContent();
                 OutputStream out = new BufferedOutputStream(new FileOutputStream(outputPath + ".html"))) {
                byte[] buffer = new byte[COPY_BUFFER_SIZE];
                int read;
                while ((read = in.read(buffer)) != -1) {
                    out.write(buffer, 0, read);
                }
            }
            // Clean up remote copies only after the local file is safely flushed and closed.
            s3.deleteObject(Manager.BUCKET_NAME, resultKey);
            s3.deleteObject(Manager.BUCKET_NAME, inputPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the EC2 user-data script that installs Java and the AWS CLI, copies the given jar
     * from the project bucket, and runs it.
     *
     * @param jarName name of the jar object in the bucket, also used as the local file name
     * @return the script, Base64-encoded as UTF-8 text; never {@code null}
     */
    public static String getUserData(String jarName) {
        String userData =
                "#!/bin/bash\n" +
                "yum update -y\n" +
                "yum -y install python-pip\n" +
                "yum -y install awscli\n" +
                "yum -y install java-1.8.0\n" +
                "echo 2 | alternatives --config java\n" +
                "aws s3 cp s3://" + Manager.BUCKET_NAME + "/" + jarName + " . --region us-east-1\n" +
                "java -jar " + jarName + "\n";
        byte[] encoded = Base64.encodeBase64(userData.getBytes(StandardCharsets.UTF_8));
        return new String(encoded, StandardCharsets.UTF_8);
    }
}