Skip to content

Commit

Permalink
Add Timeout Test.
Browse files Browse the repository at this point in the history
  • Loading branch information
oneinstepGO committed Dec 30, 2024
1 parent 340f5c6 commit 500d08c
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.oneinstep.haidu.config;

import com.oneinstep.haidu.context.RequestContext;

// 支持自定义参数解析器
public interface ParameterResolver<T> {
T resolve(String value, RequestContext context);
}
27 changes: 27 additions & 0 deletions src/main/java/com/oneinstep/haidu/config/TaskConfigValidator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.oneinstep.haidu.config;

import java.util.List;
import java.util.Map;

public class TaskConfigValidator {

// 验证任务配置的合法性
public static void validate(TaskConfig config) {
validateArrangeRules(config.getArrangeRule());
validateTaskDetails(config.getTaskDetailsMap());
validateParameters(config.getTaskDetailsMap());
validateCircularDependencies(config);
}

private static void validateArrangeRules(List<List<String>> arrangeRule) {
}

private static void validateTaskDetails(Map<String, TaskDetail> taskDetailsMap) {
}

private static void validateParameters(Map<String, TaskDetail> taskDetailsMap) {
}

private static void validateCircularDependencies(TaskConfig config) {
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/oneinstep/haidu/context/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,14 @@ public class RequestContext {
public void clearTaskInstanceMap() {
taskInstanceMap.clear();
}

/**
* 获取任务实例
*
* @param taskId 任务ID
* @return 任务实例
*/
public AbstractTask<?> getTaskInstance(String taskId) {
return taskInstanceMap.get(taskId);
}
}
54 changes: 31 additions & 23 deletions src/main/java/com/oneinstep/haidu/core/AbstractTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,41 @@ protected boolean checkResult(RequestContext requestContext, Result<T> result) {
}

/**
* 任务执行异常处理
* 任务执行错误处理
*
* @param requestContext 请求上下文
* @param e 异常
* @param context 请求上下文
* @param e 异常
*/
protected void whenException(RequestContext requestContext, Exception e) {
getLogger().error("taskId:{} invoke exception.", getTaskId(), e);
protected void onError(RequestContext context, Throwable e) {
getLogger().error("taskId:{} invoke error.", getTaskId(), e);
throw new RuntimeException(e);
}

/**
* 任务执行超时处理
*
* @param context 请求上下文
*/
protected void onTimeout(RequestContext context) {
getLogger().warn("taskId:{} invoke timeout.", getTaskId());
}

/**
* 任务执行取消处理
*
* @param context 请求上下文
*/
protected void onCancel(RequestContext context) {
getLogger().warn("taskId:{} invoke cancel.", getTaskId());
}

@Override
public void accept(RequestContext requestContext) {
public final Consumer<RequestContext> andThen(Consumer<? super RequestContext> after) {
return Consumer.super.andThen(after);
}

@Override
public final void accept(RequestContext requestContext) {
int attempts = 0;
boolean success = false;
long startTime = System.currentTimeMillis();
Expand All @@ -102,36 +125,21 @@ public void accept(RequestContext requestContext) {
Result<T> result = invoke(requestContext);
// 检查任务是否超时
if (isTimeout(startTime)) {
getLogger().warn("taskId:{} invoke timeout.", getTaskId());
onTimeout(requestContext);
break;
}
// 检查任务执行结果并存储
success = checkAndPutResult(requestContext, result);

} catch (Exception e) {
// 处理任务执行异常
handleException(requestContext, e, attempts);
onError(requestContext, e);
}
// 重试次数加1
attempts++;
}
}

/**
* 处理任务执行异常
*
* @param requestContext 请求上下文
* @param e 异常
* @param attempts 重试次数
*/
private void handleException(RequestContext requestContext, Exception e, int attempts) {
if (attempts == getRetries()) {
whenException(requestContext, e);
} else {
getLogger().warn("taskId:{} invoke exception, retrying... Attempt: {}", getTaskId(), attempts + 1, e);
}
}

/**
* 检查任务是否超时
*
Expand Down
100 changes: 52 additions & 48 deletions src/main/java/com/oneinstep/haidu/core/TaskEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,62 +60,66 @@ public static TaskEngine getInstance(final ExecutorService taskThreadPool) {
* @param context 请求上下文,包含任务配置
*/
public void startEngine(RequestContext context) {
// 检查是否已经启动过引擎
if (context.isEngineStarted()) {
throw new IllegalStateException("任务引擎已经启动!");
}
try {
// 检查是否已经启动过引擎
if (context.isEngineStarted()) {
throw new IllegalStateException("任务引擎已经启动!");
}

// 获取任务配置
TaskConfig taskConfig = context.getTaskConfig();
if (taskConfig == null) {
log.warn("the task config is null.");
throw new IllegalTaskConfigException("任务配置为空");
}
// 获取任务编排规则
List<List<String>> arrange = taskConfig.getArrangeRule();
if (CollectionUtils.isEmpty(arrange)) {
log.warn("the task arrange is empty...");
throw new IllegalTaskConfigException("任务编排为空");
}
// 获取任务配置
TaskConfig taskConfig = context.getTaskConfig();
if (taskConfig == null) {
log.warn("the task config is null.");
throw new IllegalTaskConfigException("任务配置为空");
}
// 获取任务编排规则
List<List<String>> arrange = taskConfig.getArrangeRule();
if (CollectionUtils.isEmpty(arrange)) {
log.warn("the task arrange is empty...");
throw new IllegalTaskConfigException("任务编排为空");
}

// 处理任务参数
handleTaskParams(taskConfig.getTaskDetailsMap(), context);
// 处理任务参数
handleTaskParams(taskConfig.getTaskDetailsMap(), context);

// 第一组是前置任务,中间N个组是并行任务,最后一个组是后置任务
// 前置任务
List<String> preTasks = arrange.get(0);
arrangeToOneFuture(ExpressionParser.parseExpressions(preTasks, context.getTaskInstanceMap(),
taskConfig.getTaskDetailsMap()), context).join();
// 第一组是前置任务,中间N个组是并行任务,最后一个组是后置任务
// 前置任务
List<String> preTasks = arrange.get(0);
arrangeToOneFuture(ExpressionParser.parseExpressions(preTasks, context.getTaskInstanceMap(),
taskConfig.getTaskDetailsMap()), context).join();

// 并行任务
if (arrange.size() > 2) {
List<CompletableFuture<Void>> parallelFutures = new ArrayList<>();
for (int i = 1; i < arrange.size() - 1; i++) {
List<String> parallelTasks = arrange.get(i);
CompletableFuture<Void> future = arrangeToOneFuture(ExpressionParser.parseExpressions(parallelTasks,
context.getTaskInstanceMap(), taskConfig.getTaskDetailsMap()), context);
parallelFutures.add(future);
// 并行任务
if (arrange.size() > 2) {
List<CompletableFuture<Void>> parallelFutures = new ArrayList<>();
for (int i = 1; i < arrange.size() - 1; i++) {
List<String> parallelTasks = arrange.get(i);
CompletableFuture<Void> future = arrangeToOneFuture(ExpressionParser.parseExpressions(parallelTasks,
context.getTaskInstanceMap(), taskConfig.getTaskDetailsMap()), context);
parallelFutures.add(future);
}
// 等待所有并行任务完成
CompletableFuture.allOf(parallelFutures.toArray(new CompletableFuture[0])).join();
} else {
log.warn("没有并行任务组...");
}
// 等待所有并行任务完成
CompletableFuture.allOf(parallelFutures.toArray(new CompletableFuture[0])).join();
} else {
log.warn("没有并行任务组...");
}

// 后置任务
if (arrange.size() >= 2) {
// 后置任务
List<String> postTasks = arrange.get(arrange.size() - 1);
arrangeToOneFuture(ExpressionParser.parseExpressions(postTasks, context.getTaskInstanceMap(),
taskConfig.getTaskDetailsMap()), context).join();
} else {
log.warn("没有后置任务组...");
}
if (arrange.size() >= 2) {
// 后置任务
List<String> postTasks = arrange.get(arrange.size() - 1);
arrangeToOneFuture(ExpressionParser.parseExpressions(postTasks, context.getTaskInstanceMap(),
taskConfig.getTaskDetailsMap()), context).join();
} else {
log.warn("没有后置任务组...");
}

// 设置引擎启动标志位
context.setEngineStarted(true);
// 清空任务实例缓存
context.clearTaskInstanceMap();
// 设置引擎启动标志位
context.setEngineStarted(true);

} finally {
// 清空任务实例缓存
context.clearTaskInstanceMap();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.oneinstep.haidu.exception;

import com.oneinstep.haidu.context.RequestContext;

public interface ExceptionHandler {
void handle(String taskId, Throwable e, RequestContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.oneinstep.haidu.exception;

// 支持自定义异常重试策略
public interface RetryStrategy {
boolean shouldRetry(Throwable e, int attempts);

long getDelayMillis(int attempts);
}
16 changes: 16 additions & 0 deletions src/main/java/com/oneinstep/haidu/monitor/TaskMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.oneinstep.haidu.monitor;

public interface TaskMonitor {
void onTaskStart(String taskId);

void onTaskComplete(String taskId, long duration);

void onTaskError(String taskId, Throwable error);

void onTaskTimeout(String taskId);

// 性能指标
void recordTaskDuration(String taskId, long duration);

void recordTaskQueueTime(String taskId, long queueTime);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.oneinstep.haidu.scheduler;

import com.oneinstep.haidu.context.RequestContext;
import com.oneinstep.haidu.core.AbstractTask;

// 支持自定义任务调度策略
public interface TaskScheduler {
void schedule(AbstractTask<?> task, RequestContext context);
}
74 changes: 74 additions & 0 deletions src/test/java/com/oneinstep/haidu/integration/TestTaskTimeout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.oneinstep.haidu.integration;

import com.oneinstep.haidu.config.JsonTaskDefinitionReader;
import com.oneinstep.haidu.config.TaskConfig;
import com.oneinstep.haidu.config.TaskConfigFactory;
import com.oneinstep.haidu.context.RequestContext;
import com.oneinstep.haidu.core.TaskEngine;
import com.oneinstep.haidu.result.Result;
import org.junit.jupiter.api.Test;

import java.io.StringReader;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.junit.jupiter.api.Assertions.assertNull;

class TestTaskTimeout {

// 添加更多测试场景
@Test
void shouldHandleTaskTimeout() throws Exception {
// 准备配置
String config = """
{
"arrangeName": "test-timeout",
"description": "Task timeout test",
"arrangeRule": [
["timeOutTask"]
],
"taskDetailsMap": {
"timeOutTask": {
"taskId": "timeOutTask",
"fullClassName": "com.oneinstep.haidu.task.TimeOutTask",
"timeout": 1000,
"taskParams": [
]
}
}
}
""";
// 创建上下文
RequestContext context = new RequestContext();

// 配置任务
TaskConfigFactory factory = new TaskConfigFactory(new JsonTaskDefinitionReader());
TaskConfig taskConfig = factory.createConfig(new StringReader(config));
context.setTaskConfig(taskConfig);

// 执行任务
ExecutorService executorService = Executors.newFixedThreadPool(1);
TaskEngine engine = TaskEngine.getInstance(executorService);

engine.startEngine(context);

// 验证结果
Map<String, Result<?>> results = context.getTaskResultMap();
Result<?> result = results.get("timeOutTask");

// 验证结果
assertNull(result);

}

@Test
void shouldHandleCircularDependencies() {
}

@Test
void shouldHandleConcurrentTaskExecution() {
}

}
Loading

0 comments on commit 500d08c

Please sign in to comment.