From 8d1cd0141297cab0277a232cc5a7343da481b057 Mon Sep 17 00:00:00 2001 From: Mihir Bala Date: Wed, 28 Feb 2024 13:45:29 -0500 Subject: [PATCH] Fixed manual kill functionality for missions --- hermes/python/mission/MissionController.py | 84 +++++----------------- hermes/python/mission/TaskRunner.py | 58 ++++++--------- hermes/python/requirements.txt | 6 +- 3 files changed, 46 insertions(+), 102 deletions(-) diff --git a/hermes/python/mission/MissionController.py b/hermes/python/mission/MissionController.py index 27dfea6c..7fa021ef 100644 --- a/hermes/python/mission/MissionController.py +++ b/hermes/python/mission/MissionController.py @@ -21,127 +21,81 @@ def __init__(self, drone, cloudlet): self.start_task_id = None self.curr_task_id = None self.transitMap = {} - self.transitMap["default"]= MissionCreator.default_transit + self.transitMap["default"] = MissionCreator.default_transit self.task_arg_map = {} - ######################################################## TASK ############################################################# def create_task(self, task_id): - logger.info(f'MC: taskid{task_id}') if (task_id in self.task_arg_map.keys()): if (self.task_arg_map[task_id].task_type == TaskType.Detect): - logger.info('MC: Detect task') return DetectTask(self.drone, self.cloudlet, task_id, self.trigger_event_queue, self.task_arg_map[task_id]) elif (self.task_arg_map[task_id].task_type == TaskType.Track): - logger.info('MC: Track task') return TrackTask(self.drone, self.cloudlet, task_id, self.trigger_event_queue, self.task_arg_map[task_id]) return None def next_task(self, current_task_id, triggered_event): - logger.info(f"MC next task, current_task_id {current_task_id}, trigger_event {triggered_event}") try: next_task_id = self.transitMap.get(current_task_id, MissionCreator.default_transit)(triggered_event) except Exception as e: logger.info(f"{e}") - logger.info(f"next_task_id {next_task_id}") return next_task_id - - ######################################################## MISSION ############################################################ async def start_mission(self, tr): - logger.info('MC: Start mission') self.start_task_id = self.next_task("start", None) - logger.info('MC: Create task') start_task = self.create_task(self.start_task_id) - logger.info('MC: Got task, starting...') if start_task != None: - # set the current task + # Set the current task self.curr_task_id = start_task.task_id - logger.info(f"MC: start mission, current taskid:{self.curr_task_id}\n") - # takeoff + # Takeoff await self.drone.takeOff() - logger.info("MC: taking off") - # start - tr.push_task(start_task) + # Start + tr.queue_task(start_task) async def transit_to(self, task, tr): - logger.info(f"MC: transit to task with task_id: {task.task_id}, current_task_id: {self.curr_task_id}") - tr.stop_task() - tr.push_task(task) + logger.info(f"[MissionController] Transiting to task: {task.task_id} from current task: {self.curr_task_id}") + tr.force_task(task) self.curr_task_id = task.task_id async def end_mission(self): - logger.info("MC: end mission, rth\n") + logger.info("[MissionController] End of mission, returning home!") await self.drone.rth() def get_current_task(self): return self.curr_task_id - ######################################################## CONTROL ############################################################ - async def run(self): try: - # start the mc - logger.info("MissionController: hi start the controller\n") + # Start the MissionController + logger.info("[MissionController] Starting the MissionController") - logger.info("MissionController: define mission \n") MissionCreator.define_mission(self.transitMap, self.task_arg_map) - logger.info(f"MissionController: transitMap {str(self.transitMap)} \n") - logger.info(f"MissionController: task_arg_map {str(self.task_arg_map)} \n") - - logger.info("MissionController: create TaskRunner \n") tr = TaskRunner(self.drone) tr_coroutine = asyncio.create_task(tr.run()) - - logger.info("MissionController: start mission \n") await self.start_mission(tr) - logger.info("MissionController: go to the inf loop routine\n") - # main logic check the triggered event - while True: - # logger.info('[MC] HI tttt') - - if (not self.trigger_event_queue.empty()): - item = self.trigger_event_queue.get() + try: + item = self.trigger_event_queue.get_nowait() task_id = item[0] trigger_event = item[1] - logger.info(f"MissionController: Trigger one event! \n") - logger.info(f"MissionController: Task id {task_id} \n") - logger.info(f"MissionController: event {trigger_event} \n") + logger.info(f"[MissionController] Event triggered: {trigger_event}") if (task_id == self.get_current_task()): next_task_id = self.next_task(task_id, trigger_event) if (next_task_id == "terminate"): break else: next_task = self.create_task(next_task_id) - logger.info(f"MissionController: task created taskid {str(next_task.task_id)} \n") + logger.info(f"[MissionController] Task created: {str(next_task.task_id)} \n") await self.transit_to(next_task, tr) - - await asyncio.sleep(0.1) + except queue.Empty: + await asyncio.sleep(0.1) except asyncio.CancelledError as e: logger.info(f"MissionController: catching the asyncio exception {e} \n") finally: - # terminate the tr - logger.info("MissionController: terminating TaskRunner \n") - tr.terminate() - await tr_coroutine - logger.info("MissionController: terminated TaskRunner \n") - - # terminate the mr - logger.info(f"MissionController: the current task is done, end mission \n") - await self.end_mission() - - #end the mc - logger.info("MissionController: terminate the controller\n") - - - - - - - + # Terminate the TaskRunner + logger.info("MissionController: terminating TaskRunner") + tr_coroutine.cancel() diff --git a/hermes/python/mission/TaskRunner.py b/hermes/python/mission/TaskRunner.py index 2ad2ac62..476fda38 100644 --- a/hermes/python/mission/TaskRunner.py +++ b/hermes/python/mission/TaskRunner.py @@ -8,61 +8,49 @@ class TaskRunner(): def __init__(self, drone): self.drone = drone - # lock the taskCoroutinue for contention? self.taskCoroutinue = None self.currentTask = None - # thread safe queue self.taskQueue = queue.Queue() - self.isTerminated = False - - def terminate(self): - self.isTerminated = True - + async def run(self): - logger.info('[TaskRunner] Start to manage the task queue') + logger.info('[TaskRunner] Start executing the task queue') try: while True: - # termniate the loop if commanded by mission controller - if (self.isTerminated): - logger.info('[TaskRunner] terminating the task queue') - break - - # logger.info('[TaskRunner] HI tttt') - if (not self.taskQueue.empty()): - # get the task + try: + # Get the current task, if it exists. + self.currentTask = self.taskQueue.get_nowait() + # If we get here without throwing an exception, we have a new task! logger.info('[TaskRunner] Pulling a task off the task queue') - self.currentTask = self.taskQueue.get() - # execute a task + # Execute the task + # We don't wait on this task to complete so that we can stop it + # independently without causing a CancelledError. self.taskCoroutinue = asyncio.create_task(self.currentTask.run()) - await asyncio.sleep(0.1) + except queue.Empty: + await asyncio.sleep(0.5) except asyncio.CancelledError as e: - logger.info(f"[TaskRunner]: catching the asyncio exception {e} \n") - raise - finally: self.stop_task() + logger.info(f'[TaskRunner] Stopped!') def stop_task(self): logger.info(f'[TaskRunner] Stopping current task!') if self.taskCoroutinue is not None: - # stop all the transitions of the task + # Stop all the transitions of the task self.currentTask.stop_trans() - logger.info(f'[TaskRunner] transitions in the current task stopped!') + logger.info(f'[TaskRunner] Transitions for the current task stopped!') - is_canceled = self.taskCoroutinue.cancel() - if is_canceled: - logger.info(f'[TaskRunner] task cancelled successfully') - - + self.taskCoroutinue.cancel() + logger.info(f'[TaskRunner] Current task cancelled.') - def push_task(self, task): - logger.info(f'[TaskRunner] push the task! task: {str(task)}') - + def queue_task(self, task): + logger.info(f'[TaskRunner] Queueing task: {str(task)}') self.taskQueue.put(task) - def pause(self): - pass + def pause_task(self): + raise NotImplemented() def force_task(self, task): - pass + logger.info(f'[TaskRunner] Forcing new task: {str(task)}') + self.stop_task() + self.queue_task(task) diff --git a/hermes/python/requirements.txt b/hermes/python/requirements.txt index 139597f9..2a518f5f 100644 --- a/hermes/python/requirements.txt +++ b/hermes/python/requirements.txt @@ -1,2 +1,4 @@ - - +aenum==3.1.12 +gabriel_protocol==2.0.1 +numpy==1.26.4 +scipy==1.12.0