Skip to content

Commit

Permalink
Merge pull request #48 from cdapio/responsestream
Browse files Browse the repository at this point in the history
 [CDAP-17651] Add support streaming http response
  • Loading branch information
rmstar authored Feb 26, 2021
2 parents be6fee0 + 45ae059 commit eec68de
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.common.http;

import java.nio.ByteBuffer;

/**
* Consumer for {@link HttpResponse} body.
*/
public abstract class HttpContentConsumer {
// Default 64K chunk size
private static final int DEFAULT_CHUNK_SIZE = 65536;
private int chunkSize;

public HttpContentConsumer() {
this.chunkSize = DEFAULT_CHUNK_SIZE;
}

public HttpContentConsumer(int chunkSize) {
this.chunkSize = chunkSize;
}

/**
* This method is invoked when a new chunk of the response body is available to be consumed.
*
* @param chunk a {@link ByteBuffer} containing a chunk of the response body
* @return true to continue reading from the response stream, false to stop reading and close the connection.
*/
public abstract boolean onReceived(ByteBuffer chunk);

/**
* This method is invoked when the end of the response body is reached.
*/
public abstract void onFinished();

int getChunkSize() {
return chunkSize;
}
}
25 changes: 24 additions & 1 deletion common-http/src/main/java/io/cdap/common/http/HttpRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,23 @@ public class HttpRequest {
private final Multimap<String, String> headers;
private final ContentProvider<? extends InputStream> body;
private final Long bodyLength;
private HttpContentConsumer consumer;

public HttpRequest(HttpMethod method, URL url, @Nullable Multimap<String, String> headers,
@Nullable ContentProvider<? extends InputStream> body,
@Nullable Long bodyLength) {
this(method, url, headers, body, bodyLength, null);
}

public HttpRequest(HttpMethod method, URL url, @Nullable Multimap<String, String> headers,
@Nullable ContentProvider<? extends InputStream> body,
@Nullable Long bodyLength, @Nullable HttpContentConsumer consumer) {
this.method = method;
this.url = url;
this.headers = headers;
this.body = body;
this.bodyLength = bodyLength;
this.consumer = consumer;
}

public static Builder get(URL url) {
Expand Down Expand Up @@ -103,6 +111,15 @@ public Long getBodyLength() {
return bodyLength;
}

@Nullable
public HttpContentConsumer getConsumer() {
return consumer;
}

public boolean hasContentConsumer() {
return consumer != null;
}

/**
* Builder for {@link HttpRequest}.
*/
Expand All @@ -112,6 +129,7 @@ public static final class Builder {
private final Multimap<String, String> headers;
private ContentProvider<? extends InputStream> body;
private Long bodyLength;
private HttpContentConsumer consumer;

Builder(HttpMethod method, URL url) {
this.method = method;
Expand Down Expand Up @@ -203,8 +221,13 @@ public InputStream getInput() {
return this;
}

public Builder withContentConsumer(HttpContentConsumer consumer) {
this.consumer = consumer;
return this;
}

public HttpRequest build() {
return new HttpRequest(method, url, headers, body, bodyLength);
return new HttpRequest(method, url, headers, body, bodyLength, consumer);
}
}
}
47 changes: 16 additions & 31 deletions common-http/src/main/java/io/cdap/common/http/HttpRequests.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -62,6 +61,20 @@ private HttpRequests() { }
* @return HTTP response
*/
public static HttpResponse execute(HttpRequest request, HttpRequestConfig requestConfig) throws IOException {
HttpURLConnection conn = getConnection(request, requestConfig);
conn.connect();

ContentProvider<? extends InputStream> bodySrc = request.getBody();
if (bodySrc != null) {
try (InputStream input = bodySrc.getInput(); OutputStream os = conn.getOutputStream()) {
ByteStreams.copy(input, os);
}
}
return request.hasContentConsumer() ? new HttpResponse(conn, request.getConsumer()) : new HttpResponse(conn);
}

private static HttpURLConnection getConnection(HttpRequest request, HttpRequestConfig requestConfig)
throws IOException {
String requestMethod = request.getMethod().name();
URL url = request.getURL();

Expand All @@ -77,8 +90,7 @@ public static HttpResponse execute(HttpRequest request, HttpRequestConfig reques
}
}

ContentProvider<? extends InputStream> bodySrc = request.getBody();
if (bodySrc != null) {
if (request.getBody() != null) {
conn.setDoOutput(true);
Long bodyLength = request.getBodyLength();
if (bodyLength != null) {
Expand All @@ -100,34 +112,7 @@ public static HttpResponse execute(HttpRequest request, HttpRequestConfig reques
LOG.error("Got exception while disabling SSL certificate check for {}", request.getURL());
}
}

conn.connect();

try {
if (bodySrc != null) {
try (InputStream input = bodySrc.getInput(); OutputStream os = conn.getOutputStream()) {
ByteStreams.copy(input, os);
}
}

try {
if (isSuccessful(conn.getResponseCode())) {
try (InputStream inputStream = conn.getInputStream()) {
return new HttpResponse(conn.getResponseCode(), conn.getResponseMessage(),
ByteStreams.toByteArray(inputStream), conn.getHeaderFields());
}
}
} catch (FileNotFoundException e) {
// Server returns 404. Hence handle as error flow below. Intentional having empty catch block.
}

// Non 2xx response
InputStream es = conn.getErrorStream();
byte[] content = (es == null) ? new byte[0] : ByteStreams.toByteArray(es);
return new HttpResponse(conn.getResponseCode(), conn.getResponseMessage(), content, conn.getHeaderFields());
} finally {
conn.disconnect();
}
return conn;
}

/**
Expand Down
75 changes: 74 additions & 1 deletion common-http/src/main/java/io/cdap/common/http/HttpResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,37 @@
package io.cdap.common.http;

import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
* Return type for http requests executed by {@link HttpResponse}
*/
public class HttpResponse {
private static final Logger LOG = LoggerFactory.getLogger(HttpResponse.class);
private final int responseCode;
private final String responseMessage;
private final byte[] responseBody;
private byte[] responseBody;
private final Multimap<String, String> headers;
private InputStream inputStream;
private HttpURLConnection conn;
private HttpContentConsumer consumer;

HttpResponse(int responseCode, String responseMessage,
byte[] responseBody, Map<String, List<String>> headers) {
Expand All @@ -45,6 +61,20 @@ public class HttpResponse {
this.headers = headers;
}

HttpResponse(HttpURLConnection conn) throws IOException {
this(conn, null);
this.responseBody = getResponseBodyFromStream();
}

HttpResponse(HttpURLConnection conn, @Nullable HttpContentConsumer consumer) throws IOException {
this.conn = conn;
this.responseCode = conn.getResponseCode();
this.responseMessage = conn.getResponseMessage();
this.headers = parseHeaders(conn.getHeaderFields());
this.inputStream = isSuccessful(responseCode) ? conn.getInputStream() : conn.getErrorStream();
this.consumer = consumer;
}

public int getResponseCode() {
return responseCode;
}
Expand All @@ -69,6 +99,30 @@ public Multimap<String, String> getHeaders() {
return headers;
}

public void consumeContent() throws IOException {
if (inputStream == null) {
conn.disconnect();
consumer.onFinished();
return;
}

try (ReadableByteChannel channel = Channels.newChannel(inputStream)) {
ByteBuffer buffer = ByteBuffer.allocate(consumer.getChunkSize());
while (channel.read(buffer) >= 0) {
// Flip the buffer for the consumer to read
buffer.flip();
boolean continueReading = consumer.onReceived(buffer);
buffer.clear();
if (!continueReading) {
break;
}
}
} finally {
conn.disconnect();
consumer.onFinished();
}
}

private static Multimap<String, String> parseHeaders(Map<String, List<String>> headers) {
ImmutableListMultimap.Builder<String, String> builder = new ImmutableListMultimap.Builder<String, String>();
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
Expand All @@ -80,6 +134,25 @@ private static Multimap<String, String> parseHeaders(Map<String, List<String>> h
return builder.build();
}

private byte[] getResponseBodyFromStream() {
try {
if (inputStream == null) {
return new byte[0];
}
return ByteStreams.toByteArray(inputStream);
} catch (IOException e) {
throw Throwables.propagate(e);
} finally {
Closeables.closeQuietly(inputStream);
inputStream = null;
conn.disconnect();
}
}

private boolean isSuccessful(int responseCode) {
return 200 <= responseCode && responseCode < 300;
}

@Override
public String toString() {
return String.format("Response code: %s, message: '%s', body: '%s'",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright © 2021 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.common.http;

import org.junit.After;
import org.junit.Before;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;

public class HttpRequestsStreamTest extends HttpRequestsTestBase {

private static TestHttpService httpService;

@Before
public void setUp() throws Exception {
httpService = new TestHttpService(false);
httpService.startAndWait();
}

@After
public void tearDown() {
httpService.stopAndWait();
}

@Override
protected URI getBaseURI() throws URISyntaxException {
InetSocketAddress bindAddress = httpService.getBindAddress();
return new URI("http://" + bindAddress.getHostName() + ":" + bindAddress.getPort());
}

@Override
protected HttpRequestConfig getHttpRequestsConfig() {
return new HttpRequestConfig(0, 0, false);
}

@Override
protected int getNumConnectionsOpened() {
return httpService.getNumConnectionsOpened();
}

@Override
protected boolean returnResponseStream() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ protected HttpRequestConfig getHttpRequestsConfig() {
protected int getNumConnectionsOpened() {
return httpService.getNumConnectionsOpened();
}

@Override
protected boolean returnResponseStream() {
return false;
}
}
Loading

0 comments on commit eec68de

Please sign in to comment.