Skip to content

Commit

Permalink
Added offset tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
cgivre committed Jan 1, 2024
1 parent ad37574 commit fca094b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
private final List<String> columns;
private final DruidFilter filter;
private final DruidQueryClient druidQueryClient;

private BigInteger nextOffset = BigInteger.ZERO;
private final DruidOffsetTracker offsetTracker;
private int maxRecordsToRead = -1;
private JsonLoaderBuilder jsonBuilder;
private JsonLoaderImpl jsonLoader;
Expand All @@ -64,13 +63,14 @@ public DruidBatchRecordReader(DruidSubScan subScan,
DruidSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns,
int maxRecordsToRead,
DruidStoragePlugin plugin) {
DruidStoragePlugin plugin, DruidOffsetTracker offsetTracker) {
this.columns = new ArrayList<>();
this.maxRecordsToRead = maxRecordsToRead;
this.plugin = plugin;
this.scanSpec = subScanSpec;
this.filter = subScanSpec.getFilter();
this.druidQueryClient = plugin.getDruidQueryClient();
this.offsetTracker = offsetTracker;
}

@Override
Expand Down Expand Up @@ -118,10 +118,6 @@ public void close() {
jsonLoader.close();
jsonLoader = null;
}

if (! nextOffset.equals(BigInteger.ZERO)) {
nextOffset = BigInteger.ZERO;
}
}

private String getQuery() throws JsonProcessingException {
Expand All @@ -135,7 +131,7 @@ private String getQuery() throws JsonProcessingException {
scanSpec.dataSourceName,
columns,
filter,
nextOffset,
offsetTracker.getOffset(),
queryThreshold,
scanSpec.getMinTime(),
scanSpec.getMaxTime()
Expand All @@ -144,6 +140,7 @@ private String getQuery() throws JsonProcessingException {
}

private void setNextOffset(DruidScanResponse druidScanResponse) {
nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size()));
//nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size()));
offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.druid;

import java.math.BigInteger;

public class DruidOffsetTracker {
private BigInteger nextOffset;

public DruidOffsetTracker() {
this.nextOffset = BigInteger.ZERO;
}

public BigInteger getOffset() {
return nextOffset;
}

public void setNextOffset(BigInteger offset) {
nextOffset = nextOffset.add(offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ private ScanFrameworkBuilder createBuilder(OptionManager options, DruidSubScan s

private static class DruidReaderFactory implements ReaderFactory {
private final DruidSubScan subScan;

private final DruidOffsetTracker offsetTracker;
private final Iterator<DruidSubScanSpec> scanSpecIterator;

public DruidReaderFactory(DruidSubScan subScan) {
this.subScan = subScan;
this.scanSpecIterator = subScan.getScanSpec().listIterator();
this.offsetTracker = new DruidOffsetTracker();
}

@Override
Expand All @@ -92,7 +93,7 @@ public void bind(ManagedScanFramework framework) {
public ManagedReader<? extends SchemaNegotiator> next() {
if (scanSpecIterator.hasNext()) {
DruidSubScanSpec scanSpec = scanSpecIterator.next();
return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine());
return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine(), offsetTracker);
}
return null;
}
Expand Down

0 comments on commit fca094b

Please sign in to comment.