diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/AggregationUtils.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/AggregationUtils.java index dd370c645..23283c5b6 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/AggregationUtils.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/AggregationUtils.java @@ -17,6 +17,9 @@ package io.cdap.plugin.batch.aggregator; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; /** * Common functions for aggregation related functionalities. @@ -69,8 +72,10 @@ private static void generateException(Schema fieldSchema, String fieldName, Stri Schema.Type fieldType, String expectedType) { Schema.LogicalType logicalType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getLogicalType() : fieldSchema.getLogicalType(); - throw new IllegalArgumentException(String.format( + String error = String.format( "Cannot compute %s on field %s because its type %s is not %s", functionName, fieldName, - logicalType == null ? fieldType : logicalType, expectedType)); + logicalType == null ? fieldType : logicalType, expectedType); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException(error)); } } diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupAggregator.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupAggregator.java index 2e5292abf..992b6cd87 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupAggregator.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupAggregator.java @@ -21,6 +21,9 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; @@ -160,9 +163,12 @@ private StructuredRecord select(StructuredRecord record1, StructuredRecord recor if (selectionFunction == null) { Schema.Field field = record1.getSchema().getField(filterFunction.getField()); if (field == null) { - throw new IllegalArgumentException( - String.format("Field '%s' cannot be used as a filter field since it does not exist in the output schema", - filterFunction.getField())); + String errorReason = String.format("Field '%s' cannot be used as a filter field since it does not exist in " + + "the output schema", filterFunction.getField()); + String errorMessage = String.format("Failed to merge values because the field '%s' cannot be used as a " + + "filter field since it does not exist in the output schema", filterFunction.getField()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason)); } selectionFunction = filterFunction.getSelectionFunction(field.getSchema()); } @@ -174,8 +180,12 @@ private Schema getGroupKeySchema(Schema inputSchema) { for (String fieldName : dedupConfig.getUniqueFields()) { Schema.Field field = inputSchema.getField(fieldName); if (field == null) { - throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.", - fieldName, inputSchema)); + String errorReason = String.format("Field %s does not exist in input schema %s.", + fieldName, inputSchema); + String errorMessage = String.format("Failed to groupBy because field %s does not exist in input schema %s.", + fieldName, inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason)); } fields.add(field); } diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupConfig.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupConfig.java index dd4757089..5b3e41c4a 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupConfig.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DedupConfig.java @@ -22,6 +22,9 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.batch.aggregator.function.Any; import io.cdap.plugin.batch.aggregator.function.First; import io.cdap.plugin.batch.aggregator.function.Last; @@ -91,8 +94,10 @@ DedupFunctionInfo getFilter() { } if (filterParts.size() != 2) { - throw new IllegalArgumentException(String.format("Invalid filter operation. It should be of format " + - "'fieldName:functionName'. But got : %s", filterOperation)); + String error = String.format("Invalid filter operation. It should be of format " + + "'fieldName:functionName'. But got : %s", filterOperation); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException(error)); } Function function; @@ -106,8 +111,12 @@ DedupFunctionInfo getFilter() { try { function = Function.valueOf(functionStr.toUpperCase()); } catch (IllegalArgumentException e) { - throw new IllegalArgumentException(String.format("Invalid function '%s'. Must be one of %s.", - functionStr, Joiner.on(',').join(Function.values()))); + String errorReason = String.format("Invalid function '%s'. Must be one of %s.", + functionStr, Joiner.on(',').join(Function.values())); + String errorMessage = String.format("Failed to filter due to invalid function '%s', Must be one of %s, " + + "with message: %s.", functionStr, Joiner.on(',').join(Function.values()), e.getMessage()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, e); } return new DedupFunctionInfo(fieldName, function); } @@ -142,9 +151,10 @@ public SelectionFunction getSelectionFunction(Schema fieldSchema) { case MIN: return new MinSelection(field, fieldSchema); } - throw new IllegalArgumentException(String.format( - "The function '%s' provided is not supported. It must be one of %s.", - function, Joiner.on(',').join(Function.values()))); + String error = String.format("The function '%s' provided is not supported. It must be one of %s.", + function, Joiner.on(',').join(Function.values())); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException(error)); } @Override diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DistinctAggregator.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DistinctAggregator.java index cde9b408c..85fe11135 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DistinctAggregator.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/DistinctAggregator.java @@ -24,6 +24,9 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; @@ -36,7 +39,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; @@ -170,8 +172,12 @@ private static Schema getOutputSchema(Schema inputSchema, Iterable field for (String fieldName : fields) { Schema.Field field = inputSchema.getField(fieldName); if (field == null) { - throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.", - fieldName, inputSchema)); + String errorReason = String.format("Field %s does not exist in input schema %s.", + fieldName, inputSchema); + String errorMessage = String.format("Failed to fetch record schema due to field %s does not exist in" + + " input schema %s.", fieldName, inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason)); } outputFields.add(field); } diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByAggregator.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByAggregator.java index bdfaaa1c1..12c030dca 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByAggregator.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByAggregator.java @@ -22,6 +22,9 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.etl.api.Emitter; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; @@ -319,7 +322,7 @@ public void initialize(BatchRuntimeContext context) throws Exception { } @Override - public void groupBy(StructuredRecord record, Emitter emitter) throws Exception { + public void groupBy(StructuredRecord record, Emitter emitter) { // app should provide some way to make some data calculated in configurePipeline available here. // then we wouldn't have to calculate schema here StructuredRecord.Builder builder = StructuredRecord.builder(getGroupKeySchema(record.getSchema())); @@ -371,9 +374,11 @@ private Schema getOutputSchema(Schema inputSchema, List groupByFields, for (String groupByField : groupByFields) { Schema.Field field = inputSchema.getField(groupByField); if (field == null) { - throw new IllegalArgumentException(String.format( + String error = String.format( "Cannot group by field '%s' because it does not exist in input schema %s.", - groupByField, inputSchema)); + groupByField, inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException(error)); } outputFields.add(field); } @@ -406,9 +411,13 @@ private Schema.Field getOutputSchemaField(GroupByConfig.FunctionInfo functionInf Schema.Field inputField = inputSchema.getField(functionInfo.getField()); if (inputField == null) { - throw new IllegalArgumentException(String.format( + String errorMessage = String.format( "Invalid aggregate %s(%s): Field '%s' does not exist in input schema %s.", - functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema)); + functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema); + String errorReason = String.format("Field '%s' does not exist in input schema %s.", + functionInfo.getField(), inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason)); } AggregateFunction aggregateFunction = functionInfo.getAggregateFunction(inputField.getSchema()); return Schema.Field.of(functionInfo.getName(), aggregateFunction.getOutputSchema()); @@ -437,9 +446,11 @@ private Schema getGroupKeySchema(Schema inputSchema) { for (String groupByField : conf.getGroupByFields()) { Schema.Field fieldSchema = inputSchema.getField(groupByField); if (fieldSchema == null) { - throw new IllegalArgumentException(String.format( + String error = String.format( "Cannot group by field '%s' because it does not exist in input schema %s", - groupByField, inputSchema)); + groupByField, inputSchema); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException()); } fields.add(fieldSchema); } diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByConfig.java b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByConfig.java index fbf05b6bf..63222f66c 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByConfig.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/aggregator/GroupByConfig.java @@ -23,6 +23,9 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.plugin.batch.aggregator.function.AggregateFunction; import io.cdap.plugin.batch.aggregator.function.AnyIf; import io.cdap.plugin.batch.aggregator.function.Avg; @@ -120,7 +123,10 @@ List getGroupByFields() { fields.add(field); } if (fields.isEmpty()) { - throw new IllegalArgumentException("The 'groupByFields' property must be set."); + String errorReason = "Can not group by fields as fields are empty."; + String errorMessage = "Fields are empty. The 'groupByFields' property must be set."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason)); } return fields; } @@ -138,46 +144,66 @@ List getAggregates() { for (String aggregate : Splitter.on(',').trimResults().split(aggregates)) { int colonIdx = aggregate.indexOf(':'); if (colonIdx < 0) { - throw new IllegalArgumentException(String.format( - "Could not find ':' separating aggregate name from its function in '%s'.", aggregate)); + String error = String.format( + "Could not find ':' separating aggregate name from its function in '%s'.", aggregate); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException(error)); } String name = aggregate.substring(0, colonIdx).trim(); if (!aggregateNames.add(name)) { - throw new IllegalArgumentException(String.format( - "Cannot create multiple aggregate functions with the same name '%s'.", name)); + String error = String.format( + "Cannot create multiple aggregate functions with the same name '%s'.", name); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException(error)); } String functionAndField = aggregate.substring(colonIdx + 1).trim(); int leftParanIdx = functionAndField.indexOf('('); if (leftParanIdx < 0) { - throw new IllegalArgumentException(String.format( + String errorReason = String.format("Could not find '(' in function '%s'.", functionAndField); + String errorMessage = String.format( "Could not find '(' in function '%s'. Functions must be specified as function(field).", - functionAndField)); + functionAndField); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason)); } String functionStr = functionAndField.substring(0, leftParanIdx).trim(); Function function; try { function = Function.valueOf(functionStr.toUpperCase()); } catch (IllegalArgumentException e) { - throw new IllegalArgumentException(String.format( - "Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values()))); + String errorReason = String.format( + "Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values())); + String errorMessage = String.format( + "Failed to fetch function due to invalid function '%s', must be one of %s with message: %s.", + functionStr, Joiner.on(',').join(Function.values()), e.getMessage()); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, e); } if (!functionAndField.endsWith(")")) { - throw new IllegalArgumentException(String.format( + String errorReason = String.format("Could not find closing ')' in function '%s'.", functionAndField); + String errorMessage = String.format( "Could not find closing ')' in function '%s'. Functions must be specified as function(field).", - functionAndField)); + functionAndField); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason)); } int conditionIndex = functionAndField.toLowerCase().indexOf("condition("); // check if condition involved extract substring up to condition otherwise extract up to length of string int fieldEndIndex = (conditionIndex == -1) ? functionAndField.length() - 1 : conditionIndex - 2; String field = functionAndField.substring(leftParanIdx + 1, fieldEndIndex).trim(); if (field.isEmpty()) { - throw new IllegalArgumentException(String.format( - "Invalid function '%s'. A field must be given as an argument.", functionAndField)); + String errorReason = String.format("Invalid function '%s'.", functionAndField); + String errorMessage = String.format( + "Invalid function '%s'. A field must be given as an argument.", functionAndField); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason)); } if (conditionIndex == -1 && function.isConditional()) { - throw new IllegalArgumentException("Missing 'condition' property for conditional function."); + String error = "Missing 'condition' property for conditional function."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException(error)); } String functionCondition = null; if (conditionIndex != -1) { @@ -187,7 +213,9 @@ List getAggregates() { // department.equals('d1') functionCondition = functionAndField.substring(conditionIndex + 10, functionAndField.length() - 1); if (Strings.isNullOrEmpty(functionCondition)) { - throw new IllegalArgumentException("The 'condition' property is missing arguments."); + String error = "The 'condition' property is missing arguments."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException(error)); } functionCondition = functionCondition.trim(); } @@ -195,7 +223,9 @@ List getAggregates() { } if (functionInfos.isEmpty()) { - throw new IllegalArgumentException("The 'aggregates' property must be set."); + String error = "The 'aggregates' property must be set."; + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.USER, false, new IllegalArgumentException(error)); } return functionInfos; } @@ -326,7 +356,11 @@ public AggregateFunction getAggregateFunction(Schema fieldSchema) { return new AnyIf(field, fieldSchema, JexlCondition.of(condition)); } // should never happen - throw new IllegalStateException("Unknown function type " + function); + String errorReason = String.format("Unknown function type %s", function); + String errorMessage = String.format("Failed to fetch Aggregate function for schema %s. Unknown function type %s.", + fieldSchema, function); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorReason, errorMessage, ErrorType.USER, false, new IllegalStateException(errorReason)); } @Override diff --git a/core-plugins/src/main/java/io/cdap/plugin/batch/joiner/JoinerConfig.java b/core-plugins/src/main/java/io/cdap/plugin/batch/joiner/JoinerConfig.java index 4a6b220b2..2effdfa43 100644 --- a/core-plugins/src/main/java/io/cdap/plugin/batch/joiner/JoinerConfig.java +++ b/core-plugins/src/main/java/io/cdap/plugin/batch/joiner/JoinerConfig.java @@ -25,6 +25,9 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; import io.cdap.cdap.api.plugin.PluginConfig; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.join.JoinCondition; @@ -264,7 +267,9 @@ JoinCondition getCondition(FailureCollector failureCollector) { .build(); } // will never happen unless getConditionType() is changed without changing this - throw new IllegalStateException("Unsupported condition type " + conditionType); + String error = String.format("Unsupported condition type %s.", conditionType); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + error, error, ErrorType.SYSTEM, false, new IllegalStateException(error)); } Set getJoinKeys(FailureCollector failureCollector) { diff --git a/pom.xml b/pom.xml index 7d317cdfa..ce0827ca3 100644 --- a/pom.xml +++ b/pom.xml @@ -1004,12 +1004,12 @@ !cloudBuild - wrangler-transform +