Skip to content

Commit

Permalink
httpclient curry InputStream bytes for immediate reading, more pre-do…
Browse files Browse the repository at this point in the history
…wngraded apis.

solves #21
  • Loading branch information
wagyourtail committed Nov 24, 2024
1 parent f56eead commit 77afef1
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 87 deletions.
105 changes: 44 additions & 61 deletions java-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -220,72 +220,47 @@ fun JavaCompile.configCompile(version: JavaVersion) {
}
}

val tempFile11 =
project.layout.buildDirectory.get().asFile.resolve("jvmdg").resolve("java-api-${project.version}-downgraded-11.jar")

val downgradeJar11Exec by tasks.registering(JavaExec::class) {
group = "jvmdg"
dependsOn(tasks.jar)
val apiJar = tasks.jar.get().archiveFile.get().asFile.absolutePath

val rootMain = project(":").sourceSets.main.get()

mainClass.set("xyz.wagyourtail.jvmdg.compile.ZipDowngrader")
classpath = files(rootMain.output, rootMain.runtimeClasspath)
workingDir = project.layout.buildDirectory.get().asFile
jvmArgs = listOf("-Djvmdg.java-api=$apiJar")
args = listOf(
JavaVersion.VERSION_11.toOpcode().toString(),
apiJar,
tempFile11.absolutePath
)

javaLauncher = javaToolchains.launcherFor {
languageVersion.set(JavaLanguageVersion.of((toVersion - 1).majorVersion))
fun downgradeApi(version: JavaVersion): TaskProvider<Jar> {
val tempFile = project.layout.buildDirectory.get().asFile.resolve("jvmdg").resolve("javaApi-${project.version}-downgraded-${version.majorVersion}.jar")

val downgradeJarExec = tasks.register("downgradeJar${version.majorVersion}Exec", JavaExec::class) {
group = "jvmdg"
dependsOn(tasks.jar)
val apiJar = tasks.jar.get().archiveFile.get().asFile.absolutePath

val rootMain = project(":").sourceSets.main.get()

mainClass.set("xyz.wagyourtail.jvmdg.compile.ZipDowngrader")
classpath = files(rootMain.output, rootMain.runtimeClasspath)
workingDir = project.layout.buildDirectory.get().asFile
jvmArgs = listOf("-Djvmdg.javaApi=$apiJar")
args = listOf(
version.toOpcode().toString(),
apiJar,
tempFile.absolutePath
)

javaLauncher = javaToolchains.launcherFor {
languageVersion.set(JavaLanguageVersion.of((toVersion - 1).majorVersion))
}
}
}

val downgradeJar11 by tasks.registering(Jar::class) {
group = "jvmdg"
dependsOn(downgradeJar11Exec)
archiveClassifier.set("downgraded-11")
from(zipTree(tempFile11))
}

val tempFile8 =
project.layout.buildDirectory.get().asFile.resolve("jvmdg").resolve("java-api-${project.version}-downgraded-8.jar")

val downgradeJar8Exec by tasks.registering(JavaExec::class) {
group = "jvmdg"
dependsOn(tasks.jar)
val apiJar = tasks.jar.get().archiveFile.get().asFile.absolutePath

val rootMain = project(":").sourceSets.main.get()

mainClass.set("xyz.wagyourtail.jvmdg.compile.ZipDowngrader")
classpath = files(rootMain.output, rootMain.runtimeClasspath)
workingDir = project.layout.buildDirectory.get().asFile
jvmArgs = listOf("-Djvmdg.java-api=$apiJar")
args = listOf(
JavaVersion.VERSION_1_8.toOpcode().toString(),
apiJar,
tempFile8.absolutePath
)

javaLauncher = javaToolchains.launcherFor {
languageVersion.set(JavaLanguageVersion.of((toVersion - 1).majorVersion))
return tasks.register("downgradeJar${version.majorVersion}", Jar::class) {
group = "jvmdg"
dependsOn(downgradeJarExec)
archiveClassifier.set("downgraded-${version.majorVersion}")
from(zipTree(tempFile))
}
}


val downgradeJar8 by tasks.registering(Jar::class) {
group = "jvmdg"
dependsOn(downgradeJar8Exec)
archiveClassifier.set("downgraded-8")
from(zipTree(tempFile8))
}
val downgradeJar21 = downgradeApi(JavaVersion.VERSION_21)
val downgradeJar17 = downgradeApi(JavaVersion.VERSION_17)
val downgradeJar11 = downgradeApi(JavaVersion.VERSION_11)
val downgradeJar8 = downgradeApi(JavaVersion.VERSION_1_8)

tasks.assemble {
dependsOn(downgradeJar21)
dependsOn(downgradeJar17)
dependsOn(downgradeJar11)
dependsOn(downgradeJar8)
}
Expand Down Expand Up @@ -314,11 +289,19 @@ publishing {

from(components["java"])

artifact(tasks["downgradeJar11"]) {
artifact(downgradeJar21) {
classifier = "downgraded-21"
}

artifact(downgradeJar17) {
classifier = "downgraded-17"
}

artifact(downgradeJar11) {
classifier = "downgraded-11"
}

artifact(tasks["downgradeJar8"]) {
artifact(downgradeJar8) {
classifier = "downgraded-8"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void onSubscribe(Flow.Subscription subscription) {
public void onNext(ByteBuffer item) {
try {
out.write(item.array(), item.arrayOffset() + item.position(), item.remaining());
item.position(item.limit());
} catch (IOException e) {
Utils.sneakyThrow(e);
}
Expand Down Expand Up @@ -218,15 +219,25 @@ public void onComplete() {
Version version = J_N_H_HttpClient.Version.HTTP_1_1;
HttpResponseInfo info = new HttpResponseInfo(responseCode, new J_N_H_HttpHeaders(headers), version);
J_N_H_HttpResponse.BodySubscriber<T> subscriber = handler.apply(info);
try (InputStream is = responseCode >= 400 ? connection.getErrorStream() : connection.getInputStream()) {
if (is != null) {
subscriber.onNext(List.of(ByteBuffer.wrap(is.readAllBytes())));

CompletableFuture.runAsync(() -> {
try (InputStream is = responseCode >= 400 ? connection.getErrorStream() : connection.getInputStream()) {

byte[] buffer = new byte[StreamIterator.BUFSIZE];
int bytesRead;
while ((bytesRead = is.read(buffer, 0, buffer.length)) != -1) {
subscriber.onNext(List.of(ByteBuffer.wrap(buffer, 0, bytesRead)));
buffer = new byte[StreamIterator.BUFSIZE];
}

subscriber.onComplete();
} catch (IOException e) {
subscriber.onError(e);
}
subscriber.onComplete();
T body = subscriber.getBody().toCompletableFuture().join();
}, executor == null ? ForkJoinPool.commonPool() : executor);

return new HttpResponseImpl<>(var1, info, body, null);
}
T body = subscriber.getBody().toCompletableFuture().join();
return new HttpResponseImpl<>(var1, info, body, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,10 @@
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
Expand Down Expand Up @@ -470,30 +463,77 @@ public CompletionStage<Void> getBody() {
}

public static BodySubscriber<InputStream> ofInputStream() {
CompletableFuture<InputStream> result = new CompletableFuture<>();
return new BodySubscriber<>() {
ByteArrayOutputStream out = new ByteArrayOutputStream();
final BlockingDeque<ByteBuffer> deq = new LinkedBlockingDeque<>();
final AtomicBoolean completed = new AtomicBoolean(false);
final AtomicReference<Throwable> exception = new AtomicReference<>(null);

InputStream is = new InputStream() {
ByteBuffer current;

@Override
public int read() throws IOException {
// Check if an exception was set
Throwable t = exception.get();
if (t != null) {
if (completed.get()) {
throw new IOException("closed");
}
completed.set(true);
if (t instanceof IOException) {
throw (IOException) t;
} else {
throw new IOException(t);
}
}

// If current buffer is null, try to fetch a new one
if (current == null) {
if (completed.get() && deq.isEmpty()) {
return -1; // End of stream when nothing left
}
try {
current = deq.take(); // Blocks until a new ByteBuffer is available
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Preserve interrupt status
throw new IOException("Interrupted while waiting for data", e);
}
}

// If the current buffer is empty, mark it as done and try again
if (!current.hasRemaining()) {
current = null; // Reset to fetch new data
return read(); // Recursive call to read more data
}

return current.get() & 0xFF; // Return the next byte from the current buffer
}
};

// Create the result CompletionStage
CompletableFuture<InputStream> result = CompletableFuture.completedFuture(is);

return new BodySubscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(List<ByteBuffer> item) {
item.forEach((b) -> {
out.write(b.array(), b.arrayOffset() + b.position(), b.remaining());
});
for (ByteBuffer b : item) {
deq.offer(b);
}
}

@Override
public void onError(Throwable throwable) {
result.completeExceptionally(throwable);
exception.set(throwable);
}

@Override
public void onComplete() {
result.complete(new ByteArrayInputStream(out.toByteArray()));
completed.set(true);
deq.add(ByteBuffer.allocate(0)); // ensure not stuck taking forever.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;

Expand Down Expand Up @@ -60,6 +62,16 @@ public static void main(String[] args) throws IOException, InterruptedException
.POST(HttpRequest.BodyPublishers.ofString("test body"))
.build();

HttpRequest request3 = HttpRequest.newBuilder()
.header("User-Agent", "JVMDG Test 1.0")
.uri(URI.create("http://localhost:" + port))
.build();

HttpResponse<InputStream> stream = client.send(request2, HttpResponse.BodyHandlers.ofInputStream());
try (InputStream is = stream.body()) {
System.out.println(new String(is.readAllBytes(), StandardCharsets.UTF_8));
}

HttpResponse<String> resp2 = client.send(request2, HttpResponse.BodyHandlers.ofString());
System.out.println(resp2.body());
} catch (IOException | InterruptedException e) {
Expand Down

0 comments on commit 77afef1

Please sign in to comment.