Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EAP7-2103 #4

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class RecoveryEnvironmentBean implements RecoveryEnvironmentBeanMBean
private volatile int expiryScanInterval = 12; // hours
private volatile int transactionStatusManagerExpiryTime = 12; // hours

private volatile boolean waitForRecovery = false;

@ConcatenationPrefix(prefix = "com.arjuna.ats.arjuna.recovery.expiryScanner")
private volatile List<String> expiryScannerClassNames = new ArrayList<String>();
private volatile List<ExpiryScanner> expiryScanners = null;
Expand Down Expand Up @@ -597,4 +599,32 @@ public void setTimeoutSocket(boolean timeoutSocket)
{
this.timeoutSocket = timeoutSocket;
}

/**
* Reports how the Recovery Manager should behave when it is asked to suspend.
* Note that this functionality requires that the transaction system and the transaction reaper
* are disabled in advance.
*
* @return true if the recovery manager should wait until all RecoveryModules implementing
* SuspendBlockingRecoveryModule have recovered all their transactions before suspending;
* false otherwise.
*/
public boolean isWaitForFinalRecovery()
{
return waitForRecovery;
}

/**
* Configures the suspension behaviour of the Recovery Manager.
* Note that this functionality requires that the transaction system and the transaction reaper
* are disabled in advance.
*
* @param waitForRecovery true if the recovery manager should wait until all RecoveryModules implementing
* SuspendBlockingRecoveryModule have recovered all their transactions before suspending;
* false otherwise.
*/
public void setWaitForFinalRecovery(boolean waitForRecovery)
{
this.waitForRecovery = waitForRecovery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -725,18 +725,7 @@ private final void shutdown(boolean waitForTransactions)
notifyAll();
}

/*
* Wait for all of the transactions to
* terminate normally.
*/
while (!_reaperElements.isEmpty()) {

try {
this.wait(1000);
}
catch (final Exception ex) {
}
}
waitForAllTxnsToTerminate();

_reaperThread.shutdown();

Expand Down Expand Up @@ -768,6 +757,29 @@ private final void shutdown(boolean waitForTransactions)
_reaperWorkerThread = null;
}

/**
 * <p>
 * Blocks the calling thread until the reaper no longer tracks any
 * transaction, i.e. until {@code _reaperElements} is empty. The check is
 * repeated at least once per second while holding this object's monitor.
 * </p>
 * <p>
 * An interrupt received while waiting does not abort the wait; it is
 * remembered and the thread's interrupt status is restored before this
 * method returns, so that callers can still observe it.
 * </p>
 * <p>
 * Note:
 * this method must only be used when the transaction
 * system is not currently capable of creating new
 * transactions (via TxControl.disable()).
 * </p>
 */
public void waitForAllTxnsToTerminate() {
    boolean interrupted = false;
    try {
        synchronized (this) {
            while (!_reaperElements.isEmpty()) {
                try {
                    // bounded wait so the emptiness check is re-evaluated
                    // even if no notification is delivered
                    this.wait(1000);
                } catch (final InterruptedException ex) {
                    // keep waiting, but do not lose the interrupt request
                    interrupted = true;
                }
            }
        }
    } finally {
        if (interrupted) {
            // wait() cleared the flag when it threw; re-assert it for the caller
            Thread.currentThread().interrupt();
        }
    }
}

// called (indirectly) by user code doing removals on e.g. commit/rollback
// does not reset the wakeup time - we prefer leaving an unnecessary wakeup as it's
// cheaper than locking to recalculate the new time here.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.arjuna.ats.arjuna.recovery;

/**
* <p>
* When a Recovery Module implements this interface, it can veto the suspension
* of the Recovery Manager.
* </p>
* <p>
* Note for the implementer:
* While the Recovery Manager is suspending, Recovery Modules
* (implementing SuspendBlockingRecoveryModule) that indicate they do not want
* to block the suspension cannot change their decision to want to block it
* in subsequent recovery cycles.
* In other words, during the suspension of the Recovery Manager, once a
* Recovery Module (implementing SuspendBlockingRecoveryModule)
* switches from {@code shouldBlockSuspension() == true} to
* {@code shouldBlockSuspension() == false}, it cannot change its mind.
* </p>
*/
public interface SuspendBlockingRecoveryModule extends RecoveryModule {

/**
* <p>
* This method returns true when the Recovery Manager should block its
* suspension, false otherwise. The default implementation never blocks
* the suspension and always returns false.
* </p>
* <p>
* Note: This method should be invoked only at the end of the recovery cycle,
* i.e. at the end of the second pass. An invocation that happens before that
* point is not guaranteed to return the correct value.
* </p>
*
* @return whether this implementation of SuspendBlockingRecoveryModule
* should block the suspension of the Recovery Manager or not
*/
default boolean shouldBlockSuspension() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.arjuna.ats.arjuna.objectstore.StateStatus;
import com.arjuna.ats.arjuna.objectstore.StoreManager;
import com.arjuna.ats.arjuna.recovery.RecoverAtomicAction;
import com.arjuna.ats.arjuna.recovery.RecoveryModule;
import com.arjuna.ats.arjuna.recovery.SuspendBlockingRecoveryModule;
import com.arjuna.ats.arjuna.recovery.TransactionStatusConnectionManager;
import com.arjuna.ats.arjuna.state.InputObjectState;
import com.arjuna.ats.internal.arjuna.common.UidHelper;
Expand All @@ -27,7 +27,7 @@
* It is responsible for recovering failed AtomicAction transactions.
*/

public class AtomicActionRecoveryModule implements RecoveryModule
public class AtomicActionRecoveryModule implements SuspendBlockingRecoveryModule
{
public AtomicActionRecoveryModule()
{
Expand All @@ -50,6 +50,8 @@ public void periodicWorkFirstPass()
{
// Transaction type
boolean AtomicActions = false ;
// Does not block the suspension of the Recovery Manager by default
this.shouldBlockSuspension = false;

// uids per transaction type
InputObjectState aa_uids = new InputObjectState() ;
Expand Down Expand Up @@ -214,6 +216,7 @@ private void processTransactionsStatus()
// to recover anything but if this module is still configured it would
// get an NPE
if (_transactionUidVector != null) {

// Process the Vector of transaction Uids
Enumeration transactionUidEnum = _transactionUidVector.elements();

Expand All @@ -225,6 +228,20 @@ private void processTransactionsStatus()
_transactionType) != StateStatus.OS_UNKNOWN) {
doRecoverTransaction(currentUid);
}

/*
* If the current AtomicAction has been recovered,
* its StateStatus should be OS_UNKNOWN.
* If that is not the case, it means that the current
* AtomicAction still needs to be recovered and this
* SuspendBlockingRecoveryModule implementation should
* block the suspension of the Recovery Manager
*/
if (_recoveryStore.currentState(currentUid,
_transactionType) != StateStatus.OS_UNKNOWN) {
this.shouldBlockSuspension = true;
}

} catch (ObjectStoreException ex) {
tsLogger.i18NLogger
.warn_recovery_AtomicActionRecoveryModule_3(
Expand All @@ -234,6 +251,11 @@ private void processTransactionsStatus()
}
}

/**
* Reports whether this module still wants to block the suspension of the
* Recovery Manager.
*
* @return the current value of the shouldBlockSuspension field; it is reset to
* false at the start of periodicWorkFirstPass and set to true by
* processTransactionsStatus when an AtomicAction is still present in the
* recovery store (state other than OS_UNKNOWN) after its recovery attempt
*/
@Override
public boolean shouldBlockSuspension() {
return this.shouldBlockSuspension;
}

// 'type' within the Object Store for AtomicActions.
private String _transactionType = new AtomicAction().type() ;

Expand All @@ -248,4 +270,11 @@ private void processTransactionsStatus()
// processes(JVMs) on this system/node.
private TransactionStatusConnectionManager _transactionStatusConnectionMgr ;

/*
* This class field is not declared as volatile as Recovery Module's
* first pass and second pass can only be invoked sequentially and by
* one thread per time.
*/
private boolean shouldBlockSuspension;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.arjuna.ats.arjuna.logging.tsLogger;
import com.arjuna.ats.arjuna.recovery.RecoveryManager;
import com.arjuna.ats.arjuna.recovery.RecoveryModule;
import com.arjuna.ats.arjuna.recovery.SuspendBlockingRecoveryModule;
import com.arjuna.ats.arjuna.utils.Utility;

/**
Expand Down Expand Up @@ -186,31 +187,80 @@ public void shutdown (boolean async)
}

/**
* make all scanning operations suspend.
*
* This switches the recovery operation mode to SUSPENDED. Any attempt to start a new scan either by an ad hoc
* threads or by the periodic recovery thread will suspend its thread until the mode changes. If a scan is in
* progress when this method is called it will complete its scan without suspending.
*
* @param async false if the calling thread should wait for any in-progress scan to complete before returning
* @return the previous mode before attempting the suspend
* <p>
* Make all scanning operations suspend.
* </p>
* <p>
* This switches the recovery operation mode to SUSPENDED. Any attempt to start a new scan either by an ad hoc
* threads or by the periodic recovery thread will suspend its thread until the mode changes. If a scan is in
* progress when this method is called it will complete its scan without suspending.
* </p>
* <p>
* Note that this method is also influenced by
* <code>RecoveryEnvironmentBean.setWaitForFinalRecovery(boolean)</code>.
* Please refer to its javadoc for further details.
* In case <code>isWaitForFinalRecovery() == true</code>, it is <b>very important</b> that, before invoking
* <code>suspendScan</code>:
* * The Transaction System (i.e. TxControl) gets disabled (i.e. no new transactions will be created)
* * The Transaction Reaper finishes monitoring all in-flight transactions
* </p>
* @param async false if the calling thread should wait for any in-progress scan to complete before returning.
* In case <code>isWaitForFinalRecovery() == true</code>, this parameter is overridden (ignored).
* @return the previous mode before attempting the suspension
*/

public Mode suspendScan (boolean async)
{
synchronized (_stateLock)
{
// only switch and kick everyone if we are currently ENABLED
Mode currentMode = getMode();

/*
* isWaitForFinalRecovery checks whether suspension should delay until RecoveryModules
* implementing SuspendBlockingRecoveryModule recover all their transactions.
*/
if (currentMode == Mode.ENABLED && recoveryPropertyManager.getRecoveryEnvironmentBean().isWaitForFinalRecovery()) {

/*
* At least, one clear scan (after the Transaction Manager has been shut down)
* should be completed to make sure that there are no transactions to recover.
*/
doScanningWait();
doWork();

/*
* Now, it is finally possible to start checking if there are transactions that
* are still in need of recovery (or completion, in case of heuristics)
*/
while (_blockSuspension) {
/*
* There might be another thread (the embedded thread or a user-request scan)
* running the recovery cycle.
* It would be possible to use again doScanningWait() here to wait the end of
* the ongoing scan, but the best bet is to wait for the end of the next
* recycle scan with doPeriodicWait()
*/
doPeriodicWait();

// I don't know the best place to make this comment but checking for subordinates could be a new recovery module wrapping something like https://github.com/jbosstm/narayana/blob/86182416bea64368ecdfc7e78767f798b15c14db/ArjunaJTS/jtax/classes/com/arjuna/ats/internal/jta/transaction/jts/jca/XATerminatorImple.java#L438C4-L438C4
// but please know that these checks are called by external processes at any time and so that would need to be blocked while this suspension process continues and appropriately handled
}
}

// only switch and kick everyone if we are currently ENABLED
if (currentMode == Mode.ENABLED) {
if (tsLogger.logger.isDebugEnabled()) {
tsLogger.logger.debug("PeriodicRecovery: Mode <== SUSPENDED");
}
setMode(Mode.SUSPENDED);
_stateLock.notifyAll();
}
if (!async) {

// At this point, in case isWaitForFinalRecovery is equal to true, an ongoing scan
// will not recover any transaction.
// In fact, all transactions have already recovered thanks to the above logic
// triggered with isWaitForFinalRecovery == true.
if (!recoveryPropertyManager.getRecoveryEnvironmentBean().isWaitForFinalRecovery() && !async) {
// synchronous, so we keep waiting until the currently active scan stops
while (getStatus() == Status.SCANNING) {
try {
Expand Down Expand Up @@ -719,6 +769,12 @@ private void doWorkInternal()
{
// n.b. we only get here if status is SCANNING

/* Let's assume that we want to block the suspension of the Recovery Manager.
* Recovery Modules implementing SuspendBlockingRecoveryModule should prove
* this assumption wrong :-)
*/
_blockSuspension = true;

if (tsLogger.logger.isDebugEnabled()) {
tsLogger.logger.debug("Periodic recovery first pass at "+_theTimestamper.format(new Date()));
}
Expand Down Expand Up @@ -779,6 +835,7 @@ private void doWorkInternal()
}

modules = copyOfModules.elements();
boolean tempBlockSuspension = false;

while (modules.hasMoreElements())
{
Expand All @@ -787,10 +844,23 @@ private void doWorkInternal()
ClassLoader cl = switchClassLoader(m);
try {
m.periodicWorkSecondPass();
if (m instanceof SuspendBlockingRecoveryModule) {
/*
* Once Recovery Modules (implementing SuspendBlockingRecoveryModule) indicate
* they do not want to block recovery, they will never change to wanting to block
* recovery in subsequent recovery cycles. In other words, once a Recovery Module
* (implementing SuspendBlockingRecoveryModule) switches from
* `shouldBlockSuspension() == true` to `shouldBlockSuspension() == false`, it cannot
* change its mind.
*/
tempBlockSuspension = tempBlockSuspension || ((SuspendBlockingRecoveryModule) m).shouldBlockSuspension();
}
} finally {
restoreClassLoader(cl);
}

this._blockSuspension = tempBlockSuspension;

if (tsLogger.logger.isDebugEnabled()) {
tsLogger.logger.debugf("Recovery module '%s' second pass processed", m);
}
Expand Down Expand Up @@ -937,6 +1007,13 @@ private void initialise ()
*/
private WorkerService _workerService = null;

/**
* flag to signal that there are transactions to recover;
* this flag could ONLY be set to false during the
* periodic recovery cycle (i.e. doWorkInternal()).
*/
private volatile boolean _blockSuspension = true;

/*
* Read the system properties to set the configurable options
*
Expand Down
Loading