From cfe1f2d5299d373a62d7be6f2ea8e163d8d56f24 Mon Sep 17 00:00:00 2001 From: kulmam92 Date: Sat, 3 Nov 2018 12:35:03 -0700 Subject: [PATCH] First release First release --- .DS_Store | Bin 0 -> 8196 bytes CHANGELOG.md | 3 +- README.md | 7 +- sample/plugin.dig | 8 +- sample/test.sql | 1 - sample/test.txt | 5 +- sample/test1.sql | 5 + sample/test2.sql | 1 + .../digdag/plugin/mssql/CsvWriter.java | 124 +++++++ .../digdag/plugin/mssql/MssqlConnection.java | 58 +++- .../plugin/mssql/MssqlConnectionConfig.java | 44 ++- .../plugin/mssql/MssqlOperatorFactory.java | 317 +++++++++++++++++- .../digdag/plugin/mssql/MssqlResultSet.java | 2 +- .../mssql/MssqlOperatorFactoryTest.java | 2 + 14 files changed, 542 insertions(+), 35 deletions(-) create mode 100644 .DS_Store delete mode 100644 sample/test.sql create mode 100644 sample/test1.sql create mode 100644 sample/test2.sql create mode 100644 src/main/java/com/github/kulmam92/digdag/plugin/mssql/CsvWriter.java diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..0d470d738c4d8fdbb08c49431003ff3730cf9937 GIT binary patch literal 8196 zcmeHM-EI;=6h2cZEVM=vP1?)7G4Y}jP-~kQ6G}^z#J`Bq3pMR-m(Wd?-3p6D5yOq| zpuT~R;uH8FK7fAnBkm4xXAHqPWagVabI$yHb9R>lM8xU?uS&E{L>3Ci(gvzI!uxrd zku{~c1Z421=+q4&7gwbjTca=x7zPXjh5^HXVPH8Jz&o3Zwcx!kdrfT^Fbw>c4DjcJ zjl!{}a3WDWI#7ue0GNeZ2-@PyKhWk1U`^pfA~X;tU4hb-sT6~$+<{4V^w*UAM51&j zCT7MsYG$S~6sA%SW(#v-HHoG+3>XGx8Q{5lhFtO~YzC?P{Wf~8{uCnIl$AT|pQ|kY zn|mJk{kT&3&ek&7^<3VvidOML@m1?A8n$959`x%@?*+el?8IlTYCnylVA^)vXMTHl zT)KQ1g|Xv@?VfPpe+8&MeclUSfo7b!39ecF5 zS3t?{^EZC$C}X`Xc7z8k$CfA}o-&D67lF3Rg^{RJ(xy*I617(`(Q8g9J~Y6c)D zAWx&`D|(akBrI2zqQn+8nl}H?L}ltwj~wzqT02b|+tNteq(iz#2lSBcQiJZ(BVd`j zI@#=_=C_5>eaz!wo(^53bIdhB3sH=#F7@$w4P66Hf)O)%2{FD+_l)|`u^b%rABT_2P0agNohhNdDx z3(EpM0f`56L=S*-0*fCnvl7RfL*O_F86Hqt@bbe=*=Atdqf_m85aKB-oP!a=z%nwB zk&D{q`F~^a@BhoVbh9SIfMMV-F~G7%t)n^y`~Ce`AD8FaHp&SK7j8F^C|pp9bQ~zs map3e1L$qy}a-W*Qi9~21|NMslJ;_PEt` operator plugin to execute a query on MsSQL server. +Created by converting https://github.com/hiroyuki-sato/digdag-plugin-mysql to mssql version. + +**Warning** +* Doesn't support windows authentication yet. +* You can't use GO in the sql file. If you need to use that, you may call SQLCMD using sh operator. ## configuration @@ -19,7 +24,7 @@ _export: mssql: host: localhost user: sa - database: digdag_test + database: tempdb ssl: true +step1: diff --git a/sample/plugin.dig b/sample/plugin.dig index bda5427..5f69842 100644 --- a/sample/plugin.dig +++ b/sample/plugin.dig @@ -2,6 +2,7 @@ _export: plugin: repositories: - file://${repos} + #- file:///vmware-host/Shared Folders/Documents/GitHub/digdag-plugin-mssql/build/repo/ #- https://dl.bintray.com/digdag/maven #- https://jitpack.io dependencies: @@ -14,5 +15,8 @@ _export: ssl: true +step1: - mssql>: test.sql - download_file: test.txt + mssql>: test1.sql + ++step2: + mssql>: test2.sql + download_file: test.txt \ No newline at end of file diff --git a/sample/test.sql b/sample/test.sql deleted file mode 100644 index c7e96be..0000000 --- a/sample/test.sql +++ /dev/null @@ -1 +0,0 @@ -select * from test diff --git a/sample/test.txt b/sample/test.txt index af0807e..a614475 100644 --- a/sample/test.txt +++ b/sample/test.txt @@ -1,3 +1,2 @@ -seq,col1 -1,10 -2,20 +col1 +10 diff --git a/sample/test1.sql b/sample/test1.sql new file mode 100644 index 0000000..069f09f --- /dev/null +++ b/sample/test1.sql @@ -0,0 +1,5 @@ +IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1')begin + create table t1(col1 int); +end + +insert into t1 values(10); \ No newline at end of file diff --git a/sample/test2.sql b/sample/test2.sql new file mode 100644 index 0000000..56e87d2 --- /dev/null +++ b/sample/test2.sql @@ -0,0 +1 @@ +select * from t1; \ No newline at end of file diff --git a/src/main/java/com/github/kulmam92/digdag/plugin/mssql/CsvWriter.java b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/CsvWriter.java new file mode 100644 index 0000000..a09360f --- /dev/null +++ b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/CsvWriter.java @@ -0,0 +1,124 @@ +package com.github.kulmam92.digdag.plugin.mssql; + +import java.io.IOException; +import java.io.Writer; +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.Exchanger; + +class CsvWriter + implements Closeable +{ + private final Writer out; + + CsvWriter(Writer out) + { + this.out = out; + } + + void addCsvHeader(List columnNames) + throws IOException + { + boolean first = true; + for (String columnName : columnNames) { + if (first) { first = false; } + else { out.write(DELIMITER_CHAR); } + addCsvText(columnName); + } + out.write("\r\n"); + } + + void addCsvRow(List row) + throws IOException + { + for (int i = 0; i < row.size(); i++) { + if (i > 0) { + out.write(DELIMITER_CHAR); + } + String v = row.get(i); + addCsvText(v); + } + out.write("\r\n"); + } + + private void addCsvText(String value) + throws IOException + { + if (value != null) { + out.write(escapeAndQuoteCsvValue(value)); + } + } + + private static final char DELIMITER_CHAR = ','; + private static final char ESCAPE_CHAR = '"'; + private static final char QUOTE_CHAR = '"'; + + private String escapeAndQuoteCsvValue(String v) + { + if (v.isEmpty()) { + StringBuilder sb = new StringBuilder(); + sb.append(QUOTE_CHAR); + sb.append(QUOTE_CHAR); + return sb.toString(); + } + + StringBuilder escapedValue = new StringBuilder(); + char previousChar = ' '; + + boolean isRequireQuote = false; + + for (int i = 0; i < v.length(); i++) { + char c = v.charAt(i); + + if (c == QUOTE_CHAR) { + escapedValue.append(ESCAPE_CHAR); + escapedValue.append(c); + isRequireQuote = true; + } + else if (c == '\r') { + escapedValue.append('\n'); + isRequireQuote = true; + } + else if (c == '\n') { + if (previousChar != '\r') { + escapedValue.append('\n'); + isRequireQuote = true; + } + } + else if (c == DELIMITER_CHAR) { + escapedValue.append(c); + isRequireQuote = true; + } + else { + escapedValue.append(c); + } + previousChar = c; + } + + if (isRequireQuote) { + StringBuilder sb = new StringBuilder(); + sb.append(QUOTE_CHAR); + sb.append(escapedValue); + sb.append(QUOTE_CHAR); + return sb.toString(); + } + else { + return escapedValue.toString(); + } + } + + @Override + public void close() + throws IOException + { + out.close(); + } + + @Override + public String toString() + { + return "CSVWriter{" + + "out=" + out + + '}'; + } +} \ No newline at end of file diff --git a/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlConnection.java b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlConnection.java index 257ad3d..159db13 100644 --- a/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlConnection.java +++ b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlConnection.java @@ -35,6 +35,56 @@ protected MssqlConnection(Connection connection) super(connection); } + @Override + public String buildCreateTableStatement(String selectSql, TableReference targetTable) + { + String escapedRef = escapeTableReference(targetTable); + return String.format(ENGLISH, + "IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%s') \n" + + "BEGIN \n" + + "DROP TABLE %s; \n" + + "END \n" + + "SELECT * INTO %s \n" + + "FROM ( \n" + + "%s \n" + + ") t ", + escapedRef, escapedRef, selectSql); + } + + @Override + public String buildInsertStatement(String selectSql, TableReference targetTable) + { + String escapedRef = escapeTableReference(targetTable); + return String.format(ENGLISH, + "INSERT INTO %s\n%s", + escapedRef, selectSql); + } + + // To do - make it compatable with MS + @Override + public SQLException validateStatement(String sql) + { + // Here uses nativeSQL() instead of Connection#prepareStatement because + // prepareStatement() validates a SQL by creating a server-side prepared statement + // and RDBMS wrongly decides that the SQL is broken in this case: + // * the SQL includes multiple statements + // * a statement creates a table and a later statement uses it in the SQL + try { + connection.nativeSQL(sql); + return null; + } + catch (SQLException ex) { + if (ex.getSQLState().startsWith("42")) { // to do + // SQL error class 42 + return ex; + } + throw new DatabaseException("Failed to validate statement", ex); + } + } + + // To do - implement sqlcmd support + // https://github.com/embulk/embulk-output-jdbc/blob/07b6dfea0c5296c124328d2d17bdc48240f7d159/embulk-output-sqlserver/src/test/java/org/embulk/output/sqlserver/SQLServerTests.java + @Override public void executeReadOnlyQuery(String sql, Consumer resultHandler) throws NotReadOnlyException @@ -102,10 +152,10 @@ TableReference statusTableReference() String buildCreateTable() { return String.format(ENGLISH, - "IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%s') " + - "BEGIN " + - "CREATE TABLE %s " + - "(query_id varchar(50) NOT NULL UNIQUE, created_at datetime2 NOT NULL, completed_at datetime2) " + + "IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%s') \n" + + "BEGIN \n" + + "CREATE TABLE %s \n" + + "(query_id varchar(50) NOT NULL UNIQUE, created_at datetime2 NOT NULL, completed_at datetime2) \n" + "END", statusTableReference.getName(), statusTableReference.getName()); diff --git a/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlConnectionConfig.java b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlConnectionConfig.java index 517810e..c37b84b 100644 --- a/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlConnectionConfig.java +++ b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlConnectionConfig.java @@ -19,6 +19,9 @@ import org.immutables.value.Value; import io.digdag.standards.operator.jdbc.AbstractJdbcConnectionConfig; +// logging +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Value.Immutable public abstract class MssqlConnectionConfig @@ -32,10 +35,13 @@ public abstract class MssqlConnectionConfig public abstract boolean multiSubnetFailover(); public abstract Optional applicationIntent(); public abstract Optional failoverPartner(); - // To do options implementations + // To do options implementations - only accept string not map // https://github.com/embulk/embulk-output-jdbc/blob/07b6dfea0c5296c124328d2d17bdc48240f7d159/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/ToStringMap.java - // public abstract Optional options(); + public abstract Optional options(); + // logging + private static Logger logger = LoggerFactory.getLogger(MssqlConnectionConfig.class); + @VisibleForTesting public static MssqlConnectionConfig configure(SecretProvider secrets, Config params) { @@ -57,9 +63,9 @@ public static MssqlConnectionConfig configure(SecretProvider secrets, Config par .multiSubnetFailover(secrets.getSecretOptional("multiSubnetFailover").transform(Boolean::parseBoolean).or(() -> params.get("multiSubnetFailover", boolean.class, false))) .applicationIntent(secrets.getSecretOptional("applicationIntent").or(params.getOptional("applicationIntent", String.class))) // Mirroring failoverPartner - .failoverPartner(secrets.getSecretOptional("failoverPartner").or(params.getOptional("failoverPartner", String.class))) + .failoverPartner(secrets.getSecretOptional("failoverPartner").or(params.getOptional("failoverPartner", String.class))) // options - //.options(secrets.getSecretOptional("options").or(params.getOptional("options", String.class, "{}"))) + .options(secrets.getSecretOptional("options").or(params.getOptional("options", String.class))) .build(); } @@ -103,17 +109,22 @@ public Properties buildProperties() props.setProperty("encrypt", "false"); } if (multiSubnetFailover()) { - props.setProperty("multiSubnetFailover", "true"); + props.setProperty("multiSubnetFailover", "true"); } if (applicationIntent().isPresent()) { - props.setProperty("applicationIntent", applicationIntent().get()); + props.setProperty("applicationIntent", applicationIntent().get()); } if (failoverPartner().isPresent()) { - props.setProperty("failoverPartner", failoverPartner().get()); + props.setProperty("failoverPartner", failoverPartner().get()); } - // if (options().isPresent()) { - // props.setProperty("options", options().get()); - // } + if (options().isPresent()) { + String options = options().get(); + String[] arrOptions = options.split(";"); + for (int i=0; i < arrOptions.length; i++) { + String[] arrOption = arrOptions[i].split("="); + props.setProperty(arrOption[0], arrOption[1]); + } + } props.setProperty("applicationName", "digdag"); return props; @@ -131,19 +142,16 @@ public String url() { // https://github.com/embulk/embulk-output-jdbc/blob/07b6dfea0c5296c124328d2d17bdc48240f7d159/embulk-output-sqlserver/src/main/java/org/embulk/output/SQLServerOutputPlugin.java // jdbc:sqlserver://localhost:1433;databaseName=master;user=sa;password=your_password - //return String.format(ENGLISH, "jdbc:%s://%s:%d;databaseName=%s", jdbcProtocolName(), host(), port(), database()); StringBuilder urlBuilder = new StringBuilder(); if (instanceName().isPresent()) { - urlBuilder.append(String.format("jdbc:%s://%s\\%s", - jdbcProtocolName(),host(), instanceName().get())); + urlBuilder.append(String.format("jdbc:%s://%s\\%s:%d", + jdbcProtocolName(),host(), instanceName().get(), port())); } else { urlBuilder.append(String.format("jdbc:%s://%s:%d", jdbcProtocolName(),host(), port())); } // database is not optional in AbstractJdbcConnectionConfig - if (!database().equals("default")) { - urlBuilder.append(";databaseName=" + database()); - } + urlBuilder.append(";databaseName=" + database()); if (integratedSecurity()) { urlBuilder.append(";integratedSecurity=" + "true"); } else { @@ -155,7 +163,9 @@ public String url() throw new IllegalArgumentException("Field 'password' is not set."); } } - return String.format(ENGLISH, urlBuilder.toString()); + String jdbcUrl = urlBuilder.toString(); + //logger.info("url: {}", jdbcUrl); + return String.format(ENGLISH, jdbcUrl); } @Override diff --git a/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlOperatorFactory.java b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlOperatorFactory.java index bebd9b6..8a881b2 100644 --- a/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlOperatorFactory.java +++ b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlOperatorFactory.java @@ -1,28 +1,57 @@ package com.github.kulmam92.digdag.plugin.mssql; +import com.google.inject.Inject; +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + import io.digdag.client.config.Config; -import io.digdag.spi.Operator; +import io.digdag.client.config.ConfigException; +import io.digdag.client.config.ConfigElement; +//import io.digdag.spi.Operator; import io.digdag.spi.OperatorContext; import io.digdag.spi.OperatorFactory; import io.digdag.spi.SecretProvider; import io.digdag.spi.TemplateEngine; +import io.digdag.spi.ImmutableTaskResult; +import io.digdag.spi.TaskExecutionException; +import io.digdag.spi.TaskResult; + import io.digdag.standards.operator.jdbc.AbstractJdbcJobOperator; +import io.digdag.standards.operator.jdbc.StoreLastResultsOption; +import io.digdag.standards.operator.jdbc.TableReference; +import io.digdag.standards.operator.jdbc.TransactionHelper; +import io.digdag.standards.operator.jdbc.NotReadOnlyException; +import io.digdag.standards.operator.jdbc.LockConflictException; +import io.digdag.standards.operator.jdbc.DatabaseException; +import io.digdag.standards.operator.jdbc.NoTransactionHelper; +import io.digdag.standards.operator.jdbc.JdbcResultSet; -import com.google.inject.Inject; +//CsvWriter is not public +//import io.digdag.standards.operator.jdbc.CsvWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.nio.charset.StandardCharsets.UTF_8; + public class MssqlOperatorFactory implements OperatorFactory { private static final String OPERATOR_TYPE = "mssql"; - - private final TemplateEngine templateEngine; private final Config systemConfig; + private final TemplateEngine templateEngine; private static Logger logger = LoggerFactory.getLogger(MssqlOperatorFactory.class); + @Inject public MssqlOperatorFactory(Config systemConfig,TemplateEngine templateEngine) { this.templateEngine = templateEngine; @@ -36,7 +65,7 @@ public String getType() } @Override - public Operator newOperator(OperatorContext context) + public MssqlOperator newOperator(OperatorContext context) { return new MssqlOperator(systemConfig,context,templateEngine); } @@ -44,9 +73,29 @@ public Operator newOperator(OperatorContext context) static class MssqlOperator extends AbstractJdbcJobOperator { + private static final String POLL_INTERVAL = "pollInterval"; + private static final int INITIAL_POLL_INTERVAL = 1; + private static final int MAX_POLL_INTERVAL = 1200; + private static final String QUERY_ID = "queryId"; + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private final long maxStoreLastResultsRows; + private final int maxStoreLastResultsColumns; + private final int maxStoreLastResultsValueSize; + + private Optional getConfigValue(Config systemConfig, String key, Class clazz) + { + return systemConfig.getOptional(String.format("config.%s.%s", type(), key), clazz) + .or(systemConfig.getOptional(String.format("config.jdbc.%s", key), clazz)); + } + MssqlOperator(Config systemConfig,OperatorContext context, TemplateEngine templateEngine) { super(systemConfig,context, templateEngine); + this.maxStoreLastResultsRows = getConfigValue(systemConfig, "max_store_last_results_rows", long.class).or(8192L); + this.maxStoreLastResultsColumns = getConfigValue(systemConfig, "max_store_last_results_columns", int.class).or(64); + this.maxStoreLastResultsValueSize = getConfigValue(systemConfig, "max_store_last_results_value_size", int.class).or(256); } @Override @@ -72,5 +121,263 @@ protected SecretProvider getSecretsForConnectionConfig() { return context.getSecrets().getSecrets(type()); } + + // Incldued this due to transaction keyword needs to be changed in jdbc.AbstractJdbcConnection + @Override + protected TaskResult run(Config params, Config state, MssqlConnectionConfig connectionConfig) + { + String query = workspace.templateCommand(templateEngine, params, "query", UTF_8); + + Optional insertInto = params.getOptional("insert_into", TableReference.class); + Optional createTable = params.getOptional("create_table", TableReference.class); + + int queryModifier = 0; + if (insertInto.isPresent()) queryModifier++; + if (createTable.isPresent()) queryModifier++; + if (queryModifier > 1) { + throw new ConfigException("Can't use both of insert_into and create_table"); + } + + Optional downloadFile = params.getOptional("download_file", String.class); + if (downloadFile.isPresent() && queryModifier > 0) { + throw new ConfigException("Can't use download_file with insert_into or create_table"); + } + + StoreLastResultsOption storeResultsOption = params.get("store_last_results", StoreLastResultsOption.class, StoreLastResultsOption.FALSE); + if (storeResultsOption.isEnabled() && queryModifier > 0) { + throw new ConfigException("Can't use store_last_results with insert_into or create_table"); + } + if (downloadFile.isPresent() && storeResultsOption.isEnabled()) { + throw new ConfigException("Can't use both download_file and store_last_results at once"); + } + + boolean readOnlyMode = downloadFile.isPresent() || storeResultsOption.isEnabled(); + + boolean strictTransaction = strictTransaction(params); + + UUID queryId; + if (readOnlyMode) { + // queryId is not used + queryId = null; + } + else { + // generate query id + if (!state.has(QUERY_ID)) { + // this is the first execution of this task + logger.debug("Generating query id for a new {} task", type()); + queryId = UUID.randomUUID(); + state.set(QUERY_ID, queryId); + throw TaskExecutionException.ofNextPolling(0, ConfigElement.copyOf(state)); + } + queryId = state.get(QUERY_ID, UUID.class); + } + + try (MssqlConnection connection = connect(connectionConfig)) { + Exception statementError = connection.validateStatement(query); + if (statementError != null) { + throw new ConfigException("Given query is invalid", statementError); + } + + if (readOnlyMode) { + ImmutableTaskResult.Builder builder = TaskResult.defaultBuilder(request); + if (downloadFile.isPresent()) { + connection.executeReadOnlyQuery(query, (results) -> downloadResultsToFile(results, downloadFile.get())); + } + else if (storeResultsOption.isEnabled()) { + connection.executeReadOnlyQuery(query, (results) -> storeResultsInTaskResult(results, storeResultsOption, builder)); + } + else { + connection.executeReadOnlyQuery(query, (results) -> skipResults(results)); + } + return builder.build(); + } + else { + String statement; + boolean statementMayReturnResults; + if (insertInto.isPresent() || createTable.isPresent()) { + if (insertInto.isPresent()) { + statement = connection.buildInsertStatement(query, insertInto.get()); + } + else { + statement = connection.buildCreateTableStatement(query, createTable.get()); + } + + Exception modifiedStatementError = connection.validateStatement(statement); + if (modifiedStatementError != null) { + throw new ConfigException("Given query is valid but failed to build INSERT INTO statement (this may happen if given query includes multiple statements or semicolon \";\"?)", modifiedStatementError); + } + statementMayReturnResults = false; + logger.debug("Running a modified statement: {}", statement); + } + else { + statement = query; + statementMayReturnResults = true; + } + + TransactionHelper txHelper; + if (strictTransaction) { + txHelper = connection.getStrictTransactionHelper( + statusTableSchema, statusTableName, statusTableCleanupDuration.getDuration()); + } + else { + txHelper = new NoTransactionHelper(); + } + + txHelper.prepare(queryId); + + boolean executed = txHelper.lockedTransaction(queryId, () -> { + if (statementMayReturnResults) { + connection.executeScript(statement); + } + else { + connection.executeUpdate(statement); + } + }); + + if (!executed) { + logger.debug("Query is already completed according to status table. Skipping statement execution."); + } + + try { + txHelper.cleanup(); + } + catch (Exception ex) { + logger.warn("Error during cleaning up status table. Ignoring.", ex); + } + + return TaskResult.defaultBuilder(request).build(); + } + } + catch (NotReadOnlyException ex) { + throw new ConfigException("Query must be read-only if download_file is set", ex.getCause()); + } + catch (LockConflictException ex) { + int pollingInterval = state.get(POLL_INTERVAL, Integer.class, INITIAL_POLL_INTERVAL); + // Set next interval for exponential backoff + state.set(POLL_INTERVAL, Math.min(pollingInterval * 2, MAX_POLL_INTERVAL)); + throw TaskExecutionException.ofNextPolling(pollingInterval, ConfigElement.copyOf(state)); + } + catch (DatabaseException ex) { + // expected error that should suppress stacktrace by default + String message = String.format("%s [%s]", ex.getMessage(), ex.getCause().getMessage()); + throw new TaskExecutionException(message, ex); + } + } + + private void downloadResultsToFile(JdbcResultSet results, String fileName) + { + try (CsvWriter csvWriter = new CsvWriter(workspace.newBufferedWriter(fileName, UTF_8))) { + List columnNames = results.getColumnNames(); + csvWriter.addCsvHeader(columnNames); + while (true) { + List values = results.next(); + if (values == null) { + break; + } + List row = values.stream().map(value -> { + if (value == null) { + return (String) value; + } + else if (value instanceof String) { + return (String) value; + } + else { + return value.toString(); // TODO use jackson to serialize? + } + }) + .collect(Collectors.toList()); + csvWriter.addCsvRow(row); + } + } + catch (IOException ex) { + throw Throwables.propagate(ex); + } + } + + private void skipResults(JdbcResultSet results) + { + while (results.next() != null) + ; + } + + private void storeResultsInTaskResult(JdbcResultSet jdbcResultSet, StoreLastResultsOption option, ImmutableTaskResult.Builder builder) + { + int columnsCount = jdbcResultSet.getColumnNames().size(); + if (columnsCount > maxStoreLastResultsColumns) { + throw new TaskExecutionException("The number of result columns exceeded the limit: " + columnsCount + " > " + maxStoreLastResultsColumns); + } + + Object lastResults; + switch (option) { + case ALL: + lastResults = collectAllResults(jdbcResultSet); + break; + case FIRST: + lastResults = collectFirstResults(jdbcResultSet); + break; + default: + throw new AssertionError("Unexpected StoreLastResultsOption: " + option); + } + + Config storeParams = request.getConfig().getFactory().create(); + storeParams.getNestedOrSetEmpty(type()) + .set("last_results", lastResults); + builder.storeParams(storeParams); + } + + private List> collectAllResults(JdbcResultSet jdbcResultSet) + { + List columnNames = jdbcResultSet.getColumnNames(); + ImmutableList.Builder> lastResults = ImmutableList.builder(); + + long rows = 0; + while (true) { + List values = jdbcResultSet.next(); + if (values == null) { + break; + } + + rows += 1; + if (rows > maxStoreLastResultsRows) { + throw new TaskExecutionException("The number of result rows exceeded the limit: " + rows + " > " + maxStoreLastResultsRows); + } + + lastResults.add(buildResultsMap(columnNames, values)); + } + + return lastResults.build(); + } + + private Map collectFirstResults(JdbcResultSet jdbcResultSet) + { + List values = jdbcResultSet.next(); + if (values == null) { + return new HashMap<>(); + } + + Map lastResults = buildResultsMap(jdbcResultSet.getColumnNames(), values); + + // consume all results + while (jdbcResultSet.skip()) + ; + + return lastResults; + } + + private Map buildResultsMap(List columnNames, List values) + { + HashMap map = new HashMap<>(); + for (int i = 0; i < columnNames.size(); i++) { + Object v = values.get(i); + if (v instanceof String) { + String s = (String) v; + if (s.length() > maxStoreLastResultsValueSize) { + throw new TaskExecutionException("The size of result value exceeded the limit: " + s.length() + " > " + maxStoreLastResultsValueSize); + } + } + map.put(columnNames.get(i), v); + } + return map; + } } } diff --git a/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlResultSet.java b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlResultSet.java index 2bc2dcf..48f54c3 100644 --- a/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlResultSet.java +++ b/src/main/java/com/github/kulmam92/digdag/plugin/mssql/MssqlResultSet.java @@ -15,7 +15,7 @@ public class MssqlResultSet @Override protected Object serializableObject(Object raw) { - // TODO add more conversion logics here. MySQL jdbc may return objects that are not serializable using Jackson. + // TODO add more conversion logics here. MSSQL jdbc may return objects that are not serializable using Jackson. return raw; } diff --git a/src/test/java/com/github/kulmam92/digdag/plugin/mssql/MssqlOperatorFactoryTest.java b/src/test/java/com/github/kulmam92/digdag/plugin/mssql/MssqlOperatorFactoryTest.java index 0f5240f..a1a3af6 100644 --- a/src/test/java/com/github/kulmam92/digdag/plugin/mssql/MssqlOperatorFactoryTest.java +++ b/src/test/java/com/github/kulmam92/digdag/plugin/mssql/MssqlOperatorFactoryTest.java @@ -24,6 +24,8 @@ public class MssqlOperatorFactoryTest public void setUp() { operatorFactory = testHelper.injector().getInstance(MssqlOperatorFactory.class); + System.out.println("GetKEY"); + System.out.println(operatorFactory.getType()); } @Test