From 8b001073b02294014ff9287e0e291121a24f6b3e Mon Sep 17 00:00:00 2001 From: Michael Rappazzo Date: Mon, 4 Feb 2019 13:56:26 -0500 Subject: [PATCH] properties: upgrade HBaseVertex to support multi-properties The HBase graph tinkerpop implementation specifies Cardinality.single as the default. Since HBase does not (at the time of this writing) support multiple values, this feature wraps the multi-values in a list and writes the list to HBase. As a consequence of this list wrapper, indexed properties can still only support-single value properties. When reading/writing from HBase, most properties will be written as single objects. Only when a property is specifically added with non-single cardinality will it be written to HBase as a list. In order to avoid java native serialization, the serialization of non-single property values uses Kryo (with a small lightweight wrapper class to accomplish this). --- .../java/io/hgraphdb/HBaseBulkLoader.java | 19 +- src/main/java/io/hgraphdb/HBaseEdge.java | 124 +++++++++- src/main/java/io/hgraphdb/HBaseElement.java | 119 +-------- .../io/hgraphdb/HBaseElementSerializer.java | 12 +- src/main/java/io/hgraphdb/HBaseGraph.java | 13 +- .../java/io/hgraphdb/HBaseGraphUtils.java | 45 +++- src/main/java/io/hgraphdb/HBaseProperty.java | 7 +- src/main/java/io/hgraphdb/HBaseVertex.java | 230 ++++++++++++++++-- .../java/io/hgraphdb/HBaseVertexProperty.java | 2 +- .../java/io/hgraphdb/PropertyValueList.java | 45 ++++ .../io/hgraphdb/models/VertexIndexModel.java | 59 ++++- .../io/hgraphdb/mutators/VertexWriter.java | 2 +- .../io/hgraphdb/readers/VertexReader.java | 28 ++- .../java/io/hgraphdb/HBaseElementTest.java | 21 ++ .../hgraphdb/giraph/HBaseWriteableTest.java | 10 +- 15 files changed, 564 insertions(+), 172 deletions(-) create mode 100644 src/main/java/io/hgraphdb/PropertyValueList.java diff --git a/src/main/java/io/hgraphdb/HBaseBulkLoader.java b/src/main/java/io/hgraphdb/HBaseBulkLoader.java index 3a6462f..cb0c8d6 100644 --- a/src/main/java/io/hgraphdb/HBaseBulkLoader.java +++ b/src/main/java/io/hgraphdb/HBaseBulkLoader.java @@ -1,6 +1,14 @@ package io.hgraphdb; -import io.hgraphdb.mutators.*; +import io.hgraphdb.mutators.Creator; +import io.hgraphdb.mutators.EdgeIndexRemover; +import io.hgraphdb.mutators.EdgeIndexWriter; +import io.hgraphdb.mutators.EdgeWriter; +import io.hgraphdb.mutators.PropertyWriter; +import io.hgraphdb.mutators.VertexIndexRemover; +import io.hgraphdb.mutators.VertexIndexWriter; +import io.hgraphdb.mutators.VertexWriter; + import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutatorParams; @@ -84,7 +92,7 @@ public Vertex addVertex(final Object... keyValues) { idValue = HBaseGraphUtils.generateIdIfNeeded(idValue); long now = System.currentTimeMillis(); HBaseVertex vertex = new HBaseVertex(graph, idValue, label, now, now, - HBaseGraphUtils.propertiesToMap(keyValues)); + HBaseGraphUtils.propertiesToMultimap(keyValues)); vertex.validate(); Iterator indices = vertex.getIndices(OperationType.WRITE); @@ -192,14 +200,14 @@ public void setProperty(Vertex vertex, String key, Object value) { boolean hasIndex = v.hasIndex(OperationType.WRITE, key); if (hasIndex) { // only load old value if using index - oldValue = v.getProperty(key); + oldValue = v.getSingleProperty(key).orElse(null); if (oldValue != null && !oldValue.equals(value)) { VertexIndexRemover indexRemover = new VertexIndexRemover(graph, v, key, null); if (vertexIndicesMutator != null) vertexIndicesMutator.mutate(getMutationList(indexRemover.constructMutations())); } } - v.getProperties().put(key, value); + v.cacheProperty(key, value); v.updatedAt(System.currentTimeMillis()); if (hasIndex) { @@ -220,7 +228,8 @@ private List getMutationList(Iterator mu m -> m.setDurability(skipWAL ? Durability.SKIP_WAL : Durability.USE_DEFAULT))); } - public void close() { + @Override + public void close() { try { if (edgesMutator != null) edgesMutator.close(); if (edgeIndicesMutator != null) edgeIndicesMutator.close(); diff --git a/src/main/java/io/hgraphdb/HBaseEdge.java b/src/main/java/io/hgraphdb/HBaseEdge.java index 0f475d7..a740ea5 100644 --- a/src/main/java/io/hgraphdb/HBaseEdge.java +++ b/src/main/java/io/hgraphdb/HBaseEdge.java @@ -2,6 +2,9 @@ import io.hgraphdb.models.EdgeIndexModel; import io.hgraphdb.models.EdgeModel; +import io.hgraphdb.mutators.Mutator; +import io.hgraphdb.mutators.Mutators; + import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Property; @@ -12,8 +15,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; public class HBaseEdge extends HBaseElement implements Edge { @@ -21,6 +29,8 @@ public class HBaseEdge extends HBaseElement implements Edge { private Vertex inVertex; private Vertex outVertex; + protected Map properties; + protected transient boolean propertiesFullyLoaded; public HBaseEdge(HBaseGraph graph, Object id) { this(graph, id, null, null, null, null, null, null); @@ -37,10 +47,12 @@ public HBaseEdge(HBaseGraph graph, Object id, String label, Long createdAt, Long public HBaseEdge(HBaseGraph graph, Object id, String label, Long createdAt, Long updatedAt, Map properties, boolean propertiesFullyLoaded, Vertex inVertex, Vertex outVertex) { - super(graph, id, label, createdAt, updatedAt, properties, propertiesFullyLoaded); + super(graph, id, label, createdAt, updatedAt); this.inVertex = inVertex; this.outVertex = outVertex; + this.properties = properties; + this.propertiesFullyLoaded = propertiesFullyLoaded; } @Override @@ -55,6 +67,14 @@ public ElementType getElementType() { return ElementType.EDGE; } + public Map getProperties() { + if (properties == null || !propertiesFullyLoaded) { + load(); + propertiesFullyLoaded = true; + } + return properties; + } + @Override public void copyFrom(HBaseElement element) { super.copyFrom(element); @@ -62,6 +82,11 @@ public void copyFrom(HBaseElement element) { HBaseEdge copy = (HBaseEdge) element; if (copy.inVertex != null) this.inVertex = copy.inVertex; if (copy.outVertex != null) this.outVertex = copy.outVertex; + if (copy.properties != null + && (copy.propertiesFullyLoaded || this.properties == null)) { + this.properties = new ConcurrentHashMap<>(copy.properties); + this.propertiesFullyLoaded = copy.propertiesFullyLoaded; + } } } @@ -100,6 +125,101 @@ public Vertex getVertex(Direction direction) throws IllegalArgumentException { return Direction.IN.equals(direction) ? inVertex : outVertex; } + public void setProperty(String key, Object value) { + ElementHelper.validateProperty(key, value); + + graph.validateProperty(getElementType(), label, key, value); + + // delete from index model before setting property + Object oldValue = null; + boolean hasIndex = hasIndex(OperationType.WRITE, key); + if (hasIndex) { + // only load old value if using index + oldValue = getProperty(key); + if (oldValue != null && !oldValue.equals(value)) { + deleteFromIndexModel(key, null); + } + } + + getProperties().put(key, value); + updatedAt(System.currentTimeMillis()); + + if (hasIndex) { + if (oldValue == null || !oldValue.equals(value)) { + writeToIndexModel(key); + } + } + Mutator writer = getModel().writeProperty(this, key, value); + Mutators.write(getTable(), writer); + } + + public void incrementProperty(String key, long value) { + if (!graph.configuration().getUseSchema()) { + throw new HBaseGraphNoSchemaException("Schema not enabled"); + } + ElementHelper.validateProperty(key, value); + + graph.validateProperty(getElementType(), label, key, value); + + updatedAt(System.currentTimeMillis()); + + Mutator writer = getModel().incrementProperty(this, key, value); + long newValue = Mutators.increment(getTable(), writer, key); + getProperties().put(key, newValue); + } + + @Override + public Set keys() { + return Collections.unmodifiableSet(properties.keySet()); + } + + public void removeProperty(String key) { + Object value = getProperty(key); + if (value != null) { + // delete from index model before removing property + boolean hasIndex = hasIndex(OperationType.WRITE, key); + if (hasIndex) { + deleteFromIndexModel(key, null); + } + + getProperties().remove(key); + updatedAt(System.currentTimeMillis()); + + Mutator writer = getModel().clearProperty(this, key); + Mutators.write(getTable(), writer); + } + } + + @Override + public Stream> propertyEntriesStream() { + return properties.entrySet().stream(); + } + + @Override + public int propertySize() { + return this.properties.size(); + } + + @SuppressWarnings("unchecked") + public V getProperty(String key) { + if (properties != null) { + // optimization for partially loaded properties + V val = (V) properties.get(key); + if (val != null) return val; + } + return (V) getProperties().get(key); + } + + @Override + public boolean hasProperty(String key) { + if (properties != null) { + // optimization for partially loaded properties + Object val = properties.get(key); + if (val != null) return true; + } + return keys().contains(key); + } + @Override public void remove() { // Get rid of the endpoints and edge themselves. @@ -115,7 +235,7 @@ public void remove() { @Override public Iterator> properties(final String... propertyKeys) { - Iterable keys = getPropertyKeys(); + Iterable keys = keys(); Iterator filter = IteratorUtils.filter(keys.iterator(), key -> ElementHelper.keyExists(key, propertyKeys)); return IteratorUtils.map(filter, diff --git a/src/main/java/io/hgraphdb/HBaseElement.java b/src/main/java/io/hgraphdb/HBaseElement.java index ae3b63b..7182e83 100644 --- a/src/main/java/io/hgraphdb/HBaseElement.java +++ b/src/main/java/io/hgraphdb/HBaseElement.java @@ -2,8 +2,7 @@ import io.hgraphdb.models.BaseModel; import io.hgraphdb.models.ElementModel; -import io.hgraphdb.mutators.Mutator; -import io.hgraphdb.mutators.Mutators; + import org.apache.hadoop.hbase.client.Table; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -11,11 +10,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; public abstract class HBaseElement implements Element { @@ -26,8 +23,6 @@ public abstract class HBaseElement implements Element { protected String label; protected Long createdAt; protected Long updatedAt; - protected Map properties; - protected transient boolean propertiesFullyLoaded; protected transient IndexMetadata.Key indexKey; protected transient long indexTs; protected transient boolean isCached; @@ -37,16 +32,12 @@ protected HBaseElement(HBaseGraph graph, Object id, String label, Long createdAt, - Long updatedAt, - Map properties, - boolean propertiesFullyLoaded) { + Long updatedAt) { this.graph = graph; this.id = id; this.label = label; this.createdAt = createdAt; this.updatedAt = updatedAt; - this.properties = properties; - this.propertiesFullyLoaded = propertiesFullyLoaded; } public abstract void validate(); @@ -103,117 +94,21 @@ public void setDeleted(boolean isDeleted) { this.isDeleted = isDeleted; } - public Map getProperties() { - if (properties == null || !propertiesFullyLoaded) { - load(); - propertiesFullyLoaded = true; - } - return properties; - } - public void copyFrom(HBaseElement element) { if (element.label != null) this.label = element.label; if (element.createdAt != null) this.createdAt = element.createdAt; if (element.updatedAt != null) this.updatedAt = element.updatedAt; - if (element.properties != null - && (element.propertiesFullyLoaded || this.properties == null)) { - this.properties = new ConcurrentHashMap<>(element.properties); - this.propertiesFullyLoaded = element.propertiesFullyLoaded; - } } public void load() { getModel().load(this); } - @SuppressWarnings("unchecked") - public V getProperty(String key) { - if (properties != null) { - // optimization for partially loaded properties - V val = (V) properties.get(key); - if (val != null) return val; - } - return (V) getProperties().get(key); - } - - public boolean hasProperty(String key) { - if (properties != null) { - // optimization for partially loaded properties - Object val = properties.get(key); - if (val != null) return true; - } - return keys().contains(key); - } - - @Override - public Set keys() { - return getPropertyKeys(); - } - - public Set getPropertyKeys() { - return new HashSet<>(getProperties().keySet()); - } - - public void setProperty(String key, Object value) { - ElementHelper.validateProperty(key, value); + public abstract boolean hasProperty(String key); - graph.validateProperty(getElementType(), label, key, value); + public abstract Stream> propertyEntriesStream(); - // delete from index model before setting property - Object oldValue = null; - boolean hasIndex = hasIndex(OperationType.WRITE, key); - if (hasIndex) { - // only load old value if using index - oldValue = getProperty(key); - if (oldValue != null && !oldValue.equals(value)) { - deleteFromIndexModel(key, null); - } - } - - getProperties().put(key, value); - updatedAt(System.currentTimeMillis()); - - if (hasIndex) { - if (oldValue == null || !oldValue.equals(value)) { - writeToIndexModel(key); - } - } - Mutator writer = getModel().writeProperty(this, key, value); - Mutators.write(getTable(), writer); - } - - public void incrementProperty(String key, long value) { - if (!graph.configuration().getUseSchema()) { - throw new HBaseGraphNoSchemaException("Schema not enabled"); - } - ElementHelper.validateProperty(key, value); - - graph.validateProperty(getElementType(), label, key, value); - - updatedAt(System.currentTimeMillis()); - - Mutator writer = getModel().incrementProperty(this, key, value); - long newValue = Mutators.increment(getTable(), writer, key); - getProperties().put(key, newValue); - } - - public V removeProperty(String key) { - V value = getProperty(key); - if (value != null) { - // delete from index model before removing property - boolean hasIndex = hasIndex(OperationType.WRITE, key); - if (hasIndex) { - deleteFromIndexModel(key, null); - } - - getProperties().remove(key); - updatedAt(System.currentTimeMillis()); - - Mutator writer = getModel().clearProperty(this, key); - Mutators.write(getTable(), writer); - } - return value; - } + public abstract int propertySize(); @Override public String label() { @@ -240,7 +135,7 @@ public boolean hasIndex(OperationType op, String propertyKey) { } public Iterator getIndices(OperationType op) { - return graph.getIndices(op, getElementType(), label, getPropertyKeys()); + return graph.getIndices(op, getElementType(), label, keys()); } public abstract ElementModel getModel(); diff --git a/src/main/java/io/hgraphdb/HBaseElementSerializer.java b/src/main/java/io/hgraphdb/HBaseElementSerializer.java index 8038ba5..3287ed7 100644 --- a/src/main/java/io/hgraphdb/HBaseElementSerializer.java +++ b/src/main/java/io/hgraphdb/HBaseElementSerializer.java @@ -8,19 +8,20 @@ import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.Map; +import java.util.stream.Stream; public class HBaseElementSerializer extends Serializer { - public void write(Kryo kryo, Output output, E element) { + @Override + public void write(Kryo kryo, Output output, E element) { byte[] idBytes = ValueUtils.serialize(element.id()); output.writeInt(idBytes.length); output.writeBytes(idBytes); output.writeString(element.label()); output.writeLong(element.createdAt()); output.writeLong(element.updatedAt()); - Map properties = element.getProperties(); - output.writeInt(properties.size()); - properties.entrySet().forEach(entry -> { + output.writeInt(element.propertySize()); + element.propertyEntriesStream().forEach(entry -> { output.writeString(entry.getKey()); byte[] bytes = ValueUtils.serialize(entry.getValue()); output.writeInt(bytes.length); @@ -28,7 +29,8 @@ public void write(Kryo kryo, Output output, E element) { }); } - public E read(Kryo kryo, Input input, Class type) { + @Override + public E read(Kryo kryo, Input input, Class type) { int idBytesLen = input.readInt(); Object id = ValueUtils.deserialize(input.readBytes(idBytesLen)); String label = input.readString(); diff --git a/src/main/java/io/hgraphdb/HBaseGraph.java b/src/main/java/io/hgraphdb/HBaseGraph.java index b8fd906..ca0ef09 100644 --- a/src/main/java/io/hgraphdb/HBaseGraph.java +++ b/src/main/java/io/hgraphdb/HBaseGraph.java @@ -226,7 +226,7 @@ public Vertex addVertex(final Object... keyValues) { idValue = HBaseGraphUtils.generateIdIfNeeded(idValue); long now = System.currentTimeMillis(); - HBaseVertex newVertex = new HBaseVertex(this, idValue, label, now, now, HBaseGraphUtils.propertiesToMap(keyValues)); + HBaseVertex newVertex = new HBaseVertex(this, idValue, label, now, now, HBaseGraphUtils.propertiesToMultimap(keyValues)); newVertex.validate(); newVertex.writeToIndexModel(); newVertex.writeToModel(); @@ -677,21 +677,22 @@ public void validateEdge(String label, Object id, Map properties throw new HBaseGraphNotValidException("Edge label '" + label + "' has not been connected with inVertex '" + inVertex.label() + "' and outVertex '" + outVertex.label() + "'"); } - validateTypes(labelMetadata, id, properties); + validateTypes(labelMetadata, id, properties.entrySet().stream()); } - public void validateVertex(String label, Object id, Map properties) { + public void validateVertex(String label, Object id, Map> properties) { if (!configuration().getUseSchema() || label == null) return; LabelMetadata labelMetadata = getLabel(ElementType.VERTEX, label); - validateTypes(labelMetadata, id, properties); + validateTypes(labelMetadata, id, properties.entrySet().stream() + .flatMap(e -> e.getValue().stream().map(o -> new AbstractMap.SimpleEntry<>(e.getKey(), o)))); } - private void validateTypes(LabelMetadata labelMetadata, Object id, Map properties) { + private void validateTypes(LabelMetadata labelMetadata, Object id, Stream> properties) { ValueType idType = labelMetadata.idType(); if (idType != ValueType.ANY && idType != ValueUtils.getValueType(id)) { throw new HBaseGraphNotValidException("ID '" + id + "' not of type " + idType); } - properties.entrySet().forEach(entry -> + properties.forEach(entry -> getPropertyType(labelMetadata, entry.getKey(), entry.getValue(), true) ); } diff --git a/src/main/java/io/hgraphdb/HBaseGraphUtils.java b/src/main/java/io/hgraphdb/HBaseGraphUtils.java index 77e16e8..4e1aea6 100644 --- a/src/main/java/io/hgraphdb/HBaseGraphUtils.java +++ b/src/main/java/io/hgraphdb/HBaseGraphUtils.java @@ -1,8 +1,23 @@ package io.hgraphdb; +import static io.hgraphdb.Constants.DEFAULT_FAMILY; +import static io.hgraphdb.HBaseGraphConfiguration.Keys.HBASE_CLIENT_KERBEROS_PRINCIPAL; +import static io.hgraphdb.HBaseGraphConfiguration.Keys.HBASE_CLIENT_KEYTAB_FILE; +import static io.hgraphdb.HBaseGraphConfiguration.Keys.HBASE_SECURITY_AUTHENTICATION; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.NamespaceNotFoundException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.mock.MockConnectionFactory; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; @@ -18,13 +33,17 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; -import static io.hgraphdb.Constants.DEFAULT_FAMILY; -import static io.hgraphdb.HBaseGraphConfiguration.Keys.*; - public final class HBaseGraphUtils { private static final Logger LOGGER = LoggerFactory.getLogger(HBaseGraphUtils.class); @@ -246,6 +265,20 @@ public static Map propertiesToMap(Object... keyValues) { return props; } + public static Map> propertiesToMultimap(Object... keyValues) { + Map> props = new HashMap<>(); + for (int i = 0; i < keyValues.length; i = i + 2) { + Object key = keyValues[i]; + if (key.equals(T.id) || key.equals(T.label)) continue; + String keyStr = key.toString(); + Object value = keyValues[i + 1]; + ElementHelper.validateProperty(keyStr, value); + Collection collection = props.computeIfAbsent(keyStr, k -> new LinkedList<>()); + collection.add(value); + } + return props; + } + public static Map propertyKeysAndTypesToMap(Object... keyTypes) { Map props = new HashMap<>(); for (int i = 0; i < keyTypes.length; i = i + 2) { diff --git a/src/main/java/io/hgraphdb/HBaseProperty.java b/src/main/java/io/hgraphdb/HBaseProperty.java index c853deb..fa649a2 100644 --- a/src/main/java/io/hgraphdb/HBaseProperty.java +++ b/src/main/java/io/hgraphdb/HBaseProperty.java @@ -1,5 +1,6 @@ package io.hgraphdb; +import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Property; import org.apache.tinkerpop.gremlin.structure.util.ElementHelper; @@ -26,7 +27,11 @@ public Element element() { @Override public void remove() { - element.removeProperty(this.key); + if (this.element instanceof Edge) { + ((HBaseEdge)this.element).removeProperty(this.key); + } else { + ((HBaseVertex)this.element).removeProperty(this.key, this.value); + } } @Override diff --git a/src/main/java/io/hgraphdb/HBaseVertex.java b/src/main/java/io/hgraphdb/HBaseVertex.java index 10152d5..3396402 100644 --- a/src/main/java/io/hgraphdb/HBaseVertex.java +++ b/src/main/java/io/hgraphdb/HBaseVertex.java @@ -2,8 +2,14 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.MapMaker; + import io.hgraphdb.models.VertexIndexModel; import io.hgraphdb.models.VertexModel; +import io.hgraphdb.mutators.Mutator; +import io.hgraphdb.mutators.Mutators; + import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -16,35 +22,61 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; public class HBaseVertex extends HBaseElement implements Vertex { private static final Logger LOGGER = LoggerFactory.getLogger(HBaseVertex.class); private transient Cache> edgeCache; + protected Map> properties; + protected transient boolean propertiesFullyLoaded; public HBaseVertex(HBaseGraph graph, Object id) { this(graph, id, null, null, null, null, false); } - public HBaseVertex(HBaseGraph graph, Object id, String label, Long createdAt, Long updatedAt, Map properties) { + public HBaseVertex(HBaseGraph graph, Object id, String label, Long createdAt, Long updatedAt, Map> properties) { this(graph, id, label, createdAt, updatedAt, properties, properties != null); } public HBaseVertex(HBaseGraph graph, Object id, String label, Long createdAt, Long updatedAt, - Map properties, boolean propertiesFullyLoaded) { - super(graph, id, label, createdAt, updatedAt, properties, propertiesFullyLoaded); + Map> properties, boolean propertiesFullyLoaded) { + super(graph, id, label, createdAt, updatedAt); if (graph != null) { - this.edgeCache = CacheBuilder.>newBuilder() + this.edgeCache = CacheBuilder.newBuilder() .maximumSize(graph.configuration().getRelationshipCacheMaxSize()) .expireAfterAccess(graph.configuration().getRelationshipCacheTtlSecs(), TimeUnit.SECONDS) .build(); } + this.properties = properties; + this.propertiesFullyLoaded = propertiesFullyLoaded; + } + + @Override + public void copyFrom(HBaseElement element) { + super.copyFrom(element); + if (element instanceof HBaseVertex) { + HBaseVertex copy = (HBaseVertex) element; + if (copy.properties != null && (copy.propertiesFullyLoaded || this.properties == null)) { + this.properties = new ConcurrentHashMap<>(copy.properties); + this.propertiesFullyLoaded = copy.propertiesFullyLoaded; + } + } } @Override @@ -126,29 +158,197 @@ public void remove() { } } + private Map> getProperties() { + if (this.properties == null) { + if (this.propertiesFullyLoaded) { + //read-optimized concurrent map + this.properties = new MapMaker().concurrencyLevel(1).makeMap(); + } else { + if (properties == null || !propertiesFullyLoaded) { + load(); + propertiesFullyLoaded = true; + } + } + } + return properties; + } + @Override public VertexProperty property(final VertexProperty.Cardinality cardinality, final String key, final V value, final Object... keyValues) { - if (cardinality != VertexProperty.Cardinality.single) - throw VertexProperty.Exceptions.multiPropertiesNotSupported(); if (keyValues.length > 0) throw VertexProperty.Exceptions.metaPropertiesNotSupported(); - setProperty(key, value); + + Object oldSingleValue = null; + if (cardinality == VertexProperty.Cardinality.single) { + Collection values = getProperties().get(key); + if (values != null && !values.isEmpty()) { + oldSingleValue = values.iterator().next(); + } + } + boolean hasIndex = hasIndex(OperationType.WRITE, key); + if (hasIndex && cardinality != VertexProperty.Cardinality.single) { + throw VertexProperty.Exceptions.multiPropertiesNotSupported(); + } + + //this will mutate the graph + VertexProperty existingProperty = null; + if (cardinality.equals(VertexProperty.Cardinality.single)) { + this.properties.remove(key); + } else if (cardinality.equals(VertexProperty.Cardinality.set)) { + Iterator> itty = this.properties(key); + while (itty.hasNext()) { + final VertexProperty property = itty.next(); + if (property.value().equals(value)) { + ElementHelper.attachProperties(property, keyValues); + existingProperty = property; + break; + } + } + } + if (existingProperty != null) { + if (existingProperty instanceof HBaseVertexProperty) { + return existingProperty; + } else { + return new HBaseVertexProperty<>(graph, this, existingProperty.key(), existingProperty.value()); + } + } + + if (hasIndex) { + // only load old value if using index + if (oldSingleValue != null && !oldSingleValue.equals(value)) { + deleteFromIndexModel(key, null); + } + } + Collection plist = getProperties().computeIfAbsent(key, k -> new LinkedList<>()); + plist.add(value); + updatedAt(System.currentTimeMillis()); + + if (hasIndex) { + if (oldSingleValue == null || !oldSingleValue.equals(value)) { + writeToIndexModel(key); + } + } + //we only really want to write multi-properties when necessary + Mutator writer = getModel().writeProperty(this, key, preparePropertyWriteValue(cardinality, value, plist)); + Mutators.write(getTable(), writer); return new HBaseVertexProperty<>(graph, this, key, value); } + private Object preparePropertyWriteValue(final VertexProperty.Cardinality cardinality, final V value, Collection plist) { + return cardinality == VertexProperty.Cardinality.single ? value : new PropertyValueList(plist); + } + + @SuppressWarnings("unchecked") + public Optional getSingleProperty(String key) { + if (this.properties != null && this.properties.containsKey(key)) { + // optimization for partially loaded properties + Collection values = this.properties.getOrDefault(key, Collections.emptyList()); + if (values.size() == 1) { + return Optional.ofNullable((V)values.iterator().next()); + } + } + Collection values = getProperties().getOrDefault(key, Collections.emptyList()); + if (values.size() == 1) { + return Optional.ofNullable((V)values.iterator().next()); + } + return Optional.empty(); + } + public boolean hasProperty(String key, Object value) { + if (this.properties != null && this.properties.containsKey(key)) { + // optimization for partially loaded properties + if (this.properties.getOrDefault(key, Collections.emptyList()).contains(value)) { + return true; + } + } + if (getProperties().getOrDefault(key, Collections.emptyList()).contains(value)) { + return true; + } + return false; + } + public void cacheProperty(String key, Object value) { + getProperties().computeIfAbsent(key, k -> new LinkedHashSet<>()).add(value); + } + @Override - public VertexProperty property(final String key) { - V value = getProperty(key); - return value != null ? new HBaseVertexProperty<>(graph, this, key, value) : VertexProperty.empty(); + public boolean hasProperty(String key) { + if (this.properties != null && !properties.getOrDefault(key, Collections.emptyList()).isEmpty()) { + // optimization for partially loaded properties + return true; + } + return !getProperties().getOrDefault(key, Collections.emptyList()).isEmpty(); + } + + public boolean removeProperty(String key, Object value) { + boolean removed = false; + if (null != this.properties && this.properties.containsKey(key)) { + Collection values = this.properties.get(key); + removed = values.remove(value); + if (removed) { + updatedAt(System.currentTimeMillis()); + Mutator writer; + if (values.isEmpty()) { + boolean hasIndex = hasIndex(OperationType.WRITE, key); + deleteFromIndexModel(key, null); + //clear + writer = getModel().clearProperty(this, key); + } else { + //cardinality is non-single + writer = getModel().writeProperty(this, key, preparePropertyWriteValue(VertexProperty.Cardinality.list, value, values)); + } + Mutators.write(getTable(), writer); + } + } + return removed; + } + + @Override + public Stream> propertyEntriesStream() { + return this.getProperties().entrySet().stream() + .flatMap(e -> e.getValue().stream().map(o -> new AbstractMap.SimpleEntry<>(e.getKey(), o))); } @Override + public int propertySize() { + return this.getProperties().values().stream().filter(c -> c != null).mapToInt(c -> c.size()).sum(); + } + + public void incrementProperty(String key, long value) { + if (!graph.configuration().getUseSchema()) { + throw new HBaseGraphNoSchemaException("Schema not enabled"); + } + ElementHelper.validateProperty(key, value); + + graph.validateProperty(getElementType(), label, key, value); + + updatedAt(System.currentTimeMillis()); + + Mutator writer = getModel().incrementProperty(this, key, value); + long newValue = Mutators.increment(getTable(), writer, key); + cacheProperty(key, newValue); + } + + @Override @SuppressWarnings("unchecked") + public VertexProperty property(final String key) { + if (getProperties().containsKey(key)) { + Collection values = getProperties().getOrDefault(key, Collections.emptyList()); + if (values.size() > 1) { + throw Vertex.Exceptions.multiplePropertiesExistForProvidedKey(key); + } else { + return new HBaseVertexProperty<>(graph, this, key, (V)values.iterator().next()); + } + } else { + return VertexProperty.empty(); + } + } + + @Override @SuppressWarnings("unchecked") public Iterator> properties(final String... propertyKeys) { - Iterable keys = getPropertyKeys(); - Iterator filter = IteratorUtils.filter(keys.iterator(), - key -> ElementHelper.keyExists(key, propertyKeys)); - return IteratorUtils.map(filter, - key -> new HBaseVertexProperty<>(graph, this, key, getProperty(key))); + Set desiredKeys = ImmutableSet.copyOf(propertyKeys); + return getProperties().entrySet().stream() + .filter(e -> propertyKeys.length == 0 || desiredKeys.contains(e.getKey())) + .flatMap(e -> e.getValue().stream().map(o -> new AbstractMap.SimpleEntry<>(e.getKey(), o))) + .map(e -> (VertexProperty)new HBaseVertexProperty<>(graph, this, e.getKey(), (V)e.getValue())) + .iterator(); } @Override diff --git a/src/main/java/io/hgraphdb/HBaseVertexProperty.java b/src/main/java/io/hgraphdb/HBaseVertexProperty.java index e40f26a..2c345b9 100644 --- a/src/main/java/io/hgraphdb/HBaseVertexProperty.java +++ b/src/main/java/io/hgraphdb/HBaseVertexProperty.java @@ -62,7 +62,7 @@ public Property property(final String key, final U value) { @Override public void remove() { - vertex.removeProperty(this.key); + ((HBaseVertex)vertex).removeProperty(this.key, this.value); } @Override diff --git a/src/main/java/io/hgraphdb/PropertyValueList.java b/src/main/java/io/hgraphdb/PropertyValueList.java new file mode 100644 index 0000000..6be3039 --- /dev/null +++ b/src/main/java/io/hgraphdb/PropertyValueList.java @@ -0,0 +1,45 @@ +package io.hgraphdb; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoSerializable; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import com.google.common.collect.ForwardingCollection; + +import java.util.Collection; +import java.util.LinkedList; + +public class PropertyValueList extends ForwardingCollection implements KryoSerializable { + + private static final int KRYO_ID = 225; + private static final int KRYO_LL_ID = 226; + static { + new Kryo().register(PropertyValueList.class, KRYO_ID); + new Kryo().register(LinkedList.class, KRYO_LL_ID); + } + + private Collection list; + + public PropertyValueList() { + this(new LinkedList<>()); + } + + public PropertyValueList(Collection list) { + this.list = list; + } + + @Override protected Collection delegate() { + return list; + } + + @Override public void read(Kryo kryo, Input input) { + Collection c = (Collection)kryo.readClassAndObject(input); + this.list.addAll(c); + } + + @Override public void write(Kryo kryo, Output output) { + kryo.writeClassAndObject(output, this.list); + } + +} diff --git a/src/main/java/io/hgraphdb/models/VertexIndexModel.java b/src/main/java/io/hgraphdb/models/VertexIndexModel.java index ce7e366..5c362ec 100644 --- a/src/main/java/io/hgraphdb/models/VertexIndexModel.java +++ b/src/main/java/io/hgraphdb/models/VertexIndexModel.java @@ -1,12 +1,24 @@ package io.hgraphdb.models; -import io.hgraphdb.*; +import io.hgraphdb.CloseableIteratorUtils; +import io.hgraphdb.Constants; +import io.hgraphdb.ElementType; +import io.hgraphdb.HBaseGraph; +import io.hgraphdb.HBaseGraphConfiguration; +import io.hgraphdb.HBaseGraphException; +import io.hgraphdb.HBaseGraphNotFoundException; +import io.hgraphdb.HBaseGraphUtils; +import io.hgraphdb.HBaseVertex; +import io.hgraphdb.IndexMetadata; +import io.hgraphdb.OperationType; +import io.hgraphdb.ValueUtils; import io.hgraphdb.mutators.Mutator; import io.hgraphdb.mutators.Mutators; import io.hgraphdb.mutators.VertexIndexRemover; import io.hgraphdb.mutators.VertexIndexWriter; import io.hgraphdb.readers.VertexIndexReader; import io.hgraphdb.util.DynamicPositionedMutableByteRange; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -17,15 +29,23 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.PrefixFilter; -import org.apache.hadoop.hbase.util.*; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Order; +import org.apache.hadoop.hbase.util.OrderedBytes; +import org.apache.hadoop.hbase.util.PositionedByteRange; +import org.apache.hadoop.hbase.util.SimplePositionedByteRange; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.VertexProperty; import org.apache.tinkerpop.gremlin.structure.util.DefaultCloseableIterator; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.function.Predicate; @@ -62,7 +82,7 @@ public void deleteVertexIndex(Vertex vertex, String key, Long ts) { public Iterator vertices(String label, boolean isUnique, String key, Object value) { byte[] valueBytes = ValueUtils.serialize(value); return vertices(getVertexIndexScan(label, isUnique, key, value), vertex -> { - byte[] propValueBytes = ValueUtils.serialize(vertex.getProperty(key)); + byte[] propValueBytes = vertex.hasProperty(key, value) ? ValueUtils.serialize(value) : new byte[] {}; return Bytes.compareTo(propValueBytes, valueBytes) == 0; }); } @@ -71,9 +91,15 @@ public Iterator verticesInRange(String label, boolean isUnique, String k byte[] fromBytes = ValueUtils.serialize(inclusiveFrom); byte[] toBytes = ValueUtils.serialize(exclusiveTo); return vertices(getVertexIndexScanInRange(label, isUnique, key, inclusiveFrom, exclusiveTo), vertex -> { - byte[] propValueBytes = ValueUtils.serialize(vertex.getProperty(key)); - return Bytes.compareTo(propValueBytes, fromBytes) >= 0 - && Bytes.compareTo(propValueBytes, toBytes) < 0; + Iterator> properties = vertex.properties(key); + while (properties.hasNext()) { + VertexProperty prop = properties.next(); + byte[] propValueBytes = ValueUtils.serialize(prop.value()); + if (!(Bytes.compareTo(propValueBytes, fromBytes) >= 0 && Bytes.compareTo(propValueBytes, toBytes) < 0)) { + return false; + } + } + return true; }); } @@ -81,9 +107,18 @@ public Iterator verticesWithLimit(String label, boolean isUnique, String byte[] fromBytes = from != null ? ValueUtils.serialize(from) : HConstants.EMPTY_BYTE_ARRAY; return CloseableIteratorUtils.limit(vertices(getVertexIndexScanWithLimit(label, isUnique, key, from, limit, reversed), vertex -> { if (fromBytes == HConstants.EMPTY_BYTE_ARRAY) return true; - byte[] propValueBytes = ValueUtils.serialize(vertex.getProperty(key)); - int compare = Bytes.compareTo(propValueBytes, fromBytes); - return reversed ? compare <= 0 : compare >= 0; + Iterator> properties = vertex.properties(key); + while (properties.hasNext()) { + VertexProperty prop = properties.next(); + byte[] propValueBytes = ValueUtils.serialize(prop.value()); + int compare = Bytes.compareTo(propValueBytes, fromBytes); + if (reversed && compare > 0) { + return false; + } else if (!reversed && compare < 0) { + return false; + } + } + return true; }), limit); } @@ -223,8 +258,10 @@ public Vertex deserialize(Result result) { } Cell createdAtCell = result.getColumnLatestCell(Constants.DEFAULT_FAMILY_BYTES, Constants.CREATED_AT_BYTES); Long createdAt = ValueUtils.deserialize(CellUtil.cloneValue(createdAtCell)); - Map properties = new HashMap<>(); - properties.put(key, value); + Map> properties = new HashMap<>(); + List list = new LinkedList<>(); + list.add(value); + properties.put(key, list); HBaseVertex newVertex = new HBaseVertex(graph, vertexId, label, createdAt, null, properties, false); HBaseVertex vertex = (HBaseVertex) graph.findOrCreateVertex(vertexId); vertex.copyFrom(newVertex); diff --git a/src/main/java/io/hgraphdb/mutators/VertexWriter.java b/src/main/java/io/hgraphdb/mutators/VertexWriter.java index f7982fd..1094bff 100644 --- a/src/main/java/io/hgraphdb/mutators/VertexWriter.java +++ b/src/main/java/io/hgraphdb/mutators/VertexWriter.java @@ -38,7 +38,7 @@ public Iterator constructInsertions() { ValueUtils.serialize(((HBaseVertex) vertex).createdAt())); put.addColumn(Constants.DEFAULT_FAMILY_BYTES, Constants.UPDATED_AT_BYTES, ValueUtils.serialize(((HBaseVertex) vertex).updatedAt())); - ((HBaseVertex) vertex).getProperties().entrySet() + ((HBaseVertex) vertex).propertyEntriesStream() .forEach(entry -> { byte[] bytes = ValueUtils.serializePropertyValue(graph, ElementType.VERTEX, label, entry.getKey(), entry.getValue()); put.addColumn(Constants.DEFAULT_FAMILY_BYTES, Bytes.toBytes(entry.getKey()), bytes); diff --git a/src/main/java/io/hgraphdb/readers/VertexReader.java b/src/main/java/io/hgraphdb/readers/VertexReader.java index 11f477f..7a199b9 100644 --- a/src/main/java/io/hgraphdb/readers/VertexReader.java +++ b/src/main/java/io/hgraphdb/readers/VertexReader.java @@ -1,6 +1,12 @@ package io.hgraphdb.readers; -import io.hgraphdb.*; +import io.hgraphdb.Constants; +import io.hgraphdb.ElementType; +import io.hgraphdb.HBaseGraph; +import io.hgraphdb.HBaseGraphNotFoundException; +import io.hgraphdb.HBaseVertex; +import io.hgraphdb.ValueUtils; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Result; @@ -8,9 +14,12 @@ import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Vertex; +import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; -import java.util.stream.Collectors; + +import com.google.common.collect.MapMaker; public class VertexReader extends LoadingElementReader { @@ -48,8 +57,19 @@ public void load(Vertex vertex, Result result) { } } final String labelStr = label; - Map props = rawProps.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, - e -> ValueUtils.deserializePropertyValue(graph, ElementType.VERTEX, labelStr, e.getKey(), e.getValue()))); + Map> props = new MapMaker().concurrencyLevel(1).makeMap(); + for (Map.Entry entry : rawProps.entrySet()) { + String key = entry.getKey(); + Object deserialized = ValueUtils.deserializePropertyValue(graph, ElementType.VERTEX, labelStr, key, entry.getValue()); + props.put(key, new LinkedList<>()); + if (deserialized instanceof Iterable) { + for (Object item : (Iterable)deserialized) { + props.get(key).add(item); + } + } else { + props.get(key).add(deserialized); + } + } HBaseVertex newVertex = new HBaseVertex(graph, vertex.id(), label, createdAt, updatedAt, props); ((HBaseVertex) vertex).copyFrom(newVertex); } diff --git a/src/test/java/io/hgraphdb/HBaseElementTest.java b/src/test/java/io/hgraphdb/HBaseElementTest.java index 5925a77..83df010 100644 --- a/src/test/java/io/hgraphdb/HBaseElementTest.java +++ b/src/test/java/io/hgraphdb/HBaseElementTest.java @@ -5,9 +5,11 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.google.common.collect.ImmutableMap; + import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality; import org.junit.Test; import java.io.Serializable; @@ -105,6 +107,23 @@ public void testAllVerticesWithLabel() { assertEquals(2, count(it)); } + @Test + public void testVertexMultiProperty() { + assertEquals(0, count(graph.vertices())); + Vertex vertex = graph.addVertex(T.id, id(0), T.label, "a", "key1", "value1"); + int expect = 2; + + vertex.property(Cardinality.set, "key1", "value2"); + assertEquals(expect, count(vertex.properties("key1"))); + + vertex.property(Cardinality.set, "key1", "value1"); + assertEquals(expect, count(vertex.properties("key1"))); + + vertex.property(Cardinality.set, "key1", "value3"); + expect++; + assertEquals(expect, count(vertex.properties("key1"))); + } + @Test public void testCountersNotSupportedWithoutSchema() { assertEquals(0, count(graph.vertices())); @@ -128,10 +147,12 @@ public KryoObject(ValueType id) { this.id = id; } + @Override public void write(Kryo kryo, Output output) { output.write(id.getCode()); } + @Override public void read(Kryo kryo, Input input) { id = ValueType.valueOf(input.readByte()); } diff --git a/src/test/java/io/hgraphdb/giraph/HBaseWriteableTest.java b/src/test/java/io/hgraphdb/giraph/HBaseWriteableTest.java index 058cc90..5406798 100644 --- a/src/test/java/io/hgraphdb/giraph/HBaseWriteableTest.java +++ b/src/test/java/io/hgraphdb/giraph/HBaseWriteableTest.java @@ -1,23 +1,27 @@ package io.hgraphdb.giraph; +import static org.junit.Assert.assertEquals; + import com.google.common.collect.ImmutableMap; + import io.hgraphdb.HBaseEdge; +import io.hgraphdb.HBaseGraphUtils; import io.hgraphdb.HBaseVertex; + import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.util.Collection; import java.util.Map; -import static org.junit.Assert.assertEquals; - public class HBaseWriteableTest { @Test public void testVertexWritable() throws Exception { - Map properties = ImmutableMap.of("foo", 4L, "bar", "barVal"); + Map> properties = HBaseGraphUtils.propertiesToMultimap("foo", 4L, "bar", "barVal"); HBaseVertex v = new HBaseVertex(null, 1, "mylabel", 2L, 3L, properties); VertexValueWritable writable = new VertexValueWritable(v); ByteArrayOutputStream baos = new ByteArrayOutputStream();