diff --git a/OrientDB_logo.svg b/OrientDB_logo.svg new file mode 100644 index 00000000000..5f87b150c2c --- /dev/null +++ b/OrientDB_logo.svg @@ -0,0 +1,153 @@ + + + + + + + + + + + + + + + + + + + + + + diff --git a/client/src/main/java/com/orientechnologies/orient/client/binary/OChannelBinaryAsynchClient.java b/client/src/main/java/com/orientechnologies/orient/client/binary/OChannelBinaryAsynchClient.java index 834854aa2e6..2b1d1560c25 100755 --- a/client/src/main/java/com/orientechnologies/orient/client/binary/OChannelBinaryAsynchClient.java +++ b/client/src/main/java/com/orientechnologies/orient/client/binary/OChannelBinaryAsynchClient.java @@ -56,15 +56,17 @@ import com.orientechnologies.orient.enterprise.channel.binary.OResponseProcessingException; public class OChannelBinaryAsynchClient extends OChannelBinary { - private int socketTimeout; // IN MS - protected final short srvProtocolVersion; - private final Condition readCondition = getLockRead().getUnderlying().newCondition(); - private final int maxUnreadResponses; - private String serverURL; + private int socketTimeout; // IN MS + protected final short srvProtocolVersion; + private final Condition readCondition = getLockRead().getUnderlying().newCondition(); + private final int maxUnreadResponses; + private String serverURL; private volatile boolean channelRead = false; - private byte currentStatus; - private int currentSessionId; + private byte currentStatus; + private int currentSessionId; private volatile OAsynchChannelServiceThread serviceThread; + private volatile long lastUse; + private volatile boolean inUse; public OChannelBinaryAsynchClient(final String remoteHost, final int remotePort, final String iDatabaseName, final OContextConfiguration iConfig, final int iProtocolVersion) throws IOException { @@ -195,8 +197,8 @@ else if (!getLockRead().tryAcquireLock(iTimeout, TimeUnit.MILLISECONDS)) currentSessionId = readInt(); if (debug) - OLogManager.instance().debug(this, "%s - Read response: %d-%d", socket.getLocalAddress(), (int) currentStatus, - currentSessionId); + OLogManager.instance() + .debug(this, "%s - Read response: %d-%d", socket.getLocalAddress(), (int) currentStatus, currentSessionId); } catch (IOException e) { // UNLOCK THE RESOURCE AND PROPAGATES THE EXCEPTION @@ -222,14 +224,16 @@ else if (!getLockRead().tryAcquireLock(iTimeout, TimeUnit.MILLISECONDS)) try { if (debug) - OLogManager.instance().debug(this, "%s - Session %d skip response, it is for %d", socket.getLocalAddress(), - iRequesterId, currentSessionId); + OLogManager.instance() + .debug(this, "%s - Session %d skip response, it is for %d", socket.getLocalAddress(), iRequesterId, + currentSessionId); if (iTimeout > 0 && (System.currentTimeMillis() - startClock) > iTimeout) { readLock = false; - throw new IOException("Timeout on reading response from the server " - + (socket != null ? socket.getRemoteSocketAddress() : "") + " for the request " + iRequesterId); + throw new IOException( + "Timeout on reading response from the server " + (socket != null ? socket.getRemoteSocketAddress() : "") + + " for the request " + iRequesterId); } // IN CASE OF TOO MUCH TIME FOR READ A MESSAGE, ASYNC THREAD SHOULD NOT BE INCLUDE IN THIS CHECK @@ -255,8 +259,9 @@ else if (!getLockRead().tryAcquireLock(iTimeout, TimeUnit.MILLISECONDS)) if (debug) { final long now = System.currentTimeMillis(); - OLogManager.instance().debug(this, "Waked up: slept %dms, checking again from %s for session %d", (now - start), - socket.getLocalAddress(), iRequesterId); + OLogManager.instance() + .debug(this, "Waked up: slept %dms, checking again from %s for session %d", (now - start), socket.getLocalAddress(), + iRequesterId); } unreadResponse++; @@ -352,7 +357,6 @@ public boolean isConnected() { /** * Gets the major supported protocol version - * */ public short getSrvProtocolVersion() { return srvProtocolVersion; @@ -465,8 +469,9 @@ private void throwSerializedException(final byte[] serializedException) throws I // WRAP IT else OLogManager.instance().error(this, - "Error during exception serialization, serialized exception is not Throwable, exception type is " - + (throwable != null ? throwable.getClass().getName() : "null")); + "Error during exception serialization, serialized exception is not Throwable, exception type is " + (throwable != null ? + throwable.getClass().getName() : + "null")); } public void beginRequest(final byte iCommand, final OStorageRemoteSession session) throws IOException { @@ -494,4 +499,25 @@ public void setSocketTimeout(int socketTimeout) { this.socketTimeout = socketTimeout; } + private void markLastUse() { + lastUse = System.currentTimeMillis(); + } + + public long getLastUse() { + return lastUse; + } + + public void markReturned() { + markLastUse(); + inUse = false; + } + + public void markInUse() { + markLastUse(); + inUse = false; + } + + public boolean isInUse() { + return inUse; + } } diff --git a/client/src/main/java/com/orientechnologies/orient/client/remote/ORemoteConnectionManager.java b/client/src/main/java/com/orientechnologies/orient/client/remote/ORemoteConnectionManager.java index afa2dd982f5..e5df99e7ffa 100755 --- a/client/src/main/java/com/orientechnologies/orient/client/remote/ORemoteConnectionManager.java +++ b/client/src/main/java/com/orientechnologies/orient/client/remote/ORemoteConnectionManager.java @@ -24,12 +24,12 @@ import com.orientechnologies.orient.core.config.OContextConfiguration; import com.orientechnologies.orient.core.config.OGlobalConfiguration; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import static com.orientechnologies.orient.core.config.OGlobalConfiguration.*; /** * Manages network connections against OrientDB servers. All the connection pools are managed in a Map, but in the future @@ -42,10 +42,25 @@ public class ORemoteConnectionManager { protected final ConcurrentMap connections; protected final long timeout; + protected final long idleTimeout; + private final TimerTask idleTask; - public ORemoteConnectionManager(final long iTimeout) { + public ORemoteConnectionManager(final OContextConfiguration clientConfiguration, Timer timer) { connections = new ConcurrentHashMap(); - timeout = iTimeout; + timeout = clientConfiguration.getValueAsLong(NETWORK_LOCK_TIMEOUT); + int idleSecs = clientConfiguration.getValueAsInteger(CLIENT_CHANNEL_IDLE_TIMEOUT); + this.idleTimeout = TimeUnit.MILLISECONDS.convert(idleSecs, TimeUnit.SECONDS); + if (clientConfiguration.getValueAsBoolean(CLIENT_CHANNEL_IDLE_CLOSE)) { + idleTask = new TimerTask() { + @Override + public void run() { + checkIdle(); + } + }; + timer.schedule(this.idleTask, this.idleTimeout / 3); + } else { + idleTask = null; + } } public void close() { @@ -54,6 +69,9 @@ public void close() { } connections.clear(); + if (idleTask != null) { + idleTask.cancel(); + } } public OChannelBinaryAsynchClient acquire(String iServerURL, final OContextConfiguration clientConfiguration, @@ -82,7 +100,7 @@ public OChannelBinaryAsynchClient acquire(String iServerURL, final OContextConfi if (max != null) maxPool = Integer.parseInt(max.toString()); - final Object netLockTimeout = clientConfiguration.getValue(OGlobalConfiguration.NETWORK_LOCK_TIMEOUT); + final Object netLockTimeout = clientConfiguration.getValue(NETWORK_LOCK_TIMEOUT); if (netLockTimeout != null) localTimeout = Integer.parseInt(netLockTimeout.toString()); } @@ -99,6 +117,7 @@ public OChannelBinaryAsynchClient acquire(String iServerURL, final OContextConfi try { // RETURN THE RESOURCE OChannelBinaryAsynchClient ret = pool.acquire(iServerURL, localTimeout, clientConfiguration, iConfiguration, iListener); + ret.markInUse(); return ret; } catch (RuntimeException e) { @@ -115,6 +134,7 @@ public void release(final OChannelBinaryAsynchClient conn) { if (conn == null) return; + conn.markReturned(); final ORemoteConnectionPool pool = connections.get(conn.getServerURL()); if (pool != null) { if (!conn.isConnected()) { @@ -213,4 +233,10 @@ public ORemoteConnectionPool getPool(String url) { return connections.get(url); } + public void checkIdle() { + for (Map.Entry entry : connections.entrySet()) { + entry.getValue().checkIdle(idleTimeout); + } + } + } diff --git a/client/src/main/java/com/orientechnologies/orient/client/remote/ORemoteConnectionPool.java b/client/src/main/java/com/orientechnologies/orient/client/remote/ORemoteConnectionPool.java index b8273563bd7..5c971cc6a49 100644 --- a/client/src/main/java/com/orientechnologies/orient/client/remote/ORemoteConnectionPool.java +++ b/client/src/main/java/com/orientechnologies/orient/client/remote/ORemoteConnectionPool.java @@ -84,6 +84,7 @@ public boolean reuseResource(final String iKey, final Object[] iAdditionalArgs, } catch (Exception e) { OLogManager.instance().debug(this, "Error on closing socket connection", e); } + iValue.markInUse(); return canReuse; } @@ -91,14 +92,21 @@ public OResourcePool getPool() { return pool; } - public OChannelBinaryAsynchClient acquire(final String iServerURL, final long timeout, final OContextConfiguration clientConfiguration, final Map iConfiguration, final OStorageRemoteAsynchEventListener iListener) { - final OChannelBinaryAsynchClient ret = pool.getResource(iServerURL, timeout, clientConfiguration, iConfiguration, - iListener != null); + final OChannelBinaryAsynchClient ret = pool + .getResource(iServerURL, timeout, clientConfiguration, iConfiguration, iListener != null); if (listener != null && iListener != null) listener.addListener(this, ret, iListener); return ret; } + + public void checkIdle(long timeout) { + for (OChannelBinaryAsynchClient resource : pool.getResources()) { + if (!resource.isInUse() && resource.getLastUse() + timeout < System.currentTimeMillis()) { + resource.close(); + } + } + } } diff --git a/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java b/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java new file mode 100755 index 00000000000..c8fcf1e8cff --- /dev/null +++ b/client/src/main/java/com/orientechnologies/orient/core/db/OrientDBRemote.java @@ -0,0 +1,364 @@ +/* + * + * * Copyright 2010-2016 OrientDB LTD (http://orientdb.com) + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * * For more information: http://orientdb.com + * + */ + +package com.orientechnologies.orient.core.db; + +import com.orientechnologies.common.exception.OException; +import com.orientechnologies.common.log.OLogManager; +import com.orientechnologies.orient.client.remote.ORemoteConnectionManager; +import com.orientechnologies.orient.client.remote.OServerAdmin; +import com.orientechnologies.orient.client.remote.OStorageRemote; +import com.orientechnologies.orient.core.Orient; +import com.orientechnologies.orient.core.command.OCommandOutputListener; +import com.orientechnologies.orient.core.config.OGlobalConfiguration; +import com.orientechnologies.orient.core.db.document.ODatabaseDocumentRemote; +import com.orientechnologies.orient.core.exception.ODatabaseException; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.storage.OStorage; + +import java.io.IOException; +import java.io.InputStream; +import java.util.*; +import java.util.concurrent.Callable; + +import static com.orientechnologies.orient.client.remote.OStorageRemote.ADDRESS_SEPARATOR; +import static com.orientechnologies.orient.core.config.OGlobalConfiguration.NETWORK_LOCK_TIMEOUT; +import static com.orientechnologies.orient.core.config.OGlobalConfiguration.NETWORK_SOCKET_RETRY; + +/** + * Created by tglman on 08/04/16. + */ +public class OrientDBRemote implements OrientDBInternal { + private final Map storages = new HashMap<>(); + private final Set pools = new HashSet<>(); + private final String[] hosts; + private final OrientDBConfig configurations; + private final Orient orient; + protected volatile ORemoteConnectionManager connectionManager; + private volatile boolean open = true; + private Timer timer; + + public OrientDBRemote(String[] hosts, OrientDBConfig configurations, Orient orient) { + super(); + timer = new Timer(); + this.hosts = hosts; + this.orient = orient; + this.configurations = configurations != null ? configurations : OrientDBConfig.defaultConfig(); + connectionManager = new ORemoteConnectionManager(this.configurations.getConfigurations(), timer); + orient.addOrientDB(this); + } + + private String buildUrl(String name) { + return String.join(ADDRESS_SEPARATOR, hosts) + "/" + name; + } + + public ODatabaseDocumentInternal open(String name, String user, String password) { + return open(name, user, password, null); + } + + @Override + public synchronized ODatabaseDocumentInternal open(String name, String user, String password, OrientDBConfig config) { + checkOpen(); + OrientDBConfig resolvedConfig = solveConfig(config); + try { + OStorageRemote storage; + storage = storages.get(name); + if (storage == null) { + storage = new OStorageRemote(buildUrl(name), this, "rw", connectionManager, resolvedConfig); + storages.put(name, storage); + } + ODatabaseDocumentRemote db = new ODatabaseDocumentRemote(storage); + db.internalOpen(user, password, resolvedConfig); + return db; + } catch (Exception e) { + throw OException.wrapException(new ODatabaseException("Cannot open database '" + name + "'"), e); + } + } + + @Override + public void create(String name, String user, String password, ODatabaseType databaseType) { + create(name, user, password, databaseType, null); + } + + @Override + public synchronized void create(String name, String user, String password, ODatabaseType databaseType, OrientDBConfig config) { + connectEndExecute(name, user, password, admin -> { + String sendType = null; + if (databaseType == ODatabaseType.MEMORY) { + sendType = "memory"; + } else if (databaseType == ODatabaseType.PLOCAL) { + sendType = "plocal"; + } + admin.createDatabase(name, null, sendType); + return null; + }); + } + + public synchronized ODatabaseDocumentRemotePooled poolOpen(String name, String user, String password, + ODatabasePoolInternal pool) { + OStorageRemote storage = storages.get(name); + if (storage == null) { + try { + storage = new OStorageRemote(buildUrl(name), this, "rw", connectionManager, solveConfig(pool.getConfig())); + storages.put(name, storage); + } catch (Exception e) { + throw OException.wrapException(new ODatabaseException("Cannot open database '" + name + "'"), e); + } + } + ODatabaseDocumentRemotePooled db = new ODatabaseDocumentRemotePooled(pool, storage); + db.internalOpen(user, password, pool.getConfig()); + return db; + } + + public synchronized void closeStorage(OStorageRemote remote) { + ODatabaseDocumentRemote.deInit(remote); + storages.remove(remote.getName()); + remote.shutdown(); + } + + public ODocument getServerInfo(String username, String password) { + return connectEndExecute(null, username, password, (admin) -> { + return admin.getServerInfo(); + }); + } + + public ODocument getClusterStatus(String username, String password) { + return connectEndExecute(null, username, password, (admin) -> { + return admin.clusterStatus(); + }); + } + + public String getGlobalConfiguration(String username, String password, OGlobalConfiguration config) { + return connectEndExecute(null, username, password, (admin) -> { + return admin.getGlobalConfiguration(config); + }); + } + + public void setGlobalConfiguration(String username, String password, OGlobalConfiguration config, String iConfigValue) { + connectEndExecute(null, username, password, (admin) -> { + admin.setGlobalConfiguration(config, iConfigValue); + return null; + }); + } + + public Map getGlobalConfigurations(String username, String password) { + return connectEndExecute(null, username, password, (admin) -> { + return admin.getGlobalConfigurations(); + }); + } + + public ORemoteConnectionManager getConnectionManager() { + return connectionManager; + } + + private interface Operation { + T execute(OServerAdmin admin) throws IOException; + } + + private T connectEndExecute(String name, String user, String password, Operation operation) { + checkOpen(); + OServerAdmin admin = null; + int retry = configurations.getConfigurations().getValueAsInteger(NETWORK_SOCKET_RETRY); + while (retry > 0) { + try { + admin = new OServerAdmin(this, buildUrl(name)); + admin.connect(user, password); + return operation.execute(admin); + } catch (IOException e) { + retry--; + if (retry == 0) + throw OException + .wrapException(new ODatabaseException("Reached maximum retry limit on admin operations, the server may be offline"), + e); + } finally { + if (admin != null) + admin.close(); + } + } + // SHOULD NEVER REACH THIS POINT + throw new ODatabaseException("Reached maximum retry limit on admin operations, the server may be offline"); + } + + @Override + public synchronized boolean exists(String name, String user, String password) { + return connectEndExecute(name, user, password, admin -> { + // TODO: check for memory cases + return admin.existsDatabase(name, null); + }); + } + + @Override + public synchronized void drop(String name, String user, String password) { + connectEndExecute(name, user, password, admin -> { + // TODO: check for memory cases + return admin.dropDatabase(name, null); + }); + } + + @Override + public Set listDatabases(String user, String password) { + return connectEndExecute("", user, password, admin -> { + // TODO: check for memory cases + return admin.listDatabases().keySet(); + }); + } + + @Override + public void restore(String name, String user, String password, ODatabaseType type, String path, OrientDBConfig config) { + connectEndExecute(name, user, password, admin -> { + admin.createDatabase(name, "", type.name().toLowerCase(), path).close(); + return null; + }); + + } + + public ODatabasePoolInternal openPool(String name, String user, String password) { + return openPool(name, user, password, null); + } + + @Override + public ODatabasePoolInternal openPool(String name, String user, String password, OrientDBConfig config) { + checkOpen(); + ODatabasePoolImpl pool = new ODatabasePoolImpl(this, name, user, password, solveConfig(config)); + pools.add(pool); + return pool; + } + + public void removePool(ODatabasePoolInternal pool) { + pools.remove(pool); + } + + @Override + public void close() { + if (!open) + return; + removeShutdownHook(); + internalClose(); + } + + public void internalClose() { + if (!open) + return; + final List storagesCopy; + synchronized (this) { + // SHUTDOWN ENGINES AVOID OTHER OPENS + open = false; + storagesCopy = new ArrayList<>(storages.values()); + } + + for (OStorageRemote stg : storagesCopy) { + try { + ODatabaseDocumentRemote.deInit(stg); + OLogManager.instance().info(this, "- shutdown storage: " + stg.getName() + "..."); + stg.shutdown(); + } catch (Exception e) { + OLogManager.instance().warn(this, "-- error on shutdown storage", e); + } catch (Error e) { + OLogManager.instance().warn(this, "-- error on shutdown storage", e); + throw e; + } + } + synchronized (this) { + storages.clear(); + + connectionManager.close(); + } + } + + private OrientDBConfig solveConfig(OrientDBConfig config) { + if (config != null) { + config.setParent(this.configurations); + return config; + } else { + OrientDBConfig cfg = OrientDBConfig.defaultConfig(); + cfg.setParent(this.configurations); + return cfg; + } + } + + private void checkOpen() { + if (!open) + throw new ODatabaseException("OrientDB Instance is closed"); + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public boolean isEmbedded() { + return false; + } + + @Override + public void removeShutdownHook() { + orient.removeOrientDB(this); + } + + @Override + public void loadAllDatabases() { + //In remote does nothing + } + + @Override + public ODatabaseDocumentInternal openNoAuthenticate(String iDbUrl, String user) { + throw new UnsupportedOperationException("Open with no authentication is not supported in remote"); + } + + @Override + public void initCustomStorage(String name, String baseUrl, String userName, String userPassword) { + throw new UnsupportedOperationException("Custom storage is not supported in remote"); + } + + @Override + public Collection getStorages() { + throw new UnsupportedOperationException("List storage is not supported in remote"); + } + + @Override + public void replaceFactory(OEmbeddedDatabaseInstanceFactory instanceFactory) { + throw new UnsupportedOperationException("instance factory is not supported in remote"); + } + + @Override + public synchronized void forceDatabaseClose(String databaseName) { + OStorageRemote remote = storages.get(databaseName); + if (remote != null) + closeStorage(remote); + } + + @Override + public OEmbeddedDatabaseInstanceFactory getFactory() { + throw new UnsupportedOperationException("instance factory is not supported in remote"); + } + + @Override + public void restore(String name, InputStream in, Map options, Callable callable, + OCommandOutputListener iListener) { + throw new UnsupportedOperationException("raw restore is not supported in remote"); + } + + @Override + public ODatabaseDocumentInternal openNoAuthorization(String name) { + throw new UnsupportedOperationException("impossible skip authentication and authorization in remote"); + } + +} diff --git a/core/src/main/java/com/orientechnologies/orient/core/config/OGlobalConfiguration.java b/core/src/main/java/com/orientechnologies/orient/core/config/OGlobalConfiguration.java index cfe0aeca2e1..e23d25a88a7 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/config/OGlobalConfiguration.java +++ b/core/src/main/java/com/orientechnologies/orient/core/config/OGlobalConfiguration.java @@ -948,7 +948,12 @@ public void change(final Object iCurrentValue, final Object iNewValue) { // DEPRECATED IN 2.0, LEVEL1 CACHE CANNOT BE DISABLED ANYMORE @Deprecated CACHE_LOCAL_ENABLED("cache.local.enabled", "Deprecated, Level1 cache cannot be disabled anymore", Boolean.class, - true); + true), + + CLIENT_CHANNEL_IDLE_CLOSE("client.channel.idleAutoClose", "Enable the automatic close of idle sockets after a specific timeout", + Boolean.class, false), + + CLIENT_CHANNEL_IDLE_TIMEOUT("client.channel.idleTimeout", "sockets maximum time idle in seconds", Integer.class, 900); private final String key; private final Object defValue; @@ -1019,7 +1024,6 @@ public static void dumpConfiguration(final PrintStream out) { * Find the OGlobalConfiguration instance by the key. Key is case insensitive. * * @param iKey Key to find. It's case insensitive. - * * @return OGlobalConfiguration instance if found, otherwise null */ public static OGlobalConfiguration findByKey(final String iKey) { diff --git a/core/src/main/java/com/orientechnologies/orient/enterprise/channel/binary/OChannelBinary.java b/core/src/main/java/com/orientechnologies/orient/enterprise/channel/binary/OChannelBinary.java index 3baf633de42..93240d858b4 100755 --- a/core/src/main/java/com/orientechnologies/orient/enterprise/channel/binary/OChannelBinary.java +++ b/core/src/main/java/com/orientechnologies/orient/enterprise/channel/binary/OChannelBinary.java @@ -47,7 +47,7 @@ public abstract class OChannelBinary extends OChannel { public OChannelBinary(final Socket iSocket, final OContextConfiguration iConfig) throws IOException { super(iSocket, iConfig); - + socket.setKeepAlive(true); maxChunkSize = iConfig.getValueAsInteger(OGlobalConfiguration.NETWORK_BINARY_MAX_CONTENT_LENGTH) * 1024; debug = iConfig.getValueAsBoolean(OGlobalConfiguration.NETWORK_BINARY_DEBUG);