From 6bb0ee6355724d631154a225dded86d8dc5b4bfb Mon Sep 17 00:00:00 2001 From: Eric Rozell Date: Wed, 7 Jun 2017 10:19:44 -0400 Subject: [PATCH] Add schema upload/download support to AzureStorageDriver Uses the CloudBlobContainer abstraction to write the schema to a well-known path. Fixes #450 --- .../executor/backup/AzureStorageDriver.java | 83 ++++++++++++++++--- 1 file changed, 73 insertions(+), 10 deletions(-) diff --git a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java index 7535de59..d42e4f39 100644 --- a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java +++ b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java @@ -18,12 +18,14 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.nio.file.FileSystems; import java.nio.file.Files; import java.security.InvalidKeyException; @@ -131,13 +133,22 @@ private void uploadDirectory(String localLocation, private void uploadFile(CloudBlobContainer container, String fileKey, File sourceFile) { - PageBlobOutputStream pageBlobOutputStream = null; - SnappyOutputStream compress = null; - BufferedOutputStream bufferedOutputStream = null; try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(sourceFile))) { LOGGER.info("Initiating upload for file: {} | key: {}", sourceFile.getAbsolutePath(), fileKey); + uploadStream(container, fileKey, inputStream); + } catch (IOException e) { + LOGGER.error("Unable to store blob", e); + } + } + + private void uploadStream(CloudBlobContainer container, String fileKey, InputStream inputStream) { + + PageBlobOutputStream pageBlobOutputStream = null; + SnappyOutputStream compress = null; + BufferedOutputStream bufferedOutputStream = null; + try { final CloudPageBlob blob = container.getPageBlobReference(fileKey); pageBlobOutputStream = new PageBlobOutputStream(blob); @@ -145,7 +156,6 @@ private void uploadFile(CloudBlobContainer container, String fileKey, File sourc compress = new SnappyOutputStream(bufferedOutputStream, DEFAULT_PART_SIZE_UPLOAD); IOUtils.copy(inputStream, compress, DEFAULT_PART_SIZE_UPLOAD); - } catch (StorageException | URISyntaxException | IOException e) { LOGGER.error("Unable to store blob", e); } finally { @@ -157,8 +167,29 @@ private void uploadFile(CloudBlobContainer container, String fileKey, File sourc @Override public void uploadSchema(BackupRestoreContext ctx, String schema) { - // ToDo : Add the upload schema to Azure. - // Path: + final String accountName = ctx.getAccountId(); + final String accountKey = ctx.getSecretKey(); + final String backupName = ctx.getName(); + final String nodeId = ctx.getNodeId(); + + final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation())); + // https://.blob.core.windows.net/ + final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName); + + if (container == null) { + LOGGER.error("Error uploading schema. Unable to connect to {}, for container {}.", + ctx.getExternalLocation(), containerName); + return; + } + + final String key = backupName + "/" + nodeId + "/" + StorageUtil.SCHEMA_FILE; + uploadText(container, key, schema); + } + + private void uploadText(CloudBlobContainer container, String fileKey, String text) { + final InputStream inputStream = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)); + LOGGER.info("Initiating upload for schema | key: {}", fileKey); + uploadStream(container, fileKey, inputStream); } @Override @@ -175,8 +206,8 @@ public void download(BackupRestoreContext ctx) throws IOException { final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName); if (container == null) { - LOGGER.error("Error uploading snapshots. Unable to connect to {}, for container {}.", - ctx.getExternalLocation(), containerName, localLocation); + LOGGER.error("Error downloading snapshots. Unable to connect to {}, for container {}.", + ctx.getExternalLocation(), containerName); return; } String keyPrefix = String.format("%s/%s", backupName, nodeId); @@ -224,8 +255,40 @@ private void downloadFile(String localLocation, CloudBlobContainer container, St @Override public String downloadSchema(BackupRestoreContext ctx) throws Exception { - // ToDo : Add the download schema to Azure. - return new String(""); + final String accountName = ctx.getAccountId(); + final String accountKey = ctx.getSecretKey(); + final String backupName = ctx.getName(); + final String nodeId = ctx.getNodeId(); + + final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation())); + // https://.blob.core.windows.net/ + final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName); + + if (container == null) { + LOGGER.error("Error downloading snapshots. Unable to connect to {}, for container {}.", + ctx.getExternalLocation(), containerName); + return new String(""); + } + + final String key = backupName + "/" + nodeId + "/" + StorageUtil.SCHEMA_FILE; + + InputStream inputStream = null; + SnappyInputStream compress = null; + + try { + final CloudPageBlob pageBlobReference = container.getPageBlobReference(key); + inputStream = new PageBlobInputStream(pageBlobReference); + compress = new SnappyInputStream(inputStream); + + return IOUtils.toString(compress, "UTF-8"); + + } catch (Exception e) { + LOGGER.error("Unable to read schema from: {}", key, e); + return new String(""); + } finally { + IOUtils.closeQuietly(compress); + IOUtils.closeQuietly(inputStream); + } } private String getContainerName(String externalLocation) {