diff --git a/nutils/element.py b/nutils/element.py index 83b730a77..3fcd97567 100644 --- a/nutils/element.py +++ b/nutils/element.py @@ -86,6 +86,18 @@ def edges(self): def children(self): return list(zip(self.child_transforms, self.child_refs)) + def get_from_trans(self, trans): + if not trans: + return self + if trans[0].todims != self.ndims: + raise ValueError('Expected a transform chain that maps to {} dims but got {}.'.format(self.ndims, trans[0].todims)) + if trans[0].fromdims == self.ndims: + return self.child_refs[self.child_transforms.index(trans[0])].get_from_trans(trans[1:]) + elif trans[0].fromdims == self.ndims-1: + return self.edge_refs[self.edge_transforms.index(trans[0])].get_from_trans(trans[1:]) + else: + raise ValueError + @property def connectivity(self): # Nested tuple with connectivity information about edges of children: diff --git a/nutils/topology.py b/nutils/topology.py index c51ef2ba3..966375e8b 100644 --- a/nutils/topology.py +++ b/nutils/topology.py @@ -334,7 +334,7 @@ def refine(self, n): n = n[0] return self if n <= 0 else self.refined.refine(n-1) - def trim(self, levelset, maxrefine, ndivisions=8, name='trimmed', leveltopo=None, *, arguments=None): + def _trim(self, levelset, maxrefine, ndivisions=8, leveltopo=None, *, arguments=None): 'trim element along levelset' if arguments is None: @@ -367,7 +367,18 @@ def trim(self, levelset, maxrefine, ndivisions=8, name='trimmed', leveltopo=None mask[indices] = False refs.append(ref.trim(levels, maxrefine=maxrefine, ndivisions=ndivisions)) log.debug('cache', fcache.stats) - return SubsetTopology(self, refs, newboundary=name) + return refs + + def trim(self, levelset, maxrefine, ndivisions=8, name='trimmed', leveltopo=None, *, arguments=None): + refs = self._trim(levelset, maxrefine, ndivisions, leveltopo, arguments=arguments) + return SubsetTopology(self, refs, newboundary=name) + + @log.withcontext + @types.apply_annotations + def partition(self, levelset:function.asarray, maxrefine:types.strictint, posname:types.strictstr, negname:types.strictstr, *, ndivisions=8, arguments=None): + pos = self._trim(levelset, maxrefine=maxrefine, ndivisions=ndivisions, arguments=arguments) + refs = tuple((pref, bref-pref) for bref, pref in zip(self.references, pos)) + return PartitionedTopology(self, refs, (posname, negname)) def subset(self, topo, newboundary=None, strict=False): 'intersection' @@ -2024,6 +2035,214 @@ def interfaces(self): def getitem(self, item): return WithIdentifierTopology(self._parent.getitem(item), self._token) +class PartitionedTopology(DisjointUnionTopology): + + __slots__ = 'basetopo', 'refs', 'names', 'nparts', '_parts' + __cache__ = 'boundary', 'interfaces' + + @types.apply_annotations + def __init__(self, basetopo:stricttopology, refs:types.tuple[types.tuple[element.strictreference]], names:types.tuple[types.strictstr]): + if len(refs) != len(basetopo): + raise ValueError('Expected {} refs tuples but got {}.'.format(len(basetopo), len(refs))) + self.nparts = len(refs[0]) if refs else len(names) + if not all(len(r) == self.nparts for r in refs): + raise ValueError('Variable number of parts.') + if len(names) != self.nparts: + raise ValueError('Expected {} names, one for every part, but got {}.'.format(self.nparts, len(names))) + if any(':' in name for name in names): + raise ValueError('Names may not contain colons.') + if self.nparts == 0: + raise ValueError('A partition consists of at least one part, but got zero.') + assert all(functools.reduce(operator.or_, prefs) == bref for bref, prefs in zip(basetopo.references, refs)), 'not a partition: union of parts is smaller then base' + + self.basetopo = basetopo + self.refs = refs + self.names = names + + indices = tuple(types.frozenarray(numpy.where(list(map(bool, prefs)))[0]) for prefs in zip(*refs)) + self._parts = tuple(WithIdentifierTopology(SubsetTopology(basetopo, prefs), name) for name, prefs in zip(names, zip(*refs))) + super().__init__(self._parts, names) + + def getitem(self, item): + if item in self.names: + return _SubsetOfPartitionedTopology(self, {item}) + else: + topo = self.basetopo.getitem(item) + refs = tuple(tuple(ref & bref for ref in self.refs[self.basetopo.transforms.index(trans)]) for bref, trans in zip(topo.references, topo.transforms)) + return PartitionedTopology(topo, refs, self.names) + + @property + def boundary(self): + baseboundary = self.basetopo.boundary + brefs = [] + for bref, btrans in zip(baseboundary.references, baseboundary.transforms): + ielem, etrans = self.basetopo.transforms.index_with_tail(btrans) + brefs.append(tuple(pref.get_from_trans(etrans) for pref in self.refs[ielem])) + return PartitionedTopology(baseboundary, brefs, self.names) + + @property + def interfaces(self): + baseifaces = self.basetopo.interfaces + basereferences = {(a, b): [] for a in self.names for b in self.names} + baseindices = {(a, b): [] for a in self.names for b in self.names} + for ieelem, (eref, etrans, oppetrans) in enumerate(zip(baseifaces.references, baseifaces.transforms, baseifaces.opposites)): + ielem, tail = self.basetopo.transforms.index_with_tail(etrans) + ioppelem, opptail = self.basetopo.transforms.index_with_tail(oppetrans) + erefs = tuple(filter(lambda item: item[1], ((i, ref.get_from_trans(tail)) for i, ref in zip(self.names, self.refs[ielem])))) + opperefs = tuple(filter(lambda item: item[1], ((i, ref.get_from_trans(opptail)) for i, ref in zip(self.names, self.refs[ioppelem])))) + checkeref = eref.empty + for aname, aeref in erefs: + for bname, beref in opperefs: + parteref = aeref & beref + if parteref: + basereferences[aname, bname].append(parteref) + baseindices[aname, bname].append(ieelem) + checkeref |= parteref + assert checkeref == eref + baseindices = {p: types.frozenarray(i, dtype=int) for p, i in baseindices.items()} + + newreferences = {(a, b): [] for i, a in enumerate(self.names) for b in self.names[i+1:]} + newtransforms = {(a, b): [] for i, a in enumerate(self.names) for b in self.names[i+1:]} + newopposites = {(a, b): [] for i, a in enumerate(self.names) for b in self.names[i+1:]} + for baseref, partrefs, basetrans in zip(self.basetopo.references, self.refs, self.basetopo.transforms): + pool = {} + for aname, aref in zip(self.names, partrefs): + if not aref: + continue + for aetrans, aeref in aref.edges[baseref.nedges:]: + if not aeref: + continue + points = types.frozenarray(aetrans.apply(aeref.getpoints('bezier', 2).coords)) + bname, beref, betrans = pool.pop((points, not aetrans.isflipped), (None, None, None)) + if beref is None: + pool[(points, aetrans.isflipped)] = aname, aeref, aetrans + else: + assert aname != bname, 'elements are not supposed to count internal interfaces as edges' + assert aeref == beref + atrans = basetrans + (aetrans, transform.Identifier(self.ndims-1, aname)) + btrans = basetrans + (betrans, transform.Identifier(self.ndims-1, bname)) + if self.names.index(aname) <= self.names.index(bname): + iface = aname, bname + else: + iface = bname, aname + atrans, btrans = btrans, atrans + newreferences[iface].append(aeref) + newtransforms[iface].append(atrans) + newopposites[iface].append(btrans) + assert not pool, 'some interal edges have no opposites' + + itopos = [] + inames = [] + for i, a in enumerate(self.names): + itopos.append(Topology(elementseq.asreferences(basereferences[a, a], self.ndims-1), + transformseq.WithIdentifierTransforms(baseifaces.transforms[baseindices[a, a]], a), + transformseq.WithIdentifierTransforms(baseifaces.opposites[baseindices[a, a]], a))) + inames.append('{0}:{0}'.format(a)) + for b in self.names[i+1:]: + base = Topology(elementseq.asreferences(basereferences[a, b] + basereferences[b, a], self.ndims-1), + transformseq.WithIdentifierTransforms(transformseq.chain((baseifaces.transforms[baseindices[a, b]], baseifaces.opposites[baseindices[b, a]]), self.ndims-1), a), + transformseq.WithIdentifierTransforms(transformseq.chain((baseifaces.opposites[baseindices[a, b]], baseifaces.transforms[baseindices[b, a]]), self.ndims-1), b)) + new = Topology(elementseq.asreferences(newreferences[a, b], self.ndims-1), + transformseq.PlainTransforms(newtransforms[a, b], self.ndims-1), + transformseq.PlainTransforms(newopposites[a, b], self.ndims-1)) + itopos.append(DisjointUnionTopology((base, new))) + inames.append('{}:{}'.format(a, b)) + return DisjointUnionTopology(itopos, inames) + + def __sub__(self, other): + if self == other: + return EmptyTopology(self.ndims) + elif isinstance(other, _SubsetOfPartitionedTopology) and other._partition == self: + remainder = frozenset(self.names) - frozenset(other._names) + if remainder: + return _SubsetOfPartitionedTopology(self, remainder) + else: + return EmptyTopology(self.ndims) + else: + return super().__sub__(other) + +class _SubsetOfPartitionedTopology(DisjointUnionTopology): + + __slots__ = '_partition', '_names' + __cache__ = 'boundary', 'interfaces' + + @types.apply_annotations + def __init__(self, partition: stricttopology, names: frozenset): + self._partition = partition + if not names <= frozenset(partition.names): + raise ValueError('Not a subset of the partition.') + if not all(isinstance(name, str) for name in names): + raise ValueError('All names should be str objects.') + self._names = tuple(sorted(names, key=partition.names.index)) + super().__init__(tuple(self._partition._parts[self._partition.names.index(name)] for name in self._names), self._names) + + def __getitem__(self, item): + if item in self._names: + return _SubsetOfPartitionedTopology(self._partition, {item}) + elif item in self._partition.names: + return EmptyTopology(self.ndims) + else: + topo = self._partition.getitem(item) + assert not isinstance(topo, _SubsetOfPartitionedTopology) # this is covered by the above two conditionals + if isinstance(topo, EmptyTopology): + return topo + elif isinstance(topo, PartitionedTopology): + return _SubsetOfPartitionedTopology(topo, self._names) + else: + raise NotImplementedError + + @property + def boundary(self): + # The boundary of this subset consists of the boundary of the base that + # touches this subset and the interfaces between all parts in this subset + # and all parts not in this subset. All interfaces are grouped and named by + # the parts not in this subset: given a partition A, B of Ω, then + # `Ω['A'].boundary['B']` is the same as `Ω.interfaces['A:B']` or + # `~Ω.interfaces['B:A']`, whichever exists. + topos = [] + names = [] + for b in self._partition.names: # parts not in this subset + if b in self._names: + continue + btopos = [] + for a in self._names: # parts in this subset + if self._partition.names.index(a) <= self._partition.names.index(b): + btopos.append(self._partition.interfaces.getitem('{}:{}'.format(a, b))) + else: + btopos.append(~self._partition.interfaces.getitem('{}:{}'.format(b, a))) + topos.append(DisjointUnionTopology(btopos)) + names.append(b) + for name in self._names: + topos.append(self._partition.boundary.getitem(name)) + groups = {} + return DisjointUnionTopology(topos, names) + + @property + def interfaces(self): + topos = [] + names = [] + for i, a in enumerate(self._names): + for b in self._names[i:]: + topos.append(self._partition.interfaces.getitem('{}:{}'.format(a, b))) + names.append('{}:{}'.format(a, b)) + return DisjointUnionTopology(topos, names) + + def __or__(self, other): + if isinstance(other, _SubsetOfPartitionedTopology) and other._partition == self._partition: + return _SubsetOfPartitionedTopology(self._partition, frozenset(self._names) | frozenset(other._names)) + else: + return super().__or__(other) + + def __rsub__(self, other): + if self._partition == other or self._partition.basetopo == other: + remainder = frozenset(self._partition.names) - frozenset(self._names) + if remainder: + return _SubsetOfPartitionedTopology(self._partition, remainder) + else: + return EmptyTopology(self.ndims) + else: + return super().__rsub__(other) + class PatchBoundary(types.Singleton): __slots__ = 'id', 'dim', 'side', 'reverse', 'transpose' diff --git a/tests/test_finitecell.py b/tests/test_finitecell.py index 53970a15f..c0b2374b7 100644 --- a/tests/test_finitecell.py +++ b/tests/test_finitecell.py @@ -358,6 +358,7 @@ def test_trimtopright(self): self.assertEqual(len(self.domain5.boundary['trimtopright']), 6) +@parametrize class partialtrim(TestCase): # Test setup: @@ -371,8 +372,13 @@ class partialtrim(TestCase): def setUp(self): self.topo, geom = mesh.rectilinear([2,2]) - self.topoA = self.topo.trim(geom[0]-1+geom[1]*(geom[1]-.5), maxrefine=1) - self.topoB = self.topo - self.topoA + if self.method == 'trim': + self.topoA = self.topo.trim(geom[0]-1+geom[1]*(geom[1]-.5), maxrefine=1) + self.topoB = self.topo - self.topoA + elif self.method == 'partition': + self.partitioned = self.topo.partition(geom[0]-1+geom[1]*(geom[1]-.5), maxrefine=1, posname='A', negname='B') + self.topoA = self.partitioned['A'] + self.topoB = self.partitioned['B'] def test_topos(self): self.assertEqual(len(self.topoA), 4) @@ -381,29 +387,33 @@ def test_topos(self): def test_boundaries(self): self.assertEqual(len(self.topoA.boundary), 11) self.assertEqual(len(self.topoB.boundary), 8) - self.assertEqual(len(self.topoA.boundary['trimmed']), 5) - self.assertEqual(len(self.topoB.boundary['trimmed']), 5) + self.assertEqual(len(self.topoA.boundary['B' if self.method == 'partition' else 'trimmed']), 5) + self.assertEqual(len(self.topoB.boundary['A' if self.method == 'partition' else 'trimmed']), 5) def test_interfaces(self): self.assertEqual(len(self.topoA.interfaces), 4) self.assertEqual(len(self.topoB.interfaces), 1) def test_transforms(self): - self.assertEqual(set(self.topoA.boundary['trimmed'].transforms), set(self.topoB.boundary['trimmed'].opposites)) - self.assertEqual(set(self.topoB.boundary['trimmed'].transforms), set(self.topoA.boundary['trimmed'].opposites)) + self.assertEqual(set(self.topoA.boundary['B' if self.method == 'partition' else 'trimmed'].transforms), set(self.topoB.boundary['A' if self.method == 'partition' else 'trimmed'].opposites)) + self.assertEqual(set(self.topoB.boundary['A' if self.method == 'partition' else 'trimmed'].transforms), set(self.topoA.boundary['B' if self.method == 'partition' else 'trimmed'].opposites)) def test_opposites(self): ielem = function.elemwise(self.topo.transforms, numpy.arange(4)) - sampleA = self.topoA.boundary['trimmed'].sample('uniform', 1) - sampleB = self.topoB.boundary['trimmed'].sample('uniform', 1) + sampleA = self.topoA.boundary['B' if self.method == 'partition' else 'trimmed'].sample('uniform', 1) + sampleB = self.topoB.boundary['A' if self.method == 'partition' else 'trimmed'].sample('uniform', 1) self.assertEqual(set(sampleB.eval(ielem)), {0,1}) self.assertEqual(set(sampleB.eval(function.opposite(ielem))), {0,1,2}) self.assertEqual(set(sampleA.eval(ielem)), {0,1,2}) self.assertEqual(set(sampleA.eval(function.opposite(ielem))), {0,1}) + @parametrize.enable_if(lambda method, **kwargs: method != 'partition') def test_baseboundaries(self): # the base implementation should create the correct boundary topology but # without interface opposites and without the trimmed group for topo in self.topoA, self.topoB: alttopo = topology.ConnectedTopology(topo.references, topo.transforms, topo.opposites, topo.connectivity) self.assertEqual(dict(zip(alttopo.boundary.transforms, alttopo.boundary.references)), dict(zip(topo.boundary.transforms, topo.boundary.references))) + +partialtrim(method='trim') +partialtrim(method='partition')