Skip to content

Commit

Permalink
Implement commit and rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
ben221199 committed Sep 12, 2024
1 parent d729d86 commit 7d2ca00
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 16 deletions.
103 changes: 87 additions & 16 deletions src/main/java/com/lbry/database/PrefixDB.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
package com.lbry.database;

import com.lbry.database.keys.UndoKey;
import com.lbry.database.revert.RevertibleDelete;
import com.lbry.database.revert.RevertibleOperation;
import com.lbry.database.revert.RevertibleOperationStack;
import com.lbry.database.revert.RevertiblePut;
import com.lbry.database.rows.*;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.*;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.*;

/**
* Class for a revertible RocksDB database: A RocksDB database where each set of applied changes can be undone.
Expand Down Expand Up @@ -81,6 +76,14 @@ public PrefixDB(String path,int maxOpenFiles,String secondaryPath) throws RocksD
}

public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDepth) throws RocksDBException{
this(path,maxOpenFiles,secondaryPath,maxUndoDepth,null);
}

public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDepth,Set<Byte> unsafePrefixes) throws RocksDBException{
this(path,maxOpenFiles,secondaryPath,maxUndoDepth,unsafePrefixes,true);
}

public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDepth,Set<Byte> unsafePrefixes,boolean enforceIntegrity) throws RocksDBException{
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
for(Prefix prefix : Prefix.values()){
Expand All @@ -100,8 +103,6 @@ public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDep

this.database = RocksDB.open(new DBOptions(options),path,columnFamilyDescriptors,this.columnFamilyHandles);

Set<Byte> unsafePrefixes = new HashSet<>();//TODO
boolean enforceIntegrity = false;//TODO
this.operationStack = new RevertibleOperationStack((byte[] key) -> {
try{
return Optional.of(this.get(key));
Expand Down Expand Up @@ -161,28 +162,98 @@ public PrefixDB(String path,int maxOpenFiles,String secondaryPath,int maxUndoDep
/**
* Write staged changes to the database without keeping undo information. Changes written cannot be undone.
*/
public void unsafeCommit(){
public void unsafeCommit() throws RocksDBException{
this.applyStash();
WriteOptions writeOptions = new WriteOptions().setSync(true);
try{
//TODO
if(this.operationStack.length()!=0){
return;
}
WriteBatch batch = new WriteBatch();
for(RevertibleOperation stagedChange : this.operationStack.interate()){
ColumnFamilyHandle columnFamily = this.getColumnFamilyByPrefix(Prefix.getByValue(stagedChange.getKey()[0]));
if(!stagedChange.isDelete()){
batch.put(columnFamily,stagedChange.getKey(),stagedChange.getValue());
}else{
batch.delete(columnFamily,stagedChange.getKey());
}
this.database.write(writeOptions,batch);
}
}finally{
writeOptions.close();
this.operationStack.clear();
}
}

public void commit(){
public void commit(int height,byte[] blockHash) throws RocksDBException{
this.applyStash();
byte[] undoOperations = this.operationStack.getUndoOperations();
List<byte[]> deleteUndos = new ArrayList<>();
if(height>this.maxUndoDepth){
byte[] upperBound = ByteBuffer.allocate(1+8).order(ByteOrder.BIG_ENDIAN).put(Prefix.UNDO.getValue()).putLong(height-this.maxUndoDepth).array();
RocksIterator iterator = this.database.newIterator(new ReadOptions().setIterateUpperBound(new Slice(upperBound)));
iterator.seek(ByteBuffer.allocate(1+8).order(ByteOrder.BIG_ENDIAN).put(Prefix.UNDO.getValue()).array());
while(iterator.isValid()){
deleteUndos.add(iterator.key());
iterator.next();
}
}
try{
//TODO
ColumnFamilyHandle undoColumnFamily = this.getColumnFamilyByPrefix(Prefix.UNDO);
WriteOptions writeOptions = new WriteOptions().setSync(true);
try{
WriteBatch batch = new WriteBatch();
for(RevertibleOperation stagedChange : this.operationStack.interate()){
ColumnFamilyHandle columnFamily = this.getColumnFamilyByPrefix(Prefix.getByValue(stagedChange.getKey()[0]));
if(!stagedChange.isDelete()){
batch.put(columnFamily,stagedChange.getKey(),stagedChange.getValue());
}else{
batch.delete(columnFamily,stagedChange.getKey());
}

}
for(byte[] undoToDelete : deleteUndos){
batch.delete(undoColumnFamily,undoToDelete);
}
UndoKey undoKey = new UndoKey();
undoKey.height = height;
undoKey.block_hash = blockHash;
byte[] undoKeyBytes = this.undo.packKey(undoKey);
batch.put(undoColumnFamily,undoKeyBytes,undoOperations);
this.database.write(writeOptions,batch);
}finally{
writeOptions.close();
this.operationStack.clear();
}
}finally{
this.operationStack.clear();
}
}

public void rollback(int height,byte[] blockHash){
public void rollback(int height,byte[] blockHash) throws RocksDBException{
UndoKey undoKey = new UndoKey();
undoKey.height = height;
undoKey.block_hash = blockHash;
byte[] undoKeyBytes = this.undo.packKey(undoKey);
ColumnFamilyHandle undoColumnFamily = this.getColumnFamilyByPrefix(Prefix.UNDO);
byte[] undoInfo = this.database.get(undoColumnFamily,undoKeyBytes);
this.operationStack.applyPackedUndoOperations(undoInfo);
this.operationStack.validateAndApplyStashedOperations();
WriteOptions writeOptions = new WriteOptions().setSync(true);
try{
//TODO
WriteBatch batch = new WriteBatch();
for(RevertibleOperation stagedChange : this.operationStack.interate()){
ColumnFamilyHandle columnFamily = this.getColumnFamilyByPrefix(Prefix.getByValue(stagedChange.getKey()[0]));
if(!stagedChange.isDelete()){
batch.put(columnFamily,stagedChange.getKey(),stagedChange.getValue());
}else{
batch.delete(columnFamily,stagedChange.getKey());
}
this.database.write(writeOptions,batch);
}
// batch.delete(undoKey)
}finally{
writeOptions.close();
this.operationStack.clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RevertibleOperationStack{

Expand Down Expand Up @@ -419,6 +420,14 @@ public void clear(){
this.stashedLastOperationForKey.clear();
}

public int length(){
return this.items.values().stream().mapToInt(x -> x.length).sum();
}

public Iterable<RevertibleOperation> interate(){
return this.items.values().stream().flatMap(Stream::of).collect(Collectors.toList());
}

/**
* Get the serialized bytes to undo all of the changes made by the pending ops
*/
Expand Down

0 comments on commit 7d2ca00

Please sign in to comment.