Skip to content

Commit

Permalink
Error management for Analytics plugin i.e. GroupByAggregate, Deduplicate, Distinct, and Joiner plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit-CloudSufi committed Jan 8, 2025
1 parent cca8d15 commit 3b3b46f
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

package io.cdap.plugin.batch.aggregator;

import com.google.common.base.Joiner;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;

/**
* Common functions for aggregation related functionalities.
Expand Down Expand Up @@ -69,8 +73,10 @@ private static void generateException(Schema fieldSchema, String fieldName, Stri
Schema.Type fieldType, String expectedType) {
Schema.LogicalType logicalType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getLogicalType() :
fieldSchema.getLogicalType();
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot compute %s on field %s because its type %s is not %s", functionName, fieldName,
logicalType == null ? fieldType : logicalType, expectedType));
logicalType == null ? fieldType : logicalType, expectedType);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -160,9 +163,12 @@ private StructuredRecord select(StructuredRecord record1, StructuredRecord recor
if (selectionFunction == null) {
Schema.Field field = record1.getSchema().getField(filterFunction.getField());
if (field == null) {
throw new IllegalArgumentException(
String.format("Field '%s' cannot be used as a filter field since it does not exist in the output schema",
filterFunction.getField()));
String errorReason = String.format("Field '%s' cannot be used as a filter field since it does not exist in " +
"the output schema", filterFunction.getField());
String errorMessage = String.format("Failed to merge values because the field '%s' cannot be used as a " +
"filter field since it does not exist in the output schema", filterFunction.getField());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
selectionFunction = filterFunction.getSelectionFunction(field.getSchema());
}
Expand All @@ -174,8 +180,12 @@ private Schema getGroupKeySchema(Schema inputSchema) {
for (String fieldName : dedupConfig.getUniqueFields()) {
Schema.Field field = inputSchema.getField(fieldName);
if (field == null) {
throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema));
String errorReason = String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema);
String errorMessage = String.format("Failed to groupBy because field %s does not exist in input schema %s.",
fieldName, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
fields.add(field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.batch.aggregator.function.Any;
import io.cdap.plugin.batch.aggregator.function.First;
import io.cdap.plugin.batch.aggregator.function.Last;
Expand Down Expand Up @@ -91,8 +94,10 @@ DedupFunctionInfo getFilter() {
}

if (filterParts.size() != 2) {
throw new IllegalArgumentException(String.format("Invalid filter operation. It should be of format " +
"'fieldName:functionName'. But got : %s", filterOperation));
String error = String.format("Invalid filter operation. It should be of format " +
"'fieldName:functionName'. But got : %s", filterOperation);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}

Function function;
Expand All @@ -106,8 +111,12 @@ DedupFunctionInfo getFilter() {
try {
function = Function.valueOf(functionStr.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format("Invalid function '%s'. Must be one of %s.",
functionStr, Joiner.on(',').join(Function.values())));
String errorReason = String.format("Invalid function '%s'. Must be one of %s.",
functionStr, Joiner.on(',').join(Function.values()));
String errorMessage = String.format("Failed to filter due to invalid function '%s', Must be one of %s, " +
"with message: %s.", functionStr, Joiner.on(',').join(Function.values()), e.getMessage());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, e);
}
return new DedupFunctionInfo(fieldName, function);
}
Expand Down Expand Up @@ -142,9 +151,10 @@ public SelectionFunction getSelectionFunction(Schema fieldSchema) {
case MIN:
return new MinSelection(field, fieldSchema);
}
throw new IllegalArgumentException(String.format(
"The function '%s' provided is not supported. It must be one of %s.",
function, Joiner.on(',').join(Function.values())));
String error = String.format("The function '%s' provided is not supported. It must be one of %s.",
function, Joiner.on(',').join(Function.values()));
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -170,8 +173,12 @@ private static Schema getOutputSchema(Schema inputSchema, Iterable<String> field
for (String fieldName : fields) {
Schema.Field field = inputSchema.getField(fieldName);
if (field == null) {
throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema));
String errorReason = String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema);
String errorMessage = String.format("Failed to fetch record schema due to field %s does not exist in" +
" input schema %s.", fieldName, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
outputFields.add(field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -319,7 +322,7 @@ public void initialize(BatchRuntimeContext context) throws Exception {
}

@Override
public void groupBy(StructuredRecord record, Emitter<StructuredRecord> emitter) throws Exception {
public void groupBy(StructuredRecord record, Emitter<StructuredRecord> emitter) {
// app should provide some way to make some data calculated in configurePipeline available here.
// then we wouldn't have to calculate schema here
StructuredRecord.Builder builder = StructuredRecord.builder(getGroupKeySchema(record.getSchema()));
Expand Down Expand Up @@ -371,9 +374,11 @@ private Schema getOutputSchema(Schema inputSchema, List<String> groupByFields,
for (String groupByField : groupByFields) {
Schema.Field field = inputSchema.getField(groupByField);
if (field == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot group by field '%s' because it does not exist in input schema %s.",
groupByField, inputSchema));
groupByField, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
outputFields.add(field);
}
Expand Down Expand Up @@ -406,9 +411,13 @@ private Schema.Field getOutputSchemaField(GroupByConfig.FunctionInfo functionInf

Schema.Field inputField = inputSchema.getField(functionInfo.getField());
if (inputField == null) {
throw new IllegalArgumentException(String.format(
String errorMessage = String.format(
"Invalid aggregate %s(%s): Field '%s' does not exist in input schema %s.",
functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema));
functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema);
String errorReason = String.format("Field '%s' does not exist in input schema %s.",
functionInfo.getField(), inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
AggregateFunction aggregateFunction = functionInfo.getAggregateFunction(inputField.getSchema());
return Schema.Field.of(functionInfo.getName(), aggregateFunction.getOutputSchema());
Expand Down Expand Up @@ -437,9 +446,11 @@ private Schema getGroupKeySchema(Schema inputSchema) {
for (String groupByField : conf.getGroupByFields()) {
Schema.Field fieldSchema = inputSchema.getField(groupByField);
if (fieldSchema == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot group by field '%s' because it does not exist in input schema %s",
groupByField, inputSchema));
groupByField, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, true, new IllegalArgumentException());
}
fields.add(fieldSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.batch.aggregator.function.AggregateFunction;
import io.cdap.plugin.batch.aggregator.function.AnyIf;
import io.cdap.plugin.batch.aggregator.function.Avg;
Expand Down Expand Up @@ -120,7 +123,10 @@ List<String> getGroupByFields() {
fields.add(field);
}
if (fields.isEmpty()) {
throw new IllegalArgumentException("The 'groupByFields' property must be set.");
String errorReason = "Can not group by fields as fields are empty.";
String errorMessage = "Fields are empty. The 'groupByFields' property must be set.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
return fields;
}
Expand All @@ -138,46 +144,66 @@ List<FunctionInfo> getAggregates() {
for (String aggregate : Splitter.on(',').trimResults().split(aggregates)) {
int colonIdx = aggregate.indexOf(':');
if (colonIdx < 0) {
throw new IllegalArgumentException(String.format(
"Could not find ':' separating aggregate name from its function in '%s'.", aggregate));
String error = String.format(
"Could not find ':' separating aggregate name from its function in '%s'.", aggregate);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
String name = aggregate.substring(0, colonIdx).trim();
if (!aggregateNames.add(name)) {
throw new IllegalArgumentException(String.format(
"Cannot create multiple aggregate functions with the same name '%s'.", name));
String error = String.format(
"Cannot create multiple aggregate functions with the same name '%s'.", name);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}

String functionAndField = aggregate.substring(colonIdx + 1).trim();
int leftParanIdx = functionAndField.indexOf('(');
if (leftParanIdx < 0) {
throw new IllegalArgumentException(String.format(
String errorReason = String.format("Could not find '(' in function '%s'.", functionAndField);
String errorMessage = String.format(
"Could not find '(' in function '%s'. Functions must be specified as function(field).",
functionAndField));
functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
String functionStr = functionAndField.substring(0, leftParanIdx).trim();
Function function;
try {
function = Function.valueOf(functionStr.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format(
"Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values())));
String errorReason = String.format(
"Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values()));
String errorMessage = String.format(
"Failed to fetch function due to invalid function '%s', must be one of %s with message: %s.",
functionStr, Joiner.on(',').join(Function.values()), e.getMessage());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, e);
}

if (!functionAndField.endsWith(")")) {
throw new IllegalArgumentException(String.format(
String errorReason = String.format("Could not find closing ')' in function '%s'.", functionAndField);
String errorMessage = String.format(
"Could not find closing ')' in function '%s'. Functions must be specified as function(field).",
functionAndField));
functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
int conditionIndex = functionAndField.toLowerCase().indexOf("condition(");
// check if condition involved extract substring up to condition otherwise extract up to length of string
int fieldEndIndex = (conditionIndex == -1) ? functionAndField.length() - 1 : conditionIndex - 2;
String field = functionAndField.substring(leftParanIdx + 1, fieldEndIndex).trim();
if (field.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Invalid function '%s'. A field must be given as an argument.", functionAndField));
String errorReason = String.format("Invalid function '%s'.", functionAndField);
String errorMessage = String.format(
"Invalid function '%s'. A field must be given as an argument.", functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
if (conditionIndex == -1 && function.isConditional()) {
throw new IllegalArgumentException("Missing 'condition' property for conditional function.");
String error = "Missing 'condition' property for conditional function.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
String functionCondition = null;
if (conditionIndex != -1) {
Expand All @@ -187,15 +213,19 @@ List<FunctionInfo> getAggregates() {
// department.equals('d1')
functionCondition = functionAndField.substring(conditionIndex + 10, functionAndField.length() - 1);
if (Strings.isNullOrEmpty(functionCondition)) {
throw new IllegalArgumentException("The 'condition' property is missing arguments.");
String error = "The 'condition' property is missing arguments.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
functionCondition = functionCondition.trim();
}
functionInfos.add(new FunctionInfo(name, field, function, functionCondition));
}

if (functionInfos.isEmpty()) {
throw new IllegalArgumentException("The 'aggregates' property must be set.");
String error = "The 'aggregates' property must be set.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
return functionInfos;
}
Expand Down Expand Up @@ -326,7 +356,11 @@ public AggregateFunction getAggregateFunction(Schema fieldSchema) {
return new AnyIf(field, fieldSchema, JexlCondition.of(condition));
}
// should never happen
throw new IllegalStateException("Unknown function type " + function);
String errorReason = String.format("Unknown function type %s", function);
String errorMessage = String.format("Failed to fetch Aggregate function for schema %s. Unknown function type %s.",
fieldSchema, function);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalStateException(errorReason));
}

@Override
Expand Down
Loading

0 comments on commit 3b3b46f

Please sign in to comment.