EVF Tutorial Row Batch Reader
Prior to the EVF, Easy format plugins were based on the original scan operator called (confusingly) ScanBatch. Traditional plugins and record readers were structured around the design of ScanBatch. For example, at the start of execution, the plugin creates a set of record readers which are passed to the scan batch.
EVF introduced a new scan operator framework based on the idea of composition: rather than having a single complex ScanBatch class, EVF is made up of a collection of smaller classes, each of which focuses on one task. Your plugin simply configures EVF to do what you want. Just as ScanBatch guided the construction of traditional format plugins, so does EVF guide the structure of new plugins but, we hope, in a simpler, more flexible way (and in a way that supports control of memory and the use of a provided schema).
The next step is to create the new batch reader discussed previously. We'll do so as an "orphan" class so that the plugin continues to use the old record reader. We'll then swap in the new one when we're ready. The tutorial assumes you create a new file and copy over methods and fields as needed. In the end, you'll have both a traditional and an EVF reader; you can even add an option to choose one or the other at runtime.
Create a new class:
public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
Now might be a good time to check out the ManagedReader interface. There are only three methods to implement, and doing so will be easier if we are familiar with the interface.
The FileSchemaNegotiator is a new concept: we will use it to learn about the file we are to scan, and to define the schema of our file as we learn the schema on read.
Copy the existing constructor:
public LogRecordReader(FragmentContext context, DrillFileSystem dfs,
FileWork fileWork, List<SchemaPath> columns, String userName,
LogFormatConfig formatConfig) {
Modify it to remove the arguments not needed with the EVF:
public LogBatchReader(FileSplit split,
LogFormatConfig formatConfig) {
this.split = split;
this.formatConfig = formatConfig;
this.maxErrors = formatConfig.getMaxErrors();
if (maxErrors < 0) {
throw UserException
.validationError()
.message("Max Errors must be a positive integer greater than zero.")
.build(logger);
}
}
The context, columns, dfs and userName are handled by EVF for us. EVF gives us a FileSplit instead of a FileWork object.
Next, let's implement the open() method. Create a stub:
@Override
public boolean open(FileSchemaNegotiator negotiator) {
return false;
}
The caller gives us a "schema negotiator" which we'll discuss later. The method returns true if there is data to read, false if we can tell immediately that there is no data available. Errors are indicated by unchecked exceptions, preferably in the form of a UserException that Drill can forward to the user with a clear description of the issue.
To figure out what we need to do, let's look at the existing LogRecordReader implementation:
@Override
public void setup(final OperatorContext context, final OutputMutator output) {
this.outputMutator = output;
setupPattern();
openFile();
setupProjection();
defineVectors();
}
Of the above, we only need the first two methods, so let's copy them over, along with the variables that they require.
@Override
public boolean open(FileSchemaNegotiator negotiator) {
setupPattern();
openFile(negotiator);
return true;
}
The openFile() method is adjusted to get context information (the Drill File System, the user name) from the schema negotiator:
private void openFile(FileSchemaNegotiator negotiator) {
  InputStream in;
  try {
    // The EVF constructor stores a FileSplit, so the path comes from split.
    in = negotiator.fileSystem().open(split.getPath());
  } catch (Exception e) {
    throw UserException
        .dataReadError(e)
        .message("Failed to open input file: %s", split.getPath())
        .addContext("User name", negotiator.userName())
        .build(logger);
  }
  reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8));
}
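The open-and-wrap pattern can be tried outside Drill. The following is a minimal sketch using only the JDK: OpenFileSketch is a hypothetical class, java.nio.file.Files stands in for the Drill file system, and UncheckedIOException stands in for UserException. None of it is Drill API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for the reader's openFile(); not Drill code.
class OpenFileSketch {

  // Opens the file and wraps it in a UTF-8 line reader. Failures surface as
  // an unchecked exception that carries the path and user name as context,
  // which is the role UserException plays in the tutorial code.
  static BufferedReader openFile(Path path, String userName) {
    InputStream in;
    try {
      in = Files.newInputStream(path);
    } catch (IOException e) {
      throw new UncheckedIOException(
          String.format("Failed to open input file: %s (user name: %s)", path, userName), e);
    }
    return new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
  }
}
```

The explicit charset matters for log files: without it, InputStreamReader falls back to the platform default, which can differ between machines.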
We omitted the setupProjection() and defineVectors() methods from the old version. This gets us to the key difference between "old school" and EVF: EVF handles projection and vector creation for us; we just have to tell it what we want. Here's how the old code handled the wildcard (*, project all) case:
private void projectAll() {
List<String> fields = formatConfig.getFieldNames();
for (int i = fields.size(); i < capturingGroups; i++) {
fields.add("field_" + i);
}
columns = new ColumnDefn[capturingGroups];
for (int i = 0; i < capturingGroups; i++) {
columns[i] = makeColumn(fields.get(i), i);
}
}
Under EVF, the framework itself handles projection, but we need to tell it the columns we can provide. We can reuse the above logic to help us. We must:
- Create a TupleMetadata schema of the available columns. We can use the SchemaBuilder to help. (There are two classes of that name; be sure to use the correct one.)
- Adjust the ColumnDefn to use the EVF ColumnWriter objects.
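The column-naming rule that we carry over from projectAll() can be checked in isolation. The sketch below uses only java.util.regex; ColumnNameSketch and columnNames() are hypothetical names introduced for illustration, and the sketch assumes the number of capturing groups is taken from the compiled pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper; mirrors the naming logic of projectAll() without Drill.
class ColumnNameSketch {

  // Returns one column name per capturing group: the configured names first,
  // then generated "field_<i>" names for any groups that have no name.
  static List<String> columnNames(String regex, List<String> configured) {
    int capturingGroups = Pattern.compile(regex).matcher("").groupCount();
    // Copy so that the configuration's own list is left untouched.
    List<String> fields = new ArrayList<>(configured);
    for (int i = fields.size(); i < capturingGroups; i++) {
      fields.add("field_" + i);
    }
    // Surplus configured names (more names than groups) are ignored.
    return new ArrayList<>(fields.subList(0, capturingGroups));
  }
}
```

For the pattern `(\d+) (\w+) (.*)` with configured names year and level, the result is year, level, field_2. Each name would then become one column in the schema we hand to the negotiator.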
The log reader uses a ColumnDefn class to define a column. We'll reuse these classes with minor changes. Let's start by changing how we define columns:
private abstract static class ColumnDefn {
...
public abstract void define(OutputMutator outputMutator) throws SchemaChangeException;
}
Replace the above method with:
public abstract void define(SchemaBuilder builder);
Here is how we define the "reader schema" (the schema that the reader can provide):
@Override
public boolean open(FileSchemaNegotiator negotiator) {
...
negotiator.setTableSchema(defineSchema(), true);
openFile(negotiator);
return true;
}
private TupleMetadata defineSchema() {
...
columns = new ColumnDefn[capturingGroups];
SchemaBuilder builder = new SchemaBuilder();
for (int i = 0; i < capturingGroups; i++) {
columns[i] = makeColumn(fields.get(i), i);
columns[i].define(builder);
}
return builder.buildSchema();
}
The true argument to setTableSchema() says that the schema is "complete": we won't be discovering any new columns as the read proceeds. (Other readers might know of no columns at open time and discover all at read time; others may do a combination.)
We can reuse the existing makeColumn() method as-is, so let's just copy it across. The IDE will complain that none of the column definition classes exist; we'll fix that later.
We've told the schema negotiator our schema. We can now ask the EVF to build the result set loader that will handle all the grunt tasks of creating batches:
private ResultSetLoader loader;
@Override
public boolean open(FileSchemaNegotiator negotiator) {
...
negotiator.setTableSchema(defineSchema(), true);
loader = negotiator.build();
Now we can modify the column definition to hold onto the column writer instead of a mutator:
private abstract static class ColumnDefn {
...
protected ScalarWriter colWriter; // protected so that subclasses can use it in load()
...
public void bind(TupleWriter rowWriter) {
colWriter = rowWriter.scalar(index);
}
The EVF uses a set of JSON-like writers. TupleWriter manages the entire row. (The same writer handles maps, which is why it is called a "tuple" writer.)
From a tuple we can reference column writers by either name or position. Since the ColumnDefn knows its column index, we just use that value.
The log reader only works with "scalars": strings, numbers, dates. We store the scalar column writer on the ColumnDefn base class for convenience.
We now need something to call our new bind() method:
@Override
public boolean open(FileSchemaNegotiator negotiator) {
...
loader = negotiator.build();
bindColumns(loader.writer());
...
private void bindColumns(RowSetLoader writer) {
for (int i = 0; i < capturingGroups; i++) {
columns[i].bind(writer);
}
}
We are now ready to tackle the changes to the concrete column definition classes. Let's start with the VarCharDefn class. Here is the existing implementation:
private static class VarCharDefn extends ColumnDefn {
private NullableVarCharVector.Mutator mutator;
public VarCharDefn(String name, int index) {
super(name, index);
}
@Override
public void define(OutputMutator outputMutator) throws SchemaChangeException {
MaterializedField field = MaterializedField.create(getName(),
Types.optional(MinorType.VARCHAR));
mutator = outputMutator.addField(field, NullableVarCharVector.class).getMutator();
}
@Override
public void load(int rowIndex, String value) {
byte[] bytes = value.getBytes();
mutator.setSafe(rowIndex, bytes, 0, bytes.length);
}
}
Changes:
- Replace the old define(OutputMutator) method with the new define(SchemaBuilder) method.
- Change load() to use the column writer.
- Remove the unused mutator.
Here is the new version:
private static class VarCharDefn extends ColumnDefn {
public VarCharDefn(String name, int index) {
super(name, index);
}
@Override
public void define(SchemaBuilder builder) {
builder.addNullable(getName(), MinorType.VARCHAR);
}
@Override
public void load(int rowIndex, String value) {
colWriter.setString(value);
}
}
We use the setString() method of the writer to set the string. Note that we no longer need to specify the row position; the EVF tracks that for us. Later, we can go ahead and remove the rowIndex argument.
The class is getting to be pretty light-weight. We could even remove it completely. But, to keep things simple, let's just keep it for now.
Next, we make the same changes for the other column defns; not shown here for brevity. You can see the new form in [this branch](need link) NEED LINK. Some things to note:
- The TINYINT, SMALLINT and INT types all use the setInt() method to set values.
- The FLOAT4 and FLOAT8 types use the setDouble() method to set values.
- DATE, TIME and TIMESTAMP can use Joda (not Java 8 date/time) objects or setLong() to set values.
Regardless of the Java type used to set the value, the underlying vector type is the one you request.
With this, we've handled the tasks needed to open our batch reader (and have jumped ahead on a few others.)
Our next step is to convert the code that handles each batch. The next() method on the batch reader asks us to read the next batch.
Here is the old version:
@Override
public int next() {
rowIndex = 0;
while (nextLine()) {
}
return rowIndex;
}
The new version must add a number of items:
- Use the row writer to start and end each row (so that the column writers know where to write their values within the underlying vectors.)
- Ask the result set loader if we can add more rows. (The result set loader will detect when the batch is full, so we don't have to maintain our own count-based batch size.)
- Return true if there is more to read, false if no more rows are available. The return value is forward looking: it tells the EVF whether to call the next() method again. On the last batch, we'll typically load rows into the batch, then return false to indicate that, after this batch, we have nothing more to offer.
Here is the revised version:
@Override
public boolean next() {
RowSetLoader rowWriter = loader.writer();
while (! rowWriter.isFull()) {
if (! nextLine(rowWriter)) {
return false;
}
}
return true;
}
The existing nextLine() method is reused, with the key change that the method returns true if it read a row, false if it hit EOF. We remove the existing logic to count rows against a maximum.
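To see the forward-looking return value in action, here is a small simulation that runs without Drill. Everything in it is a hypothetical stand-in: BatchLoopSketch plays the batch reader, a fixed batchCapacity plays the loader's "batch is full" test, and a plain list plays the batch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the next() contract; none of this is Drill API.
class BatchLoopSketch {
  private final BufferedReader reader;
  private final int batchCapacity;   // stands in for rowWriter.isFull()
  private List<String> batch = new ArrayList<>();

  BatchLoopSketch(String input, int batchCapacity) {
    this.reader = new BufferedReader(new StringReader(input));
    this.batchCapacity = batchCapacity;
  }

  // Returns true if a row was read, false at EOF.
  private boolean nextLine() {
    String line;
    try {
      line = reader.readLine();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    if (line == null) {
      return false;
    }
    batch.add(line);
    return true;
  }

  // Fills one batch. Returns true if the caller should ask for another
  // batch, false if this batch is the last one.
  boolean next() {
    batch = new ArrayList<>();
    while (batch.size() < batchCapacity) {
      if (!nextLine()) {
        return false;
      }
    }
    return true;
  }

  List<String> batch() {
    return batch;
  }
}
```

With the input "a\nb\nc" and a capacity of 2, the first call fills the batch with a and b and returns true; the second call loads c, hits EOF, and returns false while still delivering that final row. In this sketch, when the input ends exactly on a batch boundary, the last call returns false with an empty batch.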
The reader is responsible for telling EVF when to create a row. In normal cases, the reader does this once per row. In odd cases, such as the log plugin, we might omit some input rows. In other cases, a single input row might give rise to multiple batch rows.
The normal sequence is:
RowSetLoader rowWriter = loader.writer();
rowWriter.start();
// Use column writers to write to the vectors
rowWriter.save();
The log reader, as noted, is tricky. It saves a row in two cases:
- The row matches.
- The row does not match, and the user wants to capture the unmatched row.
We need to make a number of changes. Here is the original version:
private boolean nextLine() {
String line;
try {
line = reader.readLine();
} catch (IOException e) {
throw ...
}
if (line == null) {
return false;
}
Matcher lineMatcher = pattern.matcher(line);
if (lineMatcher.matches()) {
loadVectors(lineMatcher);
return rowIndex < BATCH_SIZE;
}
errorCount++;
if (errorCount < maxErrors) {
logger.warn("Unmatached line: {}", line);
} else if (errorCount > maxErrors) {
throw ...
}
//If the user asked for the unmatched columns display them
if (unmatchedRows) {
//If the user asked for the unmatched columns AND other columns
if (columns.length > 1) {
columns[unmatchedColumnIndex].load(rowIndex, line);
rowIndex++;
return rowIndex < BATCH_SIZE;
} else {
//If the user ONLY asked for the unmatched columns
columns[unmatchedColumnIndex].load(unmatchedRowIndex, line);
unmatchedRowIndex++;
rowIndex = unmatchedRowIndex;
return unmatchedRowIndex < BATCH_SIZE;
}
}
return true;
}
There is no need to maintain a row index; EVF does that for us. The log reader benefits from a lineNumber counter, however, for use in error messages, since the presence of non-matching lines means that the row count, maintained by EVF, is not the same as the line number.
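A short, Drill-free sketch shows how the two counts drift apart once a line fails to match. LineNumberSketch and its message format are hypothetical; rowCount stands in for the count that EVF would maintain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration; the message format is made up for this sketch.
class LineNumberSketch {

  // Walks the input, counting every line but only "saving" matching ones,
  // and reports each unmatched line with both counters.
  static List<String> unmatched(Pattern pattern, List<String> lines) {
    List<String> messages = new ArrayList<>();
    int lineNumber = 0;  // counts every input line
    int rowCount = 0;    // counts only rows that would be saved
    for (String line : lines) {
      lineNumber++;
      if (pattern.matcher(line).matches()) {
        rowCount++;
      } else {
        messages.add("line " + lineNumber + " (after " + rowCount + " rows): " + line);
      }
    }
    return messages;
  }
}
```

For four input lines where the second and fourth do not match, the reports name lines 2 and 4 while the row counts at those points are 1 and 2. Only the line number lets a user find the offending text in the file.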
After we've implemented the needed changes (and introduced a flag we'll describe later), we get:
private boolean nextLine(RowSetLoader rowWriter) {
String line;
try {
line = reader.readLine();
} catch (IOException e) {
throw ...
}
if (line == null) {
return false;
}
lineNumber++;
Matcher lineMatcher = pattern.matcher(line);
if (lineMatcher.matches()) {
// Load matched row into vectors.
rowWriter.start();
loadVectors(lineMatcher);
rowWriter.save();
return true;
}
errorCount++;
if (errorCount < maxErrors) {
logger.warn("Unmatched line: {}", line);
} else {
throw ...
}
... // TBD later
return true;
}
This is not quite done: we've ignored the two special columns for now.
The existing loadVectors() method is also reused, but is simplified. Previously, the record reader had to handle projection: create vectors only for the projected columns, and save values only to those columns. With EVF, all this is hidden: the EVF creates column writers for all of the columns we declare in the schema (and only for those columns). If a column is unprojected, the EVF gives us a "dummy" writer that we use just as if it were a real writer. The EVF handles projection of any columns not defined in our schema.
Here is the revised implementation:
private void loadVectors(Matcher m) {
for (int i = 0; i < columns.length; i++) {
String value = m.group(columns[i].index + 1);
if (value != null) {
columns[i].load(0, value);
}
}
}
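Two details of loadVectors() come straight from java.util.regex: group 0 is the whole match, so column i maps to group i + 1, and a group that took no part in the match returns null, which is why the value is checked before loading. The sketch below demonstrates both using only the JDK; NullGroupSketch is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of Matcher group indexing and null groups.
class NullGroupSketch {

  // Returns the captured values in column order, with null wherever a group
  // did not participate in the match; returns null if the line does not match.
  static String[] groups(Pattern pattern, String line) {
    Matcher m = pattern.matcher(line);
    if (!m.matches()) {
      return null;
    }
    String[] values = new String[m.groupCount()];
    for (int i = 0; i < values.length; i++) {
      values[i] = m.group(i + 1);  // group 0 is the entire match
    }
    return values;
  }
}
```

With the pattern `(\d+)(?: (\w+))?`, the line "42 WARN" yields both values, while the line "42" yields "42" followed by null. In the reader, a skipped load() leaves that column unset for the row.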
The log reader has a number of special columns; we'll handle those later.
Finally, we round out the core of our batch reader by copying the close method from the record reader. As in prior versions, ensure that all resources are released in close. After this call, the EVF will forget about your batch reader, allowing the JVM to garbage collect it.