Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added HBase timestamp support #297

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

sbarnoud
Copy link

#210

What changes were proposed in this pull request?

Added support of HBase timestamp.

How was this patch tested?

New TU. Added control in TU that "reserved" column families (rowkey and timestamp) are not created.

val tsSeq = {
if (relation.catalog.ts.isPresent) {
val f = relation.catalog.getTimestamp.head
Seq((f, result.rawCells()(0).getTimestamp)).toMap
Copy link

@regata regata Apr 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbarnoud when there are multiple cells in a single column, each cell will have different timestamp. However, this code reads the timestamp only from the latest cell. Is this intended behavior?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plus tsSeq is not used inside buildRows

}
}

val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(c => HBaseTableCatalog.isNotReserved(c.cf)).map { x =>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about this?

    val tsField = relation.catalog.getTimestamp.head // ADDED
    val valueSeq: Seq[Map[Long, (Field, Any)]] = fields.filter(c => HBaseTableCatalog.isNotReserved(c.cf)).map { x =>
      import scala.collection.JavaConverters.asScalaBufferConverter
      val dataType = SHCDataTypeFactory.create(x)
      val kvs = result.getColumnCells(
        relation.catalog.shcTableCoder.toBytes(x.cf),
        relation.catalog.shcTableCoder.toBytes(x.col)).asScala

      kvs.map(kv => {
        val v = CellUtil.cloneValue(kv)
        (kv.getTimestamp, x -> dataType.fromBytes(v))
      }).toMap.withDefaultValue(x -> null)
    }

    val ts = valueSeq.foldLeft(Set.empty[Long])((acc, map) => acc ++ map.keySet)
    //we are loosing duplicate here, because we didn't support passing version (timestamp) to the row
    ts.map(version => {
      val tsSeq = Seq((tsField, version)).toMap // ADDED
      keySeq ++ tsSeq ++ valueSeq.map(_.apply(version)).toMap // MODIFIED
    }).map { unioned =>
      // Return the row ordered by the requested order
      Row.fromSeq(fields.map(unioned.get(_).getOrElse(null)))
    }

The only extra change that is required is to specify timestamp column when mergeToLatest is false

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad.In my use case, i have a single version, and i forget this point.
Yes, we should have the ts corresponding to the version!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants