diff --git a/bin/COBRA.jar b/bin/COBRA.jar index 60806d9..3adc297 100644 Binary files a/bin/COBRA.jar and b/bin/COBRA.jar differ diff --git a/src/confidential/benchmark/SmartKVStoreClient.java b/src/confidential/benchmark/SmartKVStoreClient.java new file mode 100644 index 0000000..0571de3 --- /dev/null +++ b/src/confidential/benchmark/SmartKVStoreClient.java @@ -0,0 +1,180 @@ +package confidential.benchmark; + +import bftsmart.tom.ServiceProxy; +import bftsmart.tom.util.Storage; +import confidential.demo.map.client.Operation; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * @author Robin + */ +public class SmartKVStoreClient { + private static int initialId; + + public static void main(String[] args) { + if (args.length != 6) { + System.out.println("USAGE: ... SmartKVStoreClient " + + " "); + System.exit(-1); + } + + initialId = Integer.parseInt(args[0]); + int numClients = Integer.parseInt(args[1]); + int numOperations = Integer.parseInt(args[2]); + int requestSize = Integer.parseInt(args[3]); + boolean write = Boolean.parseBoolean(args[4]); + boolean precomputed = Boolean.parseBoolean(args[5]); + + + Client[] clients = new Client[numClients]; + + for (int i = 0; i < numClients; i++) { + clients[i] = new Client(initialId + i, precomputed, numOperations, requestSize, write); + } + + ExecutorService executorService = Executors.newFixedThreadPool(numClients); + Collection> tasks = new LinkedList<>(); + + for (Client client : clients) { + tasks.add(executorService.submit(client)); + } + Runtime.getRuntime().addShutdownHook(new Thread(executorService::shutdownNow)); + + for (Future task : tasks) { + try { + task.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + executorService.shutdownNow(); + System.exit(-1); + } + } + + executorService.shutdown(); + } + + private static class Client extends Thread { + private int id; + private int numOperations; + private boolean write; + private ServiceProxy proxy; + private boolean preComputed; + private int rampup = 1000; + private byte[] writeRequest; + private byte[] readRequest; + private byte[] data; + + Client(int id, boolean precomputed, int numOperations, int requestSize, boolean write) { + super("Client " + id); + this.id = id; + this.numOperations = numOperations; + this.write = write; + this.preComputed = precomputed; + + Random random = new Random(1L); + String key = "k" + id; + data = new byte[requestSize]; + random.nextBytes(data); + writeRequest = serialize(Operation.PUT, key, data); + readRequest = serialize(Operation.GET, key, null); + + this.proxy = new ServiceProxy(id); + } + + @Override + public void run() { + if (id == initialId) + System.out.println("Warming up..."); + byte[] response; + try { + proxy.invokeOrdered(writeRequest); + for (int i = 0; i < 100; i++) { + if (write) + proxy.invokeOrdered(writeRequest); + else { + response = proxy.invokeUnordered(readRequest); + if (!preComputed && !Arrays.equals(response, data)) + throw new RuntimeException("Wrong response"); + } + } + Storage st = new Storage(numOperations); + + if (id == initialId) + System.out.println("Executing experiment for " + numOperations + " ops"); + for (int i = 0; i < numOperations; i++) { + long t1 = System.nanoTime(); + long t2; + if (write) { + proxy.invokeOrdered(writeRequest); + t2 = System.nanoTime(); + } else { + response = proxy.invokeUnordered(readRequest); + t2 = System.nanoTime(); + if (!preComputed && !Arrays.equals(response, data)) { + System.out.println("Checking"); + throw new RuntimeException("Wrong response"); + } + } + long latency = t2 - t1; + st.store(latency); + try { + if (rampup > 0) { + Thread.sleep(rampup); + rampup -= 100; + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + if (id == initialId) { + System.out.println("Average time for " + numOperations + " executions (-10%) = " + st.getAverage(true) / 1000 + " us "); + System.out.println("Standard deviation for " + numOperations + " executions (-10%) = " + st.getDP(true) / 1000 + " us "); + System.out.println("Average time for " + numOperations + " executions (all samples) = " + st.getAverage(false) / 1000 + " us "); + System.out.println("Standard deviation for " + numOperations + " executions (all samples) = " + st.getDP(false) / 1000 + " us "); + System.out.println("Maximum time for " + numOperations + " executions (all samples) = " + st.getMax(false) / 1000 + " us "); + } + + } finally { + proxy.close(); + } + } + + @Override + public void interrupt() { + proxy.close(); + super.interrupt(); + } + + private byte[] serialize(Operation op, String str, byte[] data) { + try(ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(bos)) { + out.write((byte)op.ordinal()); + if(str != null) + out.writeUTF(str); + if (data != null) { + out.writeInt(data.length); + out.write(data); + } + out.flush(); + bos.flush(); + return bos.toByteArray(); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + } +} diff --git a/src/confidential/benchmark/SmartThroughputLatencyKVStoreServer.java b/src/confidential/benchmark/SmartThroughputLatencyKVStoreServer.java new file mode 100644 index 0000000..7f541c9 --- /dev/null +++ b/src/confidential/benchmark/SmartThroughputLatencyKVStoreServer.java @@ -0,0 +1,131 @@ +package confidential.benchmark; + +import bftsmart.tom.MessageContext; +import bftsmart.tom.ServiceReplica; +import bftsmart.tom.server.defaultservices.DefaultRecoverable; +import confidential.ConfidentialData; +import confidential.ConfidentialMessage; +import confidential.demo.map.client.Operation; +import confidential.server.ConfidentialRecoverable; +import confidential.statemanagement.ConfidentialSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; + +public class SmartThroughputLatencyKVStoreServer extends DefaultRecoverable { + private Logger logger = LoggerFactory.getLogger("demo"); + private Map map; + private long startTime; + private long numRequests; + private Set senders; + private double maxThroughput; + + public static void main(String[] args) throws NumberFormatException { + new SmartThroughputLatencyKVStoreServer(Integer.parseInt(args[0])); + } + + SmartThroughputLatencyKVStoreServer(int processId) { + map = new TreeMap<>(); + new ServiceReplica(processId, this, this); + senders = new HashSet<>(1000); + } + + private void printMeasurement() { + long currentTime = System.nanoTime(); + double deltaTime = (currentTime - startTime) / 1_000_000_000.0; + if ((int) (deltaTime / 5) > 0) { + double throughput = numRequests / deltaTime; + if (throughput > maxThroughput) + maxThroughput = throughput; + logger.info("Clients: {} | Requests: {} | DeltaTime[s]: {} | Throughput[ops/s]: {} (max: {})", + senders.size(), numRequests, deltaTime, throughput, maxThroughput); + numRequests = 0; + startTime = currentTime; + senders.clear(); + } + } + + private byte[] execute(byte[] command, MessageContext msgCtx) { + numRequests++; + senders.add(msgCtx.getSender()); + + try (ByteArrayInputStream bis = new ByteArrayInputStream(command); + ObjectInput in = new ObjectInputStream(bis)) { + Operation op = Operation.getOperation(in.read()); + String str; + byte[] value; + switch (op) { + case GET: + str = in.readUTF(); + value = map.get(str); + return value; + case PUT: + str = in.readUTF(); + value = new byte[in.readInt()]; + in.readFully(value); + map.put(str, value); + return null; + } + } catch (IOException e) { + logger.error("Failed to attend ordered request from {}", msgCtx.getSender(), e); + } finally { + printMeasurement(); + } + return null; + } + + @Override + public void installSnapshot(byte[] state) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(state); + ObjectInput in = new ObjectInputStream(bis)) { + int size = in.readInt(); + map = new TreeMap<>(); + + for (int i = 0; i < size; i++) { + String key = in.readUTF(); + byte[] b = new byte[in.readInt()]; + in.readFully(b); + map.put(key, b); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public byte[] getSnapshot() { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(bos)) { + out.writeInt(map.size()); + for (Map.Entry e : map.entrySet()) { + out.writeUTF(e.getKey()); + out.writeInt(e.getValue().length); + out.write(e.getValue()); + } + out.flush(); + bos.flush(); + return bos.toByteArray(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus) { + byte[][] replies = new byte[commands.length][]; + + for (int i = 0; i < commands.length; i++) { + replies[i] = execute(commands[i], msgCtxs[i]); + } + + return replies; + } + + @Override + public byte[] appExecuteUnordered(byte[] command, MessageContext msgCtx) { + return execute(command, msgCtx); + } +}