Skip to content

Commit

Permalink
feat(websocket): implementing the new websocket connector
Browse files Browse the repository at this point in the history
  • Loading branch information
notthetup committed Oct 3, 2024
1 parent 57d53a7 commit 7469500
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 57 deletions.
8 changes: 5 additions & 3 deletions etc/initrc.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ platform = new RealTimePlatform()
container = new MasterContainer(platform, port)
if (devname != null) container.addConnector(new SerialPortConnector(devname, baud, 'N81'))
if (web) {
WebServer.getInstance(8080).add("/", "/org/arl/fjage/web")
Connector conn = new WebSocketConnector(8080, "/shell/ws")
WebServer webserver = WebServer.getInstance(8080);
webserver.add("/", "/org/arl/fjage/web")
Connector conn = new WebSocketHubConnector(8080, "/shell/ws")
shell = new ShellAgent(new ConsoleShell(conn), new GroovyScriptEngine())
container.addConnector(new WebSocketConnector(8080, "/ws", true))
webserver.addWebSocketConnector('/ws', container)
// container.addConnector(new WebSocketConnector(8080, "/ws", true))
} else {
shell = new ShellAgent(new ConsoleShell(), new GroovyScriptEngine())
}
Expand Down
40 changes: 32 additions & 8 deletions src/main/java/org/arl/fjage/connectors/WebServer.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/******************************************************************************
Copyright (c) 2013, Mandar Chitre
Copyright (c) 2013, Mandar Chitre
This file is part of fjage which is released under Simplified BSD License.
See file LICENSE.txt or go to http://www.opensource.org/licenses/BSD-3-Clause
for full license details.
This file is part of fjage which is released under Simplified BSD License.
See file LICENSE.txt or go to http://www.opensource.org/licenses/BSD-3-Clause
for full license details.
******************************************************************************/
******************************************************************************/

package org.arl.fjage.connectors;

Expand All @@ -23,16 +23,19 @@
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.Part;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLEncoder;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -154,6 +157,7 @@ public static void shutdown() {
protected HandlerCollection handlerCollection = new HandlerCollection();
protected boolean started;
protected int port;
protected String host;

protected WebServer(int port) {
this(port, "127.0.0.1");
Expand All @@ -176,6 +180,11 @@ protected WebServer(int port, String ip) {
server.setHandler(gzipHandler);
ThreadPool pool = server.getThreadPool();
if (pool instanceof QueuedThreadPool) ((QueuedThreadPool)pool).setDaemon(true);
try {
host = InetAddress.getLocalHost().getHostAddress()+":"+port;
} catch (UnknownHostException ex) {
host = "0.0.0.0:"+port;
}
started = false;
}

Expand Down Expand Up @@ -399,6 +408,21 @@ public void addRule(Rule rule) {
}
}

/**
* Add a WebSocket context handler for a given context.
* @param context context path.
*/
public void addWebSocketConnector(String context, ConnectionListener listener) {
ContextHandler handler = new ContextHandler(context);
handler.setHandler(new WebSocketHandler() {
@Override
public void configure(WebSocketServletFactory factory) {
factory.setCreator((req, resp) -> new WebSocketConnector(listener, host+context));
}
});
add(handler);
}

public static class UploadHandler extends AbstractHandler {
private final MultipartConfigElement multipartConfig;
private final Path outputDir;
Expand Down
159 changes: 158 additions & 1 deletion src/main/java/org/arl/fjage/connectors/WebSocketConnector.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,161 @@
package org.arl.fjage.connectors;

public class WebSocketConnector {
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.*;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

@WebSocket(maxIdleTime = Integer.MAX_VALUE, batchMode = BatchMode.OFF)
public class WebSocketConnector implements Connector {

private ConnectionListener listener;
private Session session;
private OutputThread outThread = null;
private final String name;

private final PseudoInputStream pin = new PseudoInputStream();
private final PseudoOutputStream pout = new PseudoOutputStream();

private final Logger log = Logger.getLogger(getClass().getName());

public WebSocketConnector(ConnectionListener listener, String hostpath) {
this.listener = listener;
this.name = "ws://" + hostpath;
outThread = new OutputThread();
outThread.start();
}

@OnWebSocketConnect
public void onConnect(Session session) {
log.fine("New connection from "+session.getRemoteAddress());
this.session = session;
listener.connected(this);
}

@OnWebSocketClose
public void onClose(int statusCode, String reason) {
log.fine("Connection from "+session.getRemoteAddress()+" closed");
session = null;
}

@OnWebSocketError
public void onError(Throwable t) {
log.warning(t.toString());
}

@OnWebSocketMessage
public void onMessage(String message) {
try {
pout.write(message.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
log.warning(e.toString());
}
}

void write(String s) {
try {
if (session != null && session.isOpen()) {
Future<Void> f = session.getRemote().sendStringByFuture(s);
try {
f.get(500, TimeUnit.MILLISECONDS);
} catch (TimeoutException e){
log.fine("Sending timed out. Closing connection to " + session.getRemoteAddress());
session.disconnect();
} catch (Exception e){
log.warning(e.toString());
}
}
} catch (Exception e) {
log.warning(e.toString());
}
}

@Override
public String getName() {
return name;
}

@Override
public InputStream getInputStream() {
return pin;
}

@Override
public OutputStream getOutputStream() {
return pout;
}

@Override
public boolean isReliable() {
return true;
}

@Override
public boolean waitOutputCompletion(long timeout) {
return true;
}

@Override
public void setConnectionListener(ConnectionListener listener) {
this.listener = listener;
}

@Override
public void close() {
if(session != null) {
session.close();
}
}

private class OutputThread extends Thread {

OutputThread() {
setName(getClass().getSimpleName()+":"+name);
setDaemon(true);
setPriority(MIN_PRIORITY);
}

@Override
public void run() {
while (true) {
String s;
s = pout.readLine();
if (s == null) break;
write(s);
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
break;
}
}
}

void close() {
try {
if (pout != null) {
pout.close();
join();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}

@Override
public String toString() {
if (session == null) {
return name + " (disconnected)";
}else {
return name + "/" + session.getRemoteAddress();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* Web socket connector.
*/
public class WebSocketHubConnector implements Connector, WebSocketCreator {
public class WebSocketHubConnector implements Connector {

protected String name;
protected boolean linemode = false;
Expand Down Expand Up @@ -89,7 +89,7 @@ protected void init(int port, String context, int maxMsgSize) {
handler.setHandler(new WebSocketHandler() {
@Override
public void configure(WebSocketServletFactory factory) {
factory.setCreator(WebSocketHubConnector.this);
factory.setCreator((req, resp) -> new WSHandler(WebSocketHubConnector.this));
if (maxMsgSize > 0) factory.getPolicy().setMaxTextMessageSize(maxMsgSize);
}
});
Expand All @@ -99,11 +99,6 @@ public void configure(WebSocketServletFactory factory) {
outThread.start();
}

@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
return new WSHandler(this);
}

@Override
public String getName() {
return name;
Expand Down
38 changes: 0 additions & 38 deletions src/main/java/org/arl/fjage/connectors/WebSocketServer.java

This file was deleted.

0 comments on commit 7469500

Please sign in to comment.