Skip to content

Commit

Permalink
Smart benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
rvassantlal committed Nov 14, 2019
1 parent 971b716 commit 4c58570
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 0 deletions.
Binary file modified bin/COBRA.jar
Binary file not shown.
180 changes: 180 additions & 0 deletions src/confidential/benchmark/SmartKVStoreClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package confidential.benchmark;

import bftsmart.tom.ServiceProxy;
import bftsmart.tom.util.Storage;
import confidential.demo.map.client.Operation;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* @author Robin
*/
public class SmartKVStoreClient {
private static int initialId;

public static void main(String[] args) {
if (args.length != 6) {
System.out.println("USAGE: ... SmartKVStoreClient <initial client id> " +
"<num clients> <number of ops> <request size> <write?> <precomputed?>");
System.exit(-1);
}

initialId = Integer.parseInt(args[0]);
int numClients = Integer.parseInt(args[1]);
int numOperations = Integer.parseInt(args[2]);
int requestSize = Integer.parseInt(args[3]);
boolean write = Boolean.parseBoolean(args[4]);
boolean precomputed = Boolean.parseBoolean(args[5]);


Client[] clients = new Client[numClients];

for (int i = 0; i < numClients; i++) {
clients[i] = new Client(initialId + i, precomputed, numOperations, requestSize, write);
}

ExecutorService executorService = Executors.newFixedThreadPool(numClients);
Collection<Future<?>> tasks = new LinkedList<>();

for (Client client : clients) {
tasks.add(executorService.submit(client));
}
Runtime.getRuntime().addShutdownHook(new Thread(executorService::shutdownNow));

for (Future<?> task : tasks) {
try {
task.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
executorService.shutdownNow();
System.exit(-1);
}
}

executorService.shutdown();
}

private static class Client extends Thread {
private int id;
private int numOperations;
private boolean write;
private ServiceProxy proxy;
private boolean preComputed;
private int rampup = 1000;
private byte[] writeRequest;
private byte[] readRequest;
private byte[] data;

Client(int id, boolean precomputed, int numOperations, int requestSize, boolean write) {
super("Client " + id);
this.id = id;
this.numOperations = numOperations;
this.write = write;
this.preComputed = precomputed;

Random random = new Random(1L);
String key = "k" + id;
data = new byte[requestSize];
random.nextBytes(data);
writeRequest = serialize(Operation.PUT, key, data);
readRequest = serialize(Operation.GET, key, null);

this.proxy = new ServiceProxy(id);
}

@Override
public void run() {
if (id == initialId)
System.out.println("Warming up...");
byte[] response;
try {
proxy.invokeOrdered(writeRequest);
for (int i = 0; i < 100; i++) {
if (write)
proxy.invokeOrdered(writeRequest);
else {
response = proxy.invokeUnordered(readRequest);
if (!preComputed && !Arrays.equals(response, data))
throw new RuntimeException("Wrong response");
}
}
Storage st = new Storage(numOperations);

if (id == initialId)
System.out.println("Executing experiment for " + numOperations + " ops");
for (int i = 0; i < numOperations; i++) {
long t1 = System.nanoTime();
long t2;
if (write) {
proxy.invokeOrdered(writeRequest);
t2 = System.nanoTime();
} else {
response = proxy.invokeUnordered(readRequest);
t2 = System.nanoTime();
if (!preComputed && !Arrays.equals(response, data)) {
System.out.println("Checking");
throw new RuntimeException("Wrong response");
}
}
long latency = t2 - t1;
st.store(latency);
try {
if (rampup > 0) {
Thread.sleep(rampup);
rampup -= 100;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

if (id == initialId) {
System.out.println("Average time for " + numOperations + " executions (-10%) = " + st.getAverage(true) / 1000 + " us ");
System.out.println("Standard deviation for " + numOperations + " executions (-10%) = " + st.getDP(true) / 1000 + " us ");
System.out.println("Average time for " + numOperations + " executions (all samples) = " + st.getAverage(false) / 1000 + " us ");
System.out.println("Standard deviation for " + numOperations + " executions (all samples) = " + st.getDP(false) / 1000 + " us ");
System.out.println("Maximum time for " + numOperations + " executions (all samples) = " + st.getMax(false) / 1000 + " us ");
}

} finally {
proxy.close();
}
}

@Override
public void interrupt() {
proxy.close();
super.interrupt();
}

private byte[] serialize(Operation op, String str, byte[] data) {
try(ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = new ObjectOutputStream(bos)) {
out.write((byte)op.ordinal());
if(str != null)
out.writeUTF(str);
if (data != null) {
out.writeInt(data.length);
out.write(data);
}
out.flush();
bos.flush();
return bos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
}
131 changes: 131 additions & 0 deletions src/confidential/benchmark/SmartThroughputLatencyKVStoreServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package confidential.benchmark;

import bftsmart.tom.MessageContext;
import bftsmart.tom.ServiceReplica;
import bftsmart.tom.server.defaultservices.DefaultRecoverable;
import confidential.ConfidentialData;
import confidential.ConfidentialMessage;
import confidential.demo.map.client.Operation;
import confidential.server.ConfidentialRecoverable;
import confidential.statemanagement.ConfidentialSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.*;

public class SmartThroughputLatencyKVStoreServer extends DefaultRecoverable {
private Logger logger = LoggerFactory.getLogger("demo");
private Map<String, byte[]> map;
private long startTime;
private long numRequests;
private Set<Integer> senders;
private double maxThroughput;

public static void main(String[] args) throws NumberFormatException {
new SmartThroughputLatencyKVStoreServer(Integer.parseInt(args[0]));
}

SmartThroughputLatencyKVStoreServer(int processId) {
map = new TreeMap<>();
new ServiceReplica(processId, this, this);
senders = new HashSet<>(1000);
}

private void printMeasurement() {
long currentTime = System.nanoTime();
double deltaTime = (currentTime - startTime) / 1_000_000_000.0;
if ((int) (deltaTime / 5) > 0) {
double throughput = numRequests / deltaTime;
if (throughput > maxThroughput)
maxThroughput = throughput;
logger.info("Clients: {} | Requests: {} | DeltaTime[s]: {} | Throughput[ops/s]: {} (max: {})",
senders.size(), numRequests, deltaTime, throughput, maxThroughput);
numRequests = 0;
startTime = currentTime;
senders.clear();
}
}

private byte[] execute(byte[] command, MessageContext msgCtx) {
numRequests++;
senders.add(msgCtx.getSender());

try (ByteArrayInputStream bis = new ByteArrayInputStream(command);
ObjectInput in = new ObjectInputStream(bis)) {
Operation op = Operation.getOperation(in.read());
String str;
byte[] value;
switch (op) {
case GET:
str = in.readUTF();
value = map.get(str);
return value;
case PUT:
str = in.readUTF();
value = new byte[in.readInt()];
in.readFully(value);
map.put(str, value);
return null;
}
} catch (IOException e) {
logger.error("Failed to attend ordered request from {}", msgCtx.getSender(), e);
} finally {
printMeasurement();
}
return null;
}

@Override
public void installSnapshot(byte[] state) {
try (ByteArrayInputStream bis = new ByteArrayInputStream(state);
ObjectInput in = new ObjectInputStream(bis)) {
int size = in.readInt();
map = new TreeMap<>();

for (int i = 0; i < size; i++) {
String key = in.readUTF();
byte[] b = new byte[in.readInt()];
in.readFully(b);
map.put(key, b);
}
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public byte[] getSnapshot() {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = new ObjectOutputStream(bos)) {
out.writeInt(map.size());
for (Map.Entry<String, byte[]> e : map.entrySet()) {
out.writeUTF(e.getKey());
out.writeInt(e.getValue().length);
out.write(e.getValue());
}
out.flush();
bos.flush();
return bos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

@Override
public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus) {
byte[][] replies = new byte[commands.length][];

for (int i = 0; i < commands.length; i++) {
replies[i] = execute(commands[i], msgCtxs[i]);
}

return replies;
}

@Override
public byte[] appExecuteUnordered(byte[] command, MessageContext msgCtx) {
return execute(command, msgCtx);
}
}

0 comments on commit 4c58570

Please sign in to comment.