Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ready for Review] Expansion probabilities #150

Merged
merged 9 commits into from
Oct 26, 2021
1 change: 1 addition & 0 deletions cassiopeia/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from .autocorrelation import compute_morans_i
from .branch_length_estimator import IIDExponentialBayesian, IIDExponentialMLE
from .small_parsimony import fitch_count, fitch_hartigan, score_small_parsimony
from .topology import compute_expansion_pvalues
126 changes: 126 additions & 0 deletions cassiopeia/tools/topology.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""
Utilities to assess topological properties of a phylogeny, such as balance
and expansion.
"""
from typing import Union

import math
import numpy as np
import pandas as pd

from cassiopeia.data import CassiopeiaTree
from cassiopeia.mixins import CassiopeiaError


def compute_expansion_pvalues(
tree: CassiopeiaTree,
min_clade_size: int = 10,
min_depth: int = 1,
copy: bool = False,
) -> Union[CassiopeiaTree, None]:
"""Call expansion pvalues on a tree.

Uses the methodology described in Yang, Jones et al, BioRxiv (2021) to
assess the expansion probability of a given subclade of a phylogeny.
Mathematical treatment of the coalescent probability is described in
Griffiths and Tavare, Stochastic Models (1998).

The probability computed corresponds to the probability that, under a simple
neutral coalescent model, a given subclade contains the observed number of
cells; in other words, a one-sided p-value. Often, if the probability is
less than some threshold (e.g., 0.05), this might indicate that there exists
some subclade under this node that to which this expansion probability can
be attributed (i.e. the null hypothesis that the subclade is undergoing
neutral drift can be rejected).

This function will add an attribute "expansion_pvalue" to the tree, and
return None unless :param:`copy` is set to True.

On a typical balanced tree, this function will perform in O(n log n) time,
but can be up to O(n^3) on highly unbalanced trees. A future endeavor may
be to impelement the function in O(n) time.

Args:
tree: CassiopeiaTree
min_clade_size: Minimum number of leaves in a subtree to be considered.
min_depth: Minimum depth of clade to be considered. Depth is measured
in number of nodes from the root, not branch lengths.
copy: Return copy.

Returns:
If copy is set to False, returns the tree with attributes added
in place. Else, returns a new CassiopeiaTree.
"""

tree = tree.copy() if copy else tree

# instantiate attributes
_depths = {}
for node in tree.depth_first_traverse_nodes(postorder=False):
tree.set_attribute(node, "expansion_pvalue", 1.0)

if tree.is_root(node):
_depths[node] = 0
else:
_depths[node] = (_depths[tree.parent(node)] + 1)

for node in tree.depth_first_traverse_nodes(postorder=False):

n = len(tree.leaves_in_subtree(node))

k = len(tree.children(node))
for c in tree.children(node):

if len(tree.leaves_in_subtree(c)) < min_clade_size:
continue

depth = _depths[c]
if depth < min_depth:
continue

b = len(tree.leaves_in_subtree(c))

# this value below is a simplification of the quantity:
# sum[simple_coalescent_probability(n, b2, k) for \
# b2 in range(b, n - k + 2)]
p = nCk(n-b, k-1) / nCk(n-1, k-1)

tree.set_attribute(c, "expansion_pvalue", p)

return tree if copy else None


def simple_coalescent_probability(n: int, b: int, k: int) -> float:
"""Simple coalescent probability of imbalance.

Assuming a simple coalescent model, compute the probability that a given
lineage has exactly b samples, given there are n cells and k lineages
overall.

Args:
n: Number of leaves in subtree
b: Number of leaves in one lineage
k: Number of lineages
Returns:
Probability of observing b leaves on one lineage in a tree of n total
leaves
"""
return nCk(n - b - 1, k - 2) / nCk(n - 1, k - 1)


def nCk(n: int, k: int) -> float:
"""Compute the quantity n choose k.

Args:
n: Number of items total.
k: Number of items to choose.

Returns:
The number of ways to choose k items from n.
"""

if k > n:
raise CassiopeiaError("Argument k cannot be larger than n.")

f = math.factorial
return f(n) // f(k) // f(n - k)
1 change: 1 addition & 0 deletions docs/api/tools.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Small-Parsimony
.. autosummary::
:toctree: reference/

tl.compute_expansion_pvalues
tl.compute_morans_i
tl.fitch_count
tl.fitch_hartigan
Expand Down
180 changes: 180 additions & 0 deletions test/tools_tests/topology_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""
Test suite for the topology functions in
cassiopeia/tools/topology.py
"""
import unittest

import networkx as nx
import numpy as np
import pandas as pd

import cassiopeia as cas
from cassiopeia.mixins import CassiopeiaError, CassiopeiaTreeError
from cassiopeia.tools import topology


class TestTopology(unittest.TestCase):
def setUp(self) -> None:

tree = nx.DiGraph()
tree.add_edge("0", "1")
tree.add_edge("0", "2")
tree.add_edge("1", "3")
tree.add_edge("1", "4")
tree.add_edge("1", "5")
tree.add_edge("2", "6")
tree.add_edge("2", "7")
tree.add_edge("3", "8")
tree.add_edge("3", "9")
tree.add_edge("7", "10")
tree.add_edge("7", "11")
tree.add_edge("8", "12")
tree.add_edge("8", "13")
tree.add_edge("9", "14")
tree.add_edge("9", "15")
tree.add_edge("3", "16")
tree.add_edge("16", "17")
tree.add_edge("16", "18")

self.tree = cas.data.CassiopeiaTree(tree=tree)

def test_simple_choose_function(self):

num_choices = topology.nCk(10, 2)

self.assertEqual(num_choices, 45)

self.assertRaises(CassiopeiaError, topology.nCk, 5, 7)

def test_simple_coalescent_probability(self):

N = 100
B = 2
K = 60
coalescent_probability = topology.simple_coalescent_probability(N, B, K)
self.assertAlmostEqual(coalescent_probability, 0.24, delta=0.01)

self.assertRaises(
CassiopeiaError, topology.simple_coalescent_probability, 50, 2, 60
)

def test_expansion_probability(self):

# make sure attributes are instantiated correctly
cas.tl.compute_expansion_pvalues(self.tree, min_clade_size=20)
for node in self.tree.depth_first_traverse_nodes(postorder=False):
self.assertEqual(
1.0, self.tree.get_attribute(node, "expansion_pvalue")
)

cas.tl.compute_expansion_pvalues(self.tree, min_clade_size=2)
expected_probabilities = {
"0": 1.0,
"1": 0.3,
"2": 0.8,
"3": 0.047,
"4": 1.0,
"5": 1.0,
"6": 1.0,
"7": 0.5,
"8": 0.6,
"9": 0.6,
"10": 1.0,
"11": 1.0,
"12": 1.0,
"13": 1.0,
"14": 1.0,
"15": 1.0,
"16": 0.6,
"17": 1.0,
"18": 1.0,
}

for node in self.tree.depth_first_traverse_nodes(postorder=False):
expected = expected_probabilities[node]
self.assertAlmostEqual(
expected,
self.tree.get_attribute(node, "expansion_pvalue"),
delta=0.01,
)

def test_expansion_probability_variable_depths(self):

cas.tl.compute_expansion_pvalues(self.tree, min_clade_size=2, min_depth=3)
expected_probabilities = {
"0": 1.0,
"1": 1.0,
"2": 1.0,
"3": 1.0,
"4": 1.0,
"5": 1.0,
"6": 1.0,
"7": 1.0,
"8": 0.6,
"9": 0.6,
"10": 1.0,
"11": 1.0,
"12": 1.0,
"13": 1.0,
"14": 1.0,
"15": 1.0,
"16": 0.6,
"17": 1.0,
"18": 1.0,
}

for node in self.tree.depth_first_traverse_nodes(postorder=False):
expected = expected_probabilities[node]
self.assertAlmostEqual(
expected,
self.tree.get_attribute(node, "expansion_pvalue"),
delta=0.01,
)

def test_expansion_probability_copy_tree(self):

tree = cas.tl.compute_expansion_pvalues(
self.tree, min_clade_size=2, min_depth=1, copy=True
)

expected_probabilities = {
"0": 1.0,
"1": 0.3,
"2": 0.8,
"3": 0.047,
"4": 1.0,
"5": 1.0,
"6": 1.0,
"7": 0.5,
"8": 0.6,
"9": 0.6,
"10": 1.0,
"11": 1.0,
"12": 1.0,
"13": 1.0,
"14": 1.0,
"15": 1.0,
"16": 0.6,
"17": 1.0,
"18": 1.0,
}

for node in self.tree.depth_first_traverse_nodes(postorder=False):
expected_copy = expected_probabilities[node]

self.assertAlmostEqual(
expected_copy,
tree.get_attribute(node, "expansion_pvalue"),
delta=0.01,
)

self.assertRaises(
CassiopeiaTreeError,
self.tree.get_attribute,
node,
"expansion_pvalue",
)


if __name__ == "__main__":
unittest.main()