Skip to content

Commit

Permalink
support empty manifests by writing an empty manifest for the sink dat…
Browse files Browse the repository at this point in the history
…aset
  • Loading branch information
cwensel committed Jul 31, 2023
1 parent b7cbb73 commit 18c1da8
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ private Integer executePipeline(PipelineDef pipelineDef) throws IOException {

pipeline.build();

if (pipeline.state() == Pipeline.State.EMPTY_MANIFEST) {
return 0;
}

return pipeline.run();
} finally {
metrics.stop();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2023 Chris K Wensel <[email protected]>. All Rights Reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package io.clusterless.tessellate.factory;

/**
 * Unchecked exception signalling that a source manifest has nothing to read.
 * <p>
 * Raised while resolving the source factory when the manifest reader reports an empty
 * manifest, and caught by the pipeline build step so that it can react to the empty input
 * instead of failing.
 */
public class ManifestEmptyException extends RuntimeException {
    /**
     * Creates the exception with a description and the underlying failure.
     *
     * @param message human-readable description of which manifest was empty
     * @param cause   the original failure that led to this condition
     */
    public ManifestEmptyException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates the exception with a description only; no cause is recorded.
     *
     * @param message human-readable description of which manifest was empty
     */
    public ManifestEmptyException(String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.clusterless.tessellate.model.Field;
import io.clusterless.tessellate.model.Schema;
import io.clusterless.tessellate.model.Sink;
import io.clusterless.tessellate.model.Source;
import io.clusterless.tessellate.options.PipelineOptions;
import io.clusterless.tessellate.util.Format;
Expand All @@ -36,10 +35,6 @@ public class ManifestReader {
private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class);
public static final int SHOW_DUPLICATES = 20;

public static ManifestReader from(Sink sink) {
return new ManifestReader(sink.uris());
}

/**
 * Static factory that creates a reader for the given source model.
 * Delegates directly to the {@code ManifestReader(Source)} constructor.
 *
 * @param source the source model to read uris from
 * @return a new reader bound to {@code source}
 */
public static ManifestReader from(Source source) {
return new ManifestReader(source);
}
Expand All @@ -53,9 +48,8 @@ public ManifestReader(Source source) {
this.uris = clean(source.uris());
}

public ManifestReader(List<URI> uris) {
this.uris = clean(uris);
this.manifestURI = null;
/**
 * Reports whether this reader has nothing to read.
 * <p>
 * When a manifest uri is set, the answer comes from the uri itself: the manifest counts as
 * empty if the uri's string form contains the literal {@code state=empty}. Without a manifest
 * uri, the reader is empty when its list of uris is empty.
 *
 * @return {@code true} if there is nothing to read
 */
public boolean isEmptyManifest() {
    if (manifestURI != null) {
        // the state is encoded in the manifest location, so the manifest need not be opened
        return manifestURI.toString().contains("state=empty");
    }

    return uris.isEmpty();
}

public List<URI> uris(PipelineOptions pipelineOptions) throws IOException {
Expand Down Expand Up @@ -84,9 +78,9 @@ public List<URI> uris(PipelineOptions pipelineOptions) throws IOException {

LOG.info("found uris: {}, in manifest: {}", found.size(), manifestURI);

if (found.isEmpty()) {
throw new IllegalStateException("manifest: " + manifestURI + ", is empty");
}
// if (found.isEmpty()) {
// throw new IllegalStateException("manifest: " + manifestURI + ", is empty");
// }

manifestUris = clean(found);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* s3://test-clusterless-manifest-086903124729-us-west-2/datasets/name=ingress-python-example-copy/version=20230101/lot={lot}/state={state}{/attempt*}/manifest.json
*/
public class ManifestWriter {
public static ManifestWriter NULL = new ManifestWriter() {
@Override
public void writeSuccess(Properties conf) {
public void writeManifest(Properties conf) {
// do nothing
}
};

public static ManifestWriter from(Dataset dataset, URI uriPrefix) {
if (!(dataset instanceof Sink)) {
return NULL;
Expand Down Expand Up @@ -73,20 +75,30 @@ public ManifestWriter(Sink dataset, URI uriPrefix) {
}
}

public void writeSuccess(Properties conf) throws IOException {
/**
 * Writes the manifest for everything observed as written under {@code uriPrefix}.
 * <p>
 * The manifest state is {@code "empty"} when no writes were observed and
 * {@code "complete"} otherwise; the observed uris are passed along either way.
 *
 * @param conf properties handed on to the manifest write
 * @throws IOException if writing the manifest fails
 */
public void writeManifest(Properties conf) throws IOException {
    Set<URI> observedWrites = Observed.INSTANCE.writes(uriPrefix);
    String state = observedWrites.isEmpty() ? "empty" : "complete";

    writeManifest(conf, state, observedWrites);
}

private void writeManifest(Properties conf, String state, Set<URI> writes) throws IOException {
URI complete = template
.expand("lot", lot)
.expand("state", "complete")
.expand("state", state)
.discard("attempt")
.toURI();

Map<String, Object> manifest = new LinkedHashMap<>();

manifest.put("state", "complete");
manifest.put("state", state);
manifest.put("comment", null);
manifest.put("lotId", lot);
manifest.put("uriType", "identifier");
manifest.put("uris", Observed.INSTANCE.writes(uriPrefix));
manifest.put("uris", writes);

Properties properties = new Properties(conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public Set<URI> reads(URI prefix) {
}

/**
 * Returns the set of written uris tracked for the given prefix.
 * <p>
 * The prefix is normalised with {@code URIs.cleanFileUrls} and its string form is used as the
 * lookup key; a new insertion-ordered set is registered on first access. A {@code null}
 * prefix yields a fresh empty set that is not registered anywhere.
 *
 * @param prefix the uri prefix to look up, may be {@code null}
 * @return the tracked set for the prefix, or a detached empty set when {@code prefix} is null
 */
public Set<URI> writes(URI prefix) {
    if (prefix != null) {
        String key = URIs.cleanFileUrls(prefix).toString();

        return writes.computeIfAbsent(key, ignored -> new LinkedHashSet<>());
    }

    return new LinkedHashSet<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public static SourceFactory findSourceFactory(PipelineOptions pipelineOptions, S

ManifestReader manifestReader = ManifestReader.from(sourceModel);

if (manifestReader.isEmptyManifest()) {
throw new ManifestEmptyException("manifest is empty: " + sourceModel.manifest());
}

List<URI> uris = manifestReader.uris(pipelineOptions);

sourceModel.uris().addAll(uris);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public int openWritesThreshold() {
public boolean commitResource(Properties conf) throws IOException {
boolean result = super.commitResource(conf);

manifestWriter.writeSuccess(conf);
manifestWriter.writeManifest(conf);

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.coerce.Coercions;
import io.clusterless.tessellate.factory.SinkFactory;
import io.clusterless.tessellate.factory.SourceFactory;
import io.clusterless.tessellate.factory.TapFactories;
import io.clusterless.tessellate.factory.*;
import io.clusterless.tessellate.model.*;
import io.clusterless.tessellate.options.PipelineOptions;
import io.clusterless.tessellate.util.Format;
Expand All @@ -43,8 +41,17 @@

public class Pipeline {
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);

/**
 * Lifecycle state of a pipeline, as reported by {@code state()}.
 */
public enum State {
/** Initial value of the state field; {@code run()} calls {@code build()} first when it sees this state. */
NONE,
/** Set by {@code build()} when resolving the source factory throws {@code ManifestEmptyException}; the build returns early. */
EMPTY_MANIFEST,
/** Set at the end of {@code build()}; the only state in which {@code run()} proceeds instead of throwing. */
READY,
/** Set by {@code run()} just before it returns. */
COMPLETE
}

private final PipelineOptions pipelineOptions;
private final PipelineDef pipelineDef;
private State state = State.NONE;
private Flow flow;
private Properties commonProperties = new Properties();
private LocalFlowProcess localFlowProcess;
Expand All @@ -71,6 +78,10 @@ public LocalFlowProcess flowProcess() {
return localFlowProcess;
}

/**
 * Returns the current lifecycle state of this pipeline.
 *
 * @return the current {@code State}
 */
public State state() {
    return this.state;
}

/**
 * Returns the flow held by this pipeline.
 *
 * @return the flow, or {@code null} if none has been assigned yet
 */
public Flow flow() {
    return this.flow;
}
Expand All @@ -84,7 +95,22 @@ public boolean isRunning() {
}

public void build() throws IOException {
SourceFactory sourceFactory = TapFactories.findSourceFactory(pipelineOptions, pipelineDef.source());
SourceFactory sourceFactory;
try {
sourceFactory = TapFactories.findSourceFactory(pipelineOptions, pipelineDef.source());
} catch (ManifestEmptyException e) {
SinkFactory sinkFactory = TapFactories.findSinkFactory(pipelineDef.sink());

sinkFactory.applyGlobalProperties(commonProperties);

ManifestWriter manifestWriter = ManifestWriter.from(pipelineDef.sink(), null);

manifestWriter.writeManifest(commonProperties);

state = State.EMPTY_MANIFEST;

return;
}

sourceFactory.applyGlobalProperties(commonProperties);

Expand Down Expand Up @@ -197,6 +223,8 @@ public void build() throws IOException {
.addSource(pipe, sourceTap)
.addSink(pipe, sinkTap)
.addTail(pipe));

state = State.READY;
}

private static void logCurrentFields(Fields currentFields) {
Expand All @@ -205,11 +233,16 @@ private static void logCurrentFields(Fields currentFields) {

public Integer run() throws IOException {

if (flow == null) {
if (state == State.NONE) {
build();
}

if (state != State.READY) {
throw new IllegalStateException("pipeline is not ready to run");
}

running.set(true);

try {
try {
flow.complete();
Expand All @@ -220,6 +253,8 @@ public Integer run() throws IOException {
running.set(false);
}

state = State.COMPLETE;

return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public static URI cleanFileUrls(URI uri) {
}

public static URI makeAbsolute(URI uri) {
if (uri == null) {
return null;
}

if (uri.isAbsolute()) {
return uri;
}
Expand Down

0 comments on commit 18c1da8

Please sign in to comment.