Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement queuing priority #56 #58

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/config/run.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,28 @@ maxCacheSize=100000
# Username that corresponds with the anonymous user - this is used to make anonymous carts unique
anonUserName=anon/anon

# Limit the number files per queued Download part. Multiple Datasets will be combined into part
# Downloads based on their fileCount up to this limit. If a single Dataset has a fileCount
# greater than this limit, it will still be submitted in a part by itself.
queue.maxFileCount = 10000

# Limit the number maximum of active RESTORING downloads. Does not affect user submitted carts,
# but queued requests will only be started when there are less than this many RESTORING downloads.
# Negative values will start all queued jobs immediately, regardless of load.
queue.maxActiveDownloads = 10

# When queueing Downloads a positive priority will allow a User to proceed.
# Non-positive values will block that User from submitting a request to the queue.
# When automatically moving jobs from the queued to the PREPARING state, all Downloads
# from Users with priority 1 will be scheduled before 2 and so on.
# InstrumentScientists can either be identified for specific Instrument.names, or a global default
# InvestigationUsers can either be identified for specific InvestigationUser.roles, or a global default
# Authenticated Users without InstrumentScientist or InvestigationUser status will use the authenticated priority
# Anyone who does not meet a specific priority class will use the default
# Users meeting multiple criteria will use the highest priority available (lowest number)
queue.priority.instrumentScientist.instruments = {"ABC": 1}
queue.priority.instrumentScientist.default = 2
queue.priority.investigationUser.roles = {"ABC": 3}
queue.priority.investigationUser.default = 4
queue.priority.authenticated = 5
queue.priority.default = 0
132 changes: 131 additions & 1 deletion src/main/java/org/icatproject/topcat/IcatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.ArrayList;

import java.util.Collections;
import java.net.URLEncoder;

import org.icatproject.topcat.httpclient.*;
Expand Down Expand Up @@ -94,6 +95,81 @@ public String getFullName() throws TopcatException {
}
}

/**
* Get all Datasets whose parent Investigation has the specified visitId.
*
* @param visitId ICAT Investigation.visitId
* @return JsonArray of Dataset fields, where each entry is a JsonArray of
* [dataset.id, dataset.fileCount].
* @throws TopcatException
*/
public JsonArray getDatasets(String visitId) throws TopcatException {
String query = "SELECT dataset.id, dataset.fileCount from Dataset dataset";
query += " WHERE dataset.investigation.visitId = '" + visitId + "' ORDER BY dataset.id";
return submitQuery(query);
}

/**
* Get all Datafiles in the list of file locations.
*
* @param files List of ICAT Datafile.locations
* @return JsonArray of Datafile ids.
* @throws TopcatException
*/
public JsonArray getDatafiles(List<String> files) throws TopcatException {
StringBuilder stringBuilder = new StringBuilder();
ListIterator<String> fileIterator = files.listIterator();
stringBuilder.append("'" + fileIterator.next() + "'");
fileIterator.forEachRemaining(file -> {
stringBuilder.append(",");
stringBuilder.append("'" + file + "'");
});
String formattedFiles = stringBuilder.toString();
String query = "SELECT datafile.id from Datafile datafile";
query += " WHERE datafile.location in (" + formattedFiles + ") ORDER BY datafile.id";
return submitQuery(query);
}

/**
* Utility method to get the fileCount (not size) of a Dataset by COUNT of its
* child Datafiles. Ideally the fileCount field should be used, this is a
* fallback option if that field is not set.
*
* @param datasetId ICAT Dataset.id
* @return The number of Datafiles in the specified Dataset
* @throws TopcatException
*/
public long getDatasetFileCount(long datasetId) throws TopcatException {
String query = "SELECT COUNT(datafile) FROM Datafile datafile WHERE datafile.dataset.id = " + datasetId;
JsonArray jsonArray = submitQuery(query);
return jsonArray.getJsonNumber(0).longValueExact();
}

/**
* Utility method for submitting an unformatted query to the entityManager
* endpoint, and returning the resultant JsonArray.
*
* @param query Unformatted String query to submit
* @return JsonArray of results, contents will depend on the query.
* @throws TopcatException
*/
private JsonArray submitQuery(String query) throws TopcatException {
try {
String encodedQuery = URLEncoder.encode(query, "UTF8");
String url = "entityManager?sessionId=" + URLEncoder.encode(sessionId, "UTF8") + "&query=" + encodedQuery;
Response response = httpClient.get(url, new HashMap<String, String>());
if (response.getCode() == 404) {
throw new NotFoundException("Could not run getEntities got a 404 response");
} else if (response.getCode() >= 400) {
throw new BadRequestException(Utils.parseJsonObject(response.toString()).getString("message"));
}
return Utils.parseJsonArray(response.toString());
} catch (TopcatException e) {
throw e; } catch (Exception e) {
throw new BadRequestException(e.getMessage());
}
}

/**
* Gets a single Entity of the specified type, without any other conditions.
*
Expand Down Expand Up @@ -193,6 +269,60 @@ public List<JsonObject> getEntities(String entityType, List<Long> entityIds) thr
return out;
}

/**
* @param userName ICAT User.name to check for access to the queue
* @throws TopcatException If the user has a non-positive priority value (or
* another internal error is triggered)
*/
public void checkQueueAllowed(String userName) throws TopcatException {
if (getQueuePriority(userName) < 1) {
throw new ForbiddenException("Queuing Downloads forbidden");
}
}

/**
* If explicitly set via InstrumentScientist or InvestigationUser mappings,
* the highest priority (lowest value) will be returned.
* Otherwise, if authenticated, the authenticated user default will be returned.
* Otherwise, global default will be returned.
*
* @param userName ICAT User.name to determine the queue priority of
* @return int representing the queue priority. <1 indicates disabled, >=1
* indicates enabled with higher values having lower priority.
* @throws TopcatException
*/
public int getQueuePriority(String userName) throws TopcatException {
PriorityMap priorityMap = PriorityMap.getInstance();
HashMap<Integer, String> mapping = priorityMap.getMapping();
List<Integer> keyList = new ArrayList<>(mapping.keySet());
Collections.sort(keyList);
for (Integer priority : keyList) {
if (checkUser(userName, mapping.get(priority)) > 0) {
return priority;
}
}

if (!userName.equals(Properties.getInstance().getProperty("anonUserName"))) {
return priorityMap.getAuthenticatedPriority();
} else {
return priorityMap.getDefaultPriority();
}
}

/**
* @param userName ICAT User.name to determine the queue priority of
* @param condition JPQL condition representing the possible ways a user can
* have priority
* @return size of the results, 0 means use did not have priority, 1 means they
* did
* @throws TopcatException
*/
int checkUser(String userName, String condition) throws TopcatException {
String query = "SELECT user FROM User user WHERE user.name = '" + userName + "' AND (" + condition + ")";
JsonArray results = submitQuery(query);
return results.size();
}

protected String[] getAdminUserNames() throws Exception {
return Properties.getInstance().getProperty("adminUserNames", "").split("([ ]*,[ ]*|[ ]+)");
}
Expand Down
145 changes: 145 additions & 0 deletions src/main/java/org/icatproject/topcat/PriorityMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package org.icatproject.topcat;

import java.io.ByteArrayInputStream;
import java.util.HashMap;

import org.icatproject.topcat.exceptions.InternalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;

public class PriorityMap {

private static PriorityMap instance = null;

public synchronized static PriorityMap getInstance() throws InternalException {
if (instance == null) {
instance = new PriorityMap();
}
return instance;
}

private int defaultPriority;
private int authenticatedPriority;
private HashMap<Integer, String> mapping = new HashMap<>();
private Logger logger = LoggerFactory.getLogger(PriorityMap.class);

public PriorityMap() {
Properties properties = Properties.getInstance();

String defaultString = properties.getProperty("queue.priority.default", "0");
defaultPriority = Integer.valueOf(defaultString);

String authenticatedString = properties.getProperty("queue.priority.authenticated", defaultString);
setAuthenticatedPriority(authenticatedString);

String property = "queue.priority.investigationUser.default";
String investigationUserString = properties.getProperty(property, authenticatedString);
updateMapping(Integer.valueOf(investigationUserString), "user.investigationUsers IS NOT EMPTY");

property = "queue.priority.instrumentScientist.default";
String instrumentScientistString = properties.getProperty(property, authenticatedString);
updateMapping(Integer.valueOf(instrumentScientistString), "user.instrumentScientists IS NOT EMPTY");

String investigationUserProperty = properties.getProperty("queue.priority.investigationUser.roles");
String investigationUserCondition = "EXISTS ( SELECT o FROM InvestigationUser o WHERE o.role='";
parseObject(investigationUserProperty, investigationUserCondition);

String instrumentScientistProperty = properties.getProperty("queue.priority.instrumentScientist.instruments");
String instrumentScientistCondition = "EXISTS ( SELECT o FROM InstrumentScientist o WHERE o.instrument.name='";
parseObject(instrumentScientistProperty, instrumentScientistCondition);
}

/**
* Set the minimum priority for all authenticated Users. This cannot be lower
* than the defaultPriority, which will be used instead if this is the case.
*
* @param authenticatedString The value read from the run.properties file
*/
private void setAuthenticatedPriority(String authenticatedString) {
authenticatedPriority = Integer.valueOf(authenticatedString);
if (authenticatedPriority < 1 && defaultPriority >= 1) {
String msg = "queue.priority.authenticated disabled with value " + authenticatedString;
msg += " but queue.priority.default enabled with value " + defaultPriority;
msg += "\nAuthenticated users will use default priority if no superseding priority applies";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These last 3 lines in both the if and the else could go below the if/else as they are common.
Although you would also need to initialise msg above the if/else, so that line is borderline worth/not worth it.
However, the final two lines are probably worth moving.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have refactored the last three lines, this also means introducing an else clause so we can return early don't warn/set default when we don't meet either criteria (corresponding to a "normal" value for authenticatedPriority).

logger.warn(msg);
authenticatedPriority = defaultPriority;
} else if (authenticatedPriority >= 1 && authenticatedPriority > defaultPriority) {
String msg = "queue.priority.authenticated enabled with value " + authenticatedString;
msg += " but queue.priority.default supersedes with value " + defaultPriority;
msg += "\nAuthenticated users will use default priority if no superseding priority applies";
logger.warn(msg);
authenticatedPriority = defaultPriority;
}
}

/**
* Extracts each key from a JsonObject, and appends this to the JPQL condition
* for this priority level with OR.
*
* @param propertyString String representing a JsonObject from the
* run.properties file, or null
* @param conditionPrefix JPQL condition which will be formatted with each key
* in the object
*/
private void parseObject(String propertyString, String conditionPrefix) {
if (propertyString == null) {
return;
}
JsonReader reader = Json.createReader(new ByteArrayInputStream(propertyString.getBytes()));
JsonObject object = reader.readObject();
for (String key : object.keySet()) {
int priority = object.getInt(key);
updateMapping(priority, conditionPrefix + key + "' AND o.user=user )");
}
}

/**
* Appends the newCondition to the mapping at the specified priority level using
* OR.
*
* @param priority Priority of the new condition
* @param newCondition Fully formatted JPQL condition
*/
private void updateMapping(int priority, String newCondition) {
if (priority < 1) {
logger.warn("Non-positive priority found in mapping, ignoring entry");
return;
} else if (authenticatedPriority >= 1 && priority >= authenticatedPriority) {
logger.warn("Priority set in mapping would be superseded by queue.priority.authenticated, ignoring entry");
return;
}

String oldCondition = mapping.get(priority);
if (oldCondition != null) {
mapping.put(priority, oldCondition + " OR " + newCondition);
} else {
mapping.put(priority, newCondition);
}
}

/**
* @return Mapping of priority level to a JPQL condition which defines the Users
* who have this priority
*/
public HashMap<Integer, String> getMapping() {
return mapping;
}

/**
* @return The priority which applies to all authenticated users
*/
public int getAuthenticatedPriority() {
return authenticatedPriority;
}

/**
* @return The priority which applies to all users, included anonymous access
*/
public int getDefaultPriority() {
return defaultPriority;
}
}
Loading
Loading