Skip to content

Commit

Permalink
Merge pull request #90 from ConsenSys/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
Grégoire Jeanmart authored Oct 2, 2019
2 parents e02415a + 4d2456a commit b80bca9
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 125 deletions.
2 changes: 0 additions & 2 deletions mahuta-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>mahuta-core</artifactId>
<name>Mahuta IPFS Core</name>
<description>Mahuta IPFS - Core module</description>

<parent>
<groupId>net.consensys.mahuta</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,43 @@ public AsynchonousPinningMahutaService(StorageService storageService, IndexingSe
*/
public void run() {
log.debug("Run asynchromous pinning process");

final Query query = Query.newQuery().equals(IndexingService.PINNED_KEY, false);

indexingService.getIndexes().forEach(indexName -> {
log.trace("indexName: {}", indexName);
Page<Metadata> page = null;
do {
PageRequest pageReq = Optional.ofNullable(page)
.map(Page::nextPageRequest)
.orElse(PageRequest.of(0, PAGE_SIZE));

page = indexingService.searchDocuments(indexName, query, pageReq);

page.getElements().forEach(m ->
storageService.getReplicaSet().forEach(pinningService -> {
// Pin
pinningService.pin(m.getContentId());

// Set the flag __pinned to true
indexingService.updateField(indexName, m.getIndexDocId(), IndexingService.PINNED_KEY, true);
})
);
} while(!page.isLast());
});

try {

final Query query = Query.newQuery().equals(IndexingService.PINNED_KEY, false);

indexingService.getIndexes().forEach(indexName -> {
log.trace("indexName: {}", indexName);
Page<Metadata> page = null;
do {
PageRequest pageReq = Optional.ofNullable(page)
.map(Page::nextPageRequest)
.orElse(PageRequest.of(0, PAGE_SIZE));

page = indexingService.searchDocuments(indexName, query, pageReq);

page.getElements().forEach(m -> {
String[] current = new String[1];
try {
// Pin each replica node
storageService.getReplicaSet().forEach(pinningService -> {
current[0] = pinningService.getName();
pinningService.pin(m.getContentId());
});

// Set the flag __pinned to true
indexingService.updateField(indexName, m.getIndexDocId(), IndexingService.PINNED_KEY, true);

} catch(Exception ex) {
log.warn("Error while pinning content during the asynchromous pinning process [node: {}, cid {}]: {} - retry soon",
current[0], m.getContentId(), ex.getMessage());
}
});
} while(!page.isLast());
});

} catch(Exception ex) {
log.error("Error while running the asynchromous pinning process", ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
*
*/
public interface PinningService {


/**
* Return the name of the service
* @return Name of the service
*/
String getName();

/**
* Pin content
* @param id Content ID (hash, CID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,53 @@
import net.consensys.mahuta.core.utils.ValidatorUtils;

@Slf4j
public class IPFSClusterPinningService implements PinningService {
public class IPFSClusterPinningService implements PinningService {

private static final String BASE_URI = "%s://%s:%s/";
private static final String DEFAULT_PROTOCOL = "http";
private static final String DEFAULT_HOST = "localhost";
private static final int DEFAULT_PORT = 9094;
private static final ObjectMapper mapper = new ObjectMapper();

private final String protocol;
private final String host;
private final Integer port;

private IPFSClusterPinningService(String host, Integer port, String protocol) {
this.host = host;
this.port = port;
this.protocol = protocol;
}

public static IPFSClusterPinningService connect() {
return connect(DEFAULT_HOST, DEFAULT_PORT);
}

public static IPFSClusterPinningService connect(String host, Integer port) {
return connect(host, port, DEFAULT_PROTOCOL);
}

public static IPFSClusterPinningService connect(String host, Integer port, String protocol) {
ValidatorUtils.rejectIfEmpty("host", host);
ValidatorUtils.rejectIfNegative("port", port);
ValidatorUtils.rejectIfDifferentThan("protocol", protocol, "http", "https");

try {
log.trace("call GET {}://{}:{}/id", protocol, host, port);
HttpResponse<String> response = Unirest.get(String.format(BASE_URI + "/id", protocol, host, port))
.asString();
log.info("Connected to IPFS-Cluster [protocol: {}, host: {}, port: {}]: Info {}", protocol, host, port,
response.getBody());

return new IPFSClusterPinningService(host, port, protocol);

} catch (UnirestException ex) {
String msg = String.format("Error whilst connecting to IPFS-Cluster [host: %s, port: %s]", host, port);
log.error(msg, ex);
throw new ConnectionException(msg, ex);
}
}

private static final String BASE_URI = "%s://%s:%s/";
private static final String DEFAULT_PROTOCOL = "http";
private static final String DEFAULT_HOST = "localhost";
private static final int DEFAULT_PORT = 9094;
private static final ObjectMapper mapper = new ObjectMapper();

private final String protocol;
private final String host;
private final Integer port;

private IPFSClusterPinningService(String host, Integer port, String protocol) {
this.host = host;
this.port = port;
this.protocol = protocol;
}

public static IPFSClusterPinningService connect() {
return connect(DEFAULT_HOST, DEFAULT_PORT);
}

public static IPFSClusterPinningService connect(String host, Integer port) {
return connect(host, port, DEFAULT_PROTOCOL);
}

public static IPFSClusterPinningService connect(String host, Integer port, String protocol) {
ValidatorUtils.rejectIfEmpty("host", host);
ValidatorUtils.rejectIfNegative("port", port);
ValidatorUtils.rejectIfDifferentThan("protocol", protocol, "http", "https");

try {
log.trace("call GET {}://{}:{}/id", protocol, host, port);
HttpResponse<String> response = Unirest.get(String.format(BASE_URI + "/id", protocol, host, port)).asString();
log.info("Connected to IPFS-Cluster [protocol: {}, host: {}, port: {}]: Info {}", protocol, host, port, response.getBody());

return new IPFSClusterPinningService(host, port, protocol);

} catch (UnirestException ex) {
String msg = String.format("Error whilst connecting to IPFS-Cluster [host: %s, port: %s]", host, port);
log.error(msg, ex);
throw new ConnectionException(msg, ex);
}
}

@Override
public void pin(String cid) {
log.debug("pin CID {} on IPFS-cluster", cid);
Expand All @@ -79,7 +81,7 @@ public void pin(String cid) {
throw new TechnicalException(msg, ex);
}
}

@Override
public void unpin(String cid) {
log.debug("unpin CID {} on IPFS-cluster", cid);
Expand All @@ -97,28 +99,30 @@ public void unpin(String cid) {
log.error(msg, ex);
throw new TechnicalException(msg, ex);
}

}

@Override
public List<String> getTracked() {
log.debug("get pinned files on IPFS-cluster");

try {

log.trace("GET GET {}://{}:{}/pins", protocol, host, port);
HttpResponse<String> response = Unirest.get(String.format(BASE_URI + "/pins", protocol, host, port))
.asString();
log.debug("response: {}", response);
TrackedResponse result = mapper.readValue(response.getBody(), TrackedResponse.class);

log.debug("get pinned files on IPFS-cluster");
return result.getPins();

} catch (UnirestException | IOException ex) {
log.error("Exception converting HTTP response to JSON", ex);
throw new TechnicalException("Exception converting HTTP response to JSON", ex);
}
}

@Override
public List<String> getTracked() {
log.debug("get pinned files on IPFS-cluster");

try {
log.trace("GET GET {}://{}:{}/pins", protocol, host, port);
HttpResponse<String> response = Unirest.get(String.format(BASE_URI + "/pins", protocol, host, port))
.asString();
log.debug("response: {}", response);
TrackedResponse result = mapper.readValue(response.getBody(), TrackedResponse.class);

log.debug("get pinned files on IPFS-cluster");
return result.getPins();

} catch (UnirestException | IOException ex) {
log.error("Exception converting HTTP response to JSON", ex);
throw new TechnicalException("Exception converting HTTP response to JSON", ex);
}
}

@Override
public String getName() {
return "ipfs-cluster ["+host + ":" + port +"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,50 +202,50 @@ public void pin(String cid) {

ValidatorUtils.rejectIfEmpty("cid", cid);

Failsafe.with(retryPolicy)
.onFailure(event -> log.error("Exception pinning cid {} on IPFS after {} attemps", cid, event.getAttemptCount()))
.onSuccess(event -> log.debug("CID {} pinned on IPFS", cid))
.run(() -> {
try {
Multihash hash = Multihash.fromBase58(cid);
this.ipfs.pin.add(hash);
} catch (Exception ex) {
log.error("Exception pinning cid [cid: {}]", cid, ex);
throw new TechnicalException("Exception pinning cid [cid: " + cid + "]", ex);
}
});
}
try {
Multihash hash = Multihash.fromBase58(cid);
this.ipfs.pin.add(hash);

} catch (Exception ex) {
log.error("Exception pinning cid {} on IPFS", cid, ex);
throw new TechnicalException("Exception pinning cid " +cid+ " on IPFS", ex);
}
}

@Override
public void unpin(String cid) {
log.debug("Unpin CID {} on IPFS", cid);

ValidatorUtils.rejectIfEmpty("cid", cid);

Failsafe.with(retryPolicy)
.onFailure(event -> log.error("Exception unpinning cid {} on IPFS after {} attemps", cid, event.getAttemptCount()))
.onSuccess(event -> log.debug("CID {} unpinned on IPFS", cid))
.run(() -> {
Multihash hash = Multihash.fromBase58(cid);
this.ipfs.pin.rm(hash);
});
try {
Multihash hash = Multihash.fromBase58(cid);
this.ipfs.pin.rm(hash);

} catch (Exception ex) {
log.error("Exception unpinning cid {} on IPFS", cid, ex);
throw new TechnicalException("Exception unpinning cid " +cid+ " on IPFS", ex);
}
}

@Override
public List<String> getTracked() {

log.debug("Get pinned files on IPFS");

return Failsafe.with(retryPolicy)
.onFailure(event -> log.error("Exception getting pinned files on IPFS after {} attemps", event.getAttemptCount()))
.onSuccess(event -> log.debug("Get pinned files on IPFS: {}", event.getResult()))
.get(() -> {
Map<Multihash, Object> cids = this.ipfs.pin.ls(PinType.all);

return cids.entrySet().stream()
.map(e-> e.getKey().toBase58())
.collect(Collectors.toList());
});
try {
Map<Multihash, Object> cids = this.ipfs.pin.ls(PinType.all);

return cids.entrySet().stream()
.map(e-> e.getKey().toBase58())
.collect(Collectors.toList());

} catch (Exception ex) {
log.error("Exception getting pinned files on IPFS", ex);
throw new TechnicalException("Exception getting pinned files on IPFS", ex);
}


}

@Override
Expand Down Expand Up @@ -351,4 +351,10 @@ private List<MerkleNode> add(NamedStreamable.ByteArrayWrapper file) throws IOExc
.collect(Collectors.toList());
}
}

@Override
public String getName() {
return "ipfs node [" + settings.getHost() + ":" + settings.getPort() + "]";
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<logback.version>1.2.3</logback.version>
<common-io.version>2.6</common-io.version>
<testcontainers.version>1.10.4</testcontainers.version>
<jackson.version>2.9.9.2</jackson.version>
<jackson.version>2.9.10</jackson.version>
<powermock.version>2.0.0</powermock.version>
<mockneat.version>0.3.0</mockneat.version>
<failsafe.version>2.0.0</failsafe.version>
Expand Down

0 comments on commit b80bca9

Please sign in to comment.