Skip to content

Commit

Permalink
improve robustness of XMLReaderUtils
Browse files Browse the repository at this point in the history
(cherry picked from commit 6305da4)
  • Loading branch information
tballison committed Oct 25, 2024
1 parent 2bb4624 commit 11b2cd1
Showing 1 changed file with 70 additions and 77 deletions.
147 changes: 70 additions & 77 deletions tika-core/src/main/java/org/apache/tika/utils/XMLReaderUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ public void fatalError(SAXParseException exception) throws SAXException {
};
private static final String JAXP_ENTITY_EXPANSION_LIMIT_KEY = "jdk.xml.entityExpansionLimit";
//TODO: figure out if the rw lock is any better than a simple lock
private static final ReentrantReadWriteLock SAX_READ_WRITE_LOCK = new ReentrantReadWriteLock();
private static final ReentrantReadWriteLock DOM_READ_WRITE_LOCK = new ReentrantReadWriteLock();
//these lock the pool arrayblocking queues so that there isn't a race condition
//of trying to acquire a parser while the pool is being resized
private static final ReentrantReadWriteLock SAX_POOL_LOCK = new ReentrantReadWriteLock();
private static final ReentrantReadWriteLock DOM_POOL_LOCK = new ReentrantReadWriteLock();
private static final AtomicInteger POOL_GENERATION = new AtomicInteger();
private static final EntityResolver IGNORING_SAX_ENTITY_RESOLVER =
(publicId, systemId) -> new InputSource(new StringReader(""));
Expand Down Expand Up @@ -399,7 +401,11 @@ public static Document buildDOM(InputStream is, ParseContext context)
PoolDOMBuilder poolBuilder = null;
if (builder == null) {
poolBuilder = acquireDOMBuilder();
builder = poolBuilder.getDocumentBuilder();
if (poolBuilder != null) {
builder = poolBuilder.getDocumentBuilder();
} else {
builder = getDocumentBuilder();
}
}

try {
Expand Down Expand Up @@ -517,7 +523,11 @@ public static void parseSAX(InputStream is, ContentHandler contentHandler, Parse
PoolSAXParser poolSAXParser = null;
if (saxParser == null) {
poolSAXParser = acquireSAXParser();
saxParser = poolSAXParser.getSAXParser();
if (poolSAXParser != null) {
saxParser = poolSAXParser.getSAXParser();
} else {
saxParser = getSAXParser();
}
}
try {
saxParser.parse(is, new OfflineContentHandler(contentHandler));
Expand Down Expand Up @@ -549,7 +559,11 @@ public static void parseSAX(Reader reader, ContentHandler contentHandler, ParseC
PoolSAXParser poolSAXParser = null;
if (saxParser == null) {
poolSAXParser = acquireSAXParser();
saxParser = poolSAXParser.getSAXParser();
if (poolSAXParser != null) {
saxParser = poolSAXParser.getSAXParser();
} else {
saxParser = getSAXParser();
}
}
try {
saxParser.parse(new InputSource(reader), new OfflineContentHandler(contentHandler));
Expand All @@ -561,48 +575,32 @@ public static void parseSAX(Reader reader, ContentHandler contentHandler, ParseC
}

/**
* Acquire a SAXParser from the pool. Make sure to
* Acquire a DOMBuilder from the pool. Make sure to
* {@link #releaseDOMBuilder(PoolDOMBuilder)} in
* a <code>finally</code> block every time you call this.
*
* @return a DocumentBuilder
* @return a DocumentBuilder or null if no DOMBuilders are available
* @throws TikaException
*/
private static PoolDOMBuilder acquireDOMBuilder() throws TikaException {
int waiting = 0;
long lastWarn = -1;
while (true) {
PoolDOMBuilder builder = null;
DOM_READ_WRITE_LOCK.readLock().lock();
try {
builder = DOM_BUILDERS.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new TikaException("interrupted while waiting for DOMBuilder", e);
} finally {
DOM_READ_WRITE_LOCK.readLock().unlock();
}
if (builder != null) {
return builder;
}
if (lastWarn < 0 || System.currentTimeMillis() - lastWarn > 1000) {
//avoid spamming logs
LOG.warn("Contention waiting for a DOMParser. " +
"Consider increasing the XMLReaderUtils.POOL_SIZE");
lastWarn = System.currentTimeMillis();
}
waiting++;

if (waiting > 3000) {
//freshen the pool. Something went very wrong...
setPoolSize(POOL_SIZE);
//better to get an exception than have permahang by a bug in one of our parsers
throw new TikaException("Waited more than 5 minutes for a DocumentBuilder; " +
"This could indicate that a parser has not correctly released its " +
"DocumentBuilder. " +
"Please report this to the Tika team: [email protected]");
PoolDOMBuilder builder = null;
DOM_POOL_LOCK
.readLock()
.lock();
try {
builder = DOM_BUILDERS.poll();
} finally {
DOM_POOL_LOCK
.readLock()
.unlock();
}
if (builder == null) {
LOG.warn("Contention waiting for a DOMBuilder. " +
"Consider increasing the XMLReaderUtils.POOL_SIZE");

}
}
return builder;
}

/**
Expand All @@ -619,7 +617,8 @@ private static void releaseDOMBuilder(PoolDOMBuilder builder) {
} catch (UnsupportedOperationException e) {
//ignore
}
DOM_READ_WRITE_LOCK.readLock().lock();
DOM_POOL_LOCK
.readLock().lock();
try {
//if there are extra parsers (e.g. after a reset of the pool to a smaller size),
// this parser will not be added and will then be gc'd
Expand All @@ -631,7 +630,8 @@ private static void releaseDOMBuilder(PoolDOMBuilder builder) {
"'acquire' than to 'release'");
}
} finally {
DOM_READ_WRITE_LOCK.readLock().unlock();
DOM_POOL_LOCK
.readLock().unlock();
}
}

Expand All @@ -640,42 +640,29 @@ private static void releaseDOMBuilder(PoolDOMBuilder builder) {
* {@link #releaseParser(PoolSAXParser)} in
* a <code>finally</code> block every time you call this.
*
* @return a SAXParser
* @return a SAXParser or null if a parser is not available
* @throws TikaException
*/
private static PoolSAXParser acquireSAXParser() throws TikaException {
int waiting = 0;
long lastWarn = -1;
while (true) {
PoolSAXParser parser = null;
SAX_READ_WRITE_LOCK.readLock().lock();
try {
parser = SAX_PARSERS.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new TikaException("interrupted while waiting for SAXParser", e);
} finally {
SAX_READ_WRITE_LOCK.readLock().unlock();
}
if (parser != null) {
return parser;
}
if (lastWarn < 0 || System.currentTimeMillis() - lastWarn > 1000) {
//avoid spamming logs
LOG.warn("Contention waiting for a SAXParser. " +
"Consider increasing the XMLReaderUtils.POOL_SIZE");
lastWarn = System.currentTimeMillis();
}
waiting++;
if (waiting > 3000) {
//freshen the pool. Something went very wrong...
setPoolSize(POOL_SIZE);
//better to get an exception than have permahang by a bug in one of our parsers
throw new TikaException("Waited more than 5 minutes for a SAXParser; " +
"This could indicate that a parser has not correctly released its " +
"SAXParser. Please report this to the Tika team: [email protected]");
PoolSAXParser parser = null;

}
//this locks around the pool so that there's
//no race condition with it being resized
SAX_POOL_LOCK
.readLock()
.lock();
try {
parser = SAX_PARSERS.poll();
} finally {
SAX_POOL_LOCK
.readLock()
.unlock();
}
if (parser == null) {
LOG.warn("Contention waiting for a SAXParser. " +
"Consider increasing the XMLReaderUtils.POOL_SIZE");
}
return parser;
}

/**
Expand All @@ -694,7 +681,8 @@ private static void releaseParser(PoolSAXParser parser) {
if (parser.getGeneration() != POOL_GENERATION.get()) {
return;
}
SAX_READ_WRITE_LOCK.readLock().lock();
SAX_POOL_LOCK
.readLock().lock();
try {
//if there are extra parsers (e.g. after a reset of the pool to a smaller size),
// this parser will not be added and will then be gc'd
Expand All @@ -706,7 +694,8 @@ private static void releaseParser(PoolSAXParser parser) {
"than to 'release'");
}
} finally {
SAX_READ_WRITE_LOCK.readLock().unlock();
SAX_POOL_LOCK
.readLock().unlock();
}
}

Expand Down Expand Up @@ -834,7 +823,8 @@ public static void setPoolSize(int poolSize) throws TikaException {
//the read locking in case one thread resizes the pool when the
//parsers have already started. We could have an NPE on SAX_PARSERS
//if we didn't lock.
SAX_READ_WRITE_LOCK.writeLock().lock();
SAX_POOL_LOCK
.writeLock().lock();
try {
//free up any resources before emptying SAX_PARSERS
for (PoolSAXParser parser : SAX_PARSERS) {
Expand All @@ -852,18 +842,21 @@ public static void setPoolSize(int poolSize) throws TikaException {
}
}
} finally {
SAX_READ_WRITE_LOCK.writeLock().unlock();
SAX_POOL_LOCK
.writeLock().unlock();
}

DOM_READ_WRITE_LOCK.writeLock().lock();
DOM_POOL_LOCK
.writeLock().lock();
try {
DOM_BUILDERS.clear();
DOM_BUILDERS = new ArrayBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
DOM_BUILDERS.offer(new PoolDOMBuilder(POOL_GENERATION.get(), getDocumentBuilder()));
}
} finally {
DOM_READ_WRITE_LOCK.writeLock().unlock();
DOM_POOL_LOCK
.writeLock().unlock();
}
POOL_SIZE = poolSize;
}
Expand Down

0 comments on commit 11b2cd1

Please sign in to comment.