Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement queuing priority #56 #58

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/main/config/run.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,28 @@ defaultFacilityName=LILS
# but queued requests will only be started when there are less than this many RESTORING downloads.
# Negative values will start all queued jobs immediately, regardless of load.
queue.maxActiveDownloads = 10

# Limit the number files per queued Download part. Multiple Datasets will be combined into part
# Downloads based on their fileCount up to this limit. If a single Dataset has a fileCount
# greater than this limit, it will still be submitted in a part by itself.
queue.maxFileCount = 10000

# When queueing Downloads a positive priority will allow a User to proceed.
# Non-positive values will block that User from submitting a request to the queue.
# When automatically moving jobs from the queued to the PREPARING state, all Downloads
# from Users with priority 1 will be scheduled before 2 and so on.
# InstrumentScientists can either be identified for specific Instrument.names, or a global default
# InvestigationUsers can either be identified for specific InvestigationUser.roles, or a global default
# Authenticated Users without InstrumentScientist or InvestigationUser status will use the authenticated priority
# Anyone who does not meet a specific priority class will use the default
# Users meeting multiple criteria will use the highest priority available (lowest number)
queue.priority.instrumentScientist.instruments = {"ABC": 1}
queue.priority.instrumentScientist.default = 2
queue.priority.investigationUser.roles = {"ABC": 3}
queue.priority.investigationUser.default = 4
queue.priority.authenticated = 5
queue.priority.default = 0

# Configurable limit for the length of the GET URL for requesting Datafiles by a list of file locations
# The exact limit may depend on the server
getUrlLimit=1024
55 changes: 55 additions & 0 deletions src/main/java/org/icatproject/topcat/IcatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.ArrayList;
import java.util.Collections;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

Expand Down Expand Up @@ -341,6 +342,60 @@ public List<JsonObject> getEntities(String entityType, List<Long> entityIds) thr
return out;
}

/**
* @param userName ICAT User.name to check for access to the queue
* @throws TopcatException If the user has a non-positive priority value (or
* another internal error is triggered)
*/
public void checkQueueAllowed(String userName) throws TopcatException {
if (getQueuePriority(userName) < 1) {
throw new ForbiddenException("Queuing Downloads forbidden");
}
}

/**
* If explicitly set via InstrumentScientist or InvestigationUser mappings,
* the highest priority (lowest value) will be returned.
* Otherwise, if authenticated, the authenticated user default will be returned.
* Otherwise, global default will be returned.
*
* @param userName ICAT User.name to determine the queue priority of
* @return int representing the queue priority. <1 indicates disabled, >=1
* indicates enabled with higher values having lower priority.
* @throws TopcatException
*/
public int getQueuePriority(String userName) throws TopcatException {
PriorityMap priorityMap = PriorityMap.getInstance();
HashMap<Integer, String> mapping = priorityMap.getMapping();
List<Integer> keyList = new ArrayList<>(mapping.keySet());
Collections.sort(keyList);
for (Integer priority : keyList) {
if (checkUser(userName, mapping.get(priority)) > 0) {
return priority;
}
}

if (!userName.equals(Properties.getInstance().getProperty("anonUserName"))) {
return priorityMap.getAuthenticatedPriority();
} else {
return priorityMap.getDefaultPriority();
}
}

/**
* @param userName ICAT User.name to determine the queue priority of
* @param condition JPQL condition representing the possible ways a user can
* have priority
* @return size of the results, 0 means use did not have priority, 1 means they
* did
* @throws TopcatException
*/
int checkUser(String userName, String condition) throws TopcatException {
String query = "SELECT user FROM User user WHERE user.name = '" + userName + "' AND (" + condition + ")";
JsonArray results = submitQuery(query);
return results.size();
}

protected String[] getAdminUserNames() throws Exception {
return Properties.getInstance().getProperty("adminUserNames", "").split("([ ]*,[ ]*|[ ]+)");
}
Expand Down
145 changes: 145 additions & 0 deletions src/main/java/org/icatproject/topcat/PriorityMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package org.icatproject.topcat;

import java.io.ByteArrayInputStream;
import java.util.HashMap;

import org.icatproject.topcat.exceptions.InternalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;

public class PriorityMap {

private static PriorityMap instance = null;

public synchronized static PriorityMap getInstance() throws InternalException {
if (instance == null) {
instance = new PriorityMap();
}
return instance;
}

private int defaultPriority;
private int authenticatedPriority;
private HashMap<Integer, String> mapping = new HashMap<>();
private Logger logger = LoggerFactory.getLogger(PriorityMap.class);

public PriorityMap() {
Properties properties = Properties.getInstance();

String defaultString = properties.getProperty("queue.priority.default", "0");
defaultPriority = Integer.valueOf(defaultString);

String authenticatedString = properties.getProperty("queue.priority.authenticated", defaultString);
setAuthenticatedPriority(authenticatedString);

String property = "queue.priority.investigationUser.default";
String investigationUserString = properties.getProperty(property, authenticatedString);
updateMapping(Integer.valueOf(investigationUserString), "user.investigationUsers IS NOT EMPTY");

property = "queue.priority.instrumentScientist.default";
String instrumentScientistString = properties.getProperty(property, authenticatedString);
updateMapping(Integer.valueOf(instrumentScientistString), "user.instrumentScientists IS NOT EMPTY");

String investigationUserProperty = properties.getProperty("queue.priority.investigationUser.roles");
String investigationUserCondition = "EXISTS ( SELECT o FROM InvestigationUser o WHERE o.role='";
parseObject(investigationUserProperty, investigationUserCondition);

String instrumentScientistProperty = properties.getProperty("queue.priority.instrumentScientist.instruments");
String instrumentScientistCondition = "EXISTS ( SELECT o FROM InstrumentScientist o WHERE o.instrument.name='";
parseObject(instrumentScientistProperty, instrumentScientistCondition);
}

/**
* Set the minimum priority for all authenticated Users. This cannot be lower
* than the defaultPriority, which will be used instead if this is the case.
*
* @param authenticatedString The value read from the run.properties file
*/
private void setAuthenticatedPriority(String authenticatedString) {
authenticatedPriority = Integer.valueOf(authenticatedString);
if (authenticatedPriority < 1 && defaultPriority >= 1) {
String msg = "queue.priority.authenticated disabled with value " + authenticatedString;
msg += " but queue.priority.default enabled with value " + defaultPriority;
msg += "\nAuthenticated users will use default priority if no superseding priority applies";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These last 3 lines in both the if and the else could go below the if/else as they are common.
Although you would also need to initialise msg above the if/else, so that line is borderline worth/not worth it.
However, the final two lines are probably worth moving.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have refactored the last three lines, this also means introducing an else clause so we can return early don't warn/set default when we don't meet either criteria (corresponding to a "normal" value for authenticatedPriority).

logger.warn(msg);
authenticatedPriority = defaultPriority;
} else if (authenticatedPriority >= 1 && authenticatedPriority > defaultPriority) {
String msg = "queue.priority.authenticated enabled with value " + authenticatedString;
msg += " but queue.priority.default supersedes with value " + defaultPriority;
msg += "\nAuthenticated users will use default priority if no superseding priority applies";
logger.warn(msg);
authenticatedPriority = defaultPriority;
}
}

/**
* Extracts each key from a JsonObject, and appends this to the JPQL condition
* for this priority level with OR.
*
* @param propertyString String representing a JsonObject from the
* run.properties file, or null
* @param conditionPrefix JPQL condition which will be formatted with each key
* in the object
*/
private void parseObject(String propertyString, String conditionPrefix) {
if (propertyString == null) {
return;
}
JsonReader reader = Json.createReader(new ByteArrayInputStream(propertyString.getBytes()));
JsonObject object = reader.readObject();
for (String key : object.keySet()) {
int priority = object.getInt(key);
updateMapping(priority, conditionPrefix + key + "' AND o.user=user )");
}
}

/**
* Appends the newCondition to the mapping at the specified priority level using
* OR.
*
* @param priority Priority of the new condition
* @param newCondition Fully formatted JPQL condition
*/
private void updateMapping(int priority, String newCondition) {
if (priority < 1) {
logger.warn("Non-positive priority found in mapping, ignoring entry");
return;
} else if (authenticatedPriority >= 1 && priority >= authenticatedPriority) {
logger.warn("Priority set in mapping would be superseded by queue.priority.authenticated, ignoring entry");
return;
}

String oldCondition = mapping.get(priority);
if (oldCondition != null) {
mapping.put(priority, oldCondition + " OR " + newCondition);
} else {
mapping.put(priority, newCondition);
}
}

/**
* @return Mapping of priority level to a JPQL condition which defines the Users
* who have this priority
*/
public HashMap<Integer, String> getMapping() {
return mapping;
}

/**
* @return The priority which applies to all authenticated users
*/
public int getAuthenticatedPriority() {
return authenticatedPriority;
}

/**
* @return The priority which applies to all users, included anonymous access
*/
public int getDefaultPriority() {
return defaultPriority;
}
}
80 changes: 63 additions & 17 deletions src/main/java/org/icatproject/topcat/StatusCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.net.URL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -273,9 +276,13 @@ private void prepareDownload(Download download, IdsClient injectedIdsClient) thr
}

/**
* Prepares Downloads which are queued (PAUSED with no preparedId) up to the maxActiveDownloads limit.
* Prepares Downloads which are queued (PAUSED with no preparedId) up to the
* maxActiveDownloads limit.
* Downloads will be prepared in order of priority, with all Downloads from
* Users with a value of 1 being prepared first, then 2 and so on.
*
* @param maxActiveDownloads Limit on the number of concurrent jobs with RESTORING status
* @param maxActiveDownloads Limit on the number of concurrent jobs with
* RESTORING status
* @throws Exception
*/
public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
Expand All @@ -288,29 +295,68 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
String restoringCondition = "download.status = org.icatproject.topcat.domain.DownloadStatus.RESTORING";
String pausedCondition = "download.status = org.icatproject.topcat.domain.DownloadStatus.PAUSED";

String activeQueryString = selectString + " and " + restoringCondition;
TypedQuery<Download> activeDownloadsQuery = em.createQuery(activeQueryString, Download.class);
List<Download> activeDownloads = activeDownloadsQuery.getResultList();

String queuedQueryString = selectString + " and " + pausedCondition + " and download.preparedId = null";
queuedQueryString += " order by download.createdAt";
TypedQuery<Download> queuedDownloadsQuery = em.createQuery(queuedQueryString, Download.class);
if (maxActiveDownloads > 0) {
int freeActiveDownloads = maxActiveDownloads - activeDownloads.size();
if (freeActiveDownloads <= 0) {
String activeQueryString = selectString + " and " + restoringCondition;
TypedQuery<Download> activeDownloadsQuery = em.createQuery(activeQueryString, Download.class);
List<Download> activeDownloads = activeDownloadsQuery.getResultList();
maxActiveDownloads -= activeDownloads.size();
if (maxActiveDownloads <= 0) {
String format = "More downloads currently RESTORING {} than maxActiveDownloads {}, cannot prepare queued jobs";
logger.info(format, activeDownloads.size(), maxActiveDownloads);
return;
}
queuedDownloadsQuery.setMaxResults(freeActiveDownloads);
}

String queuedQueryString = selectString + " and " + pausedCondition + " and download.preparedId = null";
queuedQueryString += " order by download.createdAt";
TypedQuery<Download> queuedDownloadsQuery = em.createQuery(queuedQueryString, Download.class);
List<Download> queuedDownloads = queuedDownloadsQuery.getResultList();

logger.info("Preparing {} queued downloads", queuedDownloads.size());
for (Download queuedDownload : queuedDownloads) {
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null);

if (maxActiveDownloads <= 0) {
logger.info("Preparing {} queued downloads", queuedDownloads.size());
// No limits on how many to submit
for (Download queuedDownload : queuedDownloads) {
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null);
}
} else {
logger.info("Preparing up to {} queued downloads", maxActiveDownloads);
HashMap<Integer, List<Download>> mapping = new HashMap<>();
for (Download queuedDownload : queuedDownloads) {
String icatUrl = FacilityMap.getInstance().getIcatUrl(queuedDownload.getFacilityName());
IcatClient icatClient = new IcatClient(icatUrl, queuedDownload.getSessionId());
int priority = icatClient.getQueuePriority(queuedDownload.getUserName());
if (priority == 1) {
// Highest priority, prepare now
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null);
maxActiveDownloads -= 1;
if (maxActiveDownloads <= 0) {
return;
}
} else {
// Lower priority, add to mapping
mapping.putIfAbsent(priority, new ArrayList<>());
mapping.get(priority).add(queuedDownload);
}
}
List<Integer> keyList = new ArrayList<>();
for (Object key : mapping.keySet().toArray()) {
keyList.add((Integer) key);
}
Collections.sort(keyList);
for (int key : keyList) {
// Prepare from mapping in priority order
List<Download> downloadList = mapping.get(key);
for (Download download : downloadList) {
download.setStatus(DownloadStatus.PREPARING);
prepareDownload(download, null);
maxActiveDownloads -= 1;
if (maxActiveDownloads <= 0) {
return;
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ public Response queueVisitId(@FormParam("facilityName") String facilityName,
// If we wanted to block the user, this is where we would do it
String userName = icatClient.getUserName();
String fullName = icatClient.getFullName();
icatClient.checkQueueAllowed(userName);
JsonArray datasets = icatClient.getDatasets(visitId);

long downloadId;
Expand Down Expand Up @@ -937,6 +938,7 @@ public Response queueFiles(@FormParam("facilityName") String facilityName,
// If we wanted to block the user, this is where we would do it
String userName = icatClient.getUserName();
String fullName = icatClient.getFullName();
icatClient.checkQueueAllowed(userName);
List<Long> datafileIds = icatClient.getDatafiles(files);

long downloadId;
Expand Down
Loading
Loading