Skip to content

Commit

Permalink
Merge pull request #51 from sybila/fix-replication
Browse files Browse the repository at this point in the history
Fix replication handling
  • Loading branch information
xtrojak authored Mar 23, 2022
2 parents 89a5695 + 5a6228a commit 60ea994
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 6 deletions.
5 changes: 5 additions & 0 deletions Galaxy/tools/GenerateTS/GenerateTS.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import argparse
import numpy as np

import sys, os
# this add to path eBCSgen home dir, so it can be called from anywhere
sys.path.append('/home/xtrojak/Documents/GITs/eBCSgen/')

from eBCSgen.Parsing.ParseBCSL import Parser, load_TS_from_json
from eBCSgen.Errors.ModelParsingError import ModelParsingError
from eBCSgen.Errors.UnspecifiedParsingError import UnspecifiedParsingError
Expand Down Expand Up @@ -53,6 +57,7 @@
if model.success:
if eval(args.direct):
ts = model.data.generate_direct_transition_system(args.max_time, args.max_size, args.bound)
ts.change_to_vector_backend()
else:
vm = model.data.to_vector_model(args.bound)
ts = vm.generate_transition_system(ts, args.max_time, args.max_size)
Expand Down
88 changes: 82 additions & 6 deletions eBCSgen/Core/Rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,55 @@ def rate_to_vector(self, ordering, definitions: dict):
self.rate.vectorize(ordering, definitions)

def create_reactions(self, atomic_signature: dict, structure_signature: dict) -> set:
"""
Create all possible reactions.
Decide if rule is of replication type and call corresponding lower level method.
:param atomic_signature: given mapping of atomic name to possible states
:param structure_signature: given mapping of structure name to possible atomics
:return: set of created reactions
"""
unique_lhs_indices = set(column(self.pairs, 0))
if len(self.pairs) > 1 and len(unique_lhs_indices) == 1 and None not in unique_lhs_indices:
# should be the replication rule
return self._create_replication_reactions(atomic_signature, structure_signature)
else:
return self._create_normal_reactions(atomic_signature, structure_signature)

def _create_replication_reactions(self, atomic_signature: dict, structure_signature: dict) -> set:
"""
Create reaction from rule of special form for replication (A -> 2 A)
:param atomic_signature: given mapping of atomic name to possible states
:param structure_signature: given mapping of structure name to possible atomics
:return: set of created reactions
"""
# create only for first pair
l, r = self.pairs[0]
left = self.agents[l]
right = self.agents[r]
results = left.add_context(right, atomic_signature, structure_signature)

reactions = set()
for result in results:
new_agents = list(result)
# replicate RHS agent n times
for _ in range(len(self.pairs)):
new_agents.append(deepcopy(new_agents[-1]))
new_rule = Rule(tuple(new_agents), self.mid, self.compartments,
self.complexes, self.pairs, self.rate, self.label)
reactions.add(new_rule.to_reaction())

return reactions

def _create_normal_reactions(self, atomic_signature: dict, structure_signature: dict) -> set:
"""
Adds context to all agents and generated all possible combinations.
Then, new rules with these enhances agents are generated and converted to Reactions.
:param atomic_signature: given mapping of atomic name to possible states
:param structure_signature: given mapping of structure name to possible atomics
:return:
:return: set of created reactions
"""
results = []
for (l, r) in self.pairs:
Expand All @@ -129,11 +171,13 @@ def create_reactions(self, atomic_signature: dict, structure_signature: dict) ->
left = self.agents[l]
right = self.agents[r]
results.append(left.add_context(right, atomic_signature, structure_signature))

reactions = set()
for result in itertools.product(*results):
new_agents = tuple(filter(None, column(result, 0) + column(result, 1)))
new_rule = Rule(new_agents, self.mid, self.compartments, self.complexes, self.pairs, self.rate, self.label)
reactions.add(new_rule.to_reaction())

return reactions

def compatible(self, other: 'Rule') -> bool:
Expand Down Expand Up @@ -231,19 +275,51 @@ def replace(self, aligned_match):
:return: multiset replaced according to the match
"""
# replace respective agents

unique_lhs_indices = set(column(self.pairs, 0))
if len(self.pairs) > 1 and len(unique_lhs_indices) == 1 and \
None not in unique_lhs_indices and len(aligned_match) == 1:
resulting_rhs = self._replace_replicated_rhs(aligned_match[0])
else:
resulting_rhs = self._replace_normal_rhs(aligned_match)

# construct resulting complexes
output_complexes = []
for (f, t) in list(filter(lambda item: item[0] >= self.mid, self.complexes)):
output_complexes.append(Complex(resulting_rhs[f - self.mid:t - self.mid + 1], self.compartments[f]))

return Multiset(collections.Counter(output_complexes))

def _replace_normal_rhs(self, aligned_match):
"""
Replace in normal mode.
:param aligned_match: complexes fitting LHS of the rule
:return: RHS with replaced agents
"""
resulting_rhs = []
for i, rhs_agent in enumerate(self.agents[self.mid:]):
if len(aligned_match) <= i:
resulting_rhs.append(rhs_agent)
else:
resulting_rhs.append(rhs_agent.replace(aligned_match[i]))
return resulting_rhs

# construct resulting complexes
output_complexes = []
for (f, t) in list(filter(lambda item: item[0] >= self.mid, self.complexes)):
output_complexes.append(Complex(resulting_rhs[f - self.mid:t - self.mid + 1], self.compartments[f]))
def _replace_replicated_rhs(self, aligned_agent):
"""
Replace in replication mode (special form for replication A -> 2 A)
return Multiset(collections.Counter(output_complexes))
:param aligned_agent: complex fitting LHS of the rule
:return: RHS with replaced agents
"""
resulting_rhs = []

rhs_agent = self.agents[self.mid]
rhs_agent = rhs_agent.replace(aligned_agent)

for _ in range(len(self.pairs)):
resulting_rhs.append(deepcopy(rhs_agent))
return resulting_rhs

def reconstruct_complexes_from_match(self, match):
"""
Expand Down

0 comments on commit 60ea994

Please sign in to comment.