Skip to content

Commit

Permalink
Merge pull request #63 from tuplejump/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
trulite committed Jul 7, 2015
2 parents e9b6bf7 + 5b83460 commit 0e5427b
Show file tree
Hide file tree
Showing 63 changed files with 3,174 additions and 9,746 deletions.
20 changes: 5 additions & 15 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apply plugin: 'eclipse'
apply plugin: 'maven'
apply plugin: 'signing'

version = '0.9.9'
version = '1.0.0'
group = 'com.tuplejump'

sourceCompatibility = 1.7
Expand Down Expand Up @@ -39,12 +39,12 @@ dependencies {
//provided dependencies - do not copy into dist
compile('org.codehaus.jackson:jackson-mapper-asl:1.9.2')
compile('org.codehaus.jackson:jackson-core-asl:1.9.2')
compile('com.google.guava:guava:15.0')
compile('com.google.guava:guava:16.0')
compile('org.apache.commons:commons-lang3:3.1')
compile('org.slf4j:slf4j-api:1.7.2')
compile('org.slf4j:slf4j-log4j12:1.7.2')

compile('org.apache.cassandra:cassandra-all:2.0.11') {
compile('org.apache.cassandra:cassandra-all:2.1.3') {
exclude group: 'asm'
exclude group: 'org.apache.mina'
exclude group: 'com.google.guava'
Expand All @@ -53,14 +53,14 @@ dependencies {
//testing dependencies
testCompile group: 'junit', name: 'junit', version: '4.11'

testCompile('com.datastax.cassandra:cassandra-driver-core:2.0.3') {
testCompile('com.datastax.cassandra:cassandra-driver-core:2.1.5') {
exclude group: 'org.apache.cassandra', module: 'cassandra-all'
exclude group: 'log4j'
exclude group: 'io.netty'
exclude group: 'com.google.guava'
}

testCompile('org.cassandraunit:cassandra-unit:2.0.2.0') {
testCompile('org.cassandraunit:cassandra-unit:2.0.2.2') {
exclude group: 'org.apache.cassandra', module: 'cassandra-all'
exclude group: 'log4j'
exclude group: 'org.sl4j'
Expand All @@ -72,16 +72,6 @@ dependencies {

}

test {
systemProperty "cluster", System.getProperty("cluster")
jvmArgs '-javaagent:ext/jamm-0.2.5.jar'
}

task perfTest(type: Test) << {
jvmArgs '-javaagent:ext/jamm-0.2.5.jar'
include 'com/tuplejump/perf/**'
}

task javadocJar(type: Jar) {
classifier = 'javadoc'
from javadoc
Expand Down
Binary file removed ext/jamm-0.2.5.jar
Binary file not shown.
4,413 changes: 0 additions & 4,413 deletions out2.txt

This file was deleted.

4,426 changes: 0 additions & 4,426 deletions out3.txt

This file was deleted.

83 changes: 41 additions & 42 deletions src/main/java/com/tuplejump/stargate/Fields.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,21 @@

package com.tuplejump.stargate;

import com.tuplejump.stargate.lucene.LuceneUtils;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.lucene.document.*;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.util.BytesRef;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Date;
import java.util.List;
import java.util.StringTokenizer;
import java.util.UUID;

import static com.tuplejump.stargate.lucene.Constants.*;
import static com.tuplejump.stargate.lucene.Constants.striped;

/**
* User: satya
Expand All @@ -46,35 +39,47 @@
*/
public class Fields {

public static final FieldType STRING_FIELD_TYPE = new FieldType();

static {
STRING_FIELD_TYPE.setIndexed(true);
STRING_FIELD_TYPE.setTokenized(false);
}

public static Field field(String name, AbstractType type, ByteBuffer byteBufferValue, FieldType fieldType) {
if (fieldType.docValueType() != null) {
if (fieldType.numericType() != null) return numericDocValuesField(name, type, byteBufferValue);
else return stringDocValuesField(name, type, byteBufferValue);
}
CQL3Type cqlType = type.asCQL3Type();
if (cqlType == CQL3Type.Native.INT) {
return new IntField(name, (Integer) type.compose(byteBufferValue), fieldType);
} else if (cqlType == CQL3Type.Native.VARINT || cqlType == CQL3Type.Native.BIGINT || cqlType == CQL3Type.Native.COUNTER) {
return new LongField(name, ((Number) type.compose(byteBufferValue)).longValue(), fieldType);
} else if (cqlType == CQL3Type.Native.DECIMAL || cqlType == CQL3Type.Native.DOUBLE) {
return new DoubleField(name, ((Number) type.compose(byteBufferValue)).doubleValue(), fieldType);
} else if (cqlType == CQL3Type.Native.FLOAT) {
return new FloatField(name, ((Number) type.compose(byteBufferValue)).floatValue(), fieldType);
} else if (cqlType == CQL3Type.Native.ASCII || cqlType == CQL3Type.Native.TEXT || cqlType == CQL3Type.Native.VARCHAR) {
return new Field(name, type.getString(byteBufferValue), fieldType);
} else if (cqlType == CQL3Type.Native.UUID) {
return new Field(name, type.getString(byteBufferValue), fieldType);
} else if (cqlType == CQL3Type.Native.TIMEUUID) {
//TimeUUID toString is not comparable. So we reorder to get a comparable value while searching
return new Field(name, reorderTimeUUId(type.getString(byteBufferValue)), fieldType);
} else if (cqlType == CQL3Type.Native.TIMESTAMP) {
return new LongField(name, ((Date) type.compose(byteBufferValue)).getTime(), fieldType);
} else if (cqlType == CQL3Type.Native.BOOLEAN) {
Boolean val = ((Boolean) type.compose(byteBufferValue));
return new Field(name, val.toString(), fieldType);
} else {
return new Field(name, toString(byteBufferValue, type), fieldType);
try {
Object value = type.compose(byteBufferValue);
if (cqlType == CQL3Type.Native.INT) {
return new IntField(name, (Integer) value, fieldType);
} else if (cqlType == CQL3Type.Native.VARINT || cqlType == CQL3Type.Native.BIGINT || cqlType == CQL3Type.Native.COUNTER) {
return new LongField(name, ((Number) value).longValue(), fieldType);
} else if (cqlType == CQL3Type.Native.DECIMAL || cqlType == CQL3Type.Native.DOUBLE) {
return new DoubleField(name, ((Number) value).doubleValue(), fieldType);
} else if (cqlType == CQL3Type.Native.FLOAT) {
return new FloatField(name, ((Number) value).floatValue(), fieldType);
} else if (cqlType == CQL3Type.Native.ASCII || cqlType == CQL3Type.Native.TEXT || cqlType == CQL3Type.Native.VARCHAR) {
return new Field(name, value.toString(), fieldType);
} else if (cqlType == CQL3Type.Native.UUID) {
return new Field(name, UUIDType.instance.getSerializer().toString((UUID) value), fieldType);
} else if (cqlType == CQL3Type.Native.TIMEUUID) {
//TimeUUID toString is not comparable. So we reorder to get a comparable value while searching
return new Field(name, reorderTimeUUId(TimeUUIDType.instance.getSerializer().toString((UUID) value)), fieldType);
} else if (cqlType == CQL3Type.Native.TIMESTAMP) {
return new LongField(name, ((Date) value).getTime(), fieldType);
} else if (cqlType == CQL3Type.Native.BOOLEAN) {
return new Field(name, value.toString(), fieldType);
} else {
return new Field(name, toString(byteBufferValue, type), fieldType);
}
} catch (MarshalException e) {
return new Field(name, "_null_", STRING_FIELD_TYPE);
} catch (IndexOutOfBoundsException e) {
return new Field(name, "_null_", STRING_FIELD_TYPE);
}
}

Expand Down Expand Up @@ -116,12 +121,6 @@ public static ByteBuffer defaultValue(AbstractType type, boolean min) {
return ByteBufferUtil.bytes(0l);
} else if (cqlType == CQL3Type.Native.BOOLEAN) {
return BooleanType.instance.decompose(min ? false : true);
} else if (type.isCollection()) {
CollectionType collectionType = (CollectionType) type;
List<Pair<ByteBuffer, Column>> collection = new ArrayList<>();
ByteBuffer dummyColumn = defaultValue(collectionType.nameComparator());
collection.add(Pair.create(dummyColumn, new Column(dummyColumn, defaultValue(collectionType.valueComparator(), min))));
return collectionType.serialize(collection);
} else {
return ByteBufferUtil.EMPTY_BYTE_BUFFER;
}
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/com/tuplejump/stargate/IndexContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
import org.mapdb.Atomic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,6 +42,7 @@
* User: satya
*/
public class IndexContainer {
public static final String INDEX_RECORDS = "index-num-records";
protected static final Logger logger = LoggerFactory.getLogger(RowIndex.class);
static ExecutorService executorService = Executors.newFixedThreadPool(10);
Map<Range<Token>, Indexer> indexers = new HashMap<>();
Expand All @@ -65,7 +67,9 @@ public void updateIndexers(Collection<Range<Token>> ranges) {
if (indexers.isEmpty()) {
logger.warn("Adding VNode indexers");
for (Range<Token> range : ranges) {
Indexer indexer = new BasicIndexer(analyzer, keyspace, cf, indexName, range.left.toString());
String rangeStr = range.left.toString();
Atomic.Long records = Stargate.getInstance().getAtomicLong(INDEX_RECORDS + "-" + indexName + "-" + rangeStr);
Indexer indexer = new BasicIndexer(records, analyzer, keyspace, cf, indexName, rangeStr);
indexers.put(range, indexer);
logger.warn("Added VNode indexers for range {}", range);
}
Expand Down Expand Up @@ -159,6 +163,15 @@ public long liveSize() {
return size;
}

public long rowCount() {
long size = 0;
for (Indexer indexer : indexers.values()) {
size += (indexer == null) ? 0 : indexer.approxRowCount();
}
return size;

}

public void remove() {
for (Indexer indexer : indexers.values()) {
if (indexer != null) {
Expand Down
23 changes: 18 additions & 5 deletions src/main/java/com/tuplejump/stargate/IndexEntryEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.tuplejump.stargate;

import org.apache.cassandra.db.AtomicBTreeColumns;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnSerializer;
import org.apache.cassandra.db.TreeMapBackedSortedColumns;
import org.apache.cassandra.io.util.AbstractDataOutput;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.mapdb.Serializer;
Expand Down Expand Up @@ -46,19 +48,30 @@ public IndexEntryEvent(Type type, ByteBuffer rowKey, ColumnFamily columnFamily)
public static class IndexEntryEventSerializer implements Serializer<IndexEntryEvent>, Serializable {

@Override
public void serialize(DataOutput appender, IndexEntryEvent value) throws IOException {
public void serialize(final DataOutput appender, IndexEntryEvent value) throws IOException {
appender.writeByte(value.type.getByte());
appender.writeInt(value.rowKey.remaining());
ByteBufferUtil.write(value.rowKey, appender);
ColumnFamily.serializer.serialize(value.columnFamily, appender, MessagingService.current_version);
Utils.write(value.rowKey, appender);
DataOutputPlus dataOutputPlus = new AbstractDataOutput() {
@Override
public void write(byte[] buffer, int offset, int count) throws IOException {
appender.write(buffer, offset, count);
}

@Override
public void write(int oneByte) throws IOException {
appender.write(oneByte);
}
};
ColumnFamily.serializer.serialize(value.columnFamily, dataOutputPlus, MessagingService.current_version);
}

@Override
public IndexEntryEvent deserialize(DataInput in, int available) throws IOException {
Type type = Type.fromByte(in.readByte());
int rowKeyLength = in.readInt();
ByteBuffer rowKeyBuffer = ByteBufferUtil.read(in, rowKeyLength);
ColumnFamily columnFamily = ColumnFamily.serializer.deserialize(in, TreeMapBackedSortedColumns.factory, ColumnSerializer.Flag.LOCAL, MessagingService.current_version);
ColumnFamily columnFamily = ColumnFamily.serializer.deserialize(in, AtomicBTreeColumns.factory, ColumnSerializer.Flag.LOCAL, MessagingService.current_version);
return new IndexEntryEvent(type, rowKeyBuffer, columnFamily);
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/tuplejump/stargate/IndexingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public IndexingService(Atomic.Long reads) {
}

public void register(RowIndexSupport rowIndexSupport) {
this.support.put(rowIndexSupport.getCFMetaData().cfName, rowIndexSupport);
this.support.put(rowIndexSupport.tableMapper.cfMetaData.cfName, rowIndexSupport);
}

public void index(IndexEntryEvent entryEvent) {
Expand Down
Loading

0 comments on commit 0e5427b

Please sign in to comment.