Skip to content

Commit

Permalink
native: Extend model writer to write full model
Browse files Browse the repository at this point in the history
  • Loading branch information
jonls committed Dec 2, 2016
1 parent 52e7d56 commit 041d8e6
Showing 1 changed file with 259 additions and 0 deletions.
259 changes: 259 additions & 0 deletions psamm/datasource/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from ..formula import Formula
from ..metabolicmodel import MetabolicModel
from ..database import DictDatabase
from ..util import mkdir_p
from .context import FilePathContext, FileMark
from .entry import (DictCompoundEntry as CompoundEntry,
DictReactionEntry as ReactionEntry)
Expand Down Expand Up @@ -922,6 +923,9 @@ def parse_model_file(path):
yield reaction_id


# Threshold for putting reactions into subsystem files
_MAX_REACTION_COUNT = 3

# Threshold for converting reactions into dictionary representation.
_MAX_REACTION_LENGTH = 10

Expand Down Expand Up @@ -1006,6 +1010,66 @@ def _decimal_representer(dumper, data):
return dumper.represent_scalar('tag:yaml.org,2002:float', value)


def _get_default_compartment(model):
"""Return what the default compartment should be set to.
If some compounds have no compartment, unique compartment
name is returned to avoid collisions.
"""
default_compartment = 'c'
default_key = set()
for reaction_id, reaction in iteritems(model.reactions):
if 'equation' not in reaction.properties:
continue

equation = reaction.properties['equation']
for compound, _ in equation.compounds:
default_key.add(compound.compartment)

if None in default_key and default_compartment in default_key:
suffix = 1
while True:
new_key = '{}_{}'.format(default_compartment, suffix)
if new_key not in default_key:
default_compartment = new_key
break
suffix += 1

if None in default_key:
logger.warning(
'Compound(s) found without compartment, default'
' compartment is {}.'.format(default_compartment))
return default_compartment


def _detect_best_flux_limit(model):
"""Detect the best default flux limit to use for model output.
The default flux limit does not change the model but selecting a good
value reduced the amount of output produced and reduces clutter in the
output files.
"""
flux_limit_count = Counter()

for reaction_id, reaction in iteritems(model.reactions):
if 'equation' not in reaction.properties:
continue

equation = reaction.properties['equation']
if reaction_id in model.limits:
lower_flux, upper_flux = model.limits[reaction_id]
if equation.direction.forward and upper_flux is not None:
flux_limit_count[upper_flux] += 1
if equation.direction.reverse and lower_flux is not None:
flux_limit_count[-lower_flux] += 1

if len(flux_limit_count) == 0:
return None

best_flux_limit, _ = flux_limit_count.most_common(1)[0]
return best_flux_limit


class ModelWriter(object):
"""Writer for native (YAML) format."""

Expand Down Expand Up @@ -1084,6 +1148,137 @@ def is_equation_valid(equation):

return d

def convert_limits_entry(self, limit, equation, default_flux_limit):
"""Return reaction flux limit as YAML dict."""

reaction_id, (lower_flux, upper_flux) = limit

# Determine the default flux limits. If the value is already at the
# default it does not need to be included in the output.
lower_default, upper_default = None, None
if default_flux_limit is not None:
if equation.direction.reverse:
lower_default = -default_flux_limit
else:
lower_default = 0.0

if equation.direction.forward:
upper_default = default_flux_limit
else:
upper_default = 0.0

d = None
if ((lower_flux is not None and lower_flux != lower_default) or
(upper_flux is not None and upper_flux != upper_default)):
d = OrderedDict([('reaction', reaction_id)])
if (lower_flux is not None and upper_flux is not None and
lower_flux == upper_flux):
d['fixed'] = upper_flux
else:
if lower_flux is not None and lower_flux != lower_default:
d['lower'] = lower_flux
if upper_flux is not None and upper_flux != upper_default:
d['upper'] = upper_flux

return d

def _get_limits_entries(self, model):
for _, reaction in sorted(iteritems(model.reactions)):
if 'equation' not in reaction.properties:
continue
if reaction.id not in model.limits:
continue

limit = reaction.id, model.limits[reaction.id]
equation = reaction.properties['equation']
yield self.convert_limits_entry(
limit, equation, model.default_flux_limit)

def convert_medium(self, medium, default_flux_limit):
# Determine the default flux limits. If the value is already at the
# default it does not need to be included in the output.
lower_default, upper_default = None, None
if default_flux_limit is not None:
lower_default = -default_flux_limit
upper_default = default_flux_limit

compounds = []
for compound, (reaction_id, lower, upper) in iteritems(medium):
c = OrderedDict([('id', compound.name)])
c['reaction'] = reaction_id

if lower is not None and upper is not None and lower == upper:
c['fixed'] = lower
else:
if lower is not None and lower != lower_default:
c['lower'] = lower
if upper is not None and upper != upper_default:
c['upper'] = upper

compounds.append(c)

medium_d = OrderedDict([('name', 'Default medium')])
medium_d['compounds'] = compounds

return medium_d

def write_model(self, model, dest, split_subsystem=True):
"""Write the given model to YAML files in dest folder."""

mkdir_p(dest)

entries = (entry for _, entry in sorted(iteritems(model.compounds)))
with open(os.path.join(dest, 'compounds.yaml'), 'w') as f:
self.write_compounds(f, entries)

if model.default_flux_limit is None:
model.default_flux_limit = _detect_best_flux_limit(model)
if model.default_compartment is None:
model.default_compartment = _get_default_compartment(model)

if model.default_flux_limit is not None:
logger.info('Using default flux limit of {}'.format(
model.default_flux_limit))

entries = (entry for _, entry in sorted(iteritems(model.reactions)))
reaction_files = self.write_reactions_to_files(
entries, dest, split_subsystem)

medium_d = self.convert_medium(model.medium, model.default_flux_limit)
if medium_d is not None:
with open(os.path.join(dest, 'medium.yaml'), 'w') as f:
self._dump(f, medium_d)

reaction_limits = [
entry for entry in self._get_limits_entries(model)
if entry is not None]
if len(reaction_limits) > 0:
with open(os.path.join(dest, 'limits.yaml'), 'w') as f:
self._dump(f, reaction_limits)

model_d = OrderedDict([('name', model.name)])

if model.biomass_reaction is not None:
model_d['biomass'] = model.biomass_reaction
if model.default_flux_limit is not None:
model_d['default_flux_limit'] = model.default_flux_limit
if model.extracellular_compartment != 'e':
model_d['extracellular'] = model.extracellular_compartment
if model.default_compartment != 'c':
model_d['default'] = model.default_compartment
model_d['compounds'] = [{'include': 'compounds.yaml'}]
model_d['reactions'] = [
{'include': filename} for filename in reaction_files]

if medium_d is not None:
model_d['media'] = [{'include': 'medium.yaml'}]

if len(reaction_limits) > 0:
model_d['limits'] = [{'include': 'limits.yaml'}]

with open(os.path.join(dest, 'model.yaml'), 'w') as f:
self._dump(f, model_d)

def write_compounds(self, stream, compounds, properties=None):
"""Write iterable of compounds as YAML object to stream.
Expand Down Expand Up @@ -1127,3 +1322,67 @@ def iter_entries():
yield entry

self._dump(stream, list(iter_entries()))

def write_reactions_to_files(self, reactions, dest, split_subsystem):
"""Turn the reaction subsystems into their own files.
If a subsystem has a number of reactions over the threshold, it gets
its own YAML file. All other reactions, those that don't have a
subsystem or are in a subsystem that falls below the threshold, get
added to a common reaction file.
Args:
reactions: Iterable of reaction entries.
dest: Where to write output files.
split_subsystem: Whether to split into separate files.
"""
def safe_file_name(origin_name):
safe_name = re.sub(
r'\W+', '_', origin_name, flags=re.UNICODE)
safe_name = re.sub(
r'_+', '_', safe_name.lower(), flags=re.UNICODE)
safe_name = safe_name.strip('_')
return safe_name

common_reactions_file = 'reactions.yaml'
common_reactions = []
reaction_files = []
if not split_subsystem:
common_reactions.extend(reactions)
else:
subsystems = {}
for reaction in reactions:
if reaction.subsystem is not None:
subsystem_file = safe_file_name(reaction.subsystem)
subsystems.setdefault(subsystem_file, []).append(reaction)
else:
common_reactions.append(reaction)

subsystem_folder = 'reactions'
sub_existence = False
for subsystem_name, reactions in iteritems(subsystems):
if len(reactions) < _MAX_REACTION_COUNT:
common_reactions.extend(reactions)
else:
mkdir_p(os.path.join(dest, subsystem_folder))
subsystem_file = os.path.join(
dest, subsystem_folder,
'{}.yaml'.format(subsystem_name))
with open(subsystem_file, 'w') as f:
self.write_reactions(f, reactions)
reaction_files.append(subsystem_file)
sub_existence = True

reaction_files.sort()

if sub_existence:
common_reactions_file = os.path.join(
subsystem_folder, 'other_reactions.yaml')

# Write common reactions
if len(common_reactions) > 0:
with open(os.path.join(dest, common_reactions_file), 'w') as f:
self.write_reactions(f, common_reactions)
reaction_files.append(common_reactions_file)

return reaction_files

0 comments on commit 041d8e6

Please sign in to comment.