This repository has been archived by the owner on Sep 28, 2023. It is now read-only.

Commit

Use atomic batching of writes
Under the hood, RocksDB->put uses batches anyway; this change just makes the batching explicit ("over the hood"). I'm guessing it also means that we only have to acquire locks against the memtable+WAL once, instead of multiple times.
hashbrowncipher committed Mar 21, 2018
1 parent b5a1bb5 commit 782a835
Showing 3 changed files with 74 additions and 67 deletions.
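
The core of the change: instead of issuing one rocksDB.merge() per row, the write path now collects a partition update's merges into a WriteBatch and applies the whole batch with a single rocksDB.write() call. A minimal standalone sketch of that pattern follows (plain rocksdbjni usage, not code from this repository; the class and method names and the try-with-resources scoping are illustrative assumptions):

import java.util.List;

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class BatchedMergeSketch
{
    // Applies every (key, value) pair as one atomic write. Semantically equivalent
    // to calling db.merge(key, value) once per pair, but the whole batch hits the
    // memtable and WAL in a single locked operation. Assumes the DB was opened with
    // a merge operator, as this storage engine does (CassandraValueMergeOperator).
    public static void mergeAll(RocksDB db, List<byte[][]> pairs) throws RocksDBException
    {
        try (WriteBatch batch = new WriteBatch();
             WriteOptions options = new WriteOptions())
        {
            for (byte[][] pair : pairs)
                batch.merge(pair[0], pair[1]);
            db.write(options, batch);
        }
    }
}
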
20 changes: 8 additions & 12 deletions src/java/org/apache/cassandra/rocksdb/RocksDBCF.java
@@ -57,14 +57,14 @@
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.CassandraCompactionFilter;
import org.rocksdb.CassandraValueMergeOperator;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionPriority;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.FlushOptions;
@@ -75,6 +75,7 @@
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import static org.apache.cassandra.rocksdb.RocksDBConfigs.MERGE_OPERANDS_LIMIT;
@@ -98,7 +99,6 @@ public class RocksDBCF implements RocksDBCFMBean
private final CassandraValueMergeOperator mergeOperator;

private final ReadOptions readOptions;
private final WriteOptions disableWAL;
private final FlushOptions flushOptions;

private final int gcGraceSeconds;
@@ -152,7 +152,6 @@ public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException
// until compaction happens. However in our case, range deletion is only used to remove ranges
// no longer owned by this node. In such a case, stale keys would never be queried.
readOptions = new ReadOptions().setIgnoreRangeDeletions(true);
disableWAL = new WriteOptions().setDisableWAL(true);
flushOptions = new FlushOptions().setWaitForFlush(true);

// Register the mbean.
@@ -309,17 +308,14 @@ public RocksDBIteratorAdapter newShardIterator(int shardId, ReadOptions options)
return new RocksDBIteratorAdapter(rocksDB.newIterator(options), rocksMetrics);
}

public void merge(DecoratedKey partitionKey, byte[] key, byte[] value) throws RocksDBException
public void write(DecoratedKey partitionKey, WriteBatch batch, boolean writeCommitLog) throws RocksDBException
{
RocksDB rocksDB = getRocksDBFromKey(partitionKey);
if (RocksDBConfigs.DISABLE_WRITE_TO_COMMITLOG)
{
rocksDB.merge(disableWAL, key, value);
}
else
{
rocksDB.merge(key, value);
WriteOptions options = new WriteOptions();
if (!writeCommitLog) {
options.setDisableWAL(true);
}

getRocksDBFromKey(partitionKey).write(options, batch);
}

public void deleteRange(byte[] start, byte[] end) throws RocksDBException
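
Caller-side, the new entry point replaces per-key merge() calls: build a WriteBatch of merges for one partition, then hand it to RocksDBCF.write() along with the partition key and a flag saying whether the commit log (WAL) should be written. A hedged sketch of such a caller, assuming a RocksDBCF and a DecoratedKey are already in scope (the helper name is illustrative; the test changes below do essentially the same thing):

// Hedged caller-side sketch of the new entry point; it mirrors the writeKeyValue
// helper added to RocksDBCFTest further down. Names are placeholders.
static void mergeOne(RocksDBCF cf, DecoratedKey partitionKey,
                     byte[] key, byte[] value) throws RocksDBException
{
    WriteBatch batch = new WriteBatch();
    batch.merge(key, value);              // any number of merges can be queued here
    cf.write(partitionKey, batch, true);  // true = also write the commit log (WAL)
}
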
92 changes: 47 additions & 45 deletions src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java
@@ -33,6 +33,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFutureTask;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -114,20 +115,64 @@ public void openColumnFamilyStore(ColumnFamilyStore cfs)

public void apply(ColumnFamilyStore cfs, PartitionUpdate update, UpdateTransaction indexer, boolean writeCommitLog)
{
//TODO: WriteBatch takes a hint for how many bytes to reserve that we might want to use.
WriteBatch batch = new WriteBatch();
DecoratedKey partitionKey = update.partitionKey();

for (Row row : update)
{
applyRowToRocksDB(cfs, writeCommitLog, partitionKey, indexer, row);
addWriteToBatch(batch, cfs, partitionKey, row, indexer);
}

Row staticRow = update.staticRow();
if (!staticRow.isEmpty())
{
applyRowToRocksDB(cfs, writeCommitLog, partitionKey, indexer, staticRow);
addWriteToBatch(batch, cfs, partitionKey, staticRow, indexer);
}

RocksDBCF dbcf = rocksDBFamily.get(cfs.metadata.cfId);
try
{
dbcf.write(partitionKey, batch, writeCommitLog);
}
catch (RocksDBException e)
{
logger.error(e.toString(), e);
}
finally
{
indexer.commit();
}
}

private void addWriteToBatch(WriteBatch b,
ColumnFamilyStore cfs,
DecoratedKey partitionKey,
Row row,
UpdateTransaction indexer)
{

Clustering clustering = row.clustering();

byte[] rocksDBKey = RowKeyEncoder.encode(partitionKey, clustering, cfs.metadata);
byte[] rocksDBValue = RowValueEncoder.encode(cfs.metadata, row);

b.merge(rocksDBKey, rocksDBValue);

if (indexer != UpdateTransaction.NO_OP)
{
try
{
secondaryIndexMetrics.rsiTotalInsertions.inc();
indexer.onInserted(row);
}
catch (RuntimeException e)
{
secondaryIndexMetrics.rsiInsertionFailures.inc();
logger.error(e.toString(), e);
throw new StorageEngineException("Index update failed", e);
}
}
}


public UnfilteredRowIterator queryStorage(ColumnFamilyStore cfs, SinglePartitionReadCommand readCommand)
{
Partition partition = new RocksDBPartition(rocksDBFamily.get(cfs.metadata.cfId),
@@ -221,49 +266,6 @@ public AbstractStreamReceiveTask getStreamReceiveTask(StreamSession session, Str
return new RocksDBStreamReceiveTask(session, summary.cfId, summary.files, summary.totalSize);
}

private void applyRowToRocksDB(ColumnFamilyStore cfs,
boolean writeCommitLog,
DecoratedKey partitionKey,
UpdateTransaction indexer,
Row row)
{

Clustering clustering = row.clustering();

byte[] rocksDBKey = RowKeyEncoder.encode(partitionKey, clustering, cfs.metadata);
byte[] rocksDBValue = RowValueEncoder.encode(cfs.metadata, row);

try
{
indexer.start();
rocksDBFamily.get(cfs.metadata.cfId).merge(partitionKey, rocksDBKey, rocksDBValue);
if (indexer != UpdateTransaction.NO_OP)
{
try
{
secondaryIndexMetrics.rsiTotalInsertions.inc();
indexer.onInserted(row);
}
catch (RuntimeException e)
{
secondaryIndexMetrics.rsiInsertionFailures.inc();
logger.error(e.toString(), e);
throw new StorageEngineException("Index update failed", e);
}

}
}
catch (RocksDBException e)
{
logger.error(e.toString(), e);
throw new StorageEngineException("Row merge failed", e);
}
finally
{
indexer.commit();
}
}

public static RocksDBCF getRocksDBCF(UUID cfId)
{
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(cfId);
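
On the TODO left in apply() above ("WriteBatch takes a hint for how many bytes to reserve"): rocksdbjni provides a WriteBatch(int reserved_bytes) constructor that pre-sizes the batch's internal buffer. A possible follow-up is sketched below purely as an illustration; using PartitionUpdate.dataSize() as the sizing source is an assumption, not something this commit does.

// Hypothetical follow-up to the TODO, not part of this commit: pre-size the
// batch from the partition update instead of taking the default reservation.
private static WriteBatch newBatchFor(PartitionUpdate update)
{
    return new WriteBatch(update.dataSize());
}
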
29 changes: 19 additions & 10 deletions test/unit/org/apache/cassandra/rocksdb/RocksDBCFTest.java
@@ -35,7 +35,9 @@
import org.apache.cassandra.rocksdb.encoding.value.RowValueEncoder;
import org.apache.cassandra.utils.Hex;
import org.rocksdb.IndexType;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -47,6 +49,13 @@ public class RocksDBCFTest extends RocksDBTestBase
{
final DecoratedKey dk = Util.dk("test_key");

public void writeKeyValue(RocksDBCF cf, DecoratedKey partitionKey, byte[] key, byte[] value) throws RocksDBException
{
WriteBatch batch = new WriteBatch();
batch.merge(key, value);
cf.write(partitionKey, batch, true);
}

@Test
public void testMerge() throws RocksDBException
{
@@ -57,7 +66,7 @@ public void testMerge() throws RocksDBException
RocksDBCF rocksDBCF = RocksDBEngine.getRocksDBCF(cfs.metadata.cfId);
byte[] key = "test_key".getBytes();
byte[] value = encodeValue(cfs, "test_value");
rocksDBCF.merge(dk, key, value);
writeKeyValue(rocksDBCF, dk, key, value);
assertArrayEquals(value, rocksDBCF.get(dk, key));
}

@@ -84,10 +93,10 @@ public void testDeleteRange() throws RocksDBException
byte[] d = "d".getBytes();
byte[] value = encodeValue(cfs, "test_value");

rocksDBCF.merge(dk, a, value);
rocksDBCF.merge(dk, b, value);
rocksDBCF.merge(dk, c, value);
rocksDBCF.merge(dk, d, value);
writeKeyValue(rocksDBCF, dk, a, value);
writeKeyValue(rocksDBCF, dk, b, value);
writeKeyValue(rocksDBCF, dk, c, value);
writeKeyValue(rocksDBCF, dk, d, value);

rocksDBCF.deleteRange(b, d);
rocksDBCF.compactRange();
@@ -109,7 +118,7 @@ public void testTruncate() throws RocksDBException
byte[] key = "test_key".getBytes();
byte[] value = encodeValue(cfs, "test_value");

rocksDBCF.merge(dk, key, value);
writeKeyValue(rocksDBCF, dk, key, value);
assertArrayEquals(value, rocksDBCF.get(dk, key));

rocksDBCF.truncate();
@@ -130,7 +139,7 @@ public void testClose() throws RocksDBException
byte[] key = "test_key".getBytes();
byte[] value = encodeValue(cfs, "test_value");

rocksDBCF.merge(dk, key, value);
writeKeyValue(rocksDBCF, dk, key, value);

assertArrayEquals(value, rocksDBCF.get(dk, key));

@@ -157,9 +166,9 @@ public void testDumpPrefix() throws Exception
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
RocksDBCF rocksDBCF = RocksDBEngine.getRocksDBCF(cfs.metadata.cfId);

rocksDBCF.merge(dk, "test_key1".getBytes(), "test_value11".getBytes());
rocksDBCF.merge(dk, "test_key1".getBytes(), "test_value12".getBytes());
rocksDBCF.merge(dk, "test_key2".getBytes(), "test_value2".getBytes());
writeKeyValue(rocksDBCF, dk, "test_key1".getBytes(), "test_value11".getBytes());
writeKeyValue(rocksDBCF, dk, "test_key1".getBytes(), "test_value12".getBytes());
writeKeyValue(rocksDBCF, dk, "test_key2".getBytes(), "test_value2".getBytes());

String dump = rocksDBCF.dumpPrefix(dk, "test_key".getBytes(), Integer.MAX_VALUE);
assertEquals(2, dump.split("\n").length);
