From cd33a2224f76030bd05b105c7ec990dd67a01c1b Mon Sep 17 00:00:00 2001
From: sbarnoud060710
Date: Wed, 12 Dec 2018 20:10:55 +0100
Subject: [PATCH 1/2] added HBase timestamp support

https://github.com/hortonworks-spark/shc/issues/210
---
 .../datasources/hbase/HBaseRelation.scala     |   4 +-
 .../datasources/hbase/HBaseTableCatalog.scala |  70 +++++++-
 .../datasources/hbase/HBaseTableScan.scala    |  35 +++-
 .../apache/spark/sql/AvroSourceKeySuite.scala |   4 +-
 .../apache/spark/sql/AvroSourceSuite.scala    |   4 +-
 .../spark/sql/CatalogWithTimestampSuite.scala |  58 ++++++
 .../apache/spark/sql/CompositeKeySuite.scala  |   4 +-
 .../org/apache/spark/sql/DataTypeSuite.scala  |   6 +-
 .../sql/DataTypeWithTimestampSuite.scala      | 169 ++++++++++++++++++
 .../apache/spark/sql/DefaultSourceSuite.scala |   8 +-
 .../spark/sql/PhoenixCompositeKeySuite.scala  |   4 +-
 .../org/apache/spark/sql/PhoenixSuite.scala   |   4 +-
 .../test/scala/org/apache/spark/sql/SHC.scala |  33 ++--
 13 files changed, 369 insertions(+), 34 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/sql/CatalogWithTimestampSuite.scala
 create mode 100644 core/src/test/scala/org/apache/spark/sql/DataTypeWithTimestampSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseRelation.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseRelation.scala
index bec41885..4bf2895d 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseRelation.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseRelation.scala
@@ -305,8 +305,8 @@ case class HBaseRelation(
     requiredColumns.map(catalog.sMap.getField(_)).zipWithIndex
   }

   // Retrieve all columns we will return in the scanner
-  def splitRowKeyColumns(requiredColumns: Array[String]): (Seq[Field], Seq[Field]) = {
-    val (l, r) = requiredColumns.map(catalog.sMap.getField(_)).partition(_.cf == HBaseTableCatalog.rowKey)
+  def splitReservedColumns(requiredColumns: Array[String]): (Seq[Field], Seq[Field]) = {
+    val (l, r) = requiredColumns.map(catalog.sMap.getField(_)).partition(c => HBaseTableCatalog.isReserved(c.cf))
     (l, r)
   }

diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog.scala
index 2c7403a6..63dea51b 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableCatalog.scala
@@ -148,6 +148,26 @@ case class RowKey(k: String) {
   }
 }

+// The hbase timestamp definition
+// ts
+case class Timestamp(k: String) {
+  val key: String = k
+  var fields: Seq[Field] = _
+  var varLength = false
+  def isPresent: Boolean = key != null
+  def length: Int = {
+    val tmp = fields.foldLeft(0) { case (x, y) =>
+      val yLen = if (y.length == -1) {
+        MaxLength
+      } else {
+        y.length
+      }
+      x + yLen
+    }
+    tmp
+  }
+}
+
 // The map between the column presented to Spark and the HBase field
 case class SchemaMap(map: mutable.LinkedHashMap[String, Field]) {
   def toFields = map.map { case (name, field) =>
@@ -164,6 +184,7 @@ case class HBaseTableCatalog(
     val namespace: String,
     val name: String,
     row: RowKey,
+    ts: Timestamp,
     sMap: SchemaMap,
     tCoder: String,
     coderSet: Set[String],
   def toDataType = StructType(sMap.toFields)
   def getField(name: String) = sMap.getField(name)
   def getRowKey: Seq[Field] = row.fields
+  def getTimestamp: Seq[Field] = ts.fields
   def getPrimaryKey= row.keys(0)
   def getColumnFamilies = {
-    sMap.fields.map(_.cf).filter(_ != HBaseTableCatalog.rowKey).toSeq.distinct
+    sMap.fields.map(_.cf).filter(HBaseTableCatalog.isNotReserved).toSeq.distinct
   }

   //this is required to read fromBytes column families and qualifiers
@@ -203,6 +225,32 @@ case class HBaseTableCatalog(
   }
   initRowKey()

+  def initTimestamp(): Unit = {
+    if (!ts.isPresent) {
+      return
+    }
+    val fields = sMap.fields.filter(_.cf == HBaseTableCatalog.timestamp)
+    ts.fields = fields.find(_.col == ts.key).toSeq
+
+    // If the tCoder is PrimitiveType, only the last field is allowed to have a
+    // variable length that is determined at runtime.
+    if (tCoder == SparkHBaseConf.PrimitiveType) {
+      if (!ts.fields.reverse.tail.exists(_.length == -1)) {
+        var start = 0
+        ts.fields.foreach { f =>
+          f.start = start
+          start += f.length
+        }
+      } else {
+        throw new Exception("PrimitiveType: only the last dimension of Timestamp is allowed to have " +
+          "varied length. You may want to add 'length' to the dimensions which have " +
+          "varied length or use dimensions which are scala/java primitive data " +
+          "types of fixed length.")
+      }
+    }
+  }
+  initTimestamp()
+
   def validateCatalogDef() = {
     if (!shcTableCoder.isRowKeySupported()) {
       throw new UnsupportedOperationException(s"$tCoder does not support row key, and can not be " +
@@ -219,6 +267,12 @@ case class HBaseTableCatalog(
     // If the row key of the table is composite, check if the coder supports composite key
     if (row.fields.size > 1 && !shcTableCoder.isCompositeKeySupported)
       throw new UnsupportedOperationException(s"$tCoder: Composite key is not supported")
+
+    if (ts.isPresent) {
+      // Check that the timestamp field is of type long
+      if (!ts.fields.exists(field => field.sType.get.equals("long")))
+        throw new UnsupportedOperationException(s"$tCoder: Only a timestamp of type long is supported")
+    }
   }
   validateCatalogDef()
 }
@@ -231,6 +285,8 @@ object HBaseTableCatalog {
   val tableCatalog = "catalog"
   // The row key with format key1:key2 specifying table row key
   val rowKey = "rowkey"
+  // The hbase timestamp field as long
+  val timestamp = "timestamp"
   // The key for hbase table whose value specify namespace and table name
   val table = "table"
   // The namespace of hbase table
@@ -250,6 +306,9 @@ object HBaseTableCatalog {
   val tableCoder = "tableCoder"
   // The version number of catalog
   val cVersion = "version"
+  val reserved = Array(rowKey, timestamp)
+  def isReserved(columnName: String): Boolean = reserved.contains(columnName)
+  def isNotReserved(columnName: String): Boolean = !isReserved(columnName)
   /**
    * User provide table schema definition
    * {"tablename":"name", "rowkey":"key1:key2",
@@ -294,8 +353,9 @@ object HBaseTableCatalog {
     }
     val numReg = parameters.get(newTable).map(x => x.toInt).getOrElse(0)
     val rKey = RowKey(map.get(rowKey).get.asInstanceOf[String])
+    val ts = Timestamp(map.get(timestamp).orNull.asInstanceOf[String])
-    HBaseTableCatalog(nSpace, tName, rKey, SchemaMap(schemaMap), tCoder, coderSet, numReg)
+    HBaseTableCatalog(nSpace, tName, rKey, ts, SchemaMap(schemaMap), tCoder, coderSet, numReg)
   }

   /**
@@ -323,9 +383,11 @@ object HBaseTableCatalog {
   val catalog = s"""{
     |"table":{"namespace":"default", "name":"htable"},
     |"rowkey":"key1:key2",
+    |"timestamp":"ts",
     |"columns":{
-    |"col1":{"cf":"rowkey", "col":"key1", "type":"string"},
-    |"col2":{"cf":"rowkey", "col":"key2", "type":"double"},
+    |"col1":{"cf":"rowkey", "col":"key1", "type":"double"},
+    |"col2":{"cf":"rowkey", "col":"key2", "type":"string"},
|"colT":{"cf":"timestamp", "col":"ts", "type":"long"}, |"col3":{"cf":"cf1", "col":"col1", "avro":"schema1"}, |"col4":{"cf":"cf1", "col":"col2", "type":"binary"}, |"col5":{"cf":"cf1", "col":"col3", "type":"double"}, diff --git a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableScan.scala b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableScan.scala index e94bfe43..0f598f9b 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableScan.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableScan.scala @@ -56,7 +56,7 @@ private[hbase] class HBaseTableScanRDD( requiredColumns: Array[String], filters: Array[Filter]) extends RDD[Row](relation.sqlContext.sparkContext, Nil) { val outputs = StructType(requiredColumns.map(relation.schema(_))).toAttributes - val columnFields = relation.splitRowKeyColumns(requiredColumns)._2 + val columnFields = relation.splitReservedColumns(requiredColumns)._2 private def sparkConf = SparkEnv.get.conf val serializedToken = relation.serializedToken @@ -125,6 +125,15 @@ private[hbase] class HBaseTableScanRDD( } } + val tsSeq = { + if (relation.catalog.ts.isPresent) { + val f = relation.catalog.getTimestamp.head + Seq((f, result.rawCells()(0).getTimestamp)).toMap + } else { + Seq.empty + } + } + import scala.collection.JavaConverters.mapAsScalaMapConverter val scalaMap = result.getMap.asScala @@ -165,7 +174,7 @@ private[hbase] class HBaseTableScanRDD( }.toMap } - val unioned = keySeq ++ valuesSeq + val unioned = keySeq ++ tsSeq ++ valuesSeq // Return the row ordered by the requested order val ordered = fields.map(unioned.getOrElse(_, null)) @@ -187,6 +196,15 @@ private[hbase] class HBaseTableScanRDD( } } + val tsSeq = { + if (relation.catalog.ts.isPresent) { + val f = relation.catalog.getTimestamp.head + Seq((f, result.rawCells()(0).getTimestamp)).toMap + } else { + Seq.empty + } + } + import scala.collection.JavaConverters.mapAsScalaMapConverter val scalaMap = result.getNoVersionMap.asScala @@ -206,7 +224,7 @@ private[hbase] class HBaseTableScanRDD( }).toMap } - val unioned = keySeq ++ valuesSeq + val unioned = keySeq ++ tsSeq ++ valuesSeq // Return the row ordered by the requested order val ordered = fields.map(unioned.getOrElse(_, null)) @@ -228,7 +246,16 @@ private[hbase] class HBaseTableScanRDD( } } - val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(!_.isRowKey).map { x => + val tsSeq = { + if (relation.catalog.ts.isPresent) { + val f = relation.catalog.getTimestamp.head + Seq((f, result.rawCells()(0).getTimestamp)).toMap + } else { + Seq.empty + } + } + + val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(c => HBaseTableCatalog.isNotReserved(c.cf)).map { x => import scala.collection.JavaConverters.asScalaBufferConverter val dataType = SHCDataTypeFactory.create(x) val kvs = result.getColumnCells( diff --git a/core/src/test/scala/org/apache/spark/sql/AvroSourceKeySuite.scala b/core/src/test/scala/org/apache/spark/sql/AvroSourceKeySuite.scala index 24db74f1..dc8f7454 100644 --- a/core/src/test/scala/org/apache/spark/sql/AvroSourceKeySuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/AvroSourceKeySuite.scala @@ -90,7 +90,9 @@ class AvroSourceKeySuite extends SHC with Logging{ .load() } - test("populate table") { + override def beforeAll() { + super.beforeAll() + println("populate table") //createTable(tableName, columnFamilies) val sql = sqlContext import sql.implicits._ diff --git 
diff --git a/core/src/test/scala/org/apache/spark/sql/AvroSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/AvroSourceSuite.scala
index d65a7c44..dbe815eb 100644
--- a/core/src/test/scala/org/apache/spark/sql/AvroSourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/AvroSourceSuite.scala
@@ -102,7 +102,9 @@ class AvroSourceSuite extends SHC with Logging{
       .load()
   }

-  test("populate table") {
+  override def beforeAll() {
+    super.beforeAll()
+    println("populate table")
     //createTable(tableName, columnFamilies)
     val sql = sqlContext
     import sql.implicits._
diff --git a/core/src/test/scala/org/apache/spark/sql/CatalogWithTimestampSuite.scala b/core/src/test/scala/org/apache/spark/sql/CatalogWithTimestampSuite.scala
new file mode 100644
index 00000000..cba2edf0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/sql/CatalogWithTimestampSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.datasources.hbase.Logging
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
+
+class CatalogWithTimestampSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll with Logging{
+  def catalog = s"""{
+    |"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
+    |"rowkey":"key1:key2",
+    |"timestamp":"ts",
+    |"columns":{
+    |"col00":{"cf":"rowkey", "col":"key1", "type":"string", "length":"6"},
+    |"col01":{"cf":"rowkey", "col":"key2", "type":"int"},
+    |"colT":{"cf":"timestamp", "col":"ts", "type":"long"},
+    |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
+    |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
+    |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
+    |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
+    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
+    |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
+    |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"},
+    |"col7":{"cf":"cf7", "col":"col7", "type":"string"}
+    |}
+    |}""".stripMargin
+
+  test("Catalog with timestamp field meta data check") {
+    val m = HBaseTableCatalog(Map(HBaseTableCatalog.tableCatalog->catalog))
+    assert(!m.row.fields.exists(_.length == -1))
+    assert(m.row.length == 10)
+    assert(!m.ts.fields.exists(_.length == -1))
+    assert(m.ts.length == 8)
+    assert(m.getColumnFamilies.intersect(HBaseTableCatalog.reserved).isEmpty)
+  }
+
+  test("Catalog with timestamp field should preserve the columns order") {
+    val m = HBaseTableCatalog(Map(HBaseTableCatalog.tableCatalog->catalog))
+    assert(m.toDataType.fields.map(_.name).sameElements(
+      Array("col00", "col01", "colT", "col1", "col2", "col3", "col4", "col5", "col6", "col8", "col7")))
+  }
+}
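As the new suite above illustrates, a catalog that declares a top-level "timestamp" entry is parsed into a Timestamp definition alongside the row key. A minimal sketch of inspecting it, assuming a catalog string like the one in the suite is in scope:

  import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

  val m = HBaseTableCatalog(Map(HBaseTableCatalog.tableCatalog -> catalog))
  // ts.isPresent is true only when the catalog carries a "timestamp" entry;
  // getTimestamp returns the field mapped to the reserved "timestamp" column family.
  if (m.ts.isPresent) {
    println(s"timestamp column: ${m.getTimestamp.map(_.col).mkString(",")}")
  }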
diff --git a/core/src/test/scala/org/apache/spark/sql/CompositeKeySuite.scala b/core/src/test/scala/org/apache/spark/sql/CompositeKeySuite.scala
index aef4136a..57c2aa8c 100644
--- a/core/src/test/scala/org/apache/spark/sql/CompositeKeySuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/CompositeKeySuite.scala
@@ -77,7 +77,9 @@ class CompositeKeySuite extends SHC with Logging {
       .load()
   }

-  test("populate table with composite key") {
+  override def beforeAll() {
+    super.beforeAll()
+    println("populate table with composite key")
     //createTable(tableName, columnFamilies)
     val sql = sqlContext
     import sql.implicits._
diff --git a/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala b/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala
index e891bf76..944c85de 100644
--- a/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala
@@ -73,7 +73,9 @@ class DataTypeSuite extends SHC with Logging {
       .load()
   }

-  test("populate table") {
+  override def beforeAll() {
+    super.beforeAll()
+    println("populate table")
     //createTable(tableName, columnFamilies)
     val sql = sqlContext
     import sql.implicits._
@@ -115,7 +117,7 @@ class DataTypeSuite extends SHC with Logging {
     assert(s.count() == 21)
   }

-  test("greaterequal than 0")  {
+  test("greaterequal than 0") {
     val df = withCatalog(catalog)
     val s = df.filter($"col0" >= 0)
     s.show
diff --git a/core/src/test/scala/org/apache/spark/sql/DataTypeWithTimestampSuite.scala b/core/src/test/scala/org/apache/spark/sql/DataTypeWithTimestampSuite.scala
new file mode 100644
index 00000000..05679938
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/sql/DataTypeWithTimestampSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.time.Instant
+
+import org.apache.spark.sql.execution.datasources.hbase.{HBaseTableCatalog, Logging}
+
+class DataTypeWithTimestampSuite extends SHC with Logging {
+
+  var saveTS: Long = _
+
+  override def catalog = s"""{
+    |"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
+    |"rowkey":"key",
+    |"timestamp":"ts",
+    |"columns":{
+    |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
+    |"colT":{"cf":"timestamp", "col":"ts", "type":"long"},
+    |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
+    |"col2":{"cf":"cf1", "col":"col2", "type":"double"},
+    |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
+    |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
+    |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
+    |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
+    |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
+    |"col8":{"cf":"cf7", "col":"col8", "type":"tinyint"}
+    |}
+    |}""".stripMargin
+
+  def withCatalog(cat: String): DataFrame = {
+    sqlContext
+      .read
+      .options(Map(HBaseTableCatalog.tableCatalog->cat))
+      .format("org.apache.spark.sql.execution.datasources.hbase")
+      .load()
+  }
+
+  override def beforeAll() {
+    super.beforeAll()
+    println("populate table")
+    //createTable(tableName, columnFamilies)
+    val sql = sqlContext
+    import sql.implicits._
+
+    val data = (0 until 32).map { i =>
+      IntKeyRecord(i)
+    }
+    saveTS = Instant.now.toEpochMilli
+    sc.parallelize(data).toDF.write.options(
+      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
+      .format("org.apache.spark.sql.execution.datasources.hbase")
+      .save()
+  }
+
+  test("less than 0") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    val s = df.filter($"col0" < 0)
+    s.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0) // all cell timestamps within [saveTS, testTS]
+    assert(s.count() == 16)
+  }
+
+  test("lessequal than -10") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    val s = df.filter($"col0" <= -10)
+    s.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0)
+    assert(s.count() == 11)
+  }
+
+  test("lessequal than -9") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    val s = df.filter($"col0" <= -9)
+    s.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0)
+    assert(s.count() == 12)
+  }
+
+  test("greaterequal than -9") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    val s = df.filter($"col0" >= -9)
+    s.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0)
+    assert(s.count() == 21)
+  }
+
+  test("greaterequal than 0") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    val s = df.filter($"col0" >= 0)
+    s.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0)
+    assert(s.count() == 16)
+  }
+
+  test("greater than 10") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    val s = df.filter($"col0" > 10)
+    s.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0)
+    assert(s.count() == 10)
+  }
+
+  test("and") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    val s = df.filter($"col0" > -10 && $"col0" <= 10)
+    s.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0)
+    assert(s.count() == 11)
+  }
+
+  test("or") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    val s = df.filter($"col0" <= -10 || $"col0" > 10)
+    s.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0)
+    assert(s.count() == 21)
+  }
+
+  test("all") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    val s = df.filter($"col0" >= -100)
+    s.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0)
+    assert(s.count() == 32)
+  }
+
+  test("full query") {
+    val testTS = Instant.now.toEpochMilli
+    val df = withCatalog(catalog)
+    df.show
+    assert(df.columns.contains("colT"))
+    assert(df.filter($"colT" < saveTS || $"colT" > testTS).count() == 0)
+    assert(df.count() == 32)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
index f6b530b3..441c894f 100644
--- a/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/DefaultSourceSuite.scala
@@ -91,7 +91,9 @@ class DefaultSourceSuite extends SHC with Logging {
       .save()
   }

-  test("populate table") {
+  override def beforeAll() {
+    super.beforeAll()
+    println("populate table")
     //createTable(tableName, columnFamilies)
     val sql = sqlContext
     import sql.implicits._
@@ -379,7 +381,7 @@
     assert(data.count() == 4)

     val rows = data.take(10)
-    assert(rows.count(_.getString(7) == null) == 2)
-    assert(rows.count(_.getString(7) != null) == 2)
+    assert(rows.count(_.getString(8) == null) == 2)
+    assert(rows.count(_.getString(8) != null) == 2)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/sql/PhoenixCompositeKeySuite.scala b/core/src/test/scala/org/apache/spark/sql/PhoenixCompositeKeySuite.scala
index a9f4851e..8444f874 100644
--- a/core/src/test/scala/org/apache/spark/sql/PhoenixCompositeKeySuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/PhoenixCompositeKeySuite.scala
@@ -76,7 +76,9 @@ class PhoenixCompositeKeySuite extends SHC with Logging {
       .load()
   }

-  test("populate table with composite key") {
+  override def beforeAll() {
+    super.beforeAll()
+    println("populate table with composite key")
     //createTable(tableName, columnFamilies)
     val sql = sqlContext
     import sql.implicits._
diff --git a/core/src/test/scala/org/apache/spark/sql/PhoenixSuite.scala b/core/src/test/scala/org/apache/spark/sql/PhoenixSuite.scala
index 064354fa..7b706f68 100644
--- a/core/src/test/scala/org/apache/spark/sql/PhoenixSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/PhoenixSuite.scala
@@ -85,7 +85,9 @@ class PhoenixSuite extends SHC with Logging {
       .load()
   }

-  test("populate table") {
+  override def beforeAll() {
+    super.beforeAll()
+    println("populate table")
     val sql = sqlContext
     import sql.implicits._
diff --git a/core/src/test/scala/org/apache/spark/sql/SHC.scala b/core/src/test/scala/org/apache/spark/sql/SHC.scala
index 7ca5a606..4b689aca 100644
--- a/core/src/test/scala/org/apache/spark/sql/SHC.scala
+++ b/core/src/test/scala/org/apache/spark/sql/SHC.scala
@@ -54,25 +54,30 @@ class SHC extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll with
   // private[spark] var columnFamilyStr = Bytes.toString(columnFamily)

   def defineCatalog(tName: String) = s"""{
|"table":{"namespace":"default", "name":"$tName"}, - |"rowkey":"key", - |"columns":{ - |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, - |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, - |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, - |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, - |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, - |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, - |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, - |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, - |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} - |} - |}""".stripMargin + |"table":{"namespace":"default", "name":"$tName"}, + |"rowkey":"key", + |"timestamp":"ts", + |"columns":{ + |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, + |"colT":{"cf":"timestamp", "col":"ts", "type":"long"}, + |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, + |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, + |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, + |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, + |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, + |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, + |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, + |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} + |} + |}""".stripMargin @deprecated(since = "04.12.2017(dd/mm/year)", message = "use `defineCatalog` instead") def catalog = defineCatalog(tableName) override def beforeAll() { + if (sc != null) { + return + } val tempDir: File = Files.createTempDir tempDir.deleteOnExit htu.startMiniCluster From 6b46aaaf7397777670cd77aa8450431826b99a5f Mon Sep 17 00:00:00 2001 From: sbarnoud060710 Date: Thu, 13 Dec 2018 10:18:50 +0100 Subject: [PATCH 2/2] missing update of TU --- .../apache/spark/sql/MaxVersionsSuite.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/sql/MaxVersionsSuite.scala b/core/src/test/scala/org/apache/spark/sql/MaxVersionsSuite.scala index a814b159..8fa6e102 100644 --- a/core/src/test/scala/org/apache/spark/sql/MaxVersionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/MaxVersionsSuite.scala @@ -70,10 +70,10 @@ class MaxVersionsSuite extends SHC with Logging { val rows = twoVersions.take(10) assert(rows.size == 6) - assert(rows.count(_.getString(7).contains("ancient")) == 0) - assert(rows.count(_.getString(7).contains("old")) == 1) - assert(rows.count(_.getString(7).contains("new")) == 3) - assert(rows.count(_.getString(7).contains("latest")) == 2) + assert(rows.count(_.getString(8).contains("ancient")) == 0) + assert(rows.count(_.getString(8).contains("old")) == 1) + assert(rows.count(_.getString(8).contains("new")) == 3) + assert(rows.count(_.getString(8).contains("latest")) == 2) //we cannot take more then three because we create table with that size val threeVersions: DataFrame = withCatalog(catalog, Map( @@ -83,17 +83,17 @@ class MaxVersionsSuite extends SHC with Logging { val threeRows = threeVersions.take(10) assert(threeRows.size == 9) - assert(threeRows.count(_.getString(7).contains("ancient")) == 1) - assert(threeRows.count(_.getString(7).contains("old")) == 3) - assert(threeRows.count(_.getString(7).contains("new")) == 3) - assert(threeRows.count(_.getString(7).contains("latest")) == 2) + assert(threeRows.count(_.getString(8).contains("ancient")) == 1) + assert(threeRows.count(_.getString(8).contains("old")) == 3) + assert(threeRows.count(_.getString(8).contains("new")) == 3) + assert(threeRows.count(_.getString(8).contains("latest")) == 

     // Test specific only last versions
     val lastVersions: DataFrame = withCatalog(catalog, Map.empty)

     val lastRows = lastVersions.take(10)
     assert(lastRows.size == 3)
-    assert(lastRows.count(_.getString(7).contains("new")) == 1)
-    assert(lastRows.count(_.getString(7).contains("latest")) == 2)
+    assert(lastRows.count(_.getString(8).contains("new")) == 1)
+    assert(lastRows.count(_.getString(8).contains("latest")) == 2)
   }
 }
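Taken together, the two patches let the HBase cell timestamp be exposed as an ordinary long column. A minimal read-side sketch under the same assumptions as the test suites above (an existing sqlContext, a table previously written through the connector, and the PrimitiveType coder; table and column names are illustrative):

  import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

  // "timestamp":"ts" declares the timestamp; colT maps the reserved "timestamp"
  // column family onto a long Spark column.
  val catalog = """{
    |"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
    |"rowkey":"key",
    |"timestamp":"ts",
    |"columns":{
    |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
    |"colT":{"cf":"timestamp", "col":"ts", "type":"long"},
    |"col1":{"cf":"cf1", "col":"col1", "type":"string"}
    |}
    |}""".stripMargin

  val df = sqlContext.read
    .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()

  // colT carries the HBase cell timestamp in epoch milliseconds, so rows can be
  // filtered on write time like any other column.
  df.filter(df("colT") > System.currentTimeMillis() - 24L * 3600 * 1000).show()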