diff --git a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java index 99c7eda..a677f6b 100644 --- a/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java +++ b/tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java @@ -8,9 +8,11 @@ package io.clusterless.tessellate.pipeline; +import cascading.CascadingException; import cascading.flow.Flow; import cascading.flow.local.LocalFlowConnector; import cascading.flow.local.LocalFlowProcess; +import cascading.flow.stream.duct.DuctException; import cascading.operation.Debug; import cascading.operation.Insert; import cascading.operation.regex.RegexParser; @@ -190,11 +192,36 @@ public Integer run() throws IOException { running.set(true); try { - flow.complete(); + try { + flow.complete(); + } catch (CascadingException e) { + return handleCascadingException(e); + } } finally { running.set(false); } return 0; } + + private Integer handleCascadingException(CascadingException cascadingException) { + Throwable cause = cascadingException.getCause(); + + if (cause instanceof DuctException) { + LOG.error("flow failed with: {}: {}", cause.getMessage(), cause.getCause().getMessage(), cascadingException); + System.err.println("flow failed with: " + cause.getMessage() + ": " + cause.getCause().getMessage()); + return -1; + } + + if (cause instanceof CascadingException) { + LOG.error("flow failed with: {}: {}", cause.getMessage(), cause.getCause().getMessage(), cascadingException); + System.err.println("flow failed with: " + cause.getMessage() + ": " + cause.getCause().getMessage()); + return -1; + } + + LOG.error("flow failed with: {}", cascadingException.getMessage(), cascadingException); + System.err.println("flow failed with: " + cause.getMessage()); + + return -1; + } }