Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-1837] Error management for Analytics plugin i.e. GroupByAggregate, Deduplicate, Distinct and Joiner #1903

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package io.cdap.plugin.batch.aggregator;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;

/**
* Common functions for aggregation-related functionality.
Expand Down Expand Up @@ -69,8 +72,10 @@ private static void generateException(Schema fieldSchema, String fieldName, Stri
Schema.Type fieldType, String expectedType) {
Schema.LogicalType logicalType = fieldSchema.isNullable() ? fieldSchema.getNonNullable().getLogicalType() :
fieldSchema.getLogicalType();
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot compute %s on field %s because its type %s is not %s", functionName, fieldName,
logicalType == null ? fieldType : logicalType, expectedType));
logicalType == null ? fieldType : logicalType, expectedType);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -160,9 +163,12 @@ private StructuredRecord select(StructuredRecord record1, StructuredRecord recor
if (selectionFunction == null) {
Schema.Field field = record1.getSchema().getField(filterFunction.getField());
if (field == null) {
throw new IllegalArgumentException(
String.format("Field '%s' cannot be used as a filter field since it does not exist in the output schema",
filterFunction.getField()));
String errorReason = String.format("Field '%s' cannot be used as a filter field since it does not exist in " +
"the output schema", filterFunction.getField());
String errorMessage = String.format("Failed to merge values because the field '%s' cannot be used as a " +
"filter field since it does not exist in the output schema", filterFunction.getField());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
selectionFunction = filterFunction.getSelectionFunction(field.getSchema());
}
Expand All @@ -174,8 +180,12 @@ private Schema getGroupKeySchema(Schema inputSchema) {
for (String fieldName : dedupConfig.getUniqueFields()) {
Schema.Field field = inputSchema.getField(fieldName);
if (field == null) {
throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema));
String errorReason = String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema);
String errorMessage = String.format("Failed to groupBy because field %s does not exist in input schema %s.",
fieldName, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
fields.add(field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.batch.aggregator.function.Any;
import io.cdap.plugin.batch.aggregator.function.First;
import io.cdap.plugin.batch.aggregator.function.Last;
Expand Down Expand Up @@ -91,8 +94,10 @@ DedupFunctionInfo getFilter() {
}

if (filterParts.size() != 2) {
throw new IllegalArgumentException(String.format("Invalid filter operation. It should be of format " +
"'fieldName:functionName'. But got : %s", filterOperation));
String error = String.format("Invalid filter operation. It should be of format " +
"'fieldName:functionName'. But got : %s", filterOperation);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}

Function function;
Expand All @@ -106,8 +111,12 @@ DedupFunctionInfo getFilter() {
try {
function = Function.valueOf(functionStr.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format("Invalid function '%s'. Must be one of %s.",
functionStr, Joiner.on(',').join(Function.values())));
String errorReason = String.format("Invalid function '%s'. Must be one of %s.",
functionStr, Joiner.on(',').join(Function.values()));
String errorMessage = String.format("Failed to filter due to invalid function '%s', Must be one of %s, " +
"with message: %s.", functionStr, Joiner.on(',').join(Function.values()), e.getMessage());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, e);
}
return new DedupFunctionInfo(fieldName, function);
}
Expand Down Expand Up @@ -142,9 +151,10 @@ public SelectionFunction getSelectionFunction(Schema fieldSchema) {
case MIN:
return new MinSelection(field, fieldSchema);
}
throw new IllegalArgumentException(String.format(
"The function '%s' provided is not supported. It must be one of %s.",
function, Joiner.on(',').join(Function.values())));
String error = String.format("The function '%s' provided is not supported. It must be one of %s.",
function, Joiner.on(',').join(Function.values()));
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand All @@ -36,7 +39,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -170,8 +172,12 @@ private static Schema getOutputSchema(Schema inputSchema, Iterable<String> field
for (String fieldName : fields) {
Schema.Field field = inputSchema.getField(fieldName);
if (field == null) {
throw new IllegalArgumentException(String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema));
String errorReason = String.format("Field %s does not exist in input schema %s.",
fieldName, inputSchema);
String errorMessage = String.format("Failed to fetch record schema due to field %s does not exist in" +
" input schema %s.", fieldName, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
outputFields.add(field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
Expand Down Expand Up @@ -319,7 +322,7 @@ public void initialize(BatchRuntimeContext context) throws Exception {
}

@Override
public void groupBy(StructuredRecord record, Emitter<StructuredRecord> emitter) throws Exception {
public void groupBy(StructuredRecord record, Emitter<StructuredRecord> emitter) {
// The app should provide some way to make data calculated in configurePipeline available here;
// then we wouldn't have to calculate the schema here.
StructuredRecord.Builder builder = StructuredRecord.builder(getGroupKeySchema(record.getSchema()));
Expand Down Expand Up @@ -371,9 +374,11 @@ private Schema getOutputSchema(Schema inputSchema, List<String> groupByFields,
for (String groupByField : groupByFields) {
Schema.Field field = inputSchema.getField(groupByField);
if (field == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot group by field '%s' because it does not exist in input schema %s.",
groupByField, inputSchema));
groupByField, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
outputFields.add(field);
}
Expand Down Expand Up @@ -406,9 +411,13 @@ private Schema.Field getOutputSchemaField(GroupByConfig.FunctionInfo functionInf

Schema.Field inputField = inputSchema.getField(functionInfo.getField());
if (inputField == null) {
throw new IllegalArgumentException(String.format(
String errorMessage = String.format(
"Invalid aggregate %s(%s): Field '%s' does not exist in input schema %s.",
functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema));
functionInfo.getFunction(), functionInfo.getField(), functionInfo.getField(), inputSchema);
String errorReason = String.format("Field '%s' does not exist in input schema %s.",
functionInfo.getField(), inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
AggregateFunction aggregateFunction = functionInfo.getAggregateFunction(inputField.getSchema());
return Schema.Field.of(functionInfo.getName(), aggregateFunction.getOutputSchema());
Expand Down Expand Up @@ -437,9 +446,11 @@ private Schema getGroupKeySchema(Schema inputSchema) {
for (String groupByField : conf.getGroupByFields()) {
Schema.Field fieldSchema = inputSchema.getField(groupByField);
if (fieldSchema == null) {
throw new IllegalArgumentException(String.format(
String error = String.format(
"Cannot group by field '%s' because it does not exist in input schema %s",
groupByField, inputSchema));
groupByField, inputSchema);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException());
}
fields.add(fieldSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.batch.aggregator.function.AggregateFunction;
import io.cdap.plugin.batch.aggregator.function.AnyIf;
import io.cdap.plugin.batch.aggregator.function.Avg;
Expand Down Expand Up @@ -120,7 +123,10 @@ List<String> getGroupByFields() {
fields.add(field);
}
if (fields.isEmpty()) {
throw new IllegalArgumentException("The 'groupByFields' property must be set.");
String errorReason = "Can not group by fields as fields are empty.";
String errorMessage = "Fields are empty. The 'groupByFields' property must be set.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
return fields;
}
Expand All @@ -138,46 +144,66 @@ List<FunctionInfo> getAggregates() {
for (String aggregate : Splitter.on(',').trimResults().split(aggregates)) {
int colonIdx = aggregate.indexOf(':');
if (colonIdx < 0) {
throw new IllegalArgumentException(String.format(
"Could not find ':' separating aggregate name from its function in '%s'.", aggregate));
String error = String.format(
"Could not find ':' separating aggregate name from its function in '%s'.", aggregate);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
String name = aggregate.substring(0, colonIdx).trim();
if (!aggregateNames.add(name)) {
throw new IllegalArgumentException(String.format(
"Cannot create multiple aggregate functions with the same name '%s'.", name));
String error = String.format(
"Cannot create multiple aggregate functions with the same name '%s'.", name);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}

String functionAndField = aggregate.substring(colonIdx + 1).trim();
int leftParanIdx = functionAndField.indexOf('(');
if (leftParanIdx < 0) {
throw new IllegalArgumentException(String.format(
String errorReason = String.format("Could not find '(' in function '%s'.", functionAndField);
String errorMessage = String.format(
"Could not find '(' in function '%s'. Functions must be specified as function(field).",
functionAndField));
functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
String functionStr = functionAndField.substring(0, leftParanIdx).trim();
Function function;
try {
function = Function.valueOf(functionStr.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format(
"Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values())));
String errorReason = String.format(
"Invalid function '%s'. Must be one of %s.", functionStr, Joiner.on(',').join(Function.values()));
String errorMessage = String.format(
"Failed to fetch function due to invalid function '%s', must be one of %s with message: %s.",
functionStr, Joiner.on(',').join(Function.values()), e.getMessage());
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, e);
}

if (!functionAndField.endsWith(")")) {
throw new IllegalArgumentException(String.format(
String errorReason = String.format("Could not find closing ')' in function '%s'.", functionAndField);
String errorMessage = String.format(
"Could not find closing ')' in function '%s'. Functions must be specified as function(field).",
functionAndField));
functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
int conditionIndex = functionAndField.toLowerCase().indexOf("condition(");
// If a condition is present, extract the field up to just before the condition; otherwise extract up to the closing ')'.
int fieldEndIndex = (conditionIndex == -1) ? functionAndField.length() - 1 : conditionIndex - 2;
String field = functionAndField.substring(leftParanIdx + 1, fieldEndIndex).trim();
if (field.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Invalid function '%s'. A field must be given as an argument.", functionAndField));
String errorReason = String.format("Invalid function '%s'.", functionAndField);
String errorMessage = String.format(
"Invalid function '%s'. A field must be given as an argument.", functionAndField);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalArgumentException(errorReason));
}
if (conditionIndex == -1 && function.isConditional()) {
throw new IllegalArgumentException("Missing 'condition' property for conditional function.");
String error = "Missing 'condition' property for conditional function.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
String functionCondition = null;
if (conditionIndex != -1) {
Expand All @@ -187,15 +213,19 @@ List<FunctionInfo> getAggregates() {
// department.equals('d1')
functionCondition = functionAndField.substring(conditionIndex + 10, functionAndField.length() - 1);
if (Strings.isNullOrEmpty(functionCondition)) {
throw new IllegalArgumentException("The 'condition' property is missing arguments.");
String error = "The 'condition' property is missing arguments.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
functionCondition = functionCondition.trim();
}
functionInfos.add(new FunctionInfo(name, field, function, functionCondition));
}

if (functionInfos.isEmpty()) {
throw new IllegalArgumentException("The 'aggregates' property must be set.");
String error = "The 'aggregates' property must be set.";
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
error, error, ErrorType.USER, false, new IllegalArgumentException(error));
}
return functionInfos;
}
Expand Down Expand Up @@ -326,7 +356,11 @@ public AggregateFunction getAggregateFunction(Schema fieldSchema) {
return new AnyIf(field, fieldSchema, JexlCondition.of(condition));
}
// should never happen
throw new IllegalStateException("Unknown function type " + function);
String errorReason = String.format("Unknown function type %s", function);
String errorMessage = String.format("Failed to fetch Aggregate function for schema %s. Unknown function type %s.",
fieldSchema, function);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.USER, false, new IllegalStateException(errorReason));
}

@Override
Expand Down
Loading
Loading