diff --git a/app-stream-api/pom.xml b/app-stream-api/pom.xml index 99fd628..b2afdde 100644 --- a/app-stream-api/pom.xml +++ b/app-stream-api/pom.xml @@ -5,7 +5,7 @@ open-app-stream-client com.dingtalk.open - 1.2.0 + 1.2.1 ../pom.xml 4.0.0 diff --git a/app-stream-api/src/main/java/com/dingtalk/open/app/api/OpenDingTalkStreamClient.java b/app-stream-api/src/main/java/com/dingtalk/open/app/api/OpenDingTalkStreamClient.java index d3690f2..4a593d8 100644 --- a/app-stream-api/src/main/java/com/dingtalk/open/app/api/OpenDingTalkStreamClient.java +++ b/app-stream-api/src/main/java/com/dingtalk/open/app/api/OpenDingTalkStreamClient.java @@ -14,6 +14,7 @@ import com.dingtalk.open.app.stream.network.core.NetWorkService; import com.dingtalk.open.app.stream.network.core.Subscription; +import java.net.Proxy; import java.util.Collections; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -33,23 +34,26 @@ class OpenDingTalkStreamClient implements OpenDingTalkClient { private OpenApiClient openApiClient; private Set subscriptions; private final AtomicReference status; + private final Proxy proxy; - public OpenDingTalkStreamClient(DingTalkCredential credential, CommandDispatcher dispatcher, ExecutorService executor, ClientOption option, Set subscriptions) { + public OpenDingTalkStreamClient(DingTalkCredential credential, CommandDispatcher dispatcher, ExecutorService executor, ClientOption option, Set subscriptions, + Proxy proxy) { this.credential = credential; this.dispatcher = dispatcher; this.executor = executor; this.option = option; this.subscriptions = Collections.unmodifiableSet(subscriptions); this.status = new AtomicReference<>(Status.INIT); + this.proxy = proxy; } @Override public synchronized void start() throws OpenDingTalkAppException { if (status.get() == Status.INIT) { this.openApiClient = OpenApiClientBuilder.create().setHost(option.getOpenApiHost()).setTimeout(option.getConnectionTTL()).build(); - final EndPointConnectionFactory factory = () -> openConnection(this.credential, subscriptions); + final EndPointConnectionFactory factory = () -> openConnection(this.credential, subscriptions, proxy); ClientConnectionListener listener = new AppServiceListener(dispatcher, executor); - this.netWorkService = new NetWorkService(factory, listener, option.getMaxConnectionCount(), option.getConnectionTTL(), option.getConnectTimeout(),option.getKeepAliveOption().getKeepAliveIdleMill()); + this.netWorkService = new NetWorkService(factory, listener, option.getMaxConnectionCount(), option.getConnectionTTL(), option.getConnectTimeout(), option.getKeepAliveOption().getKeepAliveIdleMill()); this.netWorkService.start(); this.status.set(Status.ACTIVE); } else if (status.get() == Status.INACTIVE) { @@ -70,7 +74,7 @@ public synchronized void stop() throws Exception { } } - private EndPointConnection openConnection(DingTalkCredential credential, Set subscriptions) throws Exception { + private EndPointConnection openConnection(DingTalkCredential credential, Set subscriptions, Proxy proxy) throws Exception { OpenConnectionRequest request = new OpenConnectionRequest(); request.setClientId(credential.getClientId()); request.setClientSecret(credential.getClientSecret()); @@ -78,7 +82,7 @@ private EndPointConnection openConnection(DingTalkCredential credential, Set com.dingtalk.open open-app-stream-client - 1.2.0 + 1.2.1 ../pom.xml app-stream-client jar - 1.2.0 + 1.2.1 app-stream-client diff --git a/app-stream-network/app-stream-network-api/pom.xml b/app-stream-network/app-stream-network-api/pom.xml index ecda205..3b596dd 100644 --- a/app-stream-network/app-stream-network-api/pom.xml +++ b/app-stream-network/app-stream-network-api/pom.xml @@ -4,12 +4,12 @@ com.dingtalk.open app-stream-network - 1.2.0 + 1.2.1 ../pom.xml app-stream-network-api - 1.2.0 + 1.2.1 jar app-stream-network-api diff --git a/app-stream-network/app-stream-network-api/src/main/java/com/dingtalk/open/app/stream/network/api/EndPointConnection.java b/app-stream-network/app-stream-network-api/src/main/java/com/dingtalk/open/app/stream/network/api/EndPointConnection.java index 39551a1..bcf2f5f 100644 --- a/app-stream-network/app-stream-network-api/src/main/java/com/dingtalk/open/app/stream/network/api/EndPointConnection.java +++ b/app-stream-network/app-stream-network-api/src/main/java/com/dingtalk/open/app/stream/network/api/EndPointConnection.java @@ -1,5 +1,6 @@ package com.dingtalk.open.app.stream.network.api; +import java.net.Proxy; import java.net.URI; import java.net.URISyntaxException; @@ -12,7 +13,9 @@ public class EndPointConnection { private final URI endPoint; private final String connectionId; - public EndPointConnection(String clientId, String endPoint, String connectionId) { + private final Proxy proxy; + + public EndPointConnection(String clientId, String endPoint, String connectionId, Proxy proxy) { this.clientId = clientId; try { this.endPoint = new URI(endPoint); @@ -20,6 +23,7 @@ public EndPointConnection(String clientId, String endPoint, String connectionId) throw new RuntimeException(e); } this.connectionId = connectionId; + this.proxy = proxy; } public URI getEndPoint() { @@ -38,4 +42,7 @@ public TransportProtocol getProtocol() { return TransportProtocol.parseScheme(endPoint.getScheme()); } + public Proxy getProxy() { + return proxy; + } } diff --git a/app-stream-network/app-stream-network-api/src/main/java/com/dingtalk/open/app/stream/network/api/NetProxy.java b/app-stream-network/app-stream-network-api/src/main/java/com/dingtalk/open/app/stream/network/api/NetProxy.java new file mode 100644 index 0000000..c31d761 --- /dev/null +++ b/app-stream-network/app-stream-network-api/src/main/java/com/dingtalk/open/app/stream/network/api/NetProxy.java @@ -0,0 +1,23 @@ +package com.dingtalk.open.app.stream.network.api; + +/** + * @author feiyin + * @date 2023/11/21 + */ +public class NetProxy { + private String host; + private Integer port; + + public NetProxy(String host, Integer port) { + this.host = host; + this.port = port; + } + + public String getIp() { + return host; + } + + public Integer getPort() { + return port; + } +} diff --git a/app-stream-network/app-stream-network-core/pom.xml b/app-stream-network/app-stream-network-core/pom.xml index 89fbb0d..2d3ce02 100644 --- a/app-stream-network/app-stream-network-core/pom.xml +++ b/app-stream-network/app-stream-network-core/pom.xml @@ -4,7 +4,7 @@ com.dingtalk.open app-stream-network - 1.2.0 + 1.2.1 ../pom.xml diff --git a/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/Connector.java b/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/Connector.java index a48658a..847ab96 100644 --- a/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/Connector.java +++ b/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/Connector.java @@ -69,7 +69,7 @@ private static void ensureActive() { } INIT = true; } catch (Exception e) { - LOGGER.error("[DingTalk] client init transport failed", e); + LOGGER.error("[DingTalk] client init transport failed, {}", e); } finally { INIT_LOCK.unlock(); } diff --git a/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/DefaultSessionPool.java b/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/DefaultSessionPool.java index 126d950..d3e4669 100644 --- a/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/DefaultSessionPool.java +++ b/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/DefaultSessionPool.java @@ -177,7 +177,7 @@ public void run() { } } } catch (Throwable e) { - LOGGER.error("[DingTalk] establish connection failed", e); + LOGGER.error("[DingTalk] establish connection failed, {}", e); } } } @@ -201,7 +201,7 @@ public T run(Callable callable) throws Exception { try { return callable.call(); } catch (Exception e) { - LOGGER.error("[DingTalk] retrievable executor execute failed", e); + LOGGER.error("[DingTalk] retrievable executor execute failed, {}", e); if (count.get() <= 0) { throw e; } diff --git a/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/NetWorkService.java b/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/NetWorkService.java index c1175c8..549cac9 100644 --- a/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/NetWorkService.java +++ b/app-stream-network/app-stream-network-core/src/main/java/com/dingtalk/open/app/stream/network/core/NetWorkService.java @@ -1,8 +1,9 @@ package com.dingtalk.open.app.stream.network.core; import com.dingtalk.open.app.stream.network.api.ClientConnectionListener; -import com.dingtalk.open.app.stream.network.api.logger.InternalLogger; -import com.dingtalk.open.app.stream.network.api.logger.InternalLoggerFactory; + +import java.net.Proxy; + /** * @author feiyin @@ -11,14 +12,10 @@ public class NetWorkService { private final DefaultSessionPool sessionPool; - public NetWorkService(EndPointConnectionFactory factory, - ClientConnectionListener listener, - int maxConnection, - long ttl, - long connectTimeout, - long keepAliveIdle) { + public NetWorkService(EndPointConnectionFactory factory, ClientConnectionListener listener, int maxConnection, long ttl, long connectTimeout, long keepAliveIdle) { this.sessionPool = new DefaultSessionPool(factory, maxConnection, ttl, connectTimeout, keepAliveIdle, listener); } + /** * 开始 */ diff --git a/app-stream-network/app-stream-network-rsocket/pom.xml b/app-stream-network/app-stream-network-rsocket/pom.xml index 465aa81..2dd72bb 100644 --- a/app-stream-network/app-stream-network-rsocket/pom.xml +++ b/app-stream-network/app-stream-network-rsocket/pom.xml @@ -4,7 +4,7 @@ com.dingtalk.open app-stream-network - 1.2.0 + 1.2.1 ../pom.xml diff --git a/app-stream-network/app-stream-network-ws/pom.xml b/app-stream-network/app-stream-network-ws/pom.xml index fac5b3d..1f10097 100644 --- a/app-stream-network/app-stream-network-ws/pom.xml +++ b/app-stream-network/app-stream-network-ws/pom.xml @@ -4,7 +4,7 @@ com.dingtalk.open app-stream-network - 1.2.0 + 1.2.1 ../pom.xml @@ -26,5 +26,9 @@ com.alibaba fastjson + + io.netty + netty-handler-proxy + diff --git a/app-stream-network/app-stream-network-ws/src/main/java/com/dingtalk/open/app/stream/network/ws/WebsocketTransportConnector.java b/app-stream-network/app-stream-network-ws/src/main/java/com/dingtalk/open/app/stream/network/ws/WebsocketTransportConnector.java index 04d969a..cfd6c97 100644 --- a/app-stream-network/app-stream-network-ws/src/main/java/com/dingtalk/open/app/stream/network/ws/WebsocketTransportConnector.java +++ b/app-stream-network/app-stream-network-ws/src/main/java/com/dingtalk/open/app/stream/network/ws/WebsocketTransportConnector.java @@ -12,6 +12,7 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolConfig; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; +import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; @@ -31,6 +32,7 @@ @Protocol(protocol = {TransportProtocol.WSS}) public class WebsocketTransportConnector implements TransportConnector { private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(WebsocketTransportConnector.class); + @Override public Session connect(EndPointConnection connection, ClientConnectionListener listener, ConnectOption option) throws Exception { LOGGER.info("[DingTalk] start websocket connection, uri={}", connection.getEndPoint().toString()); @@ -41,12 +43,10 @@ public Session connect(EndPointConnection connection, ClientConnectionListener l bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { - WebSocketClientProtocolConfig config = WebSocketClientProtocolConfig.newBuilder().dropPongFrames(false) - .webSocketUri(configureWebsocketUri(connection)) - .handshakeTimeoutMillis(option.getTtl()) - .dropPongFrames(false) - .handleCloseFrames(true) - .build(); + if (connection.getProxy() != null) { + socketChannel.pipeline().addLast(new HttpProxyHandler(connection.getProxy().address())); + } + WebSocketClientProtocolConfig config = WebSocketClientProtocolConfig.newBuilder().dropPongFrames(false).webSocketUri(configureWebsocketUri(connection)).handshakeTimeoutMillis(option.getTtl()).dropPongFrames(false).handleCloseFrames(true).build(); SslContext sslContext = SslContextBuilder.forClient().build(); if (connection.getProtocol().isTls()) { final SSLEngine engine = sslContext.newEngine(socketChannel.alloc()); diff --git a/app-stream-network/pom.xml b/app-stream-network/pom.xml index 7f72e23..0361fc6 100644 --- a/app-stream-network/pom.xml +++ b/app-stream-network/pom.xml @@ -5,7 +5,7 @@ open-app-stream-client com.dingtalk.open - 1.2.0 + 1.2.1 ../pom.xml pom diff --git a/app-stream-protocol/pom.xml b/app-stream-protocol/pom.xml index 2b2e3c4..1306145 100644 --- a/app-stream-protocol/pom.xml +++ b/app-stream-protocol/pom.xml @@ -6,7 +6,7 @@ com.dingtalk.open open-app-stream-client - 1.2.0 + 1.2.1 app-stream-protocol diff --git a/dingtalk-stream/pom.xml b/dingtalk-stream/pom.xml index 1d499ad..4734336 100644 --- a/dingtalk-stream/pom.xml +++ b/dingtalk-stream/pom.xml @@ -4,13 +4,13 @@ com.dingtalk.open open-app-stream-client - 1.2.0 + 1.2.1 ../pom.xml dingtalk-stream jar - 1.2.0 + 1.2.1 app-stream-client diff --git a/pom.xml b/pom.xml index 1508295..964e735 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.dingtalk.open open-app-stream-client pom - 1.2.0 + 1.2.1 app-stream-client app-stream-api @@ -69,6 +69,11 @@ netty-codec-http 4.1.81.Final + + io.netty + netty-handler-proxy + 4.1.81.Final + io.netty netty-common diff --git a/version.sh b/version.sh index 69b1fca..011cf51 100755 --- a/version.sh +++ b/version.sh @@ -1,3 +1,3 @@ #!/usr/bin/env bash -mvn versions:set -DnewVersion=1.2.0 +mvn versions:set -DnewVersion=1.2.1