diff --git a/movement/kinematics.py b/movement/kinematics.py index a6e0b0b9..12e1514f 100644 --- a/movement/kinematics.py +++ b/movement/kinematics.py @@ -7,7 +7,8 @@ import xarray as xr from scipy.spatial.distance import cdist -from movement.utils.logging import log_error +from movement.utils.logging import log_error, log_warning +from movement.utils.reports import report_nan_values from movement.utils.vector import compute_norm from movement.validators.arrays import validate_dims_coords @@ -173,6 +174,30 @@ def compute_time_derivative(data: xr.DataArray, order: int) -> xr.DataArray: return result +def compute_speed(data: xr.DataArray) -> xr.DataArray: + """Compute instantaneous speed at each time point. + + Speed is a scalar quantity computed as the Euclidean norm (magnitude) + of the velocity vector at each time point. + + + Parameters + ---------- + data : xarray.DataArray + The input data containing position information, with ``time`` + and ``space`` (in Cartesian coordinates) as required dimensions. + + Returns + ------- + xarray.DataArray + An xarray DataArray containing the computed speed, + with dimensions matching those of the input data, + except ``space`` is removed. + + """ + return compute_norm(compute_velocity(data)) + + def compute_forward_vector( data: xr.DataArray, left_keypoint: str, @@ -675,3 +700,164 @@ def _validate_type_data_array(data: xr.DataArray) -> None: TypeError, f"Input data must be an xarray.DataArray, but got {type(data)}.", ) + + +def compute_path_length( + data: xr.DataArray, + start: float | None = None, + stop: float | None = None, + nan_policy: Literal["ffill", "scale"] = "ffill", + nan_warn_threshold: float = 0.2, +) -> xr.DataArray: + """Compute the length of a path travelled between two time points. + + The path length is defined as the sum of the norms (magnitudes) of the + displacement vectors between two time points ``start`` and ``stop``, + which should be provided in the time units of the data array. + If not specified, the minimum and maximum time coordinates of the data + array are used as start and stop times, respectively. + + Parameters + ---------- + data : xarray.DataArray + The input data containing position information, with ``time`` + and ``space`` (in Cartesian coordinates) as required dimensions. + start : float, optional + The start time of the path. If None (default), + the minimum time coordinate in the data is used. + stop : float, optional + The end time of the path. If None (default), + the maximum time coordinate in the data is used. + nan_policy : Literal["ffill", "scale"], optional + Policy to handle NaN (missing) values. Can be one of the ``"ffill"`` + or ``"scale"``. Defaults to ``"ffill"`` (forward fill). + See Notes for more details on the two policies. + nan_warn_threshold : float, optional + If more than this proportion of values are missing in any point track, + a warning will be emitted. Defaults to 0.2 (20%). + + Returns + ------- + xarray.DataArray + An xarray DataArray containing the computed path length, + with dimensions matching those of the input data, + except ``time`` and ``space`` are removed. + + Notes + ----- + Choosing ``nan_policy="ffill"`` will use :meth:`xarray.DataArray.ffill` + to forward-fill missing segments (NaN values) across time. + This equates to assuming that a track remains stationary for + the duration of the missing segment and then instantaneously moves to + the next valid position, following a straight line. This approach tends + to underestimate the path length, and the error increases with the number + of missing values. + + Choosing ``nan_policy="scale"`` will adjust the path length based on the + the proportion of valid segments per point track. For example, if only + 80% of segments are present, the path length will be computed based on + these and the result will be divided by 0.8. This approach assumes + that motion dynamics are similar across observed and missing time + segments, which may not accurately reflect actual conditions. + + """ + validate_dims_coords(data, {"time": [], "space": []}) + data = data.sel(time=slice(start, stop)) + # Check that the data is not empty or too short + n_time = data.sizes["time"] + if n_time < 2: + raise log_error( + ValueError, + f"At least 2 time points are required to compute path length, " + f"but {n_time} were found. Double-check the start and stop times.", + ) + + _warn_about_nan_proportion(data, nan_warn_threshold) + + if nan_policy == "ffill": + return compute_norm( + compute_displacement(data.ffill(dim="time")).isel( + time=slice(1, None) + ) # skip first displacement (always 0) + ).sum(dim="time", min_count=1) # return NaN if no valid segment + + elif nan_policy == "scale": + return _compute_scaled_path_length(data) + else: + raise log_error( + ValueError, + f"Invalid value for nan_policy: {nan_policy}. " + "Must be one of 'ffill' or 'scale'.", + ) + + +def _warn_about_nan_proportion( + data: xr.DataArray, nan_warn_threshold: float +) -> None: + """Print a warning if the proportion of NaN values exceeds a threshold. + + The NaN proportion is evaluated per point track, and a given point is + considered NaN if any of its ``space`` coordinates are NaN. The warning + specifically lists the point tracks that exceed the threshold. + + Parameters + ---------- + data : xarray.DataArray + The input data array. + nan_warn_threshold : float + The threshold for the proportion of NaN values. Must be a number + between 0 and 1. + + """ + nan_warn_threshold = float(nan_warn_threshold) + if not 0 <= nan_warn_threshold <= 1: + raise log_error( + ValueError, + "nan_warn_threshold must be between 0 and 1.", + ) + + n_nans = data.isnull().any(dim="space").sum(dim="time") + data_to_warn_about = data.where( + n_nans > data.sizes["time"] * nan_warn_threshold, drop=True + ) + if len(data_to_warn_about) > 0: + log_warning( + "The result may be unreliable for point tracks with many " + "missing values. The following tracks have more than " + f"{nan_warn_threshold * 100:.3} % NaN values:", + ) + print(report_nan_values(data_to_warn_about)) + + +def _compute_scaled_path_length( + data: xr.DataArray, +) -> xr.DataArray: + """Compute scaled path length based on proportion of valid segments. + + Path length is first computed based on valid segments (non-NaN values + on both ends of the segment) and then scaled based on the proportion of + valid segments per point track - i.e. the result is divided by the + proportion of valid segments. + + Parameters + ---------- + data : xarray.DataArray + The input data containing position information, with ``time`` + and ``space`` (in Cartesian coordinates) as required dimensions. + + Returns + ------- + xarray.DataArray + An xarray DataArray containing the computed path length, + with dimensions matching those of the input data, + except ``time`` and ``space`` are removed. + + """ + # Skip first displacement segment (always 0) to not mess up the scaling + displacement = compute_displacement(data).isel(time=slice(1, None)) + # count number of valid displacement segments per point track + valid_segments = (~displacement.isnull()).all(dim="space").sum(dim="time") + # compute proportion of valid segments per point track + valid_proportion = valid_segments / (data.sizes["time"] - 1) + # return scaled path length + return compute_norm(displacement).sum(dim="time") / valid_proportion diff --git a/tests/conftest.py b/tests/conftest.py index 4de80c31..2a78e3a7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -518,6 +518,40 @@ def valid_poses_dataset_uniform_linear_motion( ) +@pytest.fixture +def valid_poses_dataset_uniform_linear_motion_with_nans( + valid_poses_dataset_uniform_linear_motion, +): + """Return a valid poses dataset with NaN values in the position array. + + Specifically, we will introducde: + - 1 NaN value in the centroid keypoint of individual id_1 at time=0 + - 5 NaN values in the left keypoint of individual id_1 (frames 3-7) + - 10 NaN values in the right keypoint of individual id_1 (all frames) + """ + valid_poses_dataset_uniform_linear_motion.position.loc[ + { + "individuals": "id_1", + "keypoints": "centroid", + "time": 0, + } + ] = np.nan + valid_poses_dataset_uniform_linear_motion.position.loc[ + { + "individuals": "id_1", + "keypoints": "left", + "time": slice(3, 7), + } + ] = np.nan + valid_poses_dataset_uniform_linear_motion.position.loc[ + { + "individuals": "id_1", + "keypoints": "right", + } + ] = np.nan + return valid_poses_dataset_uniform_linear_motion + + # -------------------- Invalid datasets fixtures ------------------------------ @pytest.fixture def not_a_dataset(): diff --git a/tests/test_unit/test_kinematics.py b/tests/test_unit/test_kinematics.py index 18cfa2db..4a68a23f 100644 --- a/tests/test_unit/test_kinematics.py +++ b/tests/test_unit/test_kinematics.py @@ -1,4 +1,5 @@ import re +from contextlib import nullcontext as does_not_raise import numpy as np import pytest @@ -43,6 +44,13 @@ np.zeros((10, 2)), # Individual 1 ], ), + ( + "speed", # magnitude of velocity + [ + np.ones(10) * np.sqrt(2), # Individual 0 + np.ones(10) * np.sqrt(2), # Individual 1 + ], + ), ], ) def test_kinematics_uniform_linear_motion( @@ -74,19 +82,24 @@ def test_kinematics_uniform_linear_motion( position ) + # Figure out which dimensions to expect in kinematic_array + # and in the final xarray.DataArray + expected_dims = ["time", "individuals"] + if kinematic_variable in ["displacement", "velocity", "acceleration"]: + expected_dims.append("space") + # Build expected data array from the expected numpy array expected_array = xr.DataArray( - np.stack(expected_kinematics, axis=1), # Stack along the "individuals" axis - dims=["time", "individuals", "space"], + np.stack(expected_kinematics, axis=1), + dims=expected_dims, ) if "keypoints" in position.coords: expected_array = expected_array.expand_dims( {"keypoints": position.coords["keypoints"].size} ) - expected_array = expected_array.transpose( - "time", "individuals", "keypoints", "space" - ) + expected_dims.insert(2, "keypoints") + expected_array = expected_array.transpose(*expected_dims) # Compare the values of the kinematic_array against the expected_array np.testing.assert_allclose(kinematic_array.values, expected_array.values) @@ -105,6 +118,7 @@ def test_kinematics_uniform_linear_motion( ("displacement", [5, 0]), # individual 0, individual 1 ("velocity", [6, 0]), ("acceleration", [7, 0]), + ("speed", [6, 0]), ], ) def test_kinematics_with_dataset_with_nans( @@ -134,10 +148,13 @@ def test_kinematics_with_dataset_with_nans( ] # expected nans per individual adjusted for space and keypoints dimensions + if "space" in kinematic_array.dims: + n_space_dims = position.sizes["space"] + else: + n_space_dims = 1 + expected_nans_adjusted = [ - n - * valid_dataset.sizes["space"] - * valid_dataset.sizes.get("keypoints", 1) + n * n_space_dims * valid_dataset.sizes.get("keypoints", 1) for n in expected_nans_per_individual ] # check number of nans per individual is as expected in kinematic array @@ -163,6 +180,7 @@ def test_kinematics_with_dataset_with_nans( "displacement", "velocity", "acceleration", + "speed", ], ) def test_kinematics_with_invalid_dataset( @@ -186,6 +204,175 @@ def test_approximate_derivative_with_invalid_order(order): kinematics.compute_time_derivative(data, order=order) +time_points_value_error = pytest.raises( + ValueError, + match="At least 2 time points are required to compute path length", +) + + +@pytest.mark.parametrize( + "start, stop, expected_exception", + [ + # full time ranges + (None, None, does_not_raise()), + (0, None, does_not_raise()), + (0, 9, does_not_raise()), + (0, 10, does_not_raise()), # xarray.sel will truncate to 0, 9 + (-1, 9, does_not_raise()), # xarray.sel will truncate to 0, 9 + # partial time ranges + (1, 8, does_not_raise()), + (1.5, 8.5, does_not_raise()), + (2, None, does_not_raise()), + # Empty time ranges + (9, 0, time_points_value_error), # start > stop + ("text", 9, time_points_value_error), # invalid start type + # Time range too short + (0, 0.5, time_points_value_error), + ], +) +def test_path_length_across_time_ranges( + valid_poses_dataset_uniform_linear_motion, + start, + stop, + expected_exception, +): + """Test path length computation for a uniform linear motion case, + across different time ranges. + + The test dataset ``valid_poses_dataset_uniform_linear_motion`` + contains 2 individuals ("id_0" and "id_1"), moving + along x=y and x=-y lines, respectively, at a constant velocity. + At each frame they cover a distance of sqrt(2) in x-y space, so in total + we expect a path length of sqrt(2) * num_segments, where num_segments is + the number of selected frames minus 1. + """ + position = valid_poses_dataset_uniform_linear_motion.position + with expected_exception: + path_length = kinematics.compute_path_length( + position, start=start, stop=stop + ) + + # Expected number of segments (displacements) in selected time range + num_segments = 9 # full time range: 10 frames - 1 + start = max(0, start) if start is not None else 0 + stop = min(9, stop) if stop is not None else 9 + if start is not None: + num_segments -= np.ceil(max(0, start)) + if stop is not None: + stop = min(9, stop) + num_segments -= 9 - np.floor(min(9, stop)) + + expected_path_length = xr.DataArray( + np.ones((2, 3)) * np.sqrt(2) * num_segments, + dims=["individuals", "keypoints"], + coords={ + "individuals": position.coords["individuals"], + "keypoints": position.coords["keypoints"], + }, + ) + xr.testing.assert_allclose(path_length, expected_path_length) + + +@pytest.mark.parametrize( + "nan_policy, expected_path_lengths_id_1, expected_exception", + [ + ( + "ffill", + np.array([np.sqrt(2) * 8, np.sqrt(2) * 9, np.nan]), + does_not_raise(), + ), + ( + "scale", + np.array([np.sqrt(2) * 9, np.sqrt(2) * 9, np.nan]), + does_not_raise(), + ), + ( + "invalid", # invalid value for nan_policy + np.zeros(3), + pytest.raises(ValueError, match="Invalid value for nan_policy"), + ), + ], +) +def test_path_length_with_nans( + valid_poses_dataset_uniform_linear_motion_with_nans, + nan_policy, + expected_path_lengths_id_1, + expected_exception, +): + """Test path length computation for a uniform linear motion case, + with varying number of missing values per individual and keypoint. + + The test dataset ``valid_poses_dataset_uniform_linear_motion_with_nans`` + contains 2 individuals ("id_0" and "id_1"), moving + along x=y and x=-y lines, respectively, at a constant velocity. + At each frame they cover a distance of sqrt(2) in x-y space. + + Individual "id_1" has some missing values per keypoint: + - "centroid" is missing a value on the very first frame + - "left" is missing 5 values in middle frames (not at the edges) + - "right" is missing values in all frames + + Individual "id_0" has no missing values. + + Because the underlying motion is uniform linear, the "scale" policy should + perfectly restore the path length for individual "id_1" to its true value. + The "ffill" policy should do likewise if frames are missing in the middle, + but will not "correct" for missing values at the edges. + """ + position = valid_poses_dataset_uniform_linear_motion_with_nans.position + with expected_exception: + path_length = kinematics.compute_path_length( + position, + nan_policy=nan_policy, + ) + # Get path_length for individual "id_1" as a numpy array + path_length_id_1 = path_length.sel(individuals="id_1").values + # Check them against the expected values + np.testing.assert_allclose( + path_length_id_1, expected_path_lengths_id_1 + ) + + +@pytest.mark.parametrize( + "nan_warn_threshold, expected_exception", + [ + (1, does_not_raise()), + (0.2, does_not_raise()), + (-1, pytest.raises(ValueError, match="between 0 and 1")), + ], +) +def test_path_length_warns_about_nans( + valid_poses_dataset_uniform_linear_motion_with_nans, + nan_warn_threshold, + expected_exception, + caplog, +): + """Test that a warning is raised when the number of missing values + exceeds a given threshold. + + See the docstring of ``test_path_length_with_nans`` for details + about what's in the dataset. + """ + position = valid_poses_dataset_uniform_linear_motion_with_nans.position + with expected_exception: + kinematics.compute_path_length( + position, nan_warn_threshold=nan_warn_threshold + ) + + if (nan_warn_threshold > 0.1) and (nan_warn_threshold < 0.5): + # Make sure that a warning was emitted + assert caplog.records[0].levelname == "WARNING" + assert "The result may be unreliable" in caplog.records[0].message + # Make sure that the NaN report only mentions + # the individual and keypoint that violate the threshold + assert caplog.records[1].levelname == "INFO" + assert "Individual: id_1" in caplog.records[1].message + assert "Individual: id_2" not in caplog.records[1].message + assert "left: 5/10 (50.0%)" in caplog.records[1].message + assert "right: 10/10 (100.0%)" in caplog.records[1].message + assert "centroid" not in caplog.records[1].message + + @pytest.fixture def valid_data_array_for_forward_vector(): """Return a position data array for an individual with 3 keypoints