Skip to content

Commit

Permalink
First release
Browse files Browse the repository at this point in the history
First release
  • Loading branch information
kulmam92 committed Nov 3, 2018
1 parent 37a507a commit cfe1f2d
Show file tree
Hide file tree
Showing 14 changed files with 542 additions and 35 deletions.
Binary file added .DS_Store
Binary file not shown.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# ChangeLog

## 0.1.1 (2018-10-19)
## 0.1.1 (2018-11-03)

* First release
* Created by converting https://github.com/hiroyuki-sato/digdag-plugin-mysql to mssql version
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# digdag-plugin-mssql

Digdag `mssql>` operator plugin to execute a query on MsSQL server.
Created by converting https://github.com/hiroyuki-sato/digdag-plugin-mysql to mssql version.

**Warning**
* Doesn't support windows authentication yet.
* You can't use GO in the sql file. If you need to use that, you may call SQLCMD using sh operator.

## configuration

Expand All @@ -19,7 +24,7 @@ _export:
mssql:
host: localhost
user: sa
database: digdag_test
database: tempdb
ssl: true

+step1:
Expand Down
8 changes: 6 additions & 2 deletions sample/plugin.dig
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ _export:
plugin:
repositories:
- file://${repos}
#- file:///vmware-host/Shared Folders/Documents/GitHub/digdag-plugin-mssql/build/repo/
#- https://dl.bintray.com/digdag/maven
#- https://jitpack.io
dependencies:
Expand All @@ -14,5 +15,8 @@ _export:
ssl: true

+step1:
mssql>: test.sql
download_file: test.txt
mssql>: test1.sql

+step2:
mssql>: test2.sql
download_file: test.txt
1 change: 0 additions & 1 deletion sample/test.sql

This file was deleted.

5 changes: 2 additions & 3 deletions sample/test.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
seq,col1
1,10
2,20
col1
10
5 changes: 5 additions & 0 deletions sample/test1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 't1')begin
create table t1(col1 int);
end

insert into t1 values(10);
1 change: 1 addition & 0 deletions sample/test2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select * from t1;
124 changes: 124 additions & 0 deletions src/main/java/com/github/kulmam92/digdag/plugin/mssql/CsvWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.github.kulmam92.digdag.plugin.mssql;

import java.io.IOException;
import java.io.Writer;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.Exchanger;

class CsvWriter
implements Closeable
{
private final Writer out;

CsvWriter(Writer out)
{
this.out = out;
}

void addCsvHeader(List<String> columnNames)
throws IOException
{
boolean first = true;
for (String columnName : columnNames) {
if (first) { first = false; }
else { out.write(DELIMITER_CHAR); }
addCsvText(columnName);
}
out.write("\r\n");
}

void addCsvRow(List<String> row)
throws IOException
{
for (int i = 0; i < row.size(); i++) {
if (i > 0) {
out.write(DELIMITER_CHAR);
}
String v = row.get(i);
addCsvText(v);
}
out.write("\r\n");
}

private void addCsvText(String value)
throws IOException
{
if (value != null) {
out.write(escapeAndQuoteCsvValue(value));
}
}

private static final char DELIMITER_CHAR = ',';
private static final char ESCAPE_CHAR = '"';
private static final char QUOTE_CHAR = '"';

private String escapeAndQuoteCsvValue(String v)
{
if (v.isEmpty()) {
StringBuilder sb = new StringBuilder();
sb.append(QUOTE_CHAR);
sb.append(QUOTE_CHAR);
return sb.toString();
}

StringBuilder escapedValue = new StringBuilder();
char previousChar = ' ';

boolean isRequireQuote = false;

for (int i = 0; i < v.length(); i++) {
char c = v.charAt(i);

if (c == QUOTE_CHAR) {
escapedValue.append(ESCAPE_CHAR);
escapedValue.append(c);
isRequireQuote = true;
}
else if (c == '\r') {
escapedValue.append('\n');
isRequireQuote = true;
}
else if (c == '\n') {
if (previousChar != '\r') {
escapedValue.append('\n');
isRequireQuote = true;
}
}
else if (c == DELIMITER_CHAR) {
escapedValue.append(c);
isRequireQuote = true;
}
else {
escapedValue.append(c);
}
previousChar = c;
}

if (isRequireQuote) {
StringBuilder sb = new StringBuilder();
sb.append(QUOTE_CHAR);
sb.append(escapedValue);
sb.append(QUOTE_CHAR);
return sb.toString();
}
else {
return escapedValue.toString();
}
}

@Override
public void close()
throws IOException
{
out.close();
}

@Override
public String toString()
{
return "CSVWriter{" +
"out=" + out +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,56 @@ protected MssqlConnection(Connection connection)
super(connection);
}

@Override
public String buildCreateTableStatement(String selectSql, TableReference targetTable)
{
String escapedRef = escapeTableReference(targetTable);
return String.format(ENGLISH,
"IF EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%s') \n" +
"BEGIN \n" +
"DROP TABLE %s; \n" +
"END \n" +
"SELECT * INTO %s \n" +
"FROM ( \n" +
"%s \n" +
") t ",
escapedRef, escapedRef, selectSql);
}

@Override
public String buildInsertStatement(String selectSql, TableReference targetTable)
{
String escapedRef = escapeTableReference(targetTable);
return String.format(ENGLISH,
"INSERT INTO %s\n%s",
escapedRef, selectSql);
}

// To do - make it compatable with MS
@Override
public SQLException validateStatement(String sql)
{
// Here uses nativeSQL() instead of Connection#prepareStatement because
// prepareStatement() validates a SQL by creating a server-side prepared statement
// and RDBMS wrongly decides that the SQL is broken in this case:
// * the SQL includes multiple statements
// * a statement creates a table and a later statement uses it in the SQL
try {
connection.nativeSQL(sql);
return null;
}
catch (SQLException ex) {
if (ex.getSQLState().startsWith("42")) { // to do
// SQL error class 42
return ex;
}
throw new DatabaseException("Failed to validate statement", ex);
}
}

// To do - implement sqlcmd support
// https://github.com/embulk/embulk-output-jdbc/blob/07b6dfea0c5296c124328d2d17bdc48240f7d159/embulk-output-sqlserver/src/test/java/org/embulk/output/sqlserver/SQLServerTests.java

@Override
public void executeReadOnlyQuery(String sql, Consumer<JdbcResultSet> resultHandler)
throws NotReadOnlyException
Expand Down Expand Up @@ -102,10 +152,10 @@ TableReference statusTableReference()
String buildCreateTable()
{
return String.format(ENGLISH,
"IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%s') " +
"BEGIN " +
"CREATE TABLE %s " +
"(query_id varchar(50) NOT NULL UNIQUE, created_at datetime2 NOT NULL, completed_at datetime2) " +
"IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%s') \n" +
"BEGIN \n" +
"CREATE TABLE %s \n" +
"(query_id varchar(50) NOT NULL UNIQUE, created_at datetime2 NOT NULL, completed_at datetime2) \n" +
"END",
statusTableReference.getName(),
statusTableReference.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import org.immutables.value.Value;

import io.digdag.standards.operator.jdbc.AbstractJdbcConnectionConfig;
// logging
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Value.Immutable
public abstract class MssqlConnectionConfig
Expand All @@ -32,10 +35,13 @@ public abstract class MssqlConnectionConfig
public abstract boolean multiSubnetFailover();
public abstract Optional<String> applicationIntent();
public abstract Optional<String> failoverPartner();
// To do options implementations
// To do options implementations - only accept string not map
// https://github.com/embulk/embulk-output-jdbc/blob/07b6dfea0c5296c124328d2d17bdc48240f7d159/embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/ToStringMap.java
// public abstract Optional<ToStringMap> options();
public abstract Optional<String> options();

// logging
private static Logger logger = LoggerFactory.getLogger(MssqlConnectionConfig.class);

@VisibleForTesting
public static MssqlConnectionConfig configure(SecretProvider secrets, Config params)
{
Expand All @@ -57,9 +63,9 @@ public static MssqlConnectionConfig configure(SecretProvider secrets, Config par
.multiSubnetFailover(secrets.getSecretOptional("multiSubnetFailover").transform(Boolean::parseBoolean).or(() -> params.get("multiSubnetFailover", boolean.class, false)))
.applicationIntent(secrets.getSecretOptional("applicationIntent").or(params.getOptional("applicationIntent", String.class)))
// Mirroring failoverPartner
.failoverPartner(secrets.getSecretOptional("failoverPartner").or(params.getOptional("failoverPartner", String.class)))
.failoverPartner(secrets.getSecretOptional("failoverPartner").or(params.getOptional("failoverPartner", String.class)))
// options
//.options(secrets.getSecretOptional("options").or(params.getOptional("options", String.class, "{}")))
.options(secrets.getSecretOptional("options").or(params.getOptional("options", String.class)))
.build();
}

Expand Down Expand Up @@ -103,17 +109,22 @@ public Properties buildProperties()
props.setProperty("encrypt", "false");
}
if (multiSubnetFailover()) {
props.setProperty("multiSubnetFailover", "true");
props.setProperty("multiSubnetFailover", "true");
}
if (applicationIntent().isPresent()) {
props.setProperty("applicationIntent", applicationIntent().get());
props.setProperty("applicationIntent", applicationIntent().get());
}
if (failoverPartner().isPresent()) {
props.setProperty("failoverPartner", failoverPartner().get());
props.setProperty("failoverPartner", failoverPartner().get());
}
// if (options().isPresent()) {
// props.setProperty("options", options().get());
// }
if (options().isPresent()) {
String options = options().get();
String[] arrOptions = options.split(";");
for (int i=0; i < arrOptions.length; i++) {
String[] arrOption = arrOptions[i].split("=");
props.setProperty(arrOption[0], arrOption[1]);
}
}
props.setProperty("applicationName", "digdag");

return props;
Expand All @@ -131,19 +142,16 @@ public String url()
{
// https://github.com/embulk/embulk-output-jdbc/blob/07b6dfea0c5296c124328d2d17bdc48240f7d159/embulk-output-sqlserver/src/main/java/org/embulk/output/SQLServerOutputPlugin.java
// jdbc:sqlserver://localhost:1433;databaseName=master;user=sa;password=your_password
//return String.format(ENGLISH, "jdbc:%s://%s:%d;databaseName=%s", jdbcProtocolName(), host(), port(), database());
StringBuilder urlBuilder = new StringBuilder();
if (instanceName().isPresent()) {
urlBuilder.append(String.format("jdbc:%s://%s\\%s",
jdbcProtocolName(),host(), instanceName().get()));
urlBuilder.append(String.format("jdbc:%s://%s\\%s:%d",
jdbcProtocolName(),host(), instanceName().get(), port()));
} else {
urlBuilder.append(String.format("jdbc:%s://%s:%d",
jdbcProtocolName(),host(), port()));
}
// database is not optional in AbstractJdbcConnectionConfig
if (!database().equals("default")) {
urlBuilder.append(";databaseName=" + database());
}
urlBuilder.append(";databaseName=" + database());
if (integratedSecurity()) {
urlBuilder.append(";integratedSecurity=" + "true");
} else {
Expand All @@ -155,7 +163,9 @@ public String url()
throw new IllegalArgumentException("Field 'password' is not set.");
}
}
return String.format(ENGLISH, urlBuilder.toString());
String jdbcUrl = urlBuilder.toString();
//logger.info("url: {}", jdbcUrl);
return String.format(ENGLISH, jdbcUrl);
}

@Override
Expand Down
Loading

0 comments on commit cfe1f2d

Please sign in to comment.