added HBase timestamp support #297

Status: Open · wants to merge 2 commits into master
@@ -305,8 +305,8 @@ case class HBaseRelation(
requiredColumns.map(catalog.sMap.getField(_)).zipWithIndex
}
// Retrieve all columns we will return in the scanner
def splitRowKeyColumns(requiredColumns: Array[String]): (Seq[Field], Seq[Field]) = {
val (l, r) = requiredColumns.map(catalog.sMap.getField(_)).partition(_.cf == HBaseTableCatalog.rowKey)
def splitReservedColumns(requiredColumns: Array[String]): (Seq[Field], Seq[Field]) = {
val (l, r) = requiredColumns.map(catalog.sMap.getField(_)).partition(c => HBaseTableCatalog.isReserved(c.cf))
(l, r)
}
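
A short behavioral sketch of the renamed helper (illustrative only; `relation` stands for an HBaseRelation built from the example catalog further down, where col1/col2 map to the rowkey, colT to the timestamp, and col3 to cf1):

    // Sketch: fields whose "cf" is a reserved name (rowkey or timestamp) land in the
    // first sequence, ordinary column-family fields in the second.
    val (reservedFields, cfFields) =
      relation.splitReservedColumns(Array("col1", "colT", "col3"))
    // reservedFields -> the rowkey field col1 and the timestamp field colT
    // cfFields       -> col3, which lives in an actual column family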

@@ -148,6 +148,26 @@ case class RowKey(k: String) {
}
}

// The hbase timestamp definition
// ts
case class Timestamp(k: String) {
val key: String = k
var fields: Seq[Field] = _
var varLength = false
def isPresent: Boolean = key != null
def length: Int = {
val tmp = fields.foldLeft(0) { case (x, y) =>
val yLen = if (y.length == -1) {
MaxLength
} else {
y.length
}
x + yLen
}
tmp
}
}

// The map between the column presented to Spark and the HBase field
case class SchemaMap(map: mutable.LinkedHashMap[String, Field]) {
def toFields = map.map { case (name, field) =>
@@ -164,16 +184,18 @@ case class HBaseTableCatalog(
val namespace: String,
val name: String,
row: RowKey,
ts: Timestamp,
sMap: SchemaMap,
tCoder: String,
coderSet: Set[String],
val numReg: Int) extends Logging {
def toDataType = StructType(sMap.toFields)
def getField(name: String) = sMap.getField(name)
def getRowKey: Seq[Field] = row.fields
def getTimestamp: Seq[Field] = ts.fields
def getPrimaryKey= row.keys(0)
def getColumnFamilies = {
sMap.fields.map(_.cf).filter(_ != HBaseTableCatalog.rowKey).toSeq.distinct
sMap.fields.map(_.cf).filter(HBaseTableCatalog.isNotReserved).toSeq.distinct
}

//this is required to read fromBytes column families and qualifiers
@@ -203,6 +225,32 @@ case class HBaseTableCatalog(
}
initRowKey()

def initTimestamp(): Unit = {
if (!ts.isPresent) {
return
}
val fields = sMap.fields.filter(_.cf == HBaseTableCatalog.timestamp)
ts.fields = fields.find(_.col == ts.key).toSeq

// If the tCoder is PrimitiveType, only the last timestamp field is allowed to have
// a length that is determined at runtime.
if (tCoder == SparkHBaseConf.PrimitiveType) {
if (!ts.fields.reverse.tail.exists(_.length == -1)) {
var start = 0
ts.fields.foreach { f =>
f.start = start
start += f.length
}
} else {
throw new Exception("PrimitiveType: only the last dimension of Timestamp is allowed to have " +
"varied length. You may want to add 'length' to the dimensions which have " +
"varied length or use dimensions which are scala/java primitive data " +
"types of fixed length.")
}
}
}
initTimestamp()

def validateCatalogDef() = {
if (!shcTableCoder.isRowKeySupported()) {
throw new UnsupportedOperationException(s"$tCoder does not support row key, and can not be " +
@@ -219,6 +267,12 @@
// If the row key of the table is composite, check if the coder supports composite key
if (row.fields.size > 1 && !shcTableCoder.isCompositeKeySupported)
throw new UnsupportedOperationException(s"$tCoder: Composite key is not supported")

if (ts.isPresent) {
// Check that the timestamp is type long
if (!ts.fields.exists(field => field.sType.get.equals("long")))
throw new UnsupportedOperationException(s"$tCoder: Only timestamp as long type is supported")
}
}
validateCatalogDef()
}
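
As an illustrative negative case, a sketch written against this PR's validation (inside a ScalaTest suite; the catalog string is made up for the example):

    // Sketch: a timestamp column declared with a non-long type should be rejected
    // by validateCatalogDef at catalog construction time.
    val badCatalog = s"""{
      |"table":{"namespace":"default", "name":"t"},
      |"rowkey":"key1",
      |"timestamp":"ts",
      |"columns":{
      |"col0":{"cf":"rowkey", "col":"key1", "type":"string"},
      |"colT":{"cf":"timestamp", "col":"ts", "type":"string"},
      |"col1":{"cf":"cf1", "col":"c1", "type":"int"}
      |}
      |}""".stripMargin
    intercept[UnsupportedOperationException] {
      HBaseTableCatalog(Map(HBaseTableCatalog.tableCatalog -> badCatalog))
    }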
@@ -231,6 +285,8 @@ object HBaseTableCatalog {
val tableCatalog = "catalog"
// The row key with format key1:key2 specifying table row key
val rowKey = "rowkey"
// The hbase timestamp field as long
val timestamp = "timestamp"
// The key for hbase table whose value specify namespace and table name
val table = "table"
// The namespace of hbase table
Expand All @@ -250,6 +306,9 @@ object HBaseTableCatalog {
val tableCoder = "tableCoder"
// The version number of catalog
val cVersion = "version"
val reserved = Array(rowKey, timestamp)
def isReserved(columnName: String): Boolean = reserved.contains(columnName)
def isNotReserved(columnName: String): Boolean = !isReserved(columnName)
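
A quick sketch of how the new helpers behave (values shown are illustrative):

    // Reserved pseudo column families never map to real HBase families.
    HBaseTableCatalog.isReserved("rowkey")      // true
    HBaseTableCatalog.isReserved("timestamp")   // true
    HBaseTableCatalog.isNotReserved("cf1")      // true: an ordinary column family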
/**
* User provide table schema definition
* {"tablename":"name", "rowkey":"key1:key2",
@@ -294,8 +353,9 @@ object HBaseTableCatalog {
}
val numReg = parameters.get(newTable).map(x => x.toInt).getOrElse(0)
val rKey = RowKey(map.get(rowKey).get.asInstanceOf[String])
val ts = Timestamp(map.get(timestamp).orNull.asInstanceOf[String])

HBaseTableCatalog(nSpace, tName, rKey, SchemaMap(schemaMap), tCoder, coderSet, numReg)
HBaseTableCatalog(nSpace, tName, rKey, ts, SchemaMap(schemaMap), tCoder, coderSet, numReg)
}

/**
@@ -323,9 +383,11 @@ object HBaseTableCatalog {
val catalog = s"""{
|"table":{"namespace":"default", "name":"htable"},
|"rowkey":"key1:key2",
|"timestamp":"ts",
|"columns":{
|"col1":{"cf":"rowkey", "col":"key1", "type":"string"},
|"col2":{"cf":"rowkey", "col":"key2", "type":"double"},
|"col1":{"cf":"rowkey", "col":"key1", "type":"double"},
|"col2":{"cf":"rowkey", "col":"key2", "type":"string"},
|"colT":{"cf":"timestamp", "col":"ts", "type":"long"},
|"col3":{"cf":"cf1", "col":"col1", "avro":"schema1"},
|"col4":{"cf":"cf1", "col":"col2", "type":"binary"},
|"col5":{"cf":"cf1", "col":"col3", "type":"double"},
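
A minimal read sketch using a catalog like the one above: once the catalog declares a "timestamp" entry, the mapped column (colT here) surfaces like any other field. The data source name is shc's standard one; `catalogWithTs` is a placeholder for the catalog string.

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

    // Sketch: read the table and expose the HBase cell timestamp as the long column colT.
    def readWithTimestamp(sqlContext: SQLContext, catalogWithTs: String): DataFrame =
      sqlContext.read
        .options(Map(HBaseTableCatalog.tableCatalog -> catalogWithTs))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()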
@@ -56,7 +56,7 @@ private[hbase] class HBaseTableScanRDD(
requiredColumns: Array[String],
filters: Array[Filter]) extends RDD[Row](relation.sqlContext.sparkContext, Nil) {
val outputs = StructType(requiredColumns.map(relation.schema(_))).toAttributes
val columnFields = relation.splitRowKeyColumns(requiredColumns)._2
val columnFields = relation.splitReservedColumns(requiredColumns)._2
private def sparkConf = SparkEnv.get.conf
val serializedToken = relation.serializedToken

@@ -125,6 +125,15 @@
}
}

val tsSeq = {
if (relation.catalog.ts.isPresent) {
val f = relation.catalog.getTimestamp.head
Seq((f, result.rawCells()(0).getTimestamp)).toMap
} else {
Seq.empty
}
}

import scala.collection.JavaConverters.mapAsScalaMapConverter
val scalaMap = result.getMap.asScala

@@ -165,7 +174,7 @@
}.toMap
}

val unioned = keySeq ++ valuesSeq
val unioned = keySeq ++ tsSeq ++ valuesSeq

// Return the row ordered by the requested order
val ordered = fields.map(unioned.getOrElse(_, null))
@@ -187,6 +196,15 @@
}
}

val tsSeq = {
if (relation.catalog.ts.isPresent) {
val f = relation.catalog.getTimestamp.head
Seq((f, result.rawCells()(0).getTimestamp)).toMap
} else {
Seq.empty
}
}

import scala.collection.JavaConverters.mapAsScalaMapConverter
val scalaMap = result.getNoVersionMap.asScala

Expand All @@ -206,7 +224,7 @@ private[hbase] class HBaseTableScanRDD(
}).toMap
}

val unioned = keySeq ++ valuesSeq
val unioned = keySeq ++ tsSeq ++ valuesSeq

// Return the row ordered by the requested order
val ordered = fields.map(unioned.getOrElse(_, null))
@@ -228,7 +246,16 @@
}
}

val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(!_.isRowKey).map { x =>
val tsSeq = {
if (relation.catalog.ts.isPresent) {
val f = relation.catalog.getTimestamp.head
Seq((f, result.rawCells()(0).getTimestamp)).toMap
@regata commented on Apr 22, 2019:

@sbarnoud when there are multiple cells in a single column, each cell will have different timestamp. However, this code reads the timestamp only from the latest cell. Is this intended behavior?


plus tsSeq is not used inside buildRows
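
For reference, a minimal sketch of reading per-version timestamps with the HBase client API (cfBytes and qualifierBytes are placeholder byte arrays, not names from this diff):

    // Sketch: each Cell returned by getColumnCells has its own timestamp;
    // rawCells()(0).getTimestamp only reflects the newest cell.
    import scala.collection.JavaConverters.asScalaBufferConverter
    val versionTimestamps = result.getColumnCells(cfBytes, qualifierBytes)
      .asScala.map(_.getTimestamp)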

} else {
Seq.empty
}
}

val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(c => HBaseTableCatalog.isNotReserved(c.cf)).map { x =>

how about this?

    val tsField = relation.catalog.getTimestamp.head // ADDED
    val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(c => HBaseTableCatalog.isNotReserved(c.cf)).map { x =>
      import scala.collection.JavaConverters.asScalaBufferConverter
      val dataType = SHCDataTypeFactory.create(x)
      val kvs = result.getColumnCells(
        relation.catalog.shcTableCoder.toBytes(x.cf),
        relation.catalog.shcTableCoder.toBytes(x.col)).asScala

      kvs.map(kv => {
        val v = CellUtil.cloneValue(kv)
        (kv.getTimestamp, x -> dataType.fromBytes(v))
      }).toMap.withDefaultValue(x -> null)
    }

    val ts = valueSeq.foldLeft(Set.empty[Long])((acc, map) => acc ++ map.keySet)
    // we are losing duplicates here, because we didn't support passing the version (timestamp) to the row
    ts.map(version => {
      val tsSeq = Seq((tsField, version)).toMap // ADDED
      keySeq ++ tsSeq ++ valueSeq.map(_.apply(version)).toMap // MODIFIED
    }).map { unioned =>
      // Return the row ordered by the requested order
      Row.fromSeq(fields.map(unioned.get(_).getOrElse(null)))
    }

The only extra change required is to specify the timestamp column when mergeToLatest is false.
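
A possible read sketch for that case. The option keys "mergeToLatest" and "maxVersions" are assumed from shc's HBaseRelation and are not part of this diff; `catalogWithTs` is a placeholder catalog string that includes the timestamp mapping.

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

    // Sketch: keep every cell version as a separate row instead of merging to the latest,
    // so the timestamp column can carry each version's timestamp.
    def readAllVersions(sqlContext: SQLContext, catalogWithTs: String): DataFrame =
      sqlContext.read
        .options(Map(
          HBaseTableCatalog.tableCatalog -> catalogWithTs,
          "maxVersions" -> "3",        // assumed option key
          "mergeToLatest" -> "false")) // assumed option key
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()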

Author replied:

My bad. In my use case, I have a single version, and I forgot this point.
Yes, we should have the ts corresponding to the version!

import scala.collection.JavaConverters.asScalaBufferConverter
val dataType = SHCDataTypeFactory.create(x)
val kvs = result.getColumnCells(
@@ -90,7 +90,9 @@ class AvroSourceKeySuite extends SHC with Logging{
.load()
}

test("populate table") {
override def beforeAll() {
super.beforeAll()
println("populate table")
//createTable(tableName, columnFamilies)
val sql = sqlContext
import sql.implicits._
@@ -102,7 +102,9 @@ class AvroSourceSuite extends SHC with Logging{
.load()
}

test("populate table") {
override def beforeAll() {
super.beforeAll()
println("populate table")
//createTable(tableName, columnFamilies)
val sql = sqlContext
import sql.implicits._
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.execution.datasources.hbase.Logging
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

class CatalogWithTimestampSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll with Logging{
def catalog = s"""{
|"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
|"rowkey":"key1:key2",
|"timestamp":"ts",
|"columns":{
|"col00":{"cf":"rowkey", "col":"key1", "type":"string", "length":"6"},
|"col01":{"cf":"rowkey", "col":"key2", "type":"int"},
|"colT":{"cf":"timestamp", "col":"ts", "type":"long"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"}
|}
|}""".stripMargin

test("Catalog with timestamp field meta data check") {
val m = HBaseTableCatalog(Map(HBaseTableCatalog.tableCatalog->catalog))
assert(!m.row.fields.exists(_.length == -1))
assert(m.row.length == 10)
assert(!m.ts.fields.exists(_.length == -1))
assert(m.ts.length == 8)
assert(m.getColumnFamilies.intersect(HBaseTableCatalog.reserved).isEmpty)
}

test("Catalog with timestamp field should preserve the columns order") {
val m = HBaseTableCatalog(Map(HBaseTableCatalog.tableCatalog->catalog))
assert(m.toDataType.fields.map(_.name).sameElements(
Array("col00", "col01", "colT", "col1", "col2", "col3", "col4", "col5", "col6", "col8", "col7")))
}
}
@@ -77,7 +77,9 @@ class CompositeKeySuite extends SHC with Logging {
.load()
}

test("populate table with composite key") {
override def beforeAll() {
super.beforeAll()
println("populate table with composite key")
//createTable(tableName, columnFamilies)
val sql = sqlContext
import sql.implicits._
core/src/test/scala/org/apache/spark/sql/DataTypeSuite.scala (6 changes: 4 additions & 2 deletions)
@@ -73,7 +73,9 @@ class DataTypeSuite extends SHC with Logging {
.load()
}

test("populate table") {
override def beforeAll() {
super.beforeAll()
println("populate table")
//createTable(tableName, columnFamilies)
val sql = sqlContext
import sql.implicits._
@@ -115,7 +117,7 @@
assert(s.count() == 21)
}

test("greaterequal than 0") {
test("greaterequal than 0") {
val df = withCatalog(catalog)
val s = df.filter($"col0" >= 0)
s.show