Runtime Operator Protocol
Drill's execution model consists of a large variety of runtime operators assembled by the planner to implement a specific SQL query. Operators use the Volcano model: each is an iterator that returns records from a call to a next() method. The devil, as they say, is in the details. We explore those details here.
Each operator implements some relational operator: scan, sort, join, broadcast, etc. The Volcano iterator approach provides a unified iterator model implemented by all operators. No matter whether the operator reads data, sorts, or performs aggregations, it implements the same basic iterator model. Here we focus on that iterator model, not on the unique behavior of each operator.
Each operator has a Creator class. For example, FilterRecordBatch has a FilterBatchCreator. The creator builds the operator. The operator constructor performs one-time initial setup. The key fact to understand is that the constructor has no visibility into the schema of the data, so it cannot do any setup that requires this information. Most operators therefore keep some internal state (sometimes explicit as an enum, sometimes implicit in some variable) to record that they are in the "have not yet seen a schema" state.
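The "have not yet seen a schema" state can be sketched as follows. This is a minimal illustration, not Drill code: the class DeferredSetupOperator, its State enum, and the use of a list of column names as a stand-in for a schema are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these names are Drill classes.
final class DeferredSetupOperator {
    // Explicit lifecycle state; some operators track this implicitly instead.
    enum State { NO_SCHEMA, READY }

    private State state = State.NO_SCHEMA;
    private List<String> columns; // stand-in for a real batch schema

    DeferredSetupOperator() {
        // Only schema-independent setup can happen in the constructor.
    }

    // Called when a batch (and therefore a schema) arrives from the input.
    void onBatch(List<String> batchColumns) {
        if (state == State.NO_SCHEMA) {
            // Schema-dependent setup is deferred until this point.
            columns = new ArrayList<>(batchColumns);
            state = State.READY;
        }
        // ... process the batch using the captured columns ...
    }

    State state() { return state; }

    List<String> columns() { return columns; }
}
```

A freshly constructed operator reports NO_SCHEMA, and it switches to READY only after the first onBatch() call supplies column names.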
As described elsewhere (see Runtime Model), operators are implemented in a somewhat non-intuitive way. The term "operator" in Drill refers to what we might call an "operator definition": the information that describes the operator. The actual runtime operator is called a "record batch" in Drill. All such runtime operators derive from RecordBatch. The basic (highly simplified) protocol is:
public interface RecordBatch {
    public static enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP };
    public IterOutcome next();
    public BatchSchema getSchema();
    public VectorContainer getOutgoingContainer();
}
The actual code is somewhat more complex, but contains thorough comments that you should read for the details.
The heart of the protocol is the next() method. The theory in Volcano is simple: each call returns a record until all records are read. In Drill, the operation is a bit more complex because operators return batches of records (as value vectors), not individual records. Drill also allows the schema to vary as the query runs, and handles error cases. This results in the protocol explained here.
First, note that next() returns one of several status codes:
- OK_NEW_SCHEMA: The call returned a schema (and optionally a record batch). Ideally returned each time the schema from this call to next() differs from that of the previous call.
- OK: The call returned a record batch (which always includes a schema). The schema is the same as that from the previous call.
- NONE: No data returned, end of data. Equivalent to an EOF from a file reader.
- STOP: Error condition: stop processing.
The FragmentExecutor runs the fragment, which consists of a tree of operators, one of which is the root. The fragment executor calls next() on the root operator to start execution.
while (shouldContinue() && root.next()) {
    // loop
}
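The loop above treats root.next() as a boolean, while operators return an outcome code. One way to reconcile the two is sketched below, assuming the root translates its input's outcome into "keep going" or "finished". All names here (Outcome, ScriptedInput, RootSketch, FragmentLoopSketch) are hypothetical stand-ins rather than Drill classes, and the shouldContinue() check is omitted for brevity.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical copy of the outcome codes listed in the interface above.
enum Outcome { NONE, OK_NEW_SCHEMA, OK, STOP }

// Hypothetical upstream operator that replays a scripted list of outcomes.
final class ScriptedInput {
    private final Deque<Outcome> script;

    ScriptedInput(Outcome... outcomes) {
        script = new ArrayDeque<>(Arrays.asList(outcomes));
    }

    // Once the script is exhausted, report end of data.
    Outcome next() {
        return script.isEmpty() ? Outcome.NONE : script.poll();
    }
}

// Hypothetical root: maps the input's outcome to a "keep going?" boolean.
final class RootSketch {
    private final ScriptedInput input;
    int batches = 0;

    RootSketch(ScriptedInput input) {
        this.input = input;
    }

    boolean next() {
        switch (input.next()) {
            case OK_NEW_SCHEMA:
            case OK:
                batches++; // a real root would send or write the batch here
                return true;
            case NONE:
            case STOP:
            default:
                return false; // end of data or error: leave the loop
        }
    }
}

final class FragmentLoopSketch {
    // Drives the root until it reports that there is nothing more to do.
    static int run(RootSketch root) {
        while (root.next()) {
            // loop
        }
        return root.batches;
    }
}
```

With a script of OK_NEW_SCHEMA, OK, OK, NONE the loop body runs three times and then exits. A STOP in the middle of the script ends the loop early, and later batches are never requested.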
The next()
call propagates down the tree (the order is highly dependent on the particular type of operator). For any given operator, it will eventually see a first call to next()
.
That first call to next() completes initialization:
- Call next() on the input to get a first batch.
- Initialize the present operator based on the returned schema.
- Process the record batch.
That is, the first next() both initializes and processes records the same way that subsequent next() calls will.
At this point the operator does not know the data schema. Therefore, the operator must call next() on its own input in order to get the first batch. (That call may, in turn, cascade down the operator tree until it reaches a leaf: a scanner or a network receiver.)
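The cascade can be illustrated with a toy operator chain that records the order in which next() is called. This is only a sketch: TracedOp, LeafOp, and InteriorOp are hypothetical names, and a string stands in for a record batch.

```java
import java.util.List;

// Hypothetical minimal operator: returns a label standing in for a batch.
interface TracedOp {
    String next();
}

// Leaf of the tree, standing in for a scanner or a network receiver.
final class LeafOp implements TracedOp {
    private final List<String> trace;

    LeafOp(List<String> trace) {
        this.trace = trace;
    }

    @Override
    public String next() {
        trace.add("scan");
        return "batch-from-scan";
    }
}

// Interior operator: it has no schema or data until its input answers.
final class InteriorOp implements TracedOp {
    private final String name;
    private final TracedOp input;
    private final List<String> trace;

    InteriorOp(String name, TracedOp input, List<String> trace) {
        this.name = name;
        this.input = input;
        this.trace = trace;
    }

    @Override
    public String next() {
        trace.add(name);     // this operator's next() was called ...
        return input.next(); // ... so it must ask its own input first
    }
}
```

Building a "project" over a "filter" over the leaf and calling next() once on the top operator records project, filter, scan in that order. The batch produced by the leaf then travels back up through the returns.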
The operator must now decide what to do based on the return value from next(). For example:
- OK: Indicates that the child (input) operator returned a batch of records (along with a schema). Since this is the first batch, the present operator must usually do some form of setup, which often involves generating code based on the schema.
- OK_NEW_SCHEMA: In theory, the input should return the OK_NEW_SCHEMA status each time the schema changes, including the first time. In practice, the first batch seems to be returned (for some operators) as simply OK. Operators contain code to handle this ambiguity.
- NONE: It could be that the query has no data at all: a scanner read an empty file, a filter removed all records, etc. In this case, the very first call to the input's next() can return NONE, indicating that no data is available.
- STOP: Indicates that an error occurred and that the operator should stop processing and exit.
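The cases above can be sketched as a single switch in the operator's next() method. This is a simplified illustration under stated assumptions, not Drill's implementation: ChildInput, ScriptedChild, and FirstBatchOperator are hypothetical, a string stands in for the schema, and reporting OK_NEW_SCHEMA downstream for the first batch is a choice made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical copy of the outcome codes from the simplified interface.
enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP }

// Hypothetical input operator: an outcome plus the schema of the latest batch.
interface ChildInput {
    IterOutcome next();
    String schema();
}

// Hypothetical input that replays scripted outcomes with a fixed schema.
final class ScriptedChild implements ChildInput {
    private final Deque<IterOutcome> script;

    ScriptedChild(IterOutcome... outcomes) {
        script = new ArrayDeque<>(Arrays.asList(outcomes));
    }

    @Override
    public IterOutcome next() {
        return script.isEmpty() ? IterOutcome.NONE : script.poll();
    }

    @Override
    public String schema() {
        return "a INT, b VARCHAR";
    }
}

final class FirstBatchOperator {
    private final ChildInput input;
    private boolean schemaSeen = false;
    int setups = 0; // counts schema-dependent setups, for illustration

    FirstBatchOperator(ChildInput input) {
        this.input = input;
    }

    IterOutcome next() {
        IterOutcome outcome = input.next();
        switch (outcome) {
            case OK_NEW_SCHEMA:
                setup(input.schema());
                return IterOutcome.OK_NEW_SCHEMA;
            case OK:
                if (!schemaSeen) {
                    // First batch arrived as plain OK: treat it as a new schema.
                    setup(input.schema());
                    return IterOutcome.OK_NEW_SCHEMA;
                }
                return IterOutcome.OK;
            case NONE:
            case STOP:
            default:
                // No data at all, or an error: pass the status along unchanged.
                return outcome;
        }
    }

    private void setup(String schema) {
        // A real operator might generate code for this schema here.
        schemaSeen = true;
        setups++;
    }
}
```

With an input that returns OK, OK, NONE, the operator performs one setup and reports OK_NEW_SCHEMA, OK, NONE. With an input that returns NONE immediately, no setup runs at all.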