feat: multithreaded bulk import #1077
base: feat/bulk-import-base
Conversation
# (DIFFERENT_ACROSS_TENANTS | OPTIONAL | Default: number of available processor cores) int value. If specified,
# the supertokens core will use the specified number of threads to complete the migration of users.
# bulk_migration_parallelism:
Shouldn't this be the same for a given app? Does bulk import run for an app or a tenant?
It runs per app, so yeah, it should be the same across the tenants. My bad, I just followed the instructions without thinking it through.
done
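For reference, a minimal sketch of how the documented default (the number of available processor cores) can be obtained in Java; this is illustrative only and not the actual core code:

    public class DefaultParallelism {
        public static void main(String[] args) {
            // Default used when bulk_migration_parallelism is not specified (per the config comment above):
            int defaultParallelism = Runtime.getRuntime().availableProcessors();
            System.out.println("default bulk_migration_parallelism = " + defaultParallelism);
        }
    }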
devConfig.yaml
@@ -170,3 +170,8 @@ disable_telemetry: true

# (Optional | Default: null) string value. The encryption key used for saving OAuth client secret on the database.
# oauth_client_secret_encryption_key:

# (DIFFERENT_ACROSS_TENANTS | OPTIONAL | Default: number of available processor cores) int value. If specified,
Refer to the comment on config.yaml.
changed
// Number of users to process in a single batch of ProcessBulkImportUsers Cron Job
public static final int PROCESS_USERS_BATCH_SIZE = 10000;
// Time interval in seconds between two consecutive runs of ProcessBulkImportUsers Cron Job
public static final int PROCESS_USERS_INTERVAL_SECONDS = 1;
Does it have to be this frequent?
Yes. We want to squeeze out as much performance as we can. Setting it to 1 sec means that the next round of batch import processing starts 1 second after the current round has finished. This is also why the start/stop cron job API was added: to avoid wasting resources when there is nothing to import.
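A minimal sketch of the scheduling semantics being described: with a fixed delay, the interval is measured from the end of one run to the start of the next. This uses a plain ScheduledExecutorService, not the core's actual CronTask machinery:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class FixedDelayExample {
        public static void main(String[] args) {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            // With scheduleWithFixedDelay, the 1-second delay starts counting only after the
            // previous run has finished, so a long-running batch never overlaps with itself.
            scheduler.scheduleWithFixedDelay(() -> {
                // one round of bulk-import batch processing would happen here
                System.out.println("processing batch at " + System.currentTimeMillis());
            }, 0, 1, TimeUnit.SECONDS);
        }
    }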
public class BulkImportBackgroundJobManager {

    public static BULK_IMPORT_BACKGROUND_PROCESS_STATUS startBackgroundJob(Main main, Integer batchSize) throws TenantOrAppNotFoundException {
does calling this twice create 2 cron jobs?
no
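A hypothetical sketch of why a second call is a no-op: the manager can check whether the job is already registered before creating it. Class and field names are illustrative, not the actual BulkImportBackgroundJobManager code:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class IdempotentStartSketch {
        // Hypothetical guard: a single flag per process tracks whether the job is registered.
        private static final AtomicBoolean STARTED = new AtomicBoolean(false);

        public static boolean startBackgroundJob(Runnable job) {
            if (!STARTED.compareAndSet(false, true)) {
                return false; // already running: do not register a second cron job
            }
            Thread worker = new Thread(job, "bulk-import-cron");
            worker.setDaemon(true);
            worker.start();
            return true;
        }
    }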
    }

    public static BULK_IMPORT_BACKGROUND_PROCESS_STATUS stopBackgroundJob(Main main) throws TenantOrAppNotFoundException {
        ProcessBulkImportUsers processBulkImportUsersCron = (ProcessBulkImportUsers) main.getResourceDistributor().getResource(new TenantIdentifier(null, null, null), ProcessBulkImportUsers.RESOURCE_KEY);
what happens if the cron was already stopped and removed?
add test if possible
Nothing happens. Added a test for that.
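A hypothetical sketch of that "nothing happens" behaviour: if the cron resource has already been removed, the stop call simply finds nothing to shut down. The map below stands in for the core's resource distributor and is not its real API:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class NullSafeStopSketch {
        // Stand-in for the resource registry, keyed by a resource key.
        private static final Map<String, Thread> RESOURCES = new ConcurrentHashMap<>();

        public static void stopBackgroundJob(String resourceKey) {
            Thread cron = RESOURCES.remove(resourceKey);
            if (cron == null) {
                return; // cron was already stopped and removed: a second stop is a no-op
            }
            cron.interrupt();
        }
    }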
@@ -772,6 +782,10 @@ void normalizeAndValidate(Main main, boolean includeConfigFilePath) throws Inval
        }
    }

    if (bulk_migration_parallelism < 1) {
do we also need an upper limit? what are the implications?
Theoretically the upper end is limitless; it depends purely on the hardware. Of course, if this is set to an incredibly high number without the supporting hardware beneath it, the JVM can crash when it actually tries to bulk import. (The threads defined here are only instantiated when a bulk import batch is loaded for processing.)
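A minimal sketch of the point being made: only the lower bound is validated, and the worker threads are created lazily when a batch is processed, so an oversized value only bites at that point. Field and method names are illustrative, not the real CoreConfig or bulk import code:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ParallelismSketch {
        // Hypothetical field mirroring the config key; defaults to the number of cores.
        private int bulkMigrationParallelism = Runtime.getRuntime().availableProcessors();

        void validate() {
            if (bulkMigrationParallelism < 1) {
                throw new IllegalArgumentException("'bulk_migration_parallelism' must be >= 1");
            }
            // No hard upper bound here: an extreme value only becomes a problem when the
            // pool below is actually created and the JVM cannot spawn that many threads.
        }

        ExecutorService createImportPool() {
            // Threads are only instantiated when a bulk import batch is loaded for processing.
            return Executors.newFixedThreadPool(bulkMigrationParallelism);
        }
    }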
@@ -100,6 +100,17 @@ public static void addCronjob(Main main, CronTask task) {
        }
    }

    // TODO test for this
TODO
done
@@ -228,7 +228,7 @@ public static boolean addNewOrUpdateAppOrTenant(Main main, TenantConfig newTenan
    }

-    public static boolean addNewOrUpdateAppOrTenant(Main main, TenantConfig newTenant,
+    public synchronized static boolean addNewOrUpdateAppOrTenant(Main main, TenantConfig newTenant,
why is this needed? I think we already take a lock for updating tenants.
Hm, I am not sure what happened here. This is not even used in the bulk import...
removed the synchronized keyword
    return HttpRequestForTesting.sendGETRequest(main, "", BACKGROUNDJOB_MANAGER_ENDPOINT, null,
            1000, 1000, null, Utils.getCdiVersionStringLatestForTests(), null);
}
What's the behaviour if the job is stopped while an import is happening? Can we test that as well?
Processing is stopped. Added a new test for that.
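A hypothetical sketch of that "processing is stopped" behaviour: the worker checks a stop flag between users, so a stop request interrupts the current batch and the remaining users are picked up on a later run. This is illustrative only, not the actual cron implementation:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class StoppableBatchWorker {
        private final AtomicBoolean stopped = new AtomicBoolean(false);

        public void stop() {
            stopped.set(true);
        }

        public void processBatch(List<String> userIds) {
            for (String userId : userIds) {
                if (stopped.get()) {
                    return; // remaining users stay queued for the next run
                }
                importUser(userId);
            }
        }

        private void importUser(String userId) {
            // the actual import work would happen here
        }
    }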
if (limit != null) {
    if (limit > BulkImport.GET_USERS_PAGINATION_MAX_LIMIT) {
        throw new ServletException(
Are you aware of the typical POST body size if, say, we are adding 10000 users? It would be useful information for setting the max post body size in nginx, for example.
Well, 10k randomized users with multiple login methods and so on come to around 7.5 MB (written to a JSON file). I don't know if that counts as typical or not.
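To derive a concrete number for something like nginx's client_max_body_size, one option is to serialize a representative batch and measure it. A rough sketch using Gson; the field names are made up and real bulk-import users carry more data (multiple login methods, roles, etc.), so actual sizes will be larger:

    import com.google.gson.Gson;
    import com.google.gson.JsonArray;
    import com.google.gson.JsonObject;
    import java.nio.charset.StandardCharsets;

    public class PayloadSizeEstimate {
        public static void main(String[] args) {
            JsonArray users = new JsonArray();
            for (int i = 0; i < 10_000; i++) {
                // Minimal, illustrative user object; real payloads have many more fields.
                JsonObject user = new JsonObject();
                user.addProperty("externalUserId", "user-" + i);
                user.addProperty("email", "user" + i + "@example.com");
                users.add(user);
            }
            JsonObject body = new JsonObject();
            body.add("users", users);
            int bytes = new Gson().toJson(body).getBytes(StandardCharsets.UTF_8).length;
            System.out.println("approximate body size: " + (bytes / 1024) + " KiB");
        }
    }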
return "/bulk-import/backgroundjob"; | ||
} | ||
|
||
//TODO what's stopping two independent calls interfering with each-other? |
TODO
Yeah, but I don't have an answer for this. I can remove the comment, but I don't have a solution for the problem it describes, apart from requiring that the clients orchestrating the imports be aware of it. So, should I remove the comment? Do you have a better idea for solving this?
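For context, one generic way to keep two independent start/stop calls from interleaving within a single core instance is to serialize them behind a lock. This is purely illustrative of the problem in the TODO, not a solution adopted in this PR, and it does nothing for callers hitting different core instances:

    import java.util.concurrent.locks.ReentrantLock;

    public class SerializedToggleSketch {
        private static final ReentrantLock LOCK = new ReentrantLock();
        private static boolean running = false;

        // Hypothetical: both the start and stop handlers funnel through this method,
        // so concurrent calls are applied one at a time.
        public static boolean setRunning(boolean shouldRun) {
            LOCK.lock();
            try {
                boolean changed = (running != shouldRun);
                running = shouldRun;
                return changed;
            } finally {
                LOCK.unlock();
            }
        }
    }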
public class CountBulkImportUsersAPI extends WebserverAPI {
    public CountBulkImportUsersAPI(Main main) {
        super(main, "");
we could have an rid for bulk import. you could add it to all the bulk import APIs
done
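A hypothetical illustration of the suggestion, extending the snippet above: pass a non-empty recipe id to the WebserverAPI constructor in each bulk import API. The "bulkimport" string is an assumed rid value for illustration only; the actual rid used in the PR may differ:

    public class CountBulkImportUsersAPI extends WebserverAPI {
        public CountBulkImportUsersAPI(Main main) {
            // assumed rid shared by all bulk import APIs
            super(main, "bulkimport");
        }
    }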
        super.sendJsonResponse(200, result, resp);
    } catch (StorageQueryException e) {
        JsonArray errors = new JsonArray();
        if (e.getCause() instanceof BulkImportBatchInsertException) {
this exception should be caught and thrown in BulkImport.importUser
done
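A hedged sketch of the refactor being asked for: unwrap and rethrow the batch-insert failure inside BulkImport.importUser so the API layer doesn't have to inspect the cause chain. The method signature, the insertUser call, and the stand-in exception classes below are assumptions for the sake of a self-contained example, not the actual core code:

    // Minimal stand-in types so the sketch compiles on its own; the real classes live in supertokens-core.
    class StorageQueryException extends Exception {
        StorageQueryException(Throwable cause) { super(cause); }
    }

    class BulkImportBatchInsertException extends Exception {
        BulkImportBatchInsertException(String message) { super(message); }
    }

    public class ImportUserSketch {
        public static void importUser(String userJson) throws BulkImportBatchInsertException, StorageQueryException {
            try {
                insertUser(userJson); // hypothetical storage call that may wrap a batch insert failure
            } catch (StorageQueryException e) {
                if (e.getCause() instanceof BulkImportBatchInsertException) {
                    // surface the specific failure instead of the generic storage exception
                    throw (BulkImportBatchInsertException) e.getCause();
                }
                throw e;
            }
        }

        private static void insertUser(String userJson) throws StorageQueryException {
            // placeholder for the real batch insert
        }
    }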
public static boolean isAccountLinkingEnabled(Main main, AppIdentifier appIdentifier) throws StorageQueryException, TenantOrAppNotFoundException {
    return Arrays.stream(FeatureFlag.getInstance(main, appIdentifier).getEnabledFeatures())
            .anyMatch(t -> t == EE_FEATURES.ACCOUNT_LINKING || t == EE_FEATURES.MFA);
}
not sure why this change appears here
Because this was moved from AuthRecipe.java to Utils.java, as it was also used in BulkImportUserUtils, but with code duplication. I removed the duplication and extracted it to a place that both callers can reach without going against the dependencies between the code layers.
Summary of change
Core implementation of Bulk User Migration.
Related issues
Test Plan
(Write your test plan here. If you changed any code, please provide us with clear instructions on how you verified your
changes work. Bonus points for screenshots and videos!)
Documentation changes
(If relevant, please create a PR in our docs repo, or create a checklist here
highlighting the necessary changes)
Checklist for important updates
- coreDriverInterfaceSupported.json file has been updated (if needed)
- pluginInterfaceSupported.json file has been updated (if needed)
- build.gradle has been updated (if needed)
- The getPaidFeatureStats function in FeatureFlag.java file has been updated (if needed)
- If new dependencies were added in build.gradle, please make sure to add them in implementationDependencies.json.
- Update getValidFields in io/supertokens/config/CoreConfig.java if new aliases were added for any core config (similar to the access_token_signing_key_update_interval config alias).
- Check the latest released tag (git tag) in the format vX.Y.Z, and then find the latest branch (git branch --all) whose X.Y is greater than the latest released tag.
- For the app_id_to_user_id table, make sure to delete from this table when deleting the user as well if deleteUserIdMappingToo is false.

Remaining TODOs for this PR