Skip to content

Commit

Permalink
Changes for calculating "optimal memory requirement" for RM (#576)
Browse files Browse the repository at this point in the history
* Changes for calculating "optimal memory requirement" for RM

- Added utilities to get the total memory required per operator
- To get the total memory required by the query
- Get the different operators in the query

* Update unit test to fail in case of exception
  • Loading branch information
abhidotravi authored Apr 12, 2019
1 parent ca4abe8 commit 2544553
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import oadd.org.apache.drill.exec.proto.UserBitShared;

import java.util.List;
import java.util.stream.Collectors;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeName("queryProfile")
Expand Down Expand Up @@ -159,6 +161,7 @@ public static class OperatorProfile {
public long processNanos;
public long peakLocalMemoryAllocated;
public long waitNanos;
public long optimalMemAllocation;

@Override
public String toString() {
Expand All @@ -176,7 +179,8 @@ public String toString() {
.append("setupNanos=").append(setupNanos).append(", ")
.append("processNanos=").append(processNanos).append(", ")
.append("peakLocalMemoryAllocated=").append(peakLocalMemoryAllocated).append(", ")
.append("waitNanos=").append(waitNanos).append(")");
.append("waitNanos=").append(waitNanos).append(", ")
.append("optimalMemAllocation=").append(optimalMemAllocation).append(")");
return sb.toString();
}
}
Expand All @@ -197,6 +201,49 @@ public String toString() {

}
}

/**
* Get optimal memory allocated per operator.
* Utility parses the DrillQueryProfile
* @param operator
* @return
*/
public long getOptimalMemoryPerOperator(final UserBitShared.CoreOperatorType operator) {
return this.fragmentProfiles
.stream()
.flatMap(f -> f.minorFragmentProfiles
.stream()
.flatMap(m -> m.operatorProfiles.stream())
).filter(o -> o.operatorId == operator.getNumber())
.mapToLong(o -> o.optimalMemAllocation)
.sum();
}

/**
* Get different operators in the profile.
* @return a list of operators in the query profile.
*/
public List<UserBitShared.CoreOperatorType> getOperatorsFromProfile() {
return this.fragmentProfiles
.stream().flatMap(f -> f.minorFragmentProfiles
.stream()
.flatMap(m -> m.operatorProfiles.stream())
).mapToInt(o -> o.operatorId)
.distinct()
.mapToObj(UserBitShared.CoreOperatorType::forNumber)
.collect(Collectors.toList());
}

/**
* Total optimal memory required for the query.
* @return total optimal memory required for the query (as estimated by the RM planner).
*/
public long getTotalOptimalMemoryEstimate() {
return getOperatorsFromProfile()
.stream()
.mapToLong(this::getOptimalMemoryPerOperator)
.sum();
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.test.framework;

import oadd.org.apache.drill.exec.proto.UserBitShared;
import org.apache.commons.io.FilenameUtils;

import java.io.*;
Expand Down Expand Up @@ -44,6 +45,7 @@
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import oadd.org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.test.framework.common.DrillJavaTestBase;
import org.apache.log4j.Logger;
import org.testng.Assert;
Expand All @@ -16,6 +17,7 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;

import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_EXEC_RM_CONFIG_KEY;
Expand Down Expand Up @@ -137,6 +139,7 @@ public void testConfigFileRenderer() {
* Read from the file to validate.
* @throws IOException
*/
@Test(groups = UNIT_GROUP)
public void testWriteRMConfigToFile() throws IOException {
final String fileName = "tempRMConfig.conf";
final String filePath = DrillTestDefaults.TEST_ROOT_DIR + "conf/" + fileName;
Expand All @@ -159,4 +162,35 @@ public void testWriteRMConfigToFile() throws IOException {
Assert.assertEquals(drillRMConfig2.childPools.size(), 2,
"Number of child pools in the config did not match!");
}

/**
* Test parsing operator memory and type from QueryProfile.
*/
@Test
public void testTotalMemoryForQueryProfile() {
final Properties props = Utils.createConnectionProperties();
final ConnectionPool pool = new ConnectionPool(props);
final String sqlStatement = "select name, val, status from sys.options where name like \'%runtime%\'";

try (Connection connection = pool.getOrCreateConnection()) {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sqlStatement);

final String queryId = Utils.getQueryID(resultSet);
DrillQueryProfile profile = Utils.getQueryProfile(queryId);
Assert.assertEquals(profile.queryId, queryId);

long rmMemEstimate = profile.getTotalOptimalMemoryEstimate();
LOG.info("Memory estimated by RM planner: " + rmMemEstimate);
Assert.assertTrue(rmMemEstimate > 0,
"RM estimated memory should be greater than 0");
List<UserBitShared.CoreOperatorType> operators = profile.getOperatorsFromProfile();
Assert.assertTrue(operators.size() > 0,
"Number of operators in the profile should be greater than 0");
operators.forEach(LOG::info);
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}
}

0 comments on commit 2544553

Please sign in to comment.