Skip to content
This repository has been archived by the owner on Jul 19, 2024. It is now read-only.

Commit

Permalink
Merge pull request #540 from rickle-msft/abortAndClose
Browse files Browse the repository at this point in the history
Abort and close
  • Loading branch information
rickle-msft authored Mar 30, 2020
2 parents 3296182 + e7064fb commit 20d19f7
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 26 deletions.
4 changes: 4 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
2020.03.30 Version 8.6.3
* Added the commitWriteOnInputStreamException option to BlobRequestOptions, which will allow the user to configure whether any data staged through openWrite when using the upload(InputStream, long) method will be committed upon failures in the InputStream.
* Disabled httpsKeepAlive by default as it can introduce some perf issues.

2020.03.18 Version 8.6.2
* Fixed a bug in the pom that disrupted the ability to download from maven central.

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ To get the binaries of this library as distributed by Microsoft, ready for use w
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>8.6.2</version>
<version>8.6.3</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion microsoft-azure-storage-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>8.6.2</version>
<version>8.6.3</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>8.6.2</version>
<version>8.6.3</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2755,6 +2755,71 @@ public void testSkipEtagCheck() throws StorageException, IOException, URISyntaxE
stream.close();
}

private static class ExceptionInputStream extends InputStream {
private final byte[] data;
int index = 0;
boolean firstRead = true;

ExceptionInputStream(byte[] data) {
this.data = data;
}

@Override
public int read() throws IOException {
return 0;
}

@Override
public int read(byte[] arr, int offset, int len) throws IOException {
if (firstRead) {
// Fill either half the incoming buffer or use half the data, whichever is smaller.
// For safe partial write.
int size = Math.min(data.length, len) / 2;
if (len >= 0) System.arraycopy(data, index, arr, offset, size);
firstRead = false;
return size;
} else {
throw new IOException();
}
}
}

@Test
public void testCommitOnInputStreamException() throws StorageException, IOException, URISyntaxException {
final int blobSize = 2 * Constants.DEFAULT_MINIMUM_READ_SIZE_IN_BYTES; // so BlobInputStream doesn't read entire blob at once.

CloudBlobContainer container = BlobTestHelper.getRandomContainerReference();
container.createIfNotExists();
CloudBlockBlob blob = container.getBlockBlobReference(BlobTestHelper.generateRandomBlobNameWithPrefix(""));

BlobRequestOptions options = new BlobRequestOptions();

// Upload with no commit on failure.
byte[] data = TestHelper.getRandomBuffer(blobSize);
InputStream is = new ExceptionInputStream(data);
options.setCommitWriteOnInputStreamException(false);

try {
blob.upload(is, blobSize, null, null, options, null);
fail();
} catch(IOException e) {
assertFalse(blob.exists());
}

// Upload with commit on failure.
is = new ExceptionInputStream(data);
options = new BlobRequestOptions(); // Test the default. Should be true.

try {
blob.upload(is, blobSize, null, null, options, null);
fail();
} catch (IOException e) {
assertTrue(blob.exists());
}

container.delete();
}

@Test
public void testPutGetBlobCPK() throws URISyntaxException, StorageException, IOException {
// load CPK into options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ public static class HeaderConstants {
/**
* Specifies the value to use for UserAgent header.
*/
public static final String USER_AGENT_VERSION = "8.6.2";
public static final String USER_AGENT_VERSION = "8.6.3";

/**
* The default type for content-type and accept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected static void applyBaseDefaultsInternal(final RequestOptions modifiedOpt
}

if (modifiedOptions.disableHttpsSocketKeepAlive() == null) {
modifiedOptions.setDisableHttpsSocketKeepAlive(false);
modifiedOptions.setDisableHttpsSocketKeepAlive(true);
}
}

Expand Down Expand Up @@ -209,7 +209,7 @@ public Boolean requireEncryption() {
* keep-alive; otherwise, <code>false</code>. For more information about disableHttpsSocketKeepAlive defaults, see
* {@link ServiceClient#getDefaultRequestOptions()}
*
* @return A value to indicate whther https socket keep-alive should be disabled.
* @return A value to indicate whether https socket keep-alive should be disabled.
*/
public Boolean disableHttpsSocketKeepAlive() {
return this.disableHttpsSocketKeepAlive;
Expand Down Expand Up @@ -322,16 +322,16 @@ public void setRequireEncryption(Boolean requireEncryption) {
* Sets a value to indicate whether https socket keep-alive should be disabled. Use <code>true</code> to disable
* keep-alive; otherwise, <code>false</code>
* <p>
* The default is set in the client and is by default false, indicating that https socket keep-alive will be
* enabled. You can change the value on this request by setting this property. You can also change the value on
* The default is set in the client and is by default true, indicating that https socket keep-alive will be
* disabled. You can change the value on this request by setting this property. You can also change the value on
* on the {@link ServiceClient#getDefaultRequestOptions()} object so that all subsequent requests made via the
* service client will use the appropriate value.
* <p>
* Setting keep-alive on https sockets is to work around a bug in the JVM where connection timeouts are not honored
* on retried requests. In those cases, we use socket keep-alive as a fallback. Unfortunately, the timeout value
* must be taken from a JVM property rather than configured locally. Therefore, in rare cases the JVM has configured
* aggressively short keep-alive times, it may be beneficial to disable the use of keep-alives lest they interfere
* with long running data transfer operations.
* on retried requests. In those cases, you may choose to use socket keep-alive as a fallback. Unfortunately, the
* timeout value must be taken from a JVM property rather than configured locally. Therefore, in rare cases the JVM
* has configured aggressively short keep-alive times, it may not be beneficial to enable the use of keep-alives
* lest they interfere with long running data transfer operations.
*
* @param disableHttpsSocketKeepAlive
* A value to indicate whether https socket keep-alive should be disabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public void close() throws IOException
this.cryptoStream.close();
}

@Override
void abort() throws IOException {
// no-op. This method is used in the case of aborting uploads, and decrypt streams are on downloads.
}

@Override
public void write(byte[] data, int offset, int length) throws IOException {
// Keep buffering until we have 16 bytes of IV.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ final class BlobEncryptStream extends BlobOutputStream {
* Holds the cipher stream.
*/
private CipherOutputStream cipherStream;

private BlobOutputStreamInternal blobStream;

/**
* Initializes a new instance of the BlobEncryptStream class for a CloudBlockBlob
Expand All @@ -70,8 +72,7 @@ protected BlobEncryptStream(final CloudBlockBlob blockBlob, final AccessConditio
this.options = options;

this.options.setValidateEncryptionPolicy(false);
BlobOutputStreamInternal blobStream =
new BlobOutputStreamInternal(blockBlob, accessCondition, options, opContext);
blobStream = new BlobOutputStreamInternal(blockBlob, accessCondition, options, opContext);
this.cipherStream = new CipherOutputStream(blobStream, cipher);
}

Expand Down Expand Up @@ -159,4 +160,9 @@ public void close() throws IOException {
this.cipherStream.close();
}

@Override
void abort() throws IOException {
this.blobStream.abort();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,10 @@ public void write(final byte[] data) throws IOException {
@Override
@DoesServiceRequest
public abstract void close() throws IOException;

/**
* Signals to the BlobOutputStream that it is being aborted and should not commit the data to the service on
* closing, typically to be used in cases of errors or exceptions in the data source.
*/
abstract void abort() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public Thread newThread(Runnable r) {
*/
private final ThreadPoolExecutor threadExecutor;

/**
* Indicates whether the stream has been aborted and therefore closing will skip committing data.
*/
private boolean aborted;

/**
* Initializes a new instance of the BlobOutputStream class.
*
Expand Down Expand Up @@ -314,17 +319,20 @@ public synchronized void close() throws IOException {
this.checkStreamState();

// flush any remaining data
this.flush();
if (!this.aborted) {
this.flush();
}

// shut down the ExecutorService.
this.threadExecutor.shutdown();

// try to commit the blob
try {
this.commit();
}
catch (final StorageException e) {
throw Utility.initIOException(e);
if (!this.aborted) {
try {
this.commit();
} catch (final StorageException e) {
throw Utility.initIOException(e);
}
}
}
finally {
Expand All @@ -339,6 +347,11 @@ public synchronized void close() throws IOException {
}
}

@Override
public void abort() throws IOException {
this.aborted = true;
}

/**
* Commits the blob, for block blob this uploads the block list.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,15 @@ public final class BlobRequestOptions extends RequestOptions {
* Default is false.
*/
private boolean skipEtagLocking = false;


/**
* A <code>boolean</code> that defines the behavior for handling exceptions when reading from the
* <code>InputStream</code> and using <code>openWrite</code>. If <code>true</code> the data that has been read from
* the stream up to the point of the exception will be flushed and a new blob will be committed with that data.
* Otherwise, the upload will be aborted and no data will be committed.
*/
private boolean commitWriteOnInputStreamException = true;

/**
* Creates an instance of the <code>BlobRequestOptions</code> class.
*/
Expand Down Expand Up @@ -119,6 +127,7 @@ public BlobRequestOptions(final BlobRequestOptions other) {
//this.setSourceCustomerProvidedKey(other.getSourceCustomerProvidedKey());
this.setValidateEncryptionPolicy(other.getValidateEncryptionPolicy());
this.setSkipEtagLocking(other.getSkipEtagLocking());
this.setCommitWriteOnInputStreamException(other.getCommitWriteOnInputStreamException());
}
}

Expand Down Expand Up @@ -362,6 +371,20 @@ public boolean getSkipEtagLocking() {
return this.skipEtagLocking;
}

/**
* A <code>boolean</code> that defines the behavior for handling exceptions when reading from the
* <code>InputStream</code> and using <code>openWrite</code>. If <code>true</code> the data that has been read from
* the stream up to the point of the exception will be flushed and a new blob will be committed with that data.
* Otherwise, the upload will be aborted and no data will be committed.
*
* For more information about defaults, see {@link #setCommitWriteOnInputStreamException(boolean)}.
*
* @return <code>true</code> if data will be committed upon an exception; otherwise, <code>false</code>.
*/
public boolean getCommitWriteOnInputStreamException() {
return this.commitWriteOnInputStreamException;
}

/**
* Sets whether a conditional failure should be absorbed on a retry attempt for the request. This option
* is only used by {@link CloudAppendBlob} in upload and openWrite methods. By default, it is set to
Expand Down Expand Up @@ -528,6 +551,21 @@ public void setSkipEtagLocking(boolean skipEtagLocking) {
this.skipEtagLocking = skipEtagLocking;
}

/**
* A <code>boolean</code> that defines the behavior for handling exceptions when reading from the
* <code>InputStream</code> and using <code>openWrite</code>. If <code>true</code> the data that has been read from
* the stream up to the point of the exception will be flushed and a new blob will be committed with that data.
* Otherwise, the upload will be aborted and no data will be committed.
*
* The default value is <code>true</code>.
*
* @param commitWriteOnInputStreamException
* Use <code>true</code> if data will be committed upon an exception; otherwise, <code>false</code>.
*/
public void setCommitWriteOnInputStreamException(boolean commitWriteOnInputStreamException) {
this.commitWriteOnInputStreamException = commitWriteOnInputStreamException;
}

/**
* Assert that if validation is on, an encryption policy is not specified.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,9 @@ public void upload(final InputStream sourceStream, final long length, final Acce
* If a storage service error occurred.
*/
@DoesServiceRequest
public void upload(final InputStream sourceStream, final long length, final StandardBlobTier standardBlobTier, final AccessCondition accessCondition,
BlobRequestOptions options, OperationContext opContext) throws StorageException, IOException {
public void upload(final InputStream sourceStream, final long length, final StandardBlobTier standardBlobTier,
final AccessCondition accessCondition, BlobRequestOptions options,
OperationContext opContext) throws StorageException, IOException {
if (length < -1) {
throw new IllegalArgumentException(SR.STREAM_LENGTH_NEGATIVE);
}
Expand Down Expand Up @@ -892,10 +893,20 @@ public byte[] getByteArray() {
useOpenWrite = true;
if (useOpenWrite) {
final BlobOutputStream writeStream = this.openOutputStream(accessCondition, options, opContext);
/*
We want to give the customer the option to skip the commit on close in case of input stream failures.
While we catch all exceptions here, both from reading the IS and writing to the service, any write which
fails in such a way as to throw effectively aborts the upload anyway, so calling abort in all cases
achieves the intended behavior.
*/
try {
writeStream.write(inputDataStream, length);
}
finally {
} catch (Exception e) {
if (!options.getCommitWriteOnInputStreamException()) {
writeStream.abort();
}
throw e;
} finally {
writeStream.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public void close() throws IOException {
// no op
}

@Override
void abort() throws IOException {
// no op. Abort is only used to abort uploads and this type is only used on download paths.
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>8.6.2</version>
<version>8.6.3</version>
<packaging>jar</packaging>

<name>Microsoft Azure Storage Client SDK</name>
Expand Down

0 comments on commit 20d19f7

Please sign in to comment.