Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Spark 3.2 #35

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ name := "spark-google-spreadsheets"

organization := "com.github.potix2"

scalaVersion := "2.11.12"
scalaVersion := "2.12.13"

crossScalaVersions := Seq("2.11.12")
crossScalaVersions := Seq("2.12.13")

version := "0.6.4-SNAPSHOT"
version := "0.7.1-SNAPSHOT"

spName := "potix2/spark-google-spreadsheets"

Expand All @@ -16,7 +16,7 @@ spIncludeMaven := true

spIgnoreProvided := true

sparkVersion := "2.3.3"
sparkVersion := "3.2.0"

val testSparkVersion = settingKey[String]("The version of Spark to test against.")

Expand All @@ -26,7 +26,7 @@ sparkComponents := Seq("sql")

libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % "1.7.5" % "provided",
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
"org.scalatest" %% "scalatest" % "3.0.4" % "test",
("com.google.api-client" % "google-api-client" % "1.22.0").
exclude("com.google.guava", "guava-jdk5"),
"com.google.oauth-client" % "google-oauth-client-jetty" % "1.22.0",
Expand Down
6 changes: 3 additions & 3 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
resolvers += Resolver.url("artifactory", url("https://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
resolvers += "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases/"

resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"

resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"
resolvers += "Spark Package Main Repo" at "https://repos.spark-packages.org/"

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
val (spreadsheetName, worksheetName) = pathToSheetNames(parameters)
implicit val context = createSpreadsheetContext(parameters)
val spreadsheet = SparkSpreadsheetService.findSpreadsheet(spreadsheetName)
if(!spreadsheet.isDefined)
if (!spreadsheet.isDefined)
throw new RuntimeException(s"no such a spreadsheet: $spreadsheetName")

spreadsheet.get.addWorksheet(worksheetName, data.schema, data.collect().toList, Util.toRowData)
Expand All @@ -56,7 +56,12 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
/**
 * Builds a spreadsheet context from the data source options.
 *
 * Credential resolution order: an inline "credentialData" JSON string, if
 * present and non-blank, wins; otherwise the "credentialPath" key file
 * (falling back to DEFAULT_CREDENTIAL_PATH) is used.
 *
 * @param parameters the options passed to the data source
 */
private[spreadsheets] def createSpreadsheetContext(parameters: Map[String, String]) = {
  val serviceAccountIdOption = parameters.get("serviceAccountId")
  // Map.get returns an Option and is never null — the original `!= null`
  // check was dead. Also drop blank strings so an empty "credentialData"
  // option falls back to the key-file path instead of failing to parse.
  val credentialData = parameters.get("credentialData").filter(_.nonEmpty)
  credentialData match {
    case Some(_) =>
      SparkSpreadsheetService(serviceAccountIdOption, credentialData)
    case None =>
      val credentialPath = parameters.getOrElse("credentialPath", DEFAULT_CREDENTIAL_PATH)
      SparkSpreadsheetService(serviceAccountIdOption, new File(credentialPath))
  }
}

private[spreadsheets] def createRelation(sqlContext: SQLContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ import org.apache.spark.sql.types.StructType

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import com.google.api.services.sheets.v4.SheetsScopes
import java.io.FileInputStream
import java.io.IOException
import java.io.InputStream
import java.util

import org.apache.commons.io.IOUtils

object SparkSpreadsheetService {
private val SPREADSHEET_URL = new URL("https://spreadsheets.google.com/feeds/spreadsheets/private/full")
Expand All @@ -36,15 +43,21 @@ object SparkSpreadsheetService {
private val HTTP_TRANSPORT: NetHttpTransport = GoogleNetHttpTransport.newTrustedTransport()
private val JSON_FACTORY: JacksonFactory = JacksonFactory.getDefaultInstance()

case class SparkSpreadsheetContext(serviceAccountIdOption: Option[String], p12File: File) {
private val credential = authorize(serviceAccountIdOption, p12File)
case class SparkSpreadsheetContext(serviceAccountIdOption: Option[String], credentialData: Option[String], p12File: File) {
var credential: GoogleCredential = _;
if (p12File != null) {
credential = authorize(serviceAccountIdOption, p12File)
} else {
credential = getCredentialFromStream(IOUtils.toInputStream(credentialData.get, "UTF-8"))
}
lazy val service =
new Sheets.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential)
.setApplicationName(APP_NAME)
.build()

private def authorize(serviceAccountIdOption: Option[String], p12File: File): GoogleCredential = {
val credential = serviceAccountIdOption
if (p12File.getName.endsWith("p12")) {
val credential = serviceAccountIdOption
.map {
new Builder()
.setTransport(HTTP_TRANSPORT)
Expand All @@ -55,19 +68,95 @@ object SparkSpreadsheetService {
.build()
}.getOrElse(GoogleCredential.getApplicationDefault.createScoped(scopes))

credential.refreshToken()
credential.refreshToken()
credential
} else {
getCredentials(p12File)
}
}


/**
 * Loads a Google credential from a JSON key file, wrapping IO failures
 * in a RuntimeException and always closing the stream.
 */
private def getCredentials(jsonFile: File) = {
  val stream = new FileInputStream(jsonFile)
  try {
    getCredentialFromStream(stream)
  } catch {
    case e: IOException => throw new RuntimeException(e)
  } finally {
    // FileInputStream's constructor either throws or yields a non-null
    // stream, so no null guard is needed here.
    stream.close()
  }
}

/**
 * Parses a Google credential from an input stream, applies the service's
 * scopes, and eagerly refreshes its access token.
 */
private def getCredentialFromStream(in: InputStream) = {
  val scopedCredential = GoogleCredential.fromStream(in).createScoped(scopes)
  scopedCredential.refreshToken()
  scopedCredential
}

/**
 * Fetches the spreadsheet with the given document id from the Sheets API
 * and wraps it in a SparkSpreadsheet bound to this context.
 *
 * The explicit return type is kept: this is a public API method, and the
 * diff had dropped the annotation.
 */
def findSpreadsheet(spreadSheetId: String): SparkSpreadsheet =
  SparkSpreadsheet(this, service.spreadsheets().get(spreadSheetId).execute())

def query(spreadsheetId: String, range: String): ValueRange =
service.spreadsheets().values().get(spreadsheetId, range).execute()
/**
 * Reads a value range for the given range string and normalizes its header
 * row in place: blank header cells are renamed "_col1", "_col2", ... and
 * the header is padded out to the widest data row so every column is named.
 *
 * @param spreadsheetId the spreadsheet document id
 * @param range a sheet name or A1 range; names ending in a digit are quoted
 *              so the API treats them as sheet names
 * @return the ValueRange from the API, with its header row possibly mutated
 */
def query(spreadsheetId: String, range: String): ValueRange = {
  val quotedRange = addSingleQuotesIfEndsWithNumber(range)
  val result = service.spreadsheets().values().get(spreadsheetId, quotedRange).execute()
  val rows = result.getValues
  // Guard on getValues itself: the original read rows.get(0) before its
  // null check, which could NPE on a response with no values.
  if (rows != null && !rows.isEmpty) {
    val header = rows.get(0)
    // Widest row across the whole sheet. The original overwrote `max` on
    // every iteration, so it effectively used the LAST row's width and a
    // short trailing row could truncate the header padding.
    var maxWidth = 0
    for (i <- 0 until rows.size()) {
      val row = rows.get(i)
      if (row != null && row.size() > maxWidth) maxWidth = row.size()
    }
    // Rename blank header cells, keeping one running counter so the padded
    // columns added below continue the same "_colN" numbering.
    var colCounter = 0
    for (i <- 0 until header.size()) {
      val cell = header.get(i)
      if (cell == null || cell.toString.isEmpty) {
        colCounter += 1
        header.set(i, "_col" + colCounter)
      }
    }
    // Pad the header out to the widest row (range evaluated once, as before).
    for (_ <- header.size() until maxWidth) {
      colCounter += 1
      header.add("_col" + colCounter)
    }
  }
  result
}
}

/** True when the string ends with a decimal digit (e.g. a sheet named "Sheet1"). */
def endsWithNumber(s: String): Boolean =
  // The regex test is already a Boolean; the original wrapped it in a
  // pointless pattern match.
  s.matches(".*\\d$")

case class SparkSpreadsheet(context:SparkSpreadsheetContext, private var spreadsheet: Spreadsheet) {
/**
 * Wraps a range string ending in a digit in single quotes so the Sheets
 * API reads it as a sheet name rather than A1 notation; other strings are
 * returned unchanged.
 */
def addSingleQuotesIfEndsWithNumber(s: String): String =
  if (s.matches(".*\\d$")) "'" + s + "'" else s

case class SparkSpreadsheet(context: SparkSpreadsheetContext, private var spreadsheet: Spreadsheet) {
def name: String = spreadsheet.getProperties.getTitle

def getWorksheets: Seq[SparkWorksheet] =
spreadsheet.getSheets.map(new SparkWorksheet(context, spreadsheet, _))

Expand Down Expand Up @@ -153,7 +242,7 @@ object SparkSpreadsheetService {

def deleteWorksheet(worksheetName: String): Unit = {
val worksheet: Option[SparkWorksheet] = findWorksheet(worksheetName)
if(worksheet.isDefined) {
if (worksheet.isDefined) {
val request = new Request()
val sheetId = worksheet.get.sheet.getProperties.getSheetId
request.setDeleteSheet(new DeleteSheetRequest()
Expand All @@ -169,9 +258,10 @@ object SparkSpreadsheetService {

case class SparkWorksheet(context: SparkSpreadsheetContext, spreadsheet: Spreadsheet, sheet: Sheet) {
def name: String = sheet.getProperties.getTitle

lazy val values = {
val valueRange = context.query(spreadsheet.getSpreadsheetId, name)
if ( valueRange.getValues != null )
if (valueRange.getValues != null)
valueRange.getValues
else
List[java.util.List[Object]]().asJava
Expand Down Expand Up @@ -229,7 +319,7 @@ object SparkSpreadsheetService {
}

def rows: Seq[Map[String, String]] =
if(values.isEmpty) {
if (values.isEmpty) {
Seq()
}
else {
Expand All @@ -239,17 +329,20 @@ object SparkSpreadsheetService {

/**
 * Creates a new spreadsheets context for Spark backed by a key file.
 *
 * @param serviceAccountIdOption the Google service account id, if any
 * @param p12File the p12 (or JSON) key file used to authorize
 * @return a SparkSpreadsheetContext
 */
def apply(serviceAccountIdOption: Option[String], p12File: File) =
  // Pass None, not null: credentialData is an Option[String], and a null
  // Option invites NPEs in any later combinator call on it.
  SparkSpreadsheetContext(serviceAccountIdOption, None, p12File)

// Creates a context from in-memory credential JSON. p12File is deliberately
// null here: SparkSpreadsheetContext checks `p12File != null` to decide
// between the key-file and credentialData authorization paths.
def apply(serviceAccountIdOption: Option[String], credentialData: Option[String]) = SparkSpreadsheetContext(serviceAccountIdOption, credentialData, null)


/**
* find a spreadsheet by name
*
* @param spreadsheetName
* @param context
* @return
*/
Expand Down