diff --git a/pom.xml b/pom.xml index e071e25a..17db71ed 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 com.univocity univocity-parsers - 2.5.0 + 2.5.1-SNAPSHOT univocity-parsers jar uniVocity's open source parsers for processing different text formats using a consistent API diff --git a/src/main/java/com/univocity/parsers/common/input/AbstractCharInputReader.java b/src/main/java/com/univocity/parsers/common/input/AbstractCharInputReader.java index b7e2bc3b..633d753a 100644 --- a/src/main/java/com/univocity/parsers/common/input/AbstractCharInputReader.java +++ b/src/main/java/com/univocity/parsers/common/input/AbstractCharInputReader.java @@ -143,6 +143,21 @@ private void setLineSeparator(char[] lineSeparator) { */ protected abstract void reloadBuffer(); + protected final void unwrapInputStream(BomInput.BytesProcessedNotification notification){ + InputStream inputStream = notification.input; + String encoding = notification.encoding; + + if (encoding != null) { + try { + start(new InputStreamReader(inputStream, encoding)); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } else { + start(new InputStreamReader(inputStream)); + } + } + @Override public final void start(Reader reader) { stop(); @@ -151,6 +166,7 @@ public final void start(Reader reader) { lineSeparatorDetected = false; submitLineSeparatorDetector(); + updateBuffer(); //if the input has been properly decoded with the correct UTF* character set, but has a BOM marker, we can safely discard it. diff --git a/src/main/java/com/univocity/parsers/common/input/BomInput.java b/src/main/java/com/univocity/parsers/common/input/BomInput.java index 45536516..00210725 100644 --- a/src/main/java/com/univocity/parsers/common/input/BomInput.java +++ b/src/main/java/com/univocity/parsers/common/input/BomInput.java @@ -36,11 +36,9 @@ public final class BomInput extends InputStream { public static final byte[] UTF_32LE_BOM = toByteArray(0xFF, 0xFE, 0x00, 0x00); private int bytesRead; - private int b1; - private int b2; - private int b3; - private int b4; + private int bytes[] = new int[4]; private String encoding; + private int consumed = 0; private final InputStream input; private IOException exception; @@ -54,20 +52,20 @@ public BomInput(InputStream input) { this.input = input; try { //This looks shitty on purpose (all in the name of speed). - if ((b1 = next()) == 0xEF) { - if ((b2 = next()) == 0xBB) { - if ((b3 = next()) == 0xBF) { + if ((bytes[0] = next()) == 0xEF) { + if ((bytes[1] = next()) == 0xBB) { + if ((bytes[2] = next()) == 0xBF) { setEncoding("UTF-8"); } } - } else if (b1 == 0xFE) { - if ((b2 = next()) == 0xFF) { + } else if (bytes[0] == 0xFE) { + if ((bytes[1] = next()) == 0xFF) { setEncoding("UTF-16BE"); } - } else if (b1 == 0xFF) { - if ((b2 = next()) == 0xFE) { - if ((b3 = next()) == 0x00) { - if ((b4 = next()) == 0x00) { + } else if (bytes[0] == 0xFF) { + if ((bytes[1] = next()) == 0xFE) { + if ((bytes[2] = next()) == 0x00) { + if ((bytes[3] = next()) == 0x00) { setEncoding("UTF-32LE"); } else { setEncoding("UTF-16LE"); //gotcha! @@ -76,10 +74,10 @@ public BomInput(InputStream input) { setEncoding("UTF-16LE"); //gotcha! } } - } else if (b1 == 0x00) { - if ((b2 = next()) == 0x00) { - if ((b3 = next()) == 0xFE) { - if ((b4 = next()) == 0xFF) { + } else if (bytes[0] == 0x00) { + if ((bytes[1] = next()) == 0x00) { + if ((bytes[2] = next()) == 0xFE) { + if ((bytes[3] = next()) == 0xFF) { setEncoding("UTF-32BE"); } } @@ -97,12 +95,17 @@ private void setEncoding(String encoding) { if (encoding.equals("UTF-16LE")) { //gotcha! if (bytesRead == 3) { //third byte not a 0x00 bytesRead = 1; - b1 = b3; + bytes[0] = bytes[2]; + try { + bytes[1] = next(); //reads next byte to be able to decode to a character + } catch (Exception e) { + exception = (IOException) e; + } return; } else if (bytesRead == 4) { //fourth byte not a 0x00 bytesRead = 2; - b1 = b3; - b2 = b4; + bytes[0] = bytes[2]; + bytes[1] = bytes[3]; return; } } @@ -117,25 +120,23 @@ private int next() throws IOException { @Override public final int read() throws IOException { - if (bytesRead > 0) { - int out = b1; - if (bytesRead == 2) { - out = b2; - } else if (bytesRead == 3) { - out = b3; - } else if (bytesRead == 4) { - out = b4; - } + if (bytesRead > 0 && bytesRead > consumed) { + int out = bytes[consumed]; // Ensures that if the original input stream returned a byte, it will be consumed. // In case of exceptions, bytes produced prior to the exception will still be returned. // Once the last byte has been consumed, the original exception will be thrown. - if (--bytesRead == 0 && exception != null) { + if (++consumed == bytesRead && exception != null) { throw exception; } return out; } - return input.read(); + if (consumed == bytesRead) { + consumed++; + return -1; + } + + throw new BytesProcessedNotification(input, encoding); } /** @@ -179,4 +180,24 @@ public final Charset getCharset() { public final String getEncoding() { return encoding; } + + /** + * Internal notification exception used to re-wrap the original {@link InputStream} into a {@link Reader}. + * This is required for performance reasons as overriding {@link InputStream#read()} incurs a heavy performance + * penalty when the implementation is native (as in {@link FileInputStream#read()}. + */ + public static final class BytesProcessedNotification extends RuntimeException { + public final InputStream input; + public final String encoding; + + public BytesProcessedNotification(InputStream input, String encoding) { + this.input = input; + this.encoding = encoding; + } + + @Override + public Throwable fillInStackTrace() { + return this; + } + } } diff --git a/src/main/java/com/univocity/parsers/common/input/DefaultCharInputReader.java b/src/main/java/com/univocity/parsers/common/input/DefaultCharInputReader.java index 451f0ca7..1a7abd3f 100755 --- a/src/main/java/com/univocity/parsers/common/input/DefaultCharInputReader.java +++ b/src/main/java/com/univocity/parsers/common/input/DefaultCharInputReader.java @@ -23,7 +23,6 @@ * A default CharInputReader which only loads batches of characters when requested by the {@link AbstractCharInputReader} through the {@link DefaultCharInputReader#reloadBuffer} method. * * @author uniVocity Software Pty Ltd - parsers@univocity.com - * */ public class DefaultCharInputReader extends AbstractCharInputReader { @@ -31,8 +30,9 @@ public class DefaultCharInputReader extends AbstractCharInputReader { /** * Creates a new instance with the mandatory characters for handling newlines transparently. Line separators will be detected automatically. + * * @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input. - * @param bufferSize the buffer size used to store characters read from the input. + * @param bufferSize the buffer size used to store characters read from the input. * @param whitespaceRangeStart starting range of characters considered to be whitespace. */ public DefaultCharInputReader(char normalizedLineSeparator, int bufferSize, int whitespaceRangeStart) { @@ -42,9 +42,10 @@ public DefaultCharInputReader(char normalizedLineSeparator, int bufferSize, int /** * Creates a new instance with the mandatory characters for handling newlines transparently. - * @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()} + * + * @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()} * @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input. - * @param bufferSize the buffer size used to store characters read from the input. + * @param bufferSize the buffer size used to store characters read from the input. * @param whitespaceRangeStart starting range of characters considered to be whitespace. */ public DefaultCharInputReader(char[] lineSeparator, char normalizedLineSeparator, int bufferSize, int whitespaceRangeStart) { @@ -77,6 +78,9 @@ public void reloadBuffer() { super.length = reader.read(buffer, 0, buffer.length); } catch (IOException e) { throw new IllegalStateException("Error reading from input", e); + } catch (BomInput.BytesProcessedNotification notification) { + stop(); + unwrapInputStream(notification); } } } diff --git a/src/main/java/com/univocity/parsers/common/input/concurrent/ConcurrentCharInputReader.java b/src/main/java/com/univocity/parsers/common/input/concurrent/ConcurrentCharInputReader.java index 7a1d54ea..4a305025 100755 --- a/src/main/java/com/univocity/parsers/common/input/concurrent/ConcurrentCharInputReader.java +++ b/src/main/java/com/univocity/parsers/common/input/concurrent/ConcurrentCharInputReader.java @@ -24,18 +24,16 @@ * A concurrent CharInputReader that loads batches of characters in a separate thread and assigns them to buffer in {@link AbstractCharInputReader} when requested. * *

This class loads "buckets" of characters in the background and provides them sequentially to the {@link ConcurrentCharInputReader#buffer} - * attribute in {@link AbstractCharInputReader}. + * attribute in {@link AbstractCharInputReader}. *

The bucket loading process will block and wait while all buckets are full. *

Similarly, the reader will block while all buckets are empty. * * This CharInputReader implementation provides a better throughput than {@link DefaultCharInputReader} when reading large inputs ({@code > 100 mb}). * + * @author uniVocity Software Pty Ltd - parsers@univocity.com * @see CharInputReader * @see ConcurrentCharLoader * @see CharBucket - * - * @author uniVocity Software Pty Ltd - parsers@univocity.com - * */ public class ConcurrentCharInputReader extends AbstractCharInputReader { @@ -45,10 +43,11 @@ public class ConcurrentCharInputReader extends AbstractCharInputReader { /** * Creates a new instance with the mandatory characters for handling newlines transparently. Line separators will be detected automatically. + * * @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) - * that is used to replace any lineSeparator sequence found in the input. - * @param bucketSize the size of an each individual "bucket" used to store characters read from the input. - * @param bucketQuantity the number of "buckets" to load in memory. Note the reader will stop if all buckets are full. + * that is used to replace any lineSeparator sequence found in the input. + * @param bucketSize the size of an each individual "bucket" used to store characters read from the input. + * @param bucketQuantity the number of "buckets" to load in memory. Note the reader will stop if all buckets are full. * @param whitespaceRangeStart starting range of characters considered to be whitespace. */ public ConcurrentCharInputReader(char normalizedLineSeparator, int bucketSize, int bucketQuantity, int whitespaceRangeStart) { @@ -59,11 +58,12 @@ public ConcurrentCharInputReader(char normalizedLineSeparator, int bucketSize, i /** * Creates a new instance with the mandatory characters for handling newlines transparently. - * @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()} + * + * @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()} * @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) - * that is used to replace any lineSeparator sequence found in the input. - * @param bucketSize the size of an each individual "bucket" used to store characters read from the input. - * @param bucketQuantity the number of "buckets" to load in memory. Note the reader will stop if all buckets are full. + * that is used to replace any lineSeparator sequence found in the input. + * @param bucketSize the size of an each individual "bucket" used to store characters read from the input. + * @param bucketQuantity the number of "buckets" to load in memory. Note the reader will stop if all buckets are full. * @param whitespaceRangeStart starting range of characters considered to be whitespace. */ public ConcurrentCharInputReader(char[] lineSeparator, char normalizedLineSeparator, int bucketSize, int bucketQuantity, int whitespaceRangeStart) { @@ -81,6 +81,12 @@ public void stop() { if (bucketLoader != null) { bucketLoader.stopReading(); bucketLoader.reportError(); + + if(bucketLoader.notification != null){ + BomInput.BytesProcessedNotification notification = bucketLoader.notification; + bucketLoader = null; + unwrapInputStream(notification); + } } } diff --git a/src/main/java/com/univocity/parsers/common/input/concurrent/ConcurrentCharLoader.java b/src/main/java/com/univocity/parsers/common/input/concurrent/ConcurrentCharLoader.java index 2188978f..68269f28 100755 --- a/src/main/java/com/univocity/parsers/common/input/concurrent/ConcurrentCharLoader.java +++ b/src/main/java/com/univocity/parsers/common/input/concurrent/ConcurrentCharLoader.java @@ -16,6 +16,7 @@ package com.univocity.parsers.common.input.concurrent; import com.univocity.parsers.common.*; +import com.univocity.parsers.common.input.*; import java.io.*; import java.util.concurrent.*; @@ -40,6 +41,7 @@ class ConcurrentCharLoader implements Runnable { private final Reader reader; private final Thread activeExecution; private Exception error; + BomInput.BytesProcessedNotification notification; /** * Creates a {@link FixedInstancePool} with a given amount of {@link CharBucket} instances and starts a thread to fill each one. @@ -91,6 +93,9 @@ public void run() { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); + } catch (BomInput.BytesProcessedNotification e) { + finished = true; + notification = e; } catch (Exception e) { finished = true; error = e; @@ -99,6 +104,8 @@ public void run() { } } + + /** * Returns the next available bucket. Blocks until a bucket is made available or the reading process stops. * diff --git a/src/test/java/com/univocity/parsers/issues/github/Github_154.java b/src/test/java/com/univocity/parsers/issues/github/Github_154.java index b7d0adf8..614afb42 100644 --- a/src/test/java/com/univocity/parsers/issues/github/Github_154.java +++ b/src/test/java/com/univocity/parsers/issues/github/Github_154.java @@ -80,6 +80,7 @@ public void readWithBom(boolean extractFromBom, String encoding, byte[] prepend) parserSettings.setLineSeparatorDetectionEnabled(true); parserSettings.setHeaderExtractionEnabled(true); parserSettings.setSkipEmptyLines(false); + parserSettings.setReadInputOnSeparateThread(false); final CsvParser parser = new CsvParser(parserSettings); @@ -94,9 +95,19 @@ public void readWithBom(boolean extractFromBom, String encoding, byte[] prepend) bytes = newBytes; } - parser.parse(new ByteArrayInputStream(bytes), encoding); - final List actual = rowProcessor.getBeans(); - - assertEquals(actual.get(0).email, "dev@univocity.com"); + parser.beginParsing(new ByteArrayInputStream(bytes), encoding); + String[] row = parser.parseNext(); + parser.stopParsing(); + + if(prepend != null && prepend[prepend.length -1] == ' '){ + assertEquals(parser.getContext().headers()[0], " Email"); + assertEquals(row[0], "dev@univocity.com"); + + } else { + assertEquals(parser.getContext().headers()[0], "Email"); + assertEquals(row[0], "dev@univocity.com"); + final List actual = rowProcessor.getBeans(); + assertEquals(actual.get(0).email, "dev@univocity.com"); + } } } diff --git a/src/test/java/com/univocity/parsers/issues/github/Github_176.java b/src/test/java/com/univocity/parsers/issues/github/Github_176.java new file mode 100644 index 00000000..96c8273b --- /dev/null +++ b/src/test/java/com/univocity/parsers/issues/github/Github_176.java @@ -0,0 +1,52 @@ +/******************************************************************************* + * Copyright 2017 uniVocity Software Pty Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package com.univocity.parsers.issues.github; + +import com.univocity.parsers.csv.*; +import org.testng.annotations.*; + +import java.io.*; + +/** + * From: https://github.com/uniVocity/univocity-parsers/issues/165 + * + * @author uniVocity Software Pty Ltd - dev@univocity.com + */ +public class Github_176 { + + @DataProvider + Object[][] threadProvider() { + return new Object[][]{ + {true}, + {false} + }; + } + + @Test(enabled = false, dataProvider = "threadProvider") + public void testPerformance(boolean readInputOnSeparateThread) { + CsvParserSettings s = new CsvParserSettings(); + s.setReadInputOnSeparateThread(readInputOnSeparateThread); + CsvParser p = new CsvParser(s); + + String path = System.getProperty("user.home") + File.separator + "dev" + File.separator + "data" + File.separator + "Sample-Spreadsheet-500000-rows.csv"; + + for (int i = 0; i < 10; i++) { + long start = System.currentTimeMillis(); + p.parse(new File(path)); + System.out.println(p.getContext().currentLine() + " rows parsed in " + (System.currentTimeMillis() - start) + " ms"); + } + } +}