diff --git a/changes/add_group_onreject.md b/changes/add_group_onreject.md new file mode 100644 index 000000000..cbf4355bf --- /dev/null +++ b/changes/add_group_onreject.md @@ -0,0 +1 @@ +* Add `OnReject` support to `Group` clauses diff --git a/language.md b/language.md index bdfa2be23..d45ab33b1 100644 --- a/language.md +++ b/language.md @@ -215,8 +215,8 @@ dropped. This reshapes the data. - - `Group` `By` _discriminator1_[`,` ...] [`Where` _condition_] _collectionname1_ `=` _collector1_[`,` ...] - - `Group` `By` _discriminator1_[`,` ...] `Using` _grouper_ _param_ `=` _expr1_[`,` ...] [`With` _output_[`,` ...]] [`Where` _condition_] _collectionname1_ `=` _collector1_[`,` ...] + - `Group` `By` _discriminator1_[`,` ...] [`Where` _condition_] `Into` _collectionname1_ `=` _collector1_[`,` ...] [`OnReject` _reject1_[ _reject2_[ ...]] `Resume`] + - `Group` `By` _discriminator1_[`,` ...] `Using` _grouper_ _param_ `=` _expr1_[`,` ...] [`With` _output_[`,` ...]] [`Where` _condition_] `Into` _collectionname1_ `=` _collector1_[`,` ...] [`OnReject` _reject1_[ _reject2_[ ...]] `Resume`] Performs a grouping of the data. First, rows are collected in subgroups by their _discriminators_. If `Using` is provided, those subgroups are modified by @@ -245,6 +245,9 @@ another section. Each collector can have `Where` filters that limit the collected data. Optionally, a `Where` filter can be applied to all the collectors by providing _condition_. +Rows which are rejected are passed to the rejection handlers. These are +`Monitor` or `Dump` clauses or an `Alert` terminal. Rejection handlers can only +access the discriminators. This reshapes the data. diff --git a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/BaseOliveBuilder.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/BaseOliveBuilder.java index 7157314b4..326c57a88 100644 --- a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/BaseOliveBuilder.java +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/BaseOliveBuilder.java @@ -216,11 +216,11 @@ public static void renderSigner( new Method("dynamicSignature", A_OBJECT_TYPE, new Type[] {A_STRING_TYPE, A_OBJECT_TYPE}); protected static final Method METHOD_SIGNATURE_ACCESSOR__STATIC_SIGNATURE = new Method("staticSignature", A_OBJECT_TYPE, new Type[] {A_STRING_TYPE}); - private static final Method METHOD_STREAM__FILTER = + static final Method METHOD_STREAM__FILTER = new Method("filter", A_STREAM_TYPE, new Type[] {A_PREDICATE_TYPE}); private static final Method METHOD_STREAM__MAP = new Method("map", A_STREAM_TYPE, new Type[] {A_FUNCTION_TYPE}); - private static final Method METHOD_STREAM__ON_CLOSE = + static final Method METHOD_STREAM__ON_CLOSE = new Method("onClose", A_BASE_STREAM_TYPE, new Type[] {A_RUNNABLE_TYPE}); private static final Method METHOD_STREAM__PEEK = new Method("peek", A_STREAM_TYPE, new Type[] {A_CONSUMER_TYPE}); @@ -248,7 +248,7 @@ public BaseOliveBuilder(RootBuilder owner, InputFormatDefinition initialFormat) */ public final void call( CallableDefinitionRenderer defineOlive, Stream> arguments) { - final var arglist = arguments.collect(Collectors.toList()); + final var arglist = arguments.toList(); if (arglist.size() != defineOlive.parameters()) { throw new IllegalArgumentException( String.format( @@ -803,7 +803,7 @@ public Renderer pick( } public final RegroupVariablesBuilder regroup( - int line, int column, LoadableValue... capturedVariables) { + int line, int column, FilterBuilder filterBuilder, LoadableValue... capturedVariables) { final var className = String.format("%s/Group_%d_%d", BaseHotloadingCompiler.PACKAGE_INTERNAL, line, column); @@ -830,11 +830,7 @@ public final RegroupVariablesBuilder regroup( collectLambda.push(renderer); renderer.methodGen().invokeStatic(A_RUNTIME_SUPPORT_TYPE, METHOD_REGROUP); - LambdaBuilder.pushVirtual( - renderer, - RegroupVariablesBuilder.METHOD_IS_OK.getName(), - LambdaBuilder.predicate(newType)); - renderer.methodGen().invokeInterface(A_STREAM_TYPE, METHOD_STREAM__FILTER); + filterBuilder.pushFilterMethod(line, column, renderer, newType); }); final var newRenderer = newLambda.renderer(oldType, this::emitSigner); @@ -868,6 +864,7 @@ public final RegroupVariablesBuilder regroupWithGrouper( LambdaBuilder.LambdaType collectorBuilderType, LoadableValue[] grouperCaptures, List> grouperVariables, + FilterBuilder filterBuilder, LoadableValue... capturedVariables) { final var className = String.format( @@ -960,11 +957,7 @@ public Type type() { newLambda.push(renderer); renderer.methodGen().invokeStatic(A_RUNTIME_SUPPORT_TYPE, METHOD_REGROUP_WITH_GROUPER); - LambdaBuilder.pushVirtual( - renderer, - RegroupVariablesBuilder.METHOD_IS_OK.getName(), - LambdaBuilder.predicate(newType)); - renderer.methodGen().invokeInterface(A_STREAM_TYPE, METHOD_STREAM__FILTER); + filterBuilder.pushFilterMethod(line, column, renderer, newType); }); final var newRenderer = newLambda.renderer(oldType, this::emitSigner); diff --git a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/FilterBuilder.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/FilterBuilder.java new file mode 100644 index 000000000..1cb6b2296 --- /dev/null +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/FilterBuilder.java @@ -0,0 +1,84 @@ +package ca.on.oicr.gsi.shesmu.compiler; + +import static ca.on.oicr.gsi.shesmu.compiler.BaseOliveBuilder.A_STREAM_TYPE; +import static ca.on.oicr.gsi.shesmu.compiler.BaseOliveBuilder.METHOD_STREAM__FILTER; +import static ca.on.oicr.gsi.shesmu.compiler.BaseOliveBuilder.METHOD_STREAM__ON_CLOSE; + +import java.util.List; +import java.util.TreeSet; +import java.util.stream.Stream; +import org.objectweb.asm.Opcodes; +import org.objectweb.asm.Type; +import org.objectweb.asm.commons.GeneratorAdapter; + +public interface FilterBuilder { + FilterBuilder SIMPLE = + (line, column, renderer, streamType) -> { + LambdaBuilder.pushVirtual( + renderer, + RegroupVariablesBuilder.METHOD_IS_OK.getName(), + LambdaBuilder.predicate(streamType)); + renderer.methodGen().invokeInterface(A_STREAM_TYPE, METHOD_STREAM__FILTER); + }; + + static FilterBuilder of(List rejectHandlers) { + if (rejectHandlers.isEmpty()) { + return SIMPLE; + } + final var freeVariables = new TreeSet(); + for (final var handler : rejectHandlers) { + handler.collectFreeVariables(freeVariables); + } + return (line, column, renderer, streamType) -> { + final var captures = + rejectHandlers.stream() + .flatMap(handler -> handler.requiredCaptures(renderer.root())) + .toArray(LoadableValue[]::new); + final var filterBuilder = + new LambdaBuilder( + renderer.root(), + String.format("Okay? %d:%d 🔍", line, column), + LambdaBuilder.predicate(streamType), + Stream.concat( + Stream.of(captures), + renderer.allValues().filter(v -> freeVariables.contains(v.name()))) + .toArray(LoadableValue[]::new)); + + final var filterRenderer = filterBuilder.renderer(streamType, null); + filterRenderer.methodGen().visitCode(); + filterRenderer.loadStream(); + filterRenderer.methodGen().invokeVirtual(streamType, RegroupVariablesBuilder.METHOD_IS_OK); + final var failPath = filterRenderer.methodGen().newLabel(); + filterRenderer.methodGen().ifZCmp(GeneratorAdapter.EQ, failPath); + filterRenderer.methodGen().push(true); + filterRenderer.methodGen().returnValue(); + filterRenderer.methodGen().mark(failPath); + rejectHandlers.forEach(handler -> handler.render(renderer.root(), filterRenderer)); + filterRenderer.methodGen().push(false); + filterRenderer.methodGen().returnValue(); + filterRenderer.methodGen().endMethod(); + + filterBuilder.push(renderer); + renderer.methodGen().invokeInterface(A_STREAM_TYPE, METHOD_STREAM__FILTER); + + final var closeBuilder = + new LambdaBuilder( + renderer.root(), + String.format("Okay? %d:%d 🗑️", line, column), + LambdaBuilder.RUNNABLE, + captures); + final var closeRenderer = closeBuilder.renderer(); + closeRenderer.methodGen().visitCode(); + rejectHandlers.forEach(handler -> handler.renderOnClose(closeRenderer)); + closeRenderer.methodGen().visitInsn(Opcodes.RETURN); + closeRenderer.methodGen().visitMaxs(0, 0); + closeRenderer.methodGen().visitEnd(); + + closeBuilder.push(renderer); + renderer.methodGen().invokeInterface(A_STREAM_TYPE, METHOD_STREAM__ON_CLOSE); + renderer.methodGen().checkCast(A_STREAM_TYPE); + }; + } + + void pushFilterMethod(int line, int column, Renderer renderer, Type streamType); +} diff --git a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNode.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNode.java index 8b5a29cad..be5eb08ae 100644 --- a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNode.java +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNode.java @@ -89,6 +89,7 @@ private interface NodeConstructor { final var inputs = new AtomicReference>>(); final var outputs = new AtomicReference>(); final var where = new AtomicReference>(); + final var handlers = new AtomicReference>(List.of()); var result = parser @@ -139,6 +140,17 @@ private interface NodeConstructor { ip.whitespace() .listEmpty(collectors::set, GroupNode::parse, ',') .whitespace()); + final var rejectResult = result.keyword("OnReject"); + if (rejectResult.isGood()) { + result = + rejectResult + .whitespace() + .list(handlers::set, (rp, ro) -> rp.whitespace().dispatch(REJECT_CLAUSES, ro)) + .whitespace() + .keyword("Resume") + .whitespace(); + } + if (result.isGood()) { if (name.get() == null) { output.accept( @@ -149,7 +161,8 @@ private interface NodeConstructor { parser.column(), collectors.get(), discriminators.get(), - where.get())); + where.get(), + handlers.get())); } else { output.accept( label -> @@ -162,7 +175,8 @@ private interface NodeConstructor { outputs.get(), collectors.get(), discriminators.get(), - where.get())); + where.get(), + handlers.get())); } } return result; diff --git a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNodeGroup.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNodeGroup.java index 67db46c43..a4bb4ad97 100644 --- a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNodeGroup.java +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNodeGroup.java @@ -62,10 +62,11 @@ public static Optional> checkDiscriminators( } private final List children; - protected final int column; + private final int column; private final List discriminators; private final Optional label; - protected final int line; + private final int line; + private final List rejectHandlers; private final Optional where; public OliveClauseNodeGroup( @@ -74,13 +75,15 @@ public OliveClauseNodeGroup( int column, List children, List discriminators, - Optional where) { + Optional where, + List rejectHandlers) { this.label = label; this.line = line; this.column = column; this.children = children; this.discriminators = discriminators; this.where = where; + this.rejectHandlers = rejectHandlers; } @Override @@ -103,6 +106,7 @@ public void collectPlugins(Set pluginFileNames) { discriminators.forEach(discriminator -> discriminator.collectPlugins(pluginFileNames)); children.forEach(child -> child.collectPlugins(pluginFileNames)); where.ifPresent(w -> w.collectPlugins(pluginFileNames)); + rejectHandlers.forEach(r -> r.collectPlugins(pluginFileNames)); } @Override @@ -134,7 +138,7 @@ public Stream dashboard() { } @Override - public final ClauseStreamOrder ensureRoot( + public ClauseStreamOrder ensureRoot( ClauseStreamOrder state, Set signableNames, Consumer addSignableCheck, @@ -169,6 +173,7 @@ public void render( oliveBuilder.regroup( line, column, + FilterBuilder.of(rejectHandlers), oliveBuilder .loadableValues() .filter(value -> freeVariables.contains(value.name())) @@ -183,17 +188,27 @@ public void render( } @Override - public final NameDefinitions resolve( + public NameDefinitions resolve( OliveCompilerServices oliveCompilerServices, NameDefinitions defs, Consumer errorHandler) { + final var rejectDefs = + defs.replaceStream(discriminators.stream().flatMap(DiscriminatorNode::targets), true); var ok = children.stream().filter(child -> child.resolve(defs, defs, errorHandler)).count() == children.size() & discriminators.stream() .filter(discriminator -> discriminator.resolve(defs, errorHandler)) .count() - == discriminators.size(); + == discriminators.size() + & rejectHandlers.stream() + .filter( + handler -> + handler + .resolve(oliveCompilerServices, rejectDefs, errorHandler) + .isGood()) + .count() + == rejectHandlers.size(); ok = ok @@ -246,7 +261,7 @@ public final NameDefinitions resolve( } @Override - public final boolean resolveDefinitions( + public boolean resolveDefinitions( OliveCompilerServices oliveCompilerServices, Consumer errorHandler) { return children.stream() @@ -257,11 +272,17 @@ public final boolean resolveDefinitions( .filter(group -> group.resolveDefinitions(oliveCompilerServices, errorHandler)) .count() == discriminators.size() - & where.map(w -> w.resolveDefinitions(oliveCompilerServices, errorHandler)).orElse(true); + & where.map(w -> w.resolveDefinitions(oliveCompilerServices, errorHandler)).orElse(true) + & rejectHandlers.stream() + .filter( + rejectHandler -> + rejectHandler.resolveDefinitions(oliveCompilerServices, errorHandler)) + .count() + == rejectHandlers.size(); } @Override - public final boolean typeCheck(Consumer errorHandler) { + public boolean typeCheck(Consumer errorHandler) { return children.stream().filter(group -> group.typeCheck(errorHandler)).count() == children.size() && discriminators.stream() @@ -280,6 +301,10 @@ public final boolean typeCheck(Consumer errorHandler) { } return whereOk; }) - .orElse(true); + .orElse(true) + & rejectHandlers.stream() + .filter(rejectHandler -> rejectHandler.typeCheck(errorHandler)) + .count() + == rejectHandlers.size(); } } diff --git a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNodeGroupWithGrouper.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNodeGroupWithGrouper.java index 506310418..a77d9b2f0 100644 --- a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNodeGroupWithGrouper.java +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveClauseNodeGroupWithGrouper.java @@ -52,6 +52,7 @@ public static Stream definitions() { protected final int line; private List outputNames; private final List> rawInputExpressions; + private final List rejectHandlers; private final Map typeVariables = new HashMap<>(); private final Optional where; @@ -64,7 +65,8 @@ public OliveClauseNodeGroupWithGrouper( List outputNames, List children, List discriminators, - Optional where) { + Optional where, + List rejectHandlers) { this.label = label; this.line = line; this.column = column; @@ -75,6 +77,7 @@ public OliveClauseNodeGroupWithGrouper( this.children = children; this.discriminators = discriminators; this.where = where; + this.rejectHandlers = rejectHandlers; } @Override @@ -98,6 +101,7 @@ public void collectPlugins(Set pluginFileNames) { discriminators.forEach(discrminator -> discrminator.collectPlugins(pluginFileNames)); inputExpressions.forEach(expression -> expression.collectPlugins(pluginFileNames)); where.ifPresent(w -> w.collectPlugins(pluginFileNames)); + rejectHandlers.forEach(r -> r.collectPlugins(pluginFileNames)); } @Override @@ -290,6 +294,7 @@ public Type type() { lambdaType, grouperCaptures, outputBindings, + FilterBuilder.of(rejectHandlers), oliveBuilder .loadableValues() .filter(value -> freeVariables.contains(value.name())) @@ -335,7 +340,7 @@ public Type type() { } @Override - public final NameDefinitions resolve( + public NameDefinitions resolve( OliveCompilerServices oliveCompilerServices, NameDefinitions defs, Consumer errorHandler) { @@ -384,6 +389,8 @@ public Imyhat type() { ok = false; } } + final var rejectDefs = + defs.replaceStream(discriminators.stream().flatMap(DiscriminatorNode::targets), true); ok = ok && children.stream() @@ -394,7 +401,15 @@ public Imyhat type() { & discriminators.stream() .filter(discriminator -> discriminator.resolve(defs, errorHandler)) .count() - == discriminators.size(); + == discriminators.size() + & rejectHandlers.stream() + .filter( + handler -> + handler + .resolve(oliveCompilerServices, rejectDefs, errorHandler) + .isGood()) + .count() + == rejectHandlers.size(); ok = ok @@ -519,7 +534,14 @@ public final boolean resolveDefinitions( == discriminators.size() & where .map(w -> w.resolveDefinitions(oliveCompilerServices, errorHandler)) - .orElse(true); + .orElse(true) + & rejectHandlers.stream() + .filter( + rejectHandler -> + rejectHandler.resolveDefinitions( + oliveCompilerServices, errorHandler)) + .count() + == rejectHandlers.size(); return ok; } @@ -557,18 +579,22 @@ public final boolean typeCheck(Consumer errorHandler) { ok = ok && where - .map( - w -> { - var whereOk = w.typeCheck(errorHandler); - if (whereOk) { - if (!w.type().isSame(Imyhat.BOOLEAN)) { - w.typeError(Imyhat.BOOLEAN, w.type(), errorHandler); - whereOk = false; - } - } - return whereOk; - }) - .orElse(true); + .map( + w -> { + var whereOk = w.typeCheck(errorHandler); + if (whereOk) { + if (!w.type().isSame(Imyhat.BOOLEAN)) { + w.typeError(Imyhat.BOOLEAN, w.type(), errorHandler); + whereOk = false; + } + } + return whereOk; + }) + .orElse(true) + & rejectHandlers.stream() + .filter(rejectHandler -> rejectHandler.typeCheck(errorHandler)) + .count() + == rejectHandlers.size(); return ok; } diff --git a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveNodeAlert.java b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveNodeAlert.java index 3280057ac..2e5b465f5 100644 --- a/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveNodeAlert.java +++ b/shesmu-server/src/main/java/ca/on/oicr/gsi/shesmu/compiler/OliveNodeAlert.java @@ -78,6 +78,7 @@ public Imyhat type() { protected static final Type A_STRING_ARRAY_TYPE = Type.getType(String[].class); protected static final Type A_STRING_TYPE = Type.getType(String.class); + private final List annotations; private final int column; private final String description; @@ -141,6 +142,7 @@ public void collectFreeVariables(Set freeVariables) { freeVariables.add(SOURCE_LOCATION_LINE); freeVariables.add(SOURCE_LOCATION_COLUMN); freeVariables.add(SOURCE_LOCATION_HASH); + freeVariables.add("Olive Services"); } @Override diff --git a/shesmu-server/src/test/resources/run/group-onreject.shesmu b/shesmu-server/src/test/resources/run/group-onreject.shesmu new file mode 100644 index 000000000..f61524b3c --- /dev/null +++ b/shesmu-server/src/test/resources/run/group-onreject.shesmu @@ -0,0 +1,11 @@ +Version 1; +Input test; + +Olive + Group + By workflow + Into a = Univalued accession + OnReject + Alert alertname = "NotUnivalued", workflow = workflow, value = "true" For 15mins + Resume + Run ok With ok = a != "";