Skip to content

Commit

Permalink
feat:
Browse files Browse the repository at this point in the history
1. rename app class
2. cli supports 'state' command, and print data dir & config file path
3. fix redundant WAL files remaining bug
  • Loading branch information
Leibnizhu committed Apr 28, 2024
1 parent 0e9fa52 commit 505d7a7
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 31 deletions.
29 changes: 21 additions & 8 deletions src/main/scala/io/github/leibnizhu/tinylsm/LsmStorageInner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object LsmStorageInner {
val nextSstId = AtomicInteger(0)
val blockCache = BlockCache.apply(128)
val compactionController = CompactionController(options.compactionOptions)
val manifestFile = new File(path, "MANIFEST")
val manifestFile = new File(path, Manifest.fileName)
val manifest = new Manifest(manifestFile, options.targetManifestSize)
var lastCommitTs = 0L;
if (manifestFile.exists()) {
Expand Down Expand Up @@ -70,17 +70,27 @@ object LsmStorageInner {
log.info("{} SST opened", sstCnt)

val newMemtableId = nextSstId.incrementAndGet()

if (options.enableWal) {
// 从 wal 恢复Memtable,到了这里,memTables 里面是从Manifest恢复的、没flush的Memtable
var walCnt = 0
for (mtId <- memTables) {
val memTable = MemTable.recoverFromWal(mtId, fileOfWal(path, mtId))
if (memTable.nonEmpty) {
state.immutableMemTables = memTable :: state.immutableMemTables
val memTableMaxTs = memTable.map.keySet().asScala.map(_.ts).max
lastCommitTs = lastCommitTs.max(memTableMaxTs)
walCnt += 1
val walFile = fileOfWal(path, mtId)
if (walFile.exists()) {
val memTable = MemTable.recoverFromWal(mtId, walFile)
if (memTable.nonEmpty) {
state.immutableMemTables = memTable :: state.immutableMemTables
val memTableMaxTs = memTable.map.keySet().asScala.map(_.ts).max
lastCommitTs = lastCommitTs.max(memTableMaxTs)
walCnt += 1
} else {
// 如果wal为空,那么对应的memtable不会加入到immutableMemTables,在下次flush的时候也不会被处理,所以wal可以删除
// 一般来说wal为空有两种情况:1.最新的Memtable,没有数据 2.手动强制flush了,flush的时候Memtable里没有数据
val deletedWal = walFile.delete()
log.info("Deleted empty wal file {} success: {}", walFile, deletedWal)
}
}
// 有manifest记录,但没有wal文件,一般是wal文件为空、在上一次启动恢复的时候删除了
}
log.info("{} MemTable recovered from wal", walCnt)
state.memTable = MemTable(newMemtableId, Some(fileOfWal(path, newMemtableId)))
Expand Down Expand Up @@ -516,7 +526,10 @@ private[tinylsm] case class LsmStorageInner(
println()
}

def dumpState(): String = state.dumpState()
def dumpState(): String =
s"""Data dir:\t${path.getAbsolutePath}
Config file path:\t${Config.configFilePath}
${state.dumpState()}"""

def triggerFlush(): Unit = {
val (memtableNum, numLimit) = state.read(st => (st.immutableMemTables.length, options.numMemTableLimit))
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/io/github/leibnizhu/tinylsm/Manifest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ class Manifest(file: File, targetSize: Int = 1024) {
}
}

object Manifest {
val fileName = "MANIFEST"
}

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import akka.actor.typed.{ActorRef, Behavior}
import akka.http.scaladsl.server.Directives.complete
import akka.http.scaladsl.server.StandardRoute
import io.github.leibnizhu.tinylsm.app.BizCode.*
import io.github.leibnizhu.tinylsm.iterator.StorageIterator
import io.github.leibnizhu.tinylsm.mvcc.Transaction
import io.github.leibnizhu.tinylsm.utils.Bound
import io.github.leibnizhu.tinylsm.{Key, MemTableEntry, MemTableValue, RawKey, TinyLsm}
import io.github.leibnizhu.tinylsm.{Key, MemTableValue, TinyLsm}

import java.util.StringJoiner

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ package io.github.leibnizhu.tinylsm.app
import akka.actor.typed.ActorSystem
import com.google.protobuf.ByteString
import io.github.leibnizhu.tinylsm.app.ApiCommands.*
import io.github.leibnizhu.tinylsm.app.BizCode.{Success, TransactionInvalid, TransactionNotExists}
import io.github.leibnizhu.tinylsm.app.BizCode.Success
import io.github.leibnizhu.tinylsm.grpc.*
import io.github.leibnizhu.tinylsm.iterator.StorageIterator
import io.github.leibnizhu.tinylsm.mvcc.Transaction
import io.github.leibnizhu.tinylsm.utils.Bound
import io.github.leibnizhu.tinylsm.{Key, MemTableValue, RawKey, TinyLsm}
import io.github.leibnizhu.tinylsm.{Key, MemTableValue, TinyLsm}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future

class TinyLsmRpcServiceImpl(state: ApiCommands.InnerState,
system: ActorSystem[_]) extends TinyLsmRpcService {
class GrpcServiceImpl(state: ApiCommands.InnerState,
system: ActorSystem[_]) extends TinyLsmRpcService {

override def getKey(in: GetKeyRequest): Future[ValueReply] = {
val response = GetByKey(in.key.toByteArray, in.tid, null).wrapBehavior(state)
Expand Down Expand Up @@ -86,7 +84,7 @@ class TinyLsmRpcServiceImpl(state: ApiCommands.InnerState,
}
}

object TinyLsmRpcServiceImpl {
def apply(storage: TinyLsm, transactions: java.util.Map[Int, Transaction], system: ActorSystem[_]): TinyLsmRpcServiceImpl =
new TinyLsmRpcServiceImpl(ApiCommands.InnerState(storage, transactions), system)
object GrpcServiceImpl {
def apply(storage: TinyLsm, transactions: java.util.Map[Int, Transaction], system: ActorSystem[_]): GrpcServiceImpl =
new GrpcServiceImpl(ApiCommands.InnerState(storage, transactions), system)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import akka.http.scaladsl.server.Route
import akka.util.Timeout
import io.github.leibnizhu.tinylsm.app.ApiCommands.*
import io.github.leibnizhu.tinylsm.utils.Bound
import io.github.leibnizhu.tinylsm.{Key, MemTableValue}

import java.time.Duration
import scala.concurrent.Future

class TinyLsmHttpRoutes(registry: ActorRef[ApiCommands.Command])
(implicit val system: ActorSystem[_]) {
class HttpRoutes(registry: ActorRef[ApiCommands.Command])
(implicit val system: ActorSystem[_]) {

//#user-routes-class

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object TinyLsmCli {
case "commit" => cliContext.commitCurrentTxn()
case "rollback" => cliContext.rollbackCurrentTxn()
case "flush" => cliContext.flush()
case "status" => cliContext.status()
case "status" | "state" => cliContext.status()
case _ => println(s"Unsupported command: '${words.head}', you can type :help for more information or <TAB> for auto complete")
}

Expand All @@ -79,11 +79,11 @@ object TinyLsmCli {
| -p: TinyLSM port, default value is 9527.
|Commands:
| get <key> : Get value by key.
| delete <key> : Delete a key.
| del|delete <key> : Delete a key.
| put <key> <value> : Put value by key.
| scan <Unbound|Excluded|Included> <fromKey> <Unbound|Excluded|Included> <toKey> : Scan by key range.
| flush : Force flush MemTable to SST.
| status : Show TinyLSM status.
| state|status : Show TinyLSM status.
| txn: Start a new transaction
| commit: Commit current transaction
| rollback: rollback current transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class TinyLsmServer(storage: TinyLsm, host: String, httpPort: Int, rpcPort: Int)
val httpRegistry = TinyLsmHttpRegistry(storage, transactions).registry()
val tinyLsmActor = context.spawn(httpRegistry, "TinyLsmActor")
context.watch(tinyLsmActor)
val routes = new TinyLsmHttpRoutes(tinyLsmActor)(context.system)
val routes = new HttpRoutes(tinyLsmActor)(context.system)
startHttpServer(routes.routes)(context.system)
startRpcServer()(context.system)
Behaviors.empty
Expand Down Expand Up @@ -63,7 +63,7 @@ class TinyLsmServer(storage: TinyLsm, host: String, httpPort: Int, rpcPort: Int)
implicit val ec: ExecutionContext = system.executionContext

val service: HttpRequest => Future[HttpResponse] =
TinyLsmRpcServiceHandler(TinyLsmRpcServiceImpl(storage, transactions, system))
TinyLsmRpcServiceHandler(GrpcServiceImpl(storage, transactions, system))

val bound: Future[Http.ServerBinding] = Http()
.newServerAt(host, rpcPort)
Expand Down
13 changes: 9 additions & 4 deletions src/main/scala/io/github/leibnizhu/tinylsm/utils/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,22 @@ object Config {
}

private def loadConfigFile(): Properties = {
val configFileEnvName = "TINY_LSM_CONFIG_FILE"
val configFileSysPropName = toPropertyName("CONFIG_FILE")
val configFile = System.getProperty(configFileSysPropName,
System.getenv().getOrDefault(configFileEnvName, "/etc/tinylsm/tinylsm.conf"))
val configFile: String = configFilePath
val prop = Properties()
if (new File(configFile).exists()) {
prop.load(new FileInputStream(configFile))
}
prop
}

def configFilePath = {
val configFileEnvName = "TINY_LSM_CONFIG_FILE"
val configFileSysPropName = toPropertyName("CONFIG_FILE")
val configFile = System.getProperty(configFileSysPropName,
System.getenv().getOrDefault(configFileEnvName, "/etc/tinylsm/tinylsm.conf"))
configFile
}

def print(): Unit = {
println("TinyLsm configurations:")
Config.values.foreach(c => println(s"\t${c.sysPropName} => ${c.get()}"))
Expand Down

0 comments on commit 505d7a7

Please sign in to comment.