Skip to content

Commit

Permalink
Merge pull request #2 from luoluoyuyu/Imp-opendb
Browse files Browse the repository at this point in the history
Improvement of openDB
  • Loading branch information
luoluoyuyu authored Oct 17, 2023
2 parents 6ca185f + e594edf commit 8f84aed
Show file tree
Hide file tree
Showing 41 changed files with 1,715 additions and 322 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# OpenDB

Based on LSM tree-structured database with up to 100,000 data writes per second.

## Create DB

```
openDB = OpenDBImp.open(new Options(),"src/test/resources/data/");
ColumnFamilyDescriptor columnFamilyDescriptor= new ColumnFamilyDescriptor();
columnFamilyDescriptor.setName("luoluoyuyu");
columnFamilyDescriptor.setKeyType(KeyType.intKey);
columnFamilyDescriptor.setValueType(ValueType.intValue);
columnFamilyDescriptor.setBlockSize(1<<12);
openDB.createColumnFamily(columnFamilyDescriptor);
Key key = new IntKey(1);
Value value = new IntValue(1);
openDB.put(key, value, columnFamilyHandle);
columnFamilyHandle=openDB.getColumnFamilyHandle("luoluoyuyu").date;
```
Original file line number Diff line number Diff line change
Expand Up @@ -16,73 +16,83 @@
*/
package net.openio.opendb.compression;

import net.openio.opendb.model.SequenceNumber;
import net.openio.opendb.db.ColumnFamily;
import net.openio.opendb.db.SnapshotManager;
import net.openio.opendb.storage.metadata.Levels;
import net.openio.opendb.storage.sstable.SSTable;

import java.util.LinkedList;
import java.util.List;

public class CompactionTask implements Runnable {

Levels levels;
private List<ColumnFamily> list;

int maxLevel;
private int maxLevel;

SizeTieredCompaction compaction;
private SnapshotManager snapshotManager;

LeveledCompaction leveledCompaction;
private int sizeTieredLevel;

SequenceNumber sequenceNumber;
private int ssTableSize;

int sizeTieredLevel;

public CompactionTask(Levels levels, int maxLevel, SizeTieredCompaction compaction,
LeveledCompaction leveledCompaction,
SequenceNumber sequenceNumber, int sizeTieredLevel) {
this.levels = levels;
public CompactionTask(List<ColumnFamily> list, int maxLevel,
SnapshotManager snapshotManager, int sizeTieredLevel, int ssTableSize) {
this.list = list;
this.maxLevel = maxLevel;
this.compaction = compaction;
this.leveledCompaction = leveledCompaction;
this.sequenceNumber = sequenceNumber;
this.snapshotManager = snapshotManager;
this.sizeTieredLevel = sizeTieredLevel;
this.ssTableSize = ssTableSize;
}

@Override
public void run() {
levels.setBeingCompactedLevel(0);
if (!levels.getWaitToMerge().isEmpty()) {
levels.addList(0, levels.getWaitToMerge());
levels.getWaitToMerge().clear();
List<ColumnFamily> columnFamilies = null;
synchronized (list) {
columnFamilies = new LinkedList<>(list);
}
int level = 0;
while (level < maxLevel) {
if (levels.getLevel(level) == null) {
break;
for (ColumnFamily columnFamily : columnFamilies) {

Levels levels = columnFamily.getLevels();
synchronized (levels) {
levels.setBeingCompactedLevel(0);
}
levels.setBeingCompactedLevel(level);
if (levels.getAllSize() > levels.getLevel0CompactionTrigger() << level) {
if (level < sizeTieredLevel) {
try {
List<SSTable> list = compaction.compaction(levels.getLevel(level).getSsTables(), sequenceNumber);
levels.getLevels().get(level + 1).addSSTables(list);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("compaction is wrong");
}
int level = 0;
while (level < maxLevel) {
if (levels.getLevel(level) == null) {
break;
}
levels.setBeingCompactedLevel(level);
if (levels.getAllSize() > levels.getLevel0CompactionTrigger() << level) {
if (level < sizeTieredLevel) {
try {
SizeTieredCompaction compaction = new SizeTieredCompaction(columnFamily.getBufferCache(),
columnFamily.getStorage(), ssTableSize);
List<SSTable> list = compaction.compaction(levels.getLevel(level).getSsTables(),
snapshotManager.getMinSnapshot());
levels.getLevels().get(level + 1).addSSTables(list);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("compaction is wrong");
}

} else {
try {
List<SSTable> list = leveledCompaction.compaction(levels.getLevel(level).getSsTables(), sequenceNumber);
levels.getLevels().get(level + 1).addSSTables(list);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("compaction is wrong");
} else {
try {
LeveledCompaction leveledCompaction = new LeveledCompaction(columnFamily.getBufferCache(),
columnFamily.getStorage(), ssTableSize);
List<SSTable> list = leveledCompaction.compaction(levels.getLevel(level).getSsTables(),
snapshotManager.getMinSnapshot());
levels.getLevels().get(level + 1).addSSTables(list);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("compaction is wrong");
}
}
}
level++;
}
level++;
}

levels.setBeingCompactedLevel(-1);
levels.setBeingCompactedLevel(-1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
package net.openio.opendb.db;

import net.openio.opendb.log.Log;
import net.openio.opendb.model.SequenceNumber;
import net.openio.opendb.storage.wal.LogStorage;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class AsynchronousWriteBatch extends WriteBatch {
Expand All @@ -33,53 +36,100 @@ public class AsynchronousWriteBatch extends WriteBatch {

private final int size;

ExecutorService executorService = Executors.newSingleThreadExecutor();
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

public AsynchronousWriteBatch(List<String> file, LogStorage walStorage, int size) {
public AsynchronousWriteBatch(List<String> file, LogStorage walStorage, int size, long maxSueque) {
super(maxSueque);
this.file = file;
this.walStorage = walStorage;
this.size = size;

executorService.scheduleAtFixedRate(new WalTask(this), 1, 1, TimeUnit.SECONDS);

}


@Override
void step(List<Log> list) {
executorService.execute(new WalTask(list));
if (walStorage.logFileSize() > size) {
file.add(walStorage.createNewFile());
public List<String> getFile() {
return file;
}

@Override
public List<Log> getWal(SequenceNumber sequenceNumber) {
List<Log> walLogs = new LinkedList<>();
for (int i = file.size() - 1; i >= 0; i--) {
try {
List<Log> l = null;
walLogs.addAll(l = walStorage.getLogs(sequenceNumber, file.get(i)));
if (l.size() == 0) {
return walLogs;
}
} catch (IOException e) {
e.printStackTrace();
return walLogs;
}
}
this.notifyAll();
return walLogs;
}

@Override
void waitForNextNode(WriteBatch.Node snapshot) {
boolean isSyn() {
return false;
}

@Override
void step(List<Log> list) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
this.notifyAll();
} catch (Exception e) {
}
while (snapshot.next == null) {
}

@Override
void waitForNextNode(WriteBatch.Node snapshot) {
do {
try {
this.wait();
} catch (Exception e) {
}
} while (this.writeSequenceNumber < snapshot.sequenceNumber.getTimes());

}
snapshot.pre.next = null;
}

@Override
void close() {
executorService.shutdown();
}

class WalTask implements Runnable {

List<Log> list;
final WriteBatch writeBatch;

@Override
public void run() {
List<Log> list = null;
synchronized (writeBatch) {
if (writeBatch.logs.size() == 0) {
return;
}
list = writeBatch.logs;
this.writeBatch.logs = new LinkedList<>();
}
try {
walStorage.addLogs(list);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("can not write logs");
}
if (walStorage.logFileSize() > size) {
file.add(walStorage.createNewFile());
}
}

WalTask(List<Log> list) {
this.list = list;

WalTask(WriteBatch writeBatch) {
this.writeBatch = writeBatch;
}
}
}
Loading

0 comments on commit 8f84aed

Please sign in to comment.