Skip to content

Commit

Permalink
fix transaction keyword
Browse files Browse the repository at this point in the history
changed transaction keyword
changed connection url construction
  • Loading branch information
kulmam92 committed Oct 31, 2018
1 parent e642671 commit 37a507a
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 61 deletions.
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

Digdag `mssql>` operator plugin to execute a query on MsSQL server.

**Caution**: This isnot working yet.

## configuration

[Release list](https://github.com/kulmam92/digdag-plugin-mssql/releases).
Expand All @@ -12,10 +10,9 @@ Digdag `mssql>` operator plugin to execute a query on MsSQL server.
_export:
plugin:
repositories:
repositories:
#- file://${repos}
- file://${repos}
#- file:///path/to/digdag-plugin-mssql/build/repo
- https://jitpack.io
#- https://jitpack.io
dependencies:
- com.github.kulmam92:digdag-plugin-mssql:0.1.1

Expand Down Expand Up @@ -66,6 +63,6 @@ Artifacts are build on local repos: `./build/repo`.
```sh
digdag selfupdate

rm -rf sample/.digdag/plugin
rm -rf .digdag/plugin
digdag run -a --project sample plugin.dig -p repos=`pwd`/build/repo
```
2 changes: 1 addition & 1 deletion sample/plugin.dig
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ _export:
plugin:
repositories:
- file://${repos}
- https://dl.bintray.com/digdag/maven
#- https://dl.bintray.com/digdag/maven
#- https://jitpack.io
dependencies:
- com.github.kulmam92:digdag-plugin-mssql:0.1.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,57 @@ public void cleanup()
);
}

@Override
public boolean lockedTransaction(UUID queryId, TransactionAction action)
throws LockConflictException
{
boolean completed = beginTransactionAndLockStatusRow(queryId);

// status row is locked here until this transaction is committed or aborted.

if (completed) {
// query was completed successfully before. skip the action.
abortTransaction();
return false;
}
else {
// query is not completed. run the action.
action.run();
updateStatusRowAndCommit(queryId);
return true;
}
}

private boolean beginTransactionAndLockStatusRow(UUID queryId)
throws LockConflictException
{
do {
beginTransaction();

String status = lockStatusRowString(queryId);
switch (status) {
case "LOCKED_COMPLETED":
return true;
case "LOCKED_NOT_COMPLETED":
return false;
case "NOT_EXISTS":
// status row doesn't exist. insert one.
insertStatusRowAndCommit(queryId);
}
} while (true);
}

private void beginTransaction()
{
executeStatement("begin a transaction", "BEGIN TRANSACTION");
}

@Override
protected void abortTransaction()
{
executeStatement("rollback a transaction", "ROLLBACK TRANSACTION");
}

// Referenced by AbstractPersistentTransactionHelper
@Override
protected StatusRow lockStatusRow(UUID queryId)
Expand Down Expand Up @@ -166,12 +217,37 @@ protected StatusRow lockStatusRow(UUID queryId)
}
}
catch (SQLException ex) {
// if (ex.getSQLState().equals("55P03")) {
// throw new LockConflictException("Failed to acquire a status row lock", ex);
// }
// else {
throw new DatabaseException("Failed to lock a status row", ex);
// }
}
}

// Did this since enum StatusRow is not public
protected String lockStatusRowString(UUID queryId)
throws LockConflictException
{
try (Statement stmt = connection.createStatement()) {
// Sql Server doesn't support select for update -- (updlock)
ResultSet rs = stmt.executeQuery(String.format(ENGLISH,
"SELECT completed_at FROM %s WHERE query_id = '%s'",
statusTableReference().getName(),
queryId.toString())
);
if (rs.next()) {
// status row exists and locked. get status of it.
rs.getTimestamp(1);
if (rs.wasNull()) {
return "LOCKED_NOT_COMPLETED";
}
else {
return "LOCKED_COMPLETED";
}
}
else {
return "NOT_EXISTS";
}
}
catch (SQLException ex) {
throw new DatabaseException("Failed to lock a status row", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,49 @@
public abstract class MssqlConnectionConfig
extends AbstractJdbcConnectionConfig
{
// public abstract Optional<String> schema();
// Not defined in AbstractJdbcConnectionConfig.class
// user, database are not optional
public abstract Optional<String> instanceName();
public abstract Optional<String> schema();
public abstract boolean integratedSecurity();
public abstract boolean multiSubnetFailover();
public abstract Optional<String> applicationIntent();
public abstract Optional<String> failoverPartner();
// To do options implementations
// https://github.com/embulk/embulk-output-jdbc/blob/07b6dfea0c5296c124328d2d17bdc48240f7d159/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/ToStringMap.java
// public abstract Optional<ToStringMap> options();

@VisibleForTesting
public static MssqlConnectionConfig configure(SecretProvider secrets, Config params)
{
return ImmutableMssqlConnectionConfig.builder()
.host(secrets.getSecretOptional("host").or(() -> params.get("host", String.class)))
.port(secrets.getSecretOptional("port").transform(Integer::parseInt).or(() -> params.get("port", int.class, 1433)))
.user(secrets.getSecretOptional("user").or(() -> params.get("user", String.class)))
.password(secrets.getSecretOptional("password"))
.database(secrets.getSecretOptional("database").or(() -> params.get("database", String.class)))
.ssl(secrets.getSecretOptional("ssl").transform(Boolean::parseBoolean).or(() -> params.get("ssl", boolean.class, false)))
.connectTimeout(secrets.getSecretOptional("connect_timeout").transform(DurationParam::parse).or(() ->
params.get("connect_timeout", DurationParam.class, DurationParam.of(Duration.ofSeconds(30)))))
.socketTimeout(secrets.getSecretOptional("socket_timeout").transform(DurationParam::parse).or(() ->
params.get("socket_timeout", DurationParam.class, DurationParam.of(Duration.ofSeconds(1800)))))
// .schema(secrets.getSecretOptional("schema").or(params.getOptional("schema", String.class)))
// // Not defined in AbstractJdbcConnectionConfig.class
// // Instance name
// .instanceName(secrets.getSecretOptional("instanceName").or(params.getOptional("instanceName", String.class)))
// // integratedSecurity
// .integratedSecurity(secrets.getSecretOptional("integratedSecurity").or(params.getOptional("integratedSecurity", String.class)))
// // AG multiSubnetFailover, applicationIntent
// .multiSubnetFailover(secrets.getSecretOptional("multiSubnetFailover").transform(Boolean::parseBoolean).or(params.getOptional("multiSubnetFailover", boolean.class)))
// .applicationIntent(secrets.getSecretOptional("applicationIntent").or(params.getOptional("applicationIntent", String.class)))
// // Mirroring failoverPartner
// .failoverPartner(secrets.getSecretOptional("failoverPartner").or(params.getOptional("failoverPartner", String.class)))
.build();
.host(secrets.getSecretOptional("host").or(() -> params.get("host", String.class)))
.port(secrets.getSecretOptional("port").transform(Integer::parseInt).or(() -> params.get("port", int.class, 1433)))
.user(secrets.getSecretOptional("user").or(() -> params.get("user", String.class)))
.password(secrets.getSecretOptional("password"))
.database(secrets.getSecretOptional("database").or(() -> params.get("database", String.class)))
.ssl(secrets.getSecretOptional("ssl").transform(Boolean::parseBoolean).or(() -> params.get("ssl", boolean.class, false)))
.connectTimeout(secrets.getSecretOptional("connect_timeout").transform(DurationParam::parse).or(() ->
params.get("connect_timeout", DurationParam.class, DurationParam.of(Duration.ofSeconds(30)))))
.socketTimeout(secrets.getSecretOptional("socket_timeout").transform(DurationParam::parse).or(() ->
params.get("socket_timeout", DurationParam.class, DurationParam.of(Duration.ofSeconds(1800)))))
.schema(secrets.getSecretOptional("schema").or(params.getOptional("schema", String.class)))
.instanceName(secrets.getSecretOptional("instanceName").or(params.getOptional("instanceName", String.class)))
.integratedSecurity(secrets.getSecretOptional("integratedSecurity").transform(Boolean::parseBoolean).or(() -> params.get("integratedSecurity", boolean.class, false)))
// AG multiSubnetFailover, applicationIntent
.multiSubnetFailover(secrets.getSecretOptional("multiSubnetFailover").transform(Boolean::parseBoolean).or(() -> params.get("multiSubnetFailover", boolean.class, false)))
.applicationIntent(secrets.getSecretOptional("applicationIntent").or(params.getOptional("applicationIntent", String.class)))
// Mirroring failoverPartner
.failoverPartner(secrets.getSecretOptional("failoverPartner").or(params.getOptional("failoverPartner", String.class)))
// options
//.options(secrets.getSecretOptional("options").or(params.getOptional("options", String.class, "{}")))
.build();
}

@Override
public String jdbcDriverName()
{
// doesn't support JtdsDriver
return "com.microsoft.sqlserver.jdbc.SQLServerDriver";
}

Expand All @@ -72,7 +82,9 @@ public Properties buildProperties()
// mssql jdbc properties - https://docs.microsoft.com/en-us/sql/connect/jdbc/setting-the-connection-properties?view=sql-server-2017
Properties props = new Properties();

props.setProperty("user", user());
if (!integratedSecurity()) {
props.setProperty("user", user());
}
if (password().isPresent()) {
props.setProperty("password", password().get());
}
Expand All @@ -89,26 +101,19 @@ public Properties buildProperties()
props.setProperty("trustServerCertificate", "true");
} else {
props.setProperty("encrypt", "false");
}
// Not defined in AbstractJdbcConnectionConfig.class
// if (user().isPresent()) {
// props.setProperty("user", user().get());
// }
// if (instanceName().isPresent()) {
// props.setProperty("instanceName", instanceName().get());
// }
// if (integratedSecurity().isPresent()) {
// props.setProperty("integratedSecurity", integratedSecurity().get());
// }
// if (multiSubnetFailover().isPresent() && multiSubnetFailover()) {
// props.setProperty("multiSubnetFailover", "true");
// }
// if (applicationIntent().isPresent()) {
// props.setProperty("applicationIntent", applicationIntent().get());
// }
// if (failoverPartner().isPresent()) {
// props.setProperty("failoverPartner", failoverPartner().get());
// }
}
if (multiSubnetFailover()) {
props.setProperty("multiSubnetFailover", "true");
}
if (applicationIntent().isPresent()) {
props.setProperty("applicationIntent", applicationIntent().get());
}
if (failoverPartner().isPresent()) {
props.setProperty("failoverPartner", failoverPartner().get());
}
// if (options().isPresent()) {
// props.setProperty("options", options().get());
// }
props.setProperty("applicationName", "digdag");

return props;
Expand All @@ -124,14 +129,35 @@ public String toString()
@Override
public String url()
{
// https://github.com/embulk/embulk-output-jdbc/blob/07b6dfea0c5296c124328d2d17bdc48240f7d159/embulk-output-sqlserver/src/main/java/org/embulk/output/SQLServerOutputPlugin.java
// jdbc:sqlserver://localhost:1433;databaseName=master;user=sa;password=your_password
return String.format(ENGLISH, "jdbc:%s://%s:%d;databaseName=%s", jdbcProtocolName(), host(), port(), database());
//return String.format(ENGLISH, "jdbc:%s://%s:%d;databaseName=%s", jdbcProtocolName(), host(), port(), database());
StringBuilder urlBuilder = new StringBuilder();
if (instanceName().isPresent()) {
urlBuilder.append(String.format("jdbc:%s://%s\\%s",
jdbcProtocolName(),host(), instanceName().get()));
} else {
urlBuilder.append(String.format("jdbc:%s://%s:%d",
jdbcProtocolName(),host(), port()));
}
// database is not optional in AbstractJdbcConnectionConfig
if (!database().equals("default")) {
urlBuilder.append(";databaseName=" + database());
}
if (integratedSecurity()) {
urlBuilder.append(";integratedSecurity=" + "true");
} else {
// user is not optional in AbstractJdbcConnectionConfig
// if (!user().isPresent()) {
// throw new IllegalArgumentException("Field 'user' is not set.");
// }
if (!password().isPresent()) {
throw new IllegalArgumentException("Field 'password' is not set.");
}
}
return String.format(ENGLISH, urlBuilder.toString());
}

//
// Class.forName("com.mysql.jdbc.Driver") does not work as I expected.
// That's why I override this method.
//
@Override
public Connection openConnection()
{
Expand Down

0 comments on commit 37a507a

Please sign in to comment.