From 737d1832e5dce4eab97733922c492e92958c6866 Mon Sep 17 00:00:00 2001 From: Joe DiPol Date: Wed, 17 Apr 2024 15:05:52 -0700 Subject: [PATCH] 2.x: Fix #44: streaming example fails to start (#46) * 2.x: Fix #42: streaming example fails to start --- examples/webserver/streaming/README.md | 4 +- .../{src/main/resources => }/large-file.bin | Bin examples/webserver/streaming/pom.xml | 9 +- .../webserver/examples/streaming/Main.java | 2 - .../examples/streaming/ServerFileReader.java | 89 -------------- .../examples/streaming/ServerFileWriter.java | 82 ------------- .../examples/streaming/StreamingService.java | 67 +++++++--- .../examples/streaming/MainTest.java | 115 ++++++++++++++++++ 8 files changed, 175 insertions(+), 193 deletions(-) rename examples/webserver/streaming/{src/main/resources => }/large-file.bin (100%) delete mode 100644 examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileReader.java delete mode 100644 examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileWriter.java create mode 100644 examples/webserver/streaming/src/test/java/io/helidon/webserver/examples/streaming/MainTest.java diff --git a/examples/webserver/streaming/README.md b/examples/webserver/streaming/README.md index ca171f6c..1c40b400 100644 --- a/examples/webserver/streaming/README.md +++ b/examples/webserver/streaming/README.md @@ -14,6 +14,6 @@ java -jar target/helidon-examples-webserver-streaming.jar Upload a file and download it back with `curl`: ```shell -curl --data-binary "@target/classes/large-file.bin" http://localhost:8080/upload -curl http://localhost:8080/download +curl --data-binary "@large-file.bin" http://localhost:8080/upload +curl http://localhost:8080/download --output myfile.bin ``` diff --git a/examples/webserver/streaming/src/main/resources/large-file.bin b/examples/webserver/streaming/large-file.bin similarity index 100% rename from examples/webserver/streaming/src/main/resources/large-file.bin rename to examples/webserver/streaming/large-file.bin diff --git a/examples/webserver/streaming/pom.xml b/examples/webserver/streaming/pom.xml index a5194cb5..1a779425 100644 --- a/examples/webserver/streaming/pom.xml +++ b/examples/webserver/streaming/pom.xml @@ -57,8 +57,13 @@ test - io.helidon.webserver - helidon-webserver-test-support + org.junit.jupiter + junit-jupiter-params + test + + + io.helidon.webclient + helidon-webclient test diff --git a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/Main.java b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/Main.java index a7e118e2..e2ffc144 100644 --- a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/Main.java +++ b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/Main.java @@ -24,8 +24,6 @@ */ public class Main { - static final String LARGE_FILE_RESOURCE = "/large-file.bin"; - private Main() { } diff --git a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileReader.java b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileReader.java deleted file mode 100644 index 12e0c4d1..00000000 --- a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileReader.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2018, 2024 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.webserver.examples.streaming; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.concurrent.Flow; -import java.util.logging.Logger; - -import io.helidon.common.http.DataChunk; - -/** - * Class ServerFileReader. Reads a file using NIO and produces data chunks for a - * {@code Subscriber} to process. - */ -public class ServerFileReader implements Flow.Publisher { - private static final Logger LOGGER = Logger.getLogger(ServerFileReader.class.getName()); - - static final int BUFFER_SIZE = 4096; - - private final Path path; - - ServerFileReader(Path path) { - this.path = path; - } - - @Override - public void subscribe(Flow.Subscriber s) { - FileChannel channel; - ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); - - try { - channel = FileChannel.open(path, StandardOpenOption.READ); - } catch (IOException e) { - throw new RuntimeException(e); - } - - s.onSubscribe(new Flow.Subscription() { - @Override - public void request(long n) { - try { - while (n > 0) { - int bytes = channel.read(buffer); - if (bytes < 0) { - s.onComplete(); - channel.close(); - return; - } - if (bytes > 0) { - LOGGER.info(buffer.toString()); - buffer.flip(); - s.onNext(DataChunk.create(buffer)); - n--; - } - buffer.rewind(); - } - } catch (IOException e) { - s.onError(e); - } - } - - @Override - public void cancel() { - try { - channel.close(); - } catch (IOException e) { - LOGGER.info(e.getMessage()); - } - } - }); - } -} diff --git a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileWriter.java b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileWriter.java deleted file mode 100644 index 9264d467..00000000 --- a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/ServerFileWriter.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2018, 2024 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.webserver.examples.streaming; - -import java.io.IOException; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.concurrent.Flow; -import java.util.logging.Logger; - -import io.helidon.common.http.DataChunk; -import io.helidon.webserver.ServerResponse; - -/** - * Class ServerFileWriter. Process data chunks from a {@code Producer} and - * writes them to a temporary file using NIO. For simplicity, this {@code - * Subscriber} requests an unbounded number of chunks on its subscription. - */ -public class ServerFileWriter implements Flow.Subscriber { - private static final Logger LOGGER = Logger.getLogger(ServerFileWriter.class.getName()); - - private final FileChannel channel; - - private final ServerResponse response; - - ServerFileWriter(ServerResponse response) { - this.response = response; - try { - Path tempFilePath = Files.createTempFile("large-file", ".tmp"); - channel = FileChannel.open(tempFilePath, StandardOpenOption.WRITE); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscription.request(Long.MAX_VALUE); - } - - @Override - public void onNext(DataChunk chunk) { - try { - channel.write(chunk.data()); - LOGGER.info(chunk.data().toString() + " " + Thread.currentThread()); - chunk.release(); - } catch (IOException e) { - LOGGER.info(e.getMessage()); - } - } - - @Override - public void onError(Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void onComplete() { - try { - channel.close(); - response.send("DONE"); - } catch (IOException e) { - LOGGER.info(e.getMessage()); - } - } -} diff --git a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/StreamingService.java b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/StreamingService.java index 208d10a2..d50da35d 100644 --- a/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/StreamingService.java +++ b/examples/webserver/streaming/src/main/java/io/helidon/webserver/examples/streaming/StreamingService.java @@ -16,52 +16,87 @@ package io.helidon.webserver.examples.streaming; -import java.net.URISyntaxException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; +import java.util.Arrays; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Logger; +import io.helidon.common.configurable.ScheduledThreadPoolSupplier; +import io.helidon.common.http.DataChunk; +import io.helidon.common.http.Http; +import io.helidon.common.reactive.IoMulti; import io.helidon.webserver.Routing; import io.helidon.webserver.ServerRequest; import io.helidon.webserver.ServerResponse; import io.helidon.webserver.Service; -import static io.helidon.webserver.examples.streaming.Main.LARGE_FILE_RESOURCE; - /** * StreamingService class. Uses a {@code Subscriber} and a * {@code Publisher} for uploading and downloading files. */ public class StreamingService implements Service { private static final Logger LOGGER = Logger.getLogger(StreamingService.class.getName()); - - private final Path filePath; + private final ScheduledExecutorService executor = ScheduledThreadPoolSupplier.create().get(); + private volatile Path filePath; StreamingService() { - try { - filePath = Paths.get(getClass().getResource(LARGE_FILE_RESOURCE).toURI()); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } } @Override public void update(Routing.Rules routingRules) { routingRules.get("/download", this::download) - .post("/upload", this::upload); + .post("/upload", this::upload); } private void upload(ServerRequest request, ServerResponse response) { LOGGER.info("Entering upload ... " + Thread.currentThread()); - request.content().subscribe(new ServerFileWriter(response)); + Path tempFilePath = createTempFile("large-file", ".tmp"); + filePath = tempFilePath; + LOGGER.info("Storing upload as " + tempFilePath); + request.content() + .map(DataChunk::data) + .flatMapIterable(Arrays::asList) + .to(IoMulti.writeToFile(tempFilePath) + .executor(executor) + .build()) + .thenRun(() -> response.status(Http.Status.OK_200).send()); LOGGER.info("Exiting upload ..."); } private void download(ServerRequest request, ServerResponse response) { LOGGER.info("Entering download ..." + Thread.currentThread()); + if (filePath == null) { + LOGGER.warning("No file to download."); + response.status(Http.Status.BAD_REQUEST_400).send("No file to download. Please upload file first."); + return; + } long length = filePath.toFile().length(); - response.headers().add("Content-Length", String.valueOf(length)); - response.send(new ServerFileReader(filePath)); - LOGGER.info("Exiting download ..."); + response.headers().contentLength(length); + response.send(IoMulti.multiFromByteChannelBuilder(newByteChannel(filePath)) + .executor(executor) + .build() + .map(DataChunk::create)); + LOGGER.info("Exiting download. Returning " + length + " bytes..."); + } + + @SuppressWarnings("SameParameterValue") + private static Path createTempFile(String prefix, String suffix) { + try { + return Files.createTempFile(prefix, suffix); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + private static ReadableByteChannel newByteChannel(Path path) { + try { + return Files.newByteChannel(path); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } } } diff --git a/examples/webserver/streaming/src/test/java/io/helidon/webserver/examples/streaming/MainTest.java b/examples/webserver/streaming/src/test/java/io/helidon/webserver/examples/streaming/MainTest.java new file mode 100644 index 00000000..d813c956 --- /dev/null +++ b/examples/webserver/streaming/src/test/java/io/helidon/webserver/examples/streaming/MainTest.java @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.examples.streaming; + +import io.helidon.common.reactive.Single; +import io.helidon.config.Config; +import io.helidon.config.ConfigSources; +import io.helidon.webclient.WebClient; +import io.helidon.webclient.WebClientResponse; +import io.helidon.webserver.WebServer; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static io.helidon.common.http.Http.Status; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.TestMethodOrder; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class MainTest { + + private static WebServer webServer; + private static WebClient webClient; + private static int publicPort; + + private final String TEST_DATA_1 = "Test Data 1"; + private final String TEST_DATA_2 = "Test Data 2"; + + @BeforeAll + public static void startTheServer() { + webServer = WebServer.builder(Main.createRouting()).build().start().await(); + webClient = WebClient.builder().build(); + publicPort = webServer.port(); + } + + @Test + @Order(0) + void testBadRequest() throws Exception { + webClient.get() + .uri("http://localhost:" + publicPort) + .path("/download") + .request() + .thenAccept(response -> assertThat(response.status(), is(Status.BAD_REQUEST_400))) + .toCompletableFuture() + .get(); + } + + @Test + @Order(1) + void testUpload1() throws Exception { + WebClientResponse response = webClient.post() + .uri("http://localhost:" + publicPort) + .path("/upload") + .submit(TEST_DATA_1) + .toCompletableFuture() + .get(); + assertThat(response.status(), is(Status.OK_200)); + } + + @Test + @Order(2) + void testDownload1() throws Exception { + WebClientResponse response = webClient.get() + .uri("http://localhost:" + publicPort) + .path("/download") + .request() + .toCompletableFuture() + .get(); + assertThat(response.status(), is(Status.OK_200)); + assertThat(response.content().as(String.class).get(), is (TEST_DATA_1)); + } + + @Test + @Order(3) + void testUpload2() throws Exception { + WebClientResponse response = webClient.post() + .uri("http://localhost:" + publicPort) + .path("/upload") + .submit(TEST_DATA_2) + .toCompletableFuture() + .get(); + assertThat(response.status(), is(Status.OK_200)); + } + + @Test + @Order(4) + void testDownload2() throws Exception { + WebClientResponse response = webClient.get() + .uri("http://localhost:" + publicPort) + .path("/download") + .request() + .toCompletableFuture() + .get(); + assertThat(response.status(), is(Status.OK_200)); + assertThat(response.content().as(String.class).get(), is (TEST_DATA_2)); + } +} \ No newline at end of file