Skip to content

Commit

Permalink
Fixed slowness issue caused BomInput - happens when user doesn't prov…
Browse files Browse the repository at this point in the history
…ide a character encoding.

#176
  • Loading branch information
jbax committed Jul 28, 2017
1 parent 334d097 commit 15ca061
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 51 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.5.0</version>
<version>2.5.1-SNAPSHOT</version>
<name>univocity-parsers</name>
<packaging>jar</packaging>
<description>uniVocity's open source parsers for processing different text formats using a consistent API</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@ private void setLineSeparator(char[] lineSeparator) {
*/
protected abstract void reloadBuffer();

protected final void unwrapInputStream(BomInput.BytesProcessedNotification notification){
InputStream inputStream = notification.input;
String encoding = notification.encoding;

if (encoding != null) {
try {
start(new InputStreamReader(inputStream, encoding));
} catch (Exception e) {
throw new IllegalStateException(e);
}
} else {
start(new InputStreamReader(inputStream));
}
}

@Override
public final void start(Reader reader) {
stop();
Expand All @@ -151,6 +166,7 @@ public final void start(Reader reader) {

lineSeparatorDetected = false;
submitLineSeparatorDetector();

updateBuffer();

//if the input has been properly decoded with the correct UTF* character set, but has a BOM marker, we can safely discard it.
Expand Down
83 changes: 52 additions & 31 deletions src/main/java/com/univocity/parsers/common/input/BomInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@ public final class BomInput extends InputStream {
public static final byte[] UTF_32LE_BOM = toByteArray(0xFF, 0xFE, 0x00, 0x00);

private int bytesRead;
private int b1;
private int b2;
private int b3;
private int b4;
private int bytes[] = new int[4];
private String encoding;
private int consumed = 0;

private final InputStream input;
private IOException exception;
Expand All @@ -54,20 +52,20 @@ public BomInput(InputStream input) {
this.input = input;

try { //This looks shitty on purpose (all in the name of speed).
if ((b1 = next()) == 0xEF) {
if ((b2 = next()) == 0xBB) {
if ((b3 = next()) == 0xBF) {
if ((bytes[0] = next()) == 0xEF) {
if ((bytes[1] = next()) == 0xBB) {
if ((bytes[2] = next()) == 0xBF) {
setEncoding("UTF-8");
}
}
} else if (b1 == 0xFE) {
if ((b2 = next()) == 0xFF) {
} else if (bytes[0] == 0xFE) {
if ((bytes[1] = next()) == 0xFF) {
setEncoding("UTF-16BE");
}
} else if (b1 == 0xFF) {
if ((b2 = next()) == 0xFE) {
if ((b3 = next()) == 0x00) {
if ((b4 = next()) == 0x00) {
} else if (bytes[0] == 0xFF) {
if ((bytes[1] = next()) == 0xFE) {
if ((bytes[2] = next()) == 0x00) {
if ((bytes[3] = next()) == 0x00) {
setEncoding("UTF-32LE");
} else {
setEncoding("UTF-16LE"); //gotcha!
Expand All @@ -76,10 +74,10 @@ public BomInput(InputStream input) {
setEncoding("UTF-16LE"); //gotcha!
}
}
} else if (b1 == 0x00) {
if ((b2 = next()) == 0x00) {
if ((b3 = next()) == 0xFE) {
if ((b4 = next()) == 0xFF) {
} else if (bytes[0] == 0x00) {
if ((bytes[1] = next()) == 0x00) {
if ((bytes[2] = next()) == 0xFE) {
if ((bytes[3] = next()) == 0xFF) {
setEncoding("UTF-32BE");
}
}
Expand All @@ -97,12 +95,17 @@ private void setEncoding(String encoding) {
if (encoding.equals("UTF-16LE")) { //gotcha!
if (bytesRead == 3) { //third byte not a 0x00
bytesRead = 1;
b1 = b3;
bytes[0] = bytes[2];
try {
bytes[1] = next(); //reads next byte to be able to decode to a character
} catch (Exception e) {
exception = (IOException) e;
}
return;
} else if (bytesRead == 4) { //fourth byte not a 0x00
bytesRead = 2;
b1 = b3;
b2 = b4;
bytes[0] = bytes[2];
bytes[1] = bytes[3];
return;
}
}
Expand All @@ -117,25 +120,23 @@ private int next() throws IOException {

@Override
public final int read() throws IOException {
if (bytesRead > 0) {
int out = b1;
if (bytesRead == 2) {
out = b2;
} else if (bytesRead == 3) {
out = b3;
} else if (bytesRead == 4) {
out = b4;
}
if (bytesRead > 0 && bytesRead > consumed) {
int out = bytes[consumed];

// Ensures that if the original input stream returned a byte, it will be consumed.
// In case of exceptions, bytes produced prior to the exception will still be returned.
// Once the last byte has been consumed, the original exception will be thrown.
if (--bytesRead == 0 && exception != null) {
if (++consumed == bytesRead && exception != null) {
throw exception;
}
return out;
}
return input.read();
if (consumed == bytesRead) {
consumed++;
return -1;
}

throw new BytesProcessedNotification(input, encoding);
}

/**
Expand Down Expand Up @@ -179,4 +180,24 @@ public final Charset getCharset() {
public final String getEncoding() {
return encoding;
}

/**
* Internal notification exception used to re-wrap the original {@link InputStream} into a {@link Reader}.
* This is required for performance reasons as overriding {@link InputStream#read()} incurs a heavy performance
* penalty when the implementation is native (as in {@link FileInputStream#read()}.
*/
public static final class BytesProcessedNotification extends RuntimeException {
public final InputStream input;
public final String encoding;

public BytesProcessedNotification(InputStream input, String encoding) {
this.input = input;
this.encoding = encoding;
}

@Override
public Throwable fillInStackTrace() {
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
* A default CharInputReader which only loads batches of characters when requested by the {@link AbstractCharInputReader} through the {@link DefaultCharInputReader#reloadBuffer} method.
*
* @author uniVocity Software Pty Ltd - <a href="mailto:[email protected]">[email protected]</a>
*
*/
public class DefaultCharInputReader extends AbstractCharInputReader {

private Reader reader;

/**
* Creates a new instance with the mandatory characters for handling newlines transparently. Line separators will be detected automatically.
*
* @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input.
* @param bufferSize the buffer size used to store characters read from the input.
* @param bufferSize the buffer size used to store characters read from the input.
* @param whitespaceRangeStart starting range of characters considered to be whitespace.
*/
public DefaultCharInputReader(char normalizedLineSeparator, int bufferSize, int whitespaceRangeStart) {
Expand All @@ -42,9 +42,10 @@ public DefaultCharInputReader(char normalizedLineSeparator, int bufferSize, int

/**
* Creates a new instance with the mandatory characters for handling newlines transparently.
* @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()}
*
* @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()}
* @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input.
* @param bufferSize the buffer size used to store characters read from the input.
* @param bufferSize the buffer size used to store characters read from the input.
* @param whitespaceRangeStart starting range of characters considered to be whitespace.
*/
public DefaultCharInputReader(char[] lineSeparator, char normalizedLineSeparator, int bufferSize, int whitespaceRangeStart) {
Expand Down Expand Up @@ -77,6 +78,9 @@ public void reloadBuffer() {
super.length = reader.read(buffer, 0, buffer.length);
} catch (IOException e) {
throw new IllegalStateException("Error reading from input", e);
} catch (BomInput.BytesProcessedNotification notification) {
stop();
unwrapInputStream(notification);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@
* A concurrent CharInputReader that loads batches of characters in a separate thread and assigns them to buffer in {@link AbstractCharInputReader} when requested.
*
* <p> This class loads "buckets" of characters in the background and provides them sequentially to the {@link ConcurrentCharInputReader#buffer}
* attribute in {@link AbstractCharInputReader}.
* attribute in {@link AbstractCharInputReader}.
* <p> The bucket loading process will block and wait while all buckets are full.
* <p> Similarly, the reader will block while all buckets are empty.
*
* This CharInputReader implementation provides a better throughput than {@link DefaultCharInputReader} when reading large inputs ({@code > 100 mb}).
*
* @author uniVocity Software Pty Ltd - <a href="mailto:[email protected]">[email protected]</a>
* @see CharInputReader
* @see ConcurrentCharLoader
* @see CharBucket
*
* @author uniVocity Software Pty Ltd - <a href="mailto:[email protected]">[email protected]</a>
*
*/
public class ConcurrentCharInputReader extends AbstractCharInputReader {

Expand All @@ -45,10 +43,11 @@ public class ConcurrentCharInputReader extends AbstractCharInputReader {

/**
* Creates a new instance with the mandatory characters for handling newlines transparently. Line separators will be detected automatically.
*
* @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()})
* that is used to replace any lineSeparator sequence found in the input.
* @param bucketSize the size of an each individual "bucket" used to store characters read from the input.
* @param bucketQuantity the number of "buckets" to load in memory. Note the reader will stop if all buckets are full.
* that is used to replace any lineSeparator sequence found in the input.
* @param bucketSize the size of an each individual "bucket" used to store characters read from the input.
* @param bucketQuantity the number of "buckets" to load in memory. Note the reader will stop if all buckets are full.
* @param whitespaceRangeStart starting range of characters considered to be whitespace.
*/
public ConcurrentCharInputReader(char normalizedLineSeparator, int bucketSize, int bucketQuantity, int whitespaceRangeStart) {
Expand All @@ -59,11 +58,12 @@ public ConcurrentCharInputReader(char normalizedLineSeparator, int bucketSize, i

/**
* Creates a new instance with the mandatory characters for handling newlines transparently.
* @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()}
*
* @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()}
* @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()})
* that is used to replace any lineSeparator sequence found in the input.
* @param bucketSize the size of an each individual "bucket" used to store characters read from the input.
* @param bucketQuantity the number of "buckets" to load in memory. Note the reader will stop if all buckets are full.
* that is used to replace any lineSeparator sequence found in the input.
* @param bucketSize the size of an each individual "bucket" used to store characters read from the input.
* @param bucketQuantity the number of "buckets" to load in memory. Note the reader will stop if all buckets are full.
* @param whitespaceRangeStart starting range of characters considered to be whitespace.
*/
public ConcurrentCharInputReader(char[] lineSeparator, char normalizedLineSeparator, int bucketSize, int bucketQuantity, int whitespaceRangeStart) {
Expand All @@ -81,6 +81,12 @@ public void stop() {
if (bucketLoader != null) {
bucketLoader.stopReading();
bucketLoader.reportError();

if(bucketLoader.notification != null){
BomInput.BytesProcessedNotification notification = bucketLoader.notification;
bucketLoader = null;
unwrapInputStream(notification);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.univocity.parsers.common.input.concurrent;

import com.univocity.parsers.common.*;
import com.univocity.parsers.common.input.*;

import java.io.*;
import java.util.concurrent.*;
Expand All @@ -40,6 +41,7 @@ class ConcurrentCharLoader implements Runnable {
private final Reader reader;
private final Thread activeExecution;
private Exception error;
BomInput.BytesProcessedNotification notification;

/**
* Creates a {@link FixedInstancePool} with a given amount of {@link CharBucket} instances and starts a thread to fill each one.
Expand Down Expand Up @@ -91,6 +93,9 @@ public void run() {
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (BomInput.BytesProcessedNotification e) {
finished = true;
notification = e;
} catch (Exception e) {
finished = true;
error = e;
Expand All @@ -99,6 +104,8 @@ public void run() {
}
}



/**
* Returns the next available bucket. Blocks until a bucket is made available or the reading process stops.
*
Expand Down
19 changes: 15 additions & 4 deletions src/test/java/com/univocity/parsers/issues/github/Github_154.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public void readWithBom(boolean extractFromBom, String encoding, byte[] prepend)
parserSettings.setLineSeparatorDetectionEnabled(true);
parserSettings.setHeaderExtractionEnabled(true);
parserSettings.setSkipEmptyLines(false);
parserSettings.setReadInputOnSeparateThread(false);

final CsvParser parser = new CsvParser(parserSettings);

Expand All @@ -94,9 +95,19 @@ public void readWithBom(boolean extractFromBom, String encoding, byte[] prepend)

bytes = newBytes;
}
parser.parse(new ByteArrayInputStream(bytes), encoding);
final List<User> actual = rowProcessor.getBeans();

assertEquals(actual.get(0).email, "[email protected]");
parser.beginParsing(new ByteArrayInputStream(bytes), encoding);
String[] row = parser.parseNext();
parser.stopParsing();

if(prepend != null && prepend[prepend.length -1] == ' '){
assertEquals(parser.getContext().headers()[0], " Email");
assertEquals(row[0], "[email protected]");

} else {
assertEquals(parser.getContext().headers()[0], "Email");
assertEquals(row[0], "[email protected]");
final List<User> actual = rowProcessor.getBeans();
assertEquals(actual.get(0).email, "[email protected]");
}
}
}
Loading

0 comments on commit 15ca061

Please sign in to comment.