Skip to content

Commit

Permalink
feat: Add ProcessBulkImportUsers cron job
Browse files Browse the repository at this point in the history
  • Loading branch information
anku255 committed Mar 8, 2024
1 parent 61c9225 commit 669f6b1
Show file tree
Hide file tree
Showing 17 changed files with 1,226 additions and 321 deletions.
4 changes: 4 additions & 0 deletions src/main/java/io/supertokens/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.supertokens.config.Config;
import io.supertokens.config.CoreConfig;
import io.supertokens.cronjobs.Cronjobs;
import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers;
import io.supertokens.cronjobs.deleteExpiredAccessTokenSigningKeys.DeleteExpiredAccessTokenSigningKeys;
import io.supertokens.cronjobs.deleteExpiredDashboardSessions.DeleteExpiredDashboardSessions;
import io.supertokens.cronjobs.deleteExpiredEmailVerificationTokens.DeleteExpiredEmailVerificationTokens;
Expand Down Expand Up @@ -254,6 +255,9 @@ private void init() throws IOException, StorageQueryException {
// starts DeleteExpiredAccessTokenSigningKeys cronjob if the access token signing keys can change
Cronjobs.addCronjob(this, DeleteExpiredAccessTokenSigningKeys.init(this, uniqueUserPoolIdsTenants));

// starts ProcessBulkImportUsers cronjob
Cronjobs.addCronjob(this, ProcessBulkImportUsers.init(this, uniqueUserPoolIdsTenants));

// this is to ensure tenantInfos are in sync for the new cron job as well
MultitenancyHelper.getInstance(this).refreshCronjobs();

Expand Down
200 changes: 135 additions & 65 deletions src/main/java/io/supertokens/authRecipe/AuthRecipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,41 +336,84 @@ public static LinkAccountsResult linkAccounts(Main main, AppIdentifierWithStorag
AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException,
RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException, InputUserIdIsNotAPrimaryUserException,
UnknownUserIdException, TenantOrAppNotFoundException, FeatureNotEnabledException {
AuthRecipeSQLStorage storage = (AuthRecipeSQLStorage) appIdentifierWithStorage.getAuthRecipeStorage();
try {
return storage.startTransaction(con -> {
return linkAccountsInternal(main, con, appIdentifierWithStorage, _recipeUserId, _primaryUserId);
});
} catch (StorageTransactionLogicException e) {
return handleLinkAccountsExceptions(e);
}
}

public static LinkAccountsResult bulkImport_linkAccounts_Transaction(Main main, TransactionConnection con,
AppIdentifierWithStorage appIdentifierWithStorage,
String _recipeUserId, String _primaryUserId)
throws StorageQueryException,
AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException,
RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException, InputUserIdIsNotAPrimaryUserException,
UnknownUserIdException, TenantOrAppNotFoundException, FeatureNotEnabledException {
try {
return linkAccountsInternal(main, con, appIdentifierWithStorage, _recipeUserId, _primaryUserId);
} catch (StorageTransactionLogicException e) {
return handleLinkAccountsExceptions(e);
}
}

if (Arrays.stream(FeatureFlag.getInstance(main, appIdentifierWithStorage).getEnabledFeatures())
.noneMatch(t -> (t == EE_FEATURES.ACCOUNT_LINKING || t == EE_FEATURES.MFA))) {
throw new FeatureNotEnabledException(
"Account linking feature is not enabled for this app. Please contact support to enable it.");
private static LinkAccountsResult handleLinkAccountsExceptions(StorageTransactionLogicException e)
throws StorageQueryException,
AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException,
RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException,
InputUserIdIsNotAPrimaryUserException, UnknownUserIdException, TenantOrAppNotFoundException,
FeatureNotEnabledException {
if (e.actualException instanceof UnknownUserIdException) {
throw (UnknownUserIdException) e.actualException;
} else if (e.actualException instanceof InputUserIdIsNotAPrimaryUserException) {
throw (InputUserIdIsNotAPrimaryUserException) e.actualException;
} else if (e.actualException instanceof RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException) {
throw (RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException) e.actualException;
} else if (e.actualException instanceof AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException) {
throw (AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException) e.actualException;
} else if (e.actualException instanceof TenantOrAppNotFoundException) {
throw (TenantOrAppNotFoundException) e.actualException;
} else if (e.actualException instanceof FeatureNotEnabledException) {
throw (FeatureNotEnabledException) e.actualException;
}
throw new StorageQueryException(e);
}

private static LinkAccountsResult linkAccountsInternal(Main main, TransactionConnection con,
AppIdentifierWithStorage appIdentifierWithStorage,
String _recipeUserId, String _primaryUserId)
throws StorageTransactionLogicException {

AuthRecipeSQLStorage storage = (AuthRecipeSQLStorage) appIdentifierWithStorage.getAuthRecipeStorage();
try {
LinkAccountsResult result = storage.startTransaction(con -> {
if (Arrays.stream(FeatureFlag.getInstance(main, appIdentifierWithStorage).getEnabledFeatures())
.noneMatch(t -> (t == EE_FEATURES.ACCOUNT_LINKING || t == EE_FEATURES.MFA))) {
throw new FeatureNotEnabledException(
"Account linking feature is not enabled for this app. Please contact support to enable it.");
}

try {
CanLinkAccountsResult canLinkAccounts = canLinkAccountsHelper(con, appIdentifierWithStorage,
_recipeUserId, _primaryUserId);
AuthRecipeSQLStorage storage = (AuthRecipeSQLStorage) appIdentifierWithStorage.getAuthRecipeStorage();

if (canLinkAccounts.alreadyLinked) {
return new LinkAccountsResult(getUserById(appIdentifierWithStorage, canLinkAccounts.primaryUserId), true);
}
// now we can link accounts in the db.
storage.linkAccounts_Transaction(appIdentifierWithStorage, con, canLinkAccounts.recipeUserId,
canLinkAccounts.primaryUserId);
CanLinkAccountsResult canLinkAccounts = canLinkAccountsHelper(con, appIdentifierWithStorage,
_recipeUserId, _primaryUserId);

storage.commitTransaction(con);
if (canLinkAccounts.alreadyLinked) {
return new LinkAccountsResult(getUserById(appIdentifierWithStorage, canLinkAccounts.primaryUserId),
true);
}
// now we can link accounts in the db.
storage.linkAccounts_Transaction(appIdentifierWithStorage, con, canLinkAccounts.recipeUserId,
canLinkAccounts.primaryUserId);

return new LinkAccountsResult(getUserById(appIdentifierWithStorage, canLinkAccounts.primaryUserId), false);
} catch (UnknownUserIdException | InputUserIdIsNotAPrimaryUserException |
RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException |
AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException e) {
throw new StorageTransactionLogicException(e);
}
});

LinkAccountsResult result = new LinkAccountsResult(
getUserById(appIdentifierWithStorage, canLinkAccounts.primaryUserId), false);

if (!result.wasAlreadyLinked) {
io.supertokens.pluginInterface.useridmapping.UserIdMapping mappingResult =
io.supertokens.useridmapping.UserIdMapping.getUserIdMapping(
io.supertokens.pluginInterface.useridmapping.UserIdMapping mappingResult = io.supertokens.useridmapping.UserIdMapping
.getUserIdMapping(
appIdentifierWithStorage,
_recipeUserId, UserIdType.SUPERTOKENS);
// finally, we revoke all sessions of the recipeUser Id cause their user ID has changed.
Expand All @@ -379,17 +422,11 @@ public static LinkAccountsResult linkAccounts(Main main, AppIdentifierWithStorag
}

return result;
} catch (StorageTransactionLogicException e) {
if (e.actualException instanceof UnknownUserIdException) {
throw (UnknownUserIdException) e.actualException;
} else if (e.actualException instanceof InputUserIdIsNotAPrimaryUserException) {
throw (InputUserIdIsNotAPrimaryUserException) e.actualException;
} else if (e.actualException instanceof RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException) {
throw (RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException) e.actualException;
} else if (e.actualException instanceof AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException) {
throw (AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException) e.actualException;
}
throw new StorageQueryException(e);
} catch (FeatureNotEnabledException | TenantOrAppNotFoundException | StorageQueryException
| UnknownUserIdException | InputUserIdIsNotAPrimaryUserException
| RecipeUserIdAlreadyLinkedWithAnotherPrimaryUserIdException
| AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException e) {
throw new StorageTransactionLogicException(e);
}
}

Expand Down Expand Up @@ -543,43 +580,76 @@ public static CreatePrimaryUserResult createPrimaryUser(Main main,
RecipeUserIdAlreadyLinkedWithPrimaryUserIdException, UnknownUserIdException, TenantOrAppNotFoundException,
FeatureNotEnabledException {

if (Arrays.stream(FeatureFlag.getInstance(main, appIdentifierWithStorage).getEnabledFeatures())
.noneMatch(t -> (t == EE_FEATURES.ACCOUNT_LINKING || t == EE_FEATURES.MFA))) {
throw new FeatureNotEnabledException(
"Account linking feature is not enabled for this app. Please contact support to enable it.");
}

AuthRecipeSQLStorage storage = (AuthRecipeSQLStorage) appIdentifierWithStorage.getAuthRecipeStorage();
try {
return storage.startTransaction(con -> {
return createPrimaryUserInternal(main, con, appIdentifierWithStorage, recipeUserId);
});
} catch (StorageTransactionLogicException e) {
return handleCreatePrimaryUserExceptions(e);
}
}

try {
CreatePrimaryUserResult result = canCreatePrimaryUserHelper(con, appIdentifierWithStorage,
recipeUserId);
if (result.wasAlreadyAPrimaryUser) {
return result;
}
storage.makePrimaryUser_Transaction(appIdentifierWithStorage, con, result.user.getSupertokensUserId());
public static CreatePrimaryUserResult bulkImport_createPrimaryUser_Transaction(Main main, TransactionConnection con,
AppIdentifierWithStorage appIdentifierWithStorage,
String recipeUserId)
throws StorageQueryException, AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException,
RecipeUserIdAlreadyLinkedWithPrimaryUserIdException, UnknownUserIdException, TenantOrAppNotFoundException,
FeatureNotEnabledException {
try {
return createPrimaryUserInternal(main, con, appIdentifierWithStorage, recipeUserId);
} catch (StorageTransactionLogicException e) {
return handleCreatePrimaryUserExceptions(e);
}
}

storage.commitTransaction(con);
public static CreatePrimaryUserResult handleCreatePrimaryUserExceptions(StorageTransactionLogicException e)
throws StorageQueryException, AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException,
RecipeUserIdAlreadyLinkedWithPrimaryUserIdException, UnknownUserIdException, TenantOrAppNotFoundException,
FeatureNotEnabledException {
if (e.actualException instanceof UnknownUserIdException) {
throw (UnknownUserIdException) e.actualException;
} else if (e.actualException instanceof RecipeUserIdAlreadyLinkedWithPrimaryUserIdException) {
throw (RecipeUserIdAlreadyLinkedWithPrimaryUserIdException) e.actualException;
} else if (e.actualException instanceof AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException) {
throw (AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException) e.actualException;
} else if (e.actualException instanceof TenantOrAppNotFoundException) {
throw (TenantOrAppNotFoundException) e.actualException;
} else if (e.actualException instanceof FeatureNotEnabledException) {
throw (FeatureNotEnabledException) e.actualException;
}
throw new StorageQueryException(e);
}

result.user.isPrimaryUser = true;
public static CreatePrimaryUserResult createPrimaryUserInternal(Main main, TransactionConnection con,
AppIdentifierWithStorage appIdentifierWithStorage,
String recipeUserId)
throws StorageTransactionLogicException {
try {
if (Arrays.stream(FeatureFlag.getInstance(main, appIdentifierWithStorage).getEnabledFeatures())
.noneMatch(t -> (t == EE_FEATURES.ACCOUNT_LINKING || t == EE_FEATURES.MFA))) {
throw new FeatureNotEnabledException(
"Account linking feature is not enabled for this app. Please contact support to enable it.");
}

return result;
} catch (UnknownUserIdException | RecipeUserIdAlreadyLinkedWithPrimaryUserIdException |
AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException e) {
throw new StorageTransactionLogicException(e);
}
});
} catch (StorageTransactionLogicException e) {
if (e.actualException instanceof UnknownUserIdException) {
throw (UnknownUserIdException) e.actualException;
} else if (e.actualException instanceof RecipeUserIdAlreadyLinkedWithPrimaryUserIdException) {
throw (RecipeUserIdAlreadyLinkedWithPrimaryUserIdException) e.actualException;
} else if (e.actualException instanceof AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException) {
throw (AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException) e.actualException;
AuthRecipeSQLStorage storage = (AuthRecipeSQLStorage) appIdentifierWithStorage.getAuthRecipeStorage();

CreatePrimaryUserResult result = canCreatePrimaryUserHelper(con, appIdentifierWithStorage,
recipeUserId);
if (result.wasAlreadyAPrimaryUser) {
return result;
}
throw new StorageQueryException(e);
storage.makePrimaryUser_Transaction(appIdentifierWithStorage, con, result.user.getSupertokensUserId());


result.user.isPrimaryUser = true;

return result;

} catch (TenantOrAppNotFoundException | FeatureNotEnabledException | StorageQueryException
| UnknownUserIdException | RecipeUserIdAlreadyLinkedWithPrimaryUserIdException
| AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException e) {
throw new StorageTransactionLogicException(e);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/supertokens/bulkimport/BulkImport.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class BulkImport {
public static final int GET_USERS_PAGINATION_LIMIT = 500;
public static final int GET_USERS_DEFAULT_LIMIT = 100;
public static final int DELETE_USERS_LIMIT = 500;
public static final int PROCESS_USERS_BATCH_SIZE = 1000;
public static final int PROCESS_USERS_INTERVAL = 60; // 60 seconds

public static void addUsers(AppIdentifierWithStorage appIdentifierWithStorage, List<BulkImportUser> users)
throws StorageQueryException, TenantOrAppNotFoundException {
Expand Down
Loading

0 comments on commit 669f6b1

Please sign in to comment.