Skip to content

Commit

Permalink
Merge cleanup tasks into a single Housekeeping task
Browse files Browse the repository at this point in the history
Signed-off-by: nscuro <[email protected]>
  • Loading branch information
nscuro committed Aug 14, 2024
1 parent b6c9074 commit 70d4d71
Show file tree
Hide file tree
Showing 18 changed files with 428 additions and 520 deletions.
8 changes: 4 additions & 4 deletions src/main/java/org/dependencytrack/common/ConfigKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import alpine.Config;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

public enum ConfigKey implements Config.Key {

Expand Down Expand Up @@ -49,12 +50,11 @@ public enum ConfigKey implements Config.Key {
CRON_EXPRESSION_FOR_LDAP_SYNC_TASK("task.cron.ldapSync", "0 */6 * * *"),
CRON_EXPRESSION_FOR_REPO_META_ANALYSIS_TASK("task.cron.repoMetaAnalysis", "0 1 * * *"),
CRON_EXPRESSION_FOR_VULN_ANALYSIS_TASK("task.cron.vulnAnalysis", "0 6 * * *"),
CRON_EXPRESSION_FOR_VULN_SCAN_CLEANUP_TASK("task.cron.vulnScanCleanUp", "5 8 * * 4"),
CRON_EXPRESSION_FOR_FORTIFY_SSC_SYNC("task.cron.fortify.ssc.sync", "0 2 * * *"),
CRON_EXPRESSION_FOR_DEFECT_DOJO_SYNC("task.cron.defectdojo.sync", "0 2 * * *"),
CRON_EXPRESSION_FOR_KENNA_SYNC("task.cron.kenna.sync", "0 2 * * *"),
CRON_EXPRESSION_FOR_WORKFLOW_STATE_CLEANUP_TASK("task.cron.workflow.state.cleanup", "*/15 * * * *"),
CRON_EXPRESSION_FOR_INTEGRITY_META_INITIALIZER_TASK("task.cron.integrityInitializer", "0 */12 * * *"),
CRON_EXPRESSION_FOR_HOUSEKEEPING_TASK("task.cron.housekeeping", "45 * * * *"),
TASK_SCHEDULER_INITIAL_DELAY("task.scheduler.initial.delay", "180000"),
TASK_SCHEDULER_POLLING_INTERVAL("task.scheduler.polling.interval", "60000"),
TASK_PORTFOLIO_LOCK_AT_MOST_FOR("task.metrics.portfolio.lockAtMostForInMillis", "900000"),
Expand All @@ -67,14 +67,14 @@ public enum ConfigKey implements Config.Key {
TASK_COMPONENT_IDENTIFICATION_LOCK_AT_LEAST_FOR("task.componentIdentification.lockAtLeastForInMillis", "90000"),
TASK_LDAP_SYNC_LOCK_AT_MOST_FOR("task.ldapSync.lockAtMostForInMillis", "900000"),
TASK_LDAP_SYNC_LOCK_AT_LEAST_FOR("task.ldapSync.lockAtLeastForInMillis", "90000"),
TASK_WORKFLOW_STEP_CLEANUP_LOCK_AT_MOST_FOR("task.workflow.state.cleanup.lockAtMostForInMillis", String.valueOf(Duration.ofMinutes(15).toMillis())),
TASK_WORKFLOW_STEP_CLEANUP_LOCK_AT_LEAST_FOR("task.workflow.state.cleanup.lockAtLeastForInMillis", String.valueOf(Duration.ofMinutes(15).toMillis())),
TASK_PORTFOLIO_REPO_META_ANALYSIS_LOCK_AT_MOST_FOR("task.portfolio.repoMetaAnalysis.lockAtMostForInMillis", String.valueOf(Duration.ofMinutes(15).toMillis())),
TASK_PORTFOLIO_REPO_META_ANALYSIS_LOCK_AT_LEAST_FOR("task.portfolio.repoMetaAnalysis.lockAtLeastForInMillis", String.valueOf(Duration.ofMinutes(5).toMillis())),
TASK_PORTFOLIO_VULN_ANALYSIS_LOCK_AT_MOST_FOR("task.portfolio.vulnAnalysis.lockAtMostForInMillis", String.valueOf(Duration.ofMinutes(15).toMillis())),
TASK_PORTFOLIO_VULN_ANALYSIS_LOCK_AT_LEAST_FOR("task.portfolio.vulnAnalysis.lockAtLeastForInMillis", String.valueOf(Duration.ofMinutes(5).toMillis())),
TASK_VULNERABILITY_POLICY_BUNDLE_FETCH_LOCK_AT_MOST_FOR("task.vulnerability.policy.bundle.fetch.lockAtMostForInMillis", String.valueOf(Duration.ofMinutes(5).toMillis())),
TASK_VULNERABILITY_POLICY_BUNDLE_FETCH_LOCK_AT_LEAST_FOR("task.vulnerability.policy.bundle.fetch.lockAtLeastForInMillis", String.valueOf(Duration.ofSeconds(5).toMillis())),
TASK_HOUSEKEEPING_LOCK_AT_MOST_FOR("task.housekeeping.lockAtMostForInMillis", TimeUnit.MINUTES.toMillis(15)),
TASK_HOUSEKEEPING_LOCK_AT_LEAST_FOR("task.housekeeping.lockAtLeastForInMillis", TimeUnit.MINUTES.toMillis(5)),
BOM_UPLOAD_PROCESSING_TRX_FLUSH_THRESHOLD("bom.upload.processing.trx.flush.threshold", "10000"),
WORKFLOW_RETENTION_DURATION("workflow.retention.duration", "P3D"),
WORKFLOW_STEP_TIMEOUT_DURATION("workflow.step.timeout.duration", "PT1H"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import alpine.common.logging.Logger;
import alpine.event.LdapSyncEvent;
import alpine.event.framework.EventService;
import jakarta.servlet.ServletContextEvent;
import jakarta.servlet.ServletContextListener;
import org.dependencytrack.common.ConfigKey;
import org.dependencytrack.tasks.BomUploadProcessingTask;
import org.dependencytrack.tasks.CallbackTask;
Expand All @@ -30,6 +32,7 @@
import org.dependencytrack.tasks.EpssMirrorTask;
import org.dependencytrack.tasks.FortifySscUploadTask;
import org.dependencytrack.tasks.GitHubAdvisoryMirrorTask;
import org.dependencytrack.tasks.HouseKeepingTask;
import org.dependencytrack.tasks.IntegrityAnalysisTask;
import org.dependencytrack.tasks.IntegrityMetaInitializerTask;
import org.dependencytrack.tasks.InternalComponentIdentificationTask;
Expand All @@ -42,15 +45,11 @@
import org.dependencytrack.tasks.TaskScheduler;
import org.dependencytrack.tasks.VexUploadProcessingTask;
import org.dependencytrack.tasks.VulnerabilityAnalysisTask;
import org.dependencytrack.tasks.vulnerabilitypolicy.VulnerabilityPolicyFetchTask;
import org.dependencytrack.tasks.VulnerabilityScanCleanupTask;
import org.dependencytrack.tasks.WorkflowStateCleanupTask;
import org.dependencytrack.tasks.metrics.PortfolioMetricsUpdateTask;
import org.dependencytrack.tasks.metrics.ProjectMetricsUpdateTask;
import org.dependencytrack.tasks.metrics.VulnerabilityMetricsUpdateTask;
import org.dependencytrack.tasks.vulnerabilitypolicy.VulnerabilityPolicyFetchTask;

import jakarta.servlet.ServletContextEvent;
import jakarta.servlet.ServletContextListener;
import java.time.Duration;

/**
Expand Down Expand Up @@ -93,15 +92,14 @@ public void contextInitialized(final ServletContextEvent event) {
EVENT_SERVICE.subscribe(KennaSecurityUploadEventAbstract.class, KennaSecurityUploadTask.class);
EVENT_SERVICE.subscribe(InternalComponentIdentificationEvent.class, InternalComponentIdentificationTask.class);
EVENT_SERVICE.subscribe(CallbackEvent.class, CallbackTask.class);
EVENT_SERVICE.subscribe(VulnerabilityScanCleanupEvent.class, VulnerabilityScanCleanupTask.class);
EVENT_SERVICE.subscribe(NistMirrorEvent.class, NistMirrorTask.class);
EVENT_SERVICE.subscribe(VulnerabilityPolicyFetchEvent.class, VulnerabilityPolicyFetchTask.class);
EVENT_SERVICE.subscribe(EpssMirrorEvent.class, EpssMirrorTask.class);
EVENT_SERVICE.subscribe(ComponentPolicyEvaluationEvent.class, PolicyEvaluationTask.class);
EVENT_SERVICE.subscribe(ProjectPolicyEvaluationEvent.class, PolicyEvaluationTask.class);
EVENT_SERVICE.subscribe(WorkflowStateCleanupEvent.class, WorkflowStateCleanupTask.class);
EVENT_SERVICE.subscribe(IntegrityMetaInitializerEvent.class, IntegrityMetaInitializerTask.class);
EVENT_SERVICE.subscribe(IntegrityAnalysisEvent.class, IntegrityAnalysisTask.class);
EVENT_SERVICE.subscribe(HouseKeepingEvent.class, HouseKeepingTask.class);

TaskScheduler.getInstance();
}
Expand Down Expand Up @@ -130,14 +128,13 @@ public void contextDestroyed(final ServletContextEvent event) {
EVENT_SERVICE.unsubscribe(KennaSecurityUploadTask.class);
EVENT_SERVICE.unsubscribe(InternalComponentIdentificationTask.class);
EVENT_SERVICE.unsubscribe(CallbackTask.class);
EVENT_SERVICE.unsubscribe(VulnerabilityScanCleanupTask.class);
EVENT_SERVICE.unsubscribe(NistMirrorTask.class);
EVENT_SERVICE.unsubscribe(EpssMirrorTask.class);
EVENT_SERVICE.unsubscribe(PolicyEvaluationTask.class);
EVENT_SERVICE.unsubscribe(WorkflowStateCleanupTask.class);
EVENT_SERVICE.unsubscribe(IntegrityMetaInitializerTask.class);
EVENT_SERVICE.unsubscribe(IntegrityAnalysisTask.class);
EVENT_SERVICE.unsubscribe(VulnerabilityPolicyFetchTask.class);
EVENT_SERVICE.unsubscribe(HouseKeepingTask.class);
EVENT_SERVICE.shutdown(DRAIN_TIMEOUT_DURATION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@

import alpine.event.framework.Event;

public class WorkflowStateCleanupEvent implements Event {
}
/**
* @since 5.6.0
*/
public class HouseKeepingEvent implements Event {
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1899,10 +1899,6 @@ public void updateWorkflowStateToFailed(WorkflowState workflowState, String fail
getWorkflowStateQueryManager().updateWorkflowStateToFailed(workflowState, failureReason);
}

public boolean hasWorkflowStepWithStatus(final UUID token, final WorkflowStep step, final WorkflowStatus status) {
return getWorkflowStateQueryManager().hasWorkflowStepWithStatus(token, step, status);
}

public IntegrityMetaComponent getIntegrityMetaComponent(String purl) {
return getIntegrityMetaQueryManager().getIntegrityMetaComponent(purl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class WorkflowStateQueryManager extends QueryManager implements IQueryManager {
Expand Down Expand Up @@ -329,20 +328,4 @@ public void createWorkflowSteps(UUID token) {
}
}

public boolean hasWorkflowStepWithStatus(final UUID token, final WorkflowStep step, final WorkflowStatus status) {
final Query<WorkflowState> stateQuery = pm.newQuery(WorkflowState.class);
stateQuery.setFilter("token == :token && step == :step && status == :status");
stateQuery.setNamedParameters(Map.of(
"token", token,
"step", step,
"status", status
));
stateQuery.setResult("count(this)");
try {
return stateQuery.executeResultUnique(Long.class) > 0;
} finally {
stateQuery.closeAll();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.GetGeneratedKeys;
import org.jdbi.v3.sqlobject.statement.SqlBatch;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

import java.time.Duration;
import java.util.List;

public interface VulnerabilityScanDao extends SqlObject {
Expand Down Expand Up @@ -54,9 +56,18 @@ THEN CASE WHEN (CAST("SCAN_FAILED" + :scannerResultsFailed AS DOUBLE PRECISION)
""")
@RegisterBeanMapper(VulnerabilityScan.class)
@GetGeneratedKeys({"TOKEN", "STATUS", "TARGET_TYPE", "TARGET_IDENTIFIER", "FAILURE_REASON"})
List<VulnerabilityScan> updateAll(@Bind("token") List<String> tokens,
@Bind List<Integer> resultsTotal,
@Bind List<Integer> scannerResultsTotal,
@Bind List<Integer> scannerResultsFailed);
List<VulnerabilityScan> updateAll(
@Bind("token") List<String> tokens,
@Bind List<Integer> resultsTotal,
@Bind List<Integer> scannerResultsTotal,
@Bind List<Integer> scannerResultsFailed
);

@SqlUpdate("""
DELETE
FROM "VULNERABILITYSCAN"
WHERE AGE(NOW(), "UPDATED_AT") >= :duration
""")
int deleteAllForRetentionDuration(@Bind Duration duration);

}
110 changes: 97 additions & 13 deletions src/main/java/org/dependencytrack/persistence/jdbi/WorkflowDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@
import org.dependencytrack.model.WorkflowState;
import org.dependencytrack.model.WorkflowStatus;
import org.dependencytrack.model.WorkflowStep;
import org.jdbi.v3.sqlobject.SqlObject;
import org.jdbi.v3.sqlobject.config.RegisterBeanMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.GetGeneratedKeys;
import org.jdbi.v3.sqlobject.statement.SqlBatch;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public interface WorkflowDao {
public interface WorkflowDao extends SqlObject {

@SqlBatch("""
UPDATE "WORKFLOW_STATE"
Expand All @@ -46,15 +49,19 @@ public interface WorkflowDao {
""")
@GetGeneratedKeys("*")
@RegisterBeanMapper(WorkflowState.class)
List<WorkflowState> updateAllStates(@Bind WorkflowStep step,
@Bind("token") List<String> tokens,
@Bind("status") List<WorkflowStatus> statuses,
@Bind("failureReason") List<String> failureReasons);
List<WorkflowState> updateAllStates(
@Bind WorkflowStep step,
@Bind("token") List<String> tokens,
@Bind("status") List<WorkflowStatus> statuses,
@Bind("failureReason") List<String> failureReasons
);

default Optional<WorkflowState> updateState(final WorkflowStep step,
final String token,
final WorkflowStatus status,
final String failureReason) {
default Optional<WorkflowState> updateState(
final WorkflowStep step,
final String token,
final WorkflowStatus status,
final String failureReason
) {
final List<WorkflowState> updatedStates = updateAllStates(step, List.of(token), List.of(status), Collections.singletonList(failureReason));
if (updatedStates.isEmpty()) {
return Optional.empty();
Expand All @@ -70,9 +77,11 @@ default Optional<WorkflowState> updateState(final WorkflowStep step,
AND "STATUS" = :status
AND "TOKEN" = ANY(:tokens)
""")
Set<String> getTokensByStepAndStateAndTokenAnyOf(@Bind WorkflowStep step,
@Bind WorkflowStatus status,
@Bind Collection<String> tokens);
Set<String> getTokensByStepAndStateAndTokenAnyOf(
@Bind WorkflowStep step,
@Bind WorkflowStatus status,
@Bind Collection<String> tokens
);

@SqlBatch("""
WITH RECURSIVE
Expand All @@ -95,8 +104,83 @@ Set<String> getTokensByStepAndStateAndTokenAnyOf(@Bind WorkflowStep step,
UPDATE "WORKFLOW_STATE"
SET "STATUS" = 'CANCELLED'
, "UPDATED_AT" = NOW()
WHERE "ID" IN (SELECT "ID" FROM "CTE_CHILDREN")
WHERE "ID" = ANY(SELECT "ID" FROM "CTE_CHILDREN")
""")
void cancelAllChildren(@Bind WorkflowStep step, @Bind("token") List<String> tokens);

/**
* @since 5.6.0
*/
@SqlBatch("""
WITH RECURSIVE
"CTE_CHILDREN" ("ID") AS (
SELECT "ID"
FROM "WORKFLOW_STATE"
WHERE "PARENT_STEP_ID" = :parentId
UNION ALL
SELECT "CHILD"."ID"
FROM "WORKFLOW_STATE" AS "CHILD"
INNER JOIN "CTE_CHILDREN" AS "PARENT"
ON "PARENT"."ID" = "CHILD"."PARENT_STEP_ID"
)
UPDATE "WORKFLOW_STATE"
SET "STATUS" = 'CANCELLED'
, "UPDATED_AT" = NOW()
WHERE "ID" = ANY(SELECT "ID" FROM "CTE_CHILDREN")
""")
int[] cancelAllChildrenByParentStepIdAnyOf(@Bind("parentId") List<Long> parentIds);

/**
* @since 5.6.0
*/
@SqlUpdate("""
UPDATE "WORKFLOW_STATE"
SET "STATUS" = 'TIMED_OUT'
, "UPDATED_AT" = NOW()
WHERE "STATUS" = 'PENDING'
AND AGE(NOW(), "UPDATED_AT") >= :timeoutDuration
""")
int transitionAllPendingStepsToTimedOutForTimeout(@Bind Duration timeoutDuration);

/**
* @since 5.6.0
*/
default List<Long> transitionAllTimedOutStepsToFailedForTimeout(final Duration timeoutDuration) {
// NB: Can't use interface method here due to https://github.com/jdbi/jdbi/issues/1807.
return getHandle().createUpdate("""
UPDATE "WORKFLOW_STATE"
SET "STATUS" = 'FAILED'
, "FAILURE_REASON" = 'Timed out'
, "UPDATED_AT" = NOW()
WHERE "STATUS" = 'TIMED_OUT'
AND AGE(NOW(), "UPDATED_AT") >= :timeoutDuration
RETURNING "ID"
""")
.bind("timeoutDuration", timeoutDuration)
.executeAndReturnGeneratedKeys()
.mapTo(Long.class)
.list();
}

/**
* @since 5.6.0
*/
@SqlUpdate("""
WITH "CTE_ELIGIBLE_TOKENS" AS (
SELECT "TOKEN"
FROM "WORKFLOW_STATE" AS "WFS_PARENT"
WHERE NOT EXISTS(
SELECT 1
FROM "WORKFLOW_STATE" AS "WFS"
WHERE "WFS"."TOKEN" = "WFS_PARENT"."TOKEN"
AND "WFS"."STATUS" IN ('PENDING', 'TIMED_OUT'))
GROUP BY "TOKEN"
HAVING AGE(NOW(), MAX("UPDATED_AT")) >= :retentionDuration
)
DELETE
FROM "WORKFLOW_STATE"
WHERE "TOKEN" = ANY(SELECT "TOKEN" FROM "CTE_ELIGIBLE_TOKENS")
""")
int deleteAllForRetention(@Bind Duration retentionDuration);

}
Loading

0 comments on commit 70d4d71

Please sign in to comment.