diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/BlockchainAccessLayerApplication.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/BlockchainAccessLayerApplication.java index e826e8a..d5972e2 100644 --- a/src/main/java/blockchains/iaas/uni/stuttgart/de/BlockchainAccessLayerApplication.java +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/BlockchainAccessLayerApplication.java @@ -1,18 +1,8 @@ package blockchains.iaas.uni.stuttgart.de; -import blockchains.iaas.uni.stuttgart.de.management.BlockchainPluginManager; - import lombok.extern.log4j.Log4j2; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; -import org.springframework.core.env.Environment; - -import java.util.List; - -import static blockchains.iaas.uni.stuttgart.de.Constants.PF4J_AUTOLOAD_PROPERTY; @SpringBootApplication @Log4j2 @@ -21,19 +11,4 @@ public class BlockchainAccessLayerApplication { public static void main(String[] args) { SpringApplication.run(BlockchainAccessLayerApplication.class, args); } - - @Bean - ApplicationRunner loadPlugins(BlockchainPluginManager blockchainPluginManager, @Value("${" + PF4J_AUTOLOAD_PROPERTY + ":false}") String strConf) { - return args -> { - boolean enablePlugins = Boolean.parseBoolean(strConf); - log.debug("{}={}", PF4J_AUTOLOAD_PROPERTY, enablePlugins); - - if (enablePlugins) { - log.info("pf4j.autoLoadPlugins=true -> attempting to enable blockchain adapter plugins"); - blockchainPluginManager.startPlugins(); - } - }; - - } - } diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/adaptation/AdapterManager.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/adaptation/AdapterManager.java index fc9c2f8..b884338 100644 --- a/src/main/java/blockchains/iaas/uni/stuttgart/de/adaptation/AdapterManager.java +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/adaptation/AdapterManager.java @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (c) 2019-2022 Institute for the Architecture of Application System - + * Copyright (c) 2019-2024 Institute for the Architecture of Application System - * University of Stuttgart * Author: Ghareeb Falazi * Co-author: Akshay Patel diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/jsonrpc/BalService.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/jsonrpc/BalService.java index c13834e..4197dcf 100644 --- a/src/main/java/blockchains/iaas/uni/stuttgart/de/jsonrpc/BalService.java +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/jsonrpc/BalService.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2024 Institute for the Architecture of Application System - University of Stuttgart + * Copyright (c) 2019 Institute for the Architecture of Application System - University of Stuttgart * Author: Ghareeb Falazi * * This program and the accompanying materials are made available under the @@ -12,12 +12,15 @@ package blockchains.iaas.uni.stuttgart.de.jsonrpc; import java.util.List; +import java.util.UUID; import blockchains.iaas.uni.stuttgart.de.api.exceptions.InvalidScipParameterException; import blockchains.iaas.uni.stuttgart.de.management.BlockchainManager; import blockchains.iaas.uni.stuttgart.de.api.model.Parameter; import blockchains.iaas.uni.stuttgart.de.api.model.QueryResult; import blockchains.iaas.uni.stuttgart.de.api.model.TimeFrame; +import blockchains.iaas.uni.stuttgart.de.management.tccsci.DistributedTransactionManager; +import blockchains.iaas.uni.stuttgart.de.management.tccsci.DistributedTransactionRepository; import com.github.arteam.simplejsonrpc.core.annotation.JsonRpcMethod; import com.github.arteam.simplejsonrpc.core.annotation.JsonRpcOptional; import com.github.arteam.simplejsonrpc.core.annotation.JsonRpcParam; @@ -32,12 +35,15 @@ public class BalService { private final String blockchainId; private final String smartContractPath; private final BlockchainManager manager; + private final DistributedTransactionManager dtxManager; + private static final String DTX_ID_FIELD_NAME = "dtx_id"; - public BalService(String blockchainType, String blockchainId, String smartContractPath, BlockchainManager manager) { + public BalService(String blockchainType, String blockchainId, String smartContractPath, BlockchainManager manager, DistributedTransactionManager dtxManager) { this.blockchainType = blockchainType; this.blockchainId = blockchainId; this.smartContractPath = smartContractPath; this.manager = manager; + this.dtxManager = dtxManager; } @JsonRpcMethod @@ -52,9 +58,13 @@ public String Invoke( @JsonRpcParam("signature") String signature ) { log.info("SCIP Invoke method is executed!"); - manager.invokeSmartContractFunction(blockchainId, smartContractPath, functionIdentifier, inputs, outputs, - requiredConfidence, callbackUrl, timeoutMillis, correlationId, signature); - + if (inputs.stream().anyMatch(p -> p.getName().equals(DTX_ID_FIELD_NAME))) { + dtxManager.invokeSc(blockchainId, smartContractPath, functionIdentifier, inputs, outputs, + requiredConfidence, callbackUrl, timeoutMillis, correlationId, signature); + } else { + manager.invokeSmartContractFunction(blockchainId, smartContractPath, functionIdentifier, inputs, outputs, + requiredConfidence, callbackUrl, timeoutMillis, correlationId, signature); + } return "OK"; } @@ -94,7 +104,6 @@ public String Unsubscribe(@JsonRpcOptional @JsonRpcParam("functionIdentifier") S throw new InvalidScipParameterException(); } - if (!Strings.isNullOrEmpty(functionIdentifier)) { manager.cancelFunctionSubscriptions(blockchainId, smartContractPath, correlationId, functionIdentifier, parameters); } else { @@ -123,4 +132,29 @@ public QueryResult Query( throw new InvalidScipParameterException(); } + + @JsonRpcMethod + public String Start_Dtx() { + log.info("SCIP-T Start_Dtx method is executed!"); + + return dtxManager.startDtx().toString(); + } + + @JsonRpcMethod + public String Commit_Dtx(@JsonRpcParam(DTX_ID_FIELD_NAME) String dtxId) { + log.info("SCIP-T Commit_Dtx method is executed!"); + UUID uuid = UUID.fromString(dtxId); + dtxManager.commitDtx(uuid); + + return "OK"; + } + + @JsonRpcMethod + public String Abort_Dtx(@JsonRpcParam(DTX_ID_FIELD_NAME) String dtxId) { + log.info("SCIP-T Abort_Dtx method is executed!"); + UUID uuid = UUID.fromString(dtxId); + dtxManager.abortDtx(uuid); + + return "OK"; + } } diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/management/BlockchainManager.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/BlockchainManager.java index 53b942a..5593356 100644 --- a/src/main/java/blockchains/iaas/uni/stuttgart/de/management/BlockchainManager.java +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/BlockchainManager.java @@ -20,6 +20,7 @@ import blockchains.iaas.uni.stuttgart.de.adaptation.AdapterManager; import blockchains.iaas.uni.stuttgart.de.api.exceptions.*; +import blockchains.iaas.uni.stuttgart.de.api.model.*; import blockchains.iaas.uni.stuttgart.de.management.callback.CallbackManager; import blockchains.iaas.uni.stuttgart.de.management.callback.CamundaMessageTranslator; import blockchains.iaas.uni.stuttgart.de.management.callback.ScipMessageTranslator; @@ -31,12 +32,8 @@ import blockchains.iaas.uni.stuttgart.de.management.model.Subscription; import blockchains.iaas.uni.stuttgart.de.management.model.SubscriptionKey; import blockchains.iaas.uni.stuttgart.de.management.model.SubscriptionType; -import blockchains.iaas.uni.stuttgart.de.api.model.Parameter; -import blockchains.iaas.uni.stuttgart.de.api.model.QueryResult; -import blockchains.iaas.uni.stuttgart.de.api.model.TimeFrame; -import blockchains.iaas.uni.stuttgart.de.api.model.Transaction; -import blockchains.iaas.uni.stuttgart.de.api.model.TransactionState; import com.google.common.base.Strings; +import io.reactivex.Observable; import io.reactivex.disposables.Disposable; import lombok.extern.log4j.Log4j2; import org.springframework.stereotype.Component; @@ -365,41 +362,33 @@ public void invokeSmartContractFunction( final String correlationId, final String signature) throws BalException { - // Validate scip parameters! - if (Strings.isNullOrEmpty(blockchainIdentifier) - || Strings.isNullOrEmpty(smartContractPath) - || Strings.isNullOrEmpty(functionIdentifier) - || timeoutMillis < 0 - || MathUtils.doubleCompare(requiredConfidence, 0.0) < 0 - || MathUtils.doubleCompare(requiredConfidence, 100.0) > 0) { - throw new InvalidScipParameterException(); - } - - final double minimumConfidenceAsProbability = requiredConfidence / 100.0; - final BlockchainAdapter adapter = adapterManager.getAdapter(blockchainIdentifier); - final CompletableFuture future = adapter.invokeSmartContract(smartContractPath, - functionIdentifier, inputs, outputs, minimumConfidenceAsProbability, timeoutMillis); + final CompletableFuture future = this.invokeSmartContractFunction(blockchainIdentifier, smartContractPath, + functionIdentifier, inputs, outputs, requiredConfidence, timeoutMillis, signature); future. thenAccept(tx -> { if (tx != null) { - if (tx.getState() == TransactionState.CONFIRMED || tx.getState() == TransactionState.RETURN_VALUE) { - CallbackManager.getInstance().sendCallback(callbackUrl, - ScipMessageTranslator.getInvocationResponseMessage( - correlationId, - tx.getReturnValues())); - } else {// it is NOT_FOUND (it was dropped from the system due to invalidation) or ERRORED - if (tx.getState() == TransactionState.NOT_FOUND) { + if (callbackUrl != null) { + if (tx.getState() == TransactionState.CONFIRMED || tx.getState() == TransactionState.RETURN_VALUE) { CallbackManager.getInstance().sendCallback(callbackUrl, - ScipMessageTranslator.getAsynchronousErrorResponseMessage( + ScipMessageTranslator.getInvocationResponseMessage( correlationId, - new TransactionNotFoundException("The transaction associated with a function invocation is invalidated after it was mined."))); - } else { - CallbackManager.getInstance().sendCallback(callbackUrl, - ScipMessageTranslator.getAsynchronousErrorResponseMessage( - correlationId, - new InvokeSmartContractFunctionFailure("The smart contract function invocation reported an error."))); + tx.getReturnValues())); + } else {// it is NOT_FOUND (it was dropped from the system due to invalidation) or ERRORED + if (tx.getState() == TransactionState.NOT_FOUND) { + CallbackManager.getInstance().sendCallback(callbackUrl, + ScipMessageTranslator.getAsynchronousErrorResponseMessage( + correlationId, + new TransactionNotFoundException("The transaction associated with a function invocation is invalidated after it was mined."))); + } else { + CallbackManager.getInstance().sendCallback(callbackUrl, + ScipMessageTranslator.getAsynchronousErrorResponseMessage( + correlationId, + new InvokeSmartContractFunctionFailure("The smart contract function invocation reported an error."))); + } } + } else { + log.info("callbackUrl is null"); } } else { log.info("Resulting transaction is null"); @@ -412,6 +401,9 @@ public void invokeSmartContractFunction( CallbackManager.getInstance().sendCallback(callbackUrl, ScipMessageTranslator.getAsynchronousErrorResponseMessage(correlationId, (BalException) e.getCause())); + if (e instanceof ManualUnsubscriptionException || e.getCause() instanceof ManualUnsubscriptionException) { + log.info("Manual unsubscription of SC invocation!"); + } // ManualUnsubscriptionException is also captured here return null; }). @@ -425,6 +417,32 @@ public void invokeSmartContractFunction( SubscriptionManager.getInstance().createSubscription(correlationId, blockchainIdentifier, smartContractPath, subscription); } + public CompletableFuture invokeSmartContractFunction( + final String blockchainIdentifier, + final String smartContractPath, + final String functionIdentifier, + final List inputs, + final List outputs, + final double requiredConfidence, + final long timeoutMillis, + final String signature) throws BalException { + + // Validate scip parameters! + if (Strings.isNullOrEmpty(blockchainIdentifier) + || Strings.isNullOrEmpty(smartContractPath) + || Strings.isNullOrEmpty(functionIdentifier) + || timeoutMillis < 0 + || MathUtils.doubleCompare(requiredConfidence, 0.0) < 0 + || MathUtils.doubleCompare(requiredConfidence, 100.0) > 0) { + throw new InvalidScipParameterException(); + } + + final double minimumConfidenceAsProbability = requiredConfidence / 100.0; + final BlockchainAdapter adapter = adapterManager.getAdapter(blockchainIdentifier); + return adapter.invokeSmartContract(smartContractPath, + functionIdentifier, inputs, outputs, minimumConfidenceAsProbability, timeoutMillis); + } + public void subscribeToEvent( final String blockchainIdentifier, final String smartContractPath, @@ -435,21 +453,13 @@ public void subscribeToEvent( final String callbackUrl, final String correlationIdentifier) { - // Validate scip parameters! - if (Strings.isNullOrEmpty(blockchainIdentifier) - || Strings.isNullOrEmpty(smartContractPath) - || Strings.isNullOrEmpty(eventIdentifier) - || MathUtils.doubleCompare(degreeOfConfidence, 0.0) < 0 - || MathUtils.doubleCompare(degreeOfConfidence, 100.0) > 0) { - throw new InvalidScipParameterException(); - } - final double minimumConfidenceAsProbability = degreeOfConfidence / 100.0; // first, we cancel previous identical subscriptions. this.cancelEventSubscriptions(blockchainIdentifier, smartContractPath, correlationIdentifier, eventIdentifier, outputParameters); - Disposable result = this.adapterManager.getAdapter(blockchainIdentifier) - .subscribeToEvent(smartContractPath, eventIdentifier, outputParameters, minimumConfidenceAsProbability, filter) + + + Disposable result = this.subscribeToEvent(blockchainIdentifier, smartContractPath, eventIdentifier, outputParameters, degreeOfConfidence, filter) .doFinally(() -> { // remove subscription from subscription list SubscriptionManager.getInstance().removeSubscription(correlationIdentifier, blockchainIdentifier, smartContractPath); @@ -469,6 +479,28 @@ public void subscribeToEvent( SubscriptionManager.getInstance().createSubscription(correlationIdentifier, blockchainIdentifier, smartContractPath, subscription); } + public Observable subscribeToEvent(String blockchainIdentifier, + final String smartContractPath, + final String eventIdentifier, + final List outputParameters, + final double degreeOfConfidence, + final String filter) { + // Validate scip parameters! + if (Strings.isNullOrEmpty(blockchainIdentifier) + || Strings.isNullOrEmpty(smartContractPath) + || Strings.isNullOrEmpty(eventIdentifier) + || MathUtils.doubleCompare(degreeOfConfidence, 0.0) < 0 + || MathUtils.doubleCompare(degreeOfConfidence, 100.0) > 0) { + throw new InvalidScipParameterException(); + } + + final double minimumConfidenceAsProbability = degreeOfConfidence / 100.0; + + return this.adapterManager.getAdapter(blockchainIdentifier) + .subscribeToEvent(smartContractPath, eventIdentifier, outputParameters, minimumConfidenceAsProbability, filter); + } + + public void cancelEventSubscriptions(String blockchainId, String smartContractId, String correlationId, String eventIdentifier, List parameters) { // Validate scip parameters! if (Strings.isNullOrEmpty(blockchainId) || Strings.isNullOrEmpty(smartContractId)) { diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/management/BlockchainPluginManager.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/BlockchainPluginManager.java index 7b131b5..25b6da0 100644 --- a/src/main/java/blockchains/iaas/uni/stuttgart/de/management/BlockchainPluginManager.java +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/BlockchainPluginManager.java @@ -24,24 +24,28 @@ import java.nio.file.Paths; import java.util.List; +import static blockchains.iaas.uni.stuttgart.de.Constants.PF4J_AUTOLOAD_PROPERTY; + @Log4j2 @Component public class BlockchainPluginManager { private PluginManager pluginManager = null; private final String pluginDirStr; + private static final String DEFAULT_PLUGIN_DIR = Paths.get(System.getProperty("user.home"), ".bal").toString(); - private BlockchainPluginManager(@Value("${" + Constants.PF4J_PLUGIN_DIR_PROPERTY + "}") - String pluginDir) { - log.info("Initializing Blockchain Plugin Manager: pluginDir={}.", pluginDir); - this.pluginDirStr = pluginDir; - Path[] dirPaths = new Path[0]; - Path pluginDirPath = getPluginsPath(); + private BlockchainPluginManager(@Value("${" + Constants.PF4J_PLUGIN_DIR_PROPERTY + ":}") + String pluginDir, @Value("${" + PF4J_AUTOLOAD_PROPERTY + ":false}") String strConf) { + log.info("Initializing Blockchain Plugin Manager: pluginDir={}, autoLoadPlugins={}.", pluginDir, strConf); - if (pluginDirPath != null) { - dirPaths = new Path[]{pluginDirPath}; + if (pluginDir == null || pluginDir.trim().isEmpty()) { + log.info("No plugin directory is provided. Using default directory instead: {}", DEFAULT_PLUGIN_DIR); + pluginDir = DEFAULT_PLUGIN_DIR; } - this.pluginManager = new DefaultPluginManager(dirPaths) { + this.pluginDirStr = pluginDir; + Path pluginDirPath = getPluginsPath(); + + this.pluginManager = new DefaultPluginManager(pluginDirPath) { // @Override protected PluginLoader createPluginLoader() { @@ -56,10 +60,10 @@ protected PluginDescriptorFinder createPluginDescriptorFinder() { } }; - if (pluginDirPath == null) { - log.info("Plugin directory not specified. Not loading plugins at startup."); + if (pluginDirPath == null || !Boolean.parseBoolean(strConf)) { + log.info("Plugin directory not specified or auto loading is disabled. Not loading plugins at startup."); } else { - log.info("Attempting to load blockchain adapter plugins from: '{}'...", () -> pluginDirPath); + log.info("Attempting to load blockchain adapter plugins from: '{}'", () -> pluginDirPath); pluginManager.loadPlugins(); startPlugins(); } diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/management/model/DistributedTransaction.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/model/DistributedTransaction.java new file mode 100644 index 0000000..541e5aa --- /dev/null +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/model/DistributedTransaction.java @@ -0,0 +1,35 @@ +package blockchains.iaas.uni.stuttgart.de.management.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +@Setter +@Getter +public class DistributedTransaction { + private final UUID id; + private final List blockchainIds; + private DistributedTransactionState state; + private DistributedTransactionVerdict verdict; + private int yes; + + public DistributedTransaction(UUID id) { + this.id = id; + this.blockchainIds = new ArrayList<>(); + this.state = DistributedTransactionState.AWAITING_REQUESTS; + this.verdict = DistributedTransactionVerdict.NOT_DECIDED; + } + + public DistributedTransaction() { + this(UUID.randomUUID()); + } + + public void addBlockchainId(String blockchainId) { + this.blockchainIds.add(blockchainId); + } + +} diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/management/model/DistributedTransactionState.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/model/DistributedTransactionState.java new file mode 100644 index 0000000..932782e --- /dev/null +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/model/DistributedTransactionState.java @@ -0,0 +1,8 @@ +package blockchains.iaas.uni.stuttgart.de.management.model; + +public enum DistributedTransactionState { + AWAITING_REQUESTS, + AWAITING_VOTES, + ABORTED, + COMMITTED +} diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/management/model/DistributedTransactionVerdict.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/model/DistributedTransactionVerdict.java new file mode 100644 index 0000000..8b05b96 --- /dev/null +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/model/DistributedTransactionVerdict.java @@ -0,0 +1,7 @@ +package blockchains.iaas.uni.stuttgart.de.management.model; + +public enum DistributedTransactionVerdict { + NOT_DECIDED, + COMMIT, + ABORT +} diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/management/tccsci/DistributedTransactionManager.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/tccsci/DistributedTransactionManager.java new file mode 100644 index 0000000..bfa1317 --- /dev/null +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/tccsci/DistributedTransactionManager.java @@ -0,0 +1,222 @@ +/******************************************************************************** + * Copyright (c) 2023-2024 Institute for the Architecture of Application System - + * University of Stuttgart + * Author: Ghareeb Falazi + * + * This program and the accompanying materials are made available under the + * terms the Apache Software License 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +package blockchains.iaas.uni.stuttgart.de.management.tccsci; + +import blockchains.iaas.uni.stuttgart.de.adaptation.AdapterManager; +import blockchains.iaas.uni.stuttgart.de.api.model.*; +import blockchains.iaas.uni.stuttgart.de.management.BlockchainManager; +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransaction; +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransactionState; +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransactionVerdict; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +@Log4j2 +@Component +public class DistributedTransactionManager { + private final AdapterManager adapterManager; + private final BlockchainManager blockchainManager; + + public DistributedTransactionManager(AdapterManager adapterManager, BlockchainManager blockchainManager){ + this.adapterManager = adapterManager; + this.blockchainManager = blockchainManager; + } + + public UUID startDtx() { + DistributedTransaction tx = new DistributedTransaction(); + DistributedTransactionRepository.getInstance().addDistributedTransaction(tx); + final UUID transactionId = tx.getId(); + log.info("Received start_tx request and generated the following id: {}", () -> transactionId); + return transactionId; + } + + public void invokeSc(final String blockchainIdentifier, + final String smartContractPath, + final String functionIdentifier, + final List inputs, + final List outputs, + final double requiredConfidence, + final String callbackUrl, + final long timeoutMillis, + final String correlationId, + final String signature) { + UUID txId = UUID.fromString(inputs.get(0).getValue()); + log.info("Received invoke_sc request for dtx: {}", txId); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(txId); + + if (dtx.getState() == DistributedTransactionState.AWAITING_REQUESTS) { + if (!dtx.getBlockchainIds().contains(blockchainIdentifier)) { + ResourceManagerSmartContract rmsc = this.adapterManager.getAdapter(blockchainIdentifier).getResourceManagerSmartContract(); + SmartContractEvent abortEvent = rmsc.getAbortEvent(); + final UUID dtxId = dtx.getId(); + this.blockchainManager.subscribeToEvent(blockchainIdentifier, + rmsc.getSmartContractPath(), + abortEvent.getFunctionIdentifier(), + abortEvent.getOutputs(), + 0.0, + buildEventFilter(abortEvent, dtxId)) + .take(1) + .subscribe(this::handleScError); + log.info("Subscribed to the abort error of blockchain: {} for the dtx: {}", blockchainIdentifier, dtxId); + dtx.getBlockchainIds().add(blockchainIdentifier); + } + + blockchainManager.invokeSmartContractFunction(blockchainIdentifier, smartContractPath, functionIdentifier, inputs, + outputs, requiredConfidence, callbackUrl, timeoutMillis, correlationId, signature); + } + + } + + public void abortDtx(UUID txId) { + log.info("Received abort_dtx request for dtx: {}", txId); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(txId); + + if (dtx.getState() == DistributedTransactionState.AWAITING_REQUESTS) { + doAbort(txId); + } + } + + public void commitDtx(UUID txId) { + log.info("Received commit_dtx request for dtx: {}", txId); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(txId); + + if (dtx.getState() == DistributedTransactionState.AWAITING_REQUESTS) { + dtx.setState(DistributedTransactionState.AWAITING_VOTES); + dtx.setYes(0); + List ids = dtx.getBlockchainIds(); + + + for (String blockchainIdentifier : ids) { + ResourceManagerSmartContract rmsc = adapterManager.getAdapter(blockchainIdentifier).getResourceManagerSmartContract(); + SmartContractEvent voteEvent = rmsc.getVoteEvent(); + final UUID dtxId = dtx.getId(); + blockchainManager.subscribeToEvent(blockchainIdentifier, + rmsc.getSmartContractPath(), + voteEvent.getFunctionIdentifier(), + voteEvent.getOutputs(), + 0.0, + buildEventFilter(voteEvent, dtxId)) + .take(1) + .subscribe(occurrence -> handleVoteEvent(occurrence, dtx, ids.size())); + log.info("Subscribed to the Vote event of blockchain: {} for the dtx: {}", blockchainIdentifier, dtxId); + } + + + CompletableFuture.allOf(ids + .stream() + .map(bcId -> invokePrepare(bcId, txId)) + .toList() + .toArray(new CompletableFuture[ids.size()])) + .whenComplete((v, th) -> { + log.info("Invoked prepare* of all RMSCs of dtx: {}", txId); + }); + } + } + + private static String buildEventFilter(SmartContractEvent abortEvent, UUID txId) { + String param1Name = abortEvent.getOutputs().get(0).getName(); + return param1Name + "==\"" + txId.toString() + "\""; + } + + private void handleScError(Occurrence errorDetails) { + String txIdString = errorDetails.getParameters().get(0).getValue(); + log.info("Received an abort event for dtx: {}", txIdString); + UUID txId = UUID.fromString(txIdString); + doAbort(txId); + } + + // todo make synchronized so we do not miss counting votes! + // todo use a better way to get to the special event arguments (a different blockchain system might have a different order!) + private void handleVoteEvent(Occurrence voteDetails, DistributedTransaction tx, int bcCount) { + final UUID txId = tx.getId(); + log.info("Received Vote event for dtx: {}", txId); + boolean isYesVote = Boolean.parseBoolean(voteDetails.getParameters().get(1).getValue()); + + if (!isYesVote) { + doAbort(txId); + } else { + tx.setYes(tx.getYes() + 1); + + if (tx.getYes() == bcCount) { + doCommit(txId); + } + } + } + + private void doAbort(UUID txId) { + log.info("Aborting transaction: {}", txId); + DistributedTransaction tx = DistributedTransactionRepository.getInstance().getById(txId); + tx.setVerdict(DistributedTransactionVerdict.ABORT); + + CompletableFuture.allOf(tx.getBlockchainIds() + .stream() + .map(bcId -> invokeAbort(bcId, txId)) + .toList() + .toArray(new CompletableFuture[tx.getBlockchainIds().size()])) + .whenComplete((v, th) -> { + tx.setState(DistributedTransactionState.ABORTED); + }); + } + + private void doCommit(UUID txId) { + log.info("Committing transaction: {}", txId); + DistributedTransaction tx = DistributedTransactionRepository.getInstance().getById(txId); + tx.setVerdict(DistributedTransactionVerdict.COMMIT); + + CompletableFuture.allOf(tx.getBlockchainIds() + .stream() + .map(bcId -> invokeCommit(bcId, txId)) + .toList() + .toArray(new CompletableFuture[tx.getBlockchainIds().size()])) + .whenComplete((v, th) -> { + tx.setState(DistributedTransactionState.COMMITTED); + }); + } + + + private CompletableFuture invokeAbort(String blockchainId, UUID txId) { + ResourceManagerSmartContract rmsc = adapterManager.getAdapter(blockchainId).getResourceManagerSmartContract(); + SmartContractFunction abortFunction = rmsc.getAbortFunction(); + List functionInputs = abortFunction.getInputs(); + functionInputs.get(0).setValue(txId.toString()); + + return blockchainManager.invokeSmartContractFunction(blockchainId, rmsc.getSmartContractPath(), abortFunction.getFunctionIdentifier(), + functionInputs, abortFunction.getOutputs(), 0.0, 0, null); + } + + private CompletableFuture invokeCommit(String blockchainId, UUID txId) { + ResourceManagerSmartContract rmsc = adapterManager.getAdapter(blockchainId).getResourceManagerSmartContract(); + SmartContractFunction commitFunction = rmsc.getCommitFunction(); + List functionInputs = commitFunction.getInputs(); + functionInputs.get(0).setValue(txId.toString()); + + return blockchainManager.invokeSmartContractFunction(blockchainId, rmsc.getSmartContractPath(), commitFunction.getFunctionIdentifier(), + functionInputs, commitFunction.getOutputs(), 0.0, 0, null); + } + + private CompletableFuture invokePrepare(String blockchainId, UUID txId) { + ResourceManagerSmartContract rmsc = adapterManager.getAdapter(blockchainId).getResourceManagerSmartContract(); + SmartContractFunction prepareFunction = rmsc.getPrepareFunction(); + List functionInputs = prepareFunction.getInputs(); + functionInputs.get(0).setValue(txId.toString()); + + return blockchainManager.invokeSmartContractFunction(blockchainId, rmsc.getSmartContractPath(), prepareFunction.getFunctionIdentifier(), + functionInputs, prepareFunction.getOutputs(), 0.0, 0, null); + } + + +} diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/management/tccsci/DistributedTransactionRepository.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/tccsci/DistributedTransactionRepository.java new file mode 100644 index 0000000..44876f6 --- /dev/null +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/management/tccsci/DistributedTransactionRepository.java @@ -0,0 +1,70 @@ +/******************************************************************************** + * Copyright (c) 2023 Institute for the Architecture of Application System - + * University of Stuttgart + * Author: Ghareeb Falazi + * + * This program and the accompanying materials are made available under the + * terms the Apache Software License 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +package blockchains.iaas.uni.stuttgart.de.management.tccsci; + +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransaction; +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransactionState; +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransactionVerdict; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +public class DistributedTransactionRepository { + private final List distributedTransactions = new ArrayList<>(); + private static DistributedTransactionRepository instance; + + private DistributedTransactionRepository() { + + } + + public static DistributedTransactionRepository getInstance() { + if(instance == null) { + instance = new DistributedTransactionRepository(); + } + + return instance; + } + + public void addDistributedTransaction(DistributedTransaction tx) { + if(getById(tx.getId()) == null) { + this.distributedTransactions.add(tx); + } else { + throw new IllegalArgumentException("A distributed transaction with the same id " + tx.getId() + "already exists!"); + } + } + + public DistributedTransaction getById(UUID txId) { + return distributedTransactions.stream().filter(tx->tx.getId().equals(txId)).findFirst().orElse(null); + } + + public List getAll() { + return this.distributedTransactions; + } + + public Collection getByState(DistributedTransactionState state) { + return distributedTransactions.stream().filter(tx->tx.getState().equals(state)).collect(Collectors.toList()); + } + + public Collection getByVerdict(DistributedTransactionVerdict verdict) { + return distributedTransactions.stream().filter(tx->tx.getVerdict().equals(verdict)).collect(Collectors.toList()); + } + + public Collection getByBlockchainId(String bcId) { + return distributedTransactions.stream().filter(tx->tx.getBlockchainIds().contains(bcId)).collect(Collectors.toList()); + } + + +} diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/restapi/Controllers/DistributedTransactionsController.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/restapi/Controllers/DistributedTransactionsController.java new file mode 100644 index 0000000..295c52c --- /dev/null +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/restapi/Controllers/DistributedTransactionsController.java @@ -0,0 +1,49 @@ +/******************************************************************************** + * Copyright (c) 2023-2024 Institute for the Architecture of Application System - + * University of Stuttgart + * + * Author: Ghareeb Falazi + * + * This program and the accompanying materials are made available under the + * terms the Apache Software License 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +package blockchains.iaas.uni.stuttgart.de.restapi.Controllers; + +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransaction; +import blockchains.iaas.uni.stuttgart.de.management.tccsci.DistributedTransactionRepository; +import lombok.extern.log4j.Log4j2; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.UUID; + +@RestController() +@RequestMapping("distributed-transactions") +@Log4j2 +public class DistributedTransactionsController { + + @GetMapping() + public List get() { + return DistributedTransactionRepository.getInstance().getAll(); + } + + @GetMapping(path = "/{dtxId}") + public ResponseEntity getSubscriptionDetails(@PathVariable("dtxId") final String dtxId) { + UUID uuid = UUID.fromString(dtxId); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(uuid); + if (dtx != null) { + return ResponseEntity.ok(dtx); + } else { + return ResponseEntity.notFound().build(); + } + } + +} diff --git a/src/main/java/blockchains/iaas/uni/stuttgart/de/restapi/Controllers/RootController.java b/src/main/java/blockchains/iaas/uni/stuttgart/de/restapi/Controllers/RootController.java index 225c401..72f3d28 100644 --- a/src/main/java/blockchains/iaas/uni/stuttgart/de/restapi/Controllers/RootController.java +++ b/src/main/java/blockchains/iaas/uni/stuttgart/de/restapi/Controllers/RootController.java @@ -13,6 +13,7 @@ import blockchains.iaas.uni.stuttgart.de.jsonrpc.BalService; import blockchains.iaas.uni.stuttgart.de.management.BlockchainManager; +import blockchains.iaas.uni.stuttgart.de.management.tccsci.DistributedTransactionManager; import com.github.arteam.simplejsonrpc.server.JsonRpcServer; import lombok.extern.log4j.Log4j2; import org.springframework.http.ResponseEntity; @@ -24,9 +25,11 @@ @Log4j2 public class RootController { private final BlockchainManager manager; + private final DistributedTransactionManager distributedTransactionManager; - public RootController(BlockchainManager manager) { + public RootController(BlockchainManager manager, DistributedTransactionManager distributedTransactionManager) { this.manager = manager; + this.distributedTransactionManager = distributedTransactionManager; } @PostMapping @@ -34,7 +37,7 @@ public ResponseEntity performJsonRpcCall(@RequestBody String jsonRequest @RequestParam(name = "blockchain") final String blockchainType, @RequestParam(name = "blockchain-id") final String blockchainId, @RequestParam(name = "address") final String smartContractAddress) { - BalService service = new BalService(blockchainType, blockchainId, smartContractAddress, manager); + BalService service = new BalService(blockchainType, blockchainId, smartContractAddress, manager, distributedTransactionManager); JsonRpcServer server = new JsonRpcServer(); String response = server.handle(jsonRequest, service); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 8a10117..af08708 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,3 +1,3 @@ spring.application.name=Blockchain Access Layer -pf4j.autoLoadPlugins=true -pf4j.pluginsDir=C:\\Users\\Ghareeb\\.bal\\plugins \ No newline at end of file +#pf4j.autoLoadPlugins=true +#pf4j.pluginsDir=C:\\Users\\Ghareeb\\.bal\\plugins \ No newline at end of file diff --git a/src/test/java/blockchains/iaas/uni/stuttgart/de/adaptation/TestLoadAdapter.java b/src/test/java/blockchains/iaas/uni/stuttgart/de/adaptation/TestLoadAdapter.java index 4da4726..991ed29 100644 --- a/src/test/java/blockchains/iaas/uni/stuttgart/de/adaptation/TestLoadAdapter.java +++ b/src/test/java/blockchains/iaas/uni/stuttgart/de/adaptation/TestLoadAdapter.java @@ -26,12 +26,14 @@ import java.io.File; import java.io.IOException; import java.net.URISyntaxException; +import java.nio.file.CopyOption; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -43,7 +45,7 @@ public class TestLoadAdapter { private static final String NETWORK_NAME = "eth-0"; private static String originalPf4jPath; private final Path pluginPath = Paths.get(Objects.requireNonNull(TestLoadAdapter.class.getClassLoader().getResource("plugins/ethereum.jar")).toURI()); - private final String KEYSTORE_PATH_KEY = "ethereum.keystorePath"; + private final static String KEYSTORE_PATH_KEY = "ethereum.keystorePath"; @Autowired private BlockchainPluginManager pluginManager; @Autowired @@ -52,35 +54,16 @@ public class TestLoadAdapter { public TestLoadAdapter() throws URISyntaxException { } - @BeforeAll - public static void setPf4jDir() throws IOException { - originalPf4jPath = System.getProperty(Constants.PF4J_PLUGIN_DIR_PROPERTY); - String tmpdir = Files.createTempDirectory("pf4j-temp-plugin-directory").toFile().getAbsolutePath(); - log.info("BeforeAll: Changing system property {} from \"{}\" to \"{}\"", Constants.PF4J_PLUGIN_DIR_PROPERTY, originalPf4jPath, tmpdir); - System.setProperty(Constants.PF4J_PLUGIN_DIR_PROPERTY, tmpdir); - } - - @AfterAll - public static void restorePf4Dir() { - - String temp = System.getProperty(Constants.PF4J_PLUGIN_DIR_PROPERTY); - log.info("AfterAll: Changing system property {} from \"{}\" to \"{}\"", Constants.PF4J_PLUGIN_DIR_PROPERTY, temp, originalPf4jPath); - System.setProperty(Constants.PF4J_PLUGIN_DIR_PROPERTY, originalPf4jPath == null ? "" : originalPf4jPath); - } - @BeforeEach - public void setUp() { - clearPluginDirectory(); - } - - @AfterEach - public void tearDown() { + public void setUp() throws IOException { clearPluginDirectory(); } private void loadPlugin() throws IOException { if (pluginManager.getPlugins().stream().filter(p -> "ethereum-plugin".equals(p.getPluginId())).findAny().isEmpty()) { - Path uploadedPluginPath = Paths.get(pluginManager.getPluginsPath() + "/ethereum.jar"); + Path uploadedPluginPath = pluginManager.getPluginsPath().resolve("ethereum.jar"); + log.info("Loading Plugin: Copying {} to {}...", pluginPath, uploadedPluginPath); + Files.createDirectories(pluginManager.getPluginsPath()); Files.copy(pluginPath, uploadedPluginPath); pluginManager.loadJar(pluginPath); } @@ -101,7 +84,7 @@ public void testLoadEthereumPlugin() throws IOException { } @Test - public void testLoadConnectionProfile() throws IOException, URISyntaxException, ExecutionException, InterruptedException { + public void testLoadConnectionProfile() throws IOException, URISyntaxException { loadPlugin(); String pluginId = pluginManager.getPlugins().get(0).getPluginId(); pluginManager.startPlugin(pluginId); @@ -128,12 +111,22 @@ public void testLoadConnectionProfile() throws IOException, URISyntaxException, assertNotNull(adapter); } - private void clearPluginDirectory() { + private void clearPluginDirectory() throws IOException { + log.info("Cleaning up plugin directory from potential plugin files: {}", () -> pluginManager.getPluginsPath()); Path path = pluginManager.getPluginsPath(); - final File[] files = path.toFile().listFiles(); - if (files != null) { - for (File f : files) { - f.delete(); + + if (Files.exists(path)) { + try (Stream files = Files.list(path)) { + files.forEach(filePath -> { + try { + if (Files.isRegularFile(filePath)) { + log.info("Removing file: {}", filePath); + Files.delete(filePath); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } } } diff --git a/src/test/java/blockchains/iaas/uni/stuttgart/de/management/tccsci/DistributedTransactionManagerTest.java b/src/test/java/blockchains/iaas/uni/stuttgart/de/management/tccsci/DistributedTransactionManagerTest.java new file mode 100644 index 0000000..178560a --- /dev/null +++ b/src/test/java/blockchains/iaas/uni/stuttgart/de/management/tccsci/DistributedTransactionManagerTest.java @@ -0,0 +1,410 @@ +package blockchains.iaas.uni.stuttgart.de.management.tccsci; + +import blockchains.iaas.uni.stuttgart.de.adaptation.AdapterManager; +import blockchains.iaas.uni.stuttgart.de.api.exceptions.BalException; +import blockchains.iaas.uni.stuttgart.de.api.interfaces.BlockchainAdapter; +import blockchains.iaas.uni.stuttgart.de.api.model.*; +import blockchains.iaas.uni.stuttgart.de.management.BlockchainManager; +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransaction; +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransactionState; +import blockchains.iaas.uni.stuttgart.de.management.model.DistributedTransactionVerdict; +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.*; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@SpringBootTest +@Log4j2 +class DistributedTransactionManagerTest { + + static ResourceManagerSmartContract generateResourceManagerSmartContract() { + + Parameter eventParameter0 = new Parameter("txId", "string", null); + Parameter eventParameter1 = new Parameter("vote", "boolean", null); + SmartContractEvent abortEvent = new SmartContractEvent("abortEvent", List.of(eventParameter0)); + SmartContractEvent voteEvent = new SmartContractEvent("voteEvent", List.of(eventParameter0, eventParameter1)); + Parameter inputParameter = new Parameter(); + SmartContractFunction abortFunction = new SmartContractFunction("abort*", List.of(inputParameter), null); + SmartContractFunction commitFunction = new SmartContractFunction("commit*", List.of(inputParameter), null); + SmartContractFunction prepareFunction = new SmartContractFunction("prepare*", List.of(inputParameter), null); + + return new ResourceManagerSmartContract() { + @Override + public SmartContractEvent getAbortEvent() { + return abortEvent; + } + + @Override + public SmartContractEvent getVoteEvent() { + return voteEvent; + } + + @Override + public SmartContractFunction getPrepareFunction() { + return prepareFunction; + } + + @Override + public SmartContractFunction getAbortFunction() { + return abortFunction; + } + + @Override + public SmartContractFunction getCommitFunction() { + return commitFunction; + } + + }; + } + + static AdapterManager generateAdapterManager() { + BlockchainAdapter adapter = mock(BlockchainAdapter.class); + when(adapter.getResourceManagerSmartContract()).thenReturn(generateResourceManagerSmartContract()); + AdapterManager adapterManager = mock(AdapterManager.class); + when(adapterManager.getAdapter(anyString())).thenReturn(adapter); + + return adapterManager; + } + + @Test + void testStartingDtx() { + AdapterManager adapterManager = generateAdapterManager(); + MockBlockchainManager manager = new MockBlockchainManager(adapterManager); + DistributedTransactionManager dManager = new DistributedTransactionManager(adapterManager, manager); + UUID uuid = dManager.startDtx(); + + assertNotNull(uuid); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertNotNull(dtx); + assertEquals(DistributedTransactionState.AWAITING_REQUESTS, dtx.getState()); + } + + @Test + void testAwaitingRequestsState() { + /* DtxStart */ + AdapterManager adapterManager = generateAdapterManager(); + MockBlockchainManager manager = new MockBlockchainManager(adapterManager); + DistributedTransactionManager dManager = new DistributedTransactionManager(adapterManager, manager); + UUID uuid = dManager.startDtx(); + + /* DtxInvoke 1 */ + Parameter uuidParameter = new Parameter("txid", "string", uuid.toString()); + dManager.invokeSc("bc1", "sc1", "userF1", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(1, dtx.getBlockchainIds().size()); + assertTrue(dtx.getBlockchainIds().contains("bc1")); + // we remain in the same state + assertEquals(DistributedTransactionState.AWAITING_REQUESTS, dtx.getState()); + assertEquals(1, manager.functionInvocations.size()); + assertTrue(manager.functionInvocations.containsKey("bc1")); + assertEquals(1, manager.functionInvocations.get("bc1").size()); + + /* DtxInvoke 2 */ + dManager.invokeSc("bc1", "sc2", "userF2", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(1, dtx.getBlockchainIds().size()); + assertTrue(dtx.getBlockchainIds().contains("bc1")); + // we remain in the same state + assertEquals(DistributedTransactionState.AWAITING_REQUESTS, dtx.getState()); + assertEquals(1, manager.functionInvocations.size()); + assertTrue(manager.functionInvocations.containsKey("bc1")); + assertEquals(2, manager.functionInvocations.get("bc1").size()); + + /* DtxInvoke 3 */ + dManager.invokeSc("bc2", "sc3", "userF3", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(2, dtx.getBlockchainIds().size()); + assertTrue(dtx.getBlockchainIds().contains("bc1")); + assertTrue(dtx.getBlockchainIds().contains("bc2")); + // we remain in the same state + assertEquals(DistributedTransactionState.AWAITING_REQUESTS, dtx.getState()); + assertEquals(2, manager.functionInvocations.size()); + assertTrue(manager.functionInvocations.containsKey("bc1")); + assertEquals(2, manager.functionInvocations.get("bc1").size()); + assertTrue(manager.functionInvocations.containsKey("bc2")); + assertEquals(1, manager.functionInvocations.get("bc2").size()); + + } + + @Test + void testAwaitingVotesStateToCommit() { + /* DtxStart */ + AdapterManager adapterManager = generateAdapterManager(); + MockBlockchainManager manager = new MockBlockchainManager(adapterManager); + DistributedTransactionManager dManager = new DistributedTransactionManager(adapterManager, manager); + UUID uuid = dManager.startDtx(); + + /* DtxInvoke bc1 */ + Parameter uuidParameter = new Parameter("txid", "string", uuid.toString()); + dManager.invokeSc("bc1", "sc1", "userF1", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + + /* DtxInvoke bc2 */ + dManager.invokeSc("bc2", "sc2", "userF2", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + + /* DtxCommit */ + dManager.commitDtx(uuid); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(DistributedTransactionState.AWAITING_VOTES, dtx.getState()); + assertEquals(DistributedTransactionVerdict.NOT_DECIDED, dtx.getVerdict()); + assertEquals(0, dtx.getYes()); + + /* vote 1 */ + assertTrue(manager.emitVotes(uuid, "bc1", true)); + dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(DistributedTransactionState.AWAITING_VOTES, dtx.getState()); + assertEquals(DistributedTransactionVerdict.NOT_DECIDED, dtx.getVerdict()); + assertEquals(1, dtx.getYes()); + + + /* vote 2 */ + assertTrue(manager.emitVotes(uuid, "bc2", true)); + dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(DistributedTransactionState.COMMITTED, dtx.getState()); + assertEquals(DistributedTransactionVerdict.COMMIT, dtx.getVerdict()); + assertEquals(2, dtx.getYes()); + + } + + @Test + void testAwaitingVotesStateToAbort() { + /* DtxStart */ + AdapterManager adapterManager = generateAdapterManager(); + MockBlockchainManager manager = new MockBlockchainManager(adapterManager); + DistributedTransactionManager dManager = new DistributedTransactionManager(adapterManager, manager); + UUID uuid = dManager.startDtx(); + + /* DtxInvoke bc1 */ + Parameter uuidParameter = new Parameter("txid", "string", uuid.toString()); + dManager.invokeSc("bc1", "sc1", "userF1", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + + /* DtxInvoke bc2 */ + dManager.invokeSc("bc2", "sc2", "userF2", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + + /* DtxCommit */ + dManager.commitDtx(uuid); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(DistributedTransactionState.AWAITING_VOTES, dtx.getState()); + assertEquals(DistributedTransactionVerdict.NOT_DECIDED, dtx.getVerdict()); + assertEquals(0, dtx.getYes()); + + /* yes vote */ + assertTrue(manager.emitVotes(uuid, "bc1", true)); + dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(DistributedTransactionState.AWAITING_VOTES, dtx.getState()); + assertEquals(DistributedTransactionVerdict.NOT_DECIDED, dtx.getVerdict()); + assertEquals(1, dtx.getYes()); + + /* no vote */ + assertTrue(manager.emitVotes(uuid, "bc2", false)); + dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(DistributedTransactionState.ABORTED, dtx.getState()); + assertEquals(DistributedTransactionVerdict.ABORT, dtx.getVerdict()); + assertEquals(1, dtx.getYes()); + } + + @Test + void testAbortViaError() { + /* DtxStart */ + AdapterManager adapterManager = generateAdapterManager(); + MockBlockchainManager manager = new MockBlockchainManager(adapterManager); + DistributedTransactionManager dManager = new DistributedTransactionManager(adapterManager, manager); + UUID uuid = dManager.startDtx(); + + /* DtxInvoke bc1 */ + Parameter uuidParameter = new Parameter("txid", "string", uuid.toString()); + dManager.invokeSc("bc1", "sc1", "userF1", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + + /* DtxInvoke bc2 */ + dManager.invokeSc("bc2", "sc2", "userF2", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + + /* SC Error */ + manager.emitAborts(uuid, "bc2"); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(DistributedTransactionState.ABORTED, dtx.getState()); + assertEquals(DistributedTransactionVerdict.ABORT, dtx.getVerdict()); + assertEquals(0, dtx.getYes()); + + // At this point the distributed transaction manager is not listening to votes anymore. + /* yes vote */ + assertFalse(manager.emitVotes(uuid, "bc1", true)); + + /* yes vote */ + assertFalse(manager.emitVotes(uuid, "bc2", true)); + } + + @Test + void testAbortViaDtxAbort() { + /* DtxStart */ + AdapterManager adapterManager = generateAdapterManager(); + MockBlockchainManager manager = new MockBlockchainManager(adapterManager); + DistributedTransactionManager dManager = new DistributedTransactionManager(adapterManager, manager); + UUID uuid = dManager.startDtx(); + + /* DtxInvoke bc1 */ + Parameter uuidParameter = new Parameter("txid", "string", uuid.toString()); + dManager.invokeSc("bc1", "sc1", "userF1", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + + /* DtxInvoke bc2 */ + dManager.invokeSc("bc2", "sc2", "userF2", List.of(uuidParameter), List.of(), + 0, "callback-url", 0, "ABC", ""); + + /* SC Error */ + dManager.abortDtx(uuid); + DistributedTransaction dtx = DistributedTransactionRepository.getInstance().getById(uuid); + assertEquals(DistributedTransactionState.ABORTED, dtx.getState()); + assertEquals(DistributedTransactionVerdict.ABORT, dtx.getVerdict()); + assertEquals(0, dtx.getYes()); + + // At this point the distributed transaction manager is not listening to votes anymore. + /* yes vote */ + assertFalse(manager.emitVotes(uuid, "bc1", true)); + + /* yes vote */ + assertFalse(manager.emitVotes(uuid, "bc2", true)); + } + + static class MockBlockchainManager extends BlockchainManager { + + Map> functionInvocations = new HashMap<>(); + Map> eventInvocations = new HashMap<>(); + Map, List>> abortEventEmitters = new HashMap<>(); + Map, List>> voteEventEmitters = new HashMap<>(); + + public MockBlockchainManager(AdapterManager adapterManager) { + super(adapterManager); + } + + + @Override + public CompletableFuture invokeSmartContractFunction( + final String blockchainIdentifier, + final String smartContractPath, + final String functionIdentifier, + final List inputs, + final List outputs, + final double requiredConfidence, + final long timeoutMillis, + final String signature) throws BalException { + log.info("Mock-invoking function: {}.{}.{}", blockchainIdentifier, smartContractPath, functionIdentifier); + SmartContractFunction function = new SmartContractFunction(functionIdentifier, inputs, outputs); + + if (!functionInvocations.containsKey(blockchainIdentifier)) { + functionInvocations.put(blockchainIdentifier, new ArrayList<>()); + } + + functionInvocations.get(blockchainIdentifier).add(function); + + return CompletableFuture.completedFuture(new Transaction()); + } + + @Override + public void invokeSmartContractFunction( + final String blockchainIdentifier, + final String smartContractPath, + final String functionIdentifier, + final List inputs, + final List outputs, + final double requiredConfidence, + final String callbackUrl, + final long timeoutMillis, + final String correlationId, + final String signature) throws BalException { + this.invokeSmartContractFunction(blockchainIdentifier, smartContractPath, + functionIdentifier, inputs, outputs, requiredConfidence, timeoutMillis, signature); + } + + @Override + public Observable subscribeToEvent(String blockchainIdentifier, + final String smartContractPath, + final String eventIdentifier, + final List outputParameters, + final double degreeOfConfidence, + final String filter) { + log.info("Mock-subscribing to event: {}.{}.{} Filter = {}", blockchainIdentifier, smartContractPath, eventIdentifier, filter); + SmartContractEvent event = new SmartContractEvent(eventIdentifier, outputParameters); + String uuIdStr = filter.split("==")[1]; + uuIdStr = uuIdStr.substring(1, uuIdStr.length() - 1); + UUID uuid = UUID.fromString(uuIdStr); + + if (!eventInvocations.containsKey(blockchainIdentifier)) { + eventInvocations.put(blockchainIdentifier, new ArrayList<>()); + } + + eventInvocations.get(blockchainIdentifier).add(event); + Observable observable; + ImmutablePair key = ImmutablePair.of(uuid, blockchainIdentifier); + + if ("abortEvent".equals(eventIdentifier)) { + + if (!abortEventEmitters.containsKey(key)) { + abortEventEmitters.put(key, new ArrayList<>()); + } + + observable = Observable.create(observableEmitter -> { + abortEventEmitters.get(key).add(observableEmitter); + }); + + } else { + if (!voteEventEmitters.containsKey(key)) { + voteEventEmitters.put(key, new ArrayList<>()); + } + + observable = Observable.create(observableEmitter -> { + voteEventEmitters.get(key).add(observableEmitter); + }); + } + + return observable; + } + + boolean emitAborts(UUID txId, String blockchainIdentifier) { + log.info("Emitting aborts for the txid: {} and blockchainId: {}", txId, blockchainIdentifier); + ImmutablePair key = ImmutablePair.of(txId, blockchainIdentifier); + + if (this.abortEventEmitters.containsKey(key)) { + this.abortEventEmitters.get(key).forEach(emitter -> emitter.onNext(new Occurrence(List.of(new Parameter("txId", "string", txId.toString())), ""))); + return true; + } + + return false; + } + + boolean emitVotes(UUID txId, String blockchainIdentifier, boolean isYes) { + log.info("Emitting votes for the txid: {} and blockchainId: {}. Vote={}", txId, blockchainIdentifier, isYes); + ImmutablePair key = ImmutablePair.of(txId, blockchainIdentifier); + Parameter txIdPar = new Parameter("txId", "string", txId.toString()); + Parameter votePar = new Parameter("vote", "boolean", String.valueOf(isYes)); + + if (this.voteEventEmitters.containsKey(key)) { + this.voteEventEmitters.get(key).forEach(emitter -> emitter.onNext(new Occurrence(List.of(txIdPar, votePar), ""))); + return true; + } + + return false; + } + + + } + + +} \ No newline at end of file diff --git a/src/test/resources/gatewayConfiguration.json b/src/test/resources/gatewayConfiguration.json index 8be40e3..2cd6db4 100644 --- a/src/test/resources/gatewayConfiguration.json +++ b/src/test/resources/gatewayConfiguration.json @@ -5,6 +5,7 @@ "keystorePath":"UTC--2019-05-30T11-21-08.970000000Z--90645dc507225d61cb81cf83e7470f5a6aa1215a.json", "keystorePassword":"123456789", "adversaryVotingRatio": 0.2, - "pollingTimeSeconds": 2 + "blockTimeSeconds": 2, + "resourceManagerSmartContractAddress": "0x90645Dc507225d61cB81cF83e7470F5a6AA1215A" } } \ No newline at end of file