diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..a7bd0fed5aecf5f745cd0ba6f829c8db41f6ac67 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea/ +spangraph.iml +target/ + diff --git a/README.md b/README.md index 326c4da59afc9168eb656c3c7213f585d5a315b9..13403cfa6c247c91e272d1f3e6fbc1886127120c 100644 --- a/README.md +++ b/README.md @@ -1 +1,41 @@ -# spangraph +# BluePrints <- InfiniSpan + * http://github.com/tinkerpop/blueprints + * http://infinispan.org + +(In development. Unit tests coverage is limited.) + +Features: +``` + FEATURES.supportsDuplicateEdges = true; + FEATURES.supportsSelfLoops = true; + FEATURES.supportsSerializableObjectProperty = true; + FEATURES.supportsBooleanProperty = true; + FEATURES.supportsDoubleProperty = true; + FEATURES.supportsFloatProperty = true; + FEATURES.supportsIntegerProperty = true; + FEATURES.supportsPrimitiveArrayProperty = true; + FEATURES.supportsUniformListProperty = true; + FEATURES.supportsMixedListProperty = true; + FEATURES.supportsLongProperty = true; + FEATURES.supportsMapProperty = true; + FEATURES.supportsStringProperty = true; + + FEATURES.ignoresSuppliedIds = false; + FEATURES.isPersistent = false; + FEATURES.isWrapper = false; + + FEATURES.supportsIndices = false; + FEATURES.supportsKeyIndices = false; + FEATURES.supportsVertexKeyIndex = false; + FEATURES.supportsEdgeKeyIndex = false; + FEATURES.supportsVertexIndex = false; + FEATURES.supportsEdgeIndex = false; + FEATURES.supportsTransactions = false; + FEATURES.supportsVertexIteration = true; + FEATURES.supportsEdgeIteration = true; + FEATURES.supportsEdgeRetrieval = true; + FEATURES.supportsVertexProperties = true; + FEATURES.supportsEdgeProperties = true; + FEATURES.supportsThreadedTransactions = false; + FEATURES.supportsThreadIsolatedTransactions = false; +``` diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..25de644f37a302abb2432fd88d9f393aa58e3344 --- /dev/null +++ b/pom.xml @@ -0,0 +1,65 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <version>1.0-SNAPSHOT</version> + <groupId>com.syncleus</groupId> + <artifactId>spangraph</artifactId> + <packaging>jar</packaging> + + <name>spangraph</name> + <url>https://syncleus.com</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.2</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + <!--<useIncrementalCompilation>true</useIncrementalCompilation>--> + </configuration> + </plugin> + </plugins> + + </build> + + <dependencies> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>18.0</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.infinispan</groupId> + <artifactId>infinispan-embedded</artifactId> + <version>8.0.0.Alpha1</version> + </dependency> + + + <dependency> + <groupId>com.tinkerpop.blueprints</groupId> + <artifactId>blueprints-core</artifactId> + <version>2.6.0</version> + </dependency> + + </dependencies> + + +</project> + diff --git a/src/main/java/com/syncleus/spangraph/InfiniPeer.java b/src/main/java/com/syncleus/spangraph/InfiniPeer.java new file mode 100644 index 0000000000000000000000000000000000000000..b1179d09da39f728f81b173d0de014a1d31838b9 --- /dev/null +++ b/src/main/java/com/syncleus/spangraph/InfiniPeer.java @@ -0,0 +1,219 @@ +package com.syncleus.spangraph; + +import org.infinispan.Cache; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.Configuration; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.global.GlobalConfiguration; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.configuration.global.TransportConfigurationBuilder; +import org.infinispan.manager.DefaultCacheManager; +import org.infinispan.notifications.Listener; +import org.infinispan.notifications.cachelistener.annotation.CacheEntriesEvicted; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; +import org.infinispan.notifications.cachelistener.event.Event; +import org.infinispan.notifications.cachemanagerlistener.annotation.Merged; +import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged; +import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent; + +import java.io.PrintStream; +import java.util.function.Consumer; + +/** + * Peer-to-peer node manager : wraps CacheManager functionality + * + * @see https://github.com/belaban/JGroups/tree/master/conf + */ +@Listener(sync = true) +public class InfiniPeer extends DefaultCacheManager { + + public final String userID; + + public InfiniPeer(String userID, GlobalConfiguration globalConfig, Configuration config) { + super(globalConfig, config); + + this.userID = userID; + + addListener(this); + + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + stop(); + } + }); + + + } + + public void print(Cache cache, PrintStream out) { + + System.out.println(userID + " -----"); + System.out.println(cache + " " + cache.size()); + System.out.println(cache.entrySet()); + System.out.println("---\n"); + + } + + public <K,V> Cache<K, V> the(String cacheID) { + return the(cacheID, true); + } + + public <K,V> Cache<K, V> the(String cacheID, boolean createIfMissing) { + Cache<K, V> cache = getCache(cacheID, createIfMissing); + cache.addListener(this); + return cache; + } + + + @ViewChanged + @Merged + public void viewChangeEvent(ViewChangedEvent e) { + + //System.out.println("viewChanged: " + e.getNewMembers() + " <- " + e.getOldMembers()); + //updateClusterTable(e.getNewMembers()); + } + + @CacheEntryCreated + @CacheEntryModified + @CacheEntryRemoved + @CacheEntriesEvicted + public void entryChanged(Event<?, ?> e) { + /*if (!e.isPre()) + System.out.println("change: " + e);*/ + } + +// /** creates a local infinipeer */ +// public static InfiniPeer local(String userID) { +// +// GlobalConfiguration globalConfig = new GlobalConfigurationBuilder() +// .build(); +// +// Configuration config = new ConfigurationBuilder() +// .unsafe() +// .build(); +// +// return new InfiniPeer( +// userID, +// globalConfig, +// config +// ); +// +// } + + + + + /** for local only mode on the same host */ + public static InfiniPeer local(String userID) { + return cluster(userID, t -> + t.nodeName(userID) + .defaultTransport() + .addProperty("configurationFile", "fast.xml") + ); + + } + + + /** creates a cluster infinipeer */ + public static InfiniPeer cluster(String userID) { + return cluster(userID, t -> + t.nodeName(userID) + .defaultTransport() + .addProperty("configurationFile", "udp-largecluster.xml") + ); + + } + + + /** creates a web-scale (> 100 nodes) infinipeer */ + public static InfiniPeer webLarge(String userID) { + return cluster(userID, t -> + t.nodeName(userID) + .defaultTransport() + .addProperty("configurationFile", "udp-largecluster.xml") + ); + + //https://github.com/infinispan/infinispan/tree/master/core/src/main/resources/default-configs + //https://github.com/belaban/JGroups/blob/master/conf/udp-largecluster.xml + //https://github.com/infinispan/infinispan/blob/master/core/src/test/java/org/infinispan/test/fwk/JGroupsConfigBuilder.java + } + + /** creates a web-enabled infinipeer */ + public static InfiniPeer cluster(String userID, Consumer<TransportConfigurationBuilder> transportConfigger) { + + + GlobalConfigurationBuilder globalConfigBuilder = new GlobalConfigurationBuilder(); + + transportConfigger.accept( globalConfigBuilder.transport() ); + + + + globalConfigBuilder.globalJmxStatistics().allowDuplicateDomains(true) + .build(); + + Configuration config = new ConfigurationBuilder() + + .unsafe() + .clustering() + //.cacheMode(CacheMode.DIST_SYNC) + .cacheMode(CacheMode.DIST_SYNC) + .sync() + .l1().lifespan(25000L) + .hash().numOwners(3) + .build(); + + return new InfiniPeer( + userID, + globalConfigBuilder.build(), + config + ); + } + +// public static void main(String args[]) throws Exception { +// +// +// InfiniPeer.web(UUID.randomUUID().toString()); +// +// +//// EmbeddedCacheManager manager = new DefaultCacheManager(); +//// manager.defineConfiguration("custom-cache", new ConfigurationBuilder() +//// .eviction().strategy(EvictionStrategy.LIRS).maxEntries(10) +//// .build()); +//// Cache<Object, Object> c = manager.getCache("custom-cache"); +// +// +// /* +// ConfigurationBuilder builder = new ConfigurationBuilder(); +// builder.persistence() +// .passivation(false) +// .addSingleFileStore() +// .preload(true) +// .shared(false) +// .fetchPersistentState(true) +// .ignoreModifications(false) +// .purgeOnStartup(false) +// .location(System.getProperty("java.io.tmpdir")) +// .async() +// .enabled(true) +// .threadPoolSize(5) +// .singleton() +// .enabled(true) +// .pushStateWhenCoordinator(true) +// .pushStateTimeout(20000); +// */ +// +// +// /* +// +// ConfigurationBuilder b = new ConfigurationBuilder(); +//b.persistence() +// .addStore(SoftIndexFileStoreConfigurationBuilder.class) +// .indexLocation("/tmp/sifs/testCache/index"); +// .dataLocation("/tmp/sifs/testCache/data") +// */ +// } +// + +} diff --git a/src/main/java/com/syncleus/spangraph/MapGraph.java b/src/main/java/com/syncleus/spangraph/MapGraph.java new file mode 100644 index 0000000000000000000000000000000000000000..b82a7251cc2874d7ec0cf6840d1e2acb94295923 --- /dev/null +++ b/src/main/java/com/syncleus/spangraph/MapGraph.java @@ -0,0 +1,729 @@ +package com.syncleus.spangraph; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.tinkerpop.blueprints.*; +import com.tinkerpop.blueprints.util.*; +import org.infinispan.commons.util.InfinispanCollections; +import org.infinispan.commons.util.WeakValueHashMap; + +import java.io.*; +import java.util.*; + +/** + * Blueprints Graph interface with adjacency implemented by some Map implementation. + * Iterables in Elements, Vertices, and Edges are implemented by guava Iterator/Iterable lazy wrappers, avoiding collection allocation + */ +abstract public class MapGraph<X extends Serializable> implements Graph { + + private static Map<String,MapGraph> global = new WeakValueHashMap<>(); + + /** name of the graph combined with the peerID */ + public final String id; + + /** name of the graph as shared on the network */ + public final String globalID; + + protected Map<X, Vertex> vertices; + protected Map<X, Edge> edges; + + //protected Map<String, TinkerIndex> indices = new HashMap<String, TinkerIndex>(); + //protected TinkerKeyIndex<InfiniVertex> vertexKeyIndex = new TinkerKeyIndex<InfiniVertex>(InfiniVertex.class, this); + //protected TinkerKeyIndex<InfiniEdge> edgeKeyIndex = new TinkerKeyIndex<InfiniEdge>(InfiniEdge.class, this); + + private static final Features FEATURES = new Features(); + + + static { + FEATURES.supportsDuplicateEdges = true; + FEATURES.supportsSelfLoops = true; + FEATURES.supportsSerializableObjectProperty = true; + FEATURES.supportsBooleanProperty = true; + FEATURES.supportsDoubleProperty = true; + FEATURES.supportsFloatProperty = true; + FEATURES.supportsIntegerProperty = true; + FEATURES.supportsPrimitiveArrayProperty = true; + FEATURES.supportsUniformListProperty = true; + FEATURES.supportsMixedListProperty = true; + FEATURES.supportsLongProperty = true; + FEATURES.supportsMapProperty = true; + FEATURES.supportsStringProperty = true; + + FEATURES.ignoresSuppliedIds = false; + FEATURES.isPersistent = false; + FEATURES.isWrapper = false; + + FEATURES.supportsIndices = false; + FEATURES.supportsKeyIndices = false; + FEATURES.supportsVertexKeyIndex = false; + FEATURES.supportsEdgeKeyIndex = false; + FEATURES.supportsVertexIndex = false; + FEATURES.supportsEdgeIndex = false; + FEATURES.supportsTransactions = false; + FEATURES.supportsVertexIteration = true; + FEATURES.supportsEdgeIteration = true; + FEATURES.supportsEdgeRetrieval = true; + FEATURES.supportsVertexProperties = true; + FEATURES.supportsEdgeProperties = true; + FEATURES.supportsThreadedTransactions = false; + FEATURES.supportsThreadIsolatedTransactions = false; + + } + + public int vertexCount() { + return vertexCollection().size(); + } + public int edgeCount() { + return edgeCollection().size(); + } + + public enum FileType { + JAVA, + GML, + GRAPHML, + GRAPHSON + } + + + protected MapGraph(String id) { + this(id, id); + } + + protected MapGraph(String globalID, String peerID) { + super(); + + this.globalID = globalID; + this.id = globalID + ':' + peerID; + + if (global.put(id, this)!=null) + throw new RuntimeException("graph " + id + " already exists"); + + } + + /** call this at the end of implementing class constructors */ + protected void init() { + this.vertices = newVertexMap(); + this.edges = newEdgeMap(); + } + + protected abstract Map<X, Edge> newEdgeMap(); + + protected abstract Map<X, Vertex> newVertexMap(); + + public MVertex<X> addVertex(final Object id) { + + + if (null != id) { + if (this.vertices.containsKey(id)) { + throw ExceptionFactory.vertexWithIdAlreadyExists(id); + } + } else { + throw new RuntimeException("id must be non-null"); +// boolean done = false; +// while (!done) { +// idString = this.getNextId(); +// vertex = this.vertices.get(idString); +// if (null == vertex) +// done = true; +// } + } + + MVertex<X> vertex = new MVertex<X>((X) id, this); + this.vertices.put(vertex.id, vertex); + return vertex; + } + + public MVertex<X> getVertex(final Object id) { + if (null == id) + throw ExceptionFactory.vertexIdCanNotBeNull(); + + return (MVertex<X>) this.vertices.get(id); + } + + public MEdge<X> getEdge(final Object id) { + if (null == id) + throw ExceptionFactory.edgeIdCanNotBeNull(); + + return (MEdge<X>) this.edges.get(id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof MapGraph)) + return false; + MapGraph m = (MapGraph)obj; + + + + if ((vertices.size() != m.vertices.size()) || + (edges.size() != m.edges.size())) + return false; + + return m.vertexSet().equals(vertexSet()) && + m.edgeSet().equals(edgeSet()); + } + + public Set<MVertex<X>> differentVertices(MapGraph<X> y) { + Set<MVertex<X>> xv = vertexSet(); + Set<MVertex<X>> yv = y.vertexSet(); + return Sets.difference(xv, yv); + } + public Set<MEdge<X>> differentEdges(MapGraph<X> y) { + Set<MEdge<X>> xe = edgeSet(); + Set<MEdge<X>> ye = y.edgeSet(); + return Sets.difference(xe, ye); + } + + + + //TODO deepEquals which will compare the properties of each vertex and edge + + + public Collection<Vertex> vertexCollection() { + return vertices.values(); + } + public Collection<Edge> edgeCollection() { + return edges.values(); + } + + public Set<MVertex<X>> vertexSet() { + return new HashSet(vertices.values()); + } + public TreeSet<MVertex<X>> vertexSetSorted() { + return new TreeSet(vertices.values()); + } + + public Set<MEdge<X>> edgeSet() { + return new HashSet(edges.values()); + } + public TreeSet<MEdge<X>> edgeSetSorted() { + return new TreeSet(edges.values()); + } + + public Iterable<Vertex> getVertices() { + //Unmodifiable? + return this.vertices.values(); + } + + @Override + public Iterable<Vertex> getVertices(String s, Object o) { + return null; + } + + public Iterable<Edge> getEdges() { + ////Unmodifiable? + return this.edges.values(); + } + + @Override + public Iterable<Edge> getEdges(String s, Object o) { + return null; + } + + public void removeVertex(final Vertex vertex) { + if (null == this.vertices.remove(vertex.getId())) + throw ExceptionFactory.vertexWithIdDoesNotExist(vertex.getId()); + + for (Edge edge : vertex.getEdges(Direction.BOTH)) { + this.removeEdge(edge); + } + + this.vertices.remove(vertex.getId()); + } + + public Edge addEdge(final Object edgeID, Object out, Object in) { + final Vertex o = getVertex(out); + if (o == null) throw new RuntimeException("Unknown source vertex " + out); + final Vertex i = getVertex(in); + if (i == null) throw new RuntimeException("Unknown target vertex " + in); + + return addEdge(edgeID, o, i, null); + } + + public Edge addEdge(final Object id, final Vertex outVertex, final Vertex inVertex, final String label) { + /*if (label == null) + throw ExceptionFactory.edgeLabelCanNotBeNull();*/ + + + + if (null != id) { + if (this.edges.containsKey(id)) { + throw ExceptionFactory.edgeWithIdAlreadyExist(id); + } + } else { + throw new RuntimeException("ID must be non-null"); +// boolean done = false; +// while (!done) { +// idString = this.getNextId(); +// edge = this.edges.get(idString); +// if (null == edge) +// done = true; +// } + } + + MEdge<X> edge = new MEdge<X>((X)id, + (MVertex)outVertex, + (MVertex)inVertex, + label, this); + this.edges.put(edge.id, edge); + + + ((MVertex) outVertex).add(edge, true); + ((MVertex) inVertex).add(edge, false); + return edge; + + } + + public void removeEdge(final Edge edge) { + + if (null == this.edges.remove(edge.getId().toString())) { + return; + } + + + MVertex<X> outVertex = ((MEdge) edge).outVertex; + MVertex<X> inVertex = ((MEdge) edge).inVertex; + + Object edgeID = edge.getId(); + if (null != outVertex && null != outVertex.outEdges) { + outVertex.remove(edgeID, false); + } + if (null != inVertex && null != inVertex.inEdges) { + inVertex.remove(edgeID, true); + } + + + + } + + public GraphQuery query() { + return new DefaultGraphQuery(this); + } + + + public String toString() { + return StringFactory.graphString(this, "vertices:" + this.vertices.size() + " edges:" + this.edges.size()); + } + + public void clear() { + this.vertices.clear(); + this.edges.clear(); + } + + public void shutdown() { + } + +// private String getNextId() { +// String idString; +// while (true) { +// idString = this.currentId.toString(); +// this.currentId++; +// if (null == this.vertices.get(idString) || null == this.edges.get(idString) || this.currentId == Long.MAX_VALUE) +// break; +// } +// return idString; +// } + + public Features getFeatures() { + return FEATURES; + } + +// protected class TinkerKeyIndex<T extends TinkerElement> extends TinkerIndex<T> implements Serializable { +// +// private final Set<String> indexedKeys = new HashSet<String>(); +// private TinkerGraph graph; +// +// public TinkerKeyIndex(final Class<T> indexClass, final TinkerGraph graph) { +// super(null, indexClass); +// this.graph = graph; +// } +// +// public void autoUpdate(final String key, final Object newValue, final Object oldValue, final T element) { +// if (this.indexedKeys.contains(key)) { +// if (oldValue != null) +// this.remove(key, oldValue, element); +// this.put(key, newValue, element); +// } +// } +// +// public void autoRemove(final String key, final Object oldValue, final T element) { +// if (this.indexedKeys.contains(key)) { +// this.remove(key, oldValue, element); +// } +// } +// +// public void createKeyIndex(final String key) { +// if (this.indexedKeys.contains(key)) +// return; +// +// this.indexedKeys.add(key); +// +// if (InfiniVertex.class.equals(this.indexClass)) { +// KeyIndexableGraphHelper.reIndexElements(graph, graph.getVertices(), new HashSet<String>(Arrays.asList(key))); +// } else { +// KeyIndexableGraphHelper.reIndexElements(graph, graph.getEdges(), new HashSet<String>(Arrays.asList(key))); +// } +// } +// +// public void dropKeyIndex(final String key) { +// if (!this.indexedKeys.contains(key)) +// return; +// +// this.indexedKeys.remove(key); +// this.index.remove(key); +// +// } +// +// public Set<String> getIndexedKeys() { +// if (null != this.indexedKeys) +// return new HashSet<String>(this.indexedKeys); +// else +// return Collections.emptySet(); +// } +// } + + abstract protected static class MElement<X extends Serializable> implements Element, Serializable { + + public Map<String, Serializable> properties = null; + public X id; + public String graphID; + transient String globalID; + transient MapGraph<X> graph; + + public MElement() { + + } + + protected MElement(final X id, final MapGraph<X> graph) { + setGraph(graph.id); + this.graph = graph; + this.id = id; + } + + + + public Map<String, Serializable> prop(final boolean createIfMissing) { + if (properties == null) { + if (createIfMissing) + properties = new LinkedHashMap(2); + else + return InfinispanCollections.emptyMap(); + } + return properties; + } + + public MapGraph<X> graph() { + if (graph == null) { + graph = MapGraph.the(graphID); + if (graph == null) + throw new RuntimeException(this + " refers to unknown graph " + graphID); + } + return graph; + } + + protected void setGraph(String newGraphID) { + if (this.graphID== null || !this.graphID.equals(newGraphID)) { + graphID = newGraphID; + globalID = null; + graph = null; //invalidates it, so use graph() to access this field + } + } + + public Set<String> getPropertyKeys() { + return prop(false).keySet(); + } + + public <T> T getProperty(final String key) { + return (T) prop(false).get(key); + } + + public void setProperty(final String key, final Object value) { + Serializable v = (Serializable)value; + ElementHelper.validateProperty(this, key, value); + Object oldValue = prop(true).put(key, v); + afterAdd(key, v); + } + + abstract protected void afterAdd(final String key, final Serializable value); + + public <T> T removeProperty(final String key) { + Serializable oldValue = prop(false).remove(key); + if (oldValue!=null) { + beforeRemove(key, oldValue); + return (T) oldValue; + } + return null; + } + + + + public String global() { + if (graphID == null) return null; + + if (globalID == null) + globalID = graphID.substring(0, graphID.indexOf(':')); + return globalID; + } + abstract protected void beforeRemove(final String key, final Serializable oldValue); + + final static int PRIME = 31; + final static int PRIME2 = 92821; + + public int hashCode() { + return hashL(this.id.hashCode(), global().hashCode()) ; + } + + + /** also includes properties in the hash */ + public int hashCodeDeep() { + return PRIME * hashCode() + properties.hashCode(); + } + public final static int hashL(int a, int b) { + return PRIME2 * (31 + a ) + b ; + } + + public X getId() { + return this.id; + } + + public boolean equals(final Object object) { + if (object.getClass()!=getClass()) return false; + MElement o = (MElement)object; + return o.getId().equals(getId()) && o.global().equals(global()); + } + + /** also includes properties in the equality test */ + public boolean deepEquals(final Object object) { + if (object.getClass()!=getClass()) return false; + MElement o = (MElement)object; + return o.getId().equals(getId()) && o.global().equals(global()) && properties.equals(o.properties); + } + + } + + public static <X extends Serializable> MapGraph<X> the(String graphID) { + MapGraph<X> g = global.get(graphID); + return g; + } + + + public static class MVertex<X extends Serializable> extends MElement<X> implements Vertex, Serializable { + + public final Map<X, Set<Edge>> outEdges = new LinkedHashMap(); + public final Map<X, Set<Edge>> inEdges = new LinkedHashMap(); + + protected MVertex(final X id, final MapGraph graph) { + super(id, graph); + } + + public Iterable<Edge> getEdges(final Direction direction, final X... labels) { + if (direction.equals(Direction.OUT)) { + return this.getOutEdges(labels); + } else if (direction.equals(Direction.IN)) + return this.getInEdges(labels); + else { + return Iterables.concat(this.getInEdges(labels), this.getOutEdges(labels)); + } + } + + @Override + public Iterable<Edge> getEdges(Direction direction, String... strings) { + return null; + } + + public Iterable<Vertex> getVertices(final Direction direction, final String... labels) { + return new VerticesFromEdgesIterable(this, direction, labels); + } + + private Iterable<Edge> getEdges(final Map<X, Set<Edge>> e, final X... labels) { + if (labels.length == 0) { + return Iterables.concat(e.values()); + } else if (labels.length == 1) { + final Set<Edge> edges = e.get(labels[0]); + if (null == edges) { + return Collections.emptyList(); + } else { + return edges; + } + } else { + final Set<X> labelSet = Sets.newHashSet(labels); + return Iterables.concat(Iterables.transform(e.entrySet(), x -> { + if (labelSet.contains(x.getKey())) + return x.getValue(); + return Collections.emptyList(); + })); + } + } + + private Iterable<Edge> getInEdges(final X... labels) { + return getEdges(inEdges, labels); + } + private Iterable<Edge> getOutEdges(final X... labels) { + return getEdges(outEdges, labels); + } + + public VertexQuery query() { + return new DefaultVertexQuery(this); + } + + public String toString() { + return new StringBuilder().append("v[").append(global()).append(':').append(getId()).append("]").toString(); + } + + public Edge addEdge(final String label, final Vertex vertex) { + return graph().addEdge(null, this, vertex, label); + } + + + + + private Set<Edge> newEdgeSet(int size) { + return new LinkedHashSet<Edge>(size); + } + + + + @Override + protected void afterAdd(String key, Serializable value) { + //this.graph.vertexKeyIndex.autoUpdate(key, value, oldValue, (InfiniVertex) this); + graph().update(this); + } + + @Override + protected void beforeRemove(String key, Serializable oldValue) { + //this.graph.vertexKeyIndex.autoRemove(key, oldValue, (InfiniVertex) this); + } + + + @Override + public void remove() { + graph().removeVertex(this); + } + + + public boolean remove(Object edgeID, boolean incoming) { + Map<X, Set<Edge>> target = incoming ? inEdges : outEdges; + if (target.remove(edgeID)!=null) { + graph().update(this); + return true; + } + + return false; + } + + public boolean add(MEdge<X> edge, boolean incoming) { + Map<X, Set<Edge>> target = incoming ? inEdges : outEdges; + X e = edge.id; + Set<Edge> edges = target.get(e); + if (null == edges) { + target.put(e, edges = newEdgeSet(1)); + } + if (edges.add(edge)) { + graph().update(this); + return true; + } + + return false; + } + } + + + /** called if an vertex changes its properties */ + protected void update(MVertex<X> xmVertex) { + + } + + /** called if an edge changes its properties */ + protected void update(MEdge<X> xmVertex) { + + } + + public static class MEdge<X extends Serializable> extends MElement<X> implements Edge, Externalizable { + + protected String label; + protected MVertex<X> inVertex; + protected MVertex<X> outVertex; + + public MEdge( ) { + + } + + protected MEdge(final X id, final MVertex<X> outVertex, final MVertex<X> inVertex, final String label, final MapGraph<X> graph) { + super(id, graph); + this.label = label; + this.outVertex = outVertex; + this.inVertex = inVertex; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(id); + out.writeUTF(graphID); + out.writeUTF(label == null ? "" : label); + out.writeObject(outVertex.getId()); + out.writeObject(inVertex.getId()); + out.writeObject(prop(false)); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = (X) in.readObject(); + + setGraph(in.readUTF()); + + label = in.readUTF(); + outVertex = graph().getVertex( in.readObject() ); + inVertex = graph().getVertex( in.readObject() ); + properties = (Map<String, Serializable>) in.readObject(); + } + + + + public String getLabel() { + return this.label; + } + + public Vertex getVertex(final Direction direction) throws IllegalArgumentException { + if (direction.equals(Direction.IN)) + return this.inVertex; + else if (direction.equals(Direction.OUT)) + return this.outVertex; + else + throw ExceptionFactory.bothIsNotSupported(); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("e[").append(global()).append(':').append(getId()).append("][").append(getVertex(Direction.OUT).getId()).append("-"); + String label = getLabel(); + if (label!=null) + sb.append(getLabel()); + return sb.append("->").append(getVertex(Direction.IN).getId()).append(']').toString(); + } + + @Override + protected void afterAdd(String key, Serializable value) { + //this.graph.edgeKeyIndex.autoUpdate(key, value, oldValue, (InfiniEdge) this);*/ + graph().update(this); + } + + @Override + protected void beforeRemove(String key, Serializable oldValue) { + //this.graph.edgeKeyIndex.autoRemove(key, oldValue, (InfiniEdge) this);*/ + graph().update(this); + } + + @Override + public void remove() { + graph().removeEdge(this); + } + + + +// @Override +// public boolean equals(Object object) { +// return label.equals(((MEdge)object).label); +// } + } +} \ No newline at end of file diff --git a/src/main/java/com/syncleus/spangraph/SpanGraph.java b/src/main/java/com/syncleus/spangraph/SpanGraph.java new file mode 100644 index 0000000000000000000000000000000000000000..2e505395690199d609bf1aa4d7794a0d16f7d682 --- /dev/null +++ b/src/main/java/com/syncleus/spangraph/SpanGraph.java @@ -0,0 +1,39 @@ +package com.syncleus.spangraph; + +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Vertex; + +import java.io.Serializable; +import java.util.Map; + +/** + * Blueprints Graph interface with adjacency implemented by Infinispan collections + */ +public class SpanGraph<X extends Serializable> extends MapGraph<X> { + + public final InfiniPeer peer; + private final String globalID; + + public SpanGraph(String id, InfiniPeer peer) { + super(id, peer.userID); //local ID + this.globalID = id; //ID as known on the network + this.peer = peer; + init(); + } + + protected <Y> Map<X, Y> newElementMap(String suffix) { + return peer.the(globalID + '_' + suffix); + } + + @Override + protected Map<X, Edge> newEdgeMap() { + return newElementMap("e"); + } + + @Override + protected Map<X, Vertex> newVertexMap() { + return newElementMap("v"); + } + + +} diff --git a/src/test/java/com/syncleus/spangraph/InfiniPeerTest.java b/src/test/java/com/syncleus/spangraph/InfiniPeerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6ee432241bb02c0fc8194e3281e91d68ab5ae3ec --- /dev/null +++ b/src/test/java/com/syncleus/spangraph/InfiniPeerTest.java @@ -0,0 +1,67 @@ +package com.syncleus.spangraph; + +import org.infinispan.Cache; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Created by me on 6/7/15. + */ +public class InfiniPeerTest { + + + @Test + public void testInfiniPeerCache() { + + /*GlobalConfiguration globalConfig = new GlobalConfigurationBuilder() + .serialization() + .addAdvancedExternalizer(998, new PersonExternalizer()) + .addAdvancedExternalizer(999, new AddressExternalizer()) + .build();*/ + + /*Configuration config = new ConfigurationBuilder() + .persistence().passivation(false) + .addSingleFileStore().location("/tmp").async().enable() + .preload(false).shared(false).threadPoolSize(20).build();*/ + + + + InfiniPeer cacheMan = InfiniPeer.local("me"); + + final Cache<Object, Object> cache = cacheMan.the("abc", true); + + + // Add a entry + cache.put("key", "value"); + + //cache.putForExternalRead(UUID.randomUUID().toString(), cacheMan.getAddress()); + //cache.put(UUID.randomUUID().toString(), cacheMan.getAddress()); + +// Validate the entry is now in the cache + assertEquals(1, cache.size()); + assertTrue(cache.containsKey("key")); +// Remove the entry from the cache + Object v = cache.remove("key"); +// Validate the entry is no longer in the cache + assertEquals("value", v); + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + /* + + //synchronize for display purposes + synchronized(cache) { + cacheMan.print(cache, System.out); + } + */ + + } + + +} diff --git a/src/test/java/com/syncleus/spangraph/SpanGraphTest.java b/src/test/java/com/syncleus/spangraph/SpanGraphTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c2cf3d8746b4fcb371185c77f2423e0e15eafc99 --- /dev/null +++ b/src/test/java/com/syncleus/spangraph/SpanGraphTest.java @@ -0,0 +1,91 @@ +package com.syncleus.spangraph; + +import com.tinkerpop.blueprints.Vertex; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class SpanGraphTest { + + +// @Test +// public void testVertexEdgePropagationMemory() throws InterruptedException { +// testVertexPropagation(x -> +// new SpanGraph("memory", InfiniPeer.cluster(x)) +// ); +// } + + @Test + public void testVertexEdgePropagationNetwork() throws InterruptedException { + testVertexPropagation(x -> + new SpanGraph("network", InfiniPeer.cluster(x)) + ); + } + + public void testVertexPropagation(Function<String, SpanGraph> graph) throws InterruptedException { + + final AtomicReference<SpanGraph> bRef = new AtomicReference(null); + + + SpanGraph a = graph.apply("PeerA"); + + Vertex vx = a.addVertex("x"); + Vertex vy = a.addVertex("y"); + assertEquals("correct vertex id", vx.getId(), "x"); + assertEquals("correct vertex id", vy.getId(), "y"); + assertEquals("non-string vertex id", ((MapGraph.MVertex) a.addVertex(17)).getId(), 17); + assertEquals(3, a.vertexCount()); + + Thread x = new Thread(() -> { + + try { + int preDelayMS = 10; + int afterConnectedDelayMS = 100; + + sleep(preDelayMS); + + SpanGraph b = graph.apply("PeerB"); + bRef.set(b); + + b.addEdge("xy", "x", "y"); + assertEquals(1, b.edgeCount()); + + sleep(afterConnectedDelayMS); + } catch (Throwable e) { + e.printStackTrace(); + assertTrue(e.toString(), false); + } + + }); + + x.start(); + x.join(); + + + SpanGraph b = bRef.get(); + + + assertEquals(3, a.vertexCount()); + assertEquals(1, a.edgeCount()); + assertEquals(0, a.differentEdges(b).size()); + assertEquals(0, b.differentEdges(a).size()); + assertEquals("Graphs:\n" + a.toString() + "\n" + b.toString() + "\n", a, b); + + + } + + + static void sleep(int ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + +}