diff --git a/examples/webserver/streaming/README.md b/examples/webserver/streaming/README.md
index ca171f6c..1c40b400 100644
--- a/examples/webserver/streaming/README.md
+++ b/examples/webserver/streaming/README.md
@@ -14,6 +14,6 @@ java -jar target/helidon-examples-webserver-streaming.jar
Upload a file and download it back with `curl`:
```shell
-curl --data-binary "@target/classes/large-file.bin" http://localhost:8080/upload
-curl http://localhost:8080/download
+curl --data-binary "@large-file.bin" http://localhost:8080/upload
+curl http://localhost:8080/download --output myfile.bin
```
diff --git a/examples/webserver/streaming/src/main/resources/large-file.bin b/examples/webserver/streaming/large-file.bin
similarity index 100%
rename from examples/webserver/streaming/src/main/resources/large-file.bin
rename to examples/webserver/streaming/large-file.bin
diff --git a/examples/webserver/streaming/pom.xml b/examples/webserver/streaming/pom.xml
index a5194cb5..1a779425 100644
--- a/examples/webserver/streaming/pom.xml
+++ b/examples/webserver/streaming/pom.xml
@@ -57,8 +57,13 @@
test
- io.helidon.webserver
- helidon-webserver-test-support
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+ io.helidon.webclient
+ helidon-webclient
test
diff --git a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/Main.java b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/Main.java
index a7e118e2..e2ffc144 100644
--- a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/Main.java
+++ b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/Main.java
@@ -24,8 +24,6 @@
*/
public class Main {
- static final String LARGE_FILE_RESOURCE = "/large-file.bin";
-
private Main() {
}
diff --git a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileReader.java b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileReader.java
deleted file mode 100644
index 12e0c4d1..00000000
--- a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileReader.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (c) 2018, 2024 Oracle and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.webserver.examples.streaming;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.concurrent.Flow;
-import java.util.logging.Logger;
-
-import io.helidon.common.http.DataChunk;
-
-/**
- * Class ServerFileReader. Reads a file using NIO and produces data chunks for a
- * {@code Subscriber} to process.
- */
-public class ServerFileReader implements Flow.Publisher {
- private static final Logger LOGGER = Logger.getLogger(ServerFileReader.class.getName());
-
- static final int BUFFER_SIZE = 4096;
-
- private final Path path;
-
- ServerFileReader(Path path) {
- this.path = path;
- }
-
- @Override
- public void subscribe(Flow.Subscriber super DataChunk> s) {
- FileChannel channel;
- ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
-
- try {
- channel = FileChannel.open(path, StandardOpenOption.READ);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- s.onSubscribe(new Flow.Subscription() {
- @Override
- public void request(long n) {
- try {
- while (n > 0) {
- int bytes = channel.read(buffer);
- if (bytes < 0) {
- s.onComplete();
- channel.close();
- return;
- }
- if (bytes > 0) {
- LOGGER.info(buffer.toString());
- buffer.flip();
- s.onNext(DataChunk.create(buffer));
- n--;
- }
- buffer.rewind();
- }
- } catch (IOException e) {
- s.onError(e);
- }
- }
-
- @Override
- public void cancel() {
- try {
- channel.close();
- } catch (IOException e) {
- LOGGER.info(e.getMessage());
- }
- }
- });
- }
-}
diff --git a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileWriter.java b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileWriter.java
deleted file mode 100644
index 9264d467..00000000
--- a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileWriter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2018, 2024 Oracle and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.webserver.examples.streaming;
-
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.concurrent.Flow;
-import java.util.logging.Logger;
-
-import io.helidon.common.http.DataChunk;
-import io.helidon.webserver.ServerResponse;
-
-/**
- * Class ServerFileWriter. Process data chunks from a {@code Producer} and
- * writes them to a temporary file using NIO. For simplicity, this {@code
- * Subscriber} requests an unbounded number of chunks on its subscription.
- */
-public class ServerFileWriter implements Flow.Subscriber {
- private static final Logger LOGGER = Logger.getLogger(ServerFileWriter.class.getName());
-
- private final FileChannel channel;
-
- private final ServerResponse response;
-
- ServerFileWriter(ServerResponse response) {
- this.response = response;
- try {
- Path tempFilePath = Files.createTempFile("large-file", ".tmp");
- channel = FileChannel.open(tempFilePath, StandardOpenOption.WRITE);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(DataChunk chunk) {
- try {
- channel.write(chunk.data());
- LOGGER.info(chunk.data().toString() + " " + Thread.currentThread());
- chunk.release();
- } catch (IOException e) {
- LOGGER.info(e.getMessage());
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- throwable.printStackTrace();
- }
-
- @Override
- public void onComplete() {
- try {
- channel.close();
- response.send("DONE");
- } catch (IOException e) {
- LOGGER.info(e.getMessage());
- }
- }
-}
diff --git a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/StreamingService.java b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/StreamingService.java
index 208d10a2..d50da35d 100644
--- a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/StreamingService.java
+++ b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/StreamingService.java
@@ -16,52 +16,87 @@
package io.helidon.webserver.examples.streaming;
-import java.net.URISyntaxException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
+import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
+import io.helidon.common.http.DataChunk;
+import io.helidon.common.http.Http;
+import io.helidon.common.reactive.IoMulti;
import io.helidon.webserver.Routing;
import io.helidon.webserver.ServerRequest;
import io.helidon.webserver.ServerResponse;
import io.helidon.webserver.Service;
-import static io.helidon.webserver.examples.streaming.Main.LARGE_FILE_RESOURCE;
-
/**
* StreamingService class. Uses a {@code Subscriber} and a
* {@code Publisher} for uploading and downloading files.
*/
public class StreamingService implements Service {
private static final Logger LOGGER = Logger.getLogger(StreamingService.class.getName());
-
- private final Path filePath;
+ private final ScheduledExecutorService executor = ScheduledThreadPoolSupplier.create().get();
+ private volatile Path filePath;
StreamingService() {
- try {
- filePath = Paths.get(getClass().getResource(LARGE_FILE_RESOURCE).toURI());
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
}
@Override
public void update(Routing.Rules routingRules) {
routingRules.get("/download", this::download)
- .post("/upload", this::upload);
+ .post("/upload", this::upload);
}
private void upload(ServerRequest request, ServerResponse response) {
LOGGER.info("Entering upload ... " + Thread.currentThread());
- request.content().subscribe(new ServerFileWriter(response));
+ Path tempFilePath = createTempFile("large-file", ".tmp");
+ filePath = tempFilePath;
+ LOGGER.info("Storing upload as " + tempFilePath);
+ request.content()
+ .map(DataChunk::data)
+ .flatMapIterable(Arrays::asList)
+ .to(IoMulti.writeToFile(tempFilePath)
+ .executor(executor)
+ .build())
+ .thenRun(() -> response.status(Http.Status.OK_200).send());
LOGGER.info("Exiting upload ...");
}
private void download(ServerRequest request, ServerResponse response) {
LOGGER.info("Entering download ..." + Thread.currentThread());
+ if (filePath == null) {
+ LOGGER.warning("No file to download.");
+ response.status(Http.Status.BAD_REQUEST_400).send("No file to download. Please upload file first.");
+ return;
+ }
long length = filePath.toFile().length();
- response.headers().add("Content-Length", String.valueOf(length));
- response.send(new ServerFileReader(filePath));
- LOGGER.info("Exiting download ...");
+ response.headers().contentLength(length);
+ response.send(IoMulti.multiFromByteChannelBuilder(newByteChannel(filePath))
+ .executor(executor)
+ .build()
+ .map(DataChunk::create));
+ LOGGER.info("Exiting download. Returning " + length + " bytes...");
+ }
+
+ @SuppressWarnings("SameParameterValue")
+ private static Path createTempFile(String prefix, String suffix) {
+ try {
+ return Files.createTempFile(prefix, suffix);
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ private static ReadableByteChannel newByteChannel(Path path) {
+ try {
+ return Files.newByteChannel(path);
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
}
}
diff --git a/examples/webserver/streaming/src/test/java/io/helidon/webserver/examples/streaming/MainTest.java b/examples/webserver/streaming/src/test/java/io/helidon/webserver/examples/streaming/MainTest.java
new file mode 100644
index 00000000..d813c956
--- /dev/null
+++ b/examples/webserver/streaming/src/test/java/io/helidon/webserver/examples/streaming/MainTest.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2024 Oracle and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.webserver.examples.streaming;
+
+import io.helidon.common.reactive.Single;
+import io.helidon.config.Config;
+import io.helidon.config.ConfigSources;
+import io.helidon.webclient.WebClient;
+import io.helidon.webclient.WebClientResponse;
+import io.helidon.webserver.WebServer;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static io.helidon.common.http.Http.Status;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class MainTest {
+
+ private static WebServer webServer;
+ private static WebClient webClient;
+ private static int publicPort;
+
+ private final String TEST_DATA_1 = "Test Data 1";
+ private final String TEST_DATA_2 = "Test Data 2";
+
+ @BeforeAll
+ public static void startTheServer() {
+ webServer = WebServer.builder(Main.createRouting()).build().start().await();
+ webClient = WebClient.builder().build();
+ publicPort = webServer.port();
+ }
+
+ @Test
+ @Order(0)
+ void testBadRequest() throws Exception {
+ webClient.get()
+ .uri("http://localhost:" + publicPort)
+ .path("/download")
+ .request()
+ .thenAccept(response -> assertThat(response.status(), is(Status.BAD_REQUEST_400)))
+ .toCompletableFuture()
+ .get();
+ }
+
+ @Test
+ @Order(1)
+ void testUpload1() throws Exception {
+ WebClientResponse response = webClient.post()
+ .uri("http://localhost:" + publicPort)
+ .path("/upload")
+ .submit(TEST_DATA_1)
+ .toCompletableFuture()
+ .get();
+ assertThat(response.status(), is(Status.OK_200));
+ }
+
+ @Test
+ @Order(2)
+ void testDownload1() throws Exception {
+ WebClientResponse response = webClient.get()
+ .uri("http://localhost:" + publicPort)
+ .path("/download")
+ .request()
+ .toCompletableFuture()
+ .get();
+ assertThat(response.status(), is(Status.OK_200));
+ assertThat(response.content().as(String.class).get(), is (TEST_DATA_1));
+ }
+
+ @Test
+ @Order(3)
+ void testUpload2() throws Exception {
+ WebClientResponse response = webClient.post()
+ .uri("http://localhost:" + publicPort)
+ .path("/upload")
+ .submit(TEST_DATA_2)
+ .toCompletableFuture()
+ .get();
+ assertThat(response.status(), is(Status.OK_200));
+ }
+
+ @Test
+ @Order(4)
+ void testDownload2() throws Exception {
+ WebClientResponse response = webClient.get()
+ .uri("http://localhost:" + publicPort)
+ .path("/download")
+ .request()
+ .toCompletableFuture()
+ .get();
+ assertThat(response.status(), is(Status.OK_200));
+ assertThat(response.content().as(String.class).get(), is (TEST_DATA_2));
+ }
+}
\ No newline at end of file