Skip to content

Commit

Permalink
Merge pull request #164 from kmwtechnology/LC-536-add-JDBC-utils-for-…
Browse files Browse the repository at this point in the history
…type-handling

LC-536: added JDBCUtils for JDBC type handling
  • Loading branch information
Jozurf authored Aug 30, 2024
2 parents fe026a7 + 448502f commit 3377fed
Show file tree
Hide file tree
Showing 15 changed files with 589 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.kmwllc.lucille.core.ConnectorException;
import com.kmwllc.lucille.core.Document;
import com.kmwllc.lucille.core.Publisher;
import com.kmwllc.lucille.util.JDBCUtils;
import com.typesafe.config.Config;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -185,13 +186,13 @@ public void execute(Publisher publisher) throws ConnectorException {
String.format("Field name \"%s\" is reserved, please rename it or add it to the ignore list", fieldName));
}

Object fieldValue = rs.getObject(i);
if (fieldValue != null) {
try {
doc.setOrAdd(fieldName, fieldValue);
} catch (IllegalArgumentException e) {
log.warn("Error encountered while adding database object to Lucille document", e);
}
try {
// parse result into document
// can throw SQL Exception if column cannot be found or resultSet is closed
// will not add to document if fieldValue is null or if field value is unsupported type
JDBCUtils.parseResultToDoc(doc, rs, fieldName, i);
} catch(SQLException e) {
log.warn("Error encountered while processing resultSet", e);
}
}
if (!otherResults.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import com.kmwllc.lucille.core.Document;
import com.kmwllc.lucille.core.Stage;
import com.kmwllc.lucille.core.StageException;
import com.kmwllc.lucille.util.JDBCUtils;
import com.typesafe.config.Config;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.sql.*;
import java.time.Instant;
import java.util.ArrayList;
Expand All @@ -27,7 +27,6 @@ public class QueryDatabase extends Stage {
private String sql;
private List<String> keyFields;
private List<PreparedStatementParameterType> inputTypes;
private List<PreparedStatementParameterType> returnTypes;
private Map<String, Object> fieldMapping;
protected Connection connection = null;
private PreparedStatement preparedStatement;
Expand All @@ -38,7 +37,7 @@ public QueryDatabase(Config config) {
.withOptionalProperties("sql")
.withRequiredParents("fieldMapping")
.withRequiredProperties("driver", "connectionString", "jdbcUser", "jdbcPassword",
"keyFields", "inputTypes", "returnTypes"));
"keyFields", "inputTypes"));

driver = config.getString("driver");
connectionString = config.getString("connectionString");
Expand All @@ -52,11 +51,6 @@ public QueryDatabase(Config config) {
for (String type : inputTypeList) {
inputTypes.add(PreparedStatementParameterType.getType(type));
}
List<String> returnTypeList = config.getStringList("returnTypes");
returnTypes = new ArrayList<>();
for (String type : returnTypeList) {
returnTypes.add(PreparedStatementParameterType.getType(type));
}
}

@Override
Expand All @@ -71,10 +65,6 @@ public void start() throws StageException {
if (inputTypes.size() != keyFields.size()) {
throw new StageException("mismatch between types provided and keyfields provided");
}

if (returnTypes.size() != fieldMapping.size()) {
throw new StageException("mismatch between return types provided and field mapping provided");
}
}

@Override
Expand Down Expand Up @@ -117,40 +107,13 @@ public Iterator<Document> processDocument(Document doc) throws StageException {
throw new StageException("Type " + t + " not recognized");
}
}

ResultSet result = preparedStatement.executeQuery();
// now we need to iterate the results
while (result.next()) {

// Need the ID column from the RS.
int index = 0;
for (String key : fieldMapping.keySet()) {
String field = (String) fieldMapping.get(key);
PreparedStatementParameterType t = returnTypes.get(index);
switch (t) {
case STRING:
doc.addToField(field, result.getString(key));
break;
case INTEGER:
doc.addToField(field, result.getInt(key));
break;
case LONG:
doc.addToField(field, result.getLong(key));
break;
case DOUBLE:
doc.addToField(field, result.getDouble(key));
break;
case BOOLEAN:
doc.addToField(field, result.getBoolean(key));
break;
case DATE:
doc.addToField(field, result.getDate(key).toInstant());
break;
default:
throw new StageException("Type " + t + " not recognized");
}
index++;
}
// parse result into document
// can throw SQL Exception if column cannot be found or resultSet is closed
// will not add to document if fieldValue is null or if field value is unsupported type
JDBCUtils.parseResultToDoc(doc, result, fieldMapping);
}
} catch (SQLException e) {
throw new StageException("Error handling SQL statements", e);
Expand Down
132 changes: 132 additions & 0 deletions lucille-core/src/main/java/com/kmwllc/lucille/util/JDBCUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package com.kmwllc.lucille.util;

import com.kmwllc.lucille.core.Document;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Static helpers that copy column values from the current row of a JDBC {@link ResultSet} into a
 * {@link Document}, choosing the {@code ResultSet} getter from the column's {@link Types} code.
 *
 * <p>SQL NULL values are never written to the document. Columns whose type is not handled are
 * skipped and reported with a warning in the log.
 */
public class JDBCUtils {

  private static final Logger log = LogManager.getLogger(JDBCUtils.class);

  /**
   * Copies several columns of the current row into the document.
   *
   * @param doc the document to populate
   * @param rs the result set, positioned on the row to read
   * @param fieldMapping maps a result set column label (key) to the document field name (value);
   *     values are cast to {@code String}
   * @throws SQLException if a column label cannot be found or the result set cannot be read
   */
  public static void parseResultToDoc(Document doc, ResultSet rs, Map<String, Object> fieldMapping) throws SQLException {
    ResultSetMetaData metadata = rs.getMetaData();
    for (Map.Entry<String, Object> mapping : fieldMapping.entrySet()) {
      String field = (String) mapping.getValue();
      int columnIndex = rs.findColumn(mapping.getKey());
      int columnType = metadata.getColumnType(columnIndex);
      typeHandling(doc, rs, field, columnIndex, columnType);
    }
  }

  /**
   * Copies one column of the current row into the document field of the same name.
   *
   * @param doc the document to populate
   * @param rs the result set, positioned on the row to read
   * @param fieldName used both as the column label to look up and as the document field name
   * @throws SQLException if the column cannot be found or the result set cannot be read
   */
  public static void parseResultToDoc(Document doc, ResultSet rs, String fieldName)
      throws SQLException {
    int columnIndex = rs.findColumn(fieldName);
    parseResultToDoc(doc, rs, fieldName, columnIndex);
  }

  /**
   * Copies the column at the given index of the current row into the named document field.
   *
   * @param doc the document to populate
   * @param rs the result set, positioned on the row to read
   * @param fieldName the document field to write
   * @param columnIndex the column index, as expected by the {@code ResultSet} getters
   * @throws SQLException if the column metadata or value cannot be read
   */
  public static void parseResultToDoc(Document doc, ResultSet rs, String fieldName, int columnIndex)
      throws SQLException {
    ResultSetMetaData metadata = rs.getMetaData();
    typeHandling(doc, rs, fieldName, columnIndex, metadata.getColumnType(columnIndex));
  }

  /**
   * Reads one column with the getter matching {@code columnType} and, unless the SQL value is NULL,
   * stores it in the document via {@code setOrAdd}. Unhandled types are logged and skipped.
   */
  private static void typeHandling(Document doc, ResultSet rs, String fieldName, int columnIndex, int columnType)
      throws SQLException {
    switch (columnType) {
      case Types.CLOB:
      case Types.NCLOB:
      case Types.VARCHAR:
      case Types.CHAR:
      case Types.LONGVARCHAR:
      case Types.NCHAR:
      case Types.NVARCHAR:
      case Types.LONGNVARCHAR:
        String str = rs.getString(columnIndex);
        if (str != null) {
          doc.setOrAdd(fieldName, str);
        }
        break;
      case Types.TINYINT:
      case Types.SMALLINT:
      case Types.INTEGER:
        int integer = rs.getInt(columnIndex);
        // getInt returns 0 for SQL NULL, so wasNull() is the only way to tell NULL from a real 0
        if (!rs.wasNull()) {
          doc.setOrAdd(fieldName, integer);
        }
        break;
      case Types.BIGINT:
        long longVal = rs.getLong(columnIndex);
        // getLong returns 0 for SQL NULL; check wasNull() before storing
        if (!rs.wasNull()) {
          doc.setOrAdd(fieldName, longVal);
        }
        break;
      case Types.DECIMAL:
      case Types.NUMERIC:
      case Types.DOUBLE:
        // NOTE(review): DECIMAL/NUMERIC are read as double, which may lose precision for
        // high-precision values - confirm this is acceptable for the stored documents
        double doubleVal = rs.getDouble(columnIndex);
        // getDouble returns 0 for SQL NULL; check wasNull() before storing
        if (!rs.wasNull()) {
          doc.setOrAdd(fieldName, doubleVal);
        }
        break;
      case Types.FLOAT:
      case Types.REAL:
        // NOTE(review): the JDBC type mapping recommends double for FLOAT (REAL is the
        // single-precision type); kept as float here so the stored value type does not change
        float floatVal = rs.getFloat(columnIndex);
        // getFloat returns 0 for SQL NULL; check wasNull() before storing
        if (!rs.wasNull()) {
          doc.setOrAdd(fieldName, floatVal);
        }
        break;
      case Types.BOOLEAN:
      case Types.BIT:
        boolean boolVal = rs.getBoolean(columnIndex);
        // getBoolean returns false for SQL NULL; check wasNull() before storing
        if (!rs.wasNull()) {
          doc.setOrAdd(fieldName, boolVal);
        }
        break;
      case Types.DATE:
        // Dates are stored as Strings because converting to an Instant may change the value:
        // Date to Timestamp sets the time and nanoseconds to 0, e.g. 2024-07-30 -> 2024-07-30 00:00:00.0
        // Timestamp to Instant applies the local time zone (DB server or JVM) and converts to UTC,
        // e.g. 2024-07-30 00:00:00.0 UTC-4 -> 2024-07-30 04:00:00.0
        Date dateVal = rs.getDate(columnIndex);
        if (dateVal != null) {
          doc.setOrAdd(fieldName, dateVal.toString());
        }
        break;
      case Types.TIMESTAMP:
        // Timestamps are stored as Strings to prevent conversion to UTC:
        // if TIMESTAMP = 2023-01-01T00:00:00Z while the JVM or DB local time is UTC-4,
        // it would be stored as 2023-01-01T04:00:00Z in the Document if converted to an Instant
        Timestamp timestamp = rs.getTimestamp(columnIndex);
        if (timestamp != null) {
          doc.setOrAdd(fieldName, timestamp.toString());
        }
        break;
      // Types.TIMESTAMP_WITH_TIMEZONE is deliberately not handled:
      // a value of 2023-01-01T00:00:00-4 would be retrieved as 2023-01-01T04:00:00Z by getTimestamp(),
      // so it is better not to save it than to save a converted value
      case Types.BLOB:
      case Types.BINARY:
      case Types.VARBINARY:
      case Types.LONGVARBINARY:
        byte[] bytes = rs.getBytes(columnIndex);
        if (bytes != null) {
          doc.setOrAdd(fieldName, bytes);
        }
        break;
      default:
        log.warn("SQL Type {} ({}) not supported", typeName(columnType), columnType);
    }
  }

  /**
   * Returns the standard JDBC name for a type code, or {@code "UNKNOWN"} when the code is not one
   * of the {@link JDBCType} constants (for example a driver-specific code).
   */
  private static String typeName(int columnType) {
    try {
      return JDBCType.valueOf(columnType).getName();
    } catch (IllegalArgumentException ignored) {
      // JDBCType.valueOf rejects codes outside java.sql.Types; reporting an unsupported
      // column must not itself fail, so fall back to a placeholder name
      return "UNKNOWN";
    }
  }
}
Loading

0 comments on commit 3377fed

Please sign in to comment.