diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 25ac2c9f6b2..92467ac63d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -73,6 +73,12 @@ private ExecConstants() { public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories"; public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem"; public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl"; + + public static final String EXCHANGE_BATCH_SIZE = "drill.exec.memory.operator.exchange_batch_size"; + // Exchange Batch Size in Bytes. + public static final LongValidator EXCHANGE_BATCH_SIZE_VALIDATOR = new RangeLongValidator(EXCHANGE_BATCH_SIZE, 1024, 10 * 1024 * 1024, + new OptionDescription("Limit, in bytes, of the outgoing batch sent by an Exchange Sender.")); + /** incoming buffer size (number of batches) */ public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java index a586bbe2b1e..7318544e0be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java @@ -66,9 +66,6 @@ public LogicalExpression getExpr() { return expr; } - public int getOutgoingBatchSize() { - return outgoingBatchSize; - } @Override public T accept(PhysicalVisitor physicalVisitor, X value) throws E { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 1abc3d804b5..bb14ea1898c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -582,6 +582,15 @@ public WritableBatch getWritableBatch() { return WritableBatch.get(this); } + @Override + public WritableBatch getWritableBatch(int startIndex, int length) { + VectorContainer partialContainer = new VectorContainer(context.getAllocator(), getSchema()); + container.transferOut(partialContainer, startIndex, length); + partialContainer.setRecordCount(length); + final WritableBatch batch = WritableBatch.get(partialContainer); + return batch; + } + @Override public void close() throws Exception { container.clear(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SenderMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SenderMemoryManager.java new file mode 100644 index 00000000000..33f3fee172a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SenderMemoryManager.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl; + +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatchMemoryManager; +import org.apache.drill.exec.record.RecordBatchSizer; + +public class SenderMemoryManager extends RecordBatchMemoryManager { + private RecordBatch incomingBatch; + private int batchSizeLimit; + + public SenderMemoryManager(int batchSizeLimit, RecordBatch incomingBatch) { + super(batchSizeLimit); + this.batchSizeLimit = batchSizeLimit; + this.incomingBatch = incomingBatch; + } + + public void update() { + RecordBatchSizer batchSizer = new RecordBatchSizer(incomingBatch); + setRecordBatchSizer(batchSizer); + int outputRowCount = computeOutputRowCount(batchSizeLimit, batchSizer.getNetRowWidth()); + outputRowCount = Math.min(outputRowCount, batchSizer.rowCount()); + setOutputRowCount(outputRowCount); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 55e7cd5e558..00f317e45dd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingDataTunnel; import org.apache.drill.exec.ops.ExecutorFragmentContext; @@ -31,8 +32,10 @@ import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; +import org.apache.drill.exec.record.WritableBatch; import 
org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; public class SingleSenderCreator implements RootCreator{ @@ -55,6 +58,7 @@ public static class SingleSenderRootExec extends BaseRootExec { private int recMajor; private volatile boolean ok = true; private volatile boolean done = false; + private SenderMemoryManager memoryManager; public enum Metric implements MetricDef { BYTES_SENT; @@ -78,6 +82,8 @@ public SingleSenderRootExec(RootFragmentContext context, RecordBatch batch, Sing .build(); tunnel = context.getDataTunnel(config.getDestination()); tunnel.setTestInjectionControls(injector, context.getExecutionControls(), logger); + int exchangeBatchSizeLimit = (int) context.getOptions().getOption(ExecConstants.EXCHANGE_BATCH_SIZE_VALIDATOR); + this.memoryManager = new SenderMemoryManager(exchangeBatchSizeLimit, incoming); } @Override @@ -118,16 +124,28 @@ public boolean innerNext() { case OK_NEW_SCHEMA: case OK: - final FragmentWritableBatch batch = new FragmentWritableBatch( - false, handle.getQueryId(), handle.getMajorFragmentId(), - handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), - incoming.getWritableBatch().transfer(oContext.getAllocator())); - updateStats(batch); - stats.startWait(); - try { - tunnel.sendRecordBatch(batch); - } finally { - stats.stopWait(); + memoryManager.update(); + // manager ensures that sizedBatchRowCount <= incoming recordcount + int sizedBatchRowCount = memoryManager.getOutputRowCount(); + int incomingRowCount = incoming.getRecordCount(); + if (sizedBatchRowCount == incomingRowCount) { + doSend(incoming.getWritableBatch().transfer(oContext.getAllocator())); + return true; + } + Preconditions.checkState(sizedBatchRowCount != 0); + int batchesToSend = incomingRowCount / sizedBatchRowCount; + int tailRowCount = incomingRowCount % sizedBatchRowCount; + int batchCount = 0; + + 
while(batchCount < batchesToSend) { + int start = batchCount * sizedBatchRowCount; + final WritableBatch writableBatch = incoming.getWritableBatch(start, sizedBatchRowCount); + doSend(writableBatch.transfer(oContext.getAllocator())); + batchCount++; + } + if (tailRowCount != 0) { + int start = batchCount * sizedBatchRowCount; + doSend(incoming.getWritableBatch(start, tailRowCount).transfer(oContext.getAllocator())); } return true; @@ -137,6 +155,19 @@ public boolean innerNext() { } } + private void doSend(WritableBatch writableBatch) { + final FragmentWritableBatch batch = new FragmentWritableBatch( + false, handle.getQueryId(), handle.getMajorFragmentId(), + handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), writableBatch); + updateStats(batch); + stats.startWait(); + try { + tunnel.sendRecordBatch(batch); + } finally { + stats.stopWait(); + } + } + public void updateStats(FragmentWritableBatch writableBatch) { stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java index 822a810d162..820bdbe4e09 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java @@ -107,6 +107,15 @@ public Iterator> iterator() { @Override public BatchSchema getSchema() { return schema; } + @Override + public WritableBatch getWritableBatch(int startIndex, int length) { + VectorContainer partialContainer = new VectorContainer(context.getAllocator(), getSchema()); + container.transferOut(partialContainer, startIndex, length); + partialContainer.setRecordCount(length); + final WritableBatch batch = WritableBatch.get(partialContainer); + return batch; + } + @Override public WritableBatch 
getWritableBatch() { return WritableBatch.get(this); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 97f4b96de18..29891e9654d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -838,4 +838,13 @@ public void dump() { container, outgoingPosition, Arrays.toString(incomingBatches), Arrays.toString(batchOffsets), Arrays.toString(tempBatchHolder), Arrays.toString(inputCounts), Arrays.toString(outputCounts)); } + + @Override + public WritableBatch getWritableBatch(int startIndex, int length) { + VectorContainer partialContainer = new VectorContainer(context.getAllocator(), getSchema()); + outgoingContainer.transferOut(partialContainer, startIndex, length); + partialContainer.setRecordCount(length); + final WritableBatch batch = WritableBatch.get(partialContainer); + return batch; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java index e0beab14409..0db0c7c5988 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java @@ -115,6 +115,11 @@ public VectorWrapper getValueAccessorById(Class clazz, int... 
ids) { return batchAccessor.getValueAccessorById(clazz, ids); } + @Override + public WritableBatch getWritableBatch(int startIndex, int length) { + throw new UnsupportedOperationException(); + } + @Override public WritableBatch getWritableBatch() { return batchAccessor.getWritableBatch(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index a6ebef0e621..0434cc7a19a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -218,6 +218,11 @@ public IterOutcome next() { } } + @Override + public WritableBatch getWritableBatch(int startIndex, int length) { + return batchLoader.getWritableBatch(startIndex, length); + } + @Override public WritableBatch getWritableBatch() { return batchLoader.getWritableBatch(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 5c70f5d2ecf..48a67ff55c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -353,6 +353,13 @@ public WritableBatch getWritableBatch() { return incoming.getWritableBatch(); } + @Override + public WritableBatch getWritableBatch(int startIndex, int length) { + validateReadState("getWritableBatch()"); + return incoming.getWritableBatch(startIndex, length); + } + + @Override public void close() { // (Log construction and close() calls at same logging level to bracket diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index cb790918a47..dcdbee293f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -252,6 +252,15 @@ public WritableBatch getWritableBatch() { } + @Override + public WritableBatch getWritableBatch(int startIndex, int length) { + VectorContainer partialContainer = new VectorContainer(context.getAllocator(), getSchema()); + container.transferOut(partialContainer, startIndex, length); + partialContainer.setRecordCount(length); + final WritableBatch batch = WritableBatch.get(partialContainer); + return batch; + } + @Override public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java index 16d7e3a1633..76473271c1d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java @@ -159,6 +159,23 @@ public void transfer(VectorWrapper destination) { } } + /** + * Transfer vectors to destination HyperVectorWrapper. + * Both this and destination must be of same type and have same number of vectors. + * @param destination destination HyperVectorWrapper. 
+ */ + @Override + public void transferPartial(VectorWrapper destination, int startIndex, int length) { + Preconditions.checkArgument(destination instanceof HyperVectorWrapper); + Preconditions.checkArgument(getField().getType().equals(destination.getField().getType())); + Preconditions.checkArgument(vectors.length == ((HyperVectorWrapper)destination).vectors.length); + + final ValueVector[] destionationVectors = ((HyperVectorWrapper)destination).vectors; + for (int i = 0; i < vectors.length; ++i) { + vectors[i].makeTransferPair(destionationVectors[i]).splitAndTransfer(startIndex, length); + } + } + /** * Method to replace existing list of vectors with the newly provided ValueVectors list in this HyperVectorWrapper * @param vv - New list of ValueVectors to be stored diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 7473c8caf2b..be27dcbc74a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -321,6 +321,12 @@ public boolean isError() { */ WritableBatch getWritableBatch(); + /** + * Gets a writable version of {@code length} records of this batch, starting at + * {@code startIndex}. Takes over ownership of existing buffers. + */ + WritableBatch getWritableBatch(int startIndex, int length); + /** * Perform dump of this batch's state to logs. 
*/ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index cd3a22f6175..af4229b598e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -262,6 +262,14 @@ public WritableBatch getWritableBatch(){ return WritableBatch.getBatchNoHVWrap(valueCount, container, isSV2); } + public WritableBatch getWritableBatch(int startIndex, int length) { + VectorContainer partialContainer = new VectorContainer(allocator, getSchema()); + container.transferOut(partialContainer, startIndex, length); + partialContainer.setRecordCount(length); + final WritableBatch batch = WritableBatch.get(partialContainer); + return batch; + } + @Override public Iterator> iterator() { return this.container.iterator(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java index e4278ba8861..8b81f3354a7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java @@ -95,6 +95,12 @@ public WritableBatch getWritableBatch() { this.getClass().getCanonicalName())); } + @Override + public WritableBatch getWritableBatch(int start, int length) { + throw new UnsupportedOperationException(String.format("You should not call getWritableBatch() for class %s", + this.getClass().getCanonicalName())); + } + @Override public Iterator> iterator() { return null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java index c588f25f812..cc064d3d987 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java @@ -88,6 +88,11 @@ public WritableBatch getWritableBatch() { throw new UnsupportedOperationException(); } + @Override + public WritableBatch getWritableBatch(int start, int length) { + throw new UnsupportedOperationException(); + } + @Override public Iterator> iterator() { return container.iterator(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java index 4c416799c94..9a977993eae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java @@ -109,6 +109,13 @@ public void transfer(VectorWrapper destination) { vector.makeTransferPair(((SimpleVectorWrapper)destination).vector).transfer(); } + @Override + public void transferPartial(VectorWrapper destination, int startIndex, int length) { + Preconditions.checkArgument(destination instanceof SimpleVectorWrapper); + Preconditions.checkArgument(getField().getType().equals(destination.getField().getType())); + vector.makeTransferPair(((SimpleVectorWrapper)destination).vector).splitAndTransfer(startIndex, length); + } + @Override public String toString() { if (vector == null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 05d951089e9..4ff86aae46e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -131,6 +131,16 @@ public void transferOut(VectorContainer containerOut) { } } + /** + * Transfer {@code length} records, starting at {@code startIndex}, from this to containerOut + */ + public void 
transferOut(VectorContainer containerOut, int startIndex, int length) { + Preconditions.checkArgument(this.wrappers.size() == containerOut.wrappers.size()); + for (int i = 0; i < this.wrappers.size(); ++i) { + this.wrappers.get(i).transferPartial(containerOut.wrappers.get(i), startIndex, length); + } + } + public T addOrGet(MaterializedField field) { return addOrGet(field, null); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java index 8d95701460d..0a83c0faed5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java @@ -35,6 +35,8 @@ public interface VectorWrapper { public VectorWrapper cloneAndTransfer(BufferAllocator allocator); public VectorWrapper getChildWrapper(int[] ids); public void transfer(VectorWrapper destination); + public void transferPartial(VectorWrapper destination, int startIndex, int length); + /** * Traverse the object graph and determine whether the provided SchemaPath matches data within the Wrapper. If so, return a TypedFieldId associated with this path. 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index e201e2686e1..8ed1441447c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -266,6 +266,7 @@ public static CaseInsensitiveMap createDefaultOptionDefinition new OptionDefinition(ExecConstants.CPU_LOAD_AVERAGE), new OptionDefinition(ExecConstants.ENABLE_VECTOR_VALIDATOR), new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR), + new OptionDefinition(ExecConstants.EXCHANGE_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java index 5487d95088e..ecc780cb95c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java @@ -36,11 +36,9 @@ private enum BufferState { } protected interface BufferQueue { - void addOomBatch(RawFragmentBatch batch); RawFragmentBatch poll() throws IOException, InterruptedException; RawFragmentBatch take() throws IOException, InterruptedException; RawFragmentBatch poll(long timeout, TimeUnit timeUnit) 
throws InterruptedException, IOException; - boolean checkForOutOfMemory(); int size(); boolean isEmpty(); void add(T obj); @@ -56,7 +54,6 @@ protected interface BufferQueue { public BaseRawBatchBuffer(final FragmentContext context, final int fragmentCount) { bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE); - this.fragmentCount = fragmentCount; this.streamCounter = fragmentCount; this.context = context; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java index 50f582dfa38..327b2a3595c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java @@ -94,13 +94,6 @@ private class SpoolingBufferQueue implements BufferQueue buffer = Queues.newLinkedBlockingDeque(); - @Override - public void addOomBatch(RawFragmentBatch batch) { - RawFragmentBatchWrapper batchWrapper = new RawFragmentBatchWrapper(batch, true); - batchWrapper.setOutOfMemory(true); - buffer.addFirst(batchWrapper); - } - @Override public RawFragmentBatch poll() throws IOException, InterruptedException { RawFragmentBatchWrapper batchWrapper = buffer.poll(); @@ -124,11 +117,6 @@ public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws Interrupted return null; } - @Override - public boolean checkForOutOfMemory() { - return buffer.peek().isOutOfMemory(); - } - @Override public int size() { return buffer.size(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 0d36d5d4083..3ca1cad9ef6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -30,24 +30,17 @@ public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer { private final LinkedBlockingDeque buffer = Queues.newLinkedBlockingDeque(); - @Override - public void addOomBatch(RawFragmentBatch batch) { - buffer.addFirst(batch); - } - @Override public RawFragmentBatch poll() throws IOException { RawFragmentBatch batch = buffer.poll(); @@ -73,11 +66,6 @@ public RawFragmentBatch poll(long timeout, TimeUnit timeUnit) throws Interrupted return batch; } - @Override - public boolean checkForOutOfMemory() { - return context.getAllocator().isOverLimit(); - } - @Override public int size() { return buffer.size(); diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 5443eea5fb3..30abf54371e 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -474,6 +474,7 @@ drill.exec.options: { drill.exec.storage.implicit.fqn.column.label: "fqn", drill.exec.storage.implicit.suffix.column.label: "suffix", drill.exec.testing.controls: "{}", + drill.exec.memory.operator.exchange_batch_size : 1048576, # 1 MB (1048576) drill.exec.memory.operator.output_batch_size : 16777216, # 16 MB drill.exec.memory.operator.output_batch_size_avail_mem_factor : 0.1, exec.bulk_load_table_list.bulk_size: 1000, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java index 94e0c0ec7c6..16c15a9b32d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java @@ -272,6 +272,11 @@ public WritableBatch getWritableBatch() { throw new UnsupportedOperationException("MockRecordBatch doesn't support 
gettingWritableBatch yet"); } + @Override + public WritableBatch getWritableBatch(int start, int length) { + throw new UnsupportedOperationException("MockRecordBatch doesn't support gettingWritableBatch yet"); + } + @Override public Iterator> iterator() { return container.iterator(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java index 0be22db3e4e..1c9dda6efee 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestUnionExchange.java @@ -18,16 +18,23 @@ package org.apache.drill.exec.physical.impl; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.List; import org.apache.drill.categories.OperatorTest; import org.apache.drill.common.util.DrillFileUtils; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.pop.PopUnitTestBase; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.RecordBatchSizer; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.RemoteServiceSet; +import org.apache.drill.exec.vector.BigIntVector; import org.junit.Test; import org.apache.drill.shaded.guava.com.google.common.base.Charsets; @@ -43,15 +50,15 @@ public void twoBitTwoExchangeTwoEntryRun() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + Drillbit bit2 = new 
Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { bit1.run(); bit2.run(); client.connect(); List results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, - Files.asCharSource(DrillFileUtils.getResourceAsFile("/sender/union_exchange.json"), - Charsets.UTF_8).read()); + Files.asCharSource(DrillFileUtils.getResourceAsFile("/sender/union_exchange.json"), + Charsets.UTF_8).read()); int count = 0; for (QueryDataBatch b : results) { if (b.getHeader().getRowCount() != 0) { @@ -63,4 +70,76 @@ public void twoBitTwoExchangeTwoEntryRun() throws Exception { } } + // Test UnionExchange BatchSizing + @Test + public void twoBitUnionExchangeRunBatchSizing() throws Exception { + twoBitUnionExchangeRunBatchSizingImpl(64 * 1024); + twoBitUnionExchangeRunBatchSizingImpl(512 * 1024); + twoBitUnionExchangeRunBatchSizingImpl(1024 * 1024); + } + + /** + * This function takes in the batchSizeLimit and configures two Drillbits with that batch limit. + * Then runs a plan that includes a UnionExchange over some mock data. + * The test checks for exchange batch size conformity and also checks the integrity of the exchanged data. 
+ * @param batchSizeLimit exchange batch size limit, in bytes, set on both Drillbits + * @throws Exception if cluster setup or query execution fails + */ + public void twoBitUnionExchangeRunBatchSizingImpl(int batchSizeLimit) throws Exception { + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + bit1.run(); + bit2.run(); + client.connect(); + bit1.getContext().getOptionManager().setLocalOption(ExecConstants.EXCHANGE_BATCH_SIZE, batchSizeLimit); + bit2.getContext().getOptionManager().setLocalOption(ExecConstants.EXCHANGE_BATCH_SIZE, batchSizeLimit); + + List results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, + Files.asCharSource(DrillFileUtils.getResourceAsFile("/sender/union_exchange_batch_sizing.json"), + Charsets.UTF_8).read()); + int count = 0; + final RecordBatchLoader loader = new RecordBatchLoader(client.getAllocator()); + boolean even = true; + for (QueryDataBatch b : results) { //iterate over each incoming batch + int recordCount = b.getHeader().getRowCount(); + if (recordCount != 0) { + // convert QueryDataBatch to a VectorContainer + loader.load(b.getHeader().getDef(), b.getData()); + final VectorContainer container = loader.getContainer(); + container.setRecordCount(recordCount); + // check if the size is within the batchSizeLimit + RecordBatchSizer recordBatchSizer = new RecordBatchSizer(container); + assertTrue(recordBatchSizer.getNetBatchSize() <= batchSizeLimit); + // get col 1 vector. This is a mocked BigInt, as defined in the json file. 
+ // this col has Long.MIN_VALUE for even rows and Long.MAX_VALUE for odd rows + VectorWrapper wrapper = container.getValueVector(1); + BigIntVector bigIntVector = (BigIntVector) (wrapper.getValueVector()); + for (int i = 0; i < recordCount; i++) { + long value = bigIntVector.getAccessor().get(i); + // row0 is guaranteed to be Long.MIN_VALUE (even) only in the original mock data + // batch splitting can cause row0 to be Long.MAX_VALUE, so it has to be checked + if (i == 0) { + even = (value == Long.MIN_VALUE); + } + // check if the values alternate + if (even) { + assertEquals(value, Long.MIN_VALUE); + } else { + assertEquals(value, Long.MAX_VALUE); + } + even = !even; + } + count += recordCount; + loader.clear(); + } + b.release(); + } + // check if total row count received is the same as the + // count in the mock definition + assertEquals(300000, count); + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java index aefa28a0c13..d53ced6a8e7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java @@ -153,6 +153,10 @@ public IterOutcome next() { return null; } + @Override public WritableBatch getWritableBatch(int start, int length) { + return null; + } + public List getResultList() { return resultList; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java index b26d760d1e1..e076f63f917 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java @@ -45,7 +45,7 @@ @Category({SlowTest.class}) public class TestSimpleExternalSort extends DrillTest { @Rule - public final TestRule TIMEOUT = TestTools.getTimeoutRule(160_000); + public final TestRule TIMEOUT = TestTools.getTimeoutRule(180_000); @Rule public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java index 44ea06a8b3a..a5c47de946d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java @@ -117,6 +117,11 @@ public WritableBatch getWritableBatch() { return null; } + @Override + public WritableBatch getWritableBatch(int start, int length) { + return null; + } + @Override public Iterator> iterator() { return null; diff --git a/exec/java-exec/src/test/resources/sender/union_exchange_batch_sizing.json b/exec/java-exec/src/test/resources/sender/union_exchange_batch_sizing.json new file mode 100644 index 00000000000..a841db956e6 --- /dev/null +++ b/exec/java-exec/src/test/resources/sender/union_exchange_batch_sizing.json @@ -0,0 +1,32 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-scan", + url: "http://apache.org", + entries:[ + {records: 300000, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "BIGINT", mode: "REQUIRED"} + ]} + ] + }, + { + @id: 2, + child: 1, + pop: "union-exchange" + }, + { + @id: 3, + child: 2, + pop: "screen" + } + ] +} \ No newline at end of file