Toward a Workable Dynamic Schema Model
Proposed is a general solution to the schema conflicts which can arise in Drill's dynamic schema model, with emphasis on the source of those conflicts: the scan operator.
The SQL language leverages the schema of a database to provide very simple syntax for working with data. The expression SELECT a + b AS c FROM t is simple because the SQL planner can work out that a and b come from table t, which gives the types of t.a and t.b, and from there work out the type of column c. Typical query engine planners do so by consulting a catalog: a listing of tables and their columns.
The planner does column resolution and type propagation at plan time. In a typical query engine, the execution engine receives a plan with all type information fully specified. In Impala, for example, this allows the planner to provide low-level instructions for working with each data type.
Drill's primary innovation is to defer column resolution and type propagation to run time, just after Drill sees the first rows of data. While this is very convenient in the big data world, it does introduce certain challenges which we address here.
In a typical database, the catalog describes not just the schema that we expect to see during a query, but rather the generative rules used to create the data. That is, this same schema was used to create database tables, so it is, by definition, the correct schema for querying those tables.
Drill operates in a world where data creation happens in some other system. Drill does not have access to the generative rules used to write, say, a JSON, CSV or Parquet file. Instead, Drill uses a sampling process to infer schema from a small set of samples, often from a single row.
As with any statistical process that uses too-small a sample size, a sample size of one can lead to problems when that first row is not a good predictor of all subsequent rows.
For example, when reading JSON, the first occurrence of a column sets the type of that column for all subsequent rows. If that first value is 10, Drill will infer the type is INT, even if the very next row turns out to contain 10.1, which is a DOUBLE. Further, if a column appears infrequently, then the first sample may not see the column, and so Drill remains ignorant of the column until it appears later.
Drill is distributed: a typical query runs many scan fragments, each of which will perform its own inference. With ambiguous data, different scans can infer different schemas. Later, as those schemas converge on some downstream operator, the operator sees a schema conflict: one scan thought column a is INT, another thought it was DOUBLE. Worse, one saw column b as a MAP, another never saw column b and guessed Nullable INT.
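The conflict can be reproduced with a small model. The sketch below is illustrative only and is not Drill's reader code: the function names, the simplified type names and the sample rows are all assumptions made for the example. Each "scan" infers its schema from its first row alone, and a column that the first row does not contain is guessed as Nullable INT.

```python
# Toy model of sample-of-one schema inference. Not Drill code: all names
# and the simplified type strings are hypothetical.

def infer_type(value):
    """Map one JSON value to a simplified type name."""
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "INT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, dict):
        return "MAP"
    return "VARCHAR"


def infer_schema_from_first_row(rows, projected):
    """Infer types for the projected columns from the first row only."""
    first = rows[0]
    return {
        name: infer_type(first[name]) if name in first else "NULLABLE INT"
        for name in projected
    }


def find_conflicts(schema_a, schema_b):
    """Return the columns on which two inferred schemas disagree."""
    return {
        name: (schema_a[name], schema_b[name])
        for name in schema_a
        if schema_a[name] != schema_b[name]
    }


projected = ["a", "b"]
scan1_rows = [{"a": 10}, {"a": 10.1}]        # first row suggests INT, no b
scan2_rows = [{"a": 10.1, "b": {"x": 1}}]    # first row suggests DOUBLE, MAP

scan1_schema = infer_schema_from_first_row(scan1_rows, projected)
scan2_schema = infer_schema_from_first_row(scan2_rows, projected)
conflicts = find_conflicts(scan1_schema, scan2_schema)
print(conflicts)
```

Both scans read valid data, yet the downstream operator receives two incompatible descriptions of the same columns.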
SQL's dependence on a defined schema means that SQL can be far simpler than a full programming language, and so SQL is not designed for, nor well suited for, resolving schema conflicts.
The result is that the sampling approach produces the same answers as the generative approach when data is unambiguous (a set of Parquet files, say, all with exactly the same schema.) The sampling approach fails, however, when data is ambiguous and so leads different scan operators to infer different schemas. Since SQL is not designed to handle messy data, the user ends up frustrated with a query that won't work.
Further, Drill's clients are typically JDBC or ODBC: clients that expect a generative model. They can obtain a schema at the start of a query and expect all rows to follow that schema. Even if schema conflicts could be resolved (column a starts as INT, later changes to DOUBLE in the same query), Drill's clients cannot handle such schema changes.
This is the problem we wish to address.
Each operator in a query DAG produces or consumes a tuple schema. Scan operators produce an output schema, the screen operator consumes an input schema, and all others produce a schema and consume one or more input schemas. The schema produced by one operator is the schema consumed by the next operator. That is:
Upstream Operator --> Schema --> Downstream Operator
Classic database query engines use a defined schema. The planner computes the schemas at plan time, allowing the run-time engine to, say, generate code against a known in-memory data layout.
Drill is unique in that it supports a dynamic schema: the schema is partially defined at plan time, with some aspects left open until runtime.
If a query uses a wildcard (SELECT *), then the entire schema is defined at run time. If a query uses explicit projection (SELECT a, b, c), then the column names are defined at plan time, but the column types are determined at run time.
Moving forward we want Drill to support any combination of defined or dynamic schema. A defined schema allows the planner to do more of the work currently done at run time, allowing queries to be more efficient. (The planner does the work once rather than fragments repeating the work many times.)
The relationship between a defined and dynamic schema is simple. The defined schema is what the dynamic schema will turn out to be at runtime. We can compute the defined schema if we have available at plan time the information which (in Drill) is normally available only at runtime.
As noted in the introduction, the use of dynamic schemas, coupled with a small sample size, leads to schema conflicts, such as the examples cited above.
After many years of tinkering, the time has come to recognize that there is no general, principled solution to a schema conflict. Neither Drill nor clients can predict the future to know how to create a schema that works for all rows in the above case.
The lesson is that the dynamic schema concept works when the data source produces a consistent schema, but leads to ambiguities if the source itself is ambiguous (as is the case with schema evolution, carelessly written JSON files, etc.)
The question we want to answer is: how do we continue to reap the benefits of a dynamic-schema approach, while providing the means to resolve conflicts in a principled way when they occur due to inevitable ambiguities of runtime schema sampling?
Although Drill cannot predict, when reading data, what data will appear in the future, the user can and should. The user may know facts such as "the column a should be treated as DOUBLE even if it looks like an INT." Or, "column b was added later. When reading files without b, assume b will be a Boolean with the value true."
A defined schema also helps recover information lost during encoding into a file format. For example, if the file is a CSV file, then all data is just text. A defined schema can tell us that, say, column a should be decoded as a DOUBLE.
A defined schema simply states the intent of the file (or other data source): what data, in what format, went into the file. If the file is unambiguous, the defined schema is redundant; the information can be recovered at run time from the data source. However, if the source is ambiguous (or variable) the defined schema allows Drill to predict the future because we know the generative function that produced the data.
Formally, a defined schema is one that is fully defined: no wildcards, no "types to be defined later." Every operator produces a defined schema at runtime as its output schema. A defined schema from the planner is of the same form.
Our goal is to allow Drill to work with both dynamic and defined schemas, whichever gets the job done for a particular use case.
At present, Drill allows a schema change operation, which results in the ambiguities presented above. As it turns out, most clients and operators can't even handle a schema change. The concept of a schema change is not even valid in relational theory. As a result, the premise upon which relational algebra is based breaks down if the schema can change. Relational algebra is timeless: it is a set of relations. With schema change, relational algebra reduces to a set of sequential algorithms (if the operator is in state X, and the schema changes in form A, then convert existing data to form B and move to state Y.)
After years of trying, it is clear that handling schema changes is impossible in general, and very hard when it is possible. Further, it is not clear if schema changes are even helpful. Because they have not worked, users cannot really use them, and there have been no complaints.
The time has come to face reality: SQL does not work when the schema shifts underneath relational operators. Drill must define the semantics it supports, then enforce those semantics.
Since Drill should support dynamic schema, we can define a set of rules which do so in a semantically-meaningful way. The gist of the solution is to allow the dynamic schema as defined above: the planner provides an incomplete schema at plan time which operators (primarily the scan) flesh out at runtime. The key change is that we define a schema commit point: the point at which the dynamic schema resolves to a defined schema. After the commit point, the schema cannot change.
The schema commit point design turns the dynamic schema idea into a normal relational solution. Drill's innovation is to perform schema commit at run time rather than plan time.
This gives us the following schema lifecycle:
- The planner defines a dynamic (partial) schema.
- The scan gathers information to resolve the dynamic schema to a defined schema.
- Before delivering the first batch of data, the scan commits to a defined schema which it reports to its downstream operator.
- The downstream operator, having a committed input schema, computes its own output schema and commits it to its own downstream operator. And so on.
- Once the scan delivers the first batch of data, the query schema is committed and fixed. It will not change.
- The scan deals with variations in its inputs by ignoring new columns or by failing.
- Each scan operates independently. If each scan returns the same schema, all is fine. However, if different scans return different schemas, then an exchange receiver will see differing schemas. The receiver will fail the query in this situation.
- The user receives an error message which states the cause of the conflict. The user provides schema information to resolve the conflict and reruns the query, which should now succeed.
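The lifecycle above can be sketched as a small model. This is a minimal illustration under stated assumptions, not Drill's implementation: ToyScan, receive, SchemaConflictError and the (schema, rows) batch representation are all hypothetical names invented for the example. The scan commits its schema on the first batch, ignores (or fails on) columns that appear later, and the receiver fails the query when scans commit to different schemas.

```python
# Toy model of the schema commit point. All names are hypothetical.

class SchemaConflictError(Exception):
    """Raised when data or scans disagree with a committed schema."""


class ToyScan:
    def __init__(self, batches, on_new_column="ignore"):
        self.batches = batches              # list of (schema dict, list of rows)
        self.on_new_column = on_new_column  # "ignore" or "fail"
        self.committed = None               # fixed once the first batch is seen

    def read(self):
        for schema, rows in self.batches:
            if self.committed is None:
                # Schema commit point: just before delivering the first batch.
                self.committed = dict(schema)
            for name, col_type in schema.items():
                if name not in self.committed:
                    if self.on_new_column == "fail":
                        raise SchemaConflictError(
                            f"column {name} appeared after commit")
                elif self.committed[name] != col_type:
                    raise SchemaConflictError(
                        f"column {name}: committed {self.committed[name]}, "
                        f"saw {col_type}")
            # Deliver only the committed columns; later additions are dropped.
            yield [{n: row.get(n) for n in self.committed} for row in rows]


def receive(scans):
    """Stand-in for an exchange receiver: all scans must agree."""
    query_schema, rows = None, []
    for scan in scans:
        for batch in scan.read():
            if query_schema is None:
                query_schema = scan.committed
            elif scan.committed != query_schema:
                raise SchemaConflictError(
                    f"scans disagree: {query_schema} vs {scan.committed}")
            rows.extend(batch)
    return query_schema, rows


agreeing = [
    ToyScan([({"a": "INT"}, [{"a": 1}]),
             ({"a": "INT", "z": "VARCHAR"}, [{"a": 2, "z": "x"}])]),
    ToyScan([({"a": "INT"}, [{"a": 3}])]),
]
schema, rows = receive(agreeing)

conflicting = [
    ToyScan([({"a": "INT"}, [{"a": 1}])]),
    ToyScan([({"a": "DOUBLE"}, [{"a": 1.5}])]),
]
try:
    receive(conflicting)
    conflict_message = None
except SchemaConflictError as err:
    conflict_message = str(err)
```

In the first run the late column z is ignored and the query succeeds with a single fixed schema. In the second run the receiver reports the conflict, which is the point at which the user would supply schema information and rerun the query.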
The above captures the good points of Drill (dynamic schema) and avoids the bad parts (fruitless efforts to deal with schema changes, complex code which tries, and fails, to deal with schema changes, obscure failures for users, no solution for users to resolve the issues.)
With a provided schema, Drill can move to the schema-commit model while still embracing dynamic schemas for the cases where all scans agree on their dynamic output schemas.
Note that if the planner were to produce a fully defined schema, it would simply move the schema commit point forward: from just before the first batch to instead occur at plan time. This is a nice unification of the defined and dynamic schema models.
We propose to formalize how schemas operate in the scan operator in preparation for (partial) plan-time schema computation.
The Scan operator bridges from the data format of some data source to the row format of the scan's output schema. The output schema is the scan's interface to the downstream operator as shown in the diagram above. The scan typically works with some reader to implement a conversion from the source schema to the output schema.
Today the scan's output schema is dynamic. The planner provides a project list which is, essentially, a list of column names. Types are "to be named later" at run time.
If Drill were to support a defined schema, the project list would evolve into a full output schema definition: a list of (name, type) pairs. Since Drill supports complex types, some columns (such as MAPs) would contain internal structure.
The output schema declares the schema to be found in the record batches which the scan produces. In the current version of Drill, the project list is a dynamic schema; the output record batch implicitly defines the actual, resolved, output schema.
One improvement (already available in EVF) is to allow the scan (and eventually all operators) to declare an output schema independent of the output data from that operator. An independent output schema allows, for example, an operator to perform runtime planning and code generation using a schema description rather than concrete vectors. The output schema also allows an operator to declare a schema even if it produces no rows. (Today we use awkward "empty batches" to handle this case which have been an ongoing source of bugs.)
By declaring an output schema separately from data, we now have a three-part output schema lifecycle:
- An output schema definition, present in the execution plan, which is a dynamic schema (names only), a defined schema (names and types) or a hybrid (some types are known, others are not.)
- An output schema declaration which resolves the definition into a concrete schema that describes the data the operator produces.
- Output batches: the actual data which adheres to the declared output schema.
Today, for the scan operator, the projection list is the schema definition, and the output vector container (with its BatchSchema) is the schema declaration.
The scan output schema generally contains three components:
- Subset of columns from the input source,
- Implicit columns,
- Made-up columns for items in the project list, but not provided by either of the above two processes.
Thus, the scan output schema says little about the input source: the output schema is the result of processing on top of the input source. The output schema is the scan's contract with the rest of Drill. Of course, the planner should have produced a given output schema only because the underlying data source can provide the data needed for that schema. But, the output schema is not the input source schema.
Each data source has some implied schema. A CSV file has some number of columns; a Parquet file has some set of data columns. Drill has long inferred the input schema directly from the data. However, naive inference can lead to the ambiguities discussed above.
Each source has its own way to represent columns, and its own set of data types. The reader maps these columns to Drill using implicit mappings expressed in the reader code. A provided schema, when available, either assists with the source-to-Drill type mapping, or restates the resulting Drill schema.
Even if a source (such as Parquet) has a clear per-file schema, the per-file information cannot give Drill information about a collection of files if that collection has experienced schema evolution.
The provided schema feature in Drill helps to resolve schema ambiguity. The provided schema can be seen as a set of hints to a reader to help decode an input file. For CSV, since all columns are encoded as text, the provided schema can say to interpret that text as an INT for one column, and a DOUBLE for another. Similarly, for a JSON file with ambiguous textual types such as {a: 10} {a: 10.1}, the provided schema can identify the common column format (such as treat the column a as a DOUBLE.)
The provided schema is not a per-query interpretation of data types. The CAST function should be used for that purpose. Nor is the provided schema a way of selecting some columns and not others; that is the job of a view. Rather, the provided schema gives the file format decoder information that it cannot obtain from the file itself. The provided schema tells the decoder how to interpret the data into Drill types, and thus presents the file schema as converted to a Drill schema after performing the defined decode steps.
To hammer the point home: the provided schema is about the file itself; it is not about queries or how someone would like the data to be used. The provided schema states the generative function used to encode the data so that the reader can reverse that encoding and recover the original data.
We now have five forms of schema:
- Defined scan schema from the query plan (the project list in the current Drill.)
- The source format: an encoding of data to bytes on disk
- An optional provided schema which provides schema hints to augment data within the file itself.
- The internal schema created when reading the file
- The scan output schema declaration which results from resolving the defined output schema against the internal schema.
The scan and reader cooperate to perform the computation steps to produce the declared output schema from the defined output schema and the source implementation.
The steps are, essentially:
     +-------------------- Query Plan --------------------+
Project List                                       Provided Schema
     |                                                    |
     v                                                    v
Output Schema ---------- Projection List ------------> Reader
Definition                      |                         |
     |                          v                         v
     |                    Implicit Cols            Internal Schema
     |                          |           +-------------+
     +---------------------+    |           |
                           v    v           v
                       Output Schema Resolution <---- Null Columns
                                   |
                                   v
                        Declared Output Schema
The above process occurs before the first batch of data is sent downstream. (Though, it will typically occur after the reader produces a batch of data to allow the reader to discover the schema.) The schema commit point for the scan operator occurs once the above process completes.
The EVF implements an early form of the above flow; the desire here is to formalize the concepts.
The first step is to convert from Drill's current scan query plan into an output schema definition. Our goal is that eventually this work should be done in the planner, but we must work step-by-step.
Note that the output schema definition is a step toward locking down our contract with the downstream operator. The output schema definition turns out to not be needed by the scan implementation. The output definition is, essentially, a statement of what the scan and readers will produce once they do their work. The definition is not an instruction to the readers about the work that they should perform (except if the reader can use projection to avoid reading an unneeded column.)
Today, Drill provides the scan with an ill-formed project list: a single top-level column can appear multiple times. For example:
a.b, a.c -- For a map
c[0], c[3] -- For an array
The scan operator produces a single top level column per item: a single map, a single array or (in the simple case) a single scalar. So, the first step is to convert the ill-formed project list into a schema definition. That is, for the above examples:
a.b, a.c --> a{b, c}
c[0], c[3] --> c[0, 3]
The existing RequestedTuple and RequestedColumn classes do this work, though we will refine this implementation.
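The consolidation step can be illustrated as follows. This is a simplified sketch, not the RequestedTuple implementation: the consolidate and render functions and the dictionary layout are hypothetical, and only one level of map member or array index is handled.

```python
import re

# Toy project-list consolidation. Hypothetical names; handles only
# "name", "name.member" and "name[index]" items.
_ITEM = re.compile(r"^(\w+)(?:\.(\w+)|\[(\d+)\])?$")


def consolidate(project_list):
    """Merge repeated references to a top-level column into one entry."""
    columns = {}
    for item in project_list:
        match = _ITEM.match(item)
        if match is None:
            raise ValueError(f"unsupported project item: {item}")
        name, member, index = match.groups()
        col = columns.setdefault(name, {"members": [], "indexes": []})
        if member is not None and member not in col["members"]:
            col["members"].append(member)
        if index is not None and int(index) not in col["indexes"]:
            col["indexes"].append(int(index))
    return columns


def render(columns):
    """Format consolidated columns in the notation used in the text."""
    parts = []
    for name, col in columns.items():
        if col["members"]:
            parts.append(name + "{" + ", ".join(col["members"]) + "}")
        elif col["indexes"]:
            parts.append(name + "[" + ", ".join(str(i) for i in col["indexes"]) + "]")
        else:
            parts.append(name)
    return parts


consolidated = consolidate(["a.b", "a.c", "c[0]", "c[3]", "d"])
rendered = render(consolidated)
print(rendered)
```

Each top-level name now appears once, carrying the member or index qualifiers that were scattered across the original list.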
With this transform, we now have a list of top-level columns. We can now convert these columns into a schema definition. The first step is to represent the project list in a schema-like form. We have the TupleMetadata and ColumnMetadata classes to represent a schema. However, the project list does not specify types. So, we need to add two new column types:
- WildcardColumn: a ColumnMetadata that represents a wildcard: the output schema definition says that the scan can define the set of columns.
- DynamicColumn: a ColumnMetadata that has a name, but no type. The dynamic column does hold projection information such as the members from the above example.
With this transform, we now have an actual schema, albeit one with dynamic aspects: a wildcard or dynamic columns. Nevertheless, we have a draft of our contract with the downstream operator. Whatever output schema we declare later, it will derive from the defined output schema.
Later sections will detail the scan internal schema process. Basically, the reader defines a schema that maps the source schema into a Drill row format, perhaps with the assistance of a provided schema.
The internal schema is constrained by the data source:
- The internal schema may contain more columns than requested by the output schema. For example, a CSV file must read all columns sequentially, even those not needed. (The ResultSetLoader helps with this.)
- The internal schema may contain fewer columns than requested by the output schema. For example, requesting a column which appears in some files but not others.
Thus, the internal schema is a distinct set from the output schema: each may contain columns the other lacks. However, where the two sets intersect, the following must hold:
- A column which appears in both the internal and output schemas must have a type compatible with the output column.
Here there are also two cases:
- If the output column definition is dynamic, the internal schema column provides the type. That type must be consistent with any qualifiers that appear in the output column definition.
- If the output column definition is fully defined (has a type), then the internal schema column must be of the same type.
The above constraints would appear to require work to make the schemas match. However, recall that, if the planner worked correctly, the output schema came from an earlier examination of the data source or its metadata. The output schema should simply be a statement of what the reader will do. Thus, in a working system, the two schemas agree by definition and so no conversion is needed (or wanted.)
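Although no conversion should be needed, the two cases can still be expressed as a validation check. The sketch below is an assumption-laden illustration: the dictionary form of an output column, the function name and the rule that a column projected with members must resolve to a MAP are all hypothetical simplifications.

```python
# Toy compatibility check between an output column definition and the
# type reported by the reader's internal schema. Hypothetical names.

def resolve_column_type(output_col, internal_type):
    """Return the resolved type, or raise if the two sides disagree."""
    name = output_col["name"]
    out_type = output_col.get("type")        # None means "dynamic"
    members = output_col.get("members", [])  # qualifiers such as a.b, a.c
    if out_type is None:
        # Dynamic column: the internal schema provides the type, which
        # must be consistent with the qualifiers in the definition.
        if members and internal_type != "MAP":
            raise ValueError(
                f"column {name} is projected with members but is {internal_type}")
        return internal_type
    # Defined column: the internal type must match exactly.
    if out_type != internal_type:
        raise ValueError(
            f"column {name}: plan says {out_type}, reader says {internal_type}")
    return out_type


dynamic_result = resolve_column_type({"name": "a"}, "DOUBLE")
map_result = resolve_column_type({"name": "m", "members": ["b", "c"]}, "MAP")
defined_result = resolve_column_type({"name": "a", "type": "DOUBLE"}, "DOUBLE")
```

A failure here would indicate that the planner and reader disagree about the source, which is a planning error to report rather than a mismatch to paper over with a conversion.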
The project list (output schema definition) can contain implicit columns. The reader knows nothing about these columns. Instead, a separate mechanism recognizes implicit columns and computes their values. This mechanism produces another internal schema: a small one for the implicit columns.
We started with an output schema definition. The reader's internal schema provided some of these columns, the implicit column mechanism provided others. There may be columns that are "unclaimed" which do not match either internal schema. These are omitted columns, also known as "null columns" or "missing columns."
The scan must fulfill its output schema contract by manufacturing a column for each omitted column. Historically Drill created a Nullable INT column. EVF allows the plugin to select the type. The provided schema can specify the type and default value. In the future, if the planner provides a defined output schema, omitted columns can obtain their type from the output schema definition.
Regardless of how the column is invented, the result is a third internal schema that resolves each omitted column.
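One way to picture the omitted-column step is shown below. This is a sketch only: the function name, the argument shapes and in particular the precedence order (provided schema first, then a plugin-selected type, then the historical Nullable INT) are assumptions made for the example rather than documented Drill behavior.

```python
# Toy resolution of omitted ("null" or "missing") columns.
# Hypothetical names; the precedence order is an assumption.

def resolve_omitted(requested, reader_cols, implicit_cols,
                    provided_schema=None, plugin_default=None):
    """Return {name: (type, default value)} for each unclaimed column."""
    provided_schema = provided_schema or {}
    omitted = {}
    for name in requested:
        if name in reader_cols or name in implicit_cols:
            continue  # claimed by one of the other two internal schemas
        if name in provided_schema:
            omitted[name] = provided_schema[name]
        elif plugin_default is not None:
            omitted[name] = (plugin_default, None)
        else:
            omitted[name] = ("NULLABLE INT", None)
    return omitted


omitted_schema = resolve_omitted(
    requested=["a", "b", "filename", "x"],
    reader_cols={"a": "DOUBLE"},
    implicit_cols={"filename": "VARCHAR"},
    provided_schema={"b": ("BOOLEAN", True)},
)
print(omitted_schema)
```

Here column b follows the earlier example of a column added later (a Boolean defaulting to true), while x falls back to the historical guess.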
The final step is to resolve the output schema. Each of the three internal schemas is examined and compared to the output schema definition to produce the output schema declaration and a data map. For each column:
- If the internal column matches the name of an output column, the output column is resolved using the type of the internal column. A map entry is created from internal to output column.
- Otherwise, if the output schema contains a wildcard, the column is inserted at the position of the wildcard, and mapped as above.
- Otherwise, the column is marked for removal. (This means that the column does not appear in the output schema and its data is simply discarded. A goal of the scan operator is to minimize the number of times this outcome is used.)
The result is a set of maps:
(defined output col --> internal col)
(internal col --> trash | declared output col)
Once all three internal schemas are processed, there should be no unmatched output columns. The three internal schemas are concrete (no dynamic columns.) The output schema is now resolved to the declared schema and so is also concrete. Concrete schemas are a tuple: columns have both a name and position. So, the final output is a map that says what to do with each column from each of the three internal schemas:
((source, position) --> trash | output position)
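The resolution rules can be sketched in a few lines. This is an illustrative model under assumptions, not the EVF code: the resolve_output function, the "*" wildcard marker, the source labels and the use of None to mean "trash" are hypothetical, and type checking of defined columns is left out.

```python
# Toy output schema resolution. Hypothetical names throughout.
WILDCARD = "*"


def resolve_output(definition, internal_schemas):
    """definition: list of column names, possibly containing WILDCARD.
    internal_schemas: {source: [(name, type), ...]} for the reader,
    implicit and omitted columns.
    Returns (declared schema, {(source, position): output position or None}).
    """
    resolved = {}    # explicitly named output columns -> type
    expanded = []    # columns that take the wildcard's place
    placement = {}   # (source, position) -> output name, or None for trash
    for source, columns in internal_schemas.items():
        for position, (name, col_type) in enumerate(columns):
            if name in definition:
                resolved[name] = col_type
                placement[(source, position)] = name
            elif WILDCARD in definition:
                expanded.append((name, col_type))
                placement[(source, position)] = name
            else:
                placement[(source, position)] = None  # discarded
    declared = []
    for entry in definition:
        if entry == WILDCARD:
            declared.extend(expanded)
        elif entry in resolved:
            declared.append((entry, resolved[entry]))
        else:
            raise ValueError(f"unmatched output column: {entry}")
    index = {name: i for i, (name, _) in enumerate(declared)}
    data_map = {key: None if name is None else index[name]
                for key, name in placement.items()}
    return declared, data_map


declared, data_map = resolve_output(
    ["a", "filename", "x"],
    {
        "reader": [("a", "DOUBLE"), ("junk", "VARCHAR")],
        "implicit": [("filename", "VARCHAR")],
        "omitted": [("x", "NULLABLE INT")],
    },
)
```

In this run the unrequested reader column junk maps to None (trash), and every other internal column maps to its position in the declared schema.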
As noted earlier, the above process is designed to work seamlessly with eventual Drill support for the planner to fully define the schema. To do so, the planner needs to know the schema that the scan (that is, the readers) will produce at schema commit time. If that information is available at plan time, we can shift schema computation to plan time.
As we revise the storage plugins, we would want to add a plan-time metadata feature: an API where the storage or format plugin can report its source schema (converted to Drill format). After that, we simply need to shift the output schema computation from run to plan time.
Each plugin may report a schema differently. Some (such as JDBC or Hive) can query an external system. Some (such as Parquet) could read information from files. All can consult data provided by the provided schema or Drill metastore.
It is very likely that Calcite already provides the mechanics to compute the row format for each operator given a defined scan schema. The process starts with the project list, then resolves the list to a set of column schemas, which Calcite propagates up the DAG from leaf to root.
Suppose we implemented plan-time schema resolution. We would modify the physical plan to provide the additional information.
The simplest solution is for each fragment to have a table of row formats, and for each operator in the fragment to hold a pointer to its input and output schemas within the table. (A fragment executes within a thread and is thus self-contained.) As noted earlier, the output schema of one operator is the input schema of the next. The schema table avoids the need to serialize the schema twice.
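A per-fragment schema table might look like the following. This is a sketch of the idea only: the function name, the dictionary keys and the restriction to a linear chain of operators (one input each) are assumptions for illustration and do not describe Drill's physical plan format.

```python
# Toy per-fragment schema table. Hypothetical names and plan layout;
# models a linear operator chain from leaf (scan) to root.

def build_fragment_plan(operators):
    """operators: list of (name, output schema or None), leaf first.
    Each schema is stored once; operators hold indexes into the table."""
    schema_table = []
    plan = []
    upstream_ref = None
    for name, output_schema in operators:
        output_ref = None
        if output_schema is not None:
            schema_table.append(output_schema)
            output_ref = len(schema_table) - 1
        plan.append({"op": name,
                     "inputSchema": upstream_ref,
                     "outputSchema": output_ref})
        upstream_ref = output_ref
    return {"schemas": schema_table, "operators": plan}


fragment = build_fragment_plan([
    ("scan", [("a", "DOUBLE"), ("b", "DOUBLE")]),
    ("project", [("c", "DOUBLE")]),
    ("screen", None),   # consumes a schema, produces none
])
print(fragment["operators"])
```

Because the project operator's input reference is the same table index as the scan's output reference, the shared schema is serialized once.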
For the scan, the output schema is the contract with the downstream operator. The output schema does not help the reader decode its input. For that, each plugin can define its own set of "decode hints." For files, this could be the provided schema, in Drill-schema form, possibly with additional properties. For other plugins, the information could be in some other form such as an Avro schema definition.
Regardless of the form, the decode hints describe the input source independent of any query on that source. For a CSV file without headers, the schema defines each column, in the order the columns appear in the file. If no decode hints are needed (or available), then the reader must implicitly know how to decode the source.
The result is that the scan plan will include, at least:
{
outputSchema: <schema-table-ref>,
hints: <source-specific hints>
}
The output schema replaces the project list: it is a project list possibly augmented with type information.