diff --git a/etc/initrc.groovy b/etc/initrc.groovy index 96de638d..a1817c7c 100644 --- a/etc/initrc.groovy +++ b/etc/initrc.groovy @@ -24,10 +24,12 @@ platform = new RealTimePlatform() container = new MasterContainer(platform, port) if (devname != null) container.addConnector(new SerialPortConnector(devname, baud, 'N81')) if (web) { - WebServer.getInstance(8080).add("/", "/org/arl/fjage/web") - Connector conn = new WebSocketConnector(8080, "/shell/ws") + WebServer webserver = WebServer.getInstance(8080); + webserver.add("/", "/org/arl/fjage/web") + Connector conn = new WebSocketHubConnector(8080, "/shell/ws") shell = new ShellAgent(new ConsoleShell(conn), new GroovyScriptEngine()) - container.addConnector(new WebSocketConnector(8080, "/ws", true)) + webserver.addWebSocketConnector('/ws', container) +// container.addConnector(new WebSocketConnector(8080, "/ws", true)) } else { shell = new ShellAgent(new ConsoleShell(), new GroovyScriptEngine()) } diff --git a/src/main/java/org/arl/fjage/connectors/WebServer.java b/src/main/java/org/arl/fjage/connectors/WebServer.java index 834d90ec..fd314785 100644 --- a/src/main/java/org/arl/fjage/connectors/WebServer.java +++ b/src/main/java/org/arl/fjage/connectors/WebServer.java @@ -1,12 +1,12 @@ /****************************************************************************** -Copyright (c) 2013, Mandar Chitre + Copyright (c) 2013, Mandar Chitre -This file is part of fjage which is released under Simplified BSD License. -See file LICENSE.txt or go to http://www.opensource.org/licenses/BSD-3-Clause -for full license details. + This file is part of fjage which is released under Simplified BSD License. + See file LICENSE.txt or go to http://www.opensource.org/licenses/BSD-3-Clause + for full license details. -******************************************************************************/ + ******************************************************************************/ package org.arl.fjage.connectors; @@ -23,6 +23,11 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ThreadPool; +import org.eclipse.jetty.websocket.server.WebSocketHandler; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import javax.servlet.MultipartConfigElement; import javax.servlet.ServletException; @@ -30,9 +35,7 @@ import javax.servlet.http.HttpServletResponse; import javax.servlet.http.Part; import java.io.*; -import java.net.InetSocketAddress; -import java.net.URL; -import java.net.URLEncoder; +import java.net.*; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -154,6 +157,7 @@ public static void shutdown() { protected HandlerCollection handlerCollection = new HandlerCollection(); protected boolean started; protected int port; + protected String host; protected WebServer(int port) { this(port, "127.0.0.1"); @@ -176,6 +180,11 @@ protected WebServer(int port, String ip) { server.setHandler(gzipHandler); ThreadPool pool = server.getThreadPool(); if (pool instanceof QueuedThreadPool) ((QueuedThreadPool)pool).setDaemon(true); + try { + host = InetAddress.getLocalHost().getHostAddress()+":"+port; + } catch (UnknownHostException ex) { + host = "0.0.0.0:"+port; + } started = false; } @@ -399,6 +408,21 @@ public void addRule(Rule rule) { } } + /** + * Add a WebSocket context handler for a given context. + * @param context context path. + */ + public void addWebSocketConnector(String context, ConnectionListener listener) { + ContextHandler handler = new ContextHandler(context); + handler.setHandler(new WebSocketHandler() { + @Override + public void configure(WebSocketServletFactory factory) { + factory.setCreator((req, resp) -> new WebSocketConnector(listener, host+context)); + } + }); + add(handler); + } + public static class UploadHandler extends AbstractHandler { private final MultipartConfigElement multipartConfig; private final Path outputDir; diff --git a/src/main/java/org/arl/fjage/connectors/WebSocketConnector.java b/src/main/java/org/arl/fjage/connectors/WebSocketConnector.java index 2adcb2e3..f6448347 100644 --- a/src/main/java/org/arl/fjage/connectors/WebSocketConnector.java +++ b/src/main/java/org/arl/fjage/connectors/WebSocketConnector.java @@ -1,4 +1,161 @@ package org.arl.fjage.connectors; -public class WebSocketConnector { +import org.eclipse.jetty.websocket.api.BatchMode; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.*; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +@WebSocket(maxIdleTime = Integer.MAX_VALUE, batchMode = BatchMode.OFF) +public class WebSocketConnector implements Connector { + + private ConnectionListener listener; + private Session session; + private OutputThread outThread = null; + private final String name; + + private final PseudoInputStream pin = new PseudoInputStream(); + private final PseudoOutputStream pout = new PseudoOutputStream(); + + private final Logger log = Logger.getLogger(getClass().getName()); + + public WebSocketConnector(ConnectionListener listener, String hostpath) { + this.listener = listener; + this.name = "ws://" + hostpath; + outThread = new OutputThread(); + outThread.start(); + } + + @OnWebSocketConnect + public void onConnect(Session session) { + log.fine("New connection from "+session.getRemoteAddress()); + this.session = session; + listener.connected(this); + } + + @OnWebSocketClose + public void onClose(int statusCode, String reason) { + log.fine("Connection from "+session.getRemoteAddress()+" closed"); + session = null; + } + + @OnWebSocketError + public void onError(Throwable t) { + log.warning(t.toString()); + } + + @OnWebSocketMessage + public void onMessage(String message) { + try { + pout.write(message.getBytes(StandardCharsets.UTF_8)); + } catch (IOException e) { + log.warning(e.toString()); + } + } + + void write(String s) { + try { + if (session != null && session.isOpen()) { + Future f = session.getRemote().sendStringByFuture(s); + try { + f.get(500, TimeUnit.MILLISECONDS); + } catch (TimeoutException e){ + log.fine("Sending timed out. Closing connection to " + session.getRemoteAddress()); + session.disconnect(); + } catch (Exception e){ + log.warning(e.toString()); + } + } + } catch (Exception e) { + log.warning(e.toString()); + } + } + + @Override + public String getName() { + return name; + } + + @Override + public InputStream getInputStream() { + return pin; + } + + @Override + public OutputStream getOutputStream() { + return pout; + } + + @Override + public boolean isReliable() { + return true; + } + + @Override + public boolean waitOutputCompletion(long timeout) { + return true; + } + + @Override + public void setConnectionListener(ConnectionListener listener) { + this.listener = listener; + } + + @Override + public void close() { + if(session != null) { + session.close(); + } + } + + private class OutputThread extends Thread { + + OutputThread() { + setName(getClass().getSimpleName()+":"+name); + setDaemon(true); + setPriority(MIN_PRIORITY); + } + + @Override + public void run() { + while (true) { + String s; + s = pout.readLine(); + if (s == null) break; + write(s); + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + break; + } + } + } + + void close() { + try { + if (pout != null) { + pout.close(); + join(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + + @Override + public String toString() { + if (session == null) { + return name + " (disconnected)"; + }else { + return name + "/" + session.getRemoteAddress(); + } + } } diff --git a/src/main/java/org/arl/fjage/connectors/WebSocketHubConnector.java b/src/main/java/org/arl/fjage/connectors/WebSocketHubConnector.java index 5f900396..fa17f43f 100644 --- a/src/main/java/org/arl/fjage/connectors/WebSocketHubConnector.java +++ b/src/main/java/org/arl/fjage/connectors/WebSocketHubConnector.java @@ -27,7 +27,7 @@ /** * Web socket connector. */ -public class WebSocketHubConnector implements Connector, WebSocketCreator { +public class WebSocketHubConnector implements Connector { protected String name; protected boolean linemode = false; @@ -89,7 +89,7 @@ protected void init(int port, String context, int maxMsgSize) { handler.setHandler(new WebSocketHandler() { @Override public void configure(WebSocketServletFactory factory) { - factory.setCreator(WebSocketHubConnector.this); + factory.setCreator((req, resp) -> new WSHandler(WebSocketHubConnector.this)); if (maxMsgSize > 0) factory.getPolicy().setMaxTextMessageSize(maxMsgSize); } }); @@ -99,11 +99,6 @@ public void configure(WebSocketServletFactory factory) { outThread.start(); } - @Override - public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) { - return new WSHandler(this); - } - @Override public String getName() { return name; diff --git a/src/main/java/org/arl/fjage/connectors/WebSocketServer.java b/src/main/java/org/arl/fjage/connectors/WebSocketServer.java deleted file mode 100644 index cd12455b..00000000 --- a/src/main/java/org/arl/fjage/connectors/WebSocketServer.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.arl.fjage.connectors; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.UnknownHostException; -import java.util.logging.Logger; - -public class WebSocketServer implements Closeable { - - protected int port; - protected WebServer server = null; - protected ConnectionListener listener; - protected String name = null; - protected Logger log = Logger.getLogger(getClass().getName()); - - /** - * Create a TCP server running on a specified port. - * - * @param port TCP port number (0 to autoselect). - */ - public WebSocketServer(int port, String context, WebServer server, ConnectionListener listener) { - this.port = port; - this.server = server; - this.listener = listener; - try { - name = "ws://"+ InetAddress.getLocalHost().getHostAddress()+":"+port+context; - } catch (UnknownHostException ex) { - name = "ws://0.0.0.0:"+port+context; - } - } - - @Override - public void close() throws IOException { -// server.remove(); - } -}