Skip to content

Commit

Permalink
Merge branch 'feature/feature/proxy'
Browse files Browse the repository at this point in the history
  • Loading branch information
mike.wq committed Nov 23, 2023
2 parents cbdddfc + 0dd15eb commit ab170fb
Show file tree
Hide file tree
Showing 22 changed files with 124 additions and 45 deletions.
2 changes: 1 addition & 1 deletion app-stream-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>open-app-stream-client</artifactId>
<groupId>com.dingtalk.open</groupId>
<version>1.2.0</version>
<version>1.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.dingtalk.open.app.stream.network.core.NetWorkService;
import com.dingtalk.open.app.stream.network.core.Subscription;

import java.net.Proxy;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand All @@ -33,23 +34,26 @@ class OpenDingTalkStreamClient implements OpenDingTalkClient {
private OpenApiClient openApiClient;
private Set<Subscription> subscriptions;
private final AtomicReference<Status> status;
private final Proxy proxy;

public OpenDingTalkStreamClient(DingTalkCredential credential, CommandDispatcher dispatcher, ExecutorService executor, ClientOption option, Set<Subscription> subscriptions) {
public OpenDingTalkStreamClient(DingTalkCredential credential, CommandDispatcher dispatcher, ExecutorService executor, ClientOption option, Set<Subscription> subscriptions,
Proxy proxy) {
this.credential = credential;
this.dispatcher = dispatcher;
this.executor = executor;
this.option = option;
this.subscriptions = Collections.unmodifiableSet(subscriptions);
this.status = new AtomicReference<>(Status.INIT);
this.proxy = proxy;
}

@Override
public synchronized void start() throws OpenDingTalkAppException {
if (status.get() == Status.INIT) {
this.openApiClient = OpenApiClientBuilder.create().setHost(option.getOpenApiHost()).setTimeout(option.getConnectionTTL()).build();
final EndPointConnectionFactory factory = () -> openConnection(this.credential, subscriptions);
final EndPointConnectionFactory factory = () -> openConnection(this.credential, subscriptions, proxy);
ClientConnectionListener listener = new AppServiceListener(dispatcher, executor);
this.netWorkService = new NetWorkService(factory, listener, option.getMaxConnectionCount(), option.getConnectionTTL(), option.getConnectTimeout(),option.getKeepAliveOption().getKeepAliveIdleMill());
this.netWorkService = new NetWorkService(factory, listener, option.getMaxConnectionCount(), option.getConnectionTTL(), option.getConnectTimeout(), option.getKeepAliveOption().getKeepAliveIdleMill());
this.netWorkService.start();
this.status.set(Status.ACTIVE);
} else if (status.get() == Status.INACTIVE) {
Expand All @@ -70,15 +74,15 @@ public synchronized void stop() throws Exception {
}
}

private EndPointConnection openConnection(DingTalkCredential credential, Set<Subscription> subscriptions) throws Exception {
private EndPointConnection openConnection(DingTalkCredential credential, Set<Subscription> subscriptions, Proxy proxy) throws Exception {
OpenConnectionRequest request = new OpenConnectionRequest();
request.setClientId(credential.getClientId());
request.setClientSecret(credential.getClientSecret());
request.setUa(UserAgent.getUserAgent().getUa());
request.setSubscriptions(subscriptions);
request.setLocalIp(IpUtils.getLocalIP());
OpenConnectionResponse response = openApiClient.openConnection(request);
return new EndPointConnection(credential.getClientId(), response.getEndpoint(), response.getTicket());
return new EndPointConnection(credential.getClientId(), response.getEndpoint(), response.getTicket(), proxy);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package com.dingtalk.open.app.api;

import com.dingtalk.open.app.api.command.CommandDispatcher;
import com.dingtalk.open.app.api.util.ThreadUtil;
import com.dingtalk.open.app.api.callback.CallbackCommandExecutor;
import com.dingtalk.open.app.api.callback.OpenDingTalkCallbackListener;
import com.dingtalk.open.app.api.command.CommandDispatcher;
import com.dingtalk.open.app.api.protocol.CommandExecutor;
import com.dingtalk.open.app.api.protocol.EventCommandExecutor;
import com.dingtalk.open.app.api.security.DingTalkCredential;
import com.dingtalk.open.app.api.util.ThreadUtil;
import com.dingtalk.open.app.stream.network.api.NetProxy;
import com.dingtalk.open.app.stream.network.core.Subscription;
import com.dingtalk.open.app.stream.protocol.CommandType;

import java.util.*;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;

/**
Expand All @@ -27,6 +33,8 @@ public class OpenDingTalkStreamClientBuilder {
private int connectionTimeToLive = 6 * 60 * 60 * 1000;
private long connectTimeout = 3 * 1000L;

private Proxy proxy;

private KeepAliveOption keepAliveOption = new KeepAliveOption();

private String openApiHost = "https://api.dingtalk.com";
Expand Down Expand Up @@ -99,6 +107,17 @@ public OpenDingTalkStreamClientBuilder preEnv() {
return this.openApiHost("https://pre-api.dingtalk.com");
}

/**
* 设置代理方式
*
* @param netProxy
* @return
*/
public OpenDingTalkStreamClientBuilder proxy(NetProxy netProxy) {
this.proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(netProxy.getIp(), netProxy.getPort()));
return this;
}

public OpenDingTalkClient build() {
ClientOption option = new ClientOption();
option.setConnectTimeout(connectTimeout);
Expand All @@ -107,7 +126,7 @@ public OpenDingTalkClient build() {
option.setOpenApiHost(openApiHost);
option.setKeepAliveOption(keepAliveOption);
ExecutorService executor = ThreadUtil.newFixedExecutor(consumeThreads, "DingTalk-Consumer");
return new OpenDingTalkStreamClient(credential, new CommandDispatcher(commands), executor, option, subscriptions);
return new OpenDingTalkStreamClient(credential, new CommandDispatcher(commands), executor, option, subscriptions, proxy);
}

private void subscribe(CommandType type, String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.dingtalk.open.app.api.util.IoUtils;

import java.net.HttpURLConnection;
import java.net.Proxy;
import java.net.URL;

/**
Expand All @@ -17,16 +18,26 @@ class HttpOpenApiClient implements OpenApiClient {

private final String host;

private final Proxy proxy;

private final int timeout;

public HttpOpenApiClient(String host, int timeout) {
public HttpOpenApiClient(String host, int timeout, Proxy proxy) {
this.host = host;
this.timeout = timeout;
this.proxy = proxy;
}

@Override
public OpenConnectionResponse openConnection(OpenConnectionRequest request) throws Exception {
final HttpURLConnection connection = (HttpURLConnection) new URL(host + "/v1.0/gateway/connections/open").openConnection();
URL url = new URL(host + "/v1.0/gateway/connections/open");

HttpURLConnection connection;
if (proxy != null) {
connection = (HttpURLConnection) url.openConnection(proxy);
} else {
connection = (HttpURLConnection) url.openConnection();
}
connection.setRequestMethod(HttpConstants.METHOD_POST);
connection.setReadTimeout(this.timeout);
connection.setConnectTimeout(this.timeout);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.dingtalk.open.app.api.open;

import java.net.Proxy;

/**
* @author feiyin
* @date 2023/3/1
Expand All @@ -16,6 +18,8 @@ public static OpenApiClientBuilder create() {

public String host;

private Proxy proxy;

private int timeout = 3000;

public OpenApiClientBuilder setHost(String host) {
Expand All @@ -28,7 +32,12 @@ public OpenApiClientBuilder setTimeout(int timeout) {
return this;
}

public OpenApiClientBuilder setProxy(Proxy proxy) {
this.proxy = proxy;
return this;
}

public OpenApiClient build() {
return new HttpOpenApiClient(host, timeout);
return new HttpOpenApiClient(host, timeout, proxy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void receive(Context context) {
try {
commandDispatcher.execute(context);
} catch (Exception e) {
LOGGER.error("[DingTalk] dispatch command failed", e);
LOGGER.error("[DingTalk] dispatch command failed, {}", e);
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions app-stream-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
<parent>
<groupId>com.dingtalk.open</groupId>
<artifactId>open-app-stream-client</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>app-stream-client</artifactId>
<packaging>jar</packaging>
<version>1.2.0</version>
<version>1.2.1</version>
<name>app-stream-client</name>

<dependencies>
Expand Down
4 changes: 2 additions & 2 deletions app-stream-network/app-stream-network-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
<parent>
<groupId>com.dingtalk.open</groupId>
<artifactId>app-stream-network</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>app-stream-network-api</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
<packaging>jar</packaging>

<name>app-stream-network-api</name>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.dingtalk.open.app.stream.network.api;

import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;

Expand All @@ -12,14 +13,17 @@ public class EndPointConnection {
private final URI endPoint;
private final String connectionId;

public EndPointConnection(String clientId, String endPoint, String connectionId) {
private final Proxy proxy;

public EndPointConnection(String clientId, String endPoint, String connectionId, Proxy proxy) {
this.clientId = clientId;
try {
this.endPoint = new URI(endPoint);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
this.connectionId = connectionId;
this.proxy = proxy;
}

public URI getEndPoint() {
Expand All @@ -38,4 +42,7 @@ public TransportProtocol getProtocol() {
return TransportProtocol.parseScheme(endPoint.getScheme());
}

public Proxy getProxy() {
return proxy;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.dingtalk.open.app.stream.network.api;

/**
* @author feiyin
* @date 2023/11/21
*/
public class NetProxy {
private String host;
private Integer port;

public NetProxy(String host, Integer port) {
this.host = host;
this.port = port;
}

public String getIp() {
return host;
}

public Integer getPort() {
return port;
}
}
2 changes: 1 addition & 1 deletion app-stream-network/app-stream-network-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.dingtalk.open</groupId>
<artifactId>app-stream-network</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private static void ensureActive() {
}
INIT = true;
} catch (Exception e) {
LOGGER.error("[DingTalk] client init transport failed", e);
LOGGER.error("[DingTalk] client init transport failed, {}", e);
} finally {
INIT_LOCK.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void run() {
}
}
} catch (Throwable e) {
LOGGER.error("[DingTalk] establish connection failed", e);
LOGGER.error("[DingTalk] establish connection failed, {}", e);
}
}
}
Expand All @@ -201,7 +201,7 @@ public T run(Callable<T> callable) throws Exception {
try {
return callable.call();
} catch (Exception e) {
LOGGER.error("[DingTalk] retrievable executor execute failed", e);
LOGGER.error("[DingTalk] retrievable executor execute failed, {}", e);
if (count.get() <= 0) {
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.dingtalk.open.app.stream.network.core;

import com.dingtalk.open.app.stream.network.api.ClientConnectionListener;
import com.dingtalk.open.app.stream.network.api.logger.InternalLogger;
import com.dingtalk.open.app.stream.network.api.logger.InternalLoggerFactory;

import java.net.Proxy;


/**
* @author feiyin
Expand All @@ -11,14 +12,10 @@
public class NetWorkService {
private final DefaultSessionPool sessionPool;

public NetWorkService(EndPointConnectionFactory factory,
ClientConnectionListener listener,
int maxConnection,
long ttl,
long connectTimeout,
long keepAliveIdle) {
public NetWorkService(EndPointConnectionFactory factory, ClientConnectionListener listener, int maxConnection, long ttl, long connectTimeout, long keepAliveIdle) {
this.sessionPool = new DefaultSessionPool(factory, maxConnection, ttl, connectTimeout, keepAliveIdle, listener);
}

/**
* 开始
*/
Expand Down
2 changes: 1 addition & 1 deletion app-stream-network/app-stream-network-rsocket/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.dingtalk.open</groupId>
<artifactId>app-stream-network</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
6 changes: 5 additions & 1 deletion app-stream-network/app-stream-network-ws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.dingtalk.open</groupId>
<artifactId>app-stream-network</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand All @@ -26,5 +26,9 @@
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit ab170fb

Please sign in to comment.