diff --git a/aiida_workgraph/engine/workgraph.py b/aiida_workgraph/engine/workgraph.py
index d41714d2..ab63693f 100644
--- a/aiida_workgraph/engine/workgraph.py
+++ b/aiida_workgraph/engine/workgraph.py
@@ -1079,10 +1079,13 @@ def run_tasks(self, names: t.List[str], continue_workgraph: bool = True) -> None
                     executor, args, kwargs, var_args, var_kwargs
                 )
                 if not isinstance(results, dict):
-                    self.report("The results of the awaitable builder must be a dict.")
+
+                    self.logger.error(
+                        "The results of the awaitable builder must be a dict."
+                    )
                 for key, value in results.items():
                     if not isinstance(value, ProcessNode):
-                        self.report(
+                        self.logger.error(
                             f"The value of key {key} is not an instance of ProcessNode."
                         )
                         self.set_task_state_info(name, "state", "Failed")
diff --git a/aiida_workgraph/utils/__init__.py b/aiida_workgraph/utils/__init__.py
index cc04c555..7e8a4082 100644
--- a/aiida_workgraph/utils/__init__.py
+++ b/aiida_workgraph/utils/__init__.py
@@ -261,7 +261,7 @@ def get_parent_workgraphs(pk: int) -> list:
 
 
 def get_processes_latest(
-    pk: int, node_name: str = None
+    pk: int, node_name: str = None, item_type: str = "task"
 ) -> Dict[str, Dict[str, Union[int, str]]]:
     """Get the latest info of all tasks from the process."""
     import aiida
@@ -270,39 +270,53 @@ def get_processes_latest(
     from aiida_workgraph.engine.workgraph import WorkGraphEngine
 
     tasks = {}
-    node_names = [node_name] if node_name else []
-    if node_name:
-        projections = [
-            f"extras._task_state_{node_name}",
-            f"extras._task_process_{node_name}",
-        ]
-    else:
-        projections = []
-        process = aiida.orm.load_node(pk)
-        node_names = [
-            key[12:]
-            for key in process.base.extras.keys()
-            if key.startswith("_task_state")
-        ]
-        projections = [f"extras._task_state_{name}" for name in node_names]
-        projections.extend([f"extras._task_process_{name}" for name in node_names])
-    qb = QueryBuilder()
-    qb.append(WorkGraphEngine, filters={"id": pk}, project=projections)
-    # print("projections: ", projections)
-    results = qb.all()
-    # change results to dict
-    results = dict(zip(projections, results[0]))
-    # print("results: ", results)
-    for name in node_names:
-        state = results[f"extras._task_state_{name}"]
-        task_process = deserialize_unsafe(results[f"extras._task_process_{name}"])
-        tasks[name] = {
-            "pk": task_process.pk if task_process else None,
-            "process_type": task_process.process_type if task_process else "",
-            "state": state,
-            "ctime": task_process.ctime if task_process else None,
-            "mtime": task_process.mtime if task_process else None,
-        }
+    if item_type == "called_process":
+        # fetch the processes called by the workgraph
+        node = aiida.orm.load_node(pk)
+        for link in node.base.links.get_outgoing().all():
+            if isinstance(link.node, aiida.orm.ProcessNode):
+                tasks[f"{link.node.process_label}_{link.node.pk}"] = {
+                    "pk": link.node.pk,
+                    "process_type": link.node.process_type,
+                    "state": link.node.process_state.value,
+                    "ctime": link.node.ctime,
+                    "mtime": link.node.mtime,
+                }
+    elif item_type == "task":
+        node_names = [node_name] if node_name else []
+        if node_name:
+            projections = [
+                f"extras._task_state_{node_name}",
+                f"extras._task_process_{node_name}",
+            ]
+        else:
+            projections = []
+            process = aiida.orm.load_node(pk)
+            node_names = [
+                key[12:]
+                for key in process.base.extras.keys()
+                if key.startswith("_task_state")
+            ]
+            projections = [f"extras._task_state_{name}" for name in node_names]
+            projections.extend([f"extras._task_process_{name}" for name in node_names])
+        qb = QueryBuilder()
+        qb.append(WorkGraphEngine, filters={"id": pk}, project=projections)
+        # print("projections: ", projections)
+        results = qb.all()
+        # change results to dict
+        results = dict(zip(projections, results[0]))
+        # print("results: ", results)
+        for name in node_names:
+            state = results[f"extras._task_state_{name}"]
+            task_process = deserialize_unsafe(results[f"extras._task_process_{name}"])
+            tasks[name] = {
+                "pk": task_process.pk if task_process else None,
+                "process_type": task_process.process_type if task_process else "",
+                "state": state,
+                "ctime": task_process.ctime if task_process else None,
+                "mtime": task_process.mtime if task_process else None,
+            }
     return tasks
diff --git a/aiida_workgraph/web/backend/app/workgraph.py b/aiida_workgraph/web/backend/app/workgraph.py
index 20c51db8..fef88692 100644
--- a/aiida_workgraph/web/backend/app/workgraph.py
+++ b/aiida_workgraph/web/backend/app/workgraph.py
@@ -100,11 +100,11 @@ async def read_workgraph(id: int):
 
 
 @router.get("/api/workgraph-state/{id}")
-async def read_workgraph_tasks_state(id: int):
+async def read_workgraph_tasks_state(id: int, item_type: str = "task"):
     from aiida_workgraph.utils import get_processes_latest
 
     try:
-        processes_info = get_processes_latest(id)
+        processes_info = get_processes_latest(id, item_type=item_type)
         return processes_info
     except KeyError:
         raise HTTPException(status_code=404, detail=f"Workgraph {id} not found")
diff --git a/aiida_workgraph/web/frontend/src/components/WorkGraphDuration.js b/aiida_workgraph/web/frontend/src/components/WorkGraphDuration.js
index 3a9cb3cd..c1835d7c 100644
--- a/aiida_workgraph/web/frontend/src/components/WorkGraphDuration.js
+++ b/aiida_workgraph/web/frontend/src/components/WorkGraphDuration.js
@@ -10,11 +10,11 @@ const NodeDurationGraph = ({ id }) => {
     const [timeStart, setTimeStart] = useState(null);
     const [timeEnd, setTimeEnd] = useState(null);
     const [initialLoad, setInitialLoad] = useState(true);
+    const [useItemType, setUseItemType] = useState("task");
 
-    // Function to fetch data from the backend
     const fetchData = async () => {
         try {
-            const response = await fetch(`http://localhost:8000/api/workgraph-state/${id}`);
+            const response = await fetch(`http://localhost:8000/api/workgraph-state/${id}?item_type=${useItemType}`);
             if (!response.ok) {
                 throw new Error('Network response was not ok');
             }
@@ -26,10 +26,11 @@ const NodeDurationGraph = ({ id }) => {
     };
 
     useEffect(() => {
+        setInitialLoad(true);
         fetchData();
         const interval = setInterval(fetchData, 5000);
         return () => clearInterval(interval);
-    }, [id]);
+    }, [id, useItemType]);
 
     useEffect(() => {
         if (Object.keys(processesInfo).length) {
@@ -38,15 +39,13 @@ const NodeDurationGraph = ({ id }) => {
                 title: key
             }));
 
-            const newItems = Object.entries(processesInfo).map(([key, { ctime, mtime, process_type }], idx) => {
-                return process_type ? {
-                    id: idx,
-                    group: idx,
-                    title: key,
-                    start_time: ctime ? moment(ctime) : null,
-                    end_time: mtime ? moment(mtime) : null
-                } : null;
-            }).filter(item => item !== null);
+            const newItems = Object.entries(processesInfo).map(([key, { ctime, mtime, process_type }], idx) => ({
+                id: idx,
+                group: idx,
+                title: key,
+                start_time: ctime ? moment(ctime) : null,
+                end_time: mtime ? moment(mtime) : null
+            }));
 
             setGroups(newGroups);
             setItems(newItems);
@@ -58,20 +57,27 @@ const NodeDurationGraph = ({ id }) => {
                     setTimeStart(moment.min(validStartTimes).valueOf());
                     setTimeEnd(moment.max(validEndTimes).valueOf());
                 }
+                else {
+                    // use the current time as the start time
+                    setTimeStart(moment().valueOf());
+                    // use the current time + 1 hour as the end time
+                    setTimeEnd(moment().add(1, 'hour').valueOf());
+                }
                 setInitialLoad(false);
             }
+        } else {
+            setGroups([]);
+            setItems([]);
+            setTimeStart(null);
+            setTimeEnd(null);
         }
     }, [processesInfo]);
 
     const minZoom = 10000; // 10 seconds in milliseconds
     const maxZoom = 365.25 * 24 * 60 * 60 * 1000; // 1 year in milliseconds
 
-    if (!timeStart || !timeEnd) {
-        return
@@ -80,24 +86,50 @@ const NodeDurationGraph = ({ id }) => {
Data nodes are shown as rows without bars.
+
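
For reference, a minimal usage sketch of the extended `get_processes_latest` utility introduced in this patch. It assumes a loaded AiiDA profile and an existing WorkGraph process; the `pk` value here is purely illustrative.

```python
from aiida import load_profile
from aiida_workgraph.utils import get_processes_latest

load_profile()

pk = 12345  # hypothetical pk of a WorkGraph process

# Default view: one entry per task, read from the engine's extras.
tasks = get_processes_latest(pk, item_type="task")

# New view: one entry per ProcessNode called by the workgraph,
# keyed as "<process_label>_<pk>".
called = get_processes_latest(pk, item_type="called_process")

for name, info in called.items():
    print(name, info["state"], info["ctime"], info["mtime"])
```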
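The backend endpoint forwards the same switch as a query parameter, which is what the frontend's `useItemType` state feeds into the fetch URL. A sketch of exercising the endpoint directly, assuming the server runs on `localhost:8000` as hard-coded in the frontend, with an illustrative pk:

```python
import requests

# item_type defaults to "task" on the server side.
base = "http://localhost:8000/api/workgraph-state/12345"
tasks = requests.get(base, params={"item_type": "task"}).json()
called = requests.get(base, params={"item_type": "called_process"}).json()
```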