Update sheets api v4 (#4)
* Migrate to the Google Sheets API v4

* Update README.md
potix2 authored Sep 13, 2016
1 parent c8190a0 commit 05804a2
Showing 9 changed files with 453 additions and 118 deletions.
20 changes: 14 additions & 6 deletions README.md
@@ -4,14 +4,22 @@ Google Spreadsheets datasource for [SparkSQL and DataFrames](http://spark.apache

[![Build Status](https://travis-ci.org/potix2/spark-google-spreadsheets.svg?branch=master)](https://travis-ci.org/potix2/spark-google-spreadsheets)

## Notice

The latest version (0.4.0) breaks compatibility with previous versions. You must
use a **spreadsheetId** to identify which spreadsheet is to be accessed or altered;
older versions used a spreadsheet name instead.

If you don't know the spreadsheetId of your sheet, it is the long token in the sheet's URL (`https://docs.google.com/spreadsheets/d/<spreadsheetId>/edit`); see also the [Introduction to the Google Sheets API v4](https://developers.google.com/sheets/guides/concepts).

## Requirements

## Linking

Using SBT:

```
libraryDependencies += "com.github.potix2" %% "spark-google-spreadsheets" % "0.3.1"
libraryDependencies += "com.github.potix2" %% "spark-google-spreadsheets" % "0.4.0"
```

Using Maven:
@@ -20,7 +28,7 @@ Using Maven:
<dependency>
<groupId>com.github.potix2</groupId>
<artifactId>spark-google-spreadsheets_2.11</artifactId>
<version>0.3.1</version>
<version>0.4.0</version>
</dependency>
```

@@ -30,7 +38,7 @@ Using Maven:
CREATE TABLE cars
USING com.github.potix2.spark.google.spreadsheets
OPTIONS (
path "YourSpreadsheet/worksheet1",
path "<spreadsheetId>/worksheet1",
serviceAccountId "xxxxxx@developer.gserviceaccount.com",
credentialPath "/path/to/credential.p12"
)
@@ -48,20 +56,20 @@ val df = sqlContext.read.
format("com.github.potix2.spark.google.spreadsheets").
option("serviceAccountId", "xxxxxx@developer.gserviceaccount.com").
option("credentialPath", "/path/to/credentail.p12").
load("YourSpreadsheet/worksheet1")
load("<spreadsheetId>/worksheet1")

// Saves a DataFrame to a new worksheet
df.write.
format("com.github.potix2.spark.google.spreadsheets").
option("serviceAccountId", "xxxxxx@developer.gserviceaccount.com").
option("credentialPath", "/path/to/credentail.p12").
save("YourSpreadsheet/newWorksheet")
save("<spreadsheetId>/newWorksheet")

```
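
The test suite also exercises an implicit `spreadsheet` helper that becomes available after importing the package object. A minimal sketch along the same lines as the examples above (the spreadsheet ID and credential paths are placeholders):

```
import com.github.potix2.spark.google.spreadsheets._

// Read a worksheet into a DataFrame via the implicit helper
val df = sqlContext.read.
  option("serviceAccountId", "xxxxxx@developer.gserviceaccount.com").
  option("credentialPath", "/path/to/credential.p12").
  spreadsheet("<spreadsheetId>/worksheet1")

// Write a DataFrame to a new worksheet the same way
df.write.
  option("serviceAccountId", "xxxxxx@developer.gserviceaccount.com").
  option("credentialPath", "/path/to/credential.p12").
  spreadsheet("<spreadsheetId>/newWorksheet")
```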

## License

Copyright 2015, Katsunori Kanda
Copyright 2016, Katsunori Kanda

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

13 changes: 7 additions & 6 deletions build.sbt
@@ -2,11 +2,11 @@ name := "spark-google-spreadsheets"

organization := "com.github.potix2"

scalaVersion := "2.11.7"
scalaVersion := "2.11.8"

crossScalaVersions := Seq("2.10.6", "2.11.7")
crossScalaVersions := Seq("2.10.6", "2.11.8")

version := "0.3.1-SNAPSHOT"
version := "0.4.0-SNAPSHOT"

spName := "potix2/spark-google-spreadsheets"

@@ -16,7 +16,7 @@ spIncludeMaven := true

spIgnoreProvided := true

sparkVersion := "1.5.0"
sparkVersion := "1.6.2"

val testSparkVersion = settingKey[String]("The version of Spark to test against.")

@@ -27,9 +27,10 @@ sparkComponents := Seq("sql")
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % "1.7.5" % "provided",
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
("com.google.api-client" % "google-api-client" % "1.20.0").
("com.google.api-client" % "google-api-client" % "1.22.0").
exclude("com.google.guava", "guava-jdk5"),
"com.google.gdata" % "core" % "1.47.1"
"com.google.oauth-client" % "google-oauth-client-jetty" % "1.22.0",
"com.google.apis" % "google-api-services-sheets" % "v4-rev18-1.22.0"
)

libraryDependencies ++= Seq(
src/main/scala/com/github/potix2/spark/google/spreadsheets/DefaultSource.scala
@@ -15,7 +15,6 @@ package com.github.potix2.spark.google.spreadsheets

import java.io.File

import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.{SparkSpreadsheet, SparkWorksheet}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -44,23 +43,13 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr


override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
def createWorksheet(spreadsheet: SparkSpreadsheet, worksheetName: String)
(implicit context:SparkSpreadsheetService.SparkSpreadsheetContext): SparkWorksheet = {
val columns = data.schema.fieldNames
val worksheet = spreadsheet.addWorksheet(worksheetName, columns.length, data.count().toInt)
worksheet.insertHeaderRow(columns)

worksheet
}

val (spreadsheetName, worksheetName) = pathToSheetNames(parameters)
implicit val context = createSpreadsheetContext(parameters)
val spreadsheet = SparkSpreadsheetService.findSpreadsheet(spreadsheetName)
if(!spreadsheet.isDefined)
throw new RuntimeException(s"no such a spreadsheet: $spreadsheetName")

val worksheet = createWorksheet(spreadsheet.get, worksheetName)
data.collect().foreach(row => worksheet.insertRow(Util.convert(data.schema, row)))
spreadsheet.get.addWorksheet(worksheetName, data.schema, data.collect().toList, Util.toRowData)
createRelation(sqlContext, context, spreadsheetName, worksheetName, data.schema)
}

src/main/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetService.scala
@@ -19,28 +19,34 @@ import java.net.URL
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential.Builder
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.http.javanet.NetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.gdata.client.Query
import com.google.gdata.client.spreadsheet.{SpreadsheetQuery, SpreadsheetService}
import com.google.gdata.data.spreadsheet._
import com.google.gdata.data.{IEntry, IFeed, PlainTextConstruct}
import com.google.api.services.sheets.v4.model._
import com.google.api.services.sheets.v4.{Sheets, SheetsScopes}
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._


object SparkSpreadsheetService {
private val SPREADSHEET_URL = new URL("https://spreadsheets.google.com/feeds/spreadsheets/private/full")
private val scopes = List("https://spreadsheets.google.com/feeds")
private val scopes = List(SheetsScopes.SPREADSHEETS)
private val APP_NAME = "spark-google-spreadsheets-1.0.0"
private val HTTP_TRANSPORT: NetHttpTransport = GoogleNetHttpTransport.newTrustedTransport()
private val JSON_FACTORY: JacksonFactory = JacksonFactory.getDefaultInstance()

case class SparkSpreadsheetContext(serviceAccountId: String, p12File: File) {
private val service = new SpreadsheetService(APP_NAME)
private val credential = authorize(serviceAccountId, p12File)
lazy val service =
new Sheets.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential)
.setApplicationName(APP_NAME)
.build()

private def authorize(serviceAccountId: String, p12File: File): GoogleCredential = {
val credential = new Builder()
.setTransport(GoogleNetHttpTransport.newTrustedTransport())
.setJsonFactory(new JacksonFactory())
.setTransport(HTTP_TRANSPORT)
.setJsonFactory(JSON_FACTORY)
.setServiceAccountId(serviceAccountId)
.setServiceAccountPrivateKeyFromP12File(p12File)
.setServiceAccountScopes(scopes)
@@ -50,75 +56,200 @@ object SparkSpreadsheetService {
credential
}

def insert[E <: IEntry](feedUrl: URL, entry: E) = service.insert(feedUrl, entry)
def getFeed[F <: IFeed](feedUrl: URL, feedClass: Class[F]) = service.getFeed(feedUrl, feedClass)
def query[F <: IFeed](query: Query, feedClass: Class[F]) = service.query(query, feedClass)
def findSpreadsheet(spreadSheetId: String): SparkSpreadsheet =
SparkSpreadsheet(this, service.spreadsheets().get(spreadSheetId).execute())

service.setOAuth2Credentials(credential)
def query(spreadsheetId: String, range: String): ValueRange =
service.spreadsheets().values().get(spreadsheetId, range).execute()
}

case class SparkSpreadsheet(entry: SpreadsheetEntry) {
def addWorksheet(sheetName: String, colNum: Int, rowNum: Int)(implicit context: SparkSpreadsheetContext): SparkWorksheet = {
val worksheetEntry = new WorksheetEntry()
worksheetEntry.setTitle(new PlainTextConstruct(sheetName))
worksheetEntry.setColCount(colNum)
worksheetEntry.setRowCount(rowNum)
new SparkWorksheet(context.insert(entry.getWorksheetFeedUrl, worksheetEntry))
case class SparkSpreadsheet(context:SparkSpreadsheetContext, private var spreadsheet: Spreadsheet) {
def name: String = spreadsheet.getProperties.getTitle
def getWorksheets: Seq[SparkWorksheet] =
spreadsheet.getSheets.map(new SparkWorksheet(context, spreadsheet, _))

def nextSheetId: Integer =
getWorksheets.map(_.sheet.getProperties.getSheetId).max + 1

def addWorksheet(sheetName: String, colNum: Integer, rowNum: Integer): Unit = {
val addSheetRequest = new AddSheetRequest()
addSheetRequest.setProperties(
new SheetProperties()
.setSheetId(nextSheetId)
.setTitle(sheetName)
.setGridProperties(
new GridProperties()
.setColumnCount(colNum)
.setRowCount(rowNum)))

val requests = List(
new Request().setAddSheet(addSheetRequest)
)

context.service.spreadsheets().batchUpdate(spreadsheet.getSpreadsheetId,
new BatchUpdateSpreadsheetRequest()
.setRequests(requests)).execute()

spreadsheet = context.service.spreadsheets().get(spreadsheet.getSpreadsheetId).execute()
}

def findWorksheet(worksheetName: String)(implicit context: SparkSpreadsheetContext): Option[SparkWorksheet] =
worksheets.find(_.entry.getTitle.getPlainText == worksheetName)
def addWorksheet[T](sheetName: String, schema: StructType, data: List[T], extractor: T => RowData): Unit = {
val colNum = schema.fields.size
val rowNum = data.size + 1
val nextId = nextSheetId

val addSheetRequest = new AddSheetRequest()
addSheetRequest.setProperties(
new SheetProperties()
.setSheetId(nextId)
.setTitle(sheetName)
.setGridProperties(
new GridProperties()
.setColumnCount(colNum)
.setRowCount(rowNum)))

val headerValues: List[CellData] = schema.fields.map { field =>
new CellData()
.setUserEnteredValue(new ExtendedValue()
.setStringValue(field.name))
}.toList

val updateHeaderRequest = new UpdateCellsRequest()
.setStart(new GridCoordinate()
.setSheetId(nextId)
.setRowIndex(0)
.setColumnIndex(0))
.setRows(List(new RowData().setValues(headerValues)))
.setFields("userEnteredValue")

val updateRowsRequest = new UpdateCellsRequest()
.setStart(new GridCoordinate()
.setSheetId(nextId)
.setRowIndex(1)
.setColumnIndex(0))
.setRows(data.map(extractor))
.setFields("userEnteredValue")

val requests = List(
new Request().setAddSheet(addSheetRequest),
new Request().setUpdateCells(updateHeaderRequest),
new Request().setUpdateCells(updateRowsRequest)
)

context.service.spreadsheets().batchUpdate(spreadsheet.getSpreadsheetId,
new BatchUpdateSpreadsheetRequest()
.setRequests(requests)).execute()

spreadsheet = context.service.spreadsheets().get(spreadsheet.getSpreadsheetId).execute()
}

private def worksheets(implicit context: SparkSpreadsheetContext): Seq[SparkWorksheet] =
entry.getWorksheets.map(new SparkWorksheet(_))
}
def findWorksheet(worksheetName: String): Option[SparkWorksheet] = {
val worksheets: Seq[SparkWorksheet] = getWorksheets
worksheets.find(_.sheet.getProperties.getTitle == worksheetName)
}

case class SparkWorksheet(entry: WorksheetEntry) {
def insertHeaderRow(headerColumnNames: Seq[String])(implicit context: SparkSpreadsheetContext) = {
val cellFeed = context.getFeed(entry.getCellFeedUrl, classOf[CellFeed])
headerColumnNames.zipWithIndex.foreach { case(colName, i) => cellFeed.insert(new CellEntry(1, i + 1, colName)) }
def deleteWorksheet(worksheetName: String): Unit = {
val worksheet: Option[SparkWorksheet] = findWorksheet(worksheetName)
if(worksheet.isDefined) {
val request = new Request()
val sheetId = worksheet.get.sheet.getProperties.getSheetId
request.setDeleteSheet(new DeleteSheetRequest()
.setSheetId(sheetId))

context.service.spreadsheets().batchUpdate(spreadsheet.getSpreadsheetId,
new BatchUpdateSpreadsheetRequest().setRequests(List(request).asJava)).execute()

spreadsheet = context.service.spreadsheets().get(spreadsheet.getSpreadsheetId).execute()
}
}
}

def insertRow(values: Map[String, Object])(implicit context: SparkSpreadsheetContext) = {
val dataRow = new ListEntry()
values.foreach { case(title, value) => dataRow.getCustomElements.setValueLocal(title, value.toString) }
case class SparkWorksheet(context: SparkSpreadsheetContext, spreadsheet: Spreadsheet, sheet: Sheet) {
def name: String = sheet.getProperties.getTitle
lazy val values = {
val valueRange = context.query(spreadsheet.getSpreadsheetId, name)
if ( valueRange.getValues != null )
valueRange.getValues
else
List[java.util.List[Object]]().asJava
}

val listFeedUrl = entry.getListFeedUrl
context.insert(listFeedUrl, dataRow)
lazy val headers =
values.headOption.map { row => row.map(_.toString) }.getOrElse(List())

def updateCells[T](schema: StructType, data: List[T], extractor: T => RowData): Unit = {
val colNum = schema.fields.size
val rowNum = data.size + 2
val sheetId = sheet.getProperties.getSheetId

val updatePropertiesRequest = new UpdateSheetPropertiesRequest()
updatePropertiesRequest.setProperties(
new SheetProperties()
.setSheetId(sheetId)
.setGridProperties(
new GridProperties()
.setColumnCount(colNum)
.setRowCount(rowNum)))
.setFields("gridProperties(rowCount,columnCount)")

val headerValues: List[CellData] = schema.fields.map { field =>
new CellData()
.setUserEnteredValue(new ExtendedValue()
.setStringValue(field.name))
}.toList

val updateHeaderRequest = new UpdateCellsRequest()
.setStart(new GridCoordinate()
.setSheetId(sheetId)
.setRowIndex(0)
.setColumnIndex(0))
.setRows(List(new RowData().setValues(headerValues)))
.setFields("userEnteredValue")

val updateRowsRequest = new UpdateCellsRequest()
.setStart(new GridCoordinate()
.setSheetId(sheetId)
.setRowIndex(1)
.setColumnIndex(0))
.setRows(data.map(extractor))
.setFields("userEnteredValue")

val requests = List(
new Request().setUpdateSheetProperties(updatePropertiesRequest),
new Request().setUpdateCells(updateHeaderRequest),
new Request().setUpdateCells(updateRowsRequest)
)

context.service.spreadsheets().batchUpdate(spreadsheet.getSpreadsheetId,
new BatchUpdateSpreadsheetRequest()
.setRequests(requests)).execute()
}

def rows(implicit context: SparkSpreadsheetContext): Seq[Map[String, String]] =
context.getFeed(entry.getListFeedUrl, classOf[ListFeed]).getEntries.toList.map { e =>
val elems = e.getCustomElements
elems.getTags.map(tag => (tag, elems.getValue(tag))).toMap
def rows: Seq[Map[String, String]] =
if(values.isEmpty) {
Seq()
}
else {
values.tail.map { row => headers.zip(row.map(_.toString)).toMap }
}
}

/**
* create a new context of spreadsheets for spark
* @param serviceAccountId
*
* @param serviceAccountId
* @param p12File
* @return
*/
def apply(serviceAccountId: String, p12File: File) = SparkSpreadsheetContext(serviceAccountId, p12File)

/**
* list of all spreadsheets
* @param context
* @return
*/
def allSheets()(implicit context:SparkSpreadsheetContext): Seq[SparkSpreadsheet] =
context.getFeed(SPREADSHEET_URL, classOf[SpreadsheetFeed]).getEntries.map(new SparkSpreadsheet(_))

/**
* find a sparedsheet by name
* @param spreadsheetName
* find a spreadsheet by name
*
* @param spreadsheetName
* @param context
* @return
*/
def findSpreadsheet(spreadsheetName: String)(implicit context: SparkSpreadsheetContext): Option[SparkSpreadsheet] = {
val query = new SpreadsheetQuery(SPREADSHEET_URL)
query.setTitleQuery(spreadsheetName)
context.query(query, classOf[SpreadsheetFeed]).getEntries.map(new SparkSpreadsheet(_)).headOption
}
def findSpreadsheet(spreadsheetName: String)(implicit context: SparkSpreadsheetContext): Option[SparkSpreadsheet] =
Some(context.findSpreadsheet(spreadsheetName))
}
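
A minimal sketch of how the refactored v4 service API above fits together; the service account ID, key path, spreadsheet ID, and worksheet name are placeholders:

```
import java.io.File
import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService

// Build a context from a service account and its P12 key.
val context = SparkSpreadsheetService("xxxxxx@developer.gserviceaccount.com",
  new File("/path/to/credential.p12"))

// Look up a spreadsheet by its ID, then read a worksheet.
val spreadsheet = context.findSpreadsheet("<spreadsheetId>")
spreadsheet.findWorksheet("worksheet1").foreach { worksheet =>
  println(worksheet.headers)      // first row, used as column names
  worksheet.rows.foreach(println) // remaining rows as Map[String, String]
}
```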
src/main/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetRelation.scala
@@ -33,7 +33,7 @@ case class SpreadsheetRelation protected[spark] (

private lazy val rows: Seq[Map[String, String]] =
findWorksheet(spreadsheetName, worksheetName)(context) match {
case Right(aWorksheet) => aWorksheet.rows(context)
case Right(aWorksheet) => aWorksheet.rows
case Left(e) => throw e
}

@@ -64,13 +64,11 @@ case class SpreadsheetRelation protected[spark] (
sys.error("Spreadsheet tables only support INSERT OVERWRITE for now.")
}

val columns = data.schema.fieldNames
findWorksheet(spreadsheetName, worksheetName)(context) match {
case Right(w) => {
w.insertHeaderRow(columns)(context)
data.collect().foreach(row => w.insertRow(Util.convert(data.schema, row))(context))
}
case Left(e) => throw e
case Right(w) =>
w.updateCells(data.schema, data.collect().toList, Util.toRowData)
case Left(e) =>
throw e
}
}

src/main/scala/com/github/potix2/spark/google/spreadsheets/Util.scala
@@ -1,9 +1,32 @@
package com.github.potix2.spark.google.spreadsheets

import com.google.api.services.sheets.v4.model.{ExtendedValue, CellData, RowData}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataTypes, StructType}

import scala.collection.JavaConverters._

object Util {
def convert(schema: StructType, row: Row): Map[String, Object] =
schema.iterator.zipWithIndex.map { case (f, i) => f.name -> row(i).asInstanceOf[AnyRef]} toMap

def toRowData(row: Row): RowData =
new RowData().setValues(
row.schema.fields.zipWithIndex.map { case (f, i) =>
new CellData()
.setUserEnteredValue(
f.dataType match {
case DataTypes.StringType => new ExtendedValue().setStringValue(row.getString(i))
case DataTypes.LongType => new ExtendedValue().setNumberValue(row.getLong(i).toDouble)
case DataTypes.IntegerType => new ExtendedValue().setNumberValue(row.getInt(i).toDouble)
case DataTypes.FloatType => new ExtendedValue().setNumberValue(row.getFloat(i).toDouble)
case DataTypes.BooleanType => new ExtendedValue().setBoolValue(row.getBoolean(i))
case DataTypes.DateType => new ExtendedValue().setStringValue(row.getDate(i).toString)
case DataTypes.ShortType => new ExtendedValue().setNumberValue(row.getShort(i).toDouble)
case DataTypes.TimestampType => new ExtendedValue().setStringValue(row.getTimestamp(i).toString)
}
)
}.toList.asJava
)

}
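
`Util.toRowData` relies on each collected `Row` carrying its schema, and the pattern match covers only the types listed above (`DoubleType`, for example, is not handled and would raise a `MatchError`). A minimal sketch of how it is used as the extractor, with `df` as a placeholder DataFrame:

```
// Convert collected rows into the Sheets API RowData that batchUpdate requests expect.
val rowData = df.collect().toList.map(Util.toRowData)

// The same function is passed as the extractor when a worksheet is written:
// spreadsheet.addWorksheet(worksheetName, df.schema, df.collect().toList, Util.toRowData)
```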
src/test/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetServiceReadSuite.scala
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.potix2.spark.google.spreadsheets

import java.io.File

import org.scalatest.{BeforeAndAfter, FlatSpec}

class SparkSpreadsheetServiceReadSuite extends FlatSpec with BeforeAndAfter {
private val serviceAccountId = "53797494708-ds5v22b6cbpchrv2qih1vg8kru098k9i@developer.gserviceaccount.com"
private val testCredentialPath = "src/test/resources/spark-google-spreadsheets-test-eb7b191d1e1d.p12"
private val TEST_SPREADSHEET_NAME = "SpreadsheetSuite"
private val TEST_SPREADSHEET_ID = "1H40ZeqXrMRxgHIi3XxmHwsPs2SgVuLUFbtaGcqCAk6c"

private val context: SparkSpreadsheetService.SparkSpreadsheetContext =
SparkSpreadsheetService.SparkSpreadsheetContext(serviceAccountId, new File(testCredentialPath))
private val spreadsheet: SparkSpreadsheetService.SparkSpreadsheet =
context.findSpreadsheet(TEST_SPREADSHEET_ID)

behavior of "A Spreadsheet"


it should "have a name" in {
assert(spreadsheet.name == TEST_SPREADSHEET_NAME)
}

behavior of "A worksheet"
it should "be None when a worksheet is missing" in {
assert(spreadsheet.findWorksheet("foo").isEmpty)
}

it should "be retrieved when the worksheet exists" in {
val worksheet = spreadsheet.findWorksheet("case2")
assert(worksheet.isDefined)
assert(worksheet.get.name == "case2")
assert(worksheet.get.headers == List("id", "firstname", "lastname", "email", "country", "ipaddress"))

val firstRow = worksheet.get.rows(0)
assert(firstRow == Map(
"id" -> "1",
"firstname" -> "Annie",
"lastname" -> "Willis",
"email" -> "awillis0@princeton.edu",
"country" -> "Burundi",
"ipaddress" -> "241.162.49.104"))
}
}
src/test/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetServiceWriteSuite.scala
@@ -0,0 +1,105 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.potix2.spark.google.spreadsheets

import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.SparkSpreadsheet
import com.google.api.services.sheets.v4.model.{ExtendedValue, CellData, RowData}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.scalatest.{BeforeAndAfter, FlatSpec}

import scala.collection.JavaConverters._

class SparkSpreadsheetServiceWriteSuite extends FlatSpec with BeforeAndAfter {
private val serviceAccountId = "53797494708-ds5v22b6cbpchrv2qih1vg8kru098k9i@developer.gserviceaccount.com"
private val testCredentialPath = "src/test/resources/spark-google-spreadsheets-test-eb7b191d1e1d.p12"
private val TEST_SPREADSHEET_NAME = "WriteSuite"
private val TEST_SPREADSHEET_ID = "163Ja2OWUephWjIa-jpwTlvGcg8EJwCFCfxrF7aI117s"

private val context: SparkSpreadsheetService.SparkSpreadsheetContext =
SparkSpreadsheetService.SparkSpreadsheetContext(serviceAccountId, new java.io.File(testCredentialPath))

var spreadsheet: SparkSpreadsheet = null
var worksheetName: String = ""

def definedSchema: StructType = {
new StructType()
.add(new StructField("col_1", DataTypes.StringType))
.add(new StructField("col_2", DataTypes.LongType))
.add(new StructField("col_3", DataTypes.StringType))
}

case class Elem(col_1: String, col_2: Long, col_3: String)

def extractor(e: Elem): RowData =
new RowData().setValues(
List(
new CellData().setUserEnteredValue(
new ExtendedValue().setStringValue(e.col_1)
),
new CellData().setUserEnteredValue(
new ExtendedValue().setNumberValue(e.col_2.toDouble)
),
new CellData().setUserEnteredValue(
new ExtendedValue().setStringValue(e.col_3)
)
).asJava
)

before {
spreadsheet = context.findSpreadsheet(TEST_SPREADSHEET_ID)
worksheetName = scala.util.Random.alphanumeric.take(16).mkString
val data = List(
Elem("a", 1L, "x"),
Elem("b", 2L, "y"),
Elem("c", 3L, "z")
)

spreadsheet.addWorksheet(worksheetName, definedSchema, data, extractor)
}

after {
spreadsheet.deleteWorksheet(worksheetName)
}

behavior of "A Spreadsheet"
it should "find the new worksheet" in {
val newWorksheet = spreadsheet.findWorksheet(worksheetName)
assert(newWorksheet.isDefined)
assert(newWorksheet.get.name == worksheetName)
assert(newWorksheet.get.headers == Seq("col_1", "col_2", "col_3"))

val rows = newWorksheet.get.rows
assert(rows.head == Map("col_1" -> "a", "col_2" -> "1", "col_3" -> "x"))
}

behavior of "SparkWorksheet#updateCells"
it should "update values in a worksheet" in {
val newWorksheet = spreadsheet.findWorksheet(worksheetName)
assert(newWorksheet.isDefined)

val newData = List(
Elem("f", 5L, "yy"),
Elem("e", 4L, "xx"),
Elem("c", 3L, "z"),
Elem("b", 2L, "y"),
Elem("a", 1L, "x")
)

newWorksheet.get.updateCells(definedSchema, newData, extractor)

val rows = newWorksheet.get.rows
assert(rows.head == Map("col_1" -> "f", "col_2" -> "5", "col_3" -> "yy"))
assert(rows.last == Map("col_1" -> "a", "col_2" -> "1", "col_3" -> "x"))
}
}
src/test/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetSuite.scala
@@ -15,7 +15,7 @@ package com.github.potix2.spark.google.spreadsheets

import java.io.File

import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.{SparkWorksheet, SparkSpreadsheetContext}
import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.SparkSpreadsheetContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
@@ -26,6 +26,7 @@ import scala.util.Random
class SpreadsheetSuite extends FlatSpec with BeforeAndAfter {
private val serviceAccountId = "53797494708-ds5v22b6cbpchrv2qih1vg8kru098k9i@developer.gserviceaccount.com"
private val testCredentialPath = "src/test/resources/spark-google-spreadsheets-test-eb7b191d1e1d.p12"
private val TEST_SPREADSHEET_ID = "1H40ZeqXrMRxgHIi3XxmHwsPs2SgVuLUFbtaGcqCAk6c"

private var sqlContext: SQLContext = _
before {
@@ -38,29 +39,23 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter {

private[spreadsheets] def deleteWorksheet(spreadSheetName: String, worksheetName: String)
(implicit spreadSheetContext: SparkSpreadsheetContext): Unit = {
for {
s <- SparkSpreadsheetService.findSpreadsheet(spreadSheetName)
w <- s.findWorksheet(worksheetName)
} yield w.entry.delete()
}

private[spreadsheets] def addWorksheet(spreadSheetName: String, worksheetName: String)
(implicit spreadSheetContext: SparkSpreadsheetContext):Option[SparkWorksheet] = {
for {
s <- SparkSpreadsheetService.findSpreadsheet(spreadSheetName)
w <- Some(s.addWorksheet(worksheetName, 1000, 1000))
} yield w
SparkSpreadsheetService
.findSpreadsheet(spreadSheetName)
.foreach(_.deleteWorksheet(worksheetName))
}

def withNewEmptyWorksheet(testCode:(String) => Any): Unit = {
implicit val spreadSheetContext = SparkSpreadsheetService(serviceAccountId, new File(testCredentialPath))
val workSheetName = Random.alphanumeric.take(16).mkString
val worksheet = addWorksheet("SpreadsheetSuite", workSheetName)
try {
testCode(workSheetName)
}
finally {
worksheet.get.entry.delete()
val spreadsheet = SparkSpreadsheetService.findSpreadsheet(TEST_SPREADSHEET_ID)
spreadsheet.foreach { s =>
val workSheetName = Random.alphanumeric.take(16).mkString
s.addWorksheet(workSheetName, 1000, 1000)
try {
testCode(workSheetName)
}
finally {
s.deleteWorksheet(workSheetName)
}
}
}

@@ -71,24 +66,24 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter {
testCode(workSheetName)
}
finally {
deleteWorksheet("SpreadsheetSuite", workSheetName)
deleteWorksheet(TEST_SPREADSHEET_ID, workSheetName)
}
}

behavior of "A sheet"

it should "behave as a dataFrame" in {
it should "behave as a DataFrame" in {
val results = sqlContext.read
.option("serviceAccountId", serviceAccountId)
.option("credentialPath", testCredentialPath)
.spreadsheet("SpreadsheetSuite/case1")
.spreadsheet(s"$TEST_SPREADSHEET_ID/case1")
.select("col1")
.collect()

assert(results.size === 15)
}

it should "have a value as long" in {
it should "have a `long` value" in {
val schema = StructType(Seq(
StructField("col1", DataTypes.LongType),
StructField("col2", DataTypes.StringType),
@@ -99,7 +94,7 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter {
.option("serviceAccountId", serviceAccountId)
.option("credentialPath", testCredentialPath)
.schema(schema)
.spreadsheet("SpreadsheetSuite/case1")
.spreadsheet(s"$TEST_SPREADSHEET_ID/case1")
.select("col1", "col2", "col3")
.collect()

@@ -118,18 +113,18 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter {
val personsDF = sqlContext.createDataFrame(personsRDD, personsSchema)
}

"A dataFrame" should "save as a sheet" in new PersonDataFrame {
"A DataFrame" should "be saved as a sheet" in new PersonDataFrame {
import com.github.potix2.spark.google.spreadsheets._
withEmptyWorksheet { workSheetName =>
personsDF.write
.option("serviceAccountId", serviceAccountId)
.option("credentialPath", testCredentialPath)
.spreadsheet(s"SpreadsheetSuite/$workSheetName")
.spreadsheet(s"$TEST_SPREADSHEET_ID/$workSheetName")

val result = sqlContext.read
.option("serviceAccountId", serviceAccountId)
.option("credentialPath", testCredentialPath)
.spreadsheet(s"SpreadsheetSuite/$workSheetName")
.spreadsheet(s"$TEST_SPREADSHEET_ID/$workSheetName")
.collect()

assert(result.size == 3)
@@ -146,19 +141,19 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter {
|CREATE TEMPORARY TABLE people
|(id int, firstname string, lastname string)
|USING com.github.potix2.spark.google.spreadsheets
|OPTIONS (path "SpreadsheetSuite/$worksheetName", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath")
|OPTIONS (path "$TEST_SPREADSHEET_ID/$worksheetName", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath")
""".stripMargin.replaceAll("\n", " "))

assert(sqlContext.sql("SELECT * FROM people").collect().size == 0)
}
}

it should "be created from DDL with infered schema" in {
it should "be created from DDL with inferred schema" in {
sqlContext.sql(
s"""
|CREATE TEMPORARY TABLE SpreadsheetSuite
|USING com.github.potix2.spark.google.spreadsheets
|OPTIONS (path "SpreadsheetSuite/case2", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath")
|OPTIONS (path "$TEST_SPREADSHEET_ID/case2", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath")
""".stripMargin.replaceAll("\n", " "))

assert(sqlContext.sql("SELECT id, firstname, lastname FROM SpreadsheetSuite").collect().size == 10)
@@ -171,18 +166,45 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter {
|CREATE TEMPORARY TABLE accesslog
|(id string, firstname string, lastname string, email string, country string, ipaddress string)
|USING com.github.potix2.spark.google.spreadsheets
|OPTIONS (path "SpreadsheetSuite/$worksheetName", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath")
|OPTIONS (path "$TEST_SPREADSHEET_ID/$worksheetName", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath")
""".stripMargin.replaceAll("\n", " "))

sqlContext.sql(
s"""
|CREATE TEMPORARY TABLE SpreadsheetSuite
|USING com.github.potix2.spark.google.spreadsheets
|OPTIONS (path "SpreadsheetSuite/case2", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath")
|OPTIONS (path "$TEST_SPREADSHEET_ID/case2", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath")
""".stripMargin.replaceAll("\n", " "))

sqlContext.sql("INSERT OVERWRITE TABLE accesslog SELECT * FROM SpreadsheetSuite")
assert(sqlContext.sql("SELECT id, firstname, lastname FROM accesslog").collect().size == 10)
}
}

trait UnderscoreDataFrame {
val aSchema = StructType(List(
StructField("foo_bar", IntegerType, true)))
val aRows = Seq(Row(1), Row(2), Row(3))
val aRDD = sqlContext.sparkContext.parallelize(aRows)
val aDF = sqlContext.createDataFrame(aRDD, aSchema)
}

"The underscore" should "be used in a column name" in new UnderscoreDataFrame {
import com.github.potix2.spark.google.spreadsheets._
withEmptyWorksheet { workSheetName =>
aDF.write
.option("serviceAccountId", serviceAccountId)
.option("credentialPath", testCredentialPath)
.spreadsheet(s"$TEST_SPREADSHEET_ID/$workSheetName")

val result = sqlContext.read
.option("serviceAccountId", serviceAccountId)
.option("credentialPath", testCredentialPath)
.spreadsheet(s"$TEST_SPREADSHEET_ID/$workSheetName")
.collect()

assert(result.size == 3)
assert(result(0).getString(0) == "1")
}
}
}
