Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add delete bulk import users api #197

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 53 additions & 1 deletion src/main/java/io/supertokens/storage/postgresql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
import io.supertokens.pluginInterface.authRecipe.LoginMethod;
import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.bulkimport.BulkImportUserInfo;
import io.supertokens.pluginInterface.dashboard.DashboardSearchTags;
import io.supertokens.pluginInterface.dashboard.DashboardSessionInfo;
import io.supertokens.pluginInterface.dashboard.DashboardUser;
Expand Down Expand Up @@ -109,7 +112,7 @@
public class Start
implements SessionSQLStorage, EmailPasswordSQLStorage, EmailVerificationSQLStorage, ThirdPartySQLStorage,
JWTRecipeSQLStorage, PasswordlessSQLStorage, UserMetadataSQLStorage, UserRolesSQLStorage, UserIdMappingStorage,
UserIdMappingSQLStorage, MultitenancyStorage, MultitenancySQLStorage, DashboardSQLStorage, TOTPSQLStorage, ActiveUsersStorage, AuthRecipeSQLStorage {
UserIdMappingSQLStorage, MultitenancyStorage, MultitenancySQLStorage, DashboardSQLStorage, TOTPSQLStorage, ActiveUsersStorage, AuthRecipeSQLStorage, BulkImportStorage {

// these configs are protected from being modified / viewed by the dev using the SuperTokens
// SaaS. If the core is not running in SuperTokens SaaS, this array has no effect.
Expand Down Expand Up @@ -2990,4 +2993,53 @@ public UserIdMapping[] getUserIdMapping_Transaction(TransactionConnection con, A
throw new StorageQueryException(e);
}
}

@Override
public void addBulkImportUsers(AppIdentifier appIdentifier, List<BulkImportUser> users)
throws StorageQueryException,
TenantOrAppNotFoundException,
io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException {
try {
BulkImportQueries.insertBulkImportUsers(this, appIdentifier, users);
} catch (SQLException e) {
if (e instanceof PSQLException) {
ServerErrorMessage serverErrorMessage = ((PSQLException) e).getServerErrorMessage();
if (isPrimaryKeyError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable())) {
throw new io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException();
}
if (isForeignKeyConstraintError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable(), "app_id")) {
throw new TenantOrAppNotFoundException(appIdentifier);
}
}
throw new StorageQueryException(e);
}
}

@Override
public List<BulkImportUserInfo> getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BulkImportUserStatus status,
@Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException {
try {
return BulkImportQueries.getBulkImportUsers(this, appIdentifier, limit, status, bulkImportUserId, createdAt);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

@Override
public void updateBulkImportUserStatus(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull BulkImportUserStatus status) throws StorageQueryException {
try {
BulkImportQueries.updateBulkImportUserStatus(this, appIdentifier, bulkImportUserId, status);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

@Override
public void deleteBulkImportUsers(AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws StorageQueryException {
try {
BulkImportQueries.deleteFailedBulkImportUsers(this, appIdentifier, bulkImportUserIds);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ public String getTotpUsedCodesTable() {
return addSchemaAndPrefixToTableName("totp_used_codes");
}

public String getBulkImportUsersTable() {
return addSchemaAndPrefixToTableName("bulk_import_users");
}

private String addSchemaAndPrefixToTableName(String tableName) {
return addSchemaToTableName(postgresql_table_names_prefix + tableName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.storage.postgresql.queries;

import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update;
import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import io.supertokens.pluginInterface.RowMapper;
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BulkImportUserStatus;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.bulkimport.BulkImportUserInfo;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.postgresql.Start;
import io.supertokens.storage.postgresql.config.Config;
import io.supertokens.storage.postgresql.utils.Utils;

public class BulkImportQueries {
static String getQueryToCreateBulkImportUsersTable(Start start) {
String schema = Config.getConfig(start).getTableSchema();
String tableName = Config.getConfig(start).getBulkImportUsersTable();
return "CREATE TABLE IF NOT EXISTS " + tableName + " ("
+ "id CHAR(36),"
+ "app_id VARCHAR(64) NOT NULL DEFAULT 'public',"
+ "raw_data TEXT NOT NULL,"
+ "status VARCHAR(128) DEFAULT 'NEW',"
+ "error_msg TEXT,"
+ "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP),"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey")
+ " PRIMARY KEY(app_id, id),"
+ "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " "
+ "FOREIGN KEY(app_id) "
+ "REFERENCES " + Config.getConfig(start).getAppsTable() + " (app_id) ON DELETE CASCADE"
+ " );";
}

public static String getQueryToCreateStatusUpdatedAtIndex(Start start) {
return "CREATE INDEX IF NOT EXISTS bulk_import_users_status_updated_at_index ON "
+ Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, updated_at)";
}

public static String getQueryToCreateCreatedAtIndex(Start start) {
return "CREATE INDEX IF NOT EXISTS bulk_import_users_created_at_index ON "
+ Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at)";
}

public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List<BulkImportUser> users)
throws SQLException, StorageQueryException {
StringBuilder queryBuilder = new StringBuilder(
"INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES ");

int userCount = users.size();

for (int i = 0; i < userCount; i++) {
queryBuilder.append(" (?, ?, ?)");

if (i < userCount - 1) {
queryBuilder.append(",");
}
}

update(start, queryBuilder.toString(), pst -> {
int parameterIndex = 1;
for (BulkImportUser user : users) {
pst.setString(parameterIndex++, user.id);
pst.setString(parameterIndex++, appIdentifier.getAppId());
pst.setString(parameterIndex++, user.toString());
}
});
}

public static void updateBulkImportUserStatus(Start start, AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull BulkImportUserStatus status)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return if the update was successful or not (in case the entry does not exist in the db, we can return false, else true)

throws SQLException, StorageQueryException {
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) WHERE id = ? AND app_id = ?";

update(start, query, pst -> {
pst.setString(1, status.toString());
pst.setString(2, bulkImportUserId);
pst.setString(3, appIdentifier.getAppId());
});
}

public static List<BulkImportUserInfo> getBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BulkImportUserStatus status,
@Nullable String bulkImportUserId, @Nullable Long createdAt)
throws SQLException, StorageQueryException {

String baseQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable();

StringBuilder queryBuilder = new StringBuilder(baseQuery);
List<Object> parameters = new ArrayList<>();

queryBuilder.append(" WHERE app_id = ?");
parameters.add(appIdentifier.getAppId());

if (status != null) {
queryBuilder.append(" AND status = ?");
parameters.add(status.toString());
}

if (bulkImportUserId != null && createdAt != null) {
queryBuilder
.append(" AND created_at < ? OR (created_at = ? AND id <= ?)");
parameters.add(createdAt);
parameters.add(createdAt);
parameters.add(bulkImportUserId);
}

queryBuilder.append(" ORDER BY created_at DESC, id DESC LIMIT ?");
parameters.add(limit);

String query = queryBuilder.toString();

return execute(start, query, pst -> {
for (int i = 0; i < parameters.size(); i++) {
pst.setObject(i + 1, parameters.get(i));
}
}, result -> {
List<BulkImportUserInfo> bulkImportUsers = new ArrayList<>();
while (result.next()) {
bulkImportUsers.add(BulkImportUserInfoRowMapper.getInstance().mapOrThrow(result));
}
return bulkImportUsers;
});
}

public static void deleteFailedBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws SQLException, StorageQueryException {
if (bulkImportUserIds.length == 0) {
return;
}

String baseQuery = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable();
StringBuilder queryBuilder = new StringBuilder(baseQuery);

List<Object> parameters = new ArrayList<>();

queryBuilder.append(" WHERE app_id = ?");
parameters.add(appIdentifier.getAppId());

queryBuilder.append(" AND id IN (");
for (int i = 0; i < bulkImportUserIds.length; i++) {
if (i != 0) {
queryBuilder.append(", ");
}
queryBuilder.append("?");
parameters.add(bulkImportUserIds[i]);
}
queryBuilder.append(")");

String query = queryBuilder.toString();

update(start, query, pst -> {
for (int i = 0; i < parameters.size(); i++) {
pst.setObject(i + 1, parameters.get(i));
}
});
}
private static class BulkImportUserInfoRowMapper implements RowMapper<BulkImportUserInfo, ResultSet> {
private static final BulkImportUserInfoRowMapper INSTANCE = new BulkImportUserInfoRowMapper();

private BulkImportUserInfoRowMapper() {
}

private static BulkImportUserInfoRowMapper getInstance() {
return INSTANCE;
}

@Override
public BulkImportUserInfo map(ResultSet result) throws Exception {
return new BulkImportUserInfo(result.getString("id"), result.getString("raw_data"),
BulkImportUserStatus.valueOf(result.getString("status")),
result.getLong("created_at"), result.getLong("updated_at"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,14 @@ public static void createTablesIfNotExists(Start start) throws SQLException, Sto
update(start, TOTPQueries.getQueryToCreateTenantIdIndexForUsedCodesTable(start), NO_OP_SETTER);
}

if (!doesTableExists(start, Config.getConfig(start).getBulkImportUsersTable())) {
getInstance(start).addState(CREATING_NEW_TABLE, null);
update(start, BulkImportQueries.getQueryToCreateBulkImportUsersTable(start), NO_OP_SETTER);
// index:
update(start, BulkImportQueries.getQueryToCreateStatusUpdatedAtIndex(start), NO_OP_SETTER);
update(start, BulkImportQueries.getQueryToCreateCreatedAtIndex(start), NO_OP_SETTER);
}

} catch (Exception e) {
if (e.getMessage().contains("schema") && e.getMessage().contains("does not exist")
&& numberOfRetries < 1) {
Expand Down Expand Up @@ -557,7 +565,10 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer
String DROP_QUERY = "DROP INDEX IF EXISTS all_auth_recipe_users_pagination_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}

{
String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_status_updated_at_index";
update(start, DROP_QUERY, NO_OP_SETTER);
}
{
String DROP_QUERY = "DROP TABLE IF EXISTS "
+ getConfig(start).getAppsTable() + ","
Expand Down Expand Up @@ -591,7 +602,8 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer
+ getConfig(start).getDashboardUsersTable() + ","
+ getConfig(start).getDashboardSessionsTable() + ","
+ getConfig(start).getTotpUsedCodesTable() + "," + getConfig(start).getTotpUserDevicesTable() + ","
+ getConfig(start).getTotpUsersTable();
+ getConfig(start).getTotpUsersTable() + ","
+ getConfig(start).getBulkImportUsersTable();
update(start, DROP_QUERY, NO_OP_SETTER);
}
}
Expand Down
Loading