Skip to content

Commit

Permalink
Issue #49: Resolved inappropriate use of CacheFactory.ensureCluster()…
Browse files Browse the repository at this point in the history
…, replacing with CacheFactory.getCluster()

Issue #159: Introduced ability to provide a ConfigurableCacheFactory when creating a ProcessingSession
Issue #160: Ensure consistent use of ClassLoaders based on calling context
Issue #161: Ensure Processing Pattern is initialized using the Cache Configuration LifecycleEvents
Issue #162: Introduce Shared ExecutorService for internal background tasks
Issue #163: Resolves fail-over/fail-back of Grid-based Tasks
  • Loading branch information
Brian Oliver committed Feb 21, 2017
1 parent 5d57a63 commit 5466f7b
Show file tree
Hide file tree
Showing 23 changed files with 628 additions and 323 deletions.
29 changes: 29 additions & 0 deletions coherence-incubator-site/src/site/markdown/history.md.vm
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,35 @@ reverse chronological order.

* Brian Oliver

<h4>coherence-processingpattern</h4>

* [Issue #49](https://github.com/coherence-community/coherence-incubator/issues/49):
Resolved inappropriate use of CacheFactory.ensureCluster(), replacing with CacheFactory.getCluster().

* [Issue #159](https://github.com/coherence-community/coherence-incubator/issues/159):
Introduced ability to provide a ConfigurableCacheFactory when creating a ProcessingSession.

* [Issue #160](https://github.com/coherence-community/coherence-incubator/issues/160):
Ensure consistent use of ClassLoaders based on calling context.

* [Issue #161](https://github.com/coherence-community/coherence-incubator/issues/161):
Ensure Processing Pattern is initialized using the Cache Configuration LifecycleEvents.

* [Issue #162](https://github.com/coherence-community/coherence-incubator/issues/162):
Introduce Shared ExecutorService for internal background tasks.

* [Issue #163](https://github.com/coherence-community/coherence-incubator/issues/163):
Resolves fail-over/fail-back of Grid-based Tasks.


--------------------------------------------------------------------------------

<h3>Version: 12.4.0 built on 2017-09-28</h3>

<h4>Source and Documentation Contributors</h4>

* Brian Oliver

<h4>Global and Cross-Module Changes</h4>

* [Issue #150](https://github.com/coherence-community/coherence-incubator/issues/150): Upgraded to require
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,19 @@
import com.tangosol.net.CacheFactory;

import java.io.IOException;
import java.util.Random;

/**
* A simple restartable task.
* A simple restartable task that delays a random amount of time between 0 and 1 seconds.
* <p>
* Copyright (c) 2009. All Rights Reserved. Oracle Corporation.<br>
* Copyright (c) 2017. All Rights Reserved. Oracle Corporation.<br>
* Oracle is a registered trademark of Oracle Corporation and/or its affiliates.
*
* @author Christer Fahlgren
* @author Brian Oliver
*/
public class RestartTask implements ResumableTask, PortableObject
{
private String sName;
private String name;


/**
Expand All @@ -58,56 +59,53 @@ public RestartTask()
/**
* Constructs a {@link RestartTask}.
*
* @param sName
* @param name the name of the task (that is returned)
*/
public RestartTask(String sName)
public RestartTask(String name)
{
this.sName = sName;
this.name = name;
}


/**
* {@inheritDoc}
*/
@Override
public String toString()
{
return "RestartTask [" + (sName != null ? "sName=" + sName : "") + "]";
return "RestartTask [" + (name != null ? "sName=" + name : "") + "]";
}


/**
* Method description
*
* @param oEnvironment
*
* @return
*/
@Override
public Object run(TaskExecutionEnvironment oEnvironment)
{
CacheFactory.log(sName + " processed...", CacheFactory.LOG_ALWAYS);

return sName;
CacheFactory.log(name + " processing...", CacheFactory.LOG_ALWAYS);

Random random = new Random();

try
{
Thread.sleep(random.nextInt(1000));
}
catch (InterruptedException e)
{
CacheFactory.log(name + " interrupted!");
}

CacheFactory.log(name + " processed...", CacheFactory.LOG_ALWAYS);

return name;
}


/**
* {@inheritDoc}
*/
@Override
public void readExternal(PofReader reader) throws IOException
{
this.sName = reader.readString(0);
this.name = reader.readString(0);
}


/**
* {@inheritDoc}
*/
@Override
public void writeExternal(PofWriter writer) throws IOException
{
writer.writeString(0, sName);
writer.writeString(0, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,136 +25,135 @@

package com.oracle.coherence.patterns.processing.taskprocessor;

import com.oracle.bedrock.junit.CoherenceClusterResource;
import com.oracle.bedrock.junit.StorageDisabledMember;
import com.oracle.bedrock.runtime.LocalPlatform;
import com.oracle.bedrock.runtime.network.AvailablePortIterator;
import org.junit.BeforeClass;
import com.oracle.bedrock.runtime.coherence.options.CacheConfig;
import com.oracle.bedrock.runtime.coherence.options.ClusterPort;
import com.oracle.bedrock.runtime.coherence.options.LocalHost;
import com.oracle.bedrock.runtime.coherence.options.LocalStorage;
import com.oracle.bedrock.runtime.coherence.options.Multicast;
import com.oracle.bedrock.runtime.coherence.options.Pof;
import com.oracle.bedrock.runtime.coherence.options.RoleName;
import com.oracle.bedrock.runtime.concurrent.runnable.RuntimeHalt;
import com.oracle.bedrock.runtime.java.options.SystemProperty;
import com.oracle.bedrock.runtime.options.DisplayName;
import com.oracle.bedrock.util.Capture;
import com.oracle.coherence.common.identifiers.StringBasedIdentifier;
import com.oracle.coherence.patterns.processing.ProcessingSession;
import com.oracle.coherence.patterns.processing.SubmissionConfiguration;
import com.oracle.coherence.patterns.processing.SubmissionOutcome;
import com.oracle.coherence.patterns.processing.SubmissionRetentionPolicy;
import com.oracle.coherence.patterns.processing.internal.DefaultProcessingSession;
import com.oracle.coherence.patterns.processing.internal.DefaultSubmissionConfiguration;
import com.tangosol.net.ConfigurableCacheFactory;
import org.junit.Rule;
import org.junit.Test;

import java.net.UnknownHostException;
import java.util.Date;
import java.text.DateFormat;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

/**
* Rolling Restart Test
* <p>
* Copyright (c) 2009. All Rights Reserved. Oracle Corporation.<br>
* Copyright (c) 2017. All Rights Reserved. Oracle Corporation.<br>
* Oracle is a registered trademark of Oracle Corporation and/or its affiliates.
*
* @author Noah Arliss
* @author Brian Oliver
*/
public class RollingRestartTest
{
private final static int cServers = 2;
private final static int cSubmissions = 250;
private final static int cIterations = 5;
private static AvailablePortIterator portIterator;
private final static long CLUSTER_JOIN_TIMEOUT = 10000;
/**
* The number of rolling-restart iterations.
*/
private final static int iterations = 5;

/**
* The number of task submissions per iteration.
*/
private final static int submissionsPerIteration = 250;

/**
* Method description
*
* @throws UnknownHostException
* Establish a {@link CoherenceClusterResource} for our test.
*/
@BeforeClass
public static void setup() throws UnknownHostException
{
portIterator = LocalPlatform.get().getAvailablePorts();
}
@Rule
public CoherenceClusterResource coherenceResource =
new CoherenceClusterResource()
.using(LocalPlatform.get())
.with(LocalHost.only(),
Multicast.ttl(0),
ClusterPort.automatic(),
CacheConfig.of("coherence-processingpattern-restart-cache-config.xml"),
Pof.config("coherence-processingpattern-test-pof-config.xml"),
SystemProperty.of("tangosol.coherence.extend.address", LocalPlatform.get().getLoopbackAddress().getHostAddress()),
SystemProperty.of("tangosol.coherence.extend.port", Capture.of(LocalPlatform.get().getAvailablePorts())))
.include(4,
DisplayName.of("storage"),
RoleName.of("storage"),
LocalStorage.enabled(),
SystemProperty.of("tangosol.coherence.extend.enabled", false));


/**
* Method description
*
* @throws Throwable
* Ensure that the Processing Pattern can recover tests during a rolling restart.
*/
@Test
public void testRollingRestart() throws Throwable
public void shouldRecoverTasksDuringRollingRestart() throws Throwable
{
// JavaApplication servers[] = new JavaApplication[cServers];
// try
// {
// System.setProperty("tangosol.coherence.log", "client.txt");
// int clusterPort = portIterator.next();
// CoherenceServerBuilder builder = new CoherenceServerBuilder()
// .setEnvironmentVariables(PropertiesBuilder.fromCurrentEnvironmentVariables())
// .setSystemProperties(PropertiesBuilder.fromCurrentSystemProperties()).setClusterName("testCluster")
// .setCacheConfigURI("coherence-processingpattern-restart-cache-config.xml")
// .setLocalHostAddress("127.0.0.1").setMulticastTTL(0)
// .setPofConfigURI("coherence-processingpattern-test-pof-config.xml").setClusterPort(clusterPort)
// .setLogLevel(6);
// Properties props = new Properties();
// props.putAll(builder.getSystemPropertiesBuilder().realize(null));
// System.setProperties(props);
// System.setProperty(CoherenceServerBuilder.PROPERTY_DISTRIBUTED_LOCALSTORAGE, "false");
// for (int x = 0; x < cServers; x++)
// {
// servers[x] = builder.realize("TestServer" + x, new LogApplicationConsole("TestServer" + x + ".log"));
// }
// wait(CLUSTER_JOIN_TIMEOUT * cServers, "for test servers to start.");
//
// //setup processing session
// ProcessingSession session = new DefaultProcessingSession(
// StringBasedIdentifier.newInstance("TaskExecutionSample"
// + DateFormat.getDateTimeInstance().format(System.currentTimeMillis())));
// SubmissionConfiguration config = new DefaultSubmissionConfiguration();
// // Lets do cIterations rolling restarts
// for (int x = 0; x < cIterations; ++x)
// {
// SubmissionOutcome[] results = new SubmissionOutcome[cSubmissions];
// System.out.println("Submitting work to the grid...");
// //Submit cSubmissions to the cluster for processing
// for (int y = 0; y < cSubmissions; y++)
// {
// String id = "Task[" + x + ":" + y + "]";
// results[y] = session.submit(new RestartTask(id), config, StringBasedIdentifier.newInstance(id), SubmissionRetentionPolicy.RemoveOnFinalState, null);
// }
// //wait for results to come back
// for (int y = 0; y < cSubmissions; y++)
// {
// System.out.println("Client received result from: " + results[y].get());
// }
// //Now lets do a rolling restart
// for (int y = 0; y < cServers; y++)
// {
// System.out.println("Stopping server " + y);
// servers[y].destroy();
// wait(CLUSTER_JOIN_TIMEOUT, "for test server " + y + " to stop");
// System.out.println("Starting server " + y);
// servers[y] = builder.realize("TestServer" + y, new LogApplicationConsole("TestServer"+y+".log"));
// wait(CLUSTER_JOIN_TIMEOUT, "for test server " + y + " to start");
// }
// //Now that we've done a rolling restart let the loop submit work again.
// System.out.println("Iteration " + x + " done successfully...");
// }
// System.out.println("All done successfully...");
// }
// catch (Throwable e)
// {
// System.out.println(e);
// e.printStackTrace();
// throw e;
// }
// finally
// {
// for (int x = 0; x < cServers; x++)
// {
// servers[x].destroy();
// }
// }
}
// acquire a storage disabled cache factory for the cluster
ConfigurableCacheFactory cacheFactory = coherenceResource.createSession(new StorageDisabledMember());

// establish a processing session
ProcessingSession session =
new DefaultProcessingSession(StringBasedIdentifier.newInstance("TaskExecutionSample"
+ DateFormat.getDateTimeInstance()
.format(System.currentTimeMillis())),
cacheFactory);

/**
* <p>A simple method to wait a specified amount of time, with a message to stdout.</p>
*
* @param time The time to wait in ms.
* @param rationale The rationale (message) for waiting.
* @throws InterruptedException When interrupted while waiting.
*/
public void wait(long time,
String rationale) throws InterruptedException
{
System.out.printf("%s: Waiting %dms %s\n", new Date(), time, rationale);
Thread.sleep(time);
SubmissionConfiguration config = new DefaultSubmissionConfiguration();

// perform a number of rolling restart iterations
for (int x = 0; x < iterations; ++x)
{
SubmissionOutcome[] results = new SubmissionOutcome[submissionsPerIteration];

System.out.println("Submitting work to the cluster...");

// submit a number of tasks for processing
for (int y = 0; y < submissionsPerIteration; y++)
{
String id = "Task[" + x + ":" + y + "]";

results[y] = session.submit(new RestartTask(id),
config,
StringBasedIdentifier.newInstance(id),
SubmissionRetentionPolicy.RemoveOnFinalState,
null);
}

// wait for results to come back
for (int y = 0; y < submissionsPerIteration; y++)
{
String id = "Task[" + x + ":" + y + "]";

System.out.println("Waiting for result... " + id);

assertThat(results[y].get(), is(id));

System.out.println("Received result... " + id);

// System.out.println("Client received result from: " + results[y].get());
}

// randomly terminate and relaunch all of the servers
coherenceResource.getCluster().relaunch();

// we're done submitting work for this iteration
System.out.println("Iteration " + x + " done successfully...");
}

System.out.println("All done successfully...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
Copyright (c) 2009. All Rights Reserved. Oracle Corporation.
-->
<cache-config
xmlns:processing="class:com.oracle.coherence.patterns.processing.configuration.ProcessingPatternNamespaceHandler"
xmlns:processing="class:com.oracle.coherence.patterns.processing.config.xml.ProcessingPatternNamespaceHandler"

xmlns:element="class://com.oracle.coherence.common.namespace.preprocessing.XmlPreprocessingNamespaceHandler"
element:introduce-cache-config="coherence-processingpattern-cache-config.xml">
Expand Down
Loading

0 comments on commit 5466f7b

Please sign in to comment.