From 80eaa4b3facf1898c44935af1f2c6840b1b8afcf Mon Sep 17 00:00:00 2001 From: Michal Staniaszek Date: Fri, 28 Oct 2022 14:11:30 +0100 Subject: [PATCH] apply black formatting and add black check on PRs and pushes --- .github/workflows/black.yml | 12 + docs/conf.py | 36 +- spot_driver/doc/conf.py | 27 +- spot_driver/setup.py | 6 +- spot_driver/src/spot_driver/graph_nav_util.py | 55 +- spot_driver/src/spot_driver/ros_helpers.py | 158 ++- spot_driver/src/spot_driver/spot_ros.py | 818 ++++++++++----- spot_driver/src/spot_driver/spot_wrapper.py | 959 ++++++++++++------ 8 files changed, 1430 insertions(+), 641 deletions(-) create mode 100644 .github/workflows/black.yml diff --git a/.github/workflows/black.yml b/.github/workflows/black.yml new file mode 100644 index 00000000..bce5d3e3 --- /dev/null +++ b/.github/workflows/black.yml @@ -0,0 +1,12 @@ +name: Black formatting + +on: [push, pull_request] + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: psf/black@stable + with: + src: "spot_driver/src spot_driver/scripts" diff --git a/docs/conf.py b/docs/conf.py index 9604abe9..c49d0247 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -7,31 +7,29 @@ sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.doctest', - 'sphinx.ext.viewcode', + "sphinx.ext.autodoc", + "sphinx.ext.doctest", + "sphinx.ext.viewcode", ] -source_suffix = '.rst' -master_doc = 'index' +source_suffix = ".rst" +master_doc = "index" -project = u'Spot ROS User Documentation' -copyright = u'2020, Clearpath Robotics' +project = "Spot ROS User Documentation" +copyright = "2020, Clearpath Robotics" # Get version number from package.xml. -tree = etree.parse('../spot_driver/package.xml') +tree = etree.parse("../spot_driver/package.xml") version = tree.find("version").text release = version -#.. html_theme = 'nature' -#.. html_theme_path = ["."] +# .. html_theme = 'nature' +# .. html_theme_path = ["."] html_theme = "sphinx_rtd_theme" html_theme_path = ["."] -html_sidebars = { - '**': ['sidebartoc.html', 'sourcelink.html', 'searchbox.html'] -} +html_sidebars = {"**": ["sidebartoc.html", "sourcelink.html", "searchbox.html"]} # If true, "Created using Sphinx" is shown in the HTML footer. Default is True. html_show_sphinx = False @@ -48,15 +46,12 @@ # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', - # The font size ('10pt', '11pt' or '12pt'). # # 'pointsize': '10pt', - # Additional stuff for the LaTeX preamble. # # 'preamble': '', - # Latex figure (float) alignment # # 'figure_align': 'htbp', @@ -66,6 +61,11 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'SpotROSUserDocumentation.tex', 'Spot ROS User Documentation', - 'Dave Niewinski', 'manual'), + ( + master_doc, + "SpotROSUserDocumentation.tex", + "Spot ROS User Documentation", + "Dave Niewinski", + "manual", + ), ] diff --git a/spot_driver/doc/conf.py b/spot_driver/doc/conf.py index a426a6cd..33292958 100644 --- a/spot_driver/doc/conf.py +++ b/spot_driver/doc/conf.py @@ -1,28 +1,23 @@ import os import sys -sys.path.insert(0, os.path.abspath('../scripts')) -project = 'spot_driver' -copyright = '2020, Dave Niewinski' -author = 'Dave Niewinski' +sys.path.insert(0, os.path.abspath("../scripts")) -extensions = [ -'sphinx.ext.autodoc', -'sphinx.ext.coverage', -'sphinx.ext.napoleon' -] +project = "spot_driver" +copyright = "2020, Dave Niewinski" +author = "Dave Niewinski" -html_theme = 'clearpath-sphinx-theme' +extensions = ["sphinx.ext.autodoc", "sphinx.ext.coverage", "sphinx.ext.napoleon"] + +html_theme = "clearpath-sphinx-theme" html_theme_path = ["."] -html_static_path = ['./clearpath-sphinx-theme/static'] +html_static_path = ["./clearpath-sphinx-theme/static"] -html_sidebars = { - '**': ['sidebartoc.html', 'sourcelink.html', 'searchbox.html'] -} +html_sidebars = {"**": ["sidebartoc.html", "sourcelink.html", "searchbox.html"]} html_show_sphinx = False -html_logo = 'clearpath-sphinx-theme/static/clearpathlogo.png' +html_logo = "clearpath-sphinx-theme/static/clearpathlogo.png" -html_favicon = 'clearpath-sphinx-theme/static/favicon.ico' +html_favicon = "clearpath-sphinx-theme/static/favicon.ico" diff --git a/spot_driver/setup.py b/spot_driver/setup.py index 2fb08075..e071cf4f 100644 --- a/spot_driver/setup.py +++ b/spot_driver/setup.py @@ -2,9 +2,7 @@ from catkin_pkg.python_setup import generate_distutils_setup d = generate_distutils_setup( - packages=['spot_driver'], - scripts=['scripts/spot_ros'], - package_dir={'': 'src'} + packages=["spot_driver"], scripts=["scripts/spot_ros"], package_dir={"": "src"} ) -setup(**d) \ No newline at end of file +setup(**d) diff --git a/spot_driver/src/spot_driver/graph_nav_util.py b/spot_driver/src/spot_driver/graph_nav_util.py index 96dd5c11..e35357e4 100644 --- a/spot_driver/src/spot_driver/graph_nav_util.py +++ b/spot_driver/src/spot_driver/graph_nav_util.py @@ -9,20 +9,28 @@ def id_to_short_code(id): """Convert a unique id to a 2 letter short code.""" - tokens = id.split('-') + tokens = id.split("-") if len(tokens) > 2: - return '%c%c' % (tokens[0][0], tokens[1][0]) + return "%c%c" % (tokens[0][0], tokens[1][0]) return None -def pretty_print_waypoints(waypoint_id, waypoint_name, short_code_to_count, localization_id, logger): +def pretty_print_waypoints( + waypoint_id, waypoint_name, short_code_to_count, localization_id, logger +): short_code = id_to_short_code(waypoint_id) if short_code is None or short_code_to_count[short_code] != 1: - short_code = ' ' # If the short code is not valid/unique, don't show it. + short_code = " " # If the short code is not valid/unique, don't show it. - logger.info("%s Waypoint name: %s id: %s short code: %s" % - ('->' if localization_id == waypoint_id else ' ', - waypoint_name, waypoint_id, short_code)) + logger.info( + "%s Waypoint name: %s id: %s short code: %s" + % ( + "->" if localization_id == waypoint_id else " ", + waypoint_name, + waypoint_id, + short_code, + ) + ) def find_unique_waypoint_id(short_code, graph, name_to_id, logger): @@ -35,8 +43,10 @@ def find_unique_waypoint_id(short_code, graph, name_to_id, logger): # Has an associated waypoint id! return name_to_id[short_code] else: - logger.error("The waypoint name %s is used for multiple different unique waypoints. Please use" + \ - "the waypoint id." % (short_code)) + logger.error( + "The waypoint name %s is used for multiple different unique waypoints. Please use" + + "the waypoint id." % (short_code) + ) return None # Also not an waypoint annotation name, so we will operate under the assumption that it is a # unique waypoint id. @@ -62,14 +72,17 @@ def update_waypoints_and_edges(graph, localization_id, logger): # Determine the timestamp that this waypoint was created at. timestamp = -1.0 try: - timestamp = waypoint.annotations.creation_time.seconds + waypoint.annotations.creation_time.nanos / 1e9 + timestamp = ( + waypoint.annotations.creation_time.seconds + + waypoint.annotations.creation_time.nanos / 1e9 + ) except: # Must be operating on an older graph nav map, since the creation_time is not # available within the waypoint annotations message. pass - waypoint_to_timestamp.append((waypoint.id, - timestamp, - waypoint.annotations.name)) + waypoint_to_timestamp.append( + (waypoint.id, timestamp, waypoint.annotations.name) + ) # Determine how many waypoints have the same short code. short_code = id_to_short_code(waypoint.id) @@ -91,13 +104,15 @@ def update_waypoints_and_edges(graph, localization_id, logger): # Sort the set of waypoints by their creation timestamp. If the creation timestamp is unavailable, # fallback to sorting by annotation name. - waypoint_to_timestamp = sorted(waypoint_to_timestamp, key= lambda x:(x[1], x[2])) + waypoint_to_timestamp = sorted(waypoint_to_timestamp, key=lambda x: (x[1], x[2])) # Print out the waypoints name, id, and short code in a ordered sorted by the timestamp from # when the waypoint was created. - logger.info('%d waypoints:' % len(graph.waypoints)) + logger.info("%d waypoints:" % len(graph.waypoints)) for waypoint in waypoint_to_timestamp: - pretty_print_waypoints(waypoint[0], waypoint[2], short_code_to_count, localization_id, logger) + pretty_print_waypoints( + waypoint[0], waypoint[2], short_code_to_count, localization_id, logger + ) for edge in graph.edges: if edge.id.to_waypoint in edges: @@ -105,7 +120,11 @@ def update_waypoints_and_edges(graph, localization_id, logger): edges[edge.id.to_waypoint].append(edge.id.from_waypoint) else: edges[edge.id.to_waypoint] = [edge.id.from_waypoint] - logger.info("(Edge) from waypoint id: ", edge.id.from_waypoint, " and to waypoint id: ", - edge.id.to_waypoint) + logger.info( + "(Edge) from waypoint id: ", + edge.id.from_waypoint, + " and to waypoint id: ", + edge.id.to_waypoint, + ) return name_to_id, edges diff --git a/spot_driver/src/spot_driver/ros_helpers.py b/spot_driver/src/spot_driver/ros_helpers.py index 96f84fed..4b362fe3 100644 --- a/spot_driver/src/spot_driver/ros_helpers.py +++ b/spot_driver/src/spot_driver/ros_helpers.py @@ -53,6 +53,7 @@ class DefaultCameraInfo(CameraInfo): """Blank class extending CameraInfo ROS topic that defaults most parameters""" + def __init__(self): super().__init__() self.distortion_model = "plumb_bob" @@ -106,8 +107,10 @@ def populateTransformStamped(time, parent_frame, child_frame, transform): elif hasattr(transform, "translation"): position = transform.translation else: - rospy.logerr("Trying to generate StampedTransform but input transform has neither position nor translation " - "attributes") + rospy.logerr( + "Trying to generate StampedTransform but input transform has neither position nor translation " + "attributes" + ) return TransformStamped() new_tf = TransformStamped() @@ -124,6 +127,7 @@ def populateTransformStamped(time, parent_frame, child_frame, transform): return new_tf + def getImageMsg(data, spot_wrapper): """Takes the imag and camera data and populates the necessary ROS messages @@ -199,6 +203,7 @@ def getImageMsg(data, spot_wrapper): return image_msg, camera_info_msg + def GetJointStatesFromState(state, spot_wrapper): """Maps joint state data from robot state proto to ROS JointState message @@ -209,12 +214,14 @@ def GetJointStatesFromState(state, spot_wrapper): JointState message """ joint_state = JointState() - local_time = spot_wrapper.robotToLocalTime(state.kinematic_state.acquisition_timestamp) + local_time = spot_wrapper.robotToLocalTime( + state.kinematic_state.acquisition_timestamp + ) joint_state.header.stamp = rospy.Time(local_time.seconds, local_time.nanos) for joint in state.kinematic_state.joint_states: - # there is a joint with name arm0.hr0 in the robot state, however this + # there is a joint with name arm0.hr0 in the robot state, however this # joint has no data and should not be there, this is why we ignore it - if joint.name == 'arm0.hr0': + if joint.name == "arm0.hr0": continue joint_state.name.append(friendly_joint_names.get(joint.name, "ERROR")) joint_state.position.append(joint.position.value) @@ -223,6 +230,7 @@ def GetJointStatesFromState(state, spot_wrapper): return joint_state + def GetEStopStateFromState(state, spot_wrapper): """Maps eStop state data from robot state proto to ROS EStopArray message @@ -245,6 +253,7 @@ def GetEStopStateFromState(state, spot_wrapper): return estop_array_msg + def GetFeetFromState(state, spot_wrapper): """Maps foot position state data from robot state proto to ROS FootStateArray message @@ -266,16 +275,27 @@ def GetFeetFromState(state, spot_wrapper): terrain = foot.terrain foot_msg.terrain.ground_mu_est = terrain.ground_mu_est foot_msg.terrain.frame_name = terrain.frame_name - foot_msg.terrain.foot_slip_distance_rt_frame = terrain.foot_slip_distance_rt_frame - foot_msg.terrain.foot_slip_velocity_rt_frame = terrain.foot_slip_velocity_rt_frame - foot_msg.terrain.ground_contact_normal_rt_frame = terrain.ground_contact_normal_rt_frame - foot_msg.terrain.visual_surface_ground_penetration_mean = terrain.visual_surface_ground_penetration_mean - foot_msg.terrain.visual_surface_ground_penetration_std = terrain.visual_surface_ground_penetration_std + foot_msg.terrain.foot_slip_distance_rt_frame = ( + terrain.foot_slip_distance_rt_frame + ) + foot_msg.terrain.foot_slip_velocity_rt_frame = ( + terrain.foot_slip_velocity_rt_frame + ) + foot_msg.terrain.ground_contact_normal_rt_frame = ( + terrain.ground_contact_normal_rt_frame + ) + foot_msg.terrain.visual_surface_ground_penetration_mean = ( + terrain.visual_surface_ground_penetration_mean + ) + foot_msg.terrain.visual_surface_ground_penetration_std = ( + terrain.visual_surface_ground_penetration_std + ) foot_array_msg.states.append(foot_msg) return foot_array_msg + def GetOdomTwistFromState(state, spot_wrapper): """Maps odometry data from robot state proto to ROS TwistWithCovarianceStamped message @@ -286,16 +306,31 @@ def GetOdomTwistFromState(state, spot_wrapper): TwistWithCovarianceStamped message """ twist_odom_msg = TwistWithCovarianceStamped() - local_time = spot_wrapper.robotToLocalTime(state.kinematic_state.acquisition_timestamp) + local_time = spot_wrapper.robotToLocalTime( + state.kinematic_state.acquisition_timestamp + ) twist_odom_msg.header.stamp = rospy.Time(local_time.seconds, local_time.nanos) - twist_odom_msg.twist.twist.linear.x = state.kinematic_state.velocity_of_body_in_odom.linear.x - twist_odom_msg.twist.twist.linear.y = state.kinematic_state.velocity_of_body_in_odom.linear.y - twist_odom_msg.twist.twist.linear.z = state.kinematic_state.velocity_of_body_in_odom.linear.z - twist_odom_msg.twist.twist.angular.x = state.kinematic_state.velocity_of_body_in_odom.angular.x - twist_odom_msg.twist.twist.angular.y = state.kinematic_state.velocity_of_body_in_odom.angular.y - twist_odom_msg.twist.twist.angular.z = state.kinematic_state.velocity_of_body_in_odom.angular.z + twist_odom_msg.twist.twist.linear.x = ( + state.kinematic_state.velocity_of_body_in_odom.linear.x + ) + twist_odom_msg.twist.twist.linear.y = ( + state.kinematic_state.velocity_of_body_in_odom.linear.y + ) + twist_odom_msg.twist.twist.linear.z = ( + state.kinematic_state.velocity_of_body_in_odom.linear.z + ) + twist_odom_msg.twist.twist.angular.x = ( + state.kinematic_state.velocity_of_body_in_odom.angular.x + ) + twist_odom_msg.twist.twist.angular.y = ( + state.kinematic_state.velocity_of_body_in_odom.angular.y + ) + twist_odom_msg.twist.twist.angular.z = ( + state.kinematic_state.velocity_of_body_in_odom.angular.z + ) return twist_odom_msg + def GetOdomFromState(state, spot_wrapper, use_vision=True): """Maps odometry data from robot state proto to ROS Odometry message @@ -306,15 +341,17 @@ def GetOdomFromState(state, spot_wrapper, use_vision=True): Odometry message """ odom_msg = Odometry() - local_time = spot_wrapper.robotToLocalTime(state.kinematic_state.acquisition_timestamp) + local_time = spot_wrapper.robotToLocalTime( + state.kinematic_state.acquisition_timestamp + ) odom_msg.header.stamp = rospy.Time(local_time.seconds, local_time.nanos) if use_vision == True: - odom_msg.header.frame_id = 'vision' + odom_msg.header.frame_id = "vision" tform_body = get_vision_tform_body(state.kinematic_state.transforms_snapshot) else: - odom_msg.header.frame_id = 'odom' + odom_msg.header.frame_id = "odom" tform_body = get_odom_tform_body(state.kinematic_state.transforms_snapshot) - odom_msg.child_frame_id = 'body' + odom_msg.child_frame_id = "body" pose_odom_msg = PoseWithCovariance() pose_odom_msg.pose.position.x = tform_body.position.x pose_odom_msg.pose.position.y = tform_body.position.y @@ -329,6 +366,7 @@ def GetOdomFromState(state, spot_wrapper, use_vision=True): odom_msg.twist = twist_odom_msg return odom_msg + def GetWifiFromState(state, spot_wrapper): """Maps wireless state data from robot state proto to ROS WiFiState message @@ -340,12 +378,13 @@ def GetWifiFromState(state, spot_wrapper): """ wifi_msg = WiFiState() for comm_state in state.comms_states: - if comm_state.HasField('wifi_state'): + if comm_state.HasField("wifi_state"): wifi_msg.current_mode = comm_state.wifi_state.current_mode wifi_msg.essid = comm_state.wifi_state.essid return wifi_msg + def generate_feet_tf(foot_states_msg): """ Generate a tf message containing information about foot states @@ -366,10 +405,15 @@ def generate_feet_tf(foot_states_msg): foot_transform.translation.x = foot_state.foot_position_rt_body.x foot_transform.translation.y = foot_state.foot_position_rt_body.y foot_transform.translation.z = foot_state.foot_position_rt_body.z - foot_tfs.transforms.append(populateTransformStamped(time_now, "body", foot_ordering[idx] + "_foot", foot_transform)) + foot_tfs.transforms.append( + populateTransformStamped( + time_now, "body", foot_ordering[idx] + "_foot", foot_transform + ) + ) return foot_tfs + def GetTFFromState(state, spot_wrapper, inverse_target_frame): """Maps robot link state data from robot state proto to ROS TFMessage message @@ -382,23 +426,44 @@ def GetTFFromState(state, spot_wrapper, inverse_target_frame): """ tf_msg = TFMessage() - for frame_name in state.kinematic_state.transforms_snapshot.child_to_parent_edge_map: - if state.kinematic_state.transforms_snapshot.child_to_parent_edge_map.get(frame_name).parent_frame_name: + for ( + frame_name + ) in state.kinematic_state.transforms_snapshot.child_to_parent_edge_map: + if state.kinematic_state.transforms_snapshot.child_to_parent_edge_map.get( + frame_name + ).parent_frame_name: try: - transform = state.kinematic_state.transforms_snapshot.child_to_parent_edge_map.get(frame_name) - local_time = spot_wrapper.robotToLocalTime(state.kinematic_state.acquisition_timestamp) + transform = state.kinematic_state.transforms_snapshot.child_to_parent_edge_map.get( + frame_name + ) + local_time = spot_wrapper.robotToLocalTime( + state.kinematic_state.acquisition_timestamp + ) tf_time = rospy.Time(local_time.seconds, local_time.nanos) if inverse_target_frame == frame_name: - geo_tform_inversed = SE3Pose.from_obj(transform.parent_tform_child).inverse() - new_tf = populateTransformStamped(tf_time, frame_name, transform.parent_frame_name, geo_tform_inversed) + geo_tform_inversed = SE3Pose.from_obj( + transform.parent_tform_child + ).inverse() + new_tf = populateTransformStamped( + tf_time, + frame_name, + transform.parent_frame_name, + geo_tform_inversed, + ) else: - new_tf = populateTransformStamped(tf_time, transform.parent_frame_name, frame_name, transform.parent_tform_child) + new_tf = populateTransformStamped( + tf_time, + transform.parent_frame_name, + frame_name, + transform.parent_tform_child, + ) tf_msg.transforms.append(new_tf) except Exception as e: - spot_wrapper.logger.error('Error: {}'.format(e)) + spot_wrapper.logger.error("Error: {}".format(e)) return tf_msg + def GetBatteryStatesFromState(state, spot_wrapper): """Maps battery state data from robot state proto to ROS BatteryStateArray message @@ -416,7 +481,9 @@ def GetBatteryStatesFromState(state, spot_wrapper): battery_msg.identifier = battery.identifier battery_msg.charge_percentage = battery.charge_percentage.value - battery_msg.estimated_runtime = rospy.Time(battery.estimated_runtime.seconds, battery.estimated_runtime.nanos) + battery_msg.estimated_runtime = rospy.Time( + battery.estimated_runtime.seconds, battery.estimated_runtime.nanos + ) battery_msg.current = battery.current.value battery_msg.voltage = battery.voltage.value for temp in battery.temperatures: @@ -426,6 +493,7 @@ def GetBatteryStatesFromState(state, spot_wrapper): return battery_states_array_msg + def GetPowerStatesFromState(state, spot_wrapper): """Maps power state data from robot state proto to ROS PowerState message @@ -440,10 +508,16 @@ def GetPowerStatesFromState(state, spot_wrapper): power_state_msg.header.stamp = rospy.Time(local_time.seconds, local_time.nanos) power_state_msg.motor_power_state = state.power_state.motor_power_state power_state_msg.shore_power_state = state.power_state.shore_power_state - power_state_msg.locomotion_charge_percentage = state.power_state.locomotion_charge_percentage.value - power_state_msg.locomotion_estimated_runtime = rospy.Time(state.power_state.locomotion_estimated_runtime.seconds, state.power_state.locomotion_estimated_runtime.nanos) + power_state_msg.locomotion_charge_percentage = ( + state.power_state.locomotion_charge_percentage.value + ) + power_state_msg.locomotion_estimated_runtime = rospy.Time( + state.power_state.locomotion_estimated_runtime.seconds, + state.power_state.locomotion_estimated_runtime.nanos, + ) return power_state_msg + def GetDockStatesFromState(state): """Maps dock state data from robot state proto to ROS DockState message @@ -459,6 +533,7 @@ def GetDockStatesFromState(state): dock_state_msg.power_status = state.power_status return dock_state_msg + def getBehaviorFaults(behavior_faults, spot_wrapper): """Helper function to strip out behavior faults into a list @@ -481,6 +556,7 @@ def getBehaviorFaults(behavior_faults, spot_wrapper): return faults + def getSystemFaults(system_faults, spot_wrapper): """Helper function to strip out system faults into a list @@ -510,6 +586,7 @@ def getSystemFaults(system_faults, spot_wrapper): return faults + def GetSystemFaultsFromState(state, spot_wrapper): """Maps system fault data from robot state proto to ROS SystemFaultState message @@ -520,10 +597,15 @@ def GetSystemFaultsFromState(state, spot_wrapper): SystemFaultState message """ system_fault_state_msg = SystemFaultState() - system_fault_state_msg.faults = getSystemFaults(state.system_fault_state.faults, spot_wrapper) - system_fault_state_msg.historical_faults = getSystemFaults(state.system_fault_state.historical_faults, spot_wrapper) + system_fault_state_msg.faults = getSystemFaults( + state.system_fault_state.faults, spot_wrapper + ) + system_fault_state_msg.historical_faults = getSystemFaults( + state.system_fault_state.historical_faults, spot_wrapper + ) return system_fault_state_msg + def getBehaviorFaultsFromState(state, spot_wrapper): """Maps behavior fault data from robot state proto to ROS BehaviorFaultState message @@ -534,5 +616,7 @@ def getBehaviorFaultsFromState(state, spot_wrapper): BehaviorFaultState message """ behavior_fault_state_msg = BehaviorFaultState() - behavior_fault_state_msg.faults = getBehaviorFaults(state.behavior_fault_state.faults, spot_wrapper) + behavior_fault_state_msg.faults = getBehaviorFaults( + state.behavior_fault_state.faults, spot_wrapper + ) return behavior_fault_state_msg diff --git a/spot_driver/src/spot_driver/spot_ros.py b/spot_driver/src/spot_driver/spot_ros.py index 44cb7531..6071ea2a 100644 --- a/spot_driver/src/spot_driver/spot_ros.py +++ b/spot_driver/src/spot_driver/spot_ros.py @@ -45,9 +45,21 @@ from spot_msgs.srv import Dock, DockResponse, GetDockState, GetDockStateResponse from spot_msgs.srv import PosedStand, PosedStandResponse from spot_msgs.srv import SetSwingHeight, SetSwingHeightResponse -from spot_msgs.srv import ArmJointMovement, ArmJointMovementResponse, ArmJointMovementRequest -from spot_msgs.srv import GripperAngleMove, GripperAngleMoveResponse, GripperAngleMoveRequest -from spot_msgs.srv import ArmForceTrajectory, ArmForceTrajectoryResponse, ArmForceTrajectoryRequest +from spot_msgs.srv import ( + ArmJointMovement, + ArmJointMovementResponse, + ArmJointMovementRequest, +) +from spot_msgs.srv import ( + GripperAngleMove, + GripperAngleMoveResponse, + GripperAngleMoveRequest, +) +from spot_msgs.srv import ( + ArmForceTrajectory, + ArmForceTrajectoryResponse, + ArmForceTrajectoryRequest, +) from spot_msgs.srv import HandPose, HandPoseResponse, HandPoseRequest from .ros_helpers import * @@ -57,10 +69,12 @@ import logging import threading + class RateLimitedCall: """ Wrap a function with this class to limit how frequently it can be called within a loop """ + def __init__(self, fn, rate_limit): """ @@ -69,7 +83,7 @@ def __init__(self, fn, rate_limit): rate_limit: The function will not be called faster than this rate """ self.fn = fn - self.min_time_between_calls = 1.0/rate_limit + self.min_time_between_calls = 1.0 / rate_limit self.last_call = 0 def __call__(self): @@ -78,7 +92,8 @@ def __call__(self): self.fn() self.last_call = now_sec -class SpotROS(): + +class SpotROS: """Parent class for using the wrapper. Defines all callbacks and keeps the wrapper alive""" def __init__(self): @@ -116,7 +131,7 @@ def RobotStateCB(self, results): self.odom_twist_pub.publish(twist_odom_msg) # Odom # - if self.mode_parent_odom_tf == 'vision': + if self.mode_parent_odom_tf == "vision": odom_msg = GetOdomFromState(state, self.spot_wrapper, use_vision=True) else: odom_msg = GetOdomFromState(state, self.spot_wrapper, use_vision=False) @@ -136,8 +151,13 @@ def RobotStateCB(self, results): self.wifi_pub.publish(wifi_msg) # Battery States # - battery_states_array_msg = GetBatteryStatesFromState(state, self.spot_wrapper) - self.is_charging = battery_states_array_msg.battery_states[0].status == BatteryState.STATUS_CHARGING + battery_states_array_msg = GetBatteryStatesFromState( + state, self.spot_wrapper + ) + self.is_charging = ( + battery_states_array_msg.battery_states[0].status + == BatteryState.STATUS_CHARGING + ) self.battery_pub.publish(battery_states_array_msg) # Power State # @@ -149,7 +169,9 @@ def RobotStateCB(self, results): self.system_faults_pub.publish(system_fault_state_msg) # Behavior Faults # - behavior_fault_state_msg = getBehaviorFaultsFromState(state, self.spot_wrapper) + behavior_fault_state_msg = getBehaviorFaultsFromState( + state, self.spot_wrapper + ) self.behavior_faults_pub.publish(behavior_fault_state_msg) def MetricsCB(self, results): @@ -170,9 +192,13 @@ def MetricsCB(self, results): if metric.label == "gait cycles": metrics_msg.gait_cycles = metric.int_value if metric.label == "time moving": - metrics_msg.time_moving = rospy.Time(metric.duration.seconds, metric.duration.nanos) + metrics_msg.time_moving = rospy.Time( + metric.duration.seconds, metric.duration.nanos + ) if metric.label == "electric power": - metrics_msg.electric_power = rospy.Time(metric.duration.seconds, metric.duration.nanos) + metrics_msg.electric_power = rospy.Time( + metric.duration.seconds, metric.duration.nanos + ) self.metrics_pub.publish(metrics_msg) @@ -270,7 +296,7 @@ def RearImageCB(self, results): self.populate_camera_static_transforms(data[0]) self.populate_camera_static_transforms(data[1]) - + def HandImageCB(self, results): """Callback for when the Spot Wrapper gets new hand image data. @@ -318,11 +344,17 @@ def handle_stop(self, req): resp = self.spot_wrapper.stop() message = "Spot stop service was called" if self.navigate_as.is_active(): - self.navigate_as.set_preempted(NavigateToResult(success=False, message=message)) + self.navigate_as.set_preempted( + NavigateToResult(success=False, message=message) + ) if self.trajectory_server.is_active(): - self.trajectory_server.set_preempted(TrajectoryResult(success=False, message=message)) + self.trajectory_server.set_preempted( + TrajectoryResult(success=False, message=message) + ) if self.body_pose_as.is_active(): - self.body_pose_as.set_preempted(PoseBodyResult(success=False, message=message)) + self.body_pose_as.set_preempted( + PoseBodyResult(success=False, message=message) + ) return TriggerResponse(resp[0], resp[1]) def handle_self_right(self, req): @@ -357,7 +389,9 @@ def handle_posed_stand(self, req): Returns: PosedStandResponse """ - success, message = self._posed_stand(req.body_height, req.body_yaw, req.body_pitch, req.body_roll) + success, message = self._posed_stand( + req.body_height, req.body_yaw, req.body_pitch, req.body_roll + ) return PosedStandResponse(success, message) def handle_posed_stand_action(self, action): @@ -370,7 +404,9 @@ def handle_posed_stand_action(self, action): action: PoseBodyGoal """ - success, message = self._posed_stand(action.body_height, action.yaw, action.pitch, action.roll) + success, message = self._posed_stand( + action.body_height, action.yaw, action.pitch, action.roll + ) result = PoseBodyResult(success, message) rospy.sleep(2) # Only return after the body has had a chance to move if success: @@ -398,10 +434,12 @@ def _posed_stand(self, body_height, yaw, pitch, roll): if not self.robot_allowed_to_move(autonomous_command=False): return False, "Robot motion is not allowed" - resp = self.spot_wrapper.stand(body_height=body_height, - body_yaw=math.radians(yaw), - body_pitch=math.radians(pitch), - body_roll=math.radians(roll)) + resp = self.spot_wrapper.stand( + body_height=body_height, + body_yaw=math.radians(yaw), + body_pitch=math.radians(pitch), + body_roll=math.radians(roll), + ) return resp[0], resp[1] @@ -422,7 +460,7 @@ def handle_estop_hard(self, req): def handle_estop_soft(self, req): """ROS service handler to soft-eStop the robot. The robot will try to settle on the ground before cutting - power to the motors """ + power to the motors""" resp = self.spot_wrapper.assertEStop(False) return TriggerResponse(resp[0], resp[1]) @@ -441,38 +479,42 @@ def handle_stair_mode(self, req): try: mobility_params = self.spot_wrapper.get_mobility_params() mobility_params.stair_hint = req.data - self.spot_wrapper.set_mobility_params( mobility_params ) - return SetBoolResponse(True, 'Success') + self.spot_wrapper.set_mobility_params(mobility_params) + return SetBoolResponse(True, "Success") except Exception as e: - return SetBoolResponse(False, 'Error:{}'.format(e)) + return SetBoolResponse(False, "Error:{}".format(e)) def handle_locomotion_mode(self, req): """ROS service handler to set locomotion mode""" if req.locomotion_mode in [0, 9] or req.locomotion_mode > 10: - msg = "Attempted to set locomotion mode to {}, which is an invalid value.".format(req.locomotion_mode) + msg = "Attempted to set locomotion mode to {}, which is an invalid value.".format( + req.locomotion_mode + ) rospy.logerr(msg) return SetLocomotionResponse(False, msg) try: mobility_params = self.spot_wrapper.get_mobility_params() mobility_params.locomotion_hint = req.locomotion_mode self.spot_wrapper.set_mobility_params(mobility_params) - return SetLocomotionResponse(True, 'Success') + return SetLocomotionResponse(True, "Success") except Exception as e: - return SetLocomotionResponse(False, 'Error:{}'.format(e)) + return SetLocomotionResponse(False, "Error:{}".format(e)) def handle_swing_height(self, req): """ROS service handler to set the step swing height""" if req.swing_height == 0 or req.swing_height > 3: - msg = "Attempted to set step swing height to {}, which is an invalid value.".format(req.swing_height) + msg = "Attempted to set step swing height to {}, which is an invalid value.".format( + req.swing_height + ) rospy.logerr(msg) return SetSwingHeightResponse(False, msg) try: mobility_params = self.spot_wrapper.get_mobility_params() mobility_params.swing_height = req.swing_height self.spot_wrapper.set_mobility_params(mobility_params) - return SetSwingHeightResponse(True, 'Success') + return SetSwingHeightResponse(True, "Success") except Exception as e: - return SetSwingHeightResponse(False, 'Error:{}'.format(e)) + return SetSwingHeightResponse(False, "Error:{}".format(e)) def handle_vel_limit(self, req): """ @@ -483,9 +525,11 @@ def handle_vel_limit(self, req): Returns: SetVelocityResponse """ - success, message = self.set_velocity_limits(req.velocity_limit.linear.x, - req.velocity_limit.linear.y, - req.velocity_limit.angular.z) + success, message = self.set_velocity_limits( + req.velocity_limit.linear.x, + req.velocity_limit.linear.y, + req.velocity_limit.angular.z, + ) return SetVelocityResponse(success, message) def set_velocity_limits(self, max_linear_x, max_linear_y, max_angular_z): @@ -503,21 +547,30 @@ def set_velocity_limits(self, max_linear_x, max_linear_y, max_angular_z): Returns: (bool, str) boolean indicating whether the call was successful, along with a message """ - if any(map(lambda x: 0 < x < 0.15, [max_linear_x, max_linear_y, max_angular_z])): - return False, 'Error: One of the values provided to velocity limits was below 0.15. Values in the range (' \ - '0,0.15) can cause unexpected behaviour of the trajectory command.' + if any( + map(lambda x: 0 < x < 0.15, [max_linear_x, max_linear_y, max_angular_z]) + ): + return ( + False, + "Error: One of the values provided to velocity limits was below 0.15. Values in the range (" + "0,0.15) can cause unexpected behaviour of the trajectory command.", + ) try: mobility_params = self.spot_wrapper.get_mobility_params() - mobility_params.vel_limit.CopyFrom(SE2VelocityLimit(max_vel=math_helpers.SE2Velocity(max_linear_x, - max_linear_y, - max_angular_z).to_proto(), - min_vel=math_helpers.SE2Velocity(-max_linear_x, - -max_linear_y, - -max_angular_z).to_proto())) + mobility_params.vel_limit.CopyFrom( + SE2VelocityLimit( + max_vel=math_helpers.SE2Velocity( + max_linear_x, max_linear_y, max_angular_z + ).to_proto(), + min_vel=math_helpers.SE2Velocity( + -max_linear_x, -max_linear_y, -max_angular_z + ).to_proto(), + ) + ) self.spot_wrapper.set_mobility_params(mobility_params) - return True, 'Success' + return True, "Success" except Exception as e: - return False, 'Error:{}'.format(e) + return False, "Error:{}".format(e) def _transform_pose_to_body_frame(self, pose): """ @@ -556,17 +609,23 @@ def robot_allowed_to_move(self, autonomous_command=True): """ if not self.allow_motion: - rospy.logwarn("Spot is not currently allowed to move. Use the allow_motion service to allow the robot to " - "move.") + rospy.logwarn( + "Spot is not currently allowed to move. Use the allow_motion service to allow the robot to " + "move." + ) autonomy_ok = True if autonomous_command: if not self.autonomy_enabled: - rospy.logwarn("Spot is not allowed to be autonomous because this instance of the driver was started " - "with it disabled. Set autonomy_enabled to true in the launch file to enable it.") + rospy.logwarn( + "Spot is not allowed to be autonomous because this instance of the driver was started " + "with it disabled. Set autonomy_enabled to true in the launch file to enable it." + ) autonomy_ok = False if self.is_charging: - rospy.logwarn("Spot cannot be autonomous because it is connected to shore power.") + rospy.logwarn( + "Spot cannot be autonomous because it is connected to shore power." + ) autonomy_ok = False return self.allow_motion and autonomy_ok @@ -582,7 +641,11 @@ def handle_allow_motion(self, req): """ self.allow_motion = req.data - rospy.loginfo("Robot motion is now {}".format("allowed" if self.allow_motion else "disallowed")) + rospy.loginfo( + "Robot motion is now {}".format( + "allowed" if self.allow_motion else "disallowed" + ) + ) if not self.allow_motion: # Always send a stop command if disallowing motion, in case the robot is moving when it is sent self.spot_wrapper.stop() @@ -603,25 +666,39 @@ def handle_obstacle_params(self, req): # Currently only the obstacle setting that we allow is the padding. The previous value is always overwritten # Clamp to the range [0, 0.5] as on the controller if req.obstacle_params.obstacle_avoidance_padding < 0: - rospy.logwarn("Received padding value of {}, clamping to 0".format( - req.obstacle_params.obstacle_avoidance_padding)) + rospy.logwarn( + "Received padding value of {}, clamping to 0".format( + req.obstacle_params.obstacle_avoidance_padding + ) + ) req.obstacle_params.obstacle_avoidance_padding = 0 if req.obstacle_params.obstacle_avoidance_padding > 0.5: - rospy.logwarn("Received padding value of {}, clamping to 0.5".format( - req.obstacle_params.obstacle_avoidance_padding)) + rospy.logwarn( + "Received padding value of {}, clamping to 0.5".format( + req.obstacle_params.obstacle_avoidance_padding + ) + ) req.obstacle_params.obstacle_avoidance_padding = 0.5 disable_notallowed = "" - if any([req.obstacle_params.disable_vision_foot_obstacle_avoidance, + if any( + [ + req.obstacle_params.disable_vision_foot_obstacle_avoidance, req.obstacle_params.disable_vision_foot_constraint_avoidance, req.obstacle_params.disable_vision_body_obstacle_avoidance, req.obstacle_params.disable_vision_foot_obstacle_body_assist, req.obstacle_params.disable_vision_negative_obstacles, - ]): + ] + ): disable_notallowed = " Disabling any of the obstacle avoidance components is not currently allowed." - rospy.logerr("At least one of the disable settings on obstacle params was true." + disable_notallowed) + rospy.logerr( + "At least one of the disable settings on obstacle params was true." + + disable_notallowed + ) - obstacle_params.obstacle_avoidance_padding = req.obstacle_params.obstacle_avoidance_padding + obstacle_params.obstacle_avoidance_padding = ( + req.obstacle_params.obstacle_avoidance_padding + ) mobility_params.obstacle_params.CopyFrom(obstacle_params) self.spot_wrapper.set_mobility_params(mobility_params) @@ -644,16 +721,28 @@ def handle_terrain_params(self, req): if 0.2 <= req.terrain_params.ground_mu_hint <= 0.8: # For some reason assignment to ground_mu_hint is not allowed once the terrain params are initialised # Must initialise with the protobuf type DoubleValue for initialisation to work - terrain_params = spot_command_pb2.TerrainParams(ground_mu_hint=DoubleValue(value=req.terrain_params.ground_mu_hint)) + terrain_params = spot_command_pb2.TerrainParams( + ground_mu_hint=DoubleValue(value=req.terrain_params.ground_mu_hint) + ) else: - return False, "Failed to set terrain params, ground_mu_hint of {} is not in the range [0.4, 0.8]".format( - req.terrain_params.ground_mu_hint) + return ( + False, + "Failed to set terrain params, ground_mu_hint of {} is not in the range [0.4, 0.8]".format( + req.terrain_params.ground_mu_hint + ), + ) if req.terrain_params.grated_surfaces_mode in [1, 2, 3]: - terrain_params.grated_surfaces_mode = req.terrain_params.grated_surfaces_mode + terrain_params.grated_surfaces_mode = ( + req.terrain_params.grated_surfaces_mode + ) else: - return False, "Failed to set terrain params, grated_surfaces_mode {} was not one of [1, 2, 3]".format( - req.terrain_params.grated_surfaces_mode) + return ( + False, + "Failed to set terrain params, grated_surfaces_mode {} was not one of [1, 2, 3]".format( + req.terrain_params.grated_surfaces_mode + ), + ) mobility_params.terrain_params.CopyFrom(terrain_params) self.spot_wrapper.set_mobility_params(mobility_params) @@ -671,69 +760,110 @@ def trajectory_callback(self, msg): Returns: """ if not self.robot_allowed_to_move(): - rospy.logerr("Trajectory topic received a message but the robot is not allowed to move.") + rospy.logerr( + "Trajectory topic received a message but the robot is not allowed to move." + ) return try: - self._send_trajectory_command(self._transform_pose_to_body_frame(msg), rospy.Duration(5)) + self._send_trajectory_command( + self._transform_pose_to_body_frame(msg), rospy.Duration(5) + ) except tf2_ros.LookupException as e: rospy.logerr(str(e)) def handle_trajectory(self, req): """ROS actionserver execution handler to handle receiving a request to move to a location""" if not self.robot_allowed_to_move(): - rospy.logerr("Trajectory service was called but robot is not allowed to move") - self.trajectory_server.set_aborted(TrajectoryResult(False, 'Robot is not allowed to move.')) + rospy.logerr( + "Trajectory service was called but robot is not allowed to move" + ) + self.trajectory_server.set_aborted( + TrajectoryResult(False, "Robot is not allowed to move.") + ) return target_pose = req.target_pose - if req.target_pose.header.frame_id != 'body': + if req.target_pose.header.frame_id != "body": rospy.logwarn("Pose given was not in the body frame, will transform") try: target_pose = self._transform_pose_to_body_frame(target_pose) except tf2_ros.LookupException as e: - self.trajectory_server.set_aborted(TrajectoryResult(False, 'Could not transform pose into body frame')) + self.trajectory_server.set_aborted( + TrajectoryResult(False, "Could not transform pose into body frame") + ) return if req.duration.data.to_sec() <= 0: - self.trajectory_server.set_aborted(TrajectoryResult(False, 'duration must be larger than 0')) + self.trajectory_server.set_aborted( + TrajectoryResult(False, "duration must be larger than 0") + ) return cmd_duration = rospy.Duration(req.duration.data.secs, req.duration.data.nsecs) - resp = self._send_trajectory_command(target_pose, cmd_duration, req.precise_positioning) + resp = self._send_trajectory_command( + target_pose, cmd_duration, req.precise_positioning + ) def timeout_cb(trajectory_server, _): - trajectory_server.publish_feedback(TrajectoryFeedback("Failed to reach goal, timed out")) - trajectory_server.set_aborted(TrajectoryResult(False, "Failed to reach goal, timed out")) + trajectory_server.publish_feedback( + TrajectoryFeedback("Failed to reach goal, timed out") + ) + trajectory_server.set_aborted( + TrajectoryResult(False, "Failed to reach goal, timed out") + ) # Abort the actionserver if cmd_duration is exceeded - the driver stops but does not provide feedback to # indicate this so we monitor it ourselves - cmd_timeout = rospy.Timer(cmd_duration, functools.partial(timeout_cb, self.trajectory_server), oneshot=True) + cmd_timeout = rospy.Timer( + cmd_duration, + functools.partial(timeout_cb, self.trajectory_server), + oneshot=True, + ) # Sleep to allow some feedback to come through from the trajectory command rospy.sleep(0.25) if self.spot_wrapper._trajectory_status_unknown: - rospy.logerr("Sent trajectory request to spot but received unknown feedback. Resending command. This will " - "only be attempted once") + rospy.logerr( + "Sent trajectory request to spot but received unknown feedback. Resending command. This will " + "only be attempted once" + ) # If we receive an unknown result from the trajectory, something went wrong internally (not # catastrophically). We need to resend the command, because getting status unknown happens right when # the command is sent. It's unclear right now why this happens - resp = self._send_trajectory_command(target_pose, cmd_duration, req.precise_positioning) + resp = self._send_trajectory_command( + target_pose, cmd_duration, req.precise_positioning + ) cmd_timeout.shutdown() - cmd_timeout = rospy.Timer(cmd_duration, functools.partial(timeout_cb, self.trajectory_server), - oneshot=True) + cmd_timeout = rospy.Timer( + cmd_duration, + functools.partial(timeout_cb, self.trajectory_server), + oneshot=True, + ) # The trajectory command is non-blocking but we need to keep this function up in order to interrupt if a # preempt is requested and to return success if/when the robot reaches the goal. Also check the is_active to # monitor whether the timeout_cb has already aborted the command rate = rospy.Rate(10) - while not rospy.is_shutdown() and not self.trajectory_server.is_preempt_requested() and not self.spot_wrapper.at_goal and self.trajectory_server.is_active() and not self.spot_wrapper._trajectory_status_unknown: + while ( + not rospy.is_shutdown() + and not self.trajectory_server.is_preempt_requested() + and not self.spot_wrapper.at_goal + and self.trajectory_server.is_active() + and not self.spot_wrapper._trajectory_status_unknown + ): if self.spot_wrapper.near_goal: if self.spot_wrapper._last_trajectory_command_precise: - self.trajectory_server.publish_feedback(TrajectoryFeedback("Near goal, performing final adjustments")) + self.trajectory_server.publish_feedback( + TrajectoryFeedback("Near goal, performing final adjustments") + ) else: - self.trajectory_server.publish_feedback(TrajectoryFeedback("Near goal")) + self.trajectory_server.publish_feedback( + TrajectoryFeedback("Near goal") + ) else: - self.trajectory_server.publish_feedback(TrajectoryFeedback("Moving to goal")) + self.trajectory_server.publish_feedback( + TrajectoryFeedback("Moving to goal") + ) rate.sleep() # If still active after exiting the loop, the command did not time out @@ -745,11 +875,17 @@ def timeout_cb(trajectory_server, _): self.spot_wrapper.stop() if self.spot_wrapper.at_goal: - self.trajectory_server.publish_feedback(TrajectoryFeedback("Reached goal")) + self.trajectory_server.publish_feedback( + TrajectoryFeedback("Reached goal") + ) self.trajectory_server.set_succeeded(TrajectoryResult(resp[0], resp[1])) else: - self.trajectory_server.publish_feedback(TrajectoryFeedback("Failed to reach goal")) - self.trajectory_server.set_aborted(TrajectoryResult(False, "Failed to reach goal")) + self.trajectory_server.publish_feedback( + TrajectoryFeedback("Failed to reach goal") + ) + self.trajectory_server.set_aborted( + TrajectoryResult(False, "Failed to reach goal") + ) def handle_roll_over_right(self, req): """Robot sit down and roll on to it its side for easier battery access""" @@ -852,7 +988,14 @@ def handle_in_motion_or_idle_body_pose(self, goal): # If the body_pose is empty, we use the rpy + height components instead if goal.body_pose == Pose(): # If the rpy+body height are all zero then we set the body to neutral pose - if not any([goal.roll, goal.pitch, goal.yaw, not math.isclose(goal.body_height, 0, abs_tol=1e-9)]): + if not any( + [ + goal.roll, + goal.pitch, + goal.yaw, + not math.isclose(goal.body_height, 0, abs_tol=1e-9), + ] + ): pose = Pose() pose.orientation.w = 1 self._set_in_motion_or_idle_body_pose(pose) @@ -874,7 +1017,9 @@ def handle_in_motion_or_idle_body_pose(self, goal): self._set_in_motion_or_idle_body_pose(goal.body_pose) # Give it some time to move rospy.sleep(2) - self.body_pose_as.set_succeeded(PoseBodyResult(success=True, message="Successfully posed body")) + self.body_pose_as.set_succeeded( + PoseBodyResult(success=True, message="Successfully posed body") + ) def _set_in_motion_or_idle_body_pose(self, pose): """ @@ -909,27 +1054,37 @@ def handle_list_graph(self, upload_path): def handle_navigate_to_feedback(self): """Thread function to send navigate_to feedback""" while not rospy.is_shutdown() and self.run_navigate_to: - localization_state = self.spot_wrapper._graph_nav_client.get_localization_state() + localization_state = ( + self.spot_wrapper._graph_nav_client.get_localization_state() + ) if localization_state.localization.waypoint_id: - self.navigate_as.publish_feedback(NavigateToFeedback(localization_state.localization.waypoint_id)) + self.navigate_as.publish_feedback( + NavigateToFeedback(localization_state.localization.waypoint_id) + ) rospy.Rate(10).sleep() def handle_navigate_to(self, msg): """ROS service handler to run mission of the robot. The robot will replay a mission""" if not self.robot_allowed_to_move(): rospy.logerr("navigate_to was requested but robot is not allowed to move.") - self.navigate_as.set_aborted(NavigateToResult(False, "Autonomy is not enabled")) + self.navigate_as.set_aborted( + NavigateToResult(False, "Autonomy is not enabled") + ) return # create thread to periodically publish feedback - feedback_thraed = threading.Thread(target = self.handle_navigate_to_feedback, args = ()) + feedback_thraed = threading.Thread( + target=self.handle_navigate_to_feedback, args=() + ) self.run_navigate_to = True feedback_thraed.start() # run navigate_to - resp = self.spot_wrapper.navigate_to(upload_path = msg.upload_path, - navigate_to = msg.navigate_to, - initial_localization_fiducial = msg.initial_localization_fiducial, - initial_localization_waypoint = msg.initial_localization_waypoint) + resp = self.spot_wrapper.navigate_to( + upload_path=msg.upload_path, + navigate_to=msg.navigate_to, + initial_localization_fiducial=msg.initial_localization_fiducial, + initial_localization_waypoint=msg.initial_localization_waypoint, + ) self.run_navigate_to = False feedback_thraed.join() @@ -951,27 +1106,50 @@ def populate_camera_static_transforms(self, image_data): # We exclude the odometry frames from static transforms since they are not static. We can ignore the body # frame because it is a child of odom or vision depending on the mode_parent_odom_tf, and will be published # by the non-static transform publishing that is done by the state callback - excluded_frames = [self.tf_name_vision_odom, self.tf_name_kinematic_odom, "body"] + excluded_frames = [ + self.tf_name_vision_odom, + self.tf_name_kinematic_odom, + "body", + ] for frame_name in image_data.shot.transforms_snapshot.child_to_parent_edge_map: if frame_name in excluded_frames: continue - parent_frame = image_data.shot.transforms_snapshot.child_to_parent_edge_map.get(frame_name).parent_frame_name - existing_transforms = [(transform.header.frame_id, transform.child_frame_id) for transform in self.camera_static_transforms] + parent_frame = ( + image_data.shot.transforms_snapshot.child_to_parent_edge_map.get( + frame_name + ).parent_frame_name + ) + existing_transforms = [ + (transform.header.frame_id, transform.child_frame_id) + for transform in self.camera_static_transforms + ] if (parent_frame, frame_name) in existing_transforms: # We already extracted this transform continue - transform = image_data.shot.transforms_snapshot.child_to_parent_edge_map.get(frame_name) - local_time = self.spot_wrapper.robotToLocalTime(image_data.shot.acquisition_time) + transform = ( + image_data.shot.transforms_snapshot.child_to_parent_edge_map.get( + frame_name + ) + ) + local_time = self.spot_wrapper.robotToLocalTime( + image_data.shot.acquisition_time + ) tf_time = rospy.Time(local_time.seconds, local_time.nanos) - static_tf = populateTransformStamped(tf_time, transform.parent_frame_name, frame_name, - transform.parent_tform_child) + static_tf = populateTransformStamped( + tf_time, + transform.parent_frame_name, + frame_name, + transform.parent_tform_child, + ) self.camera_static_transforms.append(static_tf) - self.camera_static_transform_broadcaster.sendTransform(self.camera_static_transforms) + self.camera_static_transform_broadcaster.sendTransform( + self.camera_static_transforms + ) # Arm functions ################################################## def handle_arm_stow(self, srv_data): - """ROS service handler to command the arm to stow, home position """ + """ROS service handler to command the arm to stow, home position""" resp = self.spot_wrapper.arm_stow() return TriggerResponse(resp[0], resp[1]) @@ -981,15 +1159,15 @@ def handle_arm_unstow(self, srv_data): return TriggerResponse(resp[0], resp[1]) def handle_arm_joint_move(self, srv_data: ArmJointMovementRequest): - """ROS service handler to send joint movement to the arm to execute""" - resp = self.spot_wrapper.arm_joint_move(joint_targets = srv_data.joint_target) + """ROS service handler to send joint movement to the arm to execute""" + resp = self.spot_wrapper.arm_joint_move(joint_targets=srv_data.joint_target) return ArmJointMovementResponse(resp[0], resp[1]) - + def handle_force_trajectory(self, srv_data: ArmForceTrajectoryRequest): """ROS service handler to send a force trajectory up or down a vertical force""" resp = self.spot_wrapper.force_trajectory(data=srv_data) return ArmForceTrajectoryResponse(resp[0], resp[1]) - + def handle_gripper_open(self, srv_data): """ROS service handler to open the gripper""" resp = self.spot_wrapper.gripper_open() @@ -998,26 +1176,25 @@ def handle_gripper_open(self, srv_data): def handle_gripper_close(self, srv_data): """ROS service handler to close the gripper""" resp = self.spot_wrapper.gripper_close() - return TriggerResponse(resp[0], resp[1]) - + return TriggerResponse(resp[0], resp[1]) + def handle_gripper_angle_open(self, srv_data: GripperAngleMoveRequest): """ROS service handler to open the gripper at an angle""" - resp = self.spot_wrapper.gripper_angle_open(gripper_ang = srv_data.gripper_angle) + resp = self.spot_wrapper.gripper_angle_open(gripper_ang=srv_data.gripper_angle) return GripperAngleMoveResponse(resp[0], resp[1]) - + def handle_arm_carry(self, srv_data): """ROS service handler to put arm in carry mode""" resp = self.spot_wrapper.arm_carry() return TriggerResponse(resp[0], resp[1]) - + def handle_hand_pose(self, srv_data: HandPoseRequest): """ROS service to give a position to the gripper""" - resp = self.spot_wrapper.hand_pose(pose_points = srv_data.pose_point) + resp = self.spot_wrapper.hand_pose(pose_points=srv_data.pose_point) return HandPoseResponse(resp[0], resp[1]) - ################################################################## - + def shutdown(self): rospy.loginfo("Shutting down ROS driver for Spot") self.spot_wrapper.sit() @@ -1028,54 +1205,89 @@ def publish_mobility_params(self): mobility_params_msg = MobilityParams() try: mobility_params = self.spot_wrapper.get_mobility_params() - mobility_params_msg.body_control.position.x = \ - mobility_params.body_control.base_offset_rt_footprint.points[0].pose.position.x - mobility_params_msg.body_control.position.y = \ - mobility_params.body_control.base_offset_rt_footprint.points[0].pose.position.y - mobility_params_msg.body_control.position.z = \ - mobility_params.body_control.base_offset_rt_footprint.points[0].pose.position.z - mobility_params_msg.body_control.orientation.x = \ - mobility_params.body_control.base_offset_rt_footprint.points[0].pose.rotation.x - mobility_params_msg.body_control.orientation.y = \ - mobility_params.body_control.base_offset_rt_footprint.points[0].pose.rotation.y - mobility_params_msg.body_control.orientation.z = \ - mobility_params.body_control.base_offset_rt_footprint.points[0].pose.rotation.z - mobility_params_msg.body_control.orientation.w = \ - mobility_params.body_control.base_offset_rt_footprint.points[0].pose.rotation.w + mobility_params_msg.body_control.position.x = ( + mobility_params.body_control.base_offset_rt_footprint.points[ + 0 + ].pose.position.x + ) + mobility_params_msg.body_control.position.y = ( + mobility_params.body_control.base_offset_rt_footprint.points[ + 0 + ].pose.position.y + ) + mobility_params_msg.body_control.position.z = ( + mobility_params.body_control.base_offset_rt_footprint.points[ + 0 + ].pose.position.z + ) + mobility_params_msg.body_control.orientation.x = ( + mobility_params.body_control.base_offset_rt_footprint.points[ + 0 + ].pose.rotation.x + ) + mobility_params_msg.body_control.orientation.y = ( + mobility_params.body_control.base_offset_rt_footprint.points[ + 0 + ].pose.rotation.y + ) + mobility_params_msg.body_control.orientation.z = ( + mobility_params.body_control.base_offset_rt_footprint.points[ + 0 + ].pose.rotation.z + ) + mobility_params_msg.body_control.orientation.w = ( + mobility_params.body_control.base_offset_rt_footprint.points[ + 0 + ].pose.rotation.w + ) mobility_params_msg.locomotion_hint = mobility_params.locomotion_hint mobility_params_msg.stair_hint = mobility_params.stair_hint mobility_params_msg.swing_height = mobility_params.swing_height - mobility_params_msg.obstacle_params.obstacle_avoidance_padding = \ + mobility_params_msg.obstacle_params.obstacle_avoidance_padding = ( mobility_params.obstacle_params.obstacle_avoidance_padding - mobility_params_msg.obstacle_params.disable_vision_foot_obstacle_avoidance = \ + ) + mobility_params_msg.obstacle_params.disable_vision_foot_obstacle_avoidance = ( mobility_params.obstacle_params.disable_vision_foot_obstacle_avoidance - mobility_params_msg.obstacle_params.disable_vision_foot_constraint_avoidance = \ + ) + mobility_params_msg.obstacle_params.disable_vision_foot_constraint_avoidance = ( mobility_params.obstacle_params.disable_vision_foot_constraint_avoidance - mobility_params_msg.obstacle_params.disable_vision_body_obstacle_avoidance = \ + ) + mobility_params_msg.obstacle_params.disable_vision_body_obstacle_avoidance = ( mobility_params.obstacle_params.disable_vision_body_obstacle_avoidance - mobility_params_msg.obstacle_params.disable_vision_foot_obstacle_body_assist = \ + ) + mobility_params_msg.obstacle_params.disable_vision_foot_obstacle_body_assist = ( mobility_params.obstacle_params.disable_vision_foot_obstacle_body_assist - mobility_params_msg.obstacle_params.disable_vision_negative_obstacles = \ + ) + mobility_params_msg.obstacle_params.disable_vision_negative_obstacles = ( mobility_params.obstacle_params.disable_vision_negative_obstacles + ) if mobility_params.HasField("terrain_params"): if mobility_params.terrain_params.HasField("ground_mu_hint"): - mobility_params_msg.terrain_params.ground_mu_hint = \ + mobility_params_msg.terrain_params.ground_mu_hint = ( mobility_params.terrain_params.ground_mu_hint + ) # hasfield does not work on grated surfaces mode if hasattr(mobility_params.terrain_params, "grated_surfaces_mode"): - mobility_params_msg.terrain_params.grated_surfaces_mode = \ + mobility_params_msg.terrain_params.grated_surfaces_mode = ( mobility_params.terrain_params.grated_surfaces_mode + ) # The velocity limit values can be set independently so make sure each of them exists before setting if mobility_params.HasField("vel_limit"): if hasattr(mobility_params.vel_limit.max_vel.linear, "x"): - mobility_params_msg.velocity_limit.linear.x = mobility_params.vel_limit.max_vel.linear.x + mobility_params_msg.velocity_limit.linear.x = ( + mobility_params.vel_limit.max_vel.linear.x + ) if hasattr(mobility_params.vel_limit.max_vel.linear, "y"): - mobility_params_msg.velocity_limit.linear.y = mobility_params.vel_limit.max_vel.linear.y + mobility_params_msg.velocity_limit.linear.y = ( + mobility_params.vel_limit.max_vel.linear.y + ) if hasattr(mobility_params.vel_limit.max_vel, "angular"): - mobility_params_msg.velocity_limit.angular.z = mobility_params.vel_limit.max_vel.angular + mobility_params_msg.velocity_limit.angular.z = ( + mobility_params.vel_limit.max_vel.angular + ) except Exception as e: - rospy.logerr('Error:{}'.format(e)) + rospy.logerr("Error:{}".format(e)) pass self.mobility_params_pub.publish(mobility_params_msg) @@ -1100,9 +1312,9 @@ def publish_allow_motion(self): def main(self): """Main function for the SpotROS class. Gets config from ROS and initializes the wrapper. Holds lease from wrapper and updates all async tasks at the ROS rate""" - rospy.init_node('spot_ros', anonymous=True) + rospy.init_node("spot_ros", anonymous=True) - self.rates = rospy.get_param('~rates', {}) + self.rates = rospy.get_param("~rates", {}) if "loop_frequency" in self.rates: loop_rate = self.rates["loop_frequency"] else: @@ -1110,17 +1322,21 @@ def main(self): for param, rate in self.rates.items(): if rate > loop_rate: - rospy.logwarn("{} has a rate of {} specified, which is higher than the loop rate of {}. It will not " - "be published at the expected frequency".format(param, rate, loop_rate)) + rospy.logwarn( + "{} has a rate of {} specified, which is higher than the loop rate of {}. It will not " + "be published at the expected frequency".format( + param, rate, loop_rate + ) + ) rate = rospy.Rate(loop_rate) - self.username = rospy.get_param('~username', 'default_value') - self.password = rospy.get_param('~password', 'default_value') - self.hostname = rospy.get_param('~hostname', 'default_value') - self.motion_deadzone = rospy.get_param('~deadzone', 0.05) - self.estop_timeout = rospy.get_param('~estop_timeout', 9.0) - self.autonomy_enabled = rospy.get_param('~autonomy_enabled', True) - self.allow_motion = rospy.get_param('~allow_motion', True) + self.username = rospy.get_param("~username", "default_value") + self.password = rospy.get_param("~password", "default_value") + self.hostname = rospy.get_param("~hostname", "default_value") + self.motion_deadzone = rospy.get_param("~deadzone", 0.05) + self.estop_timeout = rospy.get_param("~estop_timeout", 9.0) + self.autonomy_enabled = rospy.get_param("~autonomy_enabled", True) + self.allow_motion = rospy.get_param("~allow_motion", True) self.is_charging = False self.tf_buffer = tf2_ros.Buffer() @@ -1136,88 +1352,179 @@ def main(self): # Spot has 2 types of odometries: 'odom' and 'vision' # The former one is kinematic odometry and the second one is a combined odometry of vision and kinematics # These params enables to change which odometry frame is a parent of body frame and to change tf names of each odometry frames. - self.mode_parent_odom_tf = rospy.get_param('~mode_parent_odom_tf', 'odom') # 'vision' or 'odom' - self.tf_name_kinematic_odom = rospy.get_param('~tf_name_kinematic_odom', 'odom') - self.tf_name_raw_kinematic = 'odom' - self.tf_name_vision_odom = rospy.get_param('~tf_name_vision_odom', 'vision') - self.tf_name_raw_vision = 'vision' - if self.mode_parent_odom_tf != self.tf_name_raw_kinematic and self.mode_parent_odom_tf != self.tf_name_raw_vision: - rospy.logerr('rosparam \'~mode_parent_odom_tf\' should be \'odom\' or \'vision\'.') + self.mode_parent_odom_tf = rospy.get_param( + "~mode_parent_odom_tf", "odom" + ) # 'vision' or 'odom' + self.tf_name_kinematic_odom = rospy.get_param("~tf_name_kinematic_odom", "odom") + self.tf_name_raw_kinematic = "odom" + self.tf_name_vision_odom = rospy.get_param("~tf_name_vision_odom", "vision") + self.tf_name_raw_vision = "vision" + if ( + self.mode_parent_odom_tf != self.tf_name_raw_kinematic + and self.mode_parent_odom_tf != self.tf_name_raw_vision + ): + rospy.logerr( + "rosparam '~mode_parent_odom_tf' should be 'odom' or 'vision'." + ) return - self.logger = logging.getLogger('rosout') + self.logger = logging.getLogger("rosout") rospy.loginfo("Starting ROS driver for Spot") - self.spot_wrapper = SpotWrapper(self.username, self.password, self.hostname, self.logger, self.estop_timeout, self.rates, self.callbacks) + self.spot_wrapper = SpotWrapper( + self.username, + self.password, + self.hostname, + self.logger, + self.estop_timeout, + self.rates, + self.callbacks, + ) if not self.spot_wrapper.is_valid: return # Images # - self.back_image_pub = rospy.Publisher('camera/back/image', Image, queue_size=10) - self.frontleft_image_pub = rospy.Publisher('camera/frontleft/image', Image, queue_size=10) - self.frontright_image_pub = rospy.Publisher('camera/frontright/image', Image, queue_size=10) - self.left_image_pub = rospy.Publisher('camera/left/image', Image, queue_size=10) - self.right_image_pub = rospy.Publisher('camera/right/image', Image, queue_size=10) - self.hand_image_mono_pub = rospy.Publisher('camera/hand_mono/image', Image, queue_size=10) - self.hand_image_color_pub = rospy.Publisher('camera/hand_color/image', Image, queue_size=10) + self.back_image_pub = rospy.Publisher("camera/back/image", Image, queue_size=10) + self.frontleft_image_pub = rospy.Publisher( + "camera/frontleft/image", Image, queue_size=10 + ) + self.frontright_image_pub = rospy.Publisher( + "camera/frontright/image", Image, queue_size=10 + ) + self.left_image_pub = rospy.Publisher("camera/left/image", Image, queue_size=10) + self.right_image_pub = rospy.Publisher( + "camera/right/image", Image, queue_size=10 + ) + self.hand_image_mono_pub = rospy.Publisher( + "camera/hand_mono/image", Image, queue_size=10 + ) + self.hand_image_color_pub = rospy.Publisher( + "camera/hand_color/image", Image, queue_size=10 + ) # Depth # - self.back_depth_pub = rospy.Publisher('depth/back/image', Image, queue_size=10) - self.frontleft_depth_pub = rospy.Publisher('depth/frontleft/image', Image, queue_size=10) - self.frontright_depth_pub = rospy.Publisher('depth/frontright/image', Image, queue_size=10) - self.left_depth_pub = rospy.Publisher('depth/left/image', Image, queue_size=10) - self.right_depth_pub = rospy.Publisher('depth/right/image', Image, queue_size=10) - self.hand_depth_pub = rospy.Publisher('depth/hand/image', Image, queue_size=10) - self.hand_depth_in_hand_color_pub = rospy.Publisher('depth/hand/depth_in_color', Image, queue_size=10) - self.frontleft_depth_in_visual_pub = rospy.Publisher('depth/frontleft/depth_in_visual', Image, queue_size=10) - self.frontright_depth_in_visual_pub = rospy.Publisher('depth/frontright/depth_in_visual', Image, queue_size=10) + self.back_depth_pub = rospy.Publisher("depth/back/image", Image, queue_size=10) + self.frontleft_depth_pub = rospy.Publisher( + "depth/frontleft/image", Image, queue_size=10 + ) + self.frontright_depth_pub = rospy.Publisher( + "depth/frontright/image", Image, queue_size=10 + ) + self.left_depth_pub = rospy.Publisher("depth/left/image", Image, queue_size=10) + self.right_depth_pub = rospy.Publisher( + "depth/right/image", Image, queue_size=10 + ) + self.hand_depth_pub = rospy.Publisher("depth/hand/image", Image, queue_size=10) + self.hand_depth_in_hand_color_pub = rospy.Publisher( + "depth/hand/depth_in_color", Image, queue_size=10 + ) + self.frontleft_depth_in_visual_pub = rospy.Publisher( + "depth/frontleft/depth_in_visual", Image, queue_size=10 + ) + self.frontright_depth_in_visual_pub = rospy.Publisher( + "depth/frontright/depth_in_visual", Image, queue_size=10 + ) # Image Camera Info # - self.back_image_info_pub = rospy.Publisher('camera/back/camera_info', CameraInfo, queue_size=10) - self.frontleft_image_info_pub = rospy.Publisher('camera/frontleft/camera_info', CameraInfo, queue_size=10) - self.frontright_image_info_pub = rospy.Publisher('camera/frontright/camera_info', CameraInfo, queue_size=10) - self.left_image_info_pub = rospy.Publisher('camera/left/camera_info', CameraInfo, queue_size=10) - self.right_image_info_pub = rospy.Publisher('camera/right/camera_info', CameraInfo, queue_size=10) - self.hand_image_mono_info_pub = rospy.Publisher('camera/hand_mono/camera_info', CameraInfo, queue_size=10) - self.hand_image_color_info_pub = rospy.Publisher('camera/hand_color/camera_info', CameraInfo, queue_size=10) - - # Depth Camera Info # - self.back_depth_info_pub = rospy.Publisher('depth/back/camera_info', CameraInfo, queue_size=10) - self.frontleft_depth_info_pub = rospy.Publisher('depth/frontleft/camera_info', CameraInfo, queue_size=10) - self.frontright_depth_info_pub = rospy.Publisher('depth/frontright/camera_info', CameraInfo, queue_size=10) - self.left_depth_info_pub = rospy.Publisher('depth/left/camera_info', CameraInfo, queue_size=10) - self.right_depth_info_pub = rospy.Publisher('depth/right/camera_info', CameraInfo, queue_size=10) - self.hand_depth_info_pub = rospy.Publisher('depth/hand/camera_info', CameraInfo, queue_size=10) - self.hand_depth_in_color_info_pub = rospy.Publisher('camera/hand/depth_in_color/camera_info', CameraInfo, queue_size=10) - self.frontleft_depth_in_visual_info_pub = rospy.Publisher('depth/frontleft/depth_in_visual/camera_info', CameraInfo, queue_size=10) - self.frontright_depth_in_visual_info_pub = rospy.Publisher('depth/frontright/depth_in_visual/camera_info', CameraInfo, queue_size=10) + self.back_image_info_pub = rospy.Publisher( + "camera/back/camera_info", CameraInfo, queue_size=10 + ) + self.frontleft_image_info_pub = rospy.Publisher( + "camera/frontleft/camera_info", CameraInfo, queue_size=10 + ) + self.frontright_image_info_pub = rospy.Publisher( + "camera/frontright/camera_info", CameraInfo, queue_size=10 + ) + self.left_image_info_pub = rospy.Publisher( + "camera/left/camera_info", CameraInfo, queue_size=10 + ) + self.right_image_info_pub = rospy.Publisher( + "camera/right/camera_info", CameraInfo, queue_size=10 + ) + self.hand_image_mono_info_pub = rospy.Publisher( + "camera/hand_mono/camera_info", CameraInfo, queue_size=10 + ) + self.hand_image_color_info_pub = rospy.Publisher( + "camera/hand_color/camera_info", CameraInfo, queue_size=10 + ) + # Depth Camera Info # + self.back_depth_info_pub = rospy.Publisher( + "depth/back/camera_info", CameraInfo, queue_size=10 + ) + self.frontleft_depth_info_pub = rospy.Publisher( + "depth/frontleft/camera_info", CameraInfo, queue_size=10 + ) + self.frontright_depth_info_pub = rospy.Publisher( + "depth/frontright/camera_info", CameraInfo, queue_size=10 + ) + self.left_depth_info_pub = rospy.Publisher( + "depth/left/camera_info", CameraInfo, queue_size=10 + ) + self.right_depth_info_pub = rospy.Publisher( + "depth/right/camera_info", CameraInfo, queue_size=10 + ) + self.hand_depth_info_pub = rospy.Publisher( + "depth/hand/camera_info", CameraInfo, queue_size=10 + ) + self.hand_depth_in_color_info_pub = rospy.Publisher( + "camera/hand/depth_in_color/camera_info", CameraInfo, queue_size=10 + ) + self.frontleft_depth_in_visual_info_pub = rospy.Publisher( + "depth/frontleft/depth_in_visual/camera_info", CameraInfo, queue_size=10 + ) + self.frontright_depth_in_visual_info_pub = rospy.Publisher( + "depth/frontright/depth_in_visual/camera_info", CameraInfo, queue_size=10 + ) # Status Publishers # - self.joint_state_pub = rospy.Publisher('joint_states', JointState, queue_size=10) + self.joint_state_pub = rospy.Publisher( + "joint_states", JointState, queue_size=10 + ) """Defining a TF publisher manually because of conflicts between Python3 and tf""" - self.tf_pub = rospy.Publisher('tf', TFMessage, queue_size=10) - self.metrics_pub = rospy.Publisher('status/metrics', Metrics, queue_size=10) - self.lease_pub = rospy.Publisher('status/leases', LeaseArray, queue_size=10) - self.odom_twist_pub = rospy.Publisher('odometry/twist', TwistWithCovarianceStamped, queue_size=10) - self.odom_pub = rospy.Publisher('odometry', Odometry, queue_size=10) - self.feet_pub = rospy.Publisher('status/feet', FootStateArray, queue_size=10) - self.estop_pub = rospy.Publisher('status/estop', EStopStateArray, queue_size=10) - self.wifi_pub = rospy.Publisher('status/wifi', WiFiState, queue_size=10) - self.power_pub = rospy.Publisher('status/power_state', PowerState, queue_size=10) - self.battery_pub = rospy.Publisher('status/battery_states', BatteryStateArray, queue_size=10) - self.behavior_faults_pub = rospy.Publisher('status/behavior_faults', BehaviorFaultState, queue_size=10) - self.system_faults_pub = rospy.Publisher('status/system_faults', SystemFaultState, queue_size=10) - self.motion_allowed_pub = rospy.Publisher('status/motion_allowed', Bool, queue_size=10) - - self.feedback_pub = rospy.Publisher('status/feedback', Feedback, queue_size=10) - - self.mobility_params_pub = rospy.Publisher('status/mobility_params', MobilityParams, queue_size=10) - - rospy.Subscriber('cmd_vel', Twist, self.cmdVelCallback, queue_size = 1) - rospy.Subscriber('go_to_pose', PoseStamped, self.trajectory_callback, queue_size = 1) - rospy.Subscriber('in_motion_or_idle_body_pose', Pose, self.in_motion_or_idle_pose_cb, queue_size = 1) + self.tf_pub = rospy.Publisher("tf", TFMessage, queue_size=10) + self.metrics_pub = rospy.Publisher("status/metrics", Metrics, queue_size=10) + self.lease_pub = rospy.Publisher("status/leases", LeaseArray, queue_size=10) + self.odom_twist_pub = rospy.Publisher( + "odometry/twist", TwistWithCovarianceStamped, queue_size=10 + ) + self.odom_pub = rospy.Publisher("odometry", Odometry, queue_size=10) + self.feet_pub = rospy.Publisher("status/feet", FootStateArray, queue_size=10) + self.estop_pub = rospy.Publisher("status/estop", EStopStateArray, queue_size=10) + self.wifi_pub = rospy.Publisher("status/wifi", WiFiState, queue_size=10) + self.power_pub = rospy.Publisher( + "status/power_state", PowerState, queue_size=10 + ) + self.battery_pub = rospy.Publisher( + "status/battery_states", BatteryStateArray, queue_size=10 + ) + self.behavior_faults_pub = rospy.Publisher( + "status/behavior_faults", BehaviorFaultState, queue_size=10 + ) + self.system_faults_pub = rospy.Publisher( + "status/system_faults", SystemFaultState, queue_size=10 + ) + self.motion_allowed_pub = rospy.Publisher( + "status/motion_allowed", Bool, queue_size=10 + ) + + self.feedback_pub = rospy.Publisher("status/feedback", Feedback, queue_size=10) + + self.mobility_params_pub = rospy.Publisher( + "status/mobility_params", MobilityParams, queue_size=10 + ) + + rospy.Subscriber("cmd_vel", Twist, self.cmdVelCallback, queue_size=1) + rospy.Subscriber( + "go_to_pose", PoseStamped, self.trajectory_callback, queue_size=1 + ) + rospy.Subscriber( + "in_motion_or_idle_body_pose", + Pose, + self.in_motion_or_idle_pose_cb, + queue_size=1, + ) rospy.Service("claim", Trigger, self.handle_claim) rospy.Service("release", Trigger, self.handle_release) rospy.Service("self_right", Trigger, self.handle_self_right) @@ -1236,7 +1543,9 @@ def main(self): rospy.Service("locomotion_mode", SetLocomotion, self.handle_locomotion_mode) rospy.Service("swing_height", SetSwingHeight, self.handle_swing_height) rospy.Service("velocity_limit", SetVelocity, self.handle_vel_limit) - rospy.Service("clear_behavior_fault", ClearBehaviorFault, self.handle_clear_behavior_fault) + rospy.Service( + "clear_behavior_fault", ClearBehaviorFault, self.handle_clear_behavior_fault + ) rospy.Service("terrain_params", SetTerrainParams, self.handle_terrain_params) rospy.Service("obstacle_params", SetObstacleParams, self.handle_obstacle_params) rospy.Service("posed_stand", PosedStand, self.handle_posed_stand) @@ -1255,31 +1564,46 @@ def main(self): rospy.Service("gripper_open", Trigger, self.handle_gripper_open) rospy.Service("gripper_close", Trigger, self.handle_gripper_close) rospy.Service("arm_carry", Trigger, self.handle_arm_carry) - rospy.Service("gripper_angle_open", GripperAngleMove, self.handle_gripper_angle_open) + rospy.Service( + "gripper_angle_open", GripperAngleMove, self.handle_gripper_angle_open + ) rospy.Service("arm_joint_move", ArmJointMovement, self.handle_arm_joint_move) - rospy.Service("force_trajectory", ArmForceTrajectory, self.handle_force_trajectory) + rospy.Service( + "force_trajectory", ArmForceTrajectory, self.handle_force_trajectory + ) rospy.Service("gripper_pose", HandPose, self.handle_hand_pose) ######################################################### - self.navigate_as = actionlib.SimpleActionServer('navigate_to', NavigateToAction, - execute_cb = self.handle_navigate_to, - auto_start = False) + self.navigate_as = actionlib.SimpleActionServer( + "navigate_to", + NavigateToAction, + execute_cb=self.handle_navigate_to, + auto_start=False, + ) self.navigate_as.start() - - self.trajectory_server = actionlib.SimpleActionServer("trajectory", TrajectoryAction, - execute_cb=self.handle_trajectory, - auto_start=False) + self.trajectory_server = actionlib.SimpleActionServer( + "trajectory", + TrajectoryAction, + execute_cb=self.handle_trajectory, + auto_start=False, + ) self.trajectory_server.start() - self.motion_or_idle_body_pose_as = actionlib.SimpleActionServer('motion_or_idle_body_pose', PoseBodyAction, - execute_cb=self.handle_in_motion_or_idle_body_pose, - auto_start=False) + self.motion_or_idle_body_pose_as = actionlib.SimpleActionServer( + "motion_or_idle_body_pose", + PoseBodyAction, + execute_cb=self.handle_in_motion_or_idle_body_pose, + auto_start=False, + ) self.motion_or_idle_body_pose_as.start() - self.body_pose_as = actionlib.SimpleActionServer('body_pose', PoseBodyAction, - execute_cb=self.handle_posed_stand_action, - auto_start=False) + self.body_pose_as = actionlib.SimpleActionServer( + "body_pose", + PoseBodyAction, + execute_cb=self.handle_posed_stand_action, + auto_start=False, + ) self.body_pose_as.start() # Stop service calls other services so initialise it after them to prevent crashes which can happen if @@ -1289,14 +1613,14 @@ def main(self): rospy.on_shutdown(self.shutdown) - max_linear_x = rospy.get_param('~max_linear_velocity_x', 0) - max_linear_y = rospy.get_param('~max_linear_velocity_y', 0) - max_angular_z = rospy.get_param('~max_angular_velocity_z', 0) + max_linear_x = rospy.get_param("~max_linear_velocity_x", 0) + max_linear_y = rospy.get_param("~max_linear_velocity_y", 0) + max_angular_z = rospy.get_param("~max_angular_velocity_z", 0) self.set_velocity_limits(max_linear_x, max_linear_y, max_angular_z) - self.auto_claim = rospy.get_param('~auto_claim', False) - self.auto_power_on = rospy.get_param('~auto_power_on', False) - self.auto_stand = rospy.get_param('~auto_stand', False) + self.auto_claim = rospy.get_param("~auto_claim", False) + self.auto_power_on = rospy.get_param("~auto_power_on", False) + self.auto_stand = rospy.get_param("~auto_stand", False) if self.auto_claim: self.spot_wrapper.claim() @@ -1305,8 +1629,12 @@ def main(self): if self.auto_stand: self.spot_wrapper.stand() - rate_limited_feedback = RateLimitedCall(self.publish_feedback, self.rates["feedback"]) - rate_limited_mobility_params = RateLimitedCall(self.publish_mobility_params, self.rates["mobility_params"]) + rate_limited_feedback = RateLimitedCall( + self.publish_feedback, self.rates["feedback"] + ) + rate_limited_mobility_params = RateLimitedCall( + self.publish_mobility_params, self.rates["mobility_params"] + ) rate_limited_motion_allowed = RateLimitedCall(self.publish_allow_motion, 10) rospy.loginfo("Driver started") while not rospy.is_shutdown(): diff --git a/spot_driver/src/spot_driver/spot_wrapper.py b/spot_driver/src/spot_driver/spot_wrapper.py index 5eda5e8f..9b8415cd 100644 --- a/spot_driver/src/spot_driver/spot_wrapper.py +++ b/spot_driver/src/spot_driver/spot_wrapper.py @@ -8,9 +8,17 @@ from bosdyn.geometry import EulerZXY from bosdyn.client.robot_state import RobotStateClient -from bosdyn.client.robot_command import RobotCommandClient, RobotCommandBuilder, blocking_stand +from bosdyn.client.robot_command import ( + RobotCommandClient, + RobotCommandBuilder, + blocking_stand, +) from bosdyn.client.graph_nav import GraphNavClient -from bosdyn.client.frame_helpers import get_odom_tform_body, ODOM_FRAME_NAME, BODY_FRAME_NAME +from bosdyn.client.frame_helpers import ( + get_odom_tform_body, + ODOM_FRAME_NAME, + BODY_FRAME_NAME, +) from bosdyn.client.power import safe_power_off, PowerClient, power_on from bosdyn.client.lease import LeaseClient, LeaseKeepAlive from bosdyn.client.image import ImageClient, build_image_request @@ -40,27 +48,45 @@ from google.protobuf.duration_pb2 import Duration from google.protobuf.timestamp_pb2 import Timestamp -front_image_sources = ['frontleft_fisheye_image', 'frontright_fisheye_image', 'frontleft_depth', 'frontright_depth'] +front_image_sources = [ + "frontleft_fisheye_image", + "frontright_fisheye_image", + "frontleft_depth", + "frontright_depth", +] """List of image sources for front image periodic query""" -side_image_sources = ['left_fisheye_image', 'right_fisheye_image', 'left_depth', 'right_depth'] +side_image_sources = [ + "left_fisheye_image", + "right_fisheye_image", + "left_depth", + "right_depth", +] """List of image sources for side image periodic query""" -rear_image_sources = ['back_fisheye_image', 'back_depth'] +rear_image_sources = ["back_fisheye_image", "back_depth"] """List of image sources for rear image periodic query""" -hand_image_sources = ['hand_image', 'hand_depth', 'hand_color_image', 'hand_depth_in_hand_color_frame'] +hand_image_sources = [ + "hand_image", + "hand_depth", + "hand_color_image", + "hand_depth_in_hand_color_frame", +] """List of image sources for hand image periodic query""" + class AsyncRobotState(AsyncPeriodicQuery): """Class to get robot state at regular intervals. get_robot_state_async query sent to the robot at every tick. Callback registered to defined callback function. - Attributes: - client: The Client to a service on the robot - logger: Logger object - rate: Rate (Hz) to trigger the query - callback: Callback function to call when the results of the query are available + Attributes: + client: The Client to a service on the robot + logger: Logger object + rate: Rate (Hz) to trigger the query + callback: Callback function to call when the results of the query are available """ + def __init__(self, client, logger, rate, callback): - super(AsyncRobotState, self).__init__("robot-state", client, logger, - period_sec=1.0/max(rate, 1.0)) + super(AsyncRobotState, self).__init__( + "robot-state", client, logger, period_sec=1.0 / max(rate, 1.0) + ) self._callback = None if rate > 0.0: self._callback = callback @@ -71,18 +97,21 @@ def _start_query(self): callback_future.add_done_callback(self._callback) return callback_future + class AsyncMetrics(AsyncPeriodicQuery): """Class to get robot metrics at regular intervals. get_robot_metrics_async query sent to the robot at every tick. Callback registered to defined callback function. - Attributes: - client: The Client to a service on the robot - logger: Logger object - rate: Rate (Hz) to trigger the query - callback: Callback function to call when the results of the query are available + Attributes: + client: The Client to a service on the robot + logger: Logger object + rate: Rate (Hz) to trigger the query + callback: Callback function to call when the results of the query are available """ + def __init__(self, client, logger, rate, callback): - super(AsyncMetrics, self).__init__("robot-metrics", client, logger, - period_sec=1.0/max(rate, 1.0)) + super(AsyncMetrics, self).__init__( + "robot-metrics", client, logger, period_sec=1.0 / max(rate, 1.0) + ) self._callback = None if rate > 0.0: self._callback = callback @@ -93,18 +122,21 @@ def _start_query(self): callback_future.add_done_callback(self._callback) return callback_future + class AsyncLease(AsyncPeriodicQuery): """Class to get lease state at regular intervals. list_leases_async query sent to the robot at every tick. Callback registered to defined callback function. - Attributes: - client: The Client to a service on the robot - logger: Logger object - rate: Rate (Hz) to trigger the query - callback: Callback function to call when the results of the query are available + Attributes: + client: The Client to a service on the robot + logger: Logger object + rate: Rate (Hz) to trigger the query + callback: Callback function to call when the results of the query are available """ + def __init__(self, client, logger, rate, callback): - super(AsyncLease, self).__init__("lease", client, logger, - period_sec=1.0/max(rate, 1.0)) + super(AsyncLease, self).__init__( + "lease", client, logger, period_sec=1.0 / max(rate, 1.0) + ) self._callback = None if rate > 0.0: self._callback = callback @@ -115,18 +147,21 @@ def _start_query(self): callback_future.add_done_callback(self._callback) return callback_future + class AsyncImageService(AsyncPeriodicQuery): """Class to get images at regular intervals. get_image_from_sources_async query sent to the robot at every tick. Callback registered to defined callback function. - Attributes: - client: The Client to a service on the robot - logger: Logger object - rate: Rate (Hz) to trigger the query - callback: Callback function to call when the results of the query are available + Attributes: + client: The Client to a service on the robot + logger: Logger object + rate: Rate (Hz) to trigger the query + callback: Callback function to call when the results of the query are available """ + def __init__(self, client, logger, rate, callback, image_requests): - super(AsyncImageService, self).__init__("robot_image_service", client, logger, - period_sec=1.0/max(rate, 1.0)) + super(AsyncImageService, self).__init__( + "robot_image_service", client, logger, period_sec=1.0 / max(rate, 1.0) + ) self._callback = None if rate > 0.0: self._callback = callback @@ -138,31 +173,38 @@ def _start_query(self): callback_future.add_done_callback(self._callback) return callback_future + class AsyncIdle(AsyncPeriodicQuery): """Class to check if the robot is moving, and if not, command a stand with the set mobility parameters - Attributes: - client: The Client to a service on the robot - logger: Logger object - rate: Rate (Hz) to trigger the query - spot_wrapper: A handle to the wrapper library + Attributes: + client: The Client to a service on the robot + logger: Logger object + rate: Rate (Hz) to trigger the query + spot_wrapper: A handle to the wrapper library """ + def __init__(self, client, logger, rate, spot_wrapper): - super(AsyncIdle, self).__init__("idle", client, logger, - period_sec=1.0/rate) + super(AsyncIdle, self).__init__("idle", client, logger, period_sec=1.0 / rate) self._spot_wrapper = spot_wrapper def _start_query(self): if self._spot_wrapper._last_stand_command != None: try: - response = self._client.robot_command_feedback(self._spot_wrapper._last_stand_command) - status = response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status + response = self._client.robot_command_feedback( + self._spot_wrapper._last_stand_command + ) + status = ( + response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status + ) self._spot_wrapper._is_sitting = False if status == basic_command_pb2.StandCommand.Feedback.STATUS_IS_STANDING: self._spot_wrapper._is_standing = True self._spot_wrapper._last_stand_command = None - elif status == basic_command_pb2.StandCommand.Feedback.STATUS_IN_PROGRESS: + elif ( + status == basic_command_pb2.StandCommand.Feedback.STATUS_IN_PROGRESS + ): self._spot_wrapper._is_standing = False else: self._logger.warn("Stand command in unknown state") @@ -174,9 +216,13 @@ def _start_query(self): if self._spot_wrapper._last_sit_command != None: try: self._spot_wrapper._is_standing = False - response = self._client.robot_command_feedback(self._spot_wrapper._last_sit_command) - if (response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status == - basic_command_pb2.SitCommand.Feedback.STATUS_IS_SITTING): + response = self._client.robot_command_feedback( + self._spot_wrapper._last_sit_command + ) + if ( + response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status + == basic_command_pb2.SitCommand.Feedback.STATUS_IS_SITTING + ): self._spot_wrapper._is_sitting = True self._spot_wrapper._last_sit_command = None else: @@ -195,26 +241,49 @@ def _start_query(self): if self._spot_wrapper._last_trajectory_command != None: try: - response = self._client.robot_command_feedback(self._spot_wrapper._last_trajectory_command) - status = response.feedback.synchronized_feedback.mobility_command_feedback.se2_trajectory_feedback.status + response = self._client.robot_command_feedback( + self._spot_wrapper._last_trajectory_command + ) + status = ( + response.feedback.synchronized_feedback.mobility_command_feedback.se2_trajectory_feedback.status + ) # STATUS_AT_GOAL always means that the robot reached the goal. If the trajectory command did not # request precise positioning, then STATUS_NEAR_GOAL also counts as reaching the goal - if status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_AT_GOAL or \ - (status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_NEAR_GOAL and - not self._spot_wrapper._last_trajectory_command_precise): + if ( + status + == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_AT_GOAL + or ( + status + == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_NEAR_GOAL + and not self._spot_wrapper._last_trajectory_command_precise + ) + ): self._spot_wrapper._at_goal = True # Clear the command once at the goal self._spot_wrapper._last_trajectory_command = None - elif status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_GOING_TO_GOAL: + elif ( + status + == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_GOING_TO_GOAL + ): is_moving = True - elif status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_NEAR_GOAL: + elif ( + status + == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_NEAR_GOAL + ): is_moving = True self._spot_wrapper._near_goal = True - elif status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_UNKNOWN: + elif ( + status + == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_UNKNOWN + ): self._spot_wrapper._trajectory_status_unknown = True self._spot_wrapper._last_trajectory_command = None else: - self._logger.error("Received trajectory command status outside of expected range, value is {}".format(status)) + self._logger.error( + "Received trajectory command status outside of expected range, value is {}".format( + status + ) + ) self._spot_wrapper._last_trajectory_command = None except (ResponseError, RpcError) as e: self._logger.error("Error when getting robot command feedback: %s", e) @@ -222,25 +291,31 @@ def _start_query(self): self._spot_wrapper._is_moving = is_moving - if (self._spot_wrapper.is_standing and not self._spot_wrapper.is_moving - and self._spot_wrapper._last_trajectory_command is not None - and self._spot_wrapper._last_stand_command is not None - and self._spot_wrapper._last_velocity_command_time is not None - and self._spot_wrapper._last_docking_command is not None): + if ( + self._spot_wrapper.is_standing + and not self._spot_wrapper.is_moving + and self._spot_wrapper._last_trajectory_command is not None + and self._spot_wrapper._last_stand_command is not None + and self._spot_wrapper._last_velocity_command_time is not None + and self._spot_wrapper._last_docking_command is not None + ): self._spot_wrapper.stand(False) + class AsyncEStopMonitor(AsyncPeriodicQuery): """Class to check if the estop endpoint is still valid - Attributes: - client: The Client to a service on the robot - logger: Logger object - rate: Rate (Hz) to trigger the query - spot_wrapper: A handle to the wrapper library + Attributes: + client: The Client to a service on the robot + logger: Logger object + rate: Rate (Hz) to trigger the query + spot_wrapper: A handle to the wrapper library """ def __init__(self, client, logger, rate, spot_wrapper): - super(AsyncEStopMonitor, self).__init__("estop_alive", client, logger, period_sec=1.0 / rate) + super(AsyncEStopMonitor, self).__init__( + "estop_alive", client, logger, period_sec=1.0 / rate + ) self._spot_wrapper = spot_wrapper def _start_query(self): @@ -249,17 +324,38 @@ def _start_query(self): return last_estop_status = self._spot_wrapper._estop_keepalive.status_queue.queue[-1] - if last_estop_status[0] == self._spot_wrapper._estop_keepalive.KeepAliveStatus.ERROR: - self._logger.error("Estop keepalive has an error: {}".format(last_estop_status[1])) - elif last_estop_status == self._spot_wrapper._estop_keepalive.KeepAliveStatus.DISABLED: - self._logger.error("Estop keepalive is disabled: {}".format(last_estop_status[1])) + if ( + last_estop_status[0] + == self._spot_wrapper._estop_keepalive.KeepAliveStatus.ERROR + ): + self._logger.error( + "Estop keepalive has an error: {}".format(last_estop_status[1]) + ) + elif ( + last_estop_status + == self._spot_wrapper._estop_keepalive.KeepAliveStatus.DISABLED + ): + self._logger.error( + "Estop keepalive is disabled: {}".format(last_estop_status[1]) + ) else: # estop keepalive is ok pass -class SpotWrapper(): + +class SpotWrapper: """Generic wrapper class to encompass release 1.1.4 API features as well as maintaining leases automatically""" - def __init__(self, username, password, hostname, logger, estop_timeout=9.0, rates = {}, callbacks = {}): + + def __init__( + self, + username, + password, + hostname, + logger, + estop_timeout=9.0, + rates={}, + callbacks={}, + ): self._username = username self._password = password self._hostname = hostname @@ -286,22 +382,30 @@ def __init__(self, username, password, hostname, logger, estop_timeout=9.0, rate self._front_image_requests = [] for source in front_image_sources: - self._front_image_requests.append(build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW)) + self._front_image_requests.append( + build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW) + ) self._side_image_requests = [] for source in side_image_sources: - self._side_image_requests.append(build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW)) + self._side_image_requests.append( + build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW) + ) self._rear_image_requests = [] for source in rear_image_sources: - self._rear_image_requests.append(build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW)) + self._rear_image_requests.append( + build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW) + ) self._hand_image_requests = [] for source in hand_image_sources: - self._hand_image_requests.append(build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW)) + self._hand_image_requests.append( + build_image_request(source, image_format=image_pb2.Image.FORMAT_RAW) + ) try: - self._sdk = create_standard_sdk('ros_spot') + self._sdk = create_standard_sdk("ros_spot") except Exception as e: self._logger.error("Error creating SDK object: %s", e) self._valid = False @@ -320,10 +424,12 @@ def __init__(self, username, password, hostname, logger, estop_timeout=9.0, rate authenticated = True except RpcError as err: sleep_secs = 15 - self._logger.warn("Failed to communicate with robot: {}\nEnsure the robot is powered on and you can " - "ping {}. Robot may still be booting. Will retry in {} seconds".format(err, - self._hostname, - sleep_secs)) + self._logger.warn( + "Failed to communicate with robot: {}\nEnsure the robot is powered on and you can " + "ping {}. Robot may still be booting. Will retry in {} seconds".format( + err, self._hostname, sleep_secs + ) + ) time.sleep(sleep_secs) except bosdyn.client.auth.InvalidLoginError as err: self._logger.error("Failed to log in to robot: {}".format(err)) @@ -336,52 +442,123 @@ def __init__(self, username, password, hostname, logger, estop_timeout=9.0, rate initialised = False while not initialised: try: - self._robot_state_client = self._robot.ensure_client(RobotStateClient.default_service_name) - self._robot_command_client = self._robot.ensure_client(RobotCommandClient.default_service_name) - self._graph_nav_client = self._robot.ensure_client(GraphNavClient.default_service_name) - self._power_client = self._robot.ensure_client(PowerClient.default_service_name) - self._lease_client = self._robot.ensure_client(LeaseClient.default_service_name) + self._robot_state_client = self._robot.ensure_client( + RobotStateClient.default_service_name + ) + self._robot_command_client = self._robot.ensure_client( + RobotCommandClient.default_service_name + ) + self._graph_nav_client = self._robot.ensure_client( + GraphNavClient.default_service_name + ) + self._power_client = self._robot.ensure_client( + PowerClient.default_service_name + ) + self._lease_client = self._robot.ensure_client( + LeaseClient.default_service_name + ) self._lease_wallet = self._lease_client.lease_wallet - self._image_client = self._robot.ensure_client(ImageClient.default_service_name) - self._estop_client = self._robot.ensure_client(EstopClient.default_service_name) - self._docking_client = self._robot.ensure_client(DockingClient.default_service_name) + self._image_client = self._robot.ensure_client( + ImageClient.default_service_name + ) + self._estop_client = self._robot.ensure_client( + EstopClient.default_service_name + ) + self._docking_client = self._robot.ensure_client( + DockingClient.default_service_name + ) initialised = True except Exception as e: sleep_secs = 15 - self._logger.warn("Unable to create client service: {}. This usually means the robot hasn't " - "finished booting yet. Will wait {} seconds and try again.".format(e, sleep_secs)) + self._logger.warn( + "Unable to create client service: {}. This usually means the robot hasn't " + "finished booting yet. Will wait {} seconds and try again.".format( + e, sleep_secs + ) + ) time.sleep(sleep_secs) # Store the most recent knowledge of the state of the robot based on rpc calls. self._current_graph = None - self._current_edges = dict() #maps to_waypoint to list(from_waypoint) + self._current_edges = dict() # maps to_waypoint to list(from_waypoint) self._current_waypoint_snapshots = dict() # maps id to waypoint snapshot self._current_edge_snapshots = dict() # maps id to edge snapshot self._current_annotation_name_to_wp_id = dict() # Async Tasks self._async_task_list = [] - self._robot_state_task = AsyncRobotState(self._robot_state_client, self._logger, max(0.0, self._rates.get("robot_state", 0.0)), self._callbacks.get("robot_state", lambda:None)) - self._robot_metrics_task = AsyncMetrics(self._robot_state_client, self._logger, max(0.0, self._rates.get("metrics", 0.0)), self._callbacks.get("metrics", lambda:None)) - self._lease_task = AsyncLease(self._lease_client, self._logger, max(0.0, self._rates.get("lease", 0.0)), self._callbacks.get("lease", lambda:None)) - self._front_image_task = AsyncImageService(self._image_client, self._logger, max(0.0, self._rates.get("front_image", 0.0)), self._callbacks.get("front_image", lambda:None), self._front_image_requests) - self._side_image_task = AsyncImageService(self._image_client, self._logger, max(0.0, self._rates.get("side_image", 0.0)), self._callbacks.get("side_image", lambda:None), self._side_image_requests) - self._rear_image_task = AsyncImageService(self._image_client, self._logger, max(0.0, self._rates.get("rear_image", 0.0)), self._callbacks.get("rear_image", lambda:None), self._rear_image_requests) - self._hand_image_task = AsyncImageService(self._image_client, self._logger, max(0.0, self._rates.get("hand_image", 0.0)), self._callbacks.get("hand_image", lambda:None), self._hand_image_requests) - self._idle_task = AsyncIdle(self._robot_command_client, self._logger, 10.0, self) - self._estop_monitor = AsyncEStopMonitor(self._estop_client, self._logger, 20.0, self) + self._robot_state_task = AsyncRobotState( + self._robot_state_client, + self._logger, + max(0.0, self._rates.get("robot_state", 0.0)), + self._callbacks.get("robot_state", lambda: None), + ) + self._robot_metrics_task = AsyncMetrics( + self._robot_state_client, + self._logger, + max(0.0, self._rates.get("metrics", 0.0)), + self._callbacks.get("metrics", lambda: None), + ) + self._lease_task = AsyncLease( + self._lease_client, + self._logger, + max(0.0, self._rates.get("lease", 0.0)), + self._callbacks.get("lease", lambda: None), + ) + self._front_image_task = AsyncImageService( + self._image_client, + self._logger, + max(0.0, self._rates.get("front_image", 0.0)), + self._callbacks.get("front_image", lambda: None), + self._front_image_requests, + ) + self._side_image_task = AsyncImageService( + self._image_client, + self._logger, + max(0.0, self._rates.get("side_image", 0.0)), + self._callbacks.get("side_image", lambda: None), + self._side_image_requests, + ) + self._rear_image_task = AsyncImageService( + self._image_client, + self._logger, + max(0.0, self._rates.get("rear_image", 0.0)), + self._callbacks.get("rear_image", lambda: None), + self._rear_image_requests, + ) + self._hand_image_task = AsyncImageService( + self._image_client, + self._logger, + max(0.0, self._rates.get("hand_image", 0.0)), + self._callbacks.get("hand_image", lambda: None), + self._hand_image_requests, + ) + self._idle_task = AsyncIdle( + self._robot_command_client, self._logger, 10.0, self + ) + self._estop_monitor = AsyncEStopMonitor( + self._estop_client, self._logger, 20.0, self + ) self._estop_endpoint = None self._estop_keepalive = None - self._async_tasks = AsyncTasks([self._robot_state_task, - self._robot_metrics_task, self._lease_task, - self._front_image_task, self._side_image_task, - self._rear_image_task, self._idle_task, self._estop_monitor]) - + self._async_tasks = AsyncTasks( + [ + self._robot_state_task, + self._robot_metrics_task, + self._lease_task, + self._front_image_task, + self._side_image_task, + self._rear_image_task, + self._idle_task, + self._estop_monitor, + ] + ) + if self._robot.has_arm(): self._async_tasks.add_task(self._hand_image_task) - + self._robot_id = None self._lease = None @@ -429,7 +606,7 @@ def side_images(self): def rear_images(self): """Return latest proto from the _rear_image_task""" return self._rear_image_task.proto - + @property def hand_images(self): """Return latest proto from the _hand_image_task""" @@ -514,7 +691,9 @@ def updateTasks(self): def resetEStop(self): """Get keepalive for eStop""" - self._estop_endpoint = EstopEndpoint(self._estop_client, 'ros', self._estop_timeout) + self._estop_endpoint = EstopEndpoint( + self._estop_client, "ros", self._estop_timeout + ) self._estop_endpoint.force_simple_setup() # Set this endpoint as the robot's sole estop. self._estop_keepalive = EstopKeepAlive(self._estop_endpoint) @@ -542,7 +721,6 @@ def disengageEStop(self): except: return False, "Error" - def releaseEStop(self): """Stop eStop keepalive""" if self._estop_keepalive: @@ -586,7 +764,12 @@ def _robot_command(self, command_proto, end_time_secs=None, timesync_endpoint=No timesync_endpoint: (optional) Time sync endpoint """ try: - id = self._robot_command_client.robot_command(lease=None, command=command_proto, end_time_secs=end_time_secs, timesync_endpoint=timesync_endpoint) + id = self._robot_command_client.robot_command( + lease=None, + command=command_proto, + end_time_secs=end_time_secs, + timesync_endpoint=timesync_endpoint, + ) return True, "Success", id except Exception as e: return False, str(e), None @@ -607,7 +790,9 @@ def sit(self): self._last_sit_command = response[2] return response[0], response[1] - def stand(self, monitor_command=True, body_height=0, body_yaw=0, body_pitch=0, body_roll=0): + def stand( + self, monitor_command=True, body_height=0, body_yaw=0, body_pitch=0, body_roll=0 + ): """ If the e-stop is enabled, and the motor power is enabled, stand the robot up. Executes a stand command, but one where the robot will assume the pose specified by the given parameters. @@ -625,11 +810,16 @@ def stand(self, monitor_command=True, body_height=0, body_yaw=0, body_pitch=0, b if any([body_height, body_yaw, body_pitch, body_roll]): # If any of the orientation parameters are nonzero use them to pose the body body_orientation = EulerZXY(yaw=body_yaw, pitch=body_pitch, roll=body_roll) - response = self._robot_command(RobotCommandBuilder.synchro_stand_command(body_height=body_height, - footprint_R_body=body_orientation)) + response = self._robot_command( + RobotCommandBuilder.synchro_stand_command( + body_height=body_height, footprint_R_body=body_orientation + ) + ) else: # Otherwise just use the mobility params - response = self._robot_command(RobotCommandBuilder.synchro_stand_command(params=self._mobility_params)) + response = self._robot_command( + RobotCommandBuilder.synchro_stand_command(params=self._mobility_params) + ) if monitor_command: self._last_stand_command = response[2] @@ -643,7 +833,9 @@ def safe_power_off(self): def clear_behavior_fault(self, id): """Clear the behavior fault defined by id.""" try: - rid = self._robot_command_client.clear_behavior_fault(behavior_fault_id=id, lease=None) + rid = self._robot_command_client.clear_behavior_fault( + behavior_fault_id=id, lease=None + ) return True, "Success", rid except Exception as e: return False, str(e), None @@ -665,8 +857,7 @@ def set_mobility_params(self, mobility_params): self._mobility_params = mobility_params def get_mobility_params(self): - """Get mobility params - """ + """Get mobility params""" return self._mobility_params def velocity_cmd(self, v_x, v_y, v_rot, cmd_duration=0.125): @@ -678,14 +869,26 @@ def velocity_cmd(self, v_x, v_y, v_rot, cmd_duration=0.125): v_rot: Angular velocity around the Z axis in radians cmd_duration: (optional) Time-to-live for the command in seconds. Default is 125ms (assuming 10Hz command rate). """ - end_time=time.time() + cmd_duration - response = self._robot_command(RobotCommandBuilder.synchro_velocity_command( - v_x=v_x, v_y=v_y, v_rot=v_rot, params=self._mobility_params), - end_time_secs=end_time, timesync_endpoint=self._robot.time_sync.endpoint) + end_time = time.time() + cmd_duration + response = self._robot_command( + RobotCommandBuilder.synchro_velocity_command( + v_x=v_x, v_y=v_y, v_rot=v_rot, params=self._mobility_params + ), + end_time_secs=end_time, + timesync_endpoint=self._robot.time_sync.endpoint, + ) self._last_velocity_command_time = end_time return response[0], response[1] - def trajectory_cmd(self, goal_x, goal_y, goal_heading, cmd_duration, frame_name='odom', precise_position=False): + def trajectory_cmd( + self, + goal_x, + goal_y, + goal_heading, + cmd_duration, + frame_name="odom", + precise_position=False, + ): """Send a trajectory motion command to the robot. Args: @@ -705,37 +908,45 @@ def trajectory_cmd(self, goal_x, goal_y, goal_heading, cmd_duration, frame_name= self._trajectory_status_unknown = False self._last_trajectory_command_precise = precise_position self._logger.info("got command duration of {}".format(cmd_duration)) - end_time=time.time() + cmd_duration - if frame_name == 'vision': + end_time = time.time() + cmd_duration + if frame_name == "vision": vision_tform_body = frame_helpers.get_vision_tform_body( - self._robot_state_client.get_robot_state().kinematic_state.transforms_snapshot) - body_tform_goal = math_helpers.SE3Pose(x=goal_x, y=goal_y, z=0, rot=math_helpers.Quat.from_yaw(goal_heading)) + self._robot_state_client.get_robot_state().kinematic_state.transforms_snapshot + ) + body_tform_goal = math_helpers.SE3Pose( + x=goal_x, y=goal_y, z=0, rot=math_helpers.Quat.from_yaw(goal_heading) + ) vision_tform_goal = vision_tform_body * body_tform_goal response = self._robot_command( - RobotCommandBuilder.synchro_se2_trajectory_point_command( - goal_x=vision_tform_goal.x, - goal_y=vision_tform_goal.y, - goal_heading=vision_tform_goal.rot.to_yaw(), - frame_name=frame_helpers.VISION_FRAME_NAME, - params=self._mobility_params), - end_time_secs=end_time - ) - elif frame_name == 'odom': + RobotCommandBuilder.synchro_se2_trajectory_point_command( + goal_x=vision_tform_goal.x, + goal_y=vision_tform_goal.y, + goal_heading=vision_tform_goal.rot.to_yaw(), + frame_name=frame_helpers.VISION_FRAME_NAME, + params=self._mobility_params, + ), + end_time_secs=end_time, + ) + elif frame_name == "odom": odom_tform_body = frame_helpers.get_odom_tform_body( - self._robot_state_client.get_robot_state().kinematic_state.transforms_snapshot) - body_tform_goal = math_helpers.SE3Pose(x=goal_x, y=goal_y, z=0, rot=math_helpers.Quat.from_yaw(goal_heading)) + self._robot_state_client.get_robot_state().kinematic_state.transforms_snapshot + ) + body_tform_goal = math_helpers.SE3Pose( + x=goal_x, y=goal_y, z=0, rot=math_helpers.Quat.from_yaw(goal_heading) + ) odom_tform_goal = odom_tform_body * body_tform_goal response = self._robot_command( - RobotCommandBuilder.synchro_se2_trajectory_point_command( - goal_x=odom_tform_goal.x, - goal_y=odom_tform_goal.y, - goal_heading=odom_tform_goal.rot.to_yaw(), - frame_name=frame_helpers.ODOM_FRAME_NAME, - params=self._mobility_params), - end_time_secs=end_time - ) + RobotCommandBuilder.synchro_se2_trajectory_point_command( + goal_x=odom_tform_goal.x, + goal_y=odom_tform_goal.y, + goal_heading=odom_tform_goal.rot.to_yaw(), + frame_name=frame_helpers.ODOM_FRAME_NAME, + params=self._mobility_params, + ), + end_time_secs=end_time, + ) else: - raise ValueError('frame_name must be \'vision\' or \'odom\'') + raise ValueError("frame_name must be 'vision' or 'odom'") if response[0]: self._last_trajectory_command = response[2] return response[0], response[1] @@ -747,18 +958,28 @@ def list_graph(self, upload_path): """ ids, eds = self._list_graph_waypoint_and_edge_ids() # skip waypoint_ for v2.2.1, skip waypiont for < v2.2 - return [v for k, v in sorted(ids.items(), key=lambda id : int(id[0].replace('waypoint_','')))] + return [ + v + for k, v in sorted( + ids.items(), key=lambda id: int(id[0].replace("waypoint_", "")) + ) + ] def battery_change_pose(self, dir_hint=1): """Robot sit down and roll on to it its side for easier battery access""" - response = self._robot_command(RobotCommandBuilder.battery_change_pose_command(dir_hint)) + response = self._robot_command( + RobotCommandBuilder.battery_change_pose_command(dir_hint) + ) return response[0], response[1] - def navigate_to(self, upload_path, - navigate_to, - initial_localization_fiducial=True, - initial_localization_waypoint=None): - """ navigate with graph nav. + def navigate_to( + self, + upload_path, + navigate_to, + initial_localization_fiducial=True, + initial_localization_waypoint=None, + ): + """navigate with graph nav. Args: upload_path : Path to the root directory of the map. @@ -774,7 +995,7 @@ def navigate_to(self, upload_path, # Boolean indicating the robot's power state. power_state = self._robot_state_client.get_robot_state().power_state - self._started_powered_on = (power_state.motor_power_state == power_state.STATE_ON) + self._started_powered_on = power_state.motor_power_state == power_state.STATE_ON self._powered_on = self._started_powered_on # FIX ME somehow,,,, if the robot is stand, need to sit the robot before starting garph nav @@ -798,17 +1019,22 @@ def navigate_to(self, upload_path, def ensure_arm_power_and_stand(self): if not self._robot.has_arm(): return False, "Spot with an arm is required for this service" - + try: self._logger.info("Spot is powering on within the timeout of 20 secs") self._robot.power_on(timeout_sec=20) assert self._robot.is_powered_on(), "Spot failed to power on" self._logger.info("Spot is powered on") except Exception as e: - return False, "Exception occured while Spot or its arm were trying to power on" + return ( + False, + "Exception occured while Spot or its arm were trying to power on", + ) if not self._is_standing: - robot_command.blocking_stand(command_client=self._robot_command_client, timeout_sec=10.0) + robot_command.blocking_stand( + command_client=self._robot_command_client, timeout_sec=10.0 + ) self._logger.info("Spot is standing") else: self._logger.info("Spot is already standing") @@ -841,7 +1067,7 @@ def arm_unstow(self): if not success: self._logger.info(msg) return False, msg - else: + else: # Unstow Arm unstow = RobotCommandBuilder.arm_ready_command() @@ -854,7 +1080,7 @@ def arm_unstow(self): return False, "Exception occured while trying to unstow" return True, "Unstow arm success" - + def arm_carry(self): try: success, msg = self.ensure_arm_power_and_stand() @@ -875,15 +1101,22 @@ def arm_carry(self): return True, "Carry mode success" - def make_arm_trajectory_command(self, arm_joint_trajectory): - """ Helper function to create a RobotCommand from an ArmJointTrajectory. - Copy from 'spot-sdk/python/examples/arm_joint_move/arm_joint_move.py' """ - - joint_move_command = arm_command_pb2.ArmJointMoveCommand.Request(trajectory=arm_joint_trajectory) - arm_command = arm_command_pb2.ArmCommand.Request(arm_joint_move_command=joint_move_command) - sync_arm = synchronized_command_pb2.SynchronizedCommand.Request(arm_command=arm_command) - arm_sync_robot_cmd = robot_command_pb2.RobotCommand(synchronized_command=sync_arm) + """Helper function to create a RobotCommand from an ArmJointTrajectory. + Copy from 'spot-sdk/python/examples/arm_joint_move/arm_joint_move.py'""" + + joint_move_command = arm_command_pb2.ArmJointMoveCommand.Request( + trajectory=arm_joint_trajectory + ) + arm_command = arm_command_pb2.ArmCommand.Request( + arm_joint_move_command=joint_move_command + ) + sync_arm = synchronized_command_pb2.SynchronizedCommand.Request( + arm_command=arm_command + ) + arm_sync_robot_cmd = robot_command_pb2.RobotCommand( + synchronized_command=sync_arm + ) return RobotCommandBuilder.build_synchro_command(arm_sync_robot_cmd) def arm_joint_move(self, joint_targets): @@ -894,60 +1127,75 @@ def arm_joint_move(self, joint_targets): # RANGE: 0.4 -> -3.13 ( # Joint3: 0.0 arm straight. moves the arm down # RANGE: 0.0 -> 3.1415 - # Joint4: 0.0 middle position. negative: moves ccw, positive moves cw - # RANGE: -2.79253 -> 2.79253 - # # Joint5: 0.0 gripper points to the front. positive moves the gripper down + # Joint4: 0.0 middle position. negative: moves ccw, positive moves cw + # RANGE: -2.79253 -> 2.79253 + # # Joint5: 0.0 gripper points to the front. positive moves the gripper down # RANGE: -1.8326 -> 1.8326 - # Joint6: 0.0 Gripper is not rolled, positive is ccw + # Joint6: 0.0 Gripper is not rolled, positive is ccw # RANGE: -2.87 -> 2.87 # Values after unstow are: [0.0, -0.9, 1.8, 0.0, -0.9, 0.0] if abs(joint_targets[0]) > 3.14: - msg = "Joint 1 has to be between -3.14 and 3.14" - self._logger.warn(msg) - return False, msg - elif joint_targets[1] > 0.4 or joint_targets[1] < -3.13: - msg = "Joint 2 has to be between -3.13 and 0.4" - self._logger.warn(msg) - return False, msg - elif joint_targets[2] > 3.14 or joint_targets[2] < 0.0: - msg = "Joint 3 has to be between 0.0 and 3.14" - self._logger.warn(msg) - return False, msg - elif abs(joint_targets[3]) > 2.79253: - msg = "Joint 4 has to be between -2.79253 and 2.79253" - self._logger.warn(msg) - return False, msg - elif abs(joint_targets[4]) > 1.8326: - msg = "Joint 5 has to be between -1.8326 and 1.8326" - self._logger.warn(msg) - return False, msg - elif abs(joint_targets[5]) > 2.87: - msg = "Joint 6 has to be between -2.87 and 2.87" - self._logger.warn(msg) - return False, msg + msg = "Joint 1 has to be between -3.14 and 3.14" + self._logger.warn(msg) + return False, msg + elif joint_targets[1] > 0.4 or joint_targets[1] < -3.13: + msg = "Joint 2 has to be between -3.13 and 0.4" + self._logger.warn(msg) + return False, msg + elif joint_targets[2] > 3.14 or joint_targets[2] < 0.0: + msg = "Joint 3 has to be between 0.0 and 3.14" + self._logger.warn(msg) + return False, msg + elif abs(joint_targets[3]) > 2.79253: + msg = "Joint 4 has to be between -2.79253 and 2.79253" + self._logger.warn(msg) + return False, msg + elif abs(joint_targets[4]) > 1.8326: + msg = "Joint 5 has to be between -1.8326 and 1.8326" + self._logger.warn(msg) + return False, msg + elif abs(joint_targets[5]) > 2.87: + msg = "Joint 6 has to be between -2.87 and 2.87" + self._logger.warn(msg) + return False, msg try: success, msg = self.ensure_arm_power_and_stand() if not success: self._logger.info(msg) return False, msg else: - trajectory_point = RobotCommandBuilder.create_arm_joint_trajectory_point( - joint_targets[0], joint_targets[1], joint_targets[2], - joint_targets[3], joint_targets[4], joint_targets[5]) - arm_joint_trajectory = arm_command_pb2.ArmJointTrajectory(points = [trajectory_point]) + trajectory_point = ( + RobotCommandBuilder.create_arm_joint_trajectory_point( + joint_targets[0], + joint_targets[1], + joint_targets[2], + joint_targets[3], + joint_targets[4], + joint_targets[5], + ) + ) + arm_joint_trajectory = arm_command_pb2.ArmJointTrajectory( + points=[trajectory_point] + ) arm_command = self.make_arm_trajectory_command(arm_joint_trajectory) # Send the request cmd_id = self._robot_command_client.robot_command(arm_command) # Query for feedback to determine how long it will take - feedback_resp = self._robot_command_client.robot_command_feedback(cmd_id) - joint_move_feedback = feedback_resp.feedback.synchronized_feedback.arm_command_feedback.arm_joint_move_feedback - time_to_goal : Duration = joint_move_feedback.time_to_goal - time_to_goal_in_seconds: float = time_to_goal.seconds + (float(time_to_goal.nanos) / float(10**9)) + feedback_resp = self._robot_command_client.robot_command_feedback( + cmd_id + ) + joint_move_feedback = ( + feedback_resp.feedback.synchronized_feedback.arm_command_feedback.arm_joint_move_feedback + ) + time_to_goal: Duration = joint_move_feedback.time_to_goal + time_to_goal_in_seconds: float = time_to_goal.seconds + ( + float(time_to_goal.nanos) / float(10**9) + ) time.sleep(time_to_goal_in_seconds) return True, "Spot Arm moved successfully" - + except Exception as e: return False, "Exception occured during arm movement: " + str(e) @@ -971,51 +1219,58 @@ def force_trajectory(self, data): # 10 seconds def create_wrench_from_msg(forces, torques): - force = geometry_pb2.Vec3(x = forces[0], - y = forces[1], z = forces[2]) - torque = geometry_pb2.Vec3(x = torques[0], - y = torques[1], z = torques[2]) + force = geometry_pb2.Vec3(x=forces[0], y=forces[1], z=forces[2]) + torque = geometry_pb2.Vec3(x=torques[0], y=torques[1], z=torques[2]) return geometry_pb2.Wrench(force=force, torque=torque) # Duration in seconds. traj_duration = 5 # first point on trajectory - wrench0 = create_wrench_from_msg( - data.forces_pt0, data.torques_pt0) + wrench0 = create_wrench_from_msg(data.forces_pt0, data.torques_pt0) t0 = seconds_to_duration(0) traj_point0 = trajectory_pb2.WrenchTrajectoryPoint( - wrench=wrench0, time_since_reference=t0) - + wrench=wrench0, time_since_reference=t0 + ) + # Second point on the trajectory - wrench1 = create_wrench_from_msg( - data.forces_pt1, data.torques_pt1) + wrench1 = create_wrench_from_msg(data.forces_pt1, data.torques_pt1) t1 = seconds_to_duration(traj_duration) traj_point1 = trajectory_pb2.WrenchTrajectoryPoint( - wrench=wrench1, time_since_reference=t1) + wrench=wrench1, time_since_reference=t1 + ) # Build the trajectory - trajectory = trajectory_pb2.WrenchTrajectory(points=[traj_point0, traj_point1]) - + trajectory = trajectory_pb2.WrenchTrajectory( + points=[traj_point0, traj_point1] + ) + # Build the trajectory request, putting all axes into force mode arm_cartesian_command = arm_command_pb2.ArmCartesianCommand.Request( - root_frame_name=ODOM_FRAME_NAME, wrench_trajectory_in_task=trajectory, + root_frame_name=ODOM_FRAME_NAME, + wrench_trajectory_in_task=trajectory, x_axis=arm_command_pb2.ArmCartesianCommand.Request.AXIS_MODE_FORCE, y_axis=arm_command_pb2.ArmCartesianCommand.Request.AXIS_MODE_FORCE, z_axis=arm_command_pb2.ArmCartesianCommand.Request.AXIS_MODE_FORCE, rx_axis=arm_command_pb2.ArmCartesianCommand.Request.AXIS_MODE_FORCE, ry_axis=arm_command_pb2.ArmCartesianCommand.Request.AXIS_MODE_FORCE, - rz_axis=arm_command_pb2.ArmCartesianCommand.Request.AXIS_MODE_FORCE) + rz_axis=arm_command_pb2.ArmCartesianCommand.Request.AXIS_MODE_FORCE, + ) arm_command = arm_command_pb2.ArmCommand.Request( - arm_cartesian_command=arm_cartesian_command) - synchronized_command = synchronized_command_pb2.SynchronizedCommand.Request( - arm_command=arm_command) + arm_cartesian_command=arm_cartesian_command + ) + synchronized_command = ( + synchronized_command_pb2.SynchronizedCommand.Request( + arm_command=arm_command + ) + ) robot_command = robot_command_pb2.RobotCommand( - synchronized_command=synchronized_command) + synchronized_command=synchronized_command + ) # Send the request self._robot_command_client.robot_command(robot_command) - self._logger.info('Force trajectory command sent') + self._logger.info("Force trajectory command sent") time.sleep(10.0) @@ -1023,7 +1278,7 @@ def create_wrench_from_msg(forces, torques): return False, "Exception occured during arm movement" return True, "Moved arm successfully" - + def gripper_open(self): try: success, msg = self.ensure_arm_power_and_stand() @@ -1058,12 +1313,12 @@ def gripper_close(self): self._robot_command_client.robot_command(command) self._logger.info("Command gripper close sent") time.sleep(2.0) - + except Exception as e: return False, "Exception occured while gripper was moving" return True, "Closed gripper successfully" - + def gripper_angle_open(self, gripper_ang): # takes an angle between 0 (closed) and 90 (fully opened) and opens the # gripper at this angle @@ -1075,7 +1330,7 @@ def gripper_angle_open(self, gripper_ang): self._logger.info(msg) return False, msg else: - # The open angle command does not take degrees but the limits + # The open angle command does not take degrees but the limits # defined in the urdf, that is why we have to interpolate closed = 0.349066 opened = -1.396263 @@ -1091,7 +1346,7 @@ def gripper_angle_open(self, gripper_ang): return False, "Exception occured while gripper was moving" return True, "Opened gripper successfully" - + def hand_pose(self, pose_points): try: success, msg = self.ensure_arm_power_and_stand() @@ -1101,38 +1356,57 @@ def hand_pose(self, pose_points): else: # Move the arm to a spot in front of the robot given a pose for the gripper. # Build a position to move the arm to (in meters, relative to the body frame origin.) - position = geometry_pb2.Vec3(x=pose_points.position.x, - y=pose_points.position.y, z=pose_points.position.z) + position = geometry_pb2.Vec3( + x=pose_points.position.x, + y=pose_points.position.y, + z=pose_points.position.z, + ) # # Rotation as a quaternion. - rotation = geometry_pb2.Quaternion(w=pose_points.orientation.w, - x=pose_points.orientation.x, - y=pose_points.orientation.y, - z=pose_points.orientation.z) + rotation = geometry_pb2.Quaternion( + w=pose_points.orientation.w, + x=pose_points.orientation.x, + y=pose_points.orientation.y, + z=pose_points.orientation.z, + ) seconds = 5.0 duration = seconds_to_duration(seconds) - + # Build the SE(3) pose of the desired hand position in the moving body frame. hand_pose = geometry_pb2.SE3Pose(position=position, rotation=rotation) - hand_pose_traj_point = trajectory_pb2.SE3TrajectoryPoint(pose=hand_pose, time_since_reference=duration) - hand_trajectory = trajectory_pb2.SE3Trajectory(points=[hand_pose_traj_point]) - + hand_pose_traj_point = trajectory_pb2.SE3TrajectoryPoint( + pose=hand_pose, time_since_reference=duration + ) + hand_trajectory = trajectory_pb2.SE3Trajectory( + points=[hand_pose_traj_point] + ) + arm_cartesian_command = arm_command_pb2.ArmCartesianCommand.Request( - root_frame_name=BODY_FRAME_NAME, pose_trajectory_in_task=hand_trajectory) + root_frame_name=BODY_FRAME_NAME, + pose_trajectory_in_task=hand_trajectory, + ) arm_command = arm_command_pb2.ArmCommand.Request( - arm_cartesian_command=arm_cartesian_command) - synchronized_command = synchronized_command_pb2.SynchronizedCommand.Request( - arm_command=arm_command) - - #robot_command = self._robot_command(RobotCommandBuilder.build_synchro_command(synchronized_command)) - robot_command = robot_command_pb2.RobotCommand(synchronized_command=synchronized_command) + arm_cartesian_command=arm_cartesian_command + ) + synchronized_command = ( + synchronized_command_pb2.SynchronizedCommand.Request( + arm_command=arm_command + ) + ) + + # robot_command = self._robot_command(RobotCommandBuilder.build_synchro_command(synchronized_command)) + robot_command = robot_command_pb2.RobotCommand( + synchronized_command=synchronized_command + ) - command = self._robot_command(RobotCommandBuilder.build_synchro_command(robot_command)) + command = self._robot_command( + RobotCommandBuilder.build_synchro_command(robot_command) + ) # Send the request self._robot_command_client.robot_command(command) - self._logger.info('Moving arm to position.') + self._logger.info("Moving arm to position.") time.sleep(6.0) @@ -1140,28 +1414,34 @@ def hand_pose(self, pose_points): return False, "An error occured while trying to move arm" return True, "Moved arm successfully" - - + ################################################################### ## copy from spot-sdk/python/examples/graph_nav_command_line/graph_nav_command_line.py def _get_localization_state(self, *args): """Get the current localization and state of the robot.""" state = self._graph_nav_client.get_localization_state() - self._logger.info('Got localization: \n%s' % str(state.localization)) - odom_tform_body = get_odom_tform_body(state.robot_kinematics.transforms_snapshot) - self._logger.info('Got robot state in kinematic odometry frame: \n%s' % str(odom_tform_body)) + self._logger.info("Got localization: \n%s" % str(state.localization)) + odom_tform_body = get_odom_tform_body( + state.robot_kinematics.transforms_snapshot + ) + self._logger.info( + "Got robot state in kinematic odometry frame: \n%s" % str(odom_tform_body) + ) def _set_initial_localization_fiducial(self, *args): """Trigger localization when near a fiducial.""" robot_state = self._robot_state_client.get_robot_state() current_odom_tform_body = get_odom_tform_body( - robot_state.kinematic_state.transforms_snapshot).to_proto() + robot_state.kinematic_state.transforms_snapshot + ).to_proto() # Create an empty instance for initial localization since we are asking it to localize # based on the nearest fiducial. localization = nav_pb2.Localization() - self._graph_nav_client.set_localization(initial_guess_localization=localization, - ko_tform_body=current_odom_tform_body) + self._graph_nav_client.set_localization( + initial_guess_localization=localization, + ko_tform_body=current_odom_tform_body, + ) def _set_initial_localization_waypoint(self, *args): """Trigger localization to a waypoint.""" @@ -1171,14 +1451,19 @@ def _set_initial_localization_waypoint(self, *args): self._logger.error("No waypoint specified to initialize to.") return destination_waypoint = graph_nav_util.find_unique_waypoint_id( - args[0][0], self._current_graph, self._current_annotation_name_to_wp_id, self._logger) + args[0][0], + self._current_graph, + self._current_annotation_name_to_wp_id, + self._logger, + ) if not destination_waypoint: # Failed to find the unique waypoint id. return robot_state = self._robot_state_client.get_robot_state() current_odom_tform_body = get_odom_tform_body( - robot_state.kinematic_state.transforms_snapshot).to_proto() + robot_state.kinematic_state.transforms_snapshot + ).to_proto() # Create an initial localization to the specified waypoint as the identity. localization = nav_pb2.Localization() localization.waypoint_id = destination_waypoint @@ -1186,10 +1471,11 @@ def _set_initial_localization_waypoint(self, *args): self._graph_nav_client.set_localization( initial_guess_localization=localization, # It's hard to get the pose perfect, search +/-20 deg and +/-20cm (0.2m). - max_distance = 0.2, - max_yaw = 20.0 * math.pi / 180.0, + max_distance=0.2, + max_yaw=20.0 * math.pi / 180.0, fiducial_init=graph_nav_pb2.SetLocalizationRequest.FIDUCIAL_INIT_NO_FIDUCIAL, - ko_tform_body=current_odom_tform_body) + ko_tform_body=current_odom_tform_body, + ) def _list_graph_waypoint_and_edge_ids(self, *args): """List the waypoint ids and edge ids of the graph currently on the robot.""" @@ -1201,14 +1487,19 @@ def _list_graph_waypoint_and_edge_ids(self, *args): return self._current_graph = graph - localization_id = self._graph_nav_client.get_localization_state().localization.waypoint_id + localization_id = ( + self._graph_nav_client.get_localization_state().localization.waypoint_id + ) # Update and print waypoints and edges - self._current_annotation_name_to_wp_id, self._current_edges = graph_nav_util.update_waypoints_and_edges( - graph, localization_id, self._logger) + ( + self._current_annotation_name_to_wp_id, + self._current_edges, + ) = graph_nav_util.update_waypoints_and_edges( + graph, localization_id, self._logger + ) return self._current_annotation_name_to_wp_id, self._current_edges - def _upload_graph_and_snapshots(self, upload_filepath): """Upload the graph and snapshots to the robot.""" self._logger.info("Loading the graph from disk into local storage...") @@ -1217,26 +1508,35 @@ def _upload_graph_and_snapshots(self, upload_filepath): data = graph_file.read() self._current_graph = map_pb2.Graph() self._current_graph.ParseFromString(data) - self._logger.info("Loaded graph has {} waypoints and {} edges".format( - len(self._current_graph.waypoints), len(self._current_graph.edges))) + self._logger.info( + "Loaded graph has {} waypoints and {} edges".format( + len(self._current_graph.waypoints), len(self._current_graph.edges) + ) + ) for waypoint in self._current_graph.waypoints: # Load the waypoint snapshots from disk. - with open(upload_filepath + "/waypoint_snapshots/{}".format(waypoint.snapshot_id), - "rb") as snapshot_file: + with open( + upload_filepath + "/waypoint_snapshots/{}".format(waypoint.snapshot_id), + "rb", + ) as snapshot_file: waypoint_snapshot = map_pb2.WaypointSnapshot() waypoint_snapshot.ParseFromString(snapshot_file.read()) - self._current_waypoint_snapshots[waypoint_snapshot.id] = waypoint_snapshot + self._current_waypoint_snapshots[ + waypoint_snapshot.id + ] = waypoint_snapshot for edge in self._current_graph.edges: # Load the edge snapshots from disk. - with open(upload_filepath + "/edge_snapshots/{}".format(edge.snapshot_id), - "rb") as snapshot_file: + with open( + upload_filepath + "/edge_snapshots/{}".format(edge.snapshot_id), "rb" + ) as snapshot_file: edge_snapshot = map_pb2.EdgeSnapshot() edge_snapshot.ParseFromString(snapshot_file.read()) self._current_edge_snapshots[edge_snapshot.id] = edge_snapshot # Upload the graph to the robot. self._logger.info("Uploading the graph and snapshots to the robot...") - self._graph_nav_client.upload_graph(lease=self._lease.lease_proto, - graph=self._current_graph) + self._graph_nav_client.upload_graph( + lease=self._lease.lease_proto, graph=self._current_graph + ) # Upload the snapshots to the robot. for waypoint_snapshot in self._current_waypoint_snapshots.values(): self._graph_nav_client.upload_waypoint_snapshot(waypoint_snapshot) @@ -1252,8 +1552,9 @@ def _upload_graph_and_snapshots(self, upload_filepath): if not localization_state.localization.waypoint_id: # The robot is not localized to the newly uploaded graph. self._logger.info( - "Upload complete! The robot is currently not localized to the map; please localize", \ - "the robot using commands (2) or (3) before attempting a navigation command.") + "Upload complete! The robot is currently not localized to the map; please localize", + "the robot using commands (2) or (3) before attempting a navigation command.", + ) def _navigate_to(self, *args): """Navigate to a specific waypoint.""" @@ -1265,12 +1566,18 @@ def _navigate_to(self, *args): self._lease = self._lease_wallet.get_lease() destination_waypoint = graph_nav_util.find_unique_waypoint_id( - args[0][0], self._current_graph, self._current_annotation_name_to_wp_id, self._logger) + args[0][0], + self._current_graph, + self._current_annotation_name_to_wp_id, + self._logger, + ) if not destination_waypoint: # Failed to find the appropriate unique waypoint id for the navigation command. return if not self.toggle_power(should_power_on=True): - self._logger.info("Failed to power on the robot, and cannot complete navigate to request.") + self._logger.info( + "Failed to power on the robot, and cannot complete navigate to request." + ) return # Stop the lease keepalive and create a new sublease for graph nav. @@ -1284,9 +1591,10 @@ def _navigate_to(self, *args): while not is_finished: # Issue the navigation command about twice a second such that it is easy to terminate the # navigation command (with estop or killing the program). - nav_to_cmd_id = self._graph_nav_client.navigate_to(destination_waypoint, 1.0, - leases=[sublease.lease_proto]) - time.sleep(.5) # Sleep for half a second to allow for command execution. + nav_to_cmd_id = self._graph_nav_client.navigate_to( + destination_waypoint, 1.0, leases=[sublease.lease_proto] + ) + time.sleep(0.5) # Sleep for half a second to allow for command execution. # Poll the robot for feedback to determine if the navigation command is complete. Then sit # the robot down once it is finished. is_finished = self._check_success(nav_to_cmd_id) @@ -1300,13 +1608,25 @@ def _navigate_to(self, *args): self.toggle_power(should_power_on=False) status = self._graph_nav_client.navigation_feedback(nav_to_cmd_id) - if status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL: + if ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL + ): return True, "Successfully completed the navigation commands!" elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_LOST: - return False, "Robot got lost when navigating the route, the robot will now sit down." + return ( + False, + "Robot got lost when navigating the route, the robot will now sit down.", + ) elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_STUCK: - return False, "Robot got stuck when navigating the route, the robot will now sit down." - elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED: + return ( + False, + "Robot got stuck when navigating the route, the robot will now sit down.", + ) + elif ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED + ): return False, "Robot is impaired." else: return False, "Navigation command is not complete yet." @@ -1320,7 +1640,11 @@ def _navigate_route(self, *args): waypoint_ids = args[0] for i in range(len(waypoint_ids)): waypoint_ids[i] = graph_nav_util.find_unique_waypoint_id( - waypoint_ids[i], self._current_graph, self._current_annotation_name_to_wp_id, self._logger) + waypoint_ids[i], + self._current_graph, + self._current_annotation_name_to_wp_id, + self._logger, + ) if not waypoint_ids[i]: # Failed to find the unique waypoint id. return @@ -1337,7 +1661,12 @@ def _navigate_route(self, *args): edge_ids_list.append(edge_id) else: all_edges_found = False - self._logger.error("Failed to find an edge between waypoints: ", start_wp, " and ", end_wp) + self._logger.error( + "Failed to find an edge between waypoints: ", + start_wp, + " and ", + end_wp, + ) self._logger.error( "List the graph's waypoints and edges to ensure pairs of waypoints has an edge." ) @@ -1346,7 +1675,9 @@ def _navigate_route(self, *args): self._lease = self._lease_wallet.get_lease() if all_edges_found: if not self.toggle_power(should_power_on=True): - self._logger.error("Failed to power on the robot, and cannot complete navigate route request.") + self._logger.error( + "Failed to power on the robot, and cannot complete navigate route request." + ) return # Stop the lease keepalive and create a new sublease for graph nav. @@ -1361,8 +1692,11 @@ def _navigate_route(self, *args): # Issue the route command about twice a second such that it is easy to terminate the # navigation command (with estop or killing the program). nav_route_command_id = self._graph_nav_client.navigate_route( - route, cmd_duration=1.0, leases=[sublease.lease_proto]) - time.sleep(.5) # Sleep for half a second to allow for command execution. + route, cmd_duration=1.0, leases=[sublease.lease_proto] + ) + time.sleep( + 0.5 + ) # Sleep for half a second to allow for command execution. # Poll the robot for feedback to determine if the route is complete. Then sit # the robot down once it is finished. is_finished = self._check_success(nav_route_command_id) @@ -1388,12 +1722,17 @@ def toggle_power(self, should_power_on): motors_on = False while not motors_on: future = self._robot_state_client.get_robot_state_async() - state_response = future.result(timeout=10) # 10 second timeout for waiting for the state response. - if state_response.power_state.motor_power_state == robot_state_proto.PowerState.STATE_ON: + state_response = future.result( + timeout=10 + ) # 10 second timeout for waiting for the state response. + if ( + state_response.power_state.motor_power_state + == robot_state_proto.PowerState.STATE_ON + ): motors_on = True else: # Motors are not yet fully powered on. - time.sleep(.25) + time.sleep(0.25) elif is_powered_on and not should_power_on: # Safe power off (robot will sit then power down) when it is in a # powered-on state. @@ -1408,7 +1747,7 @@ def toggle_power(self, should_power_on): def check_is_powered_on(self): """Determine if the robot is powered on or off.""" power_state = self._robot_state_client.get_robot_state().power_state - self._powered_on = (power_state.motor_power_state == power_state.STATE_ON) + self._powered_on = power_state.motor_power_state == power_state.STATE_ON return self._powered_on def _check_success(self, command_id=-1): @@ -1417,16 +1756,26 @@ def _check_success(self, command_id=-1): # No command, so we have not status to check. return False status = self._graph_nav_client.navigation_feedback(command_id) - if status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL: + if ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL + ): # Successfully completed the navigation commands! return True elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_LOST: - self._logger.error("Robot got lost when navigating the route, the robot will now sit down.") + self._logger.error( + "Robot got lost when navigating the route, the robot will now sit down." + ) return True elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_STUCK: - self._logger.error("Robot got stuck when navigating the route, the robot will now sit down.") + self._logger.error( + "Robot got stuck when navigating the route, the robot will now sit down." + ) return True - elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED: + elif ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED + ): self._logger.error("Robot is impaired.") return True else: @@ -1440,10 +1789,14 @@ def _match_edge(self, current_edges, waypoint1, waypoint2): for edge_from_id in current_edges[edge_to_id]: if (waypoint1 == edge_to_id) and (waypoint2 == edge_from_id): # This edge matches the pair of waypoints! Add it the edge list and continue. - return map_pb2.Edge.Id(from_waypoint=waypoint2, to_waypoint=waypoint1) + return map_pb2.Edge.Id( + from_waypoint=waypoint2, to_waypoint=waypoint1 + ) elif (waypoint2 == edge_to_id) and (waypoint1 == edge_from_id): # This edge matches the pair of waypoints! Add it the edge list and continue. - return map_pb2.Edge.Id(from_waypoint=waypoint1, to_waypoint=waypoint2) + return map_pb2.Edge.Id( + from_waypoint=waypoint1, to_waypoint=waypoint2 + ) return None def dock(self, dock_id): @@ -1466,7 +1819,7 @@ def undock(self, timeout=20): # Maker sure we're powered on self._robot.power_on() # Undock the robot - blocking_undock(self._robot ,timeout) + blocking_undock(self._robot, timeout) return True, "Success" except Exception as e: return False, str(e) @@ -1474,4 +1827,4 @@ def undock(self, timeout=20): def get_docking_state(self, **kwargs): """Get docking state of robot.""" state = self._docking_client.get_docking_state(**kwargs) - return state \ No newline at end of file + return state