Skip to content

Commit

Permalink
fix apache#808 fix bug of file based manager (apache#807)
Browse files Browse the repository at this point in the history
  • Loading branch information
leizhiyuan authored and zhangthen committed Apr 16, 2019
1 parent d159f6e commit 76734a9
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@

package com.alibaba.fescar.server.session;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import com.alibaba.fescar.common.exception.ShouldNeverHappenException;
import com.alibaba.fescar.config.ConfigurationFactory;
import com.alibaba.fescar.core.constants.ConfigurationKeys;
Expand All @@ -31,6 +25,13 @@
import com.alibaba.fescar.server.store.TransactionStoreManager;
import com.alibaba.fescar.server.store.TransactionWriteStore;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* The type File based session manager.
*
Expand All @@ -39,7 +40,7 @@
public class FileBasedSessionManager extends AbstractSessionManager implements Reloadable {

private static final int READ_SIZE = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.SERVICE_SESSION_RELOAD_READ_SIZE, 100);
ConfigurationKeys.SERVICE_SESSION_RELOAD_READ_SIZE, 100);

/**
* Instantiates a new File based session manager.
Expand All @@ -50,7 +51,7 @@ public class FileBasedSessionManager extends AbstractSessionManager implements R
*/
public FileBasedSessionManager(String name, String sessionStoreFilePath) throws IOException {
super(name);
transactionStoreManager = new FileTransactionStoreManager(sessionStoreFilePath + name, this);
transactionStoreManager = new FileTransactionStoreManager(sessionStoreFilePath + File.separator + name, this);
}

@Override
Expand Down Expand Up @@ -128,7 +129,7 @@ private void restore(List<TransactionWriteStore> stores, Map<Long, BranchSession
switch (logOperation) {
case GLOBAL_ADD:
case GLOBAL_UPDATE: {
GlobalSession globalSession = (GlobalSession)sessionStorable;
GlobalSession globalSession = (GlobalSession) sessionStorable;
long tid = globalSession.getTransactionId();
GlobalSession foundGlobalSession = sessionMap.get(tid);
if (foundGlobalSession == null) {
Expand All @@ -139,7 +140,7 @@ private void restore(List<TransactionWriteStore> stores, Map<Long, BranchSession
break;
}
case GLOBAL_REMOVE: {
GlobalSession globalSession = (GlobalSession)sessionStorable;
GlobalSession globalSession = (GlobalSession) sessionStorable;
long tid = globalSession.getTransactionId();
if (sessionMap.remove(tid) == null) {
if (LOGGER.isInfoEnabled()) {
Expand All @@ -150,7 +151,7 @@ private void restore(List<TransactionWriteStore> stores, Map<Long, BranchSession
}
case BRANCH_ADD:
case BRANCH_UPDATE: {
BranchSession branchSession = (BranchSession)sessionStorable;
BranchSession branchSession = (BranchSession) sessionStorable;
long tid = branchSession.getTransactionId();
GlobalSession foundGlobalSession = sessionMap.get(tid);
if (foundGlobalSession == null) {
Expand All @@ -166,15 +167,15 @@ private void restore(List<TransactionWriteStore> stores, Map<Long, BranchSession
break;
}
case BRANCH_REMOVE: {
BranchSession branchSession = (BranchSession)sessionStorable;
BranchSession branchSession = (BranchSession) sessionStorable;
long tid = branchSession.getTransactionId();
long bid = branchSession.getBranchId();
GlobalSession found = sessionMap.get(tid);
if (found == null) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"GlobalSession To Be Updated (Remove Branch) Does Not Exists [" + bid + "/" + tid
+ "]");
"GlobalSession To Be Updated (Remove Branch) Does Not Exists [" + bid + "/" + tid
+ "]");
}
} else {
BranchSession theBranch = found.getBranch(bid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@

package com.alibaba.fescar.server.session;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

import com.alibaba.fescar.common.exception.ShouldNeverHappenException;
import com.alibaba.fescar.common.exception.StoreException;
import com.alibaba.fescar.common.util.StringUtils;
Expand All @@ -29,11 +24,14 @@
import com.alibaba.fescar.core.constants.ConfigurationKeys;
import com.alibaba.fescar.core.exception.TransactionException;
import com.alibaba.fescar.core.model.GlobalStatus;

import com.alibaba.fescar.core.store.StoreMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

/**
* The type Session holder.
*
Expand Down Expand Up @@ -77,16 +75,16 @@ public class SessionHolder {
* @throws IOException the io exception
*/
public static void init(String mode) throws IOException {
if(StringUtils.isBlank(mode)){
if (StringUtils.isBlank(mode)) {
//use default
mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
}
//the store mode
StoreMode storeMode = StoreMode.valueof(mode);
if(StoreMode.DB.equals(storeMode)){
if (StoreMode.DB.equals(storeMode)) {
//TODO database store

}else if(StoreMode.FILE.equals(storeMode)){
} else if (StoreMode.FILE.equals(storeMode)) {
//file store
String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR);
if (sessionStorePath == null) {
Expand All @@ -96,13 +94,13 @@ public static void init(String mode) throws IOException {
ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
}else {
} else {
//unknown store
throw new IllegalArgumentException("unknown store mode:" + mode);
}

if (ROOT_SESSION_MANAGER instanceof Reloadable) {
((Reloadable)ROOT_SESSION_MANAGER).reload();
((Reloadable) ROOT_SESSION_MANAGER).reload();

Collection<GlobalSession> reloadedSessions = ROOT_SESSION_MANAGER.allSessions();
if (reloadedSessions != null && !reloadedSessions.isEmpty()) {
Expand Down Expand Up @@ -142,7 +140,7 @@ public static void init(String mode) throws IOException {
case CommitRetrying:
try {
globalSession.addSessionLifecycleListener(
getRetryCommittingSessionManager());
getRetryCommittingSessionManager());
getRetryCommittingSessionManager().addGlobalSession(globalSession);
} catch (TransactionException e) {
throw new ShouldNeverHappenException(e);
Expand All @@ -154,7 +152,7 @@ public static void init(String mode) throws IOException {
case TimeoutRollbackRetrying:
try {
globalSession.addSessionLifecycleListener(
getRetryRollbackingSessionManager());
getRetryRollbackingSessionManager());
getRetryRollbackingSessionManager().addGlobalSession(globalSession);
} catch (TransactionException e) {
throw new ShouldNeverHappenException(e);
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/resources/file.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ store {

## file store
file {
dir = "root.data"
dir = "sessionStore"

# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

package com.alibaba.fescar.server.session;

import com.alibaba.fescar.core.constants.ConfigurationKeys;
import com.alibaba.fescar.core.store.StoreMode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;

import org.junit.Test;

import static com.alibaba.fescar.server.session.SessionHolder.ROOT_SESSION_MANAGER_NAME;
import static org.assertj.core.api.Assertions.assertThat;

/**
* The type Session holder test.
Expand All @@ -31,25 +35,33 @@
* @date 2019 /3/6 The type Session holder test.
*/
public class SessionHolderTest {
private String pathname;

@Before
public void before() {
String sessionStorePath = SessionHolder.CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR);
//delete file previously created
pathname = sessionStorePath + File.separator + ROOT_SESSION_MANAGER_NAME;
}

/**
* Test init.
*
* @throws IOException the io exception
*/
@Test
public void testInit() throws IOException {
String sessionStorePath = System.getProperty("user.dir") + File.separator + "sessionStore";
//delete file previously created
File rootSessionFile = new File(sessionStorePath + File.separator + ROOT_SESSION_MANAGER_NAME);
File rootSessionFile = new File(pathname);
if (rootSessionFile.exists()) {
rootSessionFile.delete();
}
File file = new File(sessionStorePath);
if (!file.exists() && !file.isDirectory()) {
file.mkdirs();
final String mode = StoreMode.FILE.toString();
SessionHolder.init(mode);
final File actual = new File(pathname);
Assert.assertTrue(actual.exists());
Assert.assertTrue(actual.isFile());
}

@After
public void after() {
final File actual = new File(pathname);
if (actual.exists()) {
actual.delete();
}
SessionHolder.init(sessionStorePath);
assertThat(new File(sessionStorePath + File.separator + ROOT_SESSION_MANAGER_NAME)).exists().isFile();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package com.alibaba.fescar.server.store;

import java.io.File;

import com.alibaba.fescar.config.Configuration;
import com.alibaba.fescar.config.ConfigurationFactory;
import com.alibaba.fescar.core.constants.ConfigurationKeys;
import com.alibaba.fescar.core.model.BranchStatus;
import com.alibaba.fescar.core.model.BranchType;
import com.alibaba.fescar.core.model.GlobalStatus;
Expand All @@ -27,11 +28,12 @@
import com.alibaba.fescar.server.session.GlobalSession;
import com.alibaba.fescar.server.session.SessionHelper;
import com.alibaba.fescar.server.session.SessionHolder;

import org.junit.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.File;

/**
* The type Session store test.
*/
Expand All @@ -42,19 +44,22 @@ public class SessionStoreTest {
*/
public static final String RESOURCE_ID = "mysql:xxx";


private static Configuration CONFIG = ConfigurationFactory.getInstance();

/**
* Clean.
*
* @throws Exception the exception
*/
@BeforeMethod
public void clean() throws Exception {
File rootDataFile = new File("./" + SessionHolder.ROOT_SESSION_MANAGER_NAME);
File rootDataFileHis = new File("./" + SessionHolder.ROOT_SESSION_MANAGER_NAME + ".1");
String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR);
File rootDataFile = new File(sessionStorePath + File.separator + SessionHolder.ROOT_SESSION_MANAGER_NAME);
File rootDataFileHis = new File(sessionStorePath + File.separator + SessionHolder.ROOT_SESSION_MANAGER_NAME + ".1");

if (rootDataFile.exists()) {
rootDataFile.delete();
System.currentTimeMillis();
}
if (rootDataFileHis.exists()) {
rootDataFileHis.delete();
Expand All @@ -76,7 +81,7 @@ public void testRestoredFromFile() throws Exception {
globalSession.begin();

BranchSession branchSession1 = SessionHelper.newBranchByGlobal(globalSession, BranchType.AT, RESOURCE_ID,
"ta:1,2;tb:3", "xxx");
"ta:1,2;tb:3", "xxx");
branchSession1.lock();
globalSession.addBranch(branchSession1);

Expand Down Expand Up @@ -145,7 +150,7 @@ public void testRestoredFromFileAsyncCommitting() throws Exception {
globalSession.begin();

BranchSession branchSession1 = SessionHelper.newBranchByGlobal(globalSession, BranchType.AT, RESOURCE_ID,
"ta:1", "xxx");
"ta:1", "xxx");
Assert.assertTrue(branchSession1.lock());
globalSession.addBranch(branchSession1);

Expand All @@ -167,7 +172,7 @@ public void testRestoredFromFileAsyncCommitting() throws Exception {
Assert.assertEquals(reloadSession.getStatus(), GlobalStatus.AsyncCommitting);

GlobalSession sessionInAsyncCommittingQueue = SessionHolder.getAsyncCommittingSessionManager()
.findGlobalSession(tid);
.findGlobalSession(tid);
Assert.assertTrue(reloadSession == sessionInAsyncCommittingQueue);

// No locking for session in AsyncCommitting status
Expand All @@ -192,7 +197,7 @@ public void testRestoredFromFileCommitRetry() throws Exception {
globalSession.begin();

BranchSession branchSession1 = SessionHelper.newBranchByGlobal(globalSession, BranchType.AT, RESOURCE_ID,
"ta:1", "xxx");
"ta:1", "xxx");
branchSession1.lock();
globalSession.addBranch(branchSession1);

Expand All @@ -216,7 +221,7 @@ public void testRestoredFromFileCommitRetry() throws Exception {
Assert.assertEquals(reloadSession.getStatus(), GlobalStatus.CommitRetrying);

GlobalSession sessionInRetryCommittingQueue = SessionHolder.getRetryCommittingSessionManager()
.findGlobalSession(tid);
.findGlobalSession(tid);
Assert.assertTrue(reloadSession == sessionInRetryCommittingQueue);
BranchSession reloadBranchSession = reloadSession.getBranch(branchSession1.getBranchId());
Assert.assertEquals(reloadBranchSession.getStatus(), BranchStatus.PhaseTwo_CommitFailed_Retryable);
Expand Down Expand Up @@ -244,7 +249,7 @@ public void testRestoredFromFileRollbackRetry() throws Exception {
globalSession.begin();

BranchSession branchSession1 = SessionHelper.newBranchByGlobal(globalSession, BranchType.AT, RESOURCE_ID,
"ta:1", "xxx");
"ta:1", "xxx");
branchSession1.lock();
globalSession.addBranch(branchSession1);

Expand All @@ -268,7 +273,7 @@ public void testRestoredFromFileRollbackRetry() throws Exception {
Assert.assertEquals(reloadSession.getStatus(), GlobalStatus.RollbackRetrying);

GlobalSession sessionInRetryRollbackingQueue = SessionHolder.getRetryRollbackingSessionManager()
.findGlobalSession(tid);
.findGlobalSession(tid);
Assert.assertTrue(reloadSession == sessionInRetryRollbackingQueue);
BranchSession reloadBranchSession = reloadSession.getBranch(branchSession1.getBranchId());
Assert.assertEquals(reloadBranchSession.getStatus(), BranchStatus.PhaseTwo_RollbackFailed_Retryable);
Expand Down Expand Up @@ -296,7 +301,7 @@ public void testRestoredFromFileRollbackFailed() throws Exception {
globalSession.begin();

BranchSession branchSession1 = SessionHelper.newBranchByGlobal(globalSession, BranchType.AT, RESOURCE_ID,
"ta:1", "xxx");
"ta:1", "xxx");
branchSession1.lock();
globalSession.addBranch(branchSession1);

Expand Down

0 comments on commit 76734a9

Please sign in to comment.