Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: test fixes #140

Merged
merged 7 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public synchronized Connection getTransactionConnection() throws SQLException, S
if (this.connection == null) {
Connection con = ConnectionPool.getConnectionForProxyStorage(this);
this.connection = new BulkImportProxyConnection(con);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); // TODO: Change this to READ COMMITTED when mysql 5.7 support is dropped and SKIP LOCKED is implemented in loading the bulkimportUsers // https://github.com/supertokens/supertokens-mysql-plugin/issues/141
connection.setAutoCommit(false);
}
return this.connection;
Expand Down
37 changes: 23 additions & 14 deletions src/main/java/io/supertokens/storage/mysql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -3759,23 +3759,32 @@ public int getDbActivityCount(String dbname) throws SQLException, StorageQueryEx

@Override
public void addBulkImportUsers(AppIdentifier appIdentifier, List<BulkImportUser> users)
throws StorageQueryException,
TenantOrAppNotFoundException,
io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException {
throws StorageQueryException {
try {
BulkImportQueries.insertBulkImportUsers(this, appIdentifier, users);
} catch (SQLException e) {
if (e instanceof SQLIntegrityConstraintViolationException) {
MySQLConfig config = Config.getConfig(this);
String errorMessage = e.getMessage();
if (isPrimaryKeyError(errorMessage, config.getBulkImportUsersTable())) {
throw new io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException();
}
if (isForeignKeyConstraintError(errorMessage, config.getBulkImportUsersTable(), "app_id")) {
throw new TenantOrAppNotFoundException(appIdentifier);
this.startTransaction(con -> {
try {
BulkImportQueries.insertBulkImportUsers_Transaction(this, (Connection) con.getConnection(), appIdentifier, users);
} catch (SQLException e) {
if (e instanceof SQLIntegrityConstraintViolationException) {
MySQLConfig config = Config.getConfig(this);
String errorMessage = e.getMessage();
if (isPrimaryKeyError(errorMessage, config.getBulkImportUsersTable())) {
throw new StorageTransactionLogicException(new io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException());
}
if (isForeignKeyConstraintError(errorMessage, config.getBulkImportUsersTable(), "app_id")) {
throw new TenantOrAppNotFoundException(appIdentifier);
}
}
throw new StorageQueryException(e);
}
return null;
});
} catch (StorageTransactionLogicException e) {
if(e.actualException instanceof StorageQueryException) {
throw (StorageQueryException) e.actualException;
} else {
throw new StorageQueryException(e.actualException);
}
throw new StorageQueryException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,31 +71,24 @@ public static String getQueryToCreatePaginationIndex2(Start start) {
+ Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at DESC, id DESC)";
}

public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List<BulkImportUser> users)
public static void insertBulkImportUsers_Transaction(Start start, Connection con, AppIdentifier appIdentifier, List<BulkImportUser> users)
throws SQLException, StorageQueryException {
StringBuilder queryBuilder = new StringBuilder(
"INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data, created_at, updated_at) VALUES ");
String queryBuilder = "INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() +
" (id, app_id, raw_data, created_at, updated_at) VALUES "
+ " (?, ?, ?, ?, ?)";

int userCount = users.size();

for (int i = 0; i < userCount; i++) {
queryBuilder.append(" (?, ?, ?, ?, ?)");

if (i < userCount - 1) {
queryBuilder.append(",");
}
}

update(start, queryBuilder.toString(), pst -> {
int parameterIndex = 1;
List<PreparedStatementValueSetter> valueSetters = new ArrayList<>();
for (BulkImportUser user : users) {
pst.setString(parameterIndex++, user.id);
pst.setString(parameterIndex++, appIdentifier.getAppId());
pst.setString(parameterIndex++, user.toRawDataForDbStorage());
pst.setLong(parameterIndex++, System.currentTimeMillis());
pst.setLong(parameterIndex++, System.currentTimeMillis());
valueSetters.add(pst -> {
pst.setString(1, user.id);
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, user.toRawDataForDbStorage());
pst.setLong(4, System.currentTimeMillis());
pst.setLong(5, System.currentTimeMillis());
});
}
});

executeBatch(con, queryBuilder, valueSetters);
}

public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
Expand Down Expand Up @@ -134,8 +127,8 @@ public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing
// "SKIP LOCKED" allows other processes to skip locked rows and select the next 1000 available rows.
String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE app_id = ?"
+ " AND (status = 'NEW' OR status = 'PROCESSING')" /* 10 mins */
+ " LIMIT ? FOR UPDATE";
+ " AND (status = 'NEW' OR status = 'PROCESSING')"
+ " LIMIT ? FOR UPDATE"; // TODO: add 'SKIP LOCKED' when mysql 5.7 support is dropped // https://github.com/supertokens/supertokens-mysql-plugin/issues/141


List<BulkImportUser> bulkImportUsers = new ArrayList<>();
Expand All @@ -155,18 +148,20 @@ public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing
}

String updateQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, updated_at = ? WHERE app_id = ? AND id IN (" + Utils
.generateCommaSeperatedQuestionMarks(bulkImportUsers.size()) + ")";

update(sqlCon, updateQuery, pst -> {
int index = 1;
pst.setString(index++, BULK_IMPORT_USER_STATUS.PROCESSING.toString());
pst.setLong(index++, System.currentTimeMillis());
pst.setString(index++, appIdentifier.getAppId());
for (BulkImportUser user : bulkImportUsers) {
pst.setObject(index++, user.id);
}
});
+ " SET status = ?, updated_at = ? WHERE app_id = ? AND id = ?";

List<PreparedStatementValueSetter> updateSetters = new ArrayList<>();
for(BulkImportUser user : bulkImportUsers){
updateSetters.add(pst -> {
pst.setString(1, BULK_IMPORT_USER_STATUS.PROCESSING.toString());
pst.setLong(2, System.currentTimeMillis());
pst.setString(3, appIdentifier.getAppId());
pst.setObject(4, user.id);
});
}

executeBatch(sqlCon, updateQuery, updateSetters);

return bulkImportUsers;
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,9 +991,9 @@ public void testWithOneMillionUsers() throws Exception {
return;
}

Main main = startCronProcess(String.valueOf(16));
Main main = startCronProcess(String.valueOf(4));

int NUMBER_OF_USERS_TO_UPLOAD = 500000; // half million
int NUMBER_OF_USERS_TO_UPLOAD = 100000; // hundred thousand users // change this when mysql 5.7 support is dropped // https://github.com/supertokens/supertokens-mysql-plugin/issues/141

if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) {
return;
Expand Down
Loading