diff --git a/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java b/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java index 41a4a0f01..83c5b9597 100644 --- a/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java +++ b/framework/src/main/java/org/apache/drill/test/framework/DrillQueryProfile.java @@ -3,8 +3,10 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import oadd.org.apache.drill.exec.proto.UserBitShared; import java.util.List; +import java.util.stream.Collectors; @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeName("queryProfile") @@ -159,6 +161,7 @@ public static class OperatorProfile { public long processNanos; public long peakLocalMemoryAllocated; public long waitNanos; + public long optimalMemAllocation; @Override public String toString() { @@ -176,7 +179,8 @@ public String toString() { .append("setupNanos=").append(setupNanos).append(", ") .append("processNanos=").append(processNanos).append(", ") .append("peakLocalMemoryAllocated=").append(peakLocalMemoryAllocated).append(", ") - .append("waitNanos=").append(waitNanos).append(")"); + .append("waitNanos=").append(waitNanos).append(", ") + .append("optimalMemAllocation=").append(optimalMemAllocation).append(")"); return sb.toString(); } } @@ -197,6 +201,49 @@ public String toString() { } } + + /** + * Get optimal memory allocated per operator. + * Utility parses the DrillQueryProfile + * @param operator + * @return + */ + public long getOptimalMemoryPerOperator(final UserBitShared.CoreOperatorType operator) { + return this.fragmentProfiles + .stream() + .flatMap(f -> f.minorFragmentProfiles + .stream() + .flatMap(m -> m.operatorProfiles.stream()) + ).filter(o -> o.operatorId == operator.getNumber()) + .mapToLong(o -> o.optimalMemAllocation) + .sum(); + } + + /** + * Get different operators in the profile. + * @return a list of operators in the query profile. + */ + public List getOperatorsFromProfile() { + return this.fragmentProfiles + .stream().flatMap(f -> f.minorFragmentProfiles + .stream() + .flatMap(m -> m.operatorProfiles.stream()) + ).mapToInt(o -> o.operatorId) + .distinct() + .mapToObj(UserBitShared.CoreOperatorType::forNumber) + .collect(Collectors.toList()); + } + + /** + * Total optimal memory required for the query. + * @return total optimal memory required for the query (as estimated by the RM planner). + */ + public long getTotalOptimalMemoryEstimate() { + return getOperatorsFromProfile() + .stream() + .mapToLong(this::getOptimalMemoryPerOperator) + .sum(); + } } diff --git a/framework/src/main/java/org/apache/drill/test/framework/Utils.java b/framework/src/main/java/org/apache/drill/test/framework/Utils.java index 934a4e292..6315e29b8 100755 --- a/framework/src/main/java/org/apache/drill/test/framework/Utils.java +++ b/framework/src/main/java/org/apache/drill/test/framework/Utils.java @@ -17,6 +17,7 @@ */ package org.apache.drill.test.framework; +import oadd.org.apache.drill.exec.proto.UserBitShared; import org.apache.commons.io.FilenameUtils; import java.io.*; @@ -44,6 +45,7 @@ import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonNode; diff --git a/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java b/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java index afe2ee7df..ed554cea6 100644 --- a/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java +++ b/framework/src/test/java/org/apache/drill/test/framework/DrillTestFrameworkUnitTests.java @@ -2,6 +2,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import oadd.org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.test.framework.common.DrillJavaTestBase; import org.apache.log4j.Logger; import org.testng.Assert; @@ -16,6 +17,7 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; +import java.util.List; import java.util.Properties; import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_EXEC_RM_CONFIG_KEY; @@ -137,6 +139,7 @@ public void testConfigFileRenderer() { * Read from the file to validate. * @throws IOException */ + @Test(groups = UNIT_GROUP) public void testWriteRMConfigToFile() throws IOException { final String fileName = "tempRMConfig.conf"; final String filePath = DrillTestDefaults.TEST_ROOT_DIR + "conf/" + fileName; @@ -159,4 +162,35 @@ public void testWriteRMConfigToFile() throws IOException { Assert.assertEquals(drillRMConfig2.childPools.size(), 2, "Number of child pools in the config did not match!"); } + + /** + * Test parsing operator memory and type from QueryProfile. + */ + @Test + public void testTotalMemoryForQueryProfile() { + final Properties props = Utils.createConnectionProperties(); + final ConnectionPool pool = new ConnectionPool(props); + final String sqlStatement = "select name, val, status from sys.options where name like \'%runtime%\'"; + + try (Connection connection = pool.getOrCreateConnection()) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sqlStatement); + + final String queryId = Utils.getQueryID(resultSet); + DrillQueryProfile profile = Utils.getQueryProfile(queryId); + Assert.assertEquals(profile.queryId, queryId); + + long rmMemEstimate = profile.getTotalOptimalMemoryEstimate(); + LOG.info("Memory estimated by RM planner: " + rmMemEstimate); + Assert.assertTrue(rmMemEstimate > 0, + "RM estimated memory should be greater than 0"); + List operators = profile.getOperatorsFromProfile(); + Assert.assertTrue(operators.size() > 0, + "Number of operators in the profile should be greater than 0"); + operators.forEach(LOG::info); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } }