Skip to content

Commit

Permalink
Merge pull request #58 from ral-facilities/56_queue_priority
Browse files Browse the repository at this point in the history
 Implement queuing priority #56
  • Loading branch information
patrick-austin authored Jan 16, 2025
2 parents a4bc2fc + d25db61 commit 431a3f3
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 49 deletions.
17 changes: 17 additions & 0 deletions src/main/config/run.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,28 @@ defaultFacilityName=LILS
# but queued requests will only be started when there are less than this many RESTORING downloads.
# Negative values will start all queued jobs immediately, regardless of load.
queue.maxActiveDownloads = 10

# Limit the number files per queued Download part. Multiple Datasets will be combined into part
# Downloads based on their fileCount up to this limit. If a single Dataset has a fileCount
# greater than this limit, it will still be submitted in a part by itself.
queue.maxFileCount = 10000

# When queueing Downloads a positive priority will allow a User to proceed.
# Non-positive values will block that User from submitting a request to the queue.
# When automatically moving jobs from the queued to the PREPARING state, all Downloads
# from Users with priority 1 will be scheduled before 2 and so on.
# InstrumentScientists can either be identified for specific Instrument.names, or a global default
# InvestigationUsers can either be identified for specific InvestigationUser.roles, or a global default
# Authenticated Users without InstrumentScientist or InvestigationUser status will use the authenticated priority
# Anyone who does not meet a specific priority class will use the default
# Users meeting multiple criteria will use the highest priority available (lowest number)
queue.priority.instrumentScientist.instruments = {"ABC": 1}
queue.priority.instrumentScientist.default = 2
queue.priority.investigationUser.roles = {"ABC": 3}
queue.priority.investigationUser.default = 4
queue.priority.authenticated = 5
queue.priority.default = 0

# Configurable limit for the length of the GET URL for requesting Datafiles by a list of file locations
# The exact limit may depend on the server
getUrlLimit=1024
55 changes: 55 additions & 0 deletions src/main/java/org/icatproject/topcat/IcatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.ArrayList;
import java.util.Collections;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

Expand Down Expand Up @@ -341,6 +342,60 @@ public List<JsonObject> getEntities(String entityType, List<Long> entityIds) thr
return out;
}

/**
* @param userName ICAT User.name to check for access to the queue
* @throws TopcatException If the user has a non-positive priority value (or
* another internal error is triggered)
*/
public void checkQueueAllowed(String userName) throws TopcatException {
if (getQueuePriority(userName) < 1) {
throw new ForbiddenException("Queuing Downloads forbidden");
}
}

/**
* If explicitly set via InstrumentScientist or InvestigationUser mappings,
* the highest priority (lowest value) will be returned.
* Otherwise, if authenticated, the authenticated user default will be returned.
* Otherwise, global default will be returned.
*
* @param userName ICAT User.name to determine the queue priority of
* @return int representing the queue priority. <1 indicates disabled, >=1
* indicates enabled with higher values having lower priority.
* @throws TopcatException
*/
public int getQueuePriority(String userName) throws TopcatException {
PriorityMap priorityMap = PriorityMap.getInstance();
HashMap<Integer, String> mapping = priorityMap.getMapping();
List<Integer> keyList = new ArrayList<>(mapping.keySet());
Collections.sort(keyList);
for (Integer priority : keyList) {
if (checkUser(userName, mapping.get(priority)) > 0) {
return priority;
}
}

if (!userName.equals(Properties.getInstance().getProperty("anonUserName"))) {
return priorityMap.getAuthenticatedPriority();
} else {
return priorityMap.getDefaultPriority();
}
}

/**
* @param userName ICAT User.name to determine the queue priority of
* @param condition JPQL condition representing the possible ways a user can
* have priority
* @return size of the results, 0 means use did not have priority, 1 means they
* did
* @throws TopcatException
*/
int checkUser(String userName, String condition) throws TopcatException {
String query = "SELECT user FROM User user WHERE user.name = '" + userName + "' AND (" + condition + ")";
JsonArray results = submitQuery(query);
return results.size();
}

protected String[] getAdminUserNames() throws Exception {
return Properties.getInstance().getProperty("adminUserNames", "").split("([ ]*,[ ]*|[ ]+)");
}
Expand Down
145 changes: 145 additions & 0 deletions src/main/java/org/icatproject/topcat/PriorityMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package org.icatproject.topcat;

import java.io.ByteArrayInputStream;
import java.util.HashMap;

import org.icatproject.topcat.exceptions.InternalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;

public class PriorityMap {

private static PriorityMap instance = null;

public synchronized static PriorityMap getInstance() throws InternalException {
if (instance == null) {
instance = new PriorityMap();
}
return instance;
}

private int defaultPriority;
private int authenticatedPriority;
private HashMap<Integer, String> mapping = new HashMap<>();
private Logger logger = LoggerFactory.getLogger(PriorityMap.class);

public PriorityMap() {
Properties properties = Properties.getInstance();

String defaultString = properties.getProperty("queue.priority.default", "0");
defaultPriority = Integer.valueOf(defaultString);

String authenticatedString = properties.getProperty("queue.priority.authenticated", defaultString);
setAuthenticatedPriority(authenticatedString);

String property = "queue.priority.investigationUser.default";
String investigationUserString = properties.getProperty(property, authenticatedString);
updateMapping(Integer.valueOf(investigationUserString), "user.investigationUsers IS NOT EMPTY");

property = "queue.priority.instrumentScientist.default";
String instrumentScientistString = properties.getProperty(property, authenticatedString);
updateMapping(Integer.valueOf(instrumentScientistString), "user.instrumentScientists IS NOT EMPTY");

String investigationUserProperty = properties.getProperty("queue.priority.investigationUser.roles");
String investigationUserCondition = "EXISTS ( SELECT o FROM InvestigationUser o WHERE o.role='";
parseObject(investigationUserProperty, investigationUserCondition);

String instrumentScientistProperty = properties.getProperty("queue.priority.instrumentScientist.instruments");
String instrumentScientistCondition = "EXISTS ( SELECT o FROM InstrumentScientist o WHERE o.instrument.name='";
parseObject(instrumentScientistProperty, instrumentScientistCondition);
}

/**
* Set the minimum priority for all authenticated Users. This cannot be lower
* than the defaultPriority, which will be used instead if this is the case.
*
* @param authenticatedString The value read from the run.properties file
*/
private void setAuthenticatedPriority(String authenticatedString) {
authenticatedPriority = Integer.valueOf(authenticatedString);
if (authenticatedPriority < 1 && defaultPriority >= 1) {
String msg = "queue.priority.authenticated disabled with value " + authenticatedString;
msg += " but queue.priority.default enabled with value " + defaultPriority;
msg += "\nAuthenticated users will use default priority if no superseding priority applies";
logger.warn(msg);
authenticatedPriority = defaultPriority;
} else if (authenticatedPriority >= 1 && authenticatedPriority > defaultPriority) {
String msg = "queue.priority.authenticated enabled with value " + authenticatedString;
msg += " but queue.priority.default supersedes with value " + defaultPriority;
msg += "\nAuthenticated users will use default priority if no superseding priority applies";
logger.warn(msg);
authenticatedPriority = defaultPriority;
}
}

/**
* Extracts each key from a JsonObject, and appends this to the JPQL condition
* for this priority level with OR.
*
* @param propertyString String representing a JsonObject from the
* run.properties file, or null
* @param conditionPrefix JPQL condition which will be formatted with each key
* in the object
*/
private void parseObject(String propertyString, String conditionPrefix) {
if (propertyString == null) {
return;
}
JsonReader reader = Json.createReader(new ByteArrayInputStream(propertyString.getBytes()));
JsonObject object = reader.readObject();
for (String key : object.keySet()) {
int priority = object.getInt(key);
updateMapping(priority, conditionPrefix + key + "' AND o.user=user )");
}
}

/**
* Appends the newCondition to the mapping at the specified priority level using
* OR.
*
* @param priority Priority of the new condition
* @param newCondition Fully formatted JPQL condition
*/
private void updateMapping(int priority, String newCondition) {
if (priority < 1) {
logger.warn("Non-positive priority found in mapping, ignoring entry");
return;
} else if (authenticatedPriority >= 1 && priority >= authenticatedPriority) {
logger.warn("Priority set in mapping would be superseded by queue.priority.authenticated, ignoring entry");
return;
}

String oldCondition = mapping.get(priority);
if (oldCondition != null) {
mapping.put(priority, oldCondition + " OR " + newCondition);
} else {
mapping.put(priority, newCondition);
}
}

/**
* @return Mapping of priority level to a JPQL condition which defines the Users
* who have this priority
*/
public HashMap<Integer, String> getMapping() {
return mapping;
}

/**
* @return The priority which applies to all authenticated users
*/
public int getAuthenticatedPriority() {
return authenticatedPriority;
}

/**
* @return The priority which applies to all users, included anonymous access
*/
public int getDefaultPriority() {
return defaultPriority;
}
}
80 changes: 63 additions & 17 deletions src/main/java/org/icatproject/topcat/StatusCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.net.URL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -273,9 +276,13 @@ private void prepareDownload(Download download, IdsClient injectedIdsClient) thr
}

/**
* Prepares Downloads which are queued (PAUSED with no preparedId) up to the maxActiveDownloads limit.
* Prepares Downloads which are queued (PAUSED with no preparedId) up to the
* maxActiveDownloads limit.
* Downloads will be prepared in order of priority, with all Downloads from
* Users with a value of 1 being prepared first, then 2 and so on.
*
* @param maxActiveDownloads Limit on the number of concurrent jobs with RESTORING status
* @param maxActiveDownloads Limit on the number of concurrent jobs with
* RESTORING status
* @throws Exception
*/
public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
Expand All @@ -288,29 +295,68 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
String restoringCondition = "download.status = org.icatproject.topcat.domain.DownloadStatus.RESTORING";
String pausedCondition = "download.status = org.icatproject.topcat.domain.DownloadStatus.PAUSED";

String activeQueryString = selectString + " and " + restoringCondition;
TypedQuery<Download> activeDownloadsQuery = em.createQuery(activeQueryString, Download.class);
List<Download> activeDownloads = activeDownloadsQuery.getResultList();

String queuedQueryString = selectString + " and " + pausedCondition + " and download.preparedId = null";
queuedQueryString += " order by download.createdAt";
TypedQuery<Download> queuedDownloadsQuery = em.createQuery(queuedQueryString, Download.class);
if (maxActiveDownloads > 0) {
int freeActiveDownloads = maxActiveDownloads - activeDownloads.size();
if (freeActiveDownloads <= 0) {
String activeQueryString = selectString + " and " + restoringCondition;
TypedQuery<Download> activeDownloadsQuery = em.createQuery(activeQueryString, Download.class);
List<Download> activeDownloads = activeDownloadsQuery.getResultList();
maxActiveDownloads -= activeDownloads.size();
if (maxActiveDownloads <= 0) {
String format = "More downloads currently RESTORING {} than maxActiveDownloads {}, cannot prepare queued jobs";
logger.info(format, activeDownloads.size(), maxActiveDownloads);
return;
}
queuedDownloadsQuery.setMaxResults(freeActiveDownloads);
}

String queuedQueryString = selectString + " and " + pausedCondition + " and download.preparedId = null";
queuedQueryString += " order by download.createdAt";
TypedQuery<Download> queuedDownloadsQuery = em.createQuery(queuedQueryString, Download.class);
List<Download> queuedDownloads = queuedDownloadsQuery.getResultList();

logger.info("Preparing {} queued downloads", queuedDownloads.size());
for (Download queuedDownload : queuedDownloads) {
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null);

if (maxActiveDownloads <= 0) {
logger.info("Preparing {} queued downloads", queuedDownloads.size());
// No limits on how many to submit
for (Download queuedDownload : queuedDownloads) {
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null);
}
} else {
logger.info("Preparing up to {} queued downloads", maxActiveDownloads);
HashMap<Integer, List<Download>> mapping = new HashMap<>();
for (Download queuedDownload : queuedDownloads) {
String icatUrl = FacilityMap.getInstance().getIcatUrl(queuedDownload.getFacilityName());
IcatClient icatClient = new IcatClient(icatUrl, queuedDownload.getSessionId());
int priority = icatClient.getQueuePriority(queuedDownload.getUserName());
if (priority == 1) {
// Highest priority, prepare now
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null);
maxActiveDownloads -= 1;
if (maxActiveDownloads <= 0) {
return;
}
} else {
// Lower priority, add to mapping
mapping.putIfAbsent(priority, new ArrayList<>());
mapping.get(priority).add(queuedDownload);
}
}
List<Integer> keyList = new ArrayList<>();
for (Object key : mapping.keySet().toArray()) {
keyList.add((Integer) key);
}
Collections.sort(keyList);
for (int key : keyList) {
// Prepare from mapping in priority order
List<Download> downloadList = mapping.get(key);
for (Download download : downloadList) {
download.setStatus(DownloadStatus.PREPARING);
prepareDownload(download, null);
maxActiveDownloads -= 1;
if (maxActiveDownloads <= 0) {
return;
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ public Response queueVisitId(@FormParam("facilityName") String facilityName,
// If we wanted to block the user, this is where we would do it
String userName = icatClient.getUserName();
String fullName = icatClient.getFullName();
icatClient.checkQueueAllowed(userName);
JsonArray datasets = icatClient.getDatasets(visitId);

long downloadId;
Expand Down Expand Up @@ -967,6 +968,7 @@ public Response queueFiles(@FormParam("facilityName") String facilityName,
// If we wanted to block the user, this is where we would do it
String userName = icatClient.getUserName();
String fullName = icatClient.getFullName();
icatClient.checkQueueAllowed(userName);
List<Long> datafileIds = icatClient.getDatafiles(files);

long downloadId;
Expand Down
Loading

0 comments on commit 431a3f3

Please sign in to comment.