From e15836e39675a79ecf65e64383347463c4cc46f4 Mon Sep 17 00:00:00 2001 From: Paul Gregoire Date: Tue, 22 Oct 2024 11:33:18 -0700 Subject: [PATCH] Refactor timeout and close routines; seems to plug the container leak --- .../net/websocket/WebSocketConnection.java | 90 +++++++++---------- .../red5/net/websocket/WebSocketScope.java | 4 +- .../net/websocket/WebSocketScopeManager.java | 11 +-- .../server/DefaultWebSocketEndpoint.java | 2 +- .../server/DefaultWsServerContainer.java | 2 +- .../server/WsHttpUpgradeHandler.java | 76 +++++++++++++--- 6 files changed, 112 insertions(+), 73 deletions(-) diff --git a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java index 1380371af..197e5ab55 100644 --- a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java +++ b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java @@ -25,6 +25,9 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.stream.Stream; +import javax.websocket.CloseReason; +import javax.websocket.CloseReason.CloseCode; +import javax.websocket.CloseReason.CloseCodes; import javax.websocket.Extension; import javax.websocket.Session; @@ -65,6 +68,7 @@ public class WebSocketConnection extends AttributeStore implements Comparable scope; // unique identifier for the session @@ -163,6 +167,8 @@ public WebSocketConnection(WebSocketScope scope, Session session) { // add the timeouts to the user props userProps.put(Constants.READ_IDLE_TIMEOUT_MS, readTimeout); userProps.put(Constants.WRITE_IDLE_TIMEOUT_MS, sendTimeout); + // set the close timeout to 5 seconds + userProps.put(Constants.SESSION_CLOSE_TIMEOUT_PROPERTY, TimeUnit.SECONDS.toMillis(5)); if (isDebug) { log.debug("userProps: {}", userProps); } @@ -186,9 +192,8 @@ public void send(String data) throws UnsupportedEncodingException, IOException { } // process the incoming string if (StringUtils.isNotBlank(data)) { - final WsSession session = wsSession; // attempt send only if the session is not closed - if (session != null && !session.isClosed()) { + if (!wsSession.isClosed()) { try { if (useAsync) { if (sendFuture != null && !sendFuture.isDone()) { @@ -197,7 +202,7 @@ public void send(String data) throws UnsupportedEncodingException, IOException { } catch (TimeoutException e) { log.warn("Send timed out {}", wsSessionId); // if the session is not open, cancel the future - if (!session.isOpen()) { + if (!wsSession.isOpen()) { sendFuture.cancel(true); return; } @@ -205,13 +210,13 @@ public void send(String data) throws UnsupportedEncodingException, IOException { } synchronized (wsSessionId) { int lengthToWrite = data.getBytes().length; - sendFuture = session.getAsyncRemote().sendText(data); + sendFuture = wsSession.getAsyncRemote().sendText(data); updateWriteBytes(lengthToWrite); } } else { synchronized (wsSessionId) { int lengthToWrite = data.getBytes().length; - session.getBasicRemote().sendText(data); + wsSession.getBasicRemote().sendText(data); updateWriteBytes(lengthToWrite); } } @@ -236,8 +241,7 @@ public void send(byte[] buf) throws IOException { if (isDebug) { log.debug("send binary: {}", Arrays.toString(buf)); } - WsSession session = wsSession; - if (session != null && session.isOpen()) { + if (!wsSession.isClosed()) { try { // send the bytes if (useAsync) { @@ -253,12 +257,12 @@ public void send(byte[] buf) throws IOException { } } synchronized (wsSessionId) { - sendFuture = session.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf)); + sendFuture = wsSession.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf)); updateWriteBytes(buf.length); } } else { synchronized (wsSessionId) { - session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf)); + wsSession.getBasicRemote().sendBinary(ByteBuffer.wrap(buf)); updateWriteBytes(buf.length); } } @@ -281,11 +285,10 @@ public void sendPing(byte[] buf) throws IllegalArgumentException, IOException { if (isTrace) { log.trace("send ping: {}", buf); } - WsSession session = wsSession; - if (session != null && session.isOpen()) { + if (!wsSession.isClosed()) { synchronized (wsSessionId) { // send the bytes - session.getBasicRemote().sendPing(ByteBuffer.wrap(buf)); + wsSession.getBasicRemote().sendPing(ByteBuffer.wrap(buf)); // update counter updateWriteBytes(buf.length); } @@ -305,11 +308,10 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException { if (isTrace) { log.trace("send pong: {}", buf); } - WsSession session = wsSession; - if (session != null && session.isOpen()) { + if (!wsSession.isClosed()) { synchronized (wsSessionId) { // send the bytes - session.getBasicRemote().sendPong(ByteBuffer.wrap(buf)); + wsSession.getBasicRemote().sendPong(ByteBuffer.wrap(buf)); // update counter updateWriteBytes(buf.length); } @@ -319,14 +321,34 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException { } /** - * close Connection + * Close the connection. */ public void close() { + close(CloseCodes.NORMAL_CLOSURE, ""); + } + + /** + * Close the connection with a reason. + * + * @param code CloseCode + * @param reasonPhrase short reason for closing + */ + public void close(CloseCode code, String reasonPhrase) { if (connected.compareAndSet(true, false)) { - log.debug("close: {}", wsSessionId); - // trying to close the session nicely + // no blank reasons + if (reasonPhrase == null) { + reasonPhrase = ""; + } + log.debug("close: {} code: {} reason: {}", wsSessionId, code, reasonPhrase); try { - wsSession.close(); + // close the session if open + if (wsSession.isOpen()) { + CloseReason reason = new CloseReason(code, reasonPhrase); + if (isDebug) { + log.debug("Closing session: {} with reason: {}", wsSessionId, reason); + } + wsSession.close(reason); + } } catch (Exception e) { log.debug("Exception closing session", e); } @@ -343,39 +365,9 @@ public void close() { if (headers != null) { headers = null; } - if (scope.get() != null) { - // disconnect from scope - scope.get().removeConnection(this); - // clear weak refs - scope.clear(); - } } } - /* - WsSession uses these userProperties for checkExpiration along with maxIdleTimeout - - configuration for read idle timeout on WebSocket session - READ_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.READ_IDLE_TIMEOUT_MS"; - configuration for write idle timeout on WebSocket session - WRITE_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.WRITE_IDLE_TIMEOUT_MS"; - */ - public void timeoutAsync(long now) { - // XXX(paul) only logging here as we should more than likely rely upon the container checking expiration - log.trace("timeoutAsync: {} on session id: {} read: {} written: {}", now, wsSessionId, readBytes, writtenBytes); - /* - WsSession session = wsSession; - Map props = session.getUserProperties(); - log.debug("Session properties: {}", props); - long maxIdleTimeout = session.getMaxIdleTimeout(); - long readTimeout = (long) props.get(Constants.READ_IDLE_TIMEOUT_MS); - long sendTimeout = (long) props.get(Constants.WRITE_IDLE_TIMEOUT_MS); - log.debug("Session timeouts - max: {} read: {} write: {}", maxIdleTimeout, readTimeout, sendTimeout); - //long readDelta = (now - lastReadTime), writeDelta = (now - lastWriteTime); - //log.debug("timeoutAsync: {} on {} last read: {} last write: {}", now, wsSessionId, readDelta, writeDelta); - */ - } - /** * Async send is enabled in non-Windows based systems; this provides a means to override it. * diff --git a/server/src/main/java/org/red5/net/websocket/WebSocketScope.java b/server/src/main/java/org/red5/net/websocket/WebSocketScope.java index 767178402..172d84d9a 100644 --- a/server/src/main/java/org/red5/net/websocket/WebSocketScope.java +++ b/server/src/main/java/org/red5/net/websocket/WebSocketScope.java @@ -14,6 +14,8 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArraySet; +import javax.websocket.CloseReason.CloseCodes; + import org.red5.net.websocket.listener.IWebSocketDataListener; import org.red5.net.websocket.model.WSMessage; import org.red5.server.api.scope.IScope; @@ -91,7 +93,7 @@ public void unregister() { // clean up the connections by first closing them conns.forEach(conn -> { if (conns.remove(conn)) { - conn.close(); + conn.close(CloseCodes.GOING_AWAY, "WebSocket scope removed"); } }); // clean up the listeners by first stopping them diff --git a/server/src/main/java/org/red5/net/websocket/WebSocketScopeManager.java b/server/src/main/java/org/red5/net/websocket/WebSocketScopeManager.java index 70ab277e5..d01f07d8d 100644 --- a/server/src/main/java/org/red5/net/websocket/WebSocketScopeManager.java +++ b/server/src/main/java/org/red5/net/websocket/WebSocketScopeManager.java @@ -15,6 +15,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import javax.websocket.CloseReason.CloseCodes; + import org.red5.net.websocket.listener.DefaultWebSocketDataListener; import org.red5.net.websocket.listener.IWebSocketDataListener; import org.red5.net.websocket.listener.IWebSocketScopeListener; @@ -172,17 +174,12 @@ public boolean addWebSocketScope(WebSocketScope webSocketScope) { wsConn.sendPing(PING_BYTES); } catch (Exception e) { log.debug("Exception pinging connection: {} connection will be closed", wsConn.getSessionId(), e); - // if the connection isn't connected, remove them - wsScope.removeConnection(wsConn); - // if the ping fails, consider them gone - wsConn.close(); + wsConn.close(CloseCodes.CLOSED_ABNORMALLY, e.getMessage()); } } else { log.debug("Removing unconnected connection: {} during ping loop", wsConn.getSessionId()); // if the connection isn't connected, remove them - wsScope.removeConnection(wsConn); - // if connection is not connected, close it (ensure closed / removed) - wsConn.close(); + wsConn.close(CloseCodes.UNEXPECTED_CONDITION, "Connection not connected"); } } catch (Exception e) { log.warn("Exception in WS pinger", e); diff --git a/server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java b/server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java index 47b6bcbcb..41e72a2f4 100644 --- a/server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java +++ b/server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java @@ -91,7 +91,7 @@ public void onClose(Session session, CloseReason closeReason) { // force remove on exception scope.removeConnection(conn); // fire close, to be sure - conn.close(); + conn.close(closeReason.getCloseCode(), closeReason.getReasonPhrase()); } } } diff --git a/server/src/main/java/org/red5/net/websocket/server/DefaultWsServerContainer.java b/server/src/main/java/org/red5/net/websocket/server/DefaultWsServerContainer.java index fbdb973f8..7ea01dae5 100644 --- a/server/src/main/java/org/red5/net/websocket/server/DefaultWsServerContainer.java +++ b/server/src/main/java/org/red5/net/websocket/server/DefaultWsServerContainer.java @@ -301,7 +301,7 @@ protected void registerSession(Object endpoint, WsSession wsSession) { */ @Override protected void unregisterSession(Object endpoint, WsSession wsSession) { - if (wsSession.getUserPrincipal() != null && wsSession.getHttpSessionId() != null) { + if (wsSession.getHttpSessionId() != null) { unregisterAuthenticatedSession(wsSession, wsSession.getHttpSessionId()); log.debug("unregisterSession - unregisterAuthenticatedSession: {}", wsSession.getId()); } diff --git a/server/src/main/java/org/red5/net/websocket/server/WsHttpUpgradeHandler.java b/server/src/main/java/org/red5/net/websocket/server/WsHttpUpgradeHandler.java index fd1abdfc4..f46664779 100644 --- a/server/src/main/java/org/red5/net/websocket/server/WsHttpUpgradeHandler.java +++ b/server/src/main/java/org/red5/net/websocket/server/WsHttpUpgradeHandler.java @@ -21,6 +21,7 @@ import org.apache.tomcat.util.net.SocketEvent; import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.websocket.Constants; import org.apache.tomcat.websocket.Transformation; import org.apache.tomcat.websocket.WsIOException; import org.apache.tomcat.websocket.WsSession; @@ -78,6 +79,10 @@ public class WsHttpUpgradeHandler implements InternalHttpUpgradeHandler { private WsSession wsSession; + private long lastTimeoutCheck = System.currentTimeMillis(); + + private long lastReadBytes, lastWrittenBytes; + public WsHttpUpgradeHandler() { applicationClassLoader = Thread.currentThread().getContextClassLoader(); } @@ -284,23 +289,66 @@ public void setSslSupport(SSLSupport sslSupport) { @Override public void timeoutAsync(long now) { log.trace("timeoutAsync: {} on session: {}", now, wsSession); - // session methods may not be called if the session is not open if (wsSession != null) { - if (wsSession.isOpen()) { - try { - // if we have a timeout, inform the ws connection - WebSocketConnection conn = (WebSocketConnection) wsSession.getUserProperties().get(WSConstants.WS_CONNECTION); - if (conn != null) { - conn.timeoutAsync(now); + try { + final String wsSessionId = wsSession.getId(); + // get scope from endpoint config + WebSocketScope scope = (WebSocketScope) endpointConfig.getUserProperties().get(WSConstants.WS_SCOPE); + // do lookup by session id, skips need for session user props + WebSocketConnection conn = scope.getConnectionBySessionId(wsSessionId); + // if we don't get it from the scope, try the session lookup + if (conn == null && wsSession.isOpen()) { + // session methods may not be called if its not open + conn = (WebSocketConnection) wsSession.getUserProperties().get(WSConstants.WS_CONNECTION); + } + // last check, if we don't have a connection, log a warning + if (conn == null) { + log.warn("Connection for id: {} was not found in the scope or session: {}", wsSession.getId(), scope.getPath()); + return; + } + // negative now means always treat as expired + if (now > 0) { + long checkDelta = now - lastTimeoutCheck; + long readBytes = conn.getReadBytes(), writtenBytes = conn.getWrittenBytes(); + log.info("timeoutAsync: {}ms on session id: {} read: {} written: {}", checkDelta, wsSessionId, readBytes, writtenBytes); + Map props = wsSession.getUserProperties(); + log.debug("Session properties: {}", props); + long maxIdleTimeout = wsSession.getMaxIdleTimeout(); + long readTimeout = (long) props.get(Constants.READ_IDLE_TIMEOUT_MS); + long writeTimeout = (long) props.get(Constants.WRITE_IDLE_TIMEOUT_MS); + log.debug("Session timeouts - max: {} read: {} write: {}", maxIdleTimeout, readTimeout, writeTimeout); + if (maxIdleTimeout > 0) { + if (checkDelta > maxIdleTimeout && (readBytes == lastReadBytes || writtenBytes == lastWrittenBytes)) { + log.info("Max idle timeout: {}ms on session id: {}", checkDelta, wsSessionId); + conn.close(CloseCodes.GOING_AWAY, "Max idle timeout"); + } + } else { + if (readTimeout > 0) { + if (readBytes == lastReadBytes) { + if (checkDelta > readTimeout) { + log.info("Read timeout: {}ms on session id: {}", checkDelta, wsSessionId); + conn.close(CloseCodes.GOING_AWAY, "Read timeout"); + } + } + } + if (writeTimeout > 0) { + if (writtenBytes == lastWrittenBytes) { + if (checkDelta > writeTimeout) { + log.info("Write timeout: {}ms on session id: {}", checkDelta, wsSessionId); + conn.close(CloseCodes.GOING_AWAY, "Write timeout"); + } + } + } } - } catch (Throwable t) { - log.warn(sm.getString("wsHttpUpgradeHandler.timeoutAsyncFailed"), t); + lastReadBytes = readBytes; + lastWrittenBytes = writtenBytes; + lastTimeoutCheck = now; + } else { + log.warn("timeoutAsync: negative time on session id: {}", wsSessionId); + conn.close(CloseCodes.GOING_AWAY, "Timeout expired"); } - } else { - log.debug("timeoutAsync: {} session is not open for session id: {}", now, wsSession.getId()); - // we need the processor released from the async waitingProcessors list - // located in abstract protocol - //socketWrapper.close(); + } catch (Throwable t) { + log.warn(sm.getString("wsHttpUpgradeHandler.timeoutAsyncFailed"), t); } } }