diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java index e26179f216c..b5811203d30 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java @@ -51,8 +51,7 @@ public class DruidBatchRecordReader implements ManagedReader { private final List columns; private final DruidFilter filter; private final DruidQueryClient druidQueryClient; - - private BigInteger nextOffset = BigInteger.ZERO; + private final DruidOffsetTracker offsetTracker; private int maxRecordsToRead = -1; private JsonLoaderBuilder jsonBuilder; private JsonLoaderImpl jsonLoader; @@ -64,13 +63,14 @@ public DruidBatchRecordReader(DruidSubScan subScan, DruidSubScanSpec subScanSpec, List projectedColumns, int maxRecordsToRead, - DruidStoragePlugin plugin) { + DruidStoragePlugin plugin, DruidOffsetTracker offsetTracker) { this.columns = new ArrayList<>(); this.maxRecordsToRead = maxRecordsToRead; this.plugin = plugin; this.scanSpec = subScanSpec; this.filter = subScanSpec.getFilter(); this.druidQueryClient = plugin.getDruidQueryClient(); + this.offsetTracker = offsetTracker; } @Override @@ -118,10 +118,6 @@ public void close() { jsonLoader.close(); jsonLoader = null; } - - if (! nextOffset.equals(BigInteger.ZERO)) { - nextOffset = BigInteger.ZERO; - } } private String getQuery() throws JsonProcessingException { @@ -135,7 +131,7 @@ private String getQuery() throws JsonProcessingException { scanSpec.dataSourceName, columns, filter, - nextOffset, + offsetTracker.getOffset(), queryThreshold, scanSpec.getMinTime(), scanSpec.getMaxTime() @@ -144,6 +140,7 @@ private String getQuery() throws JsonProcessingException { } private void setNextOffset(DruidScanResponse druidScanResponse) { - nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size())); + //nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size())); + offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size())); } } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java new file mode 100644 index 00000000000..da15b309cdc --- /dev/null +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.druid; + +import java.math.BigInteger; + +public class DruidOffsetTracker { + private BigInteger nextOffset; + + public DruidOffsetTracker() { + this.nextOffset = BigInteger.ZERO; + } + + public BigInteger getOffset() { + return nextOffset; + } + + public void setNextOffset(BigInteger offset) { + nextOffset = nextOffset.add(offset); + } +} diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java index 01be10dc676..1aa6893204d 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java @@ -75,12 +75,13 @@ private ScanFrameworkBuilder createBuilder(OptionManager options, DruidSubScan s private static class DruidReaderFactory implements ReaderFactory { private final DruidSubScan subScan; - + private final DruidOffsetTracker offsetTracker; private final Iterator scanSpecIterator; public DruidReaderFactory(DruidSubScan subScan) { this.subScan = subScan; this.scanSpecIterator = subScan.getScanSpec().listIterator(); + this.offsetTracker = new DruidOffsetTracker(); } @Override @@ -92,7 +93,7 @@ public void bind(ManagedScanFramework framework) { public ManagedReader next() { if (scanSpecIterator.hasNext()) { DruidSubScanSpec scanSpec = scanSpecIterator.next(); - return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine()); + return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine(), offsetTracker); } return null; }