-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-46862][SQL] Disable CSV column pruning in the multi-line mode #44872
Conversation
cc @cloud-fan |
do we have a test to show the data correctness issue? |
.option("header", "true") | ||
.option("escape", "\"") | ||
.csv(path.getCanonicalPath) | ||
assert(df.count() === 5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
count()
returns 4 without the changes.
@cloud-fan I added a test for the issue. |
@cloud-fan @HyukjinKwon FYI, since 3.5.x and 3.4.x suffer from the same issue, I am going to backport this the branches. Thanks for review. |
Merging to master/3.5/3.4. Thank you, @HyukjinKwon @cloud-fan for review. |
### What changes were proposed in this pull request? In the PR, I propose to disable the column pruning feature in the CSV datasource for the `multiLine` mode. ### Why are the changes needed? To workaround the issue in the `uniVocity` parser used by the CSV datasource: uniVocity/univocity-parsers#529 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *CSVv1Suite" $ build/sbt "test:testOnly *CSVv2Suite" $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite" $ build/sbt "testOnly *.CsvFunctionsSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44872 from MaxGekk/csv-disable-column-pruning. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]> (cherry picked from commit 829e742) Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request? In the PR, I propose to disable the column pruning feature in the CSV datasource for the `multiLine` mode. ### Why are the changes needed? To workaround the issue in the `uniVocity` parser used by the CSV datasource: uniVocity/univocity-parsers#529 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *CSVv1Suite" $ build/sbt "test:testOnly *CSVv2Suite" $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite" $ build/sbt "testOnly *.CsvFunctionsSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44872 from MaxGekk/csv-disable-column-pruning. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]> (cherry picked from commit 829e742) Signed-off-by: Max Gekk <[email protected]>
@@ -58,7 +58,7 @@ case class CSVPartitionReaderFactory( | |||
actualReadDataSchema, | |||
options, | |||
filters) | |||
val schema = if (options.columnPruning) actualReadDataSchema else actualDataSchema | |||
val schema = if (options.isColumnPruningEnabled) actualReadDataSchema else actualDataSchema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
Line 103 in 829e742
val columnPruning = sparkSession.sessionState.conf.csvColumnPruning |
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
Line 128 in 829e742
val schema = if (columnPruning) actualRequiredSchema else actualDataSchema |
@MaxGekk Should the check in CSVFileFormat
be changed to parsedOptions.isColumnPruningEnabled
too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The schema
is used only in CSVHeaderChecker
which is supposed to check column names in CSV and provided schema fields. It shouldn't depend on the column pruning feature at all, from my point of view.
private def checkHeaderColumnNames(columnNames: Array[String]): Unit = {
...
if (headerLen == schemaSize) {
...
} else {
errorMessage = Some(
s"""|Number of column in CSV header is not equal to number of fields in the schema:
| Header length: $headerLen, schema size: $schemaSize
|$source""".stripMargin)
}
schemaSize
must be full data schema of CSV files, but not the required schema.
Let me re-think it, and avoid the dependency from the column pruning in CSVHeaderChecker
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@LuciferYang Actually, you are right. Please, review this follow up PR: #44910
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
Show resolved
Hide resolved
…ing in V1 CSV datasource ### What changes were proposed in this pull request? In the PR, I propose to invoke `CSVOptons.isColumnPruningEnabled` introduced by #44872 while matching of CSV header to a schema in the V1 CSV datasource. ### Why are the changes needed? To fix the failure when column pruning happens and a schema is not enforced: ```scala scala> spark.read. | option("multiLine", true). | option("header", true). | option("escape", "\""). | option("enforceSchema", false). | csv("/Users/maximgekk/tmp/es-939111-data.csv"). | count() 24/01/27 12:43:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema: Header length: 4, schema size: 0 CSV file: file:///Users/maximgekk/tmp/es-939111-data.csv ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *CSVv1Suite" $ build/sbt "test:testOnly *CSVv2Suite" $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite" $ build/sbt "testOnly *.CsvFunctionsSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44910 from MaxGekk/check-header-column-pruning. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]>
…ing in V1 CSV datasource ### What changes were proposed in this pull request? In the PR, I propose to invoke `CSVOptons.isColumnPruningEnabled` introduced by #44872 while matching of CSV header to a schema in the V1 CSV datasource. ### Why are the changes needed? To fix the failure when column pruning happens and a schema is not enforced: ```scala scala> spark.read. | option("multiLine", true). | option("header", true). | option("escape", "\""). | option("enforceSchema", false). | csv("/Users/maximgekk/tmp/es-939111-data.csv"). | count() 24/01/27 12:43:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema: Header length: 4, schema size: 0 CSV file: file:///Users/maximgekk/tmp/es-939111-data.csv ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *CSVv1Suite" $ build/sbt "test:testOnly *CSVv2Suite" $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite" $ build/sbt "testOnly *.CsvFunctionsSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44910 from MaxGekk/check-header-column-pruning. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]> (cherry picked from commit bc51c9f) Signed-off-by: Max Gekk <[email protected]>
…ing in V1 CSV datasource ### What changes were proposed in this pull request? In the PR, I propose to invoke `CSVOptons.isColumnPruningEnabled` introduced by #44872 while matching of CSV header to a schema in the V1 CSV datasource. ### Why are the changes needed? To fix the failure when column pruning happens and a schema is not enforced: ```scala scala> spark.read. | option("multiLine", true). | option("header", true). | option("escape", "\""). | option("enforceSchema", false). | csv("/Users/maximgekk/tmp/es-939111-data.csv"). | count() 24/01/27 12:43:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema: Header length: 4, schema size: 0 CSV file: file:///Users/maximgekk/tmp/es-939111-data.csv ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *CSVv1Suite" $ build/sbt "test:testOnly *CSVv2Suite" $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite" $ build/sbt "testOnly *.CsvFunctionsSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44910 from MaxGekk/check-header-column-pruning. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]> (cherry picked from commit bc51c9f) Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request? In the PR, I propose to disable the column pruning feature in the CSV datasource for the `multiLine` mode. ### Why are the changes needed? To workaround the issue in the `uniVocity` parser used by the CSV datasource: uniVocity/univocity-parsers#529 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *CSVv1Suite" $ build/sbt "test:testOnly *CSVv2Suite" $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite" $ build/sbt "testOnly *.CsvFunctionsSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#44872 from MaxGekk/csv-disable-column-pruning. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]> (cherry picked from commit 829e742) Signed-off-by: Max Gekk <[email protected]>
…ing in V1 CSV datasource ### What changes were proposed in this pull request? In the PR, I propose to invoke `CSVOptons.isColumnPruningEnabled` introduced by apache#44872 while matching of CSV header to a schema in the V1 CSV datasource. ### Why are the changes needed? To fix the failure when column pruning happens and a schema is not enforced: ```scala scala> spark.read. | option("multiLine", true). | option("header", true). | option("escape", "\""). | option("enforceSchema", false). | csv("/Users/maximgekk/tmp/es-939111-data.csv"). | count() 24/01/27 12:43:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema: Header length: 4, schema size: 0 CSV file: file:///Users/maximgekk/tmp/es-939111-data.csv ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *CSVv1Suite" $ build/sbt "test:testOnly *CSVv2Suite" $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite" $ build/sbt "testOnly *.CsvFunctionsSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#44910 from MaxGekk/check-header-column-pruning. Authored-by: Max Gekk <[email protected]> Signed-off-by: Max Gekk <[email protected]> (cherry picked from commit bc51c9f) Signed-off-by: Max Gekk <[email protected]>
A bit off-topic for this PR, but is uniVocity even maintained anymore?
|
is there any other popular Java libraries for parsing CSV? |
There are a bunch of libraries listed here, but I don't have experience with any of them. jackson-dataformats-text looks interesting. I know we already use FasterXML to parse JSON. Perhaps we should use them to parse CSV as well. |
I've filed SPARK-47180 to track potentially migrating off of Univocity to something else. |
What changes were proposed in this pull request?
In the PR, I propose to disable the column pruning feature in the CSV datasource for the
multiLine
mode.Why are the changes needed?
To workaround the issue in the
uniVocity
parser used by the CSV datasource: uniVocity/univocity-parsers#529Does this PR introduce any user-facing change?
No.
How was this patch tested?
By running the affected test suites:
Was this patch authored or co-authored using generative AI tooling?
No.