From 463c3f6127ee746cbef48d426706bc156000455e Mon Sep 17 00:00:00 2001 From: Brian Oliver Date: Fri, 23 Sep 2016 11:04:06 -0400 Subject: [PATCH 1/5] Issue #156: Resolved issue where corrupted/missing DistributableEntry meta-data may put an Event Channel into a paused state. Now warns when encounters such entries and continues. --- .../AbstractInterClusterEventChannel.java | 61 +++++++++++-------- .../src/site/markdown/history.md.vm | 6 ++ 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/coherence-eventdistributionpattern/src/main/java/com/oracle/coherence/patterns/eventdistribution/channels/AbstractInterClusterEventChannel.java b/coherence-eventdistributionpattern/src/main/java/com/oracle/coherence/patterns/eventdistribution/channels/AbstractInterClusterEventChannel.java index b04bdedb..215d1503 100644 --- a/coherence-eventdistributionpattern/src/main/java/com/oracle/coherence/patterns/eventdistribution/channels/AbstractInterClusterEventChannel.java +++ b/coherence-eventdistributionpattern/src/main/java/com/oracle/coherence/patterns/eventdistribution/channels/AbstractInterClusterEventChannel.java @@ -199,44 +199,55 @@ private ClusterMetaInfo getSourceClusterMetaInfo(DistributableEntryEvent entryEv */ private boolean isDistributable(DistributableEntryEvent entryEvent) { - // determine the source ClusterMetaInfo - ClusterMetaInfo sourceClusterMetaInfo = getSourceClusterMetaInfo(entryEvent); - - // determine if we should distribute - boolean isLocal = localClusterMetaInfo.equals(sourceClusterMetaInfo); - - if (isLocal) + try { - // The entry is local, meaning it is the result of some update from an application running on this cluster. - // (hence it is always distributed) - if (logger.isLoggable(Level.FINEST)) - { - logger.log(Level.FINEST, - String.format("Distributing a local event from %s", sourceClusterMetaInfo.getUniqueName())); - } + // determine the source ClusterMetaInfo + ClusterMetaInfo sourceClusterMetaInfo = getSourceClusterMetaInfo(entryEvent); - return true; - } - else - { - // The entry is a result of some other cluster distributing to this cluster. If this a leaf cluster we don't - // further distributed the entry. If it's not, we distribute to other EventChannels that are not the source event - if (getDistributionRole() == InterClusterEventChannel.DistributionRole.LEAF) + // determine if we should distribute + boolean isLocal = localClusterMetaInfo.equals(sourceClusterMetaInfo); + + if (isLocal) { + // The entry is local, meaning it is the result of some update from an application running on this cluster. + // (hence it is always distributed) if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, - String.format("Won't distribute the non-local event %s from %s", entryEvent, - sourceClusterMetaInfo.getUniqueName())); + String.format("Distributing a local event from %s", + sourceClusterMetaInfo.getUniqueName())); } - return false; + return true; } else { - return (!sourceClusterMetaInfo.equals(getTargetClusterMetaInfo())); + // The entry is a result of some other cluster distributing to this cluster. If this a leaf cluster we don't + // further distributed the entry. If it's not, we distribute to other EventChannels that are not the source event + if (getDistributionRole() == InterClusterEventChannel.DistributionRole.LEAF) + { + if (logger.isLoggable(Level.FINEST)) + { + logger.log(Level.FINEST, + String.format("Won't distribute the non-local event %s from %s", + entryEvent, + sourceClusterMetaInfo.getUniqueName())); + } + + return false; + } + else + { + return (!sourceClusterMetaInfo.equals(getTargetClusterMetaInfo())); + } } } + catch (IllegalStateException e) + { + logger.warning("Failed to determine the cluster source information for " + entryEvent + ". Perhaps your attempting to remove a non-existent entry?"); + + return false; + } } diff --git a/coherence-incubator-site/src/site/markdown/history.md.vm b/coherence-incubator-site/src/site/markdown/history.md.vm index 03078fef..ddcefb35 100644 --- a/coherence-incubator-site/src/site/markdown/history.md.vm +++ b/coherence-incubator-site/src/site/markdown/history.md.vm @@ -33,6 +33,12 @@ reverse chronological order. * [Issue #151](https://github.com/coherence-community/coherence-incubator/issues/151): Refactored the use of EasyMock for the latest version. +

coherence-eventdistributionpattern

+ +* [Issue #156](https://github.com/coherence-community/coherence-incubator/issues/156): Event Channels may + transition to "paused" state when encountering an Entry without meta-data, which may be caused by attempting + to remove a non-existent entry. Such requests will log a warning and will be ignored. + --------------------------------------------------------------------------------

Version: 12.3.2 built on 2016-06-29 10:05

From d7caaebcf0c9d7d5df4d462f1a8652b8cb5390cc Mon Sep 17 00:00:00 2001 From: Brian Oliver Date: Wed, 28 Sep 2016 09:15:01 -0400 Subject: [PATCH 2/5] Upgraded to Oracle Bedrock 4.1.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2c52a74a..548e7ee7 100644 --- a/pom.xml +++ b/pom.xml @@ -150,7 +150,7 @@ 4.12 3.6 1.10.19 - 4.1.0-SNAPSHOT + 4.1.0 12.1.2 ${oracle.coherence.base.version}-0-5 1.6.5 From cf9404882ae93d38d65bca802581699364783e0f Mon Sep 17 00:00:00 2001 From: Brian Oliver Date: Wed, 28 Sep 2016 09:33:01 -0400 Subject: [PATCH 3/5] [maven-release-plugin] prepare release coherence-incubator-12.4.0 --- coherence-commandpattern-examples/pom.xml | 2 +- coherence-commandpattern-tests/pom.xml | 2 +- coherence-commandpattern/pom.xml | 2 +- coherence-common-tests/pom.xml | 2 +- coherence-common/pom.xml | 2 +- coherence-eventdistributionpattern/pom.xml | 2 +- coherence-functorpattern-examples/pom.xml | 2 +- coherence-functorpattern/pom.xml | 2 +- coherence-incubator-all/pom.xml | 2 +- coherence-incubator-site/pom.xml | 2 +- coherence-jvisualvm/pom.xml | 2 +- coherence-messagingpattern-tests/pom.xml | 2 +- coherence-messagingpattern/pom.xml | 2 +- coherence-processingpattern-examples/pom.xml | 2 +- coherence-processingpattern-tests/pom.xml | 2 +- coherence-processingpattern/pom.xml | 2 +- coherence-pushreplicationpattern-examples/pom.xml | 2 +- coherence-pushreplicationpattern-tests/pom.xml | 2 +- coherence-pushreplicationpattern-web-example/pom.xml | 2 +- .../webapp-test/pom.xml | 2 +- coherence-pushreplicationpattern-web-example/webapp/pom.xml | 2 +- .../webserver/pom.xml | 2 +- coherence-pushreplicationpattern/pom.xml | 2 +- pom.xml | 4 ++-- 24 files changed, 25 insertions(+), 25 deletions(-) diff --git a/coherence-commandpattern-examples/pom.xml b/coherence-commandpattern-examples/pom.xml index d766c5e8..3932f373 100644 --- a/coherence-commandpattern-examples/pom.xml +++ b/coherence-commandpattern-examples/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-commandpattern-tests/pom.xml b/coherence-commandpattern-tests/pom.xml index 5e8a05e5..d20af8fd 100644 --- a/coherence-commandpattern-tests/pom.xml +++ b/coherence-commandpattern-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-commandpattern/pom.xml b/coherence-commandpattern/pom.xml index 9f185450..9db637f2 100644 --- a/coherence-commandpattern/pom.xml +++ b/coherence-commandpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-common-tests/pom.xml b/coherence-common-tests/pom.xml index af820f1f..1365222c 100644 --- a/coherence-common-tests/pom.xml +++ b/coherence-common-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-common/pom.xml b/coherence-common/pom.xml index d1cd7ec1..b9edb892 100644 --- a/coherence-common/pom.xml +++ b/coherence-common/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-eventdistributionpattern/pom.xml b/coherence-eventdistributionpattern/pom.xml index 94bbb228..8e131047 100644 --- a/coherence-eventdistributionpattern/pom.xml +++ b/coherence-eventdistributionpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-functorpattern-examples/pom.xml b/coherence-functorpattern-examples/pom.xml index d4c17e12..ef56da59 100644 --- a/coherence-functorpattern-examples/pom.xml +++ b/coherence-functorpattern-examples/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-functorpattern/pom.xml b/coherence-functorpattern/pom.xml index dd4aadaf..ba10ad37 100644 --- a/coherence-functorpattern/pom.xml +++ b/coherence-functorpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-incubator-all/pom.xml b/coherence-incubator-all/pom.xml index 30970042..84752896 100644 --- a/coherence-incubator-all/pom.xml +++ b/coherence-incubator-all/pom.xml @@ -4,7 +4,7 @@ com.oracle.coherence.incubator coherence-incubator - 12.3.3-SNAPSHOT + 12.4.0 coherence-incubator-all diff --git a/coherence-incubator-site/pom.xml b/coherence-incubator-site/pom.xml index be61a37d..a95eca9c 100644 --- a/coherence-incubator-site/pom.xml +++ b/coherence-incubator-site/pom.xml @@ -5,7 +5,7 @@ coherence-incubator com.oracle.coherence.incubator ../pom.xml - 12.3.3-SNAPSHOT + 12.4.0 coherence-incubator-site diff --git a/coherence-jvisualvm/pom.xml b/coherence-jvisualvm/pom.xml index 6090721e..ab2dcb20 100644 --- a/coherence-jvisualvm/pom.xml +++ b/coherence-jvisualvm/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-messagingpattern-tests/pom.xml b/coherence-messagingpattern-tests/pom.xml index 40b3012e..76e60a1b 100644 --- a/coherence-messagingpattern-tests/pom.xml +++ b/coherence-messagingpattern-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-messagingpattern/pom.xml b/coherence-messagingpattern/pom.xml index e83f633c..a1d9dcf4 100644 --- a/coherence-messagingpattern/pom.xml +++ b/coherence-messagingpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-processingpattern-examples/pom.xml b/coherence-processingpattern-examples/pom.xml index f59bd81a..c6040ba2 100755 --- a/coherence-processingpattern-examples/pom.xml +++ b/coherence-processingpattern-examples/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-processingpattern-tests/pom.xml b/coherence-processingpattern-tests/pom.xml index ef8d87db..5f0e21ca 100755 --- a/coherence-processingpattern-tests/pom.xml +++ b/coherence-processingpattern-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-processingpattern/pom.xml b/coherence-processingpattern/pom.xml index b9bdbaa5..bd604b5b 100755 --- a/coherence-processingpattern/pom.xml +++ b/coherence-processingpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-pushreplicationpattern-examples/pom.xml b/coherence-pushreplicationpattern-examples/pom.xml index 4379171a..6d60fe8f 100644 --- a/coherence-pushreplicationpattern-examples/pom.xml +++ b/coherence-pushreplicationpattern-examples/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-pushreplicationpattern-tests/pom.xml b/coherence-pushreplicationpattern-tests/pom.xml index 02a5bec5..b1ecce71 100644 --- a/coherence-pushreplicationpattern-tests/pom.xml +++ b/coherence-pushreplicationpattern-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-pushreplicationpattern-web-example/pom.xml b/coherence-pushreplicationpattern-web-example/pom.xml index 8ba23c7c..4e228c2b 100644 --- a/coherence-pushreplicationpattern-web-example/pom.xml +++ b/coherence-pushreplicationpattern-web-example/pom.xml @@ -5,7 +5,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-pushreplicationpattern-web-example/webapp-test/pom.xml b/coherence-pushreplicationpattern-web-example/webapp-test/pom.xml index 0e662c70..9b038ab4 100644 --- a/coherence-pushreplicationpattern-web-example/webapp-test/pom.xml +++ b/coherence-pushreplicationpattern-web-example/webapp-test/pom.xml @@ -5,7 +5,7 @@ com.oracle.coherence.incubator coherence-pushreplicationpattern-web-example - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-pushreplicationpattern-web-example/webapp/pom.xml b/coherence-pushreplicationpattern-web-example/webapp/pom.xml index a7961a6a..03818192 100644 --- a/coherence-pushreplicationpattern-web-example/webapp/pom.xml +++ b/coherence-pushreplicationpattern-web-example/webapp/pom.xml @@ -5,7 +5,7 @@ com.oracle.coherence.incubator coherence-pushreplicationpattern-web-example - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-pushreplicationpattern-web-example/webserver/pom.xml b/coherence-pushreplicationpattern-web-example/webserver/pom.xml index 39846d35..015d7194 100644 --- a/coherence-pushreplicationpattern-web-example/webserver/pom.xml +++ b/coherence-pushreplicationpattern-web-example/webserver/pom.xml @@ -5,7 +5,7 @@ com.oracle.coherence.incubator coherence-pushreplicationpattern-web-example - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/coherence-pushreplicationpattern/pom.xml b/coherence-pushreplicationpattern/pom.xml index 3f2accf4..309ef61b 100644 --- a/coherence-pushreplicationpattern/pom.xml +++ b/coherence-pushreplicationpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 ../pom.xml diff --git a/pom.xml b/pom.xml index 548e7ee7..1f867173 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.3.3-SNAPSHOT + 12.4.0 pom Coherence Incubator @@ -51,7 +51,7 @@ scm:git:https://github.com/coherence-community/coherence-incubator.git scm:git:https://github.com/coherence-community/coherence-incubator.git https://github.com/coherence-community/coherence-incubator.git - HEAD + coherence-incubator-12.4.0 From 5d57a63535ed06fa2ea88a194188d3acfe7fa97b Mon Sep 17 00:00:00 2001 From: Brian Oliver Date: Wed, 28 Sep 2016 09:33:06 -0400 Subject: [PATCH 4/5] [maven-release-plugin] prepare for next development iteration --- coherence-commandpattern-examples/pom.xml | 2 +- coherence-commandpattern-tests/pom.xml | 2 +- coherence-commandpattern/pom.xml | 2 +- coherence-common-tests/pom.xml | 2 +- coherence-common/pom.xml | 2 +- coherence-eventdistributionpattern/pom.xml | 2 +- coherence-functorpattern-examples/pom.xml | 2 +- coherence-functorpattern/pom.xml | 2 +- coherence-incubator-all/pom.xml | 2 +- coherence-incubator-site/pom.xml | 2 +- coherence-jvisualvm/pom.xml | 2 +- coherence-messagingpattern-tests/pom.xml | 2 +- coherence-messagingpattern/pom.xml | 2 +- coherence-processingpattern-examples/pom.xml | 2 +- coherence-processingpattern-tests/pom.xml | 2 +- coherence-processingpattern/pom.xml | 2 +- coherence-pushreplicationpattern-examples/pom.xml | 2 +- coherence-pushreplicationpattern-tests/pom.xml | 2 +- coherence-pushreplicationpattern-web-example/pom.xml | 2 +- .../webapp-test/pom.xml | 2 +- coherence-pushreplicationpattern-web-example/webapp/pom.xml | 2 +- .../webserver/pom.xml | 2 +- coherence-pushreplicationpattern/pom.xml | 2 +- pom.xml | 4 ++-- 24 files changed, 25 insertions(+), 25 deletions(-) diff --git a/coherence-commandpattern-examples/pom.xml b/coherence-commandpattern-examples/pom.xml index 3932f373..3fef2f18 100644 --- a/coherence-commandpattern-examples/pom.xml +++ b/coherence-commandpattern-examples/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-commandpattern-tests/pom.xml b/coherence-commandpattern-tests/pom.xml index d20af8fd..409145a7 100644 --- a/coherence-commandpattern-tests/pom.xml +++ b/coherence-commandpattern-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-commandpattern/pom.xml b/coherence-commandpattern/pom.xml index 9db637f2..dfb74113 100644 --- a/coherence-commandpattern/pom.xml +++ b/coherence-commandpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-common-tests/pom.xml b/coherence-common-tests/pom.xml index 1365222c..2039da09 100644 --- a/coherence-common-tests/pom.xml +++ b/coherence-common-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-common/pom.xml b/coherence-common/pom.xml index b9edb892..49e77a5c 100644 --- a/coherence-common/pom.xml +++ b/coherence-common/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-eventdistributionpattern/pom.xml b/coherence-eventdistributionpattern/pom.xml index 8e131047..0b6e9467 100644 --- a/coherence-eventdistributionpattern/pom.xml +++ b/coherence-eventdistributionpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-functorpattern-examples/pom.xml b/coherence-functorpattern-examples/pom.xml index ef56da59..24cdfd11 100644 --- a/coherence-functorpattern-examples/pom.xml +++ b/coherence-functorpattern-examples/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-functorpattern/pom.xml b/coherence-functorpattern/pom.xml index ba10ad37..e96468dd 100644 --- a/coherence-functorpattern/pom.xml +++ b/coherence-functorpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-incubator-all/pom.xml b/coherence-incubator-all/pom.xml index 84752896..464cd8d7 100644 --- a/coherence-incubator-all/pom.xml +++ b/coherence-incubator-all/pom.xml @@ -4,7 +4,7 @@ com.oracle.coherence.incubator coherence-incubator - 12.4.0 + 12.4.1-SNAPSHOT coherence-incubator-all diff --git a/coherence-incubator-site/pom.xml b/coherence-incubator-site/pom.xml index a95eca9c..2b26f942 100644 --- a/coherence-incubator-site/pom.xml +++ b/coherence-incubator-site/pom.xml @@ -5,7 +5,7 @@ coherence-incubator com.oracle.coherence.incubator ../pom.xml - 12.4.0 + 12.4.1-SNAPSHOT coherence-incubator-site diff --git a/coherence-jvisualvm/pom.xml b/coherence-jvisualvm/pom.xml index ab2dcb20..94ba6561 100644 --- a/coherence-jvisualvm/pom.xml +++ b/coherence-jvisualvm/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-messagingpattern-tests/pom.xml b/coherence-messagingpattern-tests/pom.xml index 76e60a1b..c061eed5 100644 --- a/coherence-messagingpattern-tests/pom.xml +++ b/coherence-messagingpattern-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-messagingpattern/pom.xml b/coherence-messagingpattern/pom.xml index a1d9dcf4..400d9ef1 100644 --- a/coherence-messagingpattern/pom.xml +++ b/coherence-messagingpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-processingpattern-examples/pom.xml b/coherence-processingpattern-examples/pom.xml index c6040ba2..d148202a 100755 --- a/coherence-processingpattern-examples/pom.xml +++ b/coherence-processingpattern-examples/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-processingpattern-tests/pom.xml b/coherence-processingpattern-tests/pom.xml index 5f0e21ca..301035b2 100755 --- a/coherence-processingpattern-tests/pom.xml +++ b/coherence-processingpattern-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-processingpattern/pom.xml b/coherence-processingpattern/pom.xml index bd604b5b..408f38ac 100755 --- a/coherence-processingpattern/pom.xml +++ b/coherence-processingpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-pushreplicationpattern-examples/pom.xml b/coherence-pushreplicationpattern-examples/pom.xml index 6d60fe8f..1e028668 100644 --- a/coherence-pushreplicationpattern-examples/pom.xml +++ b/coherence-pushreplicationpattern-examples/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-pushreplicationpattern-tests/pom.xml b/coherence-pushreplicationpattern-tests/pom.xml index b1ecce71..33958e9a 100644 --- a/coherence-pushreplicationpattern-tests/pom.xml +++ b/coherence-pushreplicationpattern-tests/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-pushreplicationpattern-web-example/pom.xml b/coherence-pushreplicationpattern-web-example/pom.xml index 4e228c2b..061a87b4 100644 --- a/coherence-pushreplicationpattern-web-example/pom.xml +++ b/coherence-pushreplicationpattern-web-example/pom.xml @@ -5,7 +5,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-pushreplicationpattern-web-example/webapp-test/pom.xml b/coherence-pushreplicationpattern-web-example/webapp-test/pom.xml index 9b038ab4..29266386 100644 --- a/coherence-pushreplicationpattern-web-example/webapp-test/pom.xml +++ b/coherence-pushreplicationpattern-web-example/webapp-test/pom.xml @@ -5,7 +5,7 @@ com.oracle.coherence.incubator coherence-pushreplicationpattern-web-example - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-pushreplicationpattern-web-example/webapp/pom.xml b/coherence-pushreplicationpattern-web-example/webapp/pom.xml index 03818192..3b4b4758 100644 --- a/coherence-pushreplicationpattern-web-example/webapp/pom.xml +++ b/coherence-pushreplicationpattern-web-example/webapp/pom.xml @@ -5,7 +5,7 @@ com.oracle.coherence.incubator coherence-pushreplicationpattern-web-example - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-pushreplicationpattern-web-example/webserver/pom.xml b/coherence-pushreplicationpattern-web-example/webserver/pom.xml index 015d7194..d19864df 100644 --- a/coherence-pushreplicationpattern-web-example/webserver/pom.xml +++ b/coherence-pushreplicationpattern-web-example/webserver/pom.xml @@ -5,7 +5,7 @@ com.oracle.coherence.incubator coherence-pushreplicationpattern-web-example - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/coherence-pushreplicationpattern/pom.xml b/coherence-pushreplicationpattern/pom.xml index 309ef61b..85c2ebdd 100644 --- a/coherence-pushreplicationpattern/pom.xml +++ b/coherence-pushreplicationpattern/pom.xml @@ -4,7 +4,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index 1f867173..b102202c 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ coherence-incubator com.oracle.coherence.incubator - 12.4.0 + 12.4.1-SNAPSHOT pom Coherence Incubator @@ -51,7 +51,7 @@ scm:git:https://github.com/coherence-community/coherence-incubator.git scm:git:https://github.com/coherence-community/coherence-incubator.git https://github.com/coherence-community/coherence-incubator.git - coherence-incubator-12.4.0 + HEAD From 5466f7b6b8aa3829d329fb257a523b9b06fc32ca Mon Sep 17 00:00:00 2001 From: Brian Oliver Date: Tue, 21 Feb 2017 15:01:35 -0500 Subject: [PATCH 5/5] Issue #49: Resolved inappropriate use of CacheFactory.ensureCluster(), replacing with CacheFactory.getCluster() Issue #159: Introduced ability to provide a ConfigurableCacheFactory when creating a ProcessingSession Issue #160: Ensure consistent use of ClassLoaders based on calling context Issue #161: Ensure Processing Pattern is initialized using the Cache Configuration LifecycleEvents Issue #162: Introduce Shared ExecutorService for internal background tasks Issue #163: Resolves fail-over/fail-back of Grid-based Tasks --- .../src/site/markdown/history.md.vm | 29 ++ .../processing/taskprocessor/RestartTask.java | 56 ++-- .../taskprocessor/RollingRestartTest.java | 213 +++++++------- ...processingpattern-restart-cache-config.xml | 2 +- .../builder/TaskDispatchPolicyBuilder.java | 3 +- .../ProcessingPatternConfigProcessor.java | 37 ++- .../dispatchers/AbstractDispatcher.java | 4 +- .../task/DefaultTaskDispatcher.java | 21 +- .../internal/DefaultProcessingSession.java | 74 +++-- .../DefaultSubmissionConfiguration.java | 3 +- .../internal/DefaultSubmissionResult.java | 6 +- .../internal/LifecycleInterceptor.java | 31 +- .../internal/ProcessingPattern.java | 9 +- .../task/DefaultServerLeaseMonitor.java | 4 +- .../task/DefaultTaskProcessorDefinition.java | 3 +- ...DefaultTaskProcessorDefinitionManager.java | 12 +- .../task/DefaultTaskProcessorMediator.java | 104 +++---- .../internal/task/RecoverTasks.java | 56 ++-- .../task/TaskProcessorMBeanManager.java | 6 +- .../internal/task/TaskProcessorMediator.java | 8 - .../TaskProcessorMediatorInterceptor.java | 264 ++++++++++++++++-- .../processing/taskprocessor/TaskRunner.java | 2 +- pom.xml | 4 +- 23 files changed, 628 insertions(+), 323 deletions(-) diff --git a/coherence-incubator-site/src/site/markdown/history.md.vm b/coherence-incubator-site/src/site/markdown/history.md.vm index ddcefb35..fc7ef9fa 100644 --- a/coherence-incubator-site/src/site/markdown/history.md.vm +++ b/coherence-incubator-site/src/site/markdown/history.md.vm @@ -10,6 +10,35 @@ reverse chronological order. * Brian Oliver +

coherence-processingpattern

+ +* [Issue #49](https://github.com/coherence-community/coherence-incubator/issues/49): + Resolved inappropriate use of CacheFactory.ensureCluster(), replacing with CacheFactory.getCluster(). + +* [Issue #159](https://github.com/coherence-community/coherence-incubator/issues/159): + Introduced ability to provide a ConfigurableCacheFactory when creating a ProcessingSession. + +* [Issue #160](https://github.com/coherence-community/coherence-incubator/issues/160): + Ensure consistent use of ClassLoaders based on calling context. + +* [Issue #161](https://github.com/coherence-community/coherence-incubator/issues/161): + Ensure Processing Pattern is initialized using the Cache Configuration LifecycleEvents. + +* [Issue #162](https://github.com/coherence-community/coherence-incubator/issues/162): + Introduce Shared ExecutorService for internal background tasks. + +* [Issue #163](https://github.com/coherence-community/coherence-incubator/issues/163): + Resolves fail-over/fail-back of Grid-based Tasks. + + +-------------------------------------------------------------------------------- + +

Version: 12.4.0 built on 2017-09-28

+ +

Source and Documentation Contributors

+ +* Brian Oliver +

Global and Cross-Module Changes

* [Issue #150](https://github.com/coherence-community/coherence-incubator/issues/150): Upgraded to require diff --git a/coherence-processingpattern-tests/src/test/java/com/oracle/coherence/patterns/processing/taskprocessor/RestartTask.java b/coherence-processingpattern-tests/src/test/java/com/oracle/coherence/patterns/processing/taskprocessor/RestartTask.java index 6a046e6e..030b9e71 100755 --- a/coherence-processingpattern-tests/src/test/java/com/oracle/coherence/patterns/processing/taskprocessor/RestartTask.java +++ b/coherence-processingpattern-tests/src/test/java/com/oracle/coherence/patterns/processing/taskprocessor/RestartTask.java @@ -33,18 +33,19 @@ import com.tangosol.net.CacheFactory; import java.io.IOException; +import java.util.Random; /** - * A simple restartable task. + * A simple restartable task that delays a random amount of time between 0 and 1 seconds. *

- * Copyright (c) 2009. All Rights Reserved. Oracle Corporation.
+ * Copyright (c) 2017. All Rights Reserved. Oracle Corporation.
* Oracle is a registered trademark of Oracle Corporation and/or its affiliates. * - * @author Christer Fahlgren + * @author Brian Oliver */ public class RestartTask implements ResumableTask, PortableObject { - private String sName; + private String name; /** @@ -58,56 +59,53 @@ public RestartTask() /** * Constructs a {@link RestartTask}. * - * @param sName + * @param name the name of the task (that is returned) */ - public RestartTask(String sName) + public RestartTask(String name) { - this.sName = sName; + this.name = name; } - /** - * {@inheritDoc} - */ @Override public String toString() { - return "RestartTask [" + (sName != null ? "sName=" + sName : "") + "]"; + return "RestartTask [" + (name != null ? "sName=" + name : "") + "]"; } - /** - * Method description - * - * @param oEnvironment - * - * @return - */ @Override public Object run(TaskExecutionEnvironment oEnvironment) { - CacheFactory.log(sName + " processed...", CacheFactory.LOG_ALWAYS); - - return sName; + CacheFactory.log(name + " processing...", CacheFactory.LOG_ALWAYS); + + Random random = new Random(); + + try + { + Thread.sleep(random.nextInt(1000)); + } + catch (InterruptedException e) + { + CacheFactory.log(name + " interrupted!"); + } + + CacheFactory.log(name + " processed...", CacheFactory.LOG_ALWAYS); + + return name; } - /** - * {@inheritDoc} - */ @Override public void readExternal(PofReader reader) throws IOException { - this.sName = reader.readString(0); + this.name = reader.readString(0); } - /** - * {@inheritDoc} - */ @Override public void writeExternal(PofWriter writer) throws IOException { - writer.writeString(0, sName); + writer.writeString(0, name); } } diff --git a/coherence-processingpattern-tests/src/test/java/com/oracle/coherence/patterns/processing/taskprocessor/RollingRestartTest.java b/coherence-processingpattern-tests/src/test/java/com/oracle/coherence/patterns/processing/taskprocessor/RollingRestartTest.java index fbb8e10d..c5c2c7ab 100755 --- a/coherence-processingpattern-tests/src/test/java/com/oracle/coherence/patterns/processing/taskprocessor/RollingRestartTest.java +++ b/coherence-processingpattern-tests/src/test/java/com/oracle/coherence/patterns/processing/taskprocessor/RollingRestartTest.java @@ -25,136 +25,135 @@ package com.oracle.coherence.patterns.processing.taskprocessor; +import com.oracle.bedrock.junit.CoherenceClusterResource; +import com.oracle.bedrock.junit.StorageDisabledMember; import com.oracle.bedrock.runtime.LocalPlatform; -import com.oracle.bedrock.runtime.network.AvailablePortIterator; -import org.junit.BeforeClass; +import com.oracle.bedrock.runtime.coherence.options.CacheConfig; +import com.oracle.bedrock.runtime.coherence.options.ClusterPort; +import com.oracle.bedrock.runtime.coherence.options.LocalHost; +import com.oracle.bedrock.runtime.coherence.options.LocalStorage; +import com.oracle.bedrock.runtime.coherence.options.Multicast; +import com.oracle.bedrock.runtime.coherence.options.Pof; +import com.oracle.bedrock.runtime.coherence.options.RoleName; +import com.oracle.bedrock.runtime.concurrent.runnable.RuntimeHalt; +import com.oracle.bedrock.runtime.java.options.SystemProperty; +import com.oracle.bedrock.runtime.options.DisplayName; +import com.oracle.bedrock.util.Capture; +import com.oracle.coherence.common.identifiers.StringBasedIdentifier; +import com.oracle.coherence.patterns.processing.ProcessingSession; +import com.oracle.coherence.patterns.processing.SubmissionConfiguration; +import com.oracle.coherence.patterns.processing.SubmissionOutcome; +import com.oracle.coherence.patterns.processing.SubmissionRetentionPolicy; +import com.oracle.coherence.patterns.processing.internal.DefaultProcessingSession; +import com.oracle.coherence.patterns.processing.internal.DefaultSubmissionConfiguration; +import com.tangosol.net.ConfigurableCacheFactory; +import org.junit.Rule; import org.junit.Test; -import java.net.UnknownHostException; -import java.util.Date; +import java.text.DateFormat; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; /** * Rolling Restart Test *

- * Copyright (c) 2009. All Rights Reserved. Oracle Corporation.
+ * Copyright (c) 2017. All Rights Reserved. Oracle Corporation.
* Oracle is a registered trademark of Oracle Corporation and/or its affiliates. * - * @author Noah Arliss + * @author Brian Oliver */ public class RollingRestartTest { - private final static int cServers = 2; - private final static int cSubmissions = 250; - private final static int cIterations = 5; - private static AvailablePortIterator portIterator; - private final static long CLUSTER_JOIN_TIMEOUT = 10000; + /** + * The number of rolling-restart iterations. + */ + private final static int iterations = 5; + /** + * The number of task submissions per iteration. + */ + private final static int submissionsPerIteration = 250; /** - * Method description - * - * @throws UnknownHostException + * Establish a {@link CoherenceClusterResource} for our test. */ - @BeforeClass - public static void setup() throws UnknownHostException - { - portIterator = LocalPlatform.get().getAvailablePorts(); - } + @Rule + public CoherenceClusterResource coherenceResource = + new CoherenceClusterResource() + .using(LocalPlatform.get()) + .with(LocalHost.only(), + Multicast.ttl(0), + ClusterPort.automatic(), + CacheConfig.of("coherence-processingpattern-restart-cache-config.xml"), + Pof.config("coherence-processingpattern-test-pof-config.xml"), + SystemProperty.of("tangosol.coherence.extend.address", LocalPlatform.get().getLoopbackAddress().getHostAddress()), + SystemProperty.of("tangosol.coherence.extend.port", Capture.of(LocalPlatform.get().getAvailablePorts()))) + .include(4, + DisplayName.of("storage"), + RoleName.of("storage"), + LocalStorage.enabled(), + SystemProperty.of("tangosol.coherence.extend.enabled", false)); /** - * Method description - * - * @throws Throwable + * Ensure that the Processing Pattern can recover tests during a rolling restart. */ @Test - public void testRollingRestart() throws Throwable + public void shouldRecoverTasksDuringRollingRestart() throws Throwable { -// JavaApplication servers[] = new JavaApplication[cServers]; -// try -// { -// System.setProperty("tangosol.coherence.log", "client.txt"); -// int clusterPort = portIterator.next(); -// CoherenceServerBuilder builder = new CoherenceServerBuilder() -// .setEnvironmentVariables(PropertiesBuilder.fromCurrentEnvironmentVariables()) -// .setSystemProperties(PropertiesBuilder.fromCurrentSystemProperties()).setClusterName("testCluster") -// .setCacheConfigURI("coherence-processingpattern-restart-cache-config.xml") -// .setLocalHostAddress("127.0.0.1").setMulticastTTL(0) -// .setPofConfigURI("coherence-processingpattern-test-pof-config.xml").setClusterPort(clusterPort) -// .setLogLevel(6); -// Properties props = new Properties(); -// props.putAll(builder.getSystemPropertiesBuilder().realize(null)); -// System.setProperties(props); -// System.setProperty(CoherenceServerBuilder.PROPERTY_DISTRIBUTED_LOCALSTORAGE, "false"); -// for (int x = 0; x < cServers; x++) -// { -// servers[x] = builder.realize("TestServer" + x, new LogApplicationConsole("TestServer" + x + ".log")); -// } -// wait(CLUSTER_JOIN_TIMEOUT * cServers, "for test servers to start."); -// -// //setup processing session -// ProcessingSession session = new DefaultProcessingSession( -// StringBasedIdentifier.newInstance("TaskExecutionSample" -// + DateFormat.getDateTimeInstance().format(System.currentTimeMillis()))); -// SubmissionConfiguration config = new DefaultSubmissionConfiguration(); -// // Lets do cIterations rolling restarts -// for (int x = 0; x < cIterations; ++x) -// { -// SubmissionOutcome[] results = new SubmissionOutcome[cSubmissions]; -// System.out.println("Submitting work to the grid..."); -// //Submit cSubmissions to the cluster for processing -// for (int y = 0; y < cSubmissions; y++) -// { -// String id = "Task[" + x + ":" + y + "]"; -// results[y] = session.submit(new RestartTask(id), config, StringBasedIdentifier.newInstance(id), SubmissionRetentionPolicy.RemoveOnFinalState, null); -// } -// //wait for results to come back -// for (int y = 0; y < cSubmissions; y++) -// { -// System.out.println("Client received result from: " + results[y].get()); -// } -// //Now lets do a rolling restart -// for (int y = 0; y < cServers; y++) -// { -// System.out.println("Stopping server " + y); -// servers[y].destroy(); -// wait(CLUSTER_JOIN_TIMEOUT, "for test server " + y + " to stop"); -// System.out.println("Starting server " + y); -// servers[y] = builder.realize("TestServer" + y, new LogApplicationConsole("TestServer"+y+".log")); -// wait(CLUSTER_JOIN_TIMEOUT, "for test server " + y + " to start"); -// } -// //Now that we've done a rolling restart let the loop submit work again. -// System.out.println("Iteration " + x + " done successfully..."); -// } -// System.out.println("All done successfully..."); -// } -// catch (Throwable e) -// { -// System.out.println(e); -// e.printStackTrace(); -// throw e; -// } -// finally -// { -// for (int x = 0; x < cServers; x++) -// { -// servers[x].destroy(); -// } -// } - } + // acquire a storage disabled cache factory for the cluster + ConfigurableCacheFactory cacheFactory = coherenceResource.createSession(new StorageDisabledMember()); + // establish a processing session + ProcessingSession session = + new DefaultProcessingSession(StringBasedIdentifier.newInstance("TaskExecutionSample" + + DateFormat.getDateTimeInstance() + .format(System.currentTimeMillis())), + cacheFactory); - /** - *

A simple method to wait a specified amount of time, with a message to stdout.

- * - * @param time The time to wait in ms. - * @param rationale The rationale (message) for waiting. - * @throws InterruptedException When interrupted while waiting. - */ - public void wait(long time, - String rationale) throws InterruptedException - { - System.out.printf("%s: Waiting %dms %s\n", new Date(), time, rationale); - Thread.sleep(time); + SubmissionConfiguration config = new DefaultSubmissionConfiguration(); + + // perform a number of rolling restart iterations + for (int x = 0; x < iterations; ++x) + { + SubmissionOutcome[] results = new SubmissionOutcome[submissionsPerIteration]; + + System.out.println("Submitting work to the cluster..."); + + // submit a number of tasks for processing + for (int y = 0; y < submissionsPerIteration; y++) + { + String id = "Task[" + x + ":" + y + "]"; + + results[y] = session.submit(new RestartTask(id), + config, + StringBasedIdentifier.newInstance(id), + SubmissionRetentionPolicy.RemoveOnFinalState, + null); + } + + // wait for results to come back + for (int y = 0; y < submissionsPerIteration; y++) + { + String id = "Task[" + x + ":" + y + "]"; + + System.out.println("Waiting for result... " + id); + + assertThat(results[y].get(), is(id)); + + System.out.println("Received result... " + id); + + // System.out.println("Client received result from: " + results[y].get()); + } + + // randomly terminate and relaunch all of the servers + coherenceResource.getCluster().relaunch(); + + // we're done submitting work for this iteration + System.out.println("Iteration " + x + " done successfully..."); + } + + System.out.println("All done successfully..."); } } diff --git a/coherence-processingpattern-tests/src/test/resources/coherence-processingpattern-restart-cache-config.xml b/coherence-processingpattern-tests/src/test/resources/coherence-processingpattern-restart-cache-config.xml index 02773937..b5d0253d 100755 --- a/coherence-processingpattern-tests/src/test/resources/coherence-processingpattern-restart-cache-config.xml +++ b/coherence-processingpattern-tests/src/test/resources/coherence-processingpattern-restart-cache-config.xml @@ -26,7 +26,7 @@ Copyright (c) 2009. All Rights Reserved. Oracle Corporation. --> diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/config/builder/TaskDispatchPolicyBuilder.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/config/builder/TaskDispatchPolicyBuilder.java index ecacb9e4..bc7e0e22 100644 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/config/builder/TaskDispatchPolicyBuilder.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/config/builder/TaskDispatchPolicyBuilder.java @@ -34,6 +34,7 @@ import com.tangosol.coherence.config.builder.ParameterizedBuilder; import com.tangosol.config.annotation.Injectable; import com.tangosol.config.expression.NullParameterResolver; +import com.tangosol.util.Base; /** * The TaskDispatcherBuilder is responsible for building a @@ -72,7 +73,7 @@ public TaskDispatchPolicy realize() else { policy = m_bldrCustom.realize(new NullParameterResolver(), - Thread.currentThread().getContextClassLoader(), + Base.getContextClassLoader(), new ResolvableParameterList()); } } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/config/xml/processor/ProcessingPatternConfigProcessor.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/config/xml/processor/ProcessingPatternConfigProcessor.java index 538f3803..7e754db0 100644 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/config/xml/processor/ProcessingPatternConfigProcessor.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/config/xml/processor/ProcessingPatternConfigProcessor.java @@ -9,8 +9,7 @@ * You may not use this file except in compliance with the License. * * You can obtain a copy of the License by consulting the LICENSE.txt file - * distributed with this file, or by consulting - * or https://oss.oracle.com/licenses/CDDL + * distributed with this file, or by consulting https://oss.oracle.com/licenses/CDDL * * See the License for the specific language governing permissions * and limitations under the License. @@ -26,16 +25,23 @@ package com.oracle.coherence.patterns.processing.config.xml.processor; - +import com.oracle.coherence.common.threading.ThreadFactories; import com.oracle.coherence.patterns.processing.config.ProcessingPatternConfig; import com.oracle.coherence.patterns.processing.config.builder.ProcessingPatternConfigBuilder; import com.oracle.coherence.patterns.processing.internal.DefaultEnvironment; import com.oracle.coherence.patterns.processing.internal.Environment; +import com.oracle.coherence.patterns.processing.internal.ProcessingPattern; import com.tangosol.config.ConfigurationException; import com.tangosol.config.xml.ElementProcessor; import com.tangosol.config.xml.ProcessingContext; import com.tangosol.config.xml.XmlSimpleName; import com.tangosol.run.xml.XmlElement; +import com.tangosol.util.Builder; +import com.tangosol.util.RegistrationBehavior; +import com.tangosol.util.ResourceRegistry; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * A {@link ProcessingPatternConfigProcessor} is responsible for processing a @@ -66,8 +72,31 @@ public ProcessingPatternConfig process(ProcessingContext context, // may customize the processing pattern config context.processForeignElementsOf(element); + // establish an ExecutorService that the Processing Pattern can use for background tasks + context.getResourceRegistry().registerResource(ExecutorService.class, + ProcessingPattern.RESOURCE, + new Builder() + { + @Override + public ExecutorService realize() + { + return Executors.newSingleThreadExecutor(ThreadFactories.newThreadFactory(true, + "ProcessingPattern", + null)); + } + }, + RegistrationBehavior.IGNORE, + new ResourceRegistry.ResourceLifecycleObserver() + { + @Override + public void onRelease(ExecutorService executorService) + { + executorService.shutdownNow(); + } + }); + // Add the environment since it is accessed by the PP session - Environment env = new DefaultEnvironment(); + Environment env = new DefaultEnvironment(); context.getResourceRegistry().registerResource(Environment.class, env); diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/dispatchers/AbstractDispatcher.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/dispatchers/AbstractDispatcher.java index e513e52b..276200ff 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/dispatchers/AbstractDispatcher.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/dispatchers/AbstractDispatcher.java @@ -115,7 +115,7 @@ protected String getMBeanName() */ protected void registerMBean() { - Registry registry = CacheFactory.ensureCluster().getManagement(); + Registry registry = CacheFactory.getCluster().getManagement(); if (registry != null) { @@ -136,7 +136,7 @@ protected void registerMBean() */ protected void unregisterMBean() { - Registry registry = CacheFactory.ensureCluster().getManagement(); + Registry registry = CacheFactory.getCluster().getManagement(); if (registry != null) { diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/dispatchers/task/DefaultTaskDispatcher.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/dispatchers/task/DefaultTaskDispatcher.java index 9882952e..a977ea41 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/dispatchers/task/DefaultTaskDispatcher.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/dispatchers/task/DefaultTaskDispatcher.java @@ -443,6 +443,15 @@ protected boolean dispatchTask(final PendingSubmission pendingSubmission, pendingSubmission .getResultIdentifier()); + if (logger.isLoggable(Level.INFO)) + { + logger.log(Level.INFO, + "Task {0} with result id {1} was offered the Task and returned {2}.", + new Object[] {pendingSubmission.getSubmissionKey(), + pendingSubmission.getResultIdentifier(), offerTaskResult}); + } + + if (!offerTaskResult) { setSubmissionToRetryStatus(pendingSubmission, taskProcessorKey, submissionResult); @@ -779,12 +788,12 @@ private void handleTaskProcessorDefinition(final ConfigurableCacheFactory oCCFac oTaskProcessorDefinition .getAttributeMap()}); - tps.setAttribute("machinename", CacheFactory.ensureCluster().getLocalMember().getMachineName()); - tps.setAttribute("hostname", CacheFactory.ensureCluster().getLocalMember().getAddress().getHostName()); - tps.setAttribute("rackname", CacheFactory.ensureCluster().getLocalMember().getRackName()); - tps.setAttribute("membername", CacheFactory.ensureCluster().getLocalMember().getMemberName()); - tps.setAttribute("processname", CacheFactory.ensureCluster().getLocalMember().getProcessName()); - tps.setAttribute("rolename", CacheFactory.ensureCluster().getLocalMember().getRoleName()); + tps.setAttribute("machinename", CacheFactory.getCluster().getLocalMember().getMachineName()); + tps.setAttribute("hostname", CacheFactory.getCluster().getLocalMember().getAddress().getHostName()); + tps.setAttribute("rackname", CacheFactory.getCluster().getLocalMember().getRackName()); + tps.setAttribute("membername", CacheFactory.getCluster().getLocalMember().getMemberName()); + tps.setAttribute("processname", CacheFactory.getCluster().getLocalMember().getProcessName()); + tps.setAttribute("rolename", CacheFactory.getCluster().getLocalMember().getRoleName()); if (oTaskProcessorDefinition.getTaskProcessorType() == TaskProcessorType.GRID) { diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultProcessingSession.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultProcessingSession.java index da74289e..87db4ebe 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultProcessingSession.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultProcessingSession.java @@ -36,7 +36,6 @@ import com.oracle.coherence.patterns.processing.SubmissionOutcomeListener; import com.oracle.coherence.patterns.processing.SubmissionRetentionPolicy; import com.oracle.coherence.patterns.processing.SubmissionState; -import com.oracle.coherence.patterns.processing.config.ProcessingPatternConfig; import com.tangosol.net.CacheFactory; import com.tangosol.net.ConfigurableCacheFactory; import com.tangosol.net.NamedCache; @@ -133,28 +132,41 @@ public class DefaultProcessingSession implements ProcessingSession @SuppressWarnings("unchecked") public DefaultProcessingSession(Identifier sessionIdentifier) { - ConfigurableCacheFactory ccFactory = CacheFactory.getConfigurableCacheFactory(); + this(sessionIdentifier, CacheFactory.getConfigurableCacheFactory()); + } + + /** + * Standard Constructor. + * + * @param sessionIdentifier is the {@link Identifier} for this Session. + */ + @SuppressWarnings("unchecked") + public DefaultProcessingSession(Identifier sessionIdentifier, + ConfigurableCacheFactory ccFactory) + { try { - ccFactory.activate(); + ccFactory.activate(); } catch (IllegalStateException e) { - // Could already be active, in which case just move on. + // Could already be active, in which case just move on. } Environment environment = ccFactory.getResourceRegistry().getResource(Environment.class); + if (environment == null) { - environment = new DefaultEnvironment(); + environment = new DefaultEnvironment(); ProcessingPattern.createClientSideObjects(environment); - ccFactory.getResourceRegistry().registerResource(Environment.class, environment); + ccFactory.getResourceRegistry().registerResource(Environment.class, environment); } - - this.executorService = ExecutorServiceFactory.newSingleThreadScheduledExecutor(ThreadFactories - .newThreadFactory(true, "DefaultProcessingSession", null)); + this.executorService = + ExecutorServiceFactory.newSingleThreadScheduledExecutor(ThreadFactories.newThreadFactory(true, + "DefaultProcessingSession", + null)); this.shutdownSync = new Object(); this.sessionId = sessionIdentifier; @@ -185,9 +197,9 @@ public DefaultProcessingSession(ConfigurableCacheFactory ccFactory, Identifier sessionIdentifier) { this.executorService = - ExecutorServiceFactory - .newSingleThreadScheduledExecutor(ThreadFactories - .newThreadFactory(true, "DefaultProcessingSession", null)); + ExecutorServiceFactory.newSingleThreadScheduledExecutor(ThreadFactories.newThreadFactory(true, + "DefaultProcessingSession", + null)); this.shutdownSync = new Object(); this.sessionId = sessionIdentifier; this.submissionOutcomeMap = new ConcurrentHashMap(); @@ -342,25 +354,25 @@ private void removeCacheObjectsAsynch(final SubmissionKey submissionKey, final Identifier resultId) { executorService.execute(new Runnable() - { - public void run() - { - synchronized (shutdownSync) - { - try - { - submissionProxyFactory.destroyRemoteObject(submissionKey); - submissionResultProxyFactory.destroyRemoteObject(resultId); - } - catch (Throwable e) - { - logger.log(Level.SEVERE, - "Failed to remove cache objects with result key {0} and submission key {1} due to {2}", - new Object[] {resultId, submissionKey, e}); - } - } - } - }); + { + public void run() + { + synchronized (shutdownSync) + { + try + { + submissionProxyFactory.destroyRemoteObject(submissionKey); + submissionResultProxyFactory.destroyRemoteObject(resultId); + } + catch (Throwable e) + { + logger.log(Level.SEVERE, + "Failed to remove cache objects with result key {0} and submission key {1} due to {2}", + new Object[] {resultId, submissionKey, e}); + } + } + } + }); } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultSubmissionConfiguration.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultSubmissionConfiguration.java index fc1d80b1..58a59ec7 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultSubmissionConfiguration.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultSubmissionConfiguration.java @@ -31,6 +31,7 @@ import com.tangosol.io.pof.PofReader; import com.tangosol.io.pof.PofWriter; import com.tangosol.io.pof.PortableObject; +import com.tangosol.util.Base; import com.tangosol.util.ExternalizableHelper; import java.io.DataInput; @@ -231,7 +232,7 @@ public void readExternal(final DataInput in) throws IOException { this.submissionDelay = ExternalizableHelper.readLong(in); this.configurationDataMap = new HashMap(); - ExternalizableHelper.readMap(in, this.configurationDataMap, Thread.currentThread().getContextClassLoader()); + ExternalizableHelper.readMap(in, this.configurationDataMap, Base.getContextClassLoader()); this.groupAffinity = ExternalizableHelper.readObject(in); } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultSubmissionResult.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultSubmissionResult.java index a039ff86..c047fd67 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultSubmissionResult.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/DefaultSubmissionResult.java @@ -489,10 +489,12 @@ public Object assign(Object owner) { SubmissionState state = getSubmissionState(); + boolean isOwnershipChanged = owner != null && this.owner != null && !owner.equals(this.owner); + if ((state == SubmissionState.SUBMITTED) || (state == SubmissionState.SUSPENDED) - || (state == SubmissionState.RETRY)) + || (state == SubmissionState.RETRY) || isOwnershipChanged) { - if ((state == SubmissionState.SUSPENDED) || (state == SubmissionState.RETRY)) + if ((state == SubmissionState.SUSPENDED) || (state == SubmissionState.RETRY) || isOwnershipChanged) { isResuming = true; } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/LifecycleInterceptor.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/LifecycleInterceptor.java index 88c0dd15..227ea7b9 100644 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/LifecycleInterceptor.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/LifecycleInterceptor.java @@ -9,8 +9,7 @@ * You may not use this file except in compliance with the License. * * You can obtain a copy of the License by consulting the LICENSE.txt file - * distributed with this file, or by consulting - * or https://oss.oracle.com/licenses/CDDL + * distributed with this file, or by consulting https://oss.oracle.com/licenses/CDDL * * See the License for the specific language governing permissions * and limitations under the License. @@ -26,11 +25,12 @@ package com.oracle.coherence.patterns.processing.internal; - import com.tangosol.net.ConfigurableCacheFactory; import com.tangosol.net.events.EventInterceptor; import com.tangosol.net.events.application.LifecycleEvent; +import java.util.concurrent.ExecutorService; + /** * The class is a LifecycleEvent interceptor which gets called * when a CCF is activate/disposed. @@ -51,13 +51,22 @@ public void onEvent(LifecycleEvent event) // to CCF do execute from a thread. final ConfigurableCacheFactory ccf = event.getConfigurableCacheFactory(); - new Thread(new Runnable() + if (event.getType() == LifecycleEvent.Type.ACTIVATED) { - @Override - public void run() - { - ProcessingPattern.ensureInfrastructureStarted(ccf); - } - }).start(); - } + // acquire the ExecutorService for the Processing Pattern + ExecutorService executorService = + event.getConfigurableCacheFactory().getResourceRegistry().getResource(ExecutorService.class, + ProcessingPattern.RESOURCE); + + // asynchronously perform initialization + executorService.submit(new Runnable() + { + @Override + public void run() + { + ProcessingPattern.ensureInfrastructureStarted(ccf); + } + }); + } + } } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/ProcessingPattern.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/ProcessingPattern.java index 8021a56b..93ef8a0e 100644 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/ProcessingPattern.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/ProcessingPattern.java @@ -43,6 +43,7 @@ import com.oracle.coherence.patterns.processing.taskprocessor.DefaultClientLeaseMaintainer; import com.tangosol.net.ConfigurableCacheFactory; import com.tangosol.util.Base; +import com.tangosol.util.ResourceRegistry; /** @@ -53,6 +54,11 @@ */ public class ProcessingPattern { + /** + * The resource name for resources added to the Coherence {@link ResourceRegistry}. + */ + public static String RESOURCE = "ProcessingPattern"; + ConfigurableCacheFactory m_ccf; Environment m_env; ProcessingPatternConfig m_config; @@ -96,9 +102,6 @@ public ProcessingPattern(ConfigurableCacheFactory ccf, Environment env, Processi */ public static void ensureInfrastructureStarted(ConfigurableCacheFactory ccf) { - if (m_fStarted) - return; - synchronized (ProcessingPatternConfig.class) { if (!m_fStarted) diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultServerLeaseMonitor.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultServerLeaseMonitor.java index e4cef183..a4ec32d5 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultServerLeaseMonitor.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultServerLeaseMonitor.java @@ -240,9 +240,7 @@ public void run() } taskProcessorMediatorCache.invoke(leaseOwner, - new InvokeMethodProcessor("leaseExpired", - new Object[] {lease})); - taskProcessorMediatorCache.remove(leaseOwner); + new InvokeMethodProcessor("recoverTasks")); deregisterLease = true; } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorDefinition.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorDefinition.java index 8742cc3c..2df808ce 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorDefinition.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorDefinition.java @@ -33,6 +33,7 @@ import com.tangosol.io.pof.PofReader; import com.tangosol.io.pof.PofWriter; import com.tangosol.io.pof.PortableObject; +import com.tangosol.util.Base; import com.tangosol.util.ExternalizableHelper; import java.io.DataInput; @@ -212,7 +213,7 @@ public void readExternal(final DataInput in) throws IOException taskProcessorType = TaskProcessorType.valueOf(in.readUTF()); taskProcessor = (TaskProcessor) ExternalizableHelper.readObject(in); attributeMap = new HashMap(); - ExternalizableHelper.readMap(in, attributeMap, this.getClass().getClassLoader()); + ExternalizableHelper.readMap(in, attributeMap, Base.getContextClassLoader()); } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorDefinitionManager.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorDefinitionManager.java index abcadd8b..84cc62fe 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorDefinitionManager.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorDefinitionManager.java @@ -175,13 +175,13 @@ private void storeTaskProcessorDefinition(TaskProcessorDefinition definition) th if (cs instanceof DistributedCacheService) { // We know now that we are on a cluster node - tps.setAttribute("machinename", CacheFactory.ensureCluster().getLocalMember().getMachineName()); + tps.setAttribute("machinename", CacheFactory.getCluster().getLocalMember().getMachineName()); tps.setAttribute("hostname", - CacheFactory.ensureCluster().getLocalMember().getAddress().getHostName()); - tps.setAttribute("rackname", CacheFactory.ensureCluster().getLocalMember().getRackName()); - tps.setAttribute("membername", CacheFactory.ensureCluster().getLocalMember().getMemberName()); - tps.setAttribute("processname", CacheFactory.ensureCluster().getLocalMember().getProcessName()); - tps.setAttribute("rolename", CacheFactory.ensureCluster().getLocalMember().getRoleName()); + CacheFactory.getCluster().getLocalMember().getAddress().getHostName()); + tps.setAttribute("rackname", CacheFactory.getCluster().getLocalMember().getRackName()); + tps.setAttribute("membername", CacheFactory.getCluster().getLocalMember().getMemberName()); + tps.setAttribute("processname", CacheFactory.getCluster().getLocalMember().getProcessName()); + tps.setAttribute("rolename", CacheFactory.getCluster().getLocalMember().getRoleName()); tps.setAttribute("taskprocessortype", "single"); } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorMediator.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorMediator.java index 2636cb77..91280129 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorMediator.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/DefaultTaskProcessorMediator.java @@ -25,17 +25,19 @@ package com.oracle.coherence.patterns.processing.internal.task; - import com.oracle.coherence.common.identifiers.Identifier; import com.oracle.coherence.common.leasing.Lease; import com.oracle.coherence.common.leasing.Leasing; import com.oracle.coherence.common.util.ChangeIndication; +import com.oracle.coherence.patterns.processing.internal.ProcessingPattern; import com.oracle.coherence.patterns.processing.internal.SubmissionKey; import com.oracle.coherence.patterns.processing.internal.SubmissionKeyPair; import com.tangosol.io.ExternalizableLite; import com.tangosol.io.pof.PofReader; import com.tangosol.io.pof.PofWriter; import com.tangosol.io.pof.PortableObject; +import com.tangosol.net.CacheFactory; +import com.tangosol.util.Base; import com.tangosol.util.BinaryEntry; import com.tangosol.util.ExternalizableHelper; @@ -47,6 +49,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -67,25 +70,25 @@ public class DefaultTaskProcessorMediator implements ExternalizableLite, ChangeIndication { /** - * The MBean manager for this class. + * The name of the Coherence Cache that will store {@link DefaultTaskProcessorMediator}s. */ - private static TaskProcessorMBeanManager m_mgrMbean; + public static final String CACHENAME = "coherence.patterns.processing.taskprocessormediator"; /** - * A {@link ServerLeaseMonitor} helps keep track of when the {@link Lease} for the associated - * {@link com.oracle.coherence.patterns.processing.task.TaskProcessor} expires. + * The {@link Logger} to use. */ - private static ServerLeaseMonitor leaseMonitor; + private static final Logger logger = Logger.getLogger(DefaultTaskProcessorMediator.class.getName()); /** - * The name of the Coherence Cache that will store {@link DefaultTaskProcessorMediator}s. + * The MBean manager for this class. */ - public static final String CACHENAME = "coherence.patterns.processing.taskprocessormediator"; + private static TaskProcessorMBeanManager m_mgrMbean; /** - * The {@link Logger} to use. + * A {@link ServerLeaseMonitor} helps keep track of when the {@link Lease} for the associated + * {@link com.oracle.coherence.patterns.processing.task.TaskProcessor} expires. */ - private static final Logger logger = Logger.getLogger(DefaultTaskProcessorMediator.class.getName()); + private static ServerLeaseMonitor leaseMonitor; /** * Key for the Executor this queue belongs to. @@ -204,6 +207,7 @@ public DefaultTaskProcessorMediator(final TaskProcessorMediatorKey taskProcessor initialize(); } + /** * Set the MBean manager. * @@ -214,6 +218,7 @@ public static void setMBeanManager(TaskProcessorMBeanManager mgr) m_mgrMbean = mgr; } + /** * Sets the {@link ServerLeaseMonitor} to use for this class. * @@ -224,6 +229,7 @@ public static void setLeaseMonitor(ServerLeaseMonitor leaseMonitor) DefaultTaskProcessorMediator.leaseMonitor = leaseMonitor; } + /** * {@inheritDoc} */ @@ -433,17 +439,25 @@ private void updateExecutionTiming(long executionDuration) @Override public String toString() { - return "DefaultTaskProcessorMediator [" + (attributeMap != null ? "attributeMap=" + attributeMap + ", " : "") - + "lastTaskExecutionDuration=" + lastTaskExecutionDuration + ", maximumTaskExecutionDuration=" - + maximumTaskExecutionDuration + ", minimumTaskExecutionDuration=" + minimumTaskExecutionDuration - + ", noAcceptedTasks=" + noAcceptedTasks + ", noExecutedTasks=" + noExecutedTasks + ", " - + (stateEnum != null ? "stateEnum=" + stateEnum + ", " : "") - + (taskProcessorKey != null ? "taskProcessorKey=" + taskProcessorKey + ", " : "") - + (taskProcessorLease != null ? "taskProcessorLease=" + taskProcessorLease + ", " : "") - + (tasks != null ? "tasks=" + tasks + ", " : "") - + (tasksInProgress != null ? "tasksInProgress=" + tasksInProgress + ", " : "") - + "totalTaskExecutionDuration=" + totalTaskExecutionDuration + ", yieldTaskCount=" + yieldTaskCount - + "]"; + return "DefaultTaskProcessorMediator [" + + (attributeMap != null ? "attributeMap=" + attributeMap + ", " : "") + "lastTaskExecutionDuration=" + + lastTaskExecutionDuration + ", maximumTaskExecutionDuration=" + maximumTaskExecutionDuration + + ", minimumTaskExecutionDuration=" + minimumTaskExecutionDuration + ", noAcceptedTasks=" + + noAcceptedTasks + ", noExecutedTasks=" + noExecutedTasks + ", " + + (stateEnum != null + ? "stateEnum=" + stateEnum + ", " + : "") + (taskProcessorKey != null + ? "taskProcessorKey=" + taskProcessorKey + ", " + : "") + (taskProcessorLease != null + ? "taskProcessorLease=" + taskProcessorLease + ", " + : "") + (tasks != null + ? "tasks=" + tasks + ", " + : "") + (tasksInProgress != null + ? "tasksInProgress=" + tasksInProgress + ", " + : "") + "totalTaskExecutionDuration=" + + totalTaskExecutionDuration + + ", yieldTaskCount=" + yieldTaskCount + + "]"; } @@ -643,45 +657,33 @@ public Lease getTaskProcessorLease(long duration) } - /** - * {@inheritDoc} - */ - public void leaseExpired(Lease lease) - { - if (logger.isLoggable(Level.FINEST)) - { - logger.log(Level.FINEST, - "Lease {0} expired for TaskProcessor {1} setting to INACTIVE", - new Object[] {lease, this}); - } - - stateEnum = TaskProcessorStateEnum.INACTIVE; - recoverTasks(); - setChanged(); - } - - /** * Start the recovery of the tasks for this {@link DefaultTaskProcessorMediator}. */ @SuppressWarnings("unchecked") - private void recoverTasks() + public void recoverTasks() { if (logger.isLoggable(Level.INFO)) { logger.log(Level.INFO, "Recovering tasks {0}", tasks); } - LinkedList mergedlist = (LinkedList) tasks.clone(); + // this task processor is now inactive and won't perform any more processing + stateEnum = TaskProcessorStateEnum.INACTIVE; + + // collect the list of tasks to recover + LinkedList mergedlist = new LinkedList<>(); + mergedlist.addAll(tasks); mergedlist.addAll(tasksInProgress); - RecoverTasks task = new RecoverTasks(mergedlist); - Thread recoverthread = new Thread(task); + // have the Processing Pattern Executor Service perform the recovery asynchronously + ExecutorService executorService = + CacheFactory.getConfigurableCacheFactory().getResourceRegistry().getResource(ExecutorService.class, + ProcessingPattern.RESOURCE); + + executorService.submit(new RecoverTasks(taskProcessorKey, mergedlist)); - recoverthread.start(); - tasks.clear(); - tasksInProgress.clear(); setChanged(); } @@ -690,7 +692,7 @@ private void recoverTasks() * {@inheritDoc} */ public void entryArrived() - { + { if (logger.isLoggable(Level.FINE)) { logger.log(Level.FINE, "TaskProcessorMediator {0} arrived to this member.", this); @@ -731,6 +733,7 @@ public TaskProcessorStateEnum getProcessorState() return stateEnum; } + /** * {@inheritDoc} */ @@ -798,6 +801,7 @@ public void onInserted(BinaryEntry entry) } } + /** * {@inheritDoc} */ @@ -814,6 +818,7 @@ public void onUpdated(BinaryEntry entry) } } + /** * {@inheritDoc} */ @@ -822,6 +827,7 @@ public void onRemoved(BinaryEntry entry) // placeholder } + /** * Called if this entry has arrived to this node due to partitions moving or failover. */ @@ -831,6 +837,7 @@ public void onArrived(BinaryEntry entry) onInserted(entry); } + /** * Called if this entry has departed from this node due to partitions moving or failover. */ @@ -840,6 +847,7 @@ public void onDeparted(BinaryEntry entry) onRemoved(entry); } + /** * {@inheritDoc} */ @@ -859,7 +867,7 @@ public void readExternal(final DataInput in) throws IOException lastTaskExecutionDuration = in.readLong(); yieldTaskCount = in.readInt(); attributeMap = new HashMap(); - ExternalizableHelper.readMap(in, attributeMap, this.getClass().getClassLoader()); + ExternalizableHelper.readMap(in, attributeMap, Base.getContextClassLoader()); } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/RecoverTasks.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/RecoverTasks.java index 7e24215b..d99a4e9e 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/RecoverTasks.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/RecoverTasks.java @@ -32,6 +32,7 @@ import com.tangosol.net.CacheFactory; import com.tangosol.net.ConfigurableCacheFactory; import com.tangosol.net.NamedCache; +import com.tangosol.util.Base; import java.util.LinkedList; import java.util.logging.Level; @@ -58,40 +59,27 @@ public class RecoverTasks implements Runnable private static final Logger logger = Logger.getLogger(RecoverTasks.class.getName()); /** - * The tasks to recover. - */ - private LinkedList tasks; - - /** - * The {@link ConfigurableCacheFactory} to use. - */ - private ConfigurableCacheFactory ccFactory; - - /** - * The {@link NamedCache} where {@link com.oracle.coherence.patterns.processing.internal.SubmissionResult}s are - * stored. + * The key for the {@link TaskProcessorMediator} being recovered. */ - private NamedCache submissionResultCache; + private TaskProcessorMediatorKey taskProcessorMediatorKey; /** - * The {@link NamedCache} where {@link com.oracle.coherence.patterns.processing.internal.Submission}s are stored. + * The tasks to recover. */ - private NamedCache submissionCache; + private LinkedList tasks; /** * Standard constructor. * - * @param tasks the Tasks to recover + * @param taskProcessorMediatorKey the key of the {@link TaskProcessorMediator} being recovered + * @param tasks the keys of the tasks to recover */ - public RecoverTasks(LinkedList tasks) + public RecoverTasks(TaskProcessorMediatorKey taskProcessorMediatorKey, + LinkedList tasks) { - this.tasks = tasks; - this.ccFactory = - CacheFactory.getCacheFactoryBuilder().getConfigurableCacheFactory(this.getClass().getClassLoader()); - this.submissionResultCache = ccFactory.ensureCache(DefaultSubmissionResult.CACHENAME, null); - this.submissionCache = ccFactory.ensureCache(DefaultSubmission.CACHENAME, null); - + this.taskProcessorMediatorKey = taskProcessorMediatorKey; + this.tasks = tasks; } @@ -100,6 +88,20 @@ public RecoverTasks(LinkedList tasks) */ public void run() { + ConfigurableCacheFactory ccFactory = + CacheFactory.getCacheFactoryBuilder().getConfigurableCacheFactory(Base.getContextClassLoader()); + + // obtain the submissions result cache to update + NamedCache submissionResultCache = ccFactory.ensureCache(DefaultSubmissionResult.CACHENAME, + Base.getContextClassLoader()); + + // obtain the submissions cache to update + NamedCache submissionCache = ccFactory.ensureCache(DefaultSubmission.CACHENAME, Base.getContextClassLoader()); + + // obtain the task mediator cache to update + NamedCache taskProcessorMediatorCache = ccFactory.ensureCache(DefaultTaskProcessorMediator.CACHENAME, + Base.getContextClassLoader()); + if (logger.isLoggable(Level.INFO)) { logger.log(Level.INFO, "Starting recovery of {0,number} tasks.", tasks.size()); @@ -112,7 +114,9 @@ public void run() logger.log(Level.FINER, "Recovering {0}", keyPair); } - submissionResultCache.invoke(keyPair.getResultId(), new InvokeMethodProcessor("retry", new Object[] + submissionResultCache.invoke(keyPair.getResultId(), + new InvokeMethodProcessor("retry", + new Object[] { })); @@ -134,5 +138,9 @@ public void run() } } } + + // remove the TaskProcessorMediator as it's been recovered + taskProcessorMediatorCache.remove(taskProcessorMediatorKey); } } + \ No newline at end of file diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMBeanManager.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMBeanManager.java index 256fc9bb..bd82489b 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMBeanManager.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMBeanManager.java @@ -138,7 +138,7 @@ protected void registerMBean(TaskProcessorMediatorKey key, { if (!shuttingDown) { - Registry registry = CacheFactory.ensureCluster().getManagement(); + Registry registry = CacheFactory.getCluster().getManagement(); if (registry != null) { @@ -147,7 +147,7 @@ protected void registerMBean(TaskProcessorMediatorKey key, .ensureGlobalName(String .format("type=ProcessingPattern,subType=TaskProcessor,id=%s", key.getTaskProcessorDefinitionIdentifier().toString() + ":" + key.getMemberId() - + ":" + +CacheFactory.ensureCluster().getLocalMember().getId()))); + + ":" + +CacheFactory.getCluster().getLocalMember().getId()))); if (logger.isLoggable(Level.FINER)) { @@ -171,7 +171,7 @@ protected void unregisterMBean(TaskProcessorMediatorProxy mBeanProxy) { if (!shuttingDown) { - Registry registry = CacheFactory.ensureCluster().getManagement(); + Registry registry = CacheFactory.getCluster().getManagement(); if (registry != null) { diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMediator.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMediator.java index 7eafffc5..ebfeb4f3 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMediator.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMediator.java @@ -61,14 +61,6 @@ public interface TaskProcessorMediator List drainQueueToBeExecuted(); - /** - * The lease has expired for this {@link TaskProcessorMediator}. - * - * @param lease the Lease that has expired. - */ - void leaseExpired(Lease lease); - - /** * Enqueue a particular task for this executor. * diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMediatorInterceptor.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMediatorInterceptor.java index fe22167d..91e7300a 100644 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMediatorInterceptor.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/internal/task/TaskProcessorMediatorInterceptor.java @@ -25,17 +25,29 @@ package com.oracle.coherence.patterns.processing.internal.task; - +import com.oracle.coherence.common.processors.InvokeMethodProcessor; import com.oracle.coherence.patterns.processing.internal.ProcessingPattern; +import com.tangosol.net.CacheFactory; +import com.tangosol.net.Cluster; +import com.tangosol.net.Member; +import com.tangosol.net.MemberEvent; +import com.tangosol.net.MemberListener; +import com.tangosol.net.NamedCache; import com.tangosol.net.events.Event; +import com.tangosol.net.events.EventDispatcher; import com.tangosol.net.events.EventInterceptor; +import com.tangosol.net.events.internal.ServiceDispatcher; import com.tangosol.net.events.partition.TransferEvent; import com.tangosol.net.events.partition.cache.EntryEvent; import com.tangosol.util.BinaryEntry; +import com.tangosol.util.UID; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; - +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; /** * The TaskProcessorMediatorInterceptor intercepts events for the TaskProcessorMediator.\ @@ -43,16 +55,55 @@ * * @author Paul Mackin */ -public class TaskProcessorMediatorInterceptor - implements EventInterceptor +public class TaskProcessorMediatorInterceptor implements EventInterceptor, MemberListener { /** - * {@inheritDoc} + * The keys of member-based {@link TaskProcessorMediator}s resident in this server, keyed by {@link Member#getUid()}. + */ + private ConcurrentHashMap> mediatorKeys; + + /** + * An {@link ExecutorService} to perform recovery of tasks (asynchronously) */ + private ExecutorService recoveryService; + + + /** + * Constructs a {@link TaskProcessorMediatorInterceptor}. + */ + public TaskProcessorMediatorInterceptor() + { + this.mediatorKeys = new ConcurrentHashMap<>(); + } + + @Override public void onEvent(Event event) { - if (event instanceof EntryEvent) + if (event instanceof EventDispatcher.InterceptorRegistrationEvent) + { + EventDispatcher.InterceptorRegistrationEvent registrationEvent = + (EventDispatcher.InterceptorRegistrationEvent) event; + + if (registrationEvent.getType() == EventDispatcher.InterceptorRegistrationEvent.Type.INSERTED) + { + // use the Processing Pattern Executor Service for recovery + recoveryService = + CacheFactory.getConfigurableCacheFactory().getResourceRegistry().getResource(ExecutorService.class, + ProcessingPattern + .RESOURCE); + + EventDispatcher dispatcher = registrationEvent.getDispatcher(); + + if (dispatcher instanceof ServiceDispatcher) + { + ServiceDispatcher serviceDispatcher = (ServiceDispatcher) dispatcher; + + serviceDispatcher.getService().addMemberListener(this); + } + } + } + else if (event instanceof EntryEvent) { processEntryEvent((EntryEvent) event); } @@ -62,6 +113,7 @@ else if (event instanceof TransferEvent) } } + /** * Process the EntryEvent. * @@ -69,31 +121,40 @@ else if (event instanceof TransferEvent) */ private void processEntryEvent(EntryEvent event) { + Cluster cluster = + event.getDispatcher().getBackingMapContext().getManagerContext().getCacheService().getCluster(); + for (BinaryEntry entry : event.getEntrySet()) { - DefaultTaskProcessorMediator mediator = (DefaultTaskProcessorMediator) entry.getValue(); + DefaultTaskProcessorMediator originalMediator = (DefaultTaskProcessorMediator) entry.getOriginalValue(); + DefaultTaskProcessorMediator updatedMediator = (DefaultTaskProcessorMediator) entry.getValue(); switch (event.getType()) { - case INSERTED : - ProcessingPattern.ensureInfrastructureStarted(entry.getContext().getManager().getCacheFactory()); - mediator.onInserted(entry); - break; + case INSERTED : + ProcessingPattern.ensureInfrastructureStarted(entry.getContext().getManager().getCacheFactory()); + updatedMediator.onInserted(entry); + + registerTaskProcessorMediator(updatedMediator, cluster); + break; + + case UPDATED : + updatedMediator.onUpdated(entry); + break; - case UPDATED: - mediator.onUpdated(entry); - break; + case REMOVED : + originalMediator.onRemoved(entry); - case REMOVED: - mediator.onRemoved(entry); - break; + deregisterTaskProcessorMediator(originalMediator); + break; - default: - break; + default : + break; } } } + /** * Process the TransferEvent. * @@ -101,11 +162,14 @@ private void processEntryEvent(EntryEvent event) */ private void processTransferEvent(TransferEvent event) { - Map> map = event.getEntries(); - Set setEntries = map.get(DefaultTaskProcessorMediator.CACHENAME); + Cluster cluster = event.getDispatcher().getService().getCluster(); + Map> map = event.getEntries(); + Set setEntries = map.get(DefaultTaskProcessorMediator.CACHENAME); if (setEntries == null) + { return; + } for (BinaryEntry entry : setEntries) { @@ -113,16 +177,158 @@ private void processTransferEvent(TransferEvent event) switch (event.getType()) { - case ARRIVED: - ProcessingPattern.ensureInfrastructureStarted(entry.getContext().getManager().getCacheFactory()); - mediator.onArrived(entry); - break; - - case DEPARTING: - mediator.onDeparted(entry); - break; + case ARRIVED : + ProcessingPattern.ensureInfrastructureStarted(entry.getContext().getManager().getCacheFactory()); + mediator.onArrived(entry); + + registerTaskProcessorMediator(mediator, cluster); + break; + + case DEPARTING : + mediator.onDeparted(entry); + + deregisterTaskProcessorMediator(mediator); + break; } } } + + private void registerTaskProcessorMediator(DefaultTaskProcessorMediator mediator, + Cluster cluster) + { + final TaskProcessorMediatorKey key = mediator.getTaskProcessorKey(); + + if (key.getMemberId() > 0) + { + UID memberUID = key.getUniqueId(); + + // determine if the Member actually exists. If it doesn't we need to recover tasks! + + // determine the members of the cluster + Set memberSet = cluster.getMemberSet(); + + // ensure the member is in the cluster + boolean exists = false; + + for (Iterator iterator = memberSet.iterator(); iterator.hasNext(); ) + { + Member member = iterator.next(); + + exists = exists || member.getUid().equals(memberUID); + } + + if (exists) + { + HashSet keys = mediatorKeys.get(memberUID); + + if (keys == null) + { + synchronized (mediatorKeys) + { + keys = mediatorKeys.putIfAbsent(memberUID, new HashSet()); + + if (keys == null) + { + keys = mediatorKeys.get(memberUID); + } + + keys.add(key); + } + } + } + else + { + recoveryService.submit(new Runnable() + { + @Override + public void run() + { + // acquire the mediator cache + NamedCache taskProcessorMediatorCache = + CacheFactory.getCache(DefaultTaskProcessorMediator.CACHENAME); + + // asynchronously request recovery of the tasks + taskProcessorMediatorCache.invoke(key, + new InvokeMethodProcessor("recoverTasks")); + } + }); + } + } + } + + + private void deregisterTaskProcessorMediator(DefaultTaskProcessorMediator mediator) + { + TaskProcessorMediatorKey key = mediator.getTaskProcessorKey(); + + if (key.getMemberId() > 0) + { + UID memberUID = key.getUniqueId(); + + HashSet keys = mediatorKeys.get(memberUID); + + if (keys != null) + { + synchronized (mediatorKeys) + { + keys.remove(key); + + if (keys.isEmpty()) + { + mediatorKeys.remove(memberUID); + } + } + } + } + } + + + @Override + public void memberJoined(MemberEvent memberEvent) + { + // nothing to do when joining + } + + + @Override + public void memberLeaving(MemberEvent memberEvent) + { + // nothing to do prior to leaving + } + + + @Override + public void memberLeft(MemberEvent memberEvent) + { + UID memberUID = memberEvent.getMember().getUid(); + + // determine the TaskProcessorMediators that should be recovered due to the member leaving + final HashSet keys = mediatorKeys.remove(memberUID); + + if (keys == null) + { + // nothing to do when there are no mediators for the left member + } + else + { + recoveryService.submit(new Runnable() + { + @Override + public void run() + { + // acquire the mediator cache + NamedCache taskProcessorMediatorCache = + CacheFactory.getCache(DefaultTaskProcessorMediator.CACHENAME); + + for (TaskProcessorMediatorKey key : keys) + { + // asynchronously request recovery of the task + taskProcessorMediatorCache.invoke(key, + new InvokeMethodProcessor("recoverTasks")); + } + } + }); + } + } } diff --git a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/taskprocessor/TaskRunner.java b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/taskprocessor/TaskRunner.java index 2da2a77c..74ebf440 100755 --- a/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/taskprocessor/TaskRunner.java +++ b/coherence-processingpattern/src/main/java/com/oracle/coherence/patterns/processing/taskprocessor/TaskRunner.java @@ -102,7 +102,7 @@ public class TaskRunner implements Runnable, TaskExecutionEnvironment, ObjectCha /** * Whether the current job has been cancelled during execution. - * of Submissions offered to this {@link TaskExecutor}. + * of Submissions offered to this {@link TaskRunner}. */ private transient volatile boolean isCancelled; diff --git a/pom.xml b/pom.xml index b102202c..2f19a847 100644 --- a/pom.xml +++ b/pom.xml @@ -150,8 +150,8 @@ 4.12 3.6 1.10.19 - 4.1.0 - 12.1.2 + 4.3.0-SNAPSHOT + 12.1.3 ${oracle.coherence.base.version}-0-5 1.6.5