Skip to content

Commit

Permalink
feat: add prefix api in LsmStorageInner
Browse files Browse the repository at this point in the history
  • Loading branch information
Leibnizhu committed May 7, 2024
1 parent 8090271 commit 5aabd66
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 70 deletions.
27 changes: 21 additions & 6 deletions src/main/scala/io/github/leibnizhu/tinylsm/Key.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.github.leibnizhu.tinylsm

import io.github.leibnizhu.tinylsm.utils.{Bound, Excluded, Included, Unbounded}

import java.util
import scala.util.hashing.MurmurHash3

Expand All @@ -16,12 +18,21 @@ trait Key {

/** Hash of this key's bytes, computed with MurmurHash3's sequence hash. */
def keyHash(): Int = MurmurHash3.seqHash(bytes)

protected def nextKeyBytes(): Array[Byte] = if (bytes.last == Byte.MaxValue) {
Array()
} else {
val newBytes = bytes.clone()
newBytes(newBytes.length - 1) = (newBytes.last + 1).toByte
newBytes
/**
 * Upper edge of the key range covered by this key when it is used as a prefix.
 *
 * The edge is built by dropping the trailing `Byte.MaxValue` bytes and adding one to the
 * right-most remaining byte, which gives the smallest byte string that no longer starts
 * with this prefix.
 * NOTE(review): this assumes keys are ordered byte-wise with `Byte.MaxValue` as the largest
 * byte value (signed comparison) — confirm against the key comparator.
 *
 * @return `Excluded(edge)` when such an edge exists; `Unbounded()` when the prefix is empty
 *         or consists only of `Byte.MaxValue` bytes
 */
def prefixUpperEdge(): Bound = {
  // Right-most byte that can still be incremented; -1 when there is none.
  val lastAddIndex = bytes.lastIndexWhere(_ != Byte.MaxValue)
  // Index 0 is a valid position: a one-byte prefix must still get a bounded edge.
  if (lastAddIndex >= 0) {
    // `take` copies, so the key's own bytes are never mutated. Cutting the trailing
    // Byte.MaxValue bytes keeps the edge tight: [1, 127] -> [2], not [2, 127].
    val newBytes = bytes.take(lastAddIndex + 1)
    newBytes(lastAddIndex) = (newBytes(lastAddIndex) + 1).toByte
    Excluded(MemTableKey.withEndTs(newBytes))
  } else {
    Unbounded()
  }
}

/**
 * Scan range for this key used as a prefix.
 *
 * @return a pair of (inclusive lower bound built with `MemTableKey.withBeginTs(this)`,
 *         upper bound produced by `prefixUpperEdge()`)
 */
def prefixRange(): (Bound, Bound) =
  (Included(MemTableKey.withBeginTs(this)), prefixUpperEdge())
}

Expand All @@ -38,4 +49,8 @@ case class RawKey(bytes: Array[Byte]) extends Comparable[RawKey] with Key {
// A RawKey is its own raw form, so this simply returns the receiver.
override def rawKey(): RawKey = this

// Decodes the key bytes with `new String(bytes)`, i.e. using the JVM's default charset.
override def toString: String = new String(bytes)
}

object RawKey {
  /**
   * Builds a RawKey from a string.
   *
   * @param str text whose bytes (as returned by `String.getBytes`, default charset) become the key
   * @return a RawKey wrapping those bytes
   */
  def fromString(str: String): RawKey = {
    val encoded = str.getBytes
    RawKey(encoded)
  }
}
53 changes: 45 additions & 8 deletions src/main/scala/io/github/leibnizhu/tinylsm/LsmStorageInner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ private[tinylsm] case class LsmStorageInner(
val compactionFilters: ConcurrentLinkedDeque[CompactionFilter] = new ConcurrentLinkedDeque()
) {
private val log = LoggerFactory.getLogger(this.getClass)
private val prefixBloom = PrefixBloom.fromConfig(this.options)

/**
* 按key获取
Expand Down Expand Up @@ -315,7 +314,8 @@ private[tinylsm] case class LsmStorageInner(
// 倒序,从早到新遍历所有已冻结的memtable进行flush操作
for (flushMemTable <- state.immutableMemTables.reverse) {
// 构建SST、写入SST文件
val builder = SsTableBuilder(options.blockSize, SsTableCompressor.create(options.compressorOptions), prefixBloom)
val builder = SsTableBuilder(options.blockSize, SsTableCompressor.create(options.compressorOptions),
PrefixBloomBuilder.fromConfig(options))
flushMemTable.flush(builder)
val sstId = flushMemTable.id
val sst = builder.build(sstId, Some(blockCache), fileOfSst(sstId))
Expand Down Expand Up @@ -349,21 +349,21 @@ private[tinylsm] case class LsmStorageInner(
val snapshot = state.read(_.copy())
// MemTable 部分的迭代器
val memTableIters = (snapshot.memTable :: snapshot.immutableMemTables)
.map(mt => mt.scan(Bound.withBeginTs(lower), Bound.withEndTs(upper)))
.map(_.scan(Bound.withBeginTs(lower), Bound.withEndTs(upper)))
val memTablesIter = MergeIterator(memTableIters)

// L0 SST 部分的迭代器
val ssTableIters = snapshot.l0SsTables.map(snapshot.ssTables(_))
// 过滤key范围可能包含当前scan范围的sst,减少IO
.filter(sst => sst.containsRange(lower, upper))
.map(sst => SsTableIterator.createByLowerBound(sst, lower))
.filter(_.containsRange(lower, upper))
.map(SsTableIterator.createByLowerBound(_, lower))
val l0ssTablesIter = MergeIterator(ssTableIters)

// levels SST 部分的迭代器
val levelIters = snapshot.levels.map((_, levelSstIds) => {
val levelSsts = levelSstIds
.map(snapshot.ssTables(_))
.filter(sst => sst.containsRange(lower, upper))
.filter(_.containsRange(lower, upper))
SstConcatIterator.createByLowerBound(levelSsts, lower)
})
val levelTablesIter = MergeIterator(levelIters)
Expand All @@ -381,6 +381,41 @@ private[tinylsm] case class LsmStorageInner(
FusedIterator(LsmIterator(mergedIter, upper, readTs))
}



/**
 * Iterates over the entries whose key starts with the given prefix.
 *
 * Without MVCC the scan runs directly with `TS_RANGE_END` as the read timestamp; with MVCC
 * a new transaction is opened through `mvcc.newTxn` and the scan is delegated to it.
 *
 * @param prefix key prefix to look up
 * @return an iterator over the matching entries
 */
def prefix(prefix: Key): StorageIterator[RawKey] =
  this.mvcc match {
    case Some(txnManager) =>
      txnManager.newTxn(this.clone(), options.serializable, true).prefix(prefix)
    case None =>
      prefixWithTs(prefix, TS_RANGE_END)
  }

/**
 * Prefix scan over a snapshot of the storage state, read at the given timestamp.
 *
 * Merges three sources: the memtables (current plus frozen ones), the L0 SSTs and the
 * SSTs of the other levels. SSTs for which `mayContainsPrefix` is false are skipped.
 *
 * @param prefix key prefix to look up
 * @param readTs read timestamp handed to the `LsmIterator`
 * @return a fused iterator over the merged sources
 */
def prefixWithTs(prefix: Key, readTs: Long): FusedIterator[RawKey] = {
  val snapshot = state.read(_.copy())

  // Memtable part: the active memtable followed by the frozen ones.
  val allMemTables = snapshot.memTable :: snapshot.immutableMemTables
  val memTableIters = allMemTables.map(memTable => memTable.prefix(prefix))
  val memTablesIter = MergeIterator(memTableIters)

  val (lower, upper) = prefix.prefixRange()

  // L0 part: keep only the SSTs that may hold the prefix, to reduce IO.
  val l0Candidates = snapshot.l0SsTables
    .map(sstId => snapshot.ssTables(sstId))
    .filter(sst => sst.mayContainsPrefix(prefix))
  val ssTableIters = l0Candidates.map(sst => SsTableIterator.createByLowerBound(sst, lower))
  val l0ssTablesIter = MergeIterator(ssTableIters)

  // Remaining levels: one concat iterator per level over its candidate SSTs.
  val levelIters = snapshot.levels.map { case (_, levelSstIds) =>
    val levelSsts = levelSstIds
      .map(sstId => snapshot.ssTables(sstId))
      .filter(sst => sst.mayContainsPrefix(prefix))
    SstConcatIterator.createByLowerBound(levelSsts, lower)
  }
  val levelTablesIter = MergeIterator(levelIters)

  log.debug("{}, {}, {}", memTablesIter.numActiveIterators(), l0ssTablesIter.numActiveIterators(), levelTablesIter.numActiveIterators())
  val memAndL0Iter = TwoMergeIterator(memTablesIter, l0ssTablesIter)
  val mergedIter = TwoMergeIterator(memAndL0Iter, levelTablesIter)
  // Two more wrappers: LsmIterator (multi-version control and delete tombstones) and
  // FusedIterator (fusing / error handling).
  FusedIterator(LsmIterator(mergedIter, upper, readTs))
}

def forceFullCompaction(): Unit = {
/*if (options.compactionOptions == CompactionOptions.NoCompaction) {
throw new IllegalStateException("full compaction can only be called with compaction is enabled")
Expand Down Expand Up @@ -421,7 +456,8 @@ private[tinylsm] case class LsmStorageInner(
val compactionFilters = this.compactionFilters.asScala.toList
while (iter.isValid) breakable {
if (builder.isEmpty) {
builder = Some(SsTableBuilder(options.blockSize, SsTableCompressor.create(options.compressorOptions), prefixBloom))
builder = Some(SsTableBuilder(options.blockSize, SsTableCompressor.create(options.compressorOptions),
PrefixBloomBuilder.fromConfig(options)))
}

val sameAsLastKey = iter.key().rawKey().equals(lastKey)
Expand Down Expand Up @@ -469,7 +505,8 @@ private[tinylsm] case class LsmStorageInner(
val sstId = nextSstId.incrementAndGet()
val sst = innerBuilder.build(sstId, Some(blockCache), fileOfSst(sstId))
newSstList += sst
innerBuilder = SsTableBuilder(options.blockSize, SsTableCompressor.create(options.compressorOptions), prefixBloom)
innerBuilder = SsTableBuilder(options.blockSize, SsTableCompressor.create(options.compressorOptions),
PrefixBloomBuilder.fromConfig(options))
builder = Some(innerBuilder)
}
innerBuilder.add(iter.key(), iter.value())
Expand Down
11 changes: 11 additions & 0 deletions src/main/scala/io/github/leibnizhu/tinylsm/MemTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ case class MemTable(
new MemTableIterator(map.subMap(l, il, r, ir).entrySet().iterator().asScala, id)
case (_, _) => null

/**
 * Iterator over the entries of this memtable whose key starts with the given prefix.
 *
 * The range starts (inclusive) at `MemTableKey.withBeginTs(prefix)` and ends at the bound
 * returned by `prefix.prefixUpperEdge()`.
 *
 * @param prefix key prefix to look up
 * @return an iterator over the matching sub-map, or `null` when the upper bound has an
 *         unexpected shape
 */
def prefix(prefix: Key): MemTableIterator = {
  val from = MemTableKey.withBeginTs(prefix)
  prefix.prefixUpperEdge() match {
    case Unbounded() =>
      // No upper edge: everything from the lower bound onwards.
      val entries = map.tailMap(from, true).entrySet().iterator().asScala
      new MemTableIterator(entries, id)
    case Bounded(to: MemTableKey, toInclusive: Boolean) =>
      val entries = map.subMap(from, true, to, toInclusive).entrySet().iterator().asScala
      new MemTableIterator(entries, id)
    case _ => null
  }
}

/**
 * Feeds every entry of this memtable to the given SST builder.
 *
 * @param builder builder that receives each key/value pair through `add`
 */
def flush(builder: SsTableBuilder): Unit =
  map.forEach { (key, value) => builder.add(key, value) }
Expand Down
41 changes: 28 additions & 13 deletions src/main/scala/io/github/leibnizhu/tinylsm/SsTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import io.github.leibnizhu.tinylsm.block.{Block, BlockBuilder, BlockCache, Block
import io.github.leibnizhu.tinylsm.compress.CompressState.Decompress
import io.github.leibnizhu.tinylsm.compress.SsTableCompressor
import io.github.leibnizhu.tinylsm.iterator.*
import io.github.leibnizhu.tinylsm.utils.*
import io.github.leibnizhu.tinylsm.utils.ByteTransOps.bytesToInt
import io.github.leibnizhu.tinylsm.utils.{Bloom, Bound, ByteArrayReader, ByteArrayWriter, Excluded, FileObject, Included, PrefixBloom}
import org.slf4j.LoggerFactory

import java.io.*
Expand Down Expand Up @@ -36,7 +36,7 @@ class SsTable(val file: FileObject,
val firstKey: MemTableKey,
val lastKey: MemTableKey,
val bloom: Option[Bloom],
val prefixBloom: Option[Bloom],
val prefixBloomFilter: Option[PrefixBloom],
// SST存储的最大时间戳
val maxTimestamp: Long = 0,
val compressor: SsTableCompressor) {
Expand Down Expand Up @@ -107,12 +107,27 @@ class SsTable(val file: FileObject,
}

/**
* 当前sst里面是否可能包含指定前缀的key
*
* @param prefix 要判断的前缀
* @return 当前sst里面是否可能包含指定前缀的key
*/
def mayContainsPrefix(prefix: Array[Byte]): Boolean = {
// TODO 根据: 1.firstKey lastKey 2.bloom如果刚好有这个key(prefix就是key) 3.按前缀bloom 切割前缀判定
def mayContainsPrefix(prefix: Key): Boolean = {
  // The prefix sorts after this SST's last key: no key here can start with it.
  val beyondLastKey = prefix.rawKey() > lastKey.rawKey()

  // The prefix's exclusive upper edge does not reach past this SST's first key:
  // every key here sorts at or after the end of the prefix range.
  def beforeFirstKey: Boolean = prefix.prefixUpperEdge() match {
    case Excluded(right: Key) => right.rawKey() <= firstKey.rawKey()
    case _ => false
  }

  // The prefix bloom filter, when present, reports that the prefix is absent.
  def rejectedByBloom: Boolean = prefixBloomFilter.exists(_.notContainsPrefix(prefix))

  // The checks short-circuit in the order: last key, first key, bloom filter.
  !(beyondLastKey || beforeFirstKey || rejectedByBloom)
}

Expand Down Expand Up @@ -158,7 +173,7 @@ object SsTable {

val prefixBloomOffset = bytesToInt(file.read(dictOffset - 4, 4))
val rawPrefixBloom = file.read(prefixBloomOffset, dictOffset - 4 - prefixBloomOffset)
val prefixBloomFilter = Bloom.decode(rawPrefixBloom)
val prefixBloomFilter = PrefixBloom.decode(rawPrefixBloom)

val bloomOffset = bytesToInt(file.read(prefixBloomOffset - 4, 4))
val rawBloom = file.read(bloomOffset, prefixBloomOffset - 4 - bloomOffset)
Expand All @@ -179,7 +194,7 @@ object SsTable {
firstKey = blockMeta.head.firstKey.copy(),
lastKey = blockMeta.last.lastKey.copy(),
bloom = Some(bloomFilter),
prefixBloom = Some(prefixBloomFilter),
prefixBloomFilter = Some(prefixBloomFilter),
maxTimestamp = maxTs,
compressor = compressor
)
Expand All @@ -195,7 +210,7 @@ object SsTable {
firstKey = firstKey,
lastKey = lastKey,
bloom = None,
prefixBloom = None,
prefixBloomFilter = None,
maxTimestamp = 0,
compressor = SsTableCompressor.none()
)
Expand All @@ -208,7 +223,7 @@ object SsTable {
* @param blockSize Block大小
*/
class SsTableBuilder(val blockSize: Int, val compressor: SsTableCompressor,
val prefixBloom: PrefixBloom = PrefixBloom.empty()) {
val prefixBloomBuilder: PrefixBloomBuilder = PrefixBloomBuilder.empty()) {
private val log = LoggerFactory.getLogger(this.getClass)
// 当前Block的builder
private var builder = BlockBuilder(blockSize, compressor)
Expand All @@ -235,7 +250,7 @@ class SsTableBuilder(val blockSize: Int, val compressor: SsTableCompressor,
maxTs = key.ts
}
keyHashes.addOne(key.keyHash())
this.prefixBloom.addKey(key)
this.prefixBloomBuilder.addKey(key)
// value字典采样
if (compressor.needTrainDict()) {
compressor.addDictSample(value)
Expand Down Expand Up @@ -284,7 +299,7 @@ class SsTableBuilder(val blockSize: Int, val compressor: SsTableCompressor,

// 生成压缩字典、对value执行压缩
val dict = compressor.generateDict()
val (buffer, finalMatas) = compressor.compressSsTable(blockSize, data, blocks, meta.toArray, prefixBloom)
val (buffer, finalMatas) = compressor.compressSsTable(blockSize, data, blocks, meta.toArray, prefixBloomBuilder)

// meta写入buffer
val metaOffset = buffer.length
Expand All @@ -298,9 +313,9 @@ class SsTableBuilder(val blockSize: Int, val compressor: SsTableCompressor,
buffer.putUint32(bloomOffset)

// 前缀bloom 写入buffer
val prefixBloomFilter = prefixBloom.bloom()
val prefixBloomOffset = buffer.length
prefixBloomFilter.encode(buffer)
val prefixBloom = prefixBloomBuilder.build()
prefixBloom.encode(buffer)
buffer.putUint32(prefixBloomOffset)

// 压缩字典写入buffer
Expand All @@ -322,7 +337,7 @@ class SsTableBuilder(val blockSize: Int, val compressor: SsTableCompressor,
firstKey = finalMatas.head.firstKey.copy(),
lastKey = finalMatas.last.lastKey.copy(),
bloom = Some(bloomFilter),
prefixBloom = Some(prefixBloomFilter),
prefixBloomFilter = Some(prefixBloom),
maxTimestamp = maxTs,
compressor = compressor
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import io.github.leibnizhu.tinylsm.compress.CompressState.{Compress, Decompress,
import io.github.leibnizhu.tinylsm.compress.CompressorOptions.{Lz4, Zlib, Zstd}
import io.github.leibnizhu.tinylsm.compress.SsTableCompressor.none
import io.github.leibnizhu.tinylsm.iterator.SsTableIterator
import io.github.leibnizhu.tinylsm.utils.{ByteArrayWriter, PrefixBloom}
import io.github.leibnizhu.tinylsm.utils.{ByteArrayWriter, PrefixBloomBuilder}
import io.github.leibnizhu.tinylsm.{MemTableValue, SsTable, SsTableBuilder}
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -57,7 +57,7 @@ trait SsTableCompressor extends AutoCloseable {
*/
def compressSsTable(blockSize: Int, blockData: ByteArrayWriter,
blocks: ArrayBuffer[Block], meta: Array[BlockMeta],
prefixBloom: PrefixBloom): (ByteArrayWriter, Array[BlockMeta]) =
prefixBloom: PrefixBloomBuilder): (ByteArrayWriter, Array[BlockMeta]) =
if (needTrainDict()) {
// 如果需要训练字典,那么之前生成的sst是没压缩的,需要重新遍历这个未完成的sst的数据,重新压缩生成新的block和meta数据
val sstId = -1
Expand All @@ -75,7 +75,7 @@ trait SsTableCompressor extends AutoCloseable {
firstKey = meta.head.firstKey.copy(),
lastKey = meta.last.lastKey.copy(),
bloom = None,
prefixBloom = None,
prefixBloomFilter = None,
maxTimestamp = 0,
// 前面没压缩,可以直接读
compressor = none(Decompress)
Expand Down
18 changes: 16 additions & 2 deletions src/main/scala/io/github/leibnizhu/tinylsm/mvcc/Transaction.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.github.leibnizhu.tinylsm.mvcc

import io.github.leibnizhu.tinylsm.iterator.TwoMergeIterator
import io.github.leibnizhu.tinylsm.utils.{Bound, Mutex}
import io.github.leibnizhu.tinylsm.{DELETE_TOMBSTONE, LsmStorageInner, RawKey, WriteBatchRecord}
import io.github.leibnizhu.tinylsm.utils.{Bound, Included, Mutex}
import io.github.leibnizhu.tinylsm.{DELETE_TOMBSTONE, Key, LsmStorageInner, MemTableKey, RawKey, WriteBatchRecord}
import org.slf4j.LoggerFactory

import java.util
Expand Down Expand Up @@ -71,6 +71,20 @@ case class Transaction(
iterator
}

/**
 * Prefix scan inside this transaction.
 *
 * Merges the transaction's local storage with the storage engine's prefix scan taken at
 * this transaction's read timestamp. When `readOnce` is set, `rollback()` is called before
 * returning.
 *
 * @param prefix key prefix to look up
 * @return an iterator over the merged entries
 * @throws IllegalStateException if the transaction is already committed
 */
def prefix(prefix: Key): TxnIterator = {
  if (committed.get()) {
    throw new IllegalStateException(s"cannot operate on committed Transaction(ID=$tid)!")
  }
  val storageIter = inner.prefixWithTs(prefix, readTs)
  val bounds = prefix.prefixRange()
  val pendingIter = TxnLocalIterator(localStorage, bounds._1, bounds._2)
  val result = TxnIterator(this.copy(), TwoMergeIterator(pendingIter, storageIter))
  if (readOnce) rollback()
  result
}

def put(key: String, value: String): Unit = put(key.getBytes, value.getBytes)

def put(bytes: Array[Byte], value: Array[Byte]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ class ByteArrayWriter {
this
}

/**
 * Appends a boolean as a single byte: 1 for `true`, 0 for `false`.
 *
 * @param b flag to write
 * @return this writer, so calls can be chained
 */
def putBoolean(b: Boolean): ByteArrayWriter = {
  val encoded: Byte = if (b) 1.toByte else 0.toByte
  buffer.append(encoded)
  this
}

}


Loading

0 comments on commit 5aabd66

Please sign in to comment.