Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add BulkImport APIs and cron #211

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens.storage.postgresql;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;
import java.util.Set;

import org.postgresql.util.PSQLException;

import com.google.gson.JsonObject;

import io.supertokens.pluginInterface.LOG_LEVEL;
import io.supertokens.pluginInterface.exceptions.InvalidConfigException;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
import io.supertokens.storage.postgresql.config.Config;


/**
* BulkImportProxyStorage is a class extending Start, serving as a Storage instance in the bulk import user cronjob.
* This cronjob extensively utilizes existing queries to import users, all of which internally operate within transactions.
*
* For the purpose of bulkimport cronjob, we aim to employ a single connection for all queries and rollback any operations in case of query failures.
* To achieve this, we override the startTransaction method to utilize the same connection and prevent automatic query commits even upon transaction success.
* Subsequently, the cronjob is responsible for committing the transaction after ensuring the successful execution of all queries.
*/

public class BulkImportProxyStorage extends Start {
private Connection transactionConnection;
sattvikc marked this conversation as resolved.
Show resolved Hide resolved

public Connection getTransactionConnection() throws SQLException {
sattvikc marked this conversation as resolved.
Show resolved Hide resolved
if (this.transactionConnection == null || this.transactionConnection.isClosed()) {
sattvikc marked this conversation as resolved.
Show resolved Hide resolved
this.transactionConnection = ConnectionPool.getConnectionForProxyStorage(this);
sattvikc marked this conversation as resolved.
Show resolved Hide resolved
}
return this.transactionConnection;
}

@Override
public <T> T startTransaction(TransactionLogic<T> logic)
throws StorageTransactionLogicException, StorageQueryException {
return startTransaction(logic, TransactionIsolationLevel.SERIALIZABLE);
}

@Override
public <T> T startTransaction(TransactionLogic<T> logic, TransactionIsolationLevel isolationLevel)
sattvikc marked this conversation as resolved.
Show resolved Hide resolved
throws StorageTransactionLogicException, StorageQueryException {
final int NUM_TRIES = 50;
int tries = 0;
while (true) {
tries++;
try {
return startTransactionHelper(logic, isolationLevel);
} catch (SQLException | StorageQueryException | StorageTransactionLogicException e) {
Throwable actualException = e;
if (e instanceof StorageQueryException) {
actualException = e.getCause();
} else if (e instanceof StorageTransactionLogicException) {
actualException = ((StorageTransactionLogicException) e).actualException;
}
String exceptionMessage = actualException.getMessage();
if (exceptionMessage == null) {
exceptionMessage = "";
}

// see: https://github.com/supertokens/supertokens-postgresql-plugin/pull/3

// We set this variable to the current (or cause) exception casted to
// PSQLException if we can safely cast it
PSQLException psqlException = actualException instanceof PSQLException ? (PSQLException) actualException
: null;

// PSQL error class 40 is transaction rollback. See:
// https://www.postgresql.org/docs/12/errcodes-appendix.html
boolean isPSQLRollbackException = psqlException != null
&& psqlException.getServerErrorMessage().getSQLState().startsWith("40");

// We keep the old exception detection logic to ensure backwards compatibility.
// We could get here if the new logic hits a false negative,
// e.g., in case someone renamed constraints/tables
boolean isDeadlockException = actualException instanceof SQLTransactionRollbackException
|| exceptionMessage.toLowerCase().contains("concurrent update")
|| exceptionMessage.toLowerCase().contains("concurrent delete")
|| exceptionMessage.toLowerCase().contains("the transaction might succeed if retried") ||

// we have deadlock as well due to the DeadlockTest.java
exceptionMessage.toLowerCase().contains("deadlock");

if ((isPSQLRollbackException || isDeadlockException) && tries < NUM_TRIES) {
try {
Thread.sleep((long) (10 + (250 + Math.min(Math.pow(2, tries), 3000)) * Math.random()));
} catch (InterruptedException ignored) {
}
ProcessState.getInstance(this).addState(ProcessState.PROCESS_STATE.DEADLOCK_FOUND, e);
// this because deadlocks are not necessarily a result of faulty logic. They can
// happen
continue;
}

if ((isPSQLRollbackException || isDeadlockException) && tries == NUM_TRIES) {
ProcessState.getInstance(this).addState(ProcessState.PROCESS_STATE.DEADLOCK_NOT_RESOLVED, e);
}
if (e instanceof StorageQueryException) {
throw (StorageQueryException) e;
} else if (e instanceof StorageTransactionLogicException) {
throw (StorageTransactionLogicException) e;
}
throw new StorageQueryException(e);
}
}
}

private <T> T startTransactionHelper(TransactionLogic<T> logic, TransactionIsolationLevel isolationLevel)
sattvikc marked this conversation as resolved.
Show resolved Hide resolved
throws StorageQueryException, StorageTransactionLogicException, SQLException {
Connection con = null;
try {
con = (Connection) getTransactionConnection();
int libIsolationLevel = Connection.TRANSACTION_SERIALIZABLE;
switch (isolationLevel) {
case SERIALIZABLE:
libIsolationLevel = Connection.TRANSACTION_SERIALIZABLE;
break;
case REPEATABLE_READ:
libIsolationLevel = Connection.TRANSACTION_REPEATABLE_READ;
break;
case READ_COMMITTED:
libIsolationLevel = Connection.TRANSACTION_READ_COMMITTED;
break;
case READ_UNCOMMITTED:
libIsolationLevel = Connection.TRANSACTION_READ_UNCOMMITTED;
break;
case NONE:
libIsolationLevel = Connection.TRANSACTION_NONE;
break;
}

if (libIsolationLevel != Connection.TRANSACTION_SERIALIZABLE) {
con.setTransactionIsolation(libIsolationLevel);
}
con.setAutoCommit(false);
return logic.mainLogicAndCommit(new TransactionConnection(con));
} catch (Exception e) {
if (con != null) {
con.rollback();
}
sattvikc marked this conversation as resolved.
Show resolved Hide resolved
throw e;
}
}

sattvikc marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void commitTransaction(TransactionConnection con) throws StorageQueryException {
// We do not want to commit the queries when using the BulkImportProxyStorage to be able to rollback everything
// if any query fails while importing the user
}

@Override
public void loadConfig(JsonObject configJson, Set<LOG_LEVEL> logLevels, TenantIdentifier tenantIdentifier)
throws InvalidConfigException {
// We are overriding the loadConfig method to set the connection pool size
// to 1 to avoid creating many connections for the bulk import cronjob
configJson.addProperty("postgresql_connection_pool_size", 1);
Config.loadConfig(this, configJson, logLevels, tenantIdentifier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to just call super.loadConfig instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ static void initPool(Start start, boolean shouldWait) throws DbInitException {
}
}

public static Connection getConnection(Start start) throws SQLException {
private static Connection getNewConnection(Start start) throws SQLException {
if (getInstance(start) == null) {
throw new IllegalStateException("Please call initPool before getConnection");
}
Expand All @@ -202,6 +202,17 @@ public static Connection getConnection(Start start) throws SQLException {
return getInstance(start).hikariDataSource.getConnection();
}

public static Connection getConnectionForProxyStorage(Start start) throws SQLException {
return getNewConnection(start);
}

public static Connection getConnection(Start start) throws SQLException {
if (start instanceof BulkImportProxyStorage) {
return ((BulkImportProxyStorage) start).getTransactionConnection();
}
return getNewConnection(start);
}

static void close(Start start) {
if (getInstance(start) == null) {
return;
Expand All @@ -216,4 +227,12 @@ static void close(Start start) {
}
}
}

public static void closeConnection(Start start, Connection con) throws SQLException {
if (start instanceof BulkImportProxyStorage) {
// Keep the connection open for future queries
} else {
con.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ public interface QueryExecutorTemplate {

static <T> T execute(Start start, String QUERY, PreparedStatementValueSetter setter,
ResultSetValueExtractor<T> mapper) throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {

Connection con = ConnectionPool.getConnection(start);
try {
return execute(con, QUERY, setter, mapper);
} finally {
ConnectionPool.closeConnection(start, con);
}
}

Expand All @@ -44,15 +48,31 @@ static <T> T execute(Connection con, String QUERY, PreparedStatementValueSetter
}
}

static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {
static int update(Start start, String QUERY, PreparedStatementValueSetter setter) throws SQLException {
Connection con = ConnectionPool.getConnection(start);
try {
return update(con, QUERY, setter);
} finally {
ConnectionPool.closeConnection(start, con);
}
}

static int update(Connection con, String QUERY, PreparedStatementValueSetter setter)
static <T> T update(Start start, String QUERY, PreparedStatementValueSetter setter, ResultSetValueExtractor<T> mapper)
throws SQLException, StorageQueryException {
Connection con = ConnectionPool.getConnection(start);
try {
try (PreparedStatement pst = con.prepareStatement(QUERY)) {
setter.setValues(pst);
try (ResultSet result = pst.executeQuery()) {
return mapper.extract(result);
}
}
} finally {
ConnectionPool.closeConnection(start, con);
}
}

static int update(Connection con, String QUERY, PreparedStatementValueSetter setter) throws SQLException {
try (PreparedStatement pst = con.prepareStatement(QUERY)) {
setter.setValues(pst);
return pst.executeUpdate();
Expand Down
82 changes: 80 additions & 2 deletions src/main/java/io/supertokens/storage/postgresql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo;
import io.supertokens.pluginInterface.authRecipe.LoginMethod;
import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage;
import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS;
import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage;
import io.supertokens.pluginInterface.bulkimport.BulkImportUser;
import io.supertokens.pluginInterface.dashboard.DashboardSearchTags;
import io.supertokens.pluginInterface.dashboard.DashboardSessionInfo;
import io.supertokens.pluginInterface.dashboard.DashboardUser;
Expand Down Expand Up @@ -109,7 +112,7 @@ public class Start
implements SessionSQLStorage, EmailPasswordSQLStorage, EmailVerificationSQLStorage, ThirdPartySQLStorage,
JWTRecipeSQLStorage, PasswordlessSQLStorage, UserMetadataSQLStorage, UserRolesSQLStorage, UserIdMappingStorage,
UserIdMappingSQLStorage, MultitenancyStorage, MultitenancySQLStorage, DashboardSQLStorage, TOTPSQLStorage,
ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage {
ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage, BulkImportSQLStorage {

// these configs are protected from being modified / viewed by the dev using the SuperTokens
// SaaS. If the core is not running in SuperTokens SaaS, this array has no effect.
Expand Down Expand Up @@ -152,6 +155,11 @@ public STORAGE_TYPE getType() {
return STORAGE_TYPE.SQL;
}

@Override
public Storage createBulkImportProxyStorageInstance() {
return new BulkImportProxyStorage();
}

@Override
public void loadConfig(JsonObject configJson, Set<LOG_LEVEL> logLevels, TenantIdentifier tenantIdentifier)
throws InvalidConfigException {
Expand Down Expand Up @@ -2359,7 +2367,7 @@ public boolean addUserIdToTenant_Transaction(TenantIdentifier tenantIdentifier,
throw new IllegalStateException("Should never come here!");
}

sqlCon.commit();
this.commitTransaction(con);
return added;
} catch (SQLException throwables) {
PostgreSQLConfig config = Config.getConfig(this);
Expand Down Expand Up @@ -3046,4 +3054,74 @@ public int getDbActivityCount(String dbname) throws SQLException, StorageQueryEx
return -1;
});
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the functions below are all supposed to be called NOT on the bulkimportproxystorage instance right? In that case, please assert in these that the instance of this is not of type bulkimportproxystorage

public void addBulkImportUsers(AppIdentifier appIdentifier, List<BulkImportUser> users)
throws StorageQueryException,
TenantOrAppNotFoundException,
io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException {
try {
BulkImportQueries.insertBulkImportUsers(this, appIdentifier, users);
} catch (SQLException e) {
if (e instanceof PSQLException) {
ServerErrorMessage serverErrorMessage = ((PSQLException) e).getServerErrorMessage();
if (isPrimaryKeyError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable())) {
throw new io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException();
}
if (isForeignKeyConstraintError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable(), "app_id")) {
throw new TenantOrAppNotFoundException(appIdentifier);
}
}
throw new StorageQueryException(e);
}
}

@Override
public List<BulkImportUser> getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status,
@Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException {
try {
return BulkImportQueries.getBulkImportUsers(this, appIdentifier, limit, status, bulkImportUserId, createdAt);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

@Override
public void updateBulkImportUserStatus_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage)
throws StorageQueryException {
Connection sqlCon = (Connection) con.getConnection();
try {
BulkImportQueries.updateBulkImportUserStatus_Transaction(this, sqlCon, appIdentifier, bulkImportUserIds, status, errorMessage);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

@Override
public List<String> deleteBulkImportUsers(AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws StorageQueryException {
try {
return BulkImportQueries.deleteBulkImportUsers(this, appIdentifier, bulkImportUserIds);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

@Override
public List<BulkImportUser> getBulkImportUsersForProcessing(AppIdentifier appIdentifier, @Nonnull Integer limit) throws StorageQueryException {
try {
return BulkImportQueries.getBulkImportUsersForProcessing(this, appIdentifier, limit);
} catch (StorageTransactionLogicException e) {
throw new StorageQueryException(e.actualException);
}
}

@Override
public void deleteBulkImportUser_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String bulkImportUserId) throws StorageQueryException {
Connection sqlCon = (Connection) con.getConnection();
try {
BulkImportQueries.deleteBulkImportUser_Transaction(this, sqlCon, appIdentifier, bulkImportUserId);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ public String getTotpUsedCodesTable() {
return addSchemaAndPrefixToTableName("totp_used_codes");
}

public String getBulkImportUsersTable() {
return addSchemaAndPrefixToTableName("bulk_import_users");
}

private String addSchemaAndPrefixToTableName(String tableName) {
return addSchemaToTableName(postgresql_table_names_prefix + tableName);
}
Expand Down
Loading
Loading