fix: review fixes, reworking cron start/stop

tamassoltesz committed Dec 18, 2024
1 parent d6402ab commit dc41f30
Showing 10 changed files with 97 additions and 599 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/supertokens/Main.java
@@ -260,8 +260,8 @@ private void init() throws IOException, StorageQueryException {
         // starts DeleteExpiredAccessTokenSigningKeys cronjob if the access token signing keys can change
         Cronjobs.addCronjob(this, DeleteExpiredAccessTokenSigningKeys.init(this, uniqueUserPoolIdsTenants));
 
-        // initializes ProcessBulkImportUsers cronjob to process bulk import users - start happens via API call @see BulkImportBackgroundJobManager
-        ProcessBulkImportUsers.init(this, uniqueUserPoolIdsTenants);
+        // initializes ProcessBulkImportUsers cronjob to process bulk import users
+        Cronjobs.addCronjob(this, ProcessBulkImportUsers.init(this, uniqueUserPoolIdsTenants));
 
         Cronjobs.addCronjob(this, CleanupOAuthSessionsAndChallenges.init(this, uniqueUserPoolIdsTenants));
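Note: the functional change in this hunk is the cron's lifecycle. Before, ProcessBulkImportUsers was only init-ed here and its start was deferred to an API call (the @see reference in the removed comment); now it is registered with the scheduler at startup like every other cronjob. A minimal sketch of the init-and-register idiom, with hypothetical stand-ins for SuperTokens' Cronjobs and CronTask types (not the project's real API):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-ins for SuperTokens' Cronjobs scheduler and CronTask type,
    // only to contrast the two lifecycles in this hunk. Not the project's real API.
    public class CronRegistrationSketch {
        interface CronTask extends Runnable {}

        static class Cronjobs {
            private final List<CronTask> registered = new ArrayList<>();

            // Registering hands ownership to the scheduler: it starts the task at
            // startup and stops it at shutdown, like Cronjobs.addCronjob(...) above.
            void addCronjob(CronTask task) {
                registered.add(task);
                task.run(); // stand-in for "schedule at the task's interval"
            }
        }

        public static void main(String[] args) {
            Cronjobs cronjobs = new Cronjobs();
            // New side of the hunk: create the task and register it in one step,
            // mirroring Cronjobs.addCronjob(this, ProcessBulkImportUsers.init(...)).
            // The old side only created the task and left starting to an API call.
            cronjobs.addCronjob(() -> System.out.println("processing bulk import users"));
        }
    }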
4 changes: 2 additions & 2 deletions src/main/java/io/supertokens/bulkimport/BulkImport.java
@@ -95,9 +95,9 @@ public class BulkImport {
     // Maximum number of users that can be deleted in a single operation
     public static final int DELETE_USERS_MAX_LIMIT = 500;
     // Number of users to process in a single batch of ProcessBulkImportUsers Cron Job
-    public static final int PROCESS_USERS_BATCH_SIZE = 10000;
+    public static final int PROCESS_USERS_BATCH_SIZE = 8000;
     // Time interval in seconds between two consecutive runs of ProcessBulkImportUsers Cron Job
-    public static final int PROCESS_USERS_INTERVAL_SECONDS = 1;
+    public static final int PROCESS_USERS_INTERVAL_SECONDS = 5*60; // 5 minutes
     private static final Logger log = LoggerFactory.getLogger(BulkImport.class);
 
     // This map allows reusing proxy storage for all tenants in the app and closing connections after import.
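Note: as a rough illustration (not part of the commit), these two constants bound how fast a backlog drains. Each fetch pulls up to PROCESS_USERS_BATCH_SIZE users, and since the reworked cron below loops until its start-of-run snapshot is exhausted, a backlog of N users needs about ceil(N / 8000) fetches within a single run. A minimal sketch with a hypothetical backlog:

    // Back-of-the-envelope sketch using the constants from this hunk;
    // the backlog figure is hypothetical.
    public class BulkImportPacingSketch {
        static final int PROCESS_USERS_BATCH_SIZE = 8000;
        static final int PROCESS_USERS_INTERVAL_SECONDS = 5 * 60; // 5 minutes

        public static void main(String[] args) {
            long backlog = 100_000; // hypothetical queued users
            long fetches = (backlog + PROCESS_USERS_BATCH_SIZE - 1) / PROCESS_USERS_BATCH_SIZE;
            System.out.println(fetches + " fetches of up to " + PROCESS_USERS_BATCH_SIZE
                    + " users; the cron runs every " + PROCESS_USERS_INTERVAL_SECONDS + " seconds");
        }
    }

With these numbers the sketch prints 13 fetches; under the old values (batch of 10000, 1-second interval) the cron fired far more often but pulled at most one fixed batch per run.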

This file was deleted.

@@ -24,6 +24,7 @@
 import io.supertokens.cronjobs.CronTaskTest;
 import io.supertokens.pluginInterface.STORAGE_TYPE;
 import io.supertokens.pluginInterface.StorageUtils;
+import io.supertokens.pluginInterface.bulkimport.BulkImportStorage;
 import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
 import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
 import io.supertokens.pluginInterface.exceptions.StorageQueryException;
@@ -47,7 +48,6 @@ public class ProcessBulkImportUsers extends CronTask {
 
     public static final String RESOURCE_KEY = "io.supertokens.ee.cronjobs.ProcessBulkImportUsers";
 
-    private Integer batchSize;
     private ExecutorService executorService;
 
     private ProcessBulkImportUsers(Main main, List<List<TenantIdentifier>> tenantsInfo) {
@@ -71,44 +71,52 @@ protected void doTaskPerApp(AppIdentifier app)
         BulkImportSQLStorage bulkImportSQLStorage = (BulkImportSQLStorage) StorageLayer
                 .getStorage(app.getAsPublicTenantIdentifier(), main);
 
-        //split the loaded users list into smaller chunks
-        int NUMBER_OF_BATCHES = Config.getConfig(app.getAsPublicTenantIdentifier(), main)
-                .getBulkMigrationParallelism();
-        executorService = Executors.newFixedThreadPool(NUMBER_OF_BATCHES);
         String[] allUserRoles = StorageUtils.getUserRolesStorage(bulkImportSQLStorage).getRoles(app);
         BulkImportUserUtils bulkImportUserUtils = new BulkImportUserUtils(allUserRoles);
 
-        List<BulkImportUser> users = bulkImportSQLStorage.getBulkImportUsersAndChangeStatusToProcessing(app,
-                this.batchSize);
+        long newUsers = bulkImportSQLStorage.getBulkImportUsersCount(app, BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW);
+        long processingUsers = bulkImportSQLStorage.getBulkImportUsersCount(app, BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING);
+        //taking a "snapshot" here and processing in this round as many users as there are uploaded now. After this the processing will go on
+        //with another app and gets back here when all the apps had a chance.
+        long usersProcessed = 0;
 
-        if(users == null || users.isEmpty()) {
-            // "No more users to process!"
-            return;
-        }
+        while(usersProcessed < (newUsers + processingUsers)) {
 
-        //split the loaded users list into smaller chunks
-        List<List<BulkImportUser>> loadedUsersChunks = makeChunksOf(users, NUMBER_OF_BATCHES);
+            //split the loaded users list into smaller chunks
+            int NUMBER_OF_BATCHES = Config.getConfig(app.getAsPublicTenantIdentifier(), main).getBulkMigrationParallelism();
 
-        //pass the chunks for processing for the workers
+            executorService = Executors.newFixedThreadPool(NUMBER_OF_BATCHES);
+            List<BulkImportUser> users = bulkImportSQLStorage.getBulkImportUsersAndChangeStatusToProcessing(app,
+                    BulkImport.PROCESS_USERS_BATCH_SIZE);
+            if (users == null || users.isEmpty()) {
+                // "No more users to process!"
+                break;
+            }
 
-        try {
-            List<Future<?>> tasks = new ArrayList<>();
-            for (int i =0; i< NUMBER_OF_BATCHES && i < loadedUsersChunks.size(); i++) {
-                tasks.add(executorService.submit(new ProcessBulkUsersImportWorker(main, app, loadedUsersChunks.get(i),
-                        bulkImportSQLStorage, bulkImportUserUtils)));
-            }
+            List<List<BulkImportUser>> loadedUsersChunks = makeChunksOf(users, NUMBER_OF_BATCHES);
 
-            for (Future<?> task : tasks) {
-                while(!task.isDone()) {
-                    Thread.sleep(1000);
-                }
-                Void result = (Void) task.get(); //to know if there were any errors while executing and for waiting in this thread for all the other threads to finish up
-            }
-            executorService.shutdownNow();
+            try {
+                List<Future<?>> tasks = new ArrayList<>();
+                for (int i = 0; i < NUMBER_OF_BATCHES && i < loadedUsersChunks.size(); i++) {
+                    tasks.add(
+                            executorService.submit(new ProcessBulkUsersImportWorker(main, app, loadedUsersChunks.get(i),
+                                    bulkImportSQLStorage, bulkImportUserUtils)));
+                }
 
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException(e);
-        }
+                for (Future<?> task : tasks) {
+                    while (!task.isDone()) {
+                        Thread.sleep(1000);
+                    }
+                    Void result = (Void) task.get(); //to know if there were any errors while executing and for waiting in this thread for all the other threads to finish up
+                    usersProcessed += loadedUsersChunks.get(tasks.indexOf(task)).size();
+                }
+
+            } catch (ExecutionException | InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        executorService.shutdownNow();
     }
 
     @Override
@@ -133,14 +141,6 @@ public int getInitialWaitTimeSeconds() {
         return 0;
     }
 
-    public Integer getBatchSize() {
-        return batchSize;
-    }
-
-    public void setBatchSize(Integer batchSize) {
-        this.batchSize = batchSize;
-    }
-
     private List<List<BulkImportUser>> makeChunksOf(List<BulkImportUser> users, int numberOfChunks) {
         List<List<BulkImportUser>> chunks = new ArrayList<>();
         if (users != null && !users.isEmpty() && numberOfChunks > 0) {
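Note: the while loop is the core of the rework. One run of the cron is bounded by a snapshot of the NEW + PROCESSING counts taken up front, so users enqueued mid-run wait for the next run and one busy app cannot monopolize the cron across all apps. A toy simulation of that property (illustrative only; the queue, names and numbers are made up):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Sketch of the termination property of the reworked loop (not the commit's code):
    // the run processes roughly the number of users counted at its start, even if
    // new users keep arriving while it is in progress.
    public class SnapshotDrainSketch {
        static final int BATCH_SIZE = 3; // stands in for BulkImport.PROCESS_USERS_BATCH_SIZE

        public static void main(String[] args) {
            Queue<String> queue = new ArrayDeque<>();
            for (int i = 1; i <= 7; i++) queue.add("user-" + i);

            long snapshot = queue.size(); // like newUsers + processingUsers
            long usersProcessed = 0;

            while (usersProcessed < snapshot) {
                int fetched = 0;
                while (fetched < BATCH_SIZE && !queue.isEmpty()) {
                    queue.poll();
                    fetched++;
                }
                if (fetched == 0) break; // "No more users to process!"
                usersProcessed += fetched;
                queue.add("late-user-" + usersProcessed); // arrivals mid-run don't extend this run
            }
            System.out.println("processed=" + usersProcessed + ", left for next run=" + queue.size());
        }
    }

As in the real code, the run can overshoot the snapshot by at most one batch; the late arrivals are left for the next run.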
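Note: the body of makeChunksOf is cut off in the diff above. Any split into at most numberOfChunks lists satisfies the worker loop, which submits min(NUMBER_OF_BATCHES, chunks.size()) workers; a plausible ceiling-division sketch (an assumption, not the commit's actual code):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: the real makeChunksOf body is truncated in the diff above.
    public class ChunkSketch {
        static <T> List<List<T>> makeChunksOf(List<T> users, int numberOfChunks) {
            List<List<T>> chunks = new ArrayList<>();
            if (users != null && !users.isEmpty() && numberOfChunks > 0) {
                int chunkSize = (users.size() + numberOfChunks - 1) / numberOfChunks; // ceiling division
                for (int start = 0; start < users.size(); start += chunkSize) {
                    chunks.add(new ArrayList<>(users.subList(start, Math.min(start + chunkSize, users.size()))));
                }
            }
            return chunks;
        }

        public static void main(String[] args) {
            List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7);
            System.out.println(makeChunksOf(ids, 3)); // [[1, 2, 3], [4, 5, 6], [7]]
        }
    }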
6 changes: 4 additions & 2 deletions src/main/java/io/supertokens/webserver/Webserver.java
@@ -26,7 +26,10 @@
 import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
 import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException;
 import io.supertokens.webserver.api.accountlinking.*;
-import io.supertokens.webserver.api.bulkimport.*;
+import io.supertokens.webserver.api.bulkimport.BulkImportAPI;
+import io.supertokens.webserver.api.bulkimport.CountBulkImportUsersAPI;
+import io.supertokens.webserver.api.bulkimport.DeleteBulkImportUserAPI;
+import io.supertokens.webserver.api.bulkimport.ImportUserAPI;
 import io.supertokens.webserver.api.core.*;
 import io.supertokens.webserver.api.dashboard.*;
 import io.supertokens.webserver.api.emailpassword.UserAPI;
@@ -285,7 +288,6 @@ private void setupRoutes() {
         addAPI(new DeleteBulkImportUserAPI(main));
         addAPI(new ImportUserAPI(main));
         addAPI(new CountBulkImportUsersAPI(main));
-        addAPI(new BulkImportBackgroundJobManagerAPI(main));
 
         addAPI(new OAuthAuthAPI(main));
         addAPI(new OAuthTokenAPI(main));

This file was deleted.

@@ -89,9 +89,8 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S
             super.sendJsonResponse(200, result, resp);
         } catch (BulkImportBatchInsertException e) {
             JsonArray errors = new JsonArray();
-            BulkImportBatchInsertException insertException = (BulkImportBatchInsertException) e.getCause();
             errors.addAll(
-                    insertException.exceptionByUserId.values().stream().map(exc -> exc.getMessage()).map(JsonPrimitive::new)
+                    e.exceptionByUserId.values().stream().map(exc -> exc.getMessage()).map(JsonPrimitive::new)
                             .collect(JsonArray::new, JsonArray::add, JsonArray::addAll)
             );
             JsonObject errorResponseJson = new JsonObject();
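Note: the fix here reads exceptionByUserId off the caught exception itself instead of casting e.getCause(), which would fail whenever the cause is not itself a BulkImportBatchInsertException. The stream-collect idiom it keeps is worth seeing in isolation; a standalone sketch with made-up map contents:

    import com.google.gson.JsonArray;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonPrimitive;

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Standalone illustration of the collect pattern above: flattening a per-user
    // exception map into a JSON array of messages. The map contents here are made
    // up; in the commit the map comes from BulkImportBatchInsertException.
    public class ErrorArraySketch {
        public static void main(String[] args) {
            Map<String, Exception> exceptionByUserId = new LinkedHashMap<>();
            exceptionByUserId.put("user-1", new IllegalArgumentException("email is invalid"));
            exceptionByUserId.put("user-2", new IllegalStateException("duplicate external id"));

            JsonArray errors = exceptionByUserId.values().stream()
                    .map(Exception::getMessage)
                    .map(JsonPrimitive::new)
                    // mutable reduction directly into a JsonArray, as in the diff
                    .collect(JsonArray::new, JsonArray::add, JsonArray::addAll);

            JsonObject errorResponseJson = new JsonObject();
            errorResponseJson.add("errors", errors);
            System.out.println(errorResponseJson); // {"errors":["email is invalid","duplicate external id"]}
        }
    }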