diff --git a/README.md b/README.md index 4161adb..c467571 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,14 @@ Google Spreadsheets datasource for [SparkSQL and DataFrames](http://spark.apache [![Build Status](https://travis-ci.org/potix2/spark-google-spreadsheets.svg?branch=master)](https://travis-ci.org/potix2/spark-google-spreadsheets) +## Notice + +The latest version (0.4.0) breaks compatibility with previous versions. You must +use a **spreadsheetId** to identify which spreadsheet is to be accessed or altered. +Older versions used a spreadsheet name instead. + +If you don't know the spreadsheetId, please read the [Introduction to the Google Sheets API v4](https://developers.google.com/sheets/guides/concepts). + ## Requirements ## Linking @@ -11,7 +19,7 @@ Google Spreadsheets datasource for [SparkSQL and DataFrames](http://spark.apache Using SBT: ``` -libraryDependenicies += "com.github.potix2" %% "spark-google-spreadsheets" % "0.3.1" +libraryDependencies += "com.github.potix2" %% "spark-google-spreadsheets" % "0.4.0" ``` Using Maven: @@ -20,7 +28,7 @@ Using Maven: <groupId>com.github.potix2</groupId> <artifactId>spark-google-spreadsheets_2.11</artifactId> - <version>0.3.1</version> + <version>0.4.0</version> ``` ``` @@ -30,7 +38,7 @@ Using Maven: CREATE TABLE cars USING com.github.potix2.spark.google.spreadsheets OPTIONS ( - path "YourSpreadsheet/worksheet1", + path "<spreadsheetId>/worksheet1", serviceAccountId "xxxxxx@developer.gserviceaccount.com", credentialPath "/path/to/credentail.p12" ) @@ -48,20 +56,20 @@ val df = sqlContext.read. format("com.github.potix2.spark.google.spreadsheets"). option("serviceAccountId", "xxxxxx@developer.gserviceaccount.com"). option("credentialPath", "/path/to/credentail.p12"). - load("YourSpreadsheet/worksheet1") + load("<spreadsheetId>/worksheet1") // Saves a DataFrame to a new worksheet df.write. format("com.github.potix2.spark.google.spreadsheets"). option("serviceAccountId", "xxxxxx@developer.gserviceaccount.com"). option("credentialPath", "/path/to/credentail.p12"). - save("YourSpreadsheet/newWorksheet") + save("<spreadsheetId>/newWorksheet") ``` ## License -Copyright 2015, Katsunori Kanda +Copyright 2016, Katsunori Kanda Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/build.sbt b/build.sbt index 8b30452..29f5b24 100644 --- a/build.sbt +++ b/build.sbt @@ -2,11 +2,11 @@ name := "spark-google-spreadsheets" organization := "com.github.potix2" -scalaVersion := "2.11.7" +scalaVersion := "2.11.8" -crossScalaVersions := Seq("2.10.6", "2.11.7") +crossScalaVersions := Seq("2.10.6", "2.11.8") -version := "0.3.1-SNAPSHOT" +version := "0.4.0-SNAPSHOT" spName := "potix2/spark-google-spreadsheets" @@ -16,7 +16,7 @@ spIncludeMaven := true spIgnoreProvided := true -sparkVersion := "1.5.0" +sparkVersion := "1.6.2" val testSparkVersion = settingKey[String]("The version of Spark to test against.") @@ -27,9 +27,10 @@ sparkComponents := Seq("sql") libraryDependencies ++= Seq( "org.slf4j" % "slf4j-api" % "1.7.5" % "provided", "org.scalatest" %% "scalatest" % "2.2.1" % "test", - ("com.google.api-client" % "google-api-client" % "1.20.0"). + ("com.google.api-client" % "google-api-client" % "1.22.0").
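Note on the spreadsheetId mentioned in the README notice above: it is the long token in the sheet's URL (https://docs.google.com/spreadsheets/d/<spreadsheetId>/edit#gid=0). A minimal illustrative sketch, not part of this patch, for pulling it out of such a URL; the helper name is made up:

```scala
// Illustration only: extract the spreadsheetId from a typical Google Sheets URL.
object SpreadsheetIdFromUrl {
  private val IdPattern = """/spreadsheets/d/([a-zA-Z0-9_-]+)""".r.unanchored

  def apply(url: String): Option[String] = url match {
    case IdPattern(id) => Some(id)
    case _             => None
  }
}

// SpreadsheetIdFromUrl("https://docs.google.com/spreadsheets/d/1H40ZeqXrMRxgHIi3XxmHwsPs2SgVuLUFbtaGcqCAk6c/edit#gid=0")
// returns Some("1H40ZeqXrMRxgHIi3XxmHwsPs2SgVuLUFbtaGcqCAk6c"), the id used by the test suites below.
```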
exclude("com.google.guava", "guava-jdk5"), - "com.google.gdata" % "core" % "1.47.1" + "com.google.oauth-client" % "google-oauth-client-jetty" % "1.22.0", + "com.google.apis" % "google-api-services-sheets" % "v4-rev18-1.22.0" ) libraryDependencies ++= Seq( diff --git a/src/main/scala/com/github/potix2/spark/google/spreadsheets/DefaultSource.scala b/src/main/scala/com/github/potix2/spark/google/spreadsheets/DefaultSource.scala index 3ff6bf3..8db7374 100644 --- a/src/main/scala/com/github/potix2/spark/google/spreadsheets/DefaultSource.scala +++ b/src/main/scala/com/github/potix2/spark/google/spreadsheets/DefaultSource.scala @@ -15,7 +15,6 @@ package com.github.potix2.spark.google.spreadsheets import java.io.File -import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.{SparkSpreadsheet, SparkWorksheet} import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} @@ -44,23 +43,13 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = { - def createWorksheet(spreadsheet: SparkSpreadsheet, worksheetName: String) - (implicit context:SparkSpreadsheetService.SparkSpreadsheetContext): SparkWorksheet = { - val columns = data.schema.fieldNames - val worksheet = spreadsheet.addWorksheet(worksheetName, columns.length, data.count().toInt) - worksheet.insertHeaderRow(columns) - - worksheet - } - val (spreadsheetName, worksheetName) = pathToSheetNames(parameters) implicit val context = createSpreadsheetContext(parameters) val spreadsheet = SparkSpreadsheetService.findSpreadsheet(spreadsheetName) if(!spreadsheet.isDefined) throw new RuntimeException(s"no such a spreadsheet: $spreadsheetName") - val worksheet = createWorksheet(spreadsheet.get, worksheetName) - data.collect().foreach(row => worksheet.insertRow(Util.convert(data.schema, row))) + spreadsheet.get.addWorksheet(worksheetName, data.schema, data.collect().toList, Util.toRowData) createRelation(sqlContext, context, spreadsheetName, worksheetName, data.schema) } diff --git a/src/main/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetService.scala b/src/main/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetService.scala index 2d03309..54db796 100644 --- a/src/main/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetService.scala +++ b/src/main/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetService.scala @@ -19,28 +19,34 @@ import java.net.URL import com.google.api.client.googleapis.auth.oauth2.GoogleCredential import com.google.api.client.googleapis.auth.oauth2.GoogleCredential.Builder import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport +import com.google.api.client.http.javanet.NetHttpTransport import com.google.api.client.json.jackson2.JacksonFactory -import com.google.gdata.client.Query -import com.google.gdata.client.spreadsheet.{SpreadsheetQuery, SpreadsheetService} -import com.google.gdata.data.spreadsheet._ -import com.google.gdata.data.{IEntry, IFeed, PlainTextConstruct} +import com.google.api.services.sheets.v4.model._ +import com.google.api.services.sheets.v4.{Sheets, SheetsScopes} +import org.apache.spark.sql.types.StructType import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ object 
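The gdata client removed above is replaced by the Sheets v4 and OAuth client libraries. A self-contained sketch of the equivalent wiring plus a simple range read, mirroring SparkSpreadsheetContext; the service account id, P12 path, and spreadsheet id are placeholders:

```scala
import java.io.File

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.sheets.v4.{Sheets, SheetsScopes}

import scala.collection.JavaConverters._

object SheetsV4ReadSketch {
  def main(args: Array[String]): Unit = {
    val transport = GoogleNetHttpTransport.newTrustedTransport()
    val jsonFactory = JacksonFactory.getDefaultInstance

    // Service-account credential from a P12 key, as in SparkSpreadsheetContext.authorize.
    val credential = new GoogleCredential.Builder()
      .setTransport(transport)
      .setJsonFactory(jsonFactory)
      .setServiceAccountId("xxxxxx@developer.gserviceaccount.com")
      .setServiceAccountPrivateKeyFromP12File(new File("/path/to/credential.p12"))
      .setServiceAccountScopes(List(SheetsScopes.SPREADSHEETS).asJava)
      .build()

    val service = new Sheets.Builder(transport, jsonFactory, credential)
      .setApplicationName("spark-google-spreadsheets-1.0.0")
      .build()

    // Read every populated cell of a worksheet; the range is simply the sheet title.
    val values = service.spreadsheets().values()
      .get("<spreadsheetId>", "worksheet1")
      .execute()
      .getValues

    // getValues returns null for an empty sheet, so guard before printing.
    Option(values).foreach(_.asScala.foreach(row => println(row.asScala.mkString("\t"))))
  }
}
```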
SparkSpreadsheetService { private val SPREADSHEET_URL = new URL("https://spreadsheets.google.com/feeds/spreadsheets/private/full") - private val scopes = List("https://spreadsheets.google.com/feeds") + private val scopes = List(SheetsScopes.SPREADSHEETS) private val APP_NAME = "spark-google-spreadsheets-1.0.0" + private val HTTP_TRANSPORT: NetHttpTransport = GoogleNetHttpTransport.newTrustedTransport() + private val JSON_FACTORY: JacksonFactory = JacksonFactory.getDefaultInstance() case class SparkSpreadsheetContext(serviceAccountId: String, p12File: File) { - private val service = new SpreadsheetService(APP_NAME) private val credential = authorize(serviceAccountId, p12File) + lazy val service = + new Sheets.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential) + .setApplicationName(APP_NAME) + .build() private def authorize(serviceAccountId: String, p12File: File): GoogleCredential = { val credential = new Builder() - .setTransport(GoogleNetHttpTransport.newTrustedTransport()) - .setJsonFactory(new JacksonFactory()) + .setTransport(HTTP_TRANSPORT) + .setJsonFactory(JSON_FACTORY) .setServiceAccountId(serviceAccountId) .setServiceAccountPrivateKeyFromP12File(p12File) .setServiceAccountScopes(scopes) @@ -50,75 +56,200 @@ object SparkSpreadsheetService { credential } - def insert[E <: IEntry](feedUrl: URL, entry: E) = service.insert(feedUrl, entry) - def getFeed[F <: IFeed](feedUrl: URL, feedClass: Class[F]) = service.getFeed(feedUrl, feedClass) - def query[F <: IFeed](query: Query, feedClass: Class[F]) = service.query(query, feedClass) + def findSpreadsheet(spreadSheetId: String): SparkSpreadsheet = + SparkSpreadsheet(this, service.spreadsheets().get(spreadSheetId).execute()) - service.setOAuth2Credentials(credential) + def query(spreadsheetId: String, range: String): ValueRange = + service.spreadsheets().values().get(spreadsheetId, range).execute() } - case class SparkSpreadsheet(entry: SpreadsheetEntry) { - def addWorksheet(sheetName: String, colNum: Int, rowNum: Int)(implicit context: SparkSpreadsheetContext): SparkWorksheet = { - val worksheetEntry = new WorksheetEntry() - worksheetEntry.setTitle(new PlainTextConstruct(sheetName)) - worksheetEntry.setColCount(colNum) - worksheetEntry.setRowCount(rowNum) - new SparkWorksheet(context.insert(entry.getWorksheetFeedUrl, worksheetEntry)) + case class SparkSpreadsheet(context:SparkSpreadsheetContext, private var spreadsheet: Spreadsheet) { + def name: String = spreadsheet.getProperties.getTitle + def getWorksheets: Seq[SparkWorksheet] = + spreadsheet.getSheets.map(new SparkWorksheet(context, spreadsheet, _)) + + def nextSheetId: Integer = + getWorksheets.map(_.sheet.getProperties.getSheetId).max + 1 + + def addWorksheet(sheetName: String, colNum: Integer, rowNum: Integer): Unit = { + val addSheetRequest = new AddSheetRequest() + addSheetRequest.setProperties( + new SheetProperties() + .setSheetId(nextSheetId) + .setTitle(sheetName) + .setGridProperties( + new GridProperties() + .setColumnCount(colNum) + .setRowCount(rowNum))) + + val requests = List( + new Request().setAddSheet(addSheetRequest) + ) + + context.service.spreadsheets().batchUpdate(spreadsheet.getSpreadsheetId, + new BatchUpdateSpreadsheetRequest() + .setRequests(requests)).execute() + + spreadsheet = context.service.spreadsheets().get(spreadsheet.getSpreadsheetId).execute() } - def findWorksheet(worksheetName: String)(implicit context: SparkSpreadsheetContext): Option[SparkWorksheet] = - worksheets.find(_.entry.getTitle.getPlainText == worksheetName) + def addWorksheet[T](sheetName: 
String, schema: StructType, data: List[T], extractor: T => RowData): Unit = { + val colNum = schema.fields.size + val rowNum = data.size + 1 + val nextId = nextSheetId + + val addSheetRequest = new AddSheetRequest() + addSheetRequest.setProperties( + new SheetProperties() + .setSheetId(nextId) + .setTitle(sheetName) + .setGridProperties( + new GridProperties() + .setColumnCount(colNum) + .setRowCount(rowNum))) + + val headerValues: List[CellData] = schema.fields.map { field => + new CellData() + .setUserEnteredValue(new ExtendedValue() + .setStringValue(field.name)) + }.toList + + val updateHeaderRequest = new UpdateCellsRequest() + .setStart(new GridCoordinate() + .setSheetId(nextId) + .setRowIndex(0) + .setColumnIndex(0)) + .setRows(List(new RowData().setValues(headerValues))) + .setFields("userEnteredValue") + + val updateRowsRequest = new UpdateCellsRequest() + .setStart(new GridCoordinate() + .setSheetId(nextId) + .setRowIndex(1) + .setColumnIndex(0)) + .setRows(data.map(extractor)) + .setFields("userEnteredValue") + + val requests = List( + new Request().setAddSheet(addSheetRequest), + new Request().setUpdateCells(updateHeaderRequest), + new Request().setUpdateCells(updateRowsRequest) + ) + + context.service.spreadsheets().batchUpdate(spreadsheet.getSpreadsheetId, + new BatchUpdateSpreadsheetRequest() + .setRequests(requests)).execute() + + spreadsheet = context.service.spreadsheets().get(spreadsheet.getSpreadsheetId).execute() + } - private def worksheets(implicit context: SparkSpreadsheetContext): Seq[SparkWorksheet] = - entry.getWorksheets.map(new SparkWorksheet(_)) - } + def findWorksheet(worksheetName: String): Option[SparkWorksheet] = { + val worksheets: Seq[SparkWorksheet] = getWorksheets + worksheets.find(_.sheet.getProperties.getTitle == worksheetName) + } - case class SparkWorksheet(entry: WorksheetEntry) { - def insertHeaderRow(headerColumnNames: Seq[String])(implicit context: SparkSpreadsheetContext) = { - val cellFeed = context.getFeed(entry.getCellFeedUrl, classOf[CellFeed]) - headerColumnNames.zipWithIndex.foreach { case(colName, i) => cellFeed.insert(new CellEntry(1, i + 1, colName)) } + def deleteWorksheet(worksheetName: String): Unit = { + val worksheet: Option[SparkWorksheet] = findWorksheet(worksheetName) + if(worksheet.isDefined) { + val request = new Request() + val sheetId = worksheet.get.sheet.getProperties.getSheetId + request.setDeleteSheet(new DeleteSheetRequest() + .setSheetId(sheetId)) + + context.service.spreadsheets().batchUpdate(spreadsheet.getSpreadsheetId, + new BatchUpdateSpreadsheetRequest().setRequests(List(request).asJava)).execute() + + spreadsheet = context.service.spreadsheets().get(spreadsheet.getSpreadsheetId).execute() + } } + } - def insertRow(values: Map[String, Object])(implicit context: SparkSpreadsheetContext) = { - val dataRow = new ListEntry() - values.foreach { case(title, value) => dataRow.getCustomElements.setValueLocal(title, value.toString) } + case class SparkWorksheet(context: SparkSpreadsheetContext, spreadsheet: Spreadsheet, sheet: Sheet) { + def name: String = sheet.getProperties.getTitle + lazy val values = { + val valueRange = context.query(spreadsheet.getSpreadsheetId, name) + if ( valueRange.getValues != null ) + valueRange.getValues + else + List[java.util.List[Object]]().asJava + } - val listFeedUrl = entry.getListFeedUrl - context.insert(listFeedUrl, dataRow) + lazy val headers = + values.headOption.map { row => row.map(_.toString) }.getOrElse(List()) + + def updateCells[T](schema: StructType, data: List[T], 
extractor: T => RowData): Unit = { + val colNum = schema.fields.size + val rowNum = data.size + 2 + val sheetId = sheet.getProperties.getSheetId + + val updatePropertiesRequest = new UpdateSheetPropertiesRequest() + updatePropertiesRequest.setProperties( + new SheetProperties() + .setSheetId(sheetId) + .setGridProperties( + new GridProperties() + .setColumnCount(colNum) + .setRowCount(rowNum))) + .setFields("gridProperties(rowCount,columnCount)") + + val headerValues: List[CellData] = schema.fields.map { field => + new CellData() + .setUserEnteredValue(new ExtendedValue() + .setStringValue(field.name)) + }.toList + + val updateHeaderRequest = new UpdateCellsRequest() + .setStart(new GridCoordinate() + .setSheetId(sheetId) + .setRowIndex(0) + .setColumnIndex(0)) + .setRows(List(new RowData().setValues(headerValues))) + .setFields("userEnteredValue") + + val updateRowsRequest = new UpdateCellsRequest() + .setStart(new GridCoordinate() + .setSheetId(sheetId) + .setRowIndex(1) + .setColumnIndex(0)) + .setRows(data.map(extractor)) + .setFields("userEnteredValue") + + val requests = List( + new Request().setUpdateSheetProperties(updatePropertiesRequest), + new Request().setUpdateCells(updateHeaderRequest), + new Request().setUpdateCells(updateRowsRequest) + ) + + context.service.spreadsheets().batchUpdate(spreadsheet.getSpreadsheetId, + new BatchUpdateSpreadsheetRequest() + .setRequests(requests)).execute() } - def rows(implicit context: SparkSpreadsheetContext): Seq[Map[String, String]] = - context.getFeed(entry.getListFeedUrl, classOf[ListFeed]).getEntries.toList.map { e => - val elems = e.getCustomElements - elems.getTags.map(tag => (tag, elems.getValue(tag))).toMap + def rows: Seq[Map[String, String]] = + if(values.isEmpty) { + Seq() + } + else { + values.tail.map { row => headers.zip(row.map(_.toString)).toMap } } } /** * create new context of spareadsheets for spark - * @param serviceAccountId + * + * @param serviceAccountId * @param p12File * @return */ def apply(serviceAccountId: String, p12File: File) = SparkSpreadsheetContext(serviceAccountId, p12File) /** - * list of all spreadsheets - * @param context - * @return - */ - def allSheets()(implicit context:SparkSpreadsheetContext): Seq[SparkSpreadsheet] = - context.getFeed(SPREADSHEET_URL, classOf[SpreadsheetFeed]).getEntries.map(new SparkSpreadsheet(_)) - - /** - * find a sparedsheet by name - * @param spreadsheetName + * find a spreadsheet by name + * + * @param spreadsheetName * @param context * @return */ - def findSpreadsheet(spreadsheetName: String)(implicit context: SparkSpreadsheetContext): Option[SparkSpreadsheet] = { - val query = new SpreadsheetQuery(SPREADSHEET_URL) - query.setTitleQuery(spreadsheetName) - context.query(query, classOf[SpreadsheetFeed]).getEntries.map(new SparkSpreadsheet(_)).headOption - } + def findSpreadsheet(spreadsheetName: String)(implicit context: SparkSpreadsheetContext): Option[SparkSpreadsheet] = + Some(context.findSpreadsheet(spreadsheetName)) } diff --git a/src/main/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetRelation.scala b/src/main/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetRelation.scala index ce0c1c0..f4bb0df 100644 --- a/src/main/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetRelation.scala +++ b/src/main/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetRelation.scala @@ -33,7 +33,7 @@ case class SpreadsheetRelation protected[spark] ( private lazy val rows: Seq[Map[String, String]] = findWorksheet(spreadsheetName, 
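The new write path shown above builds AddSheetRequest and UpdateCellsRequest objects and submits them in a single spreadsheets().batchUpdate call. A trimmed sketch of that request flow; the object name, sheet id, and titles are illustrative, and `service` is a Sheets client built as in SparkSpreadsheetContext:

```scala
import com.google.api.services.sheets.v4.Sheets
import com.google.api.services.sheets.v4.model._

import scala.collection.JavaConverters._

object BatchUpdateSketch {
  // Add a new sheet and write a single header row in one batchUpdate round trip.
  def addSheetWithHeader(service: Sheets, spreadsheetId: String, title: String, header: Seq[String]): Unit = {
    val sheetId = 12345 // the patch derives this from nextSheetId

    val addSheet = new AddSheetRequest().setProperties(
      new SheetProperties()
        .setSheetId(sheetId)
        .setTitle(title)
        .setGridProperties(new GridProperties().setColumnCount(header.size).setRowCount(1)))

    val headerRow = new RowData().setValues(
      header.map(h => new CellData().setUserEnteredValue(new ExtendedValue().setStringValue(h))).asJava)

    val writeHeader = new UpdateCellsRequest()
      .setStart(new GridCoordinate().setSheetId(sheetId).setRowIndex(0).setColumnIndex(0))
      .setRows(List(headerRow).asJava)
      .setFields("userEnteredValue")

    val requests = List(
      new Request().setAddSheet(addSheet),
      new Request().setUpdateCells(writeHeader))

    service.spreadsheets()
      .batchUpdate(spreadsheetId, new BatchUpdateSpreadsheetRequest().setRequests(requests.asJava))
      .execute()
  }
}
```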
worksheetName)(context) match { - case Right(aWorksheet) => aWorksheet.rows(context) + case Right(aWorksheet) => aWorksheet.rows case Left(e) => throw e } @@ -64,13 +64,11 @@ case class SpreadsheetRelation protected[spark] ( sys.error("Spreadsheet tables only support INSERT OVERWRITE for now.") } - val columns = data.schema.fieldNames findWorksheet(spreadsheetName, worksheetName)(context) match { - case Right(w) => { - w.insertHeaderRow(columns)(context) - data.collect().foreach(row => w.insertRow(Util.convert(data.schema, row))(context)) - } - case Left(e) => throw e + case Right(w) => + w.updateCells(data.schema, data.collect().toList, Util.toRowData) + case Left(e) => + throw e } } diff --git a/src/main/scala/com/github/potix2/spark/google/spreadsheets/Util.scala b/src/main/scala/com/github/potix2/spark/google/spreadsheets/Util.scala index 02d349c..0f655cc 100644 --- a/src/main/scala/com/github/potix2/spark/google/spreadsheets/Util.scala +++ b/src/main/scala/com/github/potix2/spark/google/spreadsheets/Util.scala @@ -1,9 +1,32 @@ package com.github.potix2.spark.google.spreadsheets +import com.google.api.services.sheets.v4.model.{ExtendedValue, CellData, RowData} import org.apache.spark.sql.Row -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataTypes, StructType} + +import scala.collection.JavaConverters._ object Util { def convert(schema: StructType, row: Row): Map[String, Object] = schema.iterator.zipWithIndex.map { case (f, i) => f.name -> row(i).asInstanceOf[AnyRef]} toMap + + def toRowData(row: Row): RowData = + new RowData().setValues( + row.schema.fields.zipWithIndex.map { case (f, i) => + new CellData() + .setUserEnteredValue( + f.dataType match { + case DataTypes.StringType => new ExtendedValue().setStringValue(row.getString(i)) + case DataTypes.LongType => new ExtendedValue().setNumberValue(row.getLong(i).toDouble) + case DataTypes.IntegerType => new ExtendedValue().setNumberValue(row.getInt(i).toDouble) + case DataTypes.FloatType => new ExtendedValue().setNumberValue(row.getFloat(i).toDouble) + case DataTypes.BooleanType => new ExtendedValue().setBoolValue(row.getBoolean(i)) + case DataTypes.DateType => new ExtendedValue().setStringValue(row.getDate(i).toString) + case DataTypes.ShortType => new ExtendedValue().setNumberValue(row.getShort(i).toDouble) + case DataTypes.TimestampType => new ExtendedValue().setStringValue(row.getTimestamp(i).toString) + } + ) + }.toList.asJava + ) + } diff --git a/src/test/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetServiceReadSuite.scala b/src/test/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetServiceReadSuite.scala new file mode 100644 index 0000000..ac13f44 --- /dev/null +++ b/src/test/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetServiceReadSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
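Util.toRowData above converts each Spark Row into a Sheets RowData by matching on the column DataType; only the listed types are handled (a DoubleType column, for example, would not match). A small usage sketch with made-up data, assuming a local Spark 1.6 context:

```scala
import com.github.potix2.spark.google.spreadsheets.Util
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}

object ToRowDataSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "ToRowDataSketch")
    val sqlContext = new SQLContext(sc)

    val schema = StructType(Seq(
      StructField("id", DataTypes.LongType),
      StructField("name", DataTypes.StringType)))

    val df = sqlContext.createDataFrame(
      sc.parallelize(Seq(Row(1L, "Annie"), Row(2L, "Bob"))), schema)

    // One RowData per collected Row; each cell is typed by the DataType match in Util.toRowData.
    val rowData = df.collect().map(Util.toRowData)
    println(rowData.length) // 2
  }
}
```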
+ */ +package com.github.potix2.spark.google.spreadsheets + +import java.io.File + +import org.scalatest.{BeforeAndAfter, FlatSpec} + +class SparkSpreadsheetServiceReadSuite extends FlatSpec with BeforeAndAfter { + private val serviceAccountId = "53797494708-ds5v22b6cbpchrv2qih1vg8kru098k9i@developer.gserviceaccount.com" + private val testCredentialPath = "src/test/resources/spark-google-spreadsheets-test-eb7b191d1e1d.p12" + private val TEST_SPREADSHEET_NAME = "SpreadsheetSuite" + private val TEST_SPREADSHEET_ID = "1H40ZeqXrMRxgHIi3XxmHwsPs2SgVuLUFbtaGcqCAk6c" + + private val context: SparkSpreadsheetService.SparkSpreadsheetContext = + SparkSpreadsheetService.SparkSpreadsheetContext(serviceAccountId, new File(testCredentialPath)) + private val spreadsheet: SparkSpreadsheetService.SparkSpreadsheet = + context.findSpreadsheet(TEST_SPREADSHEET_ID) + + behavior of "A Spreadsheet" + + + it should "have a name" in { + assert(spreadsheet.name == TEST_SPREADSHEET_NAME) + } + + behavior of "A worksheet" + it should "be None when a worksheet is missing" in { + assert(spreadsheet.findWorksheet("foo").isEmpty) + } + + it should "be retrieved when the worksheet exists" in { + val worksheet = spreadsheet.findWorksheet("case2") + assert(worksheet.isDefined) + assert(worksheet.get.name == "case2") + assert(worksheet.get.headers == List("id", "firstname", "lastname", "email", "country", "ipaddress")) + + val firstRow = worksheet.get.rows(0) + assert(firstRow == Map( + "id" -> "1", + "firstname" -> "Annie", + "lastname" -> "Willis", + "email" -> "awillis0@princeton.edu", + "country" -> "Burundi", + "ipaddress" -> "241.162.49.104")) + } +} diff --git a/src/test/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetServiceWriteSuite.scala b/src/test/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetServiceWriteSuite.scala new file mode 100644 index 0000000..9fab2a1 --- /dev/null +++ b/src/test/scala/com/github/potix2/spark/google/spreadsheets/SparkSpreadsheetServiceWriteSuite.scala @@ -0,0 +1,105 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.potix2.spark.google.spreadsheets + +import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.SparkSpreadsheet +import com.google.api.services.sheets.v4.model.{ExtendedValue, CellData, RowData} +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.scalatest.{BeforeAndAfter, FlatSpec} + +import scala.collection.JavaConverters._ + +class SparkSpreadsheetServiceWriteSuite extends FlatSpec with BeforeAndAfter { + private val serviceAccountId = "53797494708-ds5v22b6cbpchrv2qih1vg8kru098k9i@developer.gserviceaccount.com" + private val testCredentialPath = "src/test/resources/spark-google-spreadsheets-test-eb7b191d1e1d.p12" + private val TEST_SPREADSHEET_NAME = "WriteSuite" + private val TEST_SPREADSHEET_ID = "163Ja2OWUephWjIa-jpwTlvGcg8EJwCFCfxrF7aI117s" + + private val context: SparkSpreadsheetService.SparkSpreadsheetContext = + SparkSpreadsheetService.SparkSpreadsheetContext(serviceAccountId, new java.io.File(testCredentialPath)) + + var spreadsheet: SparkSpreadsheet = null + var worksheetName: String = "" + + def definedSchema: StructType = { + new StructType() + .add(new StructField("col_1", DataTypes.StringType)) + .add(new StructField("col_2", DataTypes.LongType)) + .add(new StructField("col_3", DataTypes.StringType)) + } + + case class Elem(col_1: String, col_2: Long, col_3: String) + + def extractor(e: Elem): RowData = + new RowData().setValues( + List( + new CellData().setUserEnteredValue( + new ExtendedValue().setStringValue(e.col_1) + ), + new CellData().setUserEnteredValue( + new ExtendedValue().setNumberValue(e.col_2.toDouble) + ), + new CellData().setUserEnteredValue( + new ExtendedValue().setStringValue(e.col_3) + ) + ).asJava + ) + + before { + spreadsheet = context.findSpreadsheet(TEST_SPREADSHEET_ID) + worksheetName = scala.util.Random.alphanumeric.take(16).mkString + val data = List( + Elem("a", 1L, "x"), + Elem("b", 2L, "y"), + Elem("c", 3L, "z") + ) + + spreadsheet.addWorksheet(worksheetName, definedSchema, data, extractor) + } + + after { + spreadsheet.deleteWorksheet(worksheetName) + } + + behavior of "A Spreadsheet" + it should "find the new worksheet" in { + val newWorksheet = spreadsheet.findWorksheet(worksheetName) + assert(newWorksheet.isDefined) + assert(newWorksheet.get.name == worksheetName) + assert(newWorksheet.get.headers == Seq("col_1", "col_2", "col_3")) + + val rows = newWorksheet.get.rows + assert(rows.head == Map("col_1" -> "a", "col_2" -> "1", "col_3" -> "x")) + } + + behavior of "SparkWorksheet#updateCells" + it should "update values in a worksheet" in { + val newWorksheet = spreadsheet.findWorksheet(worksheetName) + assert(newWorksheet.isDefined) + + val newData = List( + Elem("f", 5L, "yy"), + Elem("e", 4L, "xx"), + Elem("c", 3L, "z"), + Elem("b", 2L, "y"), + Elem("a", 1L, "x") + ) + + newWorksheet.get.updateCells(definedSchema, newData, extractor) + + val rows = newWorksheet.get.rows + assert(rows.head == Map("col_1" -> "f", "col_2" -> "5", "col_3" -> "yy")) + assert(rows.last == Map("col_1" -> "a", "col_2" -> "1", "col_3" -> "x")) + } +} diff --git a/src/test/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetSuite.scala b/src/test/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetSuite.scala index e1d30a5..6f11db7 100644 --- a/src/test/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetSuite.scala +++ b/src/test/scala/com/github/potix2/spark/google/spreadsheets/SpreadsheetSuite.scala @@ -15,7 +15,7 @@ package 
com.github.potix2.spark.google.spreadsheets import java.io.File -import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.{SparkWorksheet, SparkSpreadsheetContext} +import com.github.potix2.spark.google.spreadsheets.SparkSpreadsheetService.SparkSpreadsheetContext import org.apache.spark.SparkContext import org.apache.spark.sql.types._ import org.apache.spark.sql.{Row, SQLContext} @@ -26,6 +26,7 @@ import scala.util.Random class SpreadsheetSuite extends FlatSpec with BeforeAndAfter { private val serviceAccountId = "53797494708-ds5v22b6cbpchrv2qih1vg8kru098k9i@developer.gserviceaccount.com" private val testCredentialPath = "src/test/resources/spark-google-spreadsheets-test-eb7b191d1e1d.p12" + private val TEST_SPREADSHEET_ID = "1H40ZeqXrMRxgHIi3XxmHwsPs2SgVuLUFbtaGcqCAk6c" private var sqlContext: SQLContext = _ before { @@ -38,29 +39,23 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter { private[spreadsheets] def deleteWorksheet(spreadSheetName: String, worksheetName: String) (implicit spreadSheetContext: SparkSpreadsheetContext): Unit = { - for { - s <- SparkSpreadsheetService.findSpreadsheet(spreadSheetName) - w <- s.findWorksheet(worksheetName) - } yield w.entry.delete() - } - - private[spreadsheets] def addWorksheet(spreadSheetName: String, worksheetName: String) - (implicit spreadSheetContext: SparkSpreadsheetContext):Option[SparkWorksheet] = { - for { - s <- SparkSpreadsheetService.findSpreadsheet(spreadSheetName) - w <- Some(s.addWorksheet(worksheetName, 1000, 1000)) - } yield w + SparkSpreadsheetService + .findSpreadsheet(spreadSheetName) + .foreach(_.deleteWorksheet(worksheetName)) } def withNewEmptyWorksheet(testCode:(String) => Any): Unit = { implicit val spreadSheetContext = SparkSpreadsheetService(serviceAccountId, new File(testCredentialPath)) - val workSheetName = Random.alphanumeric.take(16).mkString - val worksheet = addWorksheet("SpreadsheetSuite", workSheetName) - try { - testCode(workSheetName) - } - finally { - worksheet.get.entry.delete() + val spreadsheet = SparkSpreadsheetService.findSpreadsheet(TEST_SPREADSHEET_ID) + spreadsheet.foreach { s => + val workSheetName = Random.alphanumeric.take(16).mkString + s.addWorksheet(workSheetName, 1000, 1000) + try { + testCode(workSheetName) + } + finally { + s.deleteWorksheet(workSheetName) + } } } @@ -71,24 +66,24 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter { testCode(workSheetName) } finally { - deleteWorksheet("SpreadsheetSuite", workSheetName) + deleteWorksheet(TEST_SPREADSHEET_ID, workSheetName) } } behavior of "A sheet" - it should "behave as a dataFrame" in { + it should "behave as a DataFrame" in { val results = sqlContext.read .option("serviceAccountId", serviceAccountId) .option("credentialPath", testCredentialPath) - .spreadsheet("SpreadsheetSuite/case1") + .spreadsheet(s"$TEST_SPREADSHEET_ID/case1") .select("col1") .collect() assert(results.size === 15) } - it should "have a value as long" in { + it should "have a `long` value" in { val schema = StructType(Seq( StructField("col1", DataTypes.LongType), StructField("col2", DataTypes.StringType), @@ -99,7 +94,7 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter { .option("serviceAccountId", serviceAccountId) .option("credentialPath", testCredentialPath) .schema(schema) - .spreadsheet("SpreadsheetSuite/case1") + .spreadsheet(s"$TEST_SPREADSHEET_ID/case1") .select("col1", "col2", "col3") .collect() @@ -118,18 +113,18 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter { val personsDF = 
sqlContext.createDataFrame(personsRDD, personsSchema) } - "A dataFrame" should "save as a sheet" in new PersonDataFrame { + "A DataFrame" should "be saved as a sheet" in new PersonDataFrame { import com.github.potix2.spark.google.spreadsheets._ withEmptyWorksheet { workSheetName => personsDF.write .option("serviceAccountId", serviceAccountId) .option("credentialPath", testCredentialPath) - .spreadsheet(s"SpreadsheetSuite/$workSheetName") + .spreadsheet(s"$TEST_SPREADSHEET_ID/$workSheetName") val result = sqlContext.read .option("serviceAccountId", serviceAccountId) .option("credentialPath", testCredentialPath) - .spreadsheet(s"SpreadsheetSuite/$workSheetName") + .spreadsheet(s"$TEST_SPREADSHEET_ID/$workSheetName") .collect() assert(result.size == 3) @@ -146,19 +141,19 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter { |CREATE TEMPORARY TABLE people |(id int, firstname string, lastname string) |USING com.github.potix2.spark.google.spreadsheets - |OPTIONS (path "SpreadsheetSuite/$worksheetName", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath") + |OPTIONS (path "$TEST_SPREADSHEET_ID/$worksheetName", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath") """.stripMargin.replaceAll("\n", " ")) assert(sqlContext.sql("SELECT * FROM people").collect().size == 0) } } - it should "be created from DDL with infered schema" in { + it should "be created from DDL with inferred schema" in { sqlContext.sql( s""" |CREATE TEMPORARY TABLE SpreadsheetSuite |USING com.github.potix2.spark.google.spreadsheets - |OPTIONS (path "SpreadsheetSuite/case2", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath") + |OPTIONS (path "$TEST_SPREADSHEET_ID/case2", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath") """.stripMargin.replaceAll("\n", " ")) assert(sqlContext.sql("SELECT id, firstname, lastname FROM SpreadsheetSuite").collect().size == 10) @@ -171,18 +166,45 @@ class SpreadsheetSuite extends FlatSpec with BeforeAndAfter { |CREATE TEMPORARY TABLE accesslog |(id string, firstname string, lastname string, email string, country string, ipaddress string) |USING com.github.potix2.spark.google.spreadsheets - |OPTIONS (path "SpreadsheetSuite/$worksheetName", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath") + |OPTIONS (path "$TEST_SPREADSHEET_ID/$worksheetName", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath") """.stripMargin.replaceAll("\n", " ")) sqlContext.sql( s""" |CREATE TEMPORARY TABLE SpreadsheetSuite |USING com.github.potix2.spark.google.spreadsheets - |OPTIONS (path "SpreadsheetSuite/case2", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath") + |OPTIONS (path "$TEST_SPREADSHEET_ID/case2", serviceAccountId "$serviceAccountId", credentialPath "$testCredentialPath") """.stripMargin.replaceAll("\n", " ")) sqlContext.sql("INSERT OVERWRITE TABLE accesslog SELECT * FROM SpreadsheetSuite") assert(sqlContext.sql("SELECT id, firstname, lastname FROM accesslog").collect().size == 10) } } + + trait UnderscoreDataFrame { + val aSchema = StructType(List( + StructField("foo_bar", IntegerType, true))) + val aRows = Seq(Row(1), Row(2), Row(3)) + val aRDD = sqlContext.sparkContext.parallelize(aRows) + val aDF = sqlContext.createDataFrame(aRDD, aSchema) + } + + "The underscore" should "be used in a column name" in new UnderscoreDataFrame { + import com.github.potix2.spark.google.spreadsheets._ + withEmptyWorksheet { workSheetName => + aDF.write + 
.option("serviceAccountId", serviceAccountId) + .option("credentialPath", testCredentialPath) + .spreadsheet(s"$TEST_SPREADSHEET_ID/$workSheetName") + + val result = sqlContext.read + .option("serviceAccountId", serviceAccountId) + .option("credentialPath", testCredentialPath) + .spreadsheet(s"$TEST_SPREADSHEET_ID/$workSheetName") + .collect() + + assert(result.size == 3) + assert(result(0).getString(0) == "1") + } + } }
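For completeness, an illustrative end-to-end sketch of the 0.4.0 usage these suites exercise, reading and writing through the data source with the new spreadsheetId-based path; the id, service account id, and credential path are placeholders:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object EndToEndSketch {
  def main(args: Array[String]): Unit = {
    val sqlContext = new SQLContext(new SparkContext("local[2]", "EndToEndSketch"))

    // Read worksheet1 of the spreadsheet identified by <spreadsheetId>.
    val df = sqlContext.read
      .format("com.github.potix2.spark.google.spreadsheets")
      .option("serviceAccountId", "xxxxxx@developer.gserviceaccount.com")
      .option("credentialPath", "/path/to/credential.p12")
      .load("<spreadsheetId>/worksheet1")

    // Write the same rows to a new worksheet in the same spreadsheet.
    df.write
      .format("com.github.potix2.spark.google.spreadsheets")
      .option("serviceAccountId", "xxxxxx@developer.gserviceaccount.com")
      .option("credentialPath", "/path/to/credential.p12")
      .save("<spreadsheetId>/newWorksheet")
  }
}
```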