Commit

Fix: read a value of the specified type
potix2 committed Mar 10, 2016
1 parent d15d0aa commit 1a8c80f
Showing 4 changed files with 71 additions and 4 deletions.
build.sbt (2 changes: 1 addition & 1 deletion)

@@ -6,7 +6,7 @@ scalaVersion := "2.11.7"
 
 crossScalaVersions := Seq("2.10.6", "2.11.7")
 
-version := "0.3.0-SNAPSHOT"
+version := "0.3.1-SNAPSHOT"
 
 spName := "potix2/spark-google-spreadsheets"
 
SpreadsheetRelation.scala

@@ -14,6 +14,7 @@
 package com.github.potix2.spark.google.spreadsheets
 
 import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.SparkSpreadsheetContext
+import com.github.potix2.spark.google.spreadsheets.util._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -46,7 +47,14 @@ case class SpreadsheetRelation protected[spark] (
     val aSchema = schema
     sqlContext.sparkContext.makeRDD(rows).mapPartitions { iter =>
       iter.map { m =>
-        Row.fromSeq(aSchema.fields.map(field => m(field.name)))
+        var index = 0
+        val rowArray = new Array[Any](aSchema.fields.length)
+        while(index < aSchema.fields.length) {
+          val field = aSchema.fields(index)
+          rowArray(index) = TypeCast.castTo(m(field.name), field.dataType, field.nullable)
+          index += 1
+        }
+        Row.fromSeq(rowArray)
       }
     }
   }
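This change routes every cell through TypeCast.castTo before the Row is built. For reference, a self-contained sketch of what the new loop computes, assuming each fetched row arrives as a Map[String, String] keyed by column name (RowBuildSketch and the sample values are illustrative, not part of the commit):

package com.github.potix2.spark.google.spreadsheets

import com.github.potix2.spark.google.spreadsheets.util.TypeCast
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Placed inside the spreadsheets package because castTo is private[spreadsheets].
object RowBuildSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)
    ))
    // Every cell fetched from the Sheets API is a String.
    val cells = Map("id" -> "42", "name" -> "alice")
    // Equivalent to the committed while loop: cast each cell to the type
    // declared by its schema field, preserving field order.
    val typed = schema.fields.map(f => TypeCast.castTo(cells(f.name), f.dataType, f.nullable))
    val row = Row.fromSeq(typed)
    println(row.getLong(0)) // 42 as a Long, not the String "42"
  }
}

The committed code uses an indexed while loop rather than map, presumably to keep per-row overhead low on this hot path.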
TypeCast.scala (new file)

@@ -0,0 +1,37 @@
+package com.github.potix2.spark.google.spreadsheets.util
+
+import java.math.BigDecimal
+import java.sql.{Date, Timestamp}
+import java.text.NumberFormat
+import java.util.Locale
+
+import org.apache.spark.sql.types._
+
+import scala.util.Try
+
+object TypeCast {
+
+  private[spreadsheets] def castTo(
+      datum: String,
+      castType: DataType,
+      nullable: Boolean = true
+  ): Any = {
+    castType match {
+      case _: ByteType => datum.toByte
+      case _: ShortType => datum.toShort
+      case _: IntegerType => datum.toInt
+      case _: LongType => datum.toLong
+      case _: FloatType => Try(datum.toFloat)
+        .getOrElse(NumberFormat.getInstance(Locale.getDefault()).parse(datum).floatValue())
+      case _: DoubleType => Try(datum.toDouble)
+        .getOrElse(NumberFormat.getInstance(Locale.getDefault()).parse(datum).doubleValue())
+      case _: BooleanType => datum.toBoolean
+      case _: DecimalType => new BigDecimal(datum.replaceAll(",", ""))
+      case _: TimestampType => Timestamp.valueOf(datum)
+      case _: DateType => Date.valueOf(datum)
+      case _: StringType => datum
+      case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
+
+    }
+  }
+}
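To make the casting rules concrete, a few example inputs (TypeCastDemo is a hypothetical demo object, again placed inside the spreadsheets package since castTo is package-private):

package com.github.potix2.spark.google.spreadsheets

import com.github.potix2.spark.google.spreadsheets.util.TypeCast
import org.apache.spark.sql.types._

object TypeCastDemo {
  def main(args: Array[String]): Unit = {
    println(TypeCast.castTo("42", LongType))                // 42 (a Long)
    println(TypeCast.castTo("true", BooleanType))           // true
    println(TypeCast.castTo("1,234.5", DecimalType(10, 1))) // 1234.5 (grouping commas stripped)
    println(TypeCast.castTo("2016-03-10", DateType))        // 2016-03-10 (java.sql.Date)
    println(TypeCast.castTo("plain text", StringType))      // returned unchanged
  }
}

Note the float and double cases fall back to a locale-aware NumberFormat parse when the plain toFloat/toDouble conversion fails.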
SpreadsheetSuite.scala

@@ -17,7 +17,7 @@ import java.io.File
 
 import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.{SparkWorksheet, SparkSpreadsheetContext}
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SQLContext}
 import org.scalatest.{BeforeAndAfter, FlatSpec}
 
@@ -75,7 +75,9 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter {
     }
   }
 
-  "A sheet" should "behave as a dataFrame" in {
+  behavior of "A sheet"
+
+  it should "behave as a dataFrame" in {
     val results = sqlContext.read
       .option("serviceAccountId", serviceAccountId)
       .option("credentialPath", testCredentialPath)
@@ -86,6 +88,26 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter {
     assert(results.size === 15)
   }
 
+  it should "have a value as long" in {
+    val schema = StructType(Seq(
+      StructField("col1", DataTypes.LongType),
+      StructField("col2", DataTypes.StringType),
+      StructField("col3", DataTypes.StringType)
+    ))
+
+    val results = sqlContext.read
+      .option("serviceAccountId", serviceAccountId)
+      .option("credentialPath", testCredentialPath)
+      .schema(schema)
+      .spreadsheet("SpreadsheetSuite/case1")
+      .select("col1", "col2", "col3")
+      .collect()
+
+    assert(results.head.getLong(0) === 1L)
+    assert(results.head.getString(1) === "2")
+    assert(results.head.getString(2) === "3")
+  }
+
   trait PersonDataFrame {
     val personsSchema = StructType(List(
       StructField("id", IntegerType, true),
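The new test exercises the fix end to end through the test helper's spreadsheet reader. For comparison, roughly how the same read looks from the public data source API (a sketch assuming an existing sqlContext; the format name follows the data source's package, and the option values and load path are placeholders):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("col1", LongType),
  StructField("col2", StringType),
  StructField("col3", StringType)
))

val df = sqlContext.read
  .format("com.github.potix2.spark.google.spreadsheets")
  .option("serviceAccountId", "xxxx@developer.gserviceaccount.com") // placeholder
  .option("credentialPath", "/path/to/credential.p12")              // placeholder
  .schema(schema)
  .load("<spreadsheetName>/<worksheetName>")                        // placeholder path

// Before this commit every column materialized as a String; with an
// explicit schema, col1 now comes back as a real Long.
df.first().getLong(0)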
