This repository has been archived by the owner on Sep 28, 2023. It is now read-only.

Use atomic batching of writes #4

Open · wants to merge 1 commit into base: rocks_3.0
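
For context, this change replaces the per-row RocksDBCF.merge() path with a single RocksDB WriteBatch per partition update, so every row in the update is applied to the storage engine atomically. Below is a minimal sketch of the batching pattern, assuming the RocksJava calls already used in this repository (merge, setDisableWAL, write) and try-with-resources support on RocksJava objects; the class and method names in the sketch are illustrative, not taken from the PR.

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Illustrative sketch: accumulate all merge operations for one partition update
// in a WriteBatch, then apply them with a single write() call so they either
// all succeed or all fail together.
public class AtomicBatchSketch
{
    public static void applyAtomically(RocksDB db, byte[][] keys, byte[][] values, boolean writeCommitLog)
    throws RocksDBException
    {
        try (WriteBatch batch = new WriteBatch();
             WriteOptions options = new WriteOptions())
        {
            if (!writeCommitLog)
                options.setDisableWAL(true); // same WAL toggle the new RocksDBCF.write() applies

            for (int i = 0; i < keys.length; i++)
                batch.merge(keys[i], values[i]); // one merge per encoded row, as in addWriteToBatch()

            db.write(options, batch); // the whole batch is applied atomically
        }
    }
}

In the PR itself the batch is scoped to one partition update, and RocksDBCF.write() routes it to the shard chosen by getRocksDBFromKey(partitionKey), so all keys in a batch land in the same RocksDB instance.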
20 changes: 8 additions & 12 deletions src/java/org/apache/cassandra/rocksdb/RocksDBCF.java
@@ -57,14 +57,14 @@
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.CassandraCompactionFilter;
import org.rocksdb.CassandraValueMergeOperator;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionPriority;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.FlushOptions;
@@ -75,6 +75,7 @@
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.rocksdb.StatsLevel;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import static org.apache.cassandra.rocksdb.RocksDBConfigs.MERGE_OPERANDS_LIMIT;
@@ -98,7 +99,6 @@ public class RocksDBCF implements RocksDBCFMBean
private final CassandraValueMergeOperator mergeOperator;

private final ReadOptions readOptions;
private final WriteOptions disableWAL;
private final FlushOptions flushOptions;

private final int gcGraceSeconds;
@@ -152,7 +152,6 @@ public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException
// until compaction happens. However in our case, range deletion is only used to remove ranges
// no longer owned by this node. In such a case, stale keys would never be queried.
readOptions = new ReadOptions().setIgnoreRangeDeletions(true);
disableWAL = new WriteOptions().setDisableWAL(true);
flushOptions = new FlushOptions().setWaitForFlush(true);

// Register the mbean.
@@ -309,17 +308,14 @@ public RocksDBIteratorAdapter newShardIterator(int shardId, ReadOptions options)
return new RocksDBIteratorAdapter(rocksDB.newIterator(options), rocksMetrics);
}

public void merge(DecoratedKey partitionKey, byte[] key, byte[] value) throws RocksDBException
public void write(DecoratedKey partitionKey, WriteBatch batch, boolean writeCommitLog) throws RocksDBException
{
RocksDB rocksDB = getRocksDBFromKey(partitionKey);
if (RocksDBConfigs.DISABLE_WRITE_TO_COMMITLOG)
{
rocksDB.merge(disableWAL, key, value);
}
else
{
rocksDB.merge(key, value);
WriteOptions options = new WriteOptions();
if(!writeCommitLog) {
options.setDisableWAL(true);
}

getRocksDBFromKey(partitionKey).write(options, batch);
}

public void deleteRange(byte[] start, byte[] end) throws RocksDBException
92 changes: 47 additions & 45 deletions src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java
@@ -33,6 +33,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFutureTask;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -114,20 +115,64 @@ public void openColumnFamilyStore(ColumnFamilyStore cfs)

public void apply(ColumnFamilyStore cfs, PartitionUpdate update, UpdateTransaction indexer, boolean writeCommitLog)
{
//TODO: WriteBatch takes a hint for how many bytes to reserve that we might want to use.
WriteBatch batch = new WriteBatch();
DecoratedKey partitionKey = update.partitionKey();

for (Row row : update)
{
applyRowToRocksDB(cfs, writeCommitLog, partitionKey, indexer, row);
addWriteToBatch(batch, cfs, partitionKey, row, indexer);
}

Row staticRow = update.staticRow();
if (!staticRow.isEmpty())
{
applyRowToRocksDB(cfs, writeCommitLog, partitionKey, indexer, staticRow);
addWriteToBatch(batch, cfs, partitionKey, staticRow, indexer);
}

RocksDBCF dbcf = rocksDBFamily.get(cfs.metadata.cfId);
try {
dbcf.write(partitionKey, batch, writeCommitLog);
}
catch (RocksDBException e)
{
logger.error(e.toString(), e);
} finally {
indexer.commit();
}
}
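
As an aside on the TODO above: RocksJava also exposes a WriteBatch(int reserved_bytes) constructor, so the size hint could in principle be wired up as sketched here; both that constructor's availability in the rocksdbjni version used and the choice of PartitionUpdate.dataSize() as the estimate are assumptions, not part of this PR.

// Hypothetical: pre-size the batch with a rough byte estimate from the update.
WriteBatch batch = new WriteBatch(update.dataSize());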

private void addWriteToBatch(WriteBatch b,
ColumnFamilyStore cfs,
DecoratedKey partitionKey,
Row row,
UpdateTransaction indexer)
{

Clustering clustering = row.clustering();

byte[] rocksDBKey = RowKeyEncoder.encode(partitionKey, clustering, cfs.metadata);
byte[] rocksDBValue = RowValueEncoder.encode(cfs.metadata, row);

b.merge(rocksDBKey, rocksDBValue);

if (indexer != UpdateTransaction.NO_OP)
{
try
{
secondaryIndexMetrics.rsiTotalInsertions.inc();
indexer.onInserted(row);
}
catch (RuntimeException e)
{
secondaryIndexMetrics.rsiInsertionFailures.inc();
logger.error(e.toString(), e);
throw new StorageEngineException("Index update failed", e);
}
}
}


public UnfilteredRowIterator queryStorage(ColumnFamilyStore cfs, SinglePartitionReadCommand readCommand)
{
Partition partition = new RocksDBPartition(rocksDBFamily.get(cfs.metadata.cfId),
@@ -221,49 +266,6 @@ public AbstractStreamReceiveTask getStreamReceiveTask(StreamSession session, Str
return new RocksDBStreamReceiveTask(session, summary.cfId, summary.files, summary.totalSize);
}

private void applyRowToRocksDB(ColumnFamilyStore cfs,
boolean writeCommitLog,
DecoratedKey partitionKey,
UpdateTransaction indexer,
Row row)
{

Clustering clustering = row.clustering();

byte[] rocksDBKey = RowKeyEncoder.encode(partitionKey, clustering, cfs.metadata);
byte[] rocksDBValue = RowValueEncoder.encode(cfs.metadata, row);

try
{
indexer.start();
rocksDBFamily.get(cfs.metadata.cfId).merge(partitionKey, rocksDBKey, rocksDBValue);
if (indexer != UpdateTransaction.NO_OP)
{
try
{
secondaryIndexMetrics.rsiTotalInsertions.inc();
indexer.onInserted(row);
}
catch (RuntimeException e)
{
secondaryIndexMetrics.rsiInsertionFailures.inc();
logger.error(e.toString(), e);
throw new StorageEngineException("Index update failed", e);
}

}
}
catch (RocksDBException e)
{
logger.error(e.toString(), e);
throw new StorageEngineException("Row merge failed", e);
}
finally
{
indexer.commit();
Author (review comment): I'm not sure about this. Why do we commit the indexer when we've experienced an exception?

}
}
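
One possible reading of that review question against the new apply() path, sketched under the assumption that the index transaction should only be committed after the batch write succeeds (the PR as written commits it in a finally block):

try
{
    dbcf.write(partitionKey, batch, writeCommitLog);
    indexer.commit(); // commit the index transaction only after a successful write
}
catch (RocksDBException e)
{
    logger.error(e.toString(), e);
    throw new StorageEngineException("Atomic batch write failed", e);
}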

public static RocksDBCF getRocksDBCF(UUID cfId)
{
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(cfId);
29 changes: 19 additions & 10 deletions test/unit/org/apache/cassandra/rocksdb/RocksDBCFTest.java
@@ -35,7 +35,9 @@
import org.apache.cassandra.rocksdb.encoding.value.RowValueEncoder;
import org.apache.cassandra.utils.Hex;
import org.rocksdb.IndexType;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -47,6 +49,13 @@ public class RocksDBCFTest extends RocksDBTestBase
{
final DecoratedKey dk = Util.dk("test_key");

public void writeKeyValue(RocksDBCF cf, DecoratedKey partitionKey, byte[] key, byte[] value) throws RocksDBException
{
WriteBatch batch = new WriteBatch();
batch.merge(key, value);
cf.write(partitionKey, batch, true);
}

@Test
public void testMerge() throws RocksDBException
{
@@ -57,7 +66,7 @@ public void testMerge() throws RocksDBException
RocksDBCF rocksDBCF = RocksDBEngine.getRocksDBCF(cfs.metadata.cfId);
byte[] key = "test_key".getBytes();
byte[] value = encodeValue(cfs, "test_value");
rocksDBCF.merge(dk, key, value);
writeKeyValue(rocksDBCF, dk, key, value);
assertArrayEquals(value, rocksDBCF.get(dk, key));
}

@@ -84,10 +93,10 @@ public void testDeleteRange() throws RocksDBException
byte[] d = "d".getBytes();
byte[] value = encodeValue(cfs, "test_value");

rocksDBCF.merge(dk, a, value);
rocksDBCF.merge(dk, b, value);
rocksDBCF.merge(dk, c, value);
rocksDBCF.merge(dk, d, value);
writeKeyValue(rocksDBCF, dk, a, value);
writeKeyValue(rocksDBCF, dk, b, value);
writeKeyValue(rocksDBCF, dk, c, value);
writeKeyValue(rocksDBCF, dk, d, value);

rocksDBCF.deleteRange(b, d);
rocksDBCF.compactRange();
@@ -109,7 +118,7 @@ public void testTruncate() throws RocksDBException
byte[] key = "test_key".getBytes();
byte[] value = encodeValue(cfs, "test_value");

rocksDBCF.merge(dk, key, value);
writeKeyValue(rocksDBCF, dk, key, value);
assertArrayEquals(value, rocksDBCF.get(dk, key));

rocksDBCF.truncate();
@@ -130,7 +139,7 @@ public void testClose() throws RocksDBException
byte[] key = "test_key".getBytes();
byte[] value = encodeValue(cfs, "test_value");

rocksDBCF.merge(dk, key, value);
writeKeyValue(rocksDBCF, dk, key, value);

assertArrayEquals(value, rocksDBCF.get(dk, key));

@@ -157,9 +166,9 @@ public void testDumpPrefix() throws Exception
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
RocksDBCF rocksDBCF = RocksDBEngine.getRocksDBCF(cfs.metadata.cfId);

rocksDBCF.merge(dk, "test_key1".getBytes(), "test_value11".getBytes());
rocksDBCF.merge(dk, "test_key1".getBytes(), "test_value12".getBytes());
rocksDBCF.merge(dk, "test_key2".getBytes(), "test_value2".getBytes());
writeKeyValue(rocksDBCF, dk, "test_key1".getBytes(), "test_value11".getBytes());
writeKeyValue(rocksDBCF, dk, "test_key1".getBytes(), "test_value12".getBytes());
writeKeyValue(rocksDBCF, dk, "test_key2".getBytes(), "test_value2".getBytes());

String dump = rocksDBCF.dumpPrefix(dk, "test_key".getBytes(), Integer.MAX_VALUE);
assertEquals(2, dump.split("\n").length);