Skip to content

Commit

Permalink
Updates signature of split method to return a Stream.
Browse files Browse the repository at this point in the history
Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
spericas committed Jul 24, 2024
1 parent 9feb8a8 commit 025935d
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,57 @@

import io.helidon.grpc.api.Grpc;
import io.helidon.grpc.core.CollectingObserver;
import io.helidon.grpc.core.ResponseHelper;

import io.grpc.stub.StreamObserver;
import jakarta.enterprise.context.ApplicationScoped;

/**
* An implementation of a string service.
*/
@Grpc.GrpcService
@ApplicationScoped
public class StringService {

/**
* Uppercase a string.
*
* @param request string message
* @return string message
*/
@Grpc.Unary("Upper")
public Strings.StringMessage upper(Strings.StringMessage request) {
return newMessage(request.getText().toUpperCase());
}

/**
* Lowercase a string.
*
* @param request string message
* @return string message
*/
@Grpc.Unary("Lower")
public Strings.StringMessage lower(Strings.StringMessage request) {
return newMessage(request.getText().toLowerCase());
}

/**
* Split a string using space delimiters.
*
* @param request string message
* @return stream of string messages
*/
@Grpc.ServerStreaming("Split")
public void split(Strings.StringMessage request, StreamObserver<Strings.StringMessage> observer) {
public Stream<Strings.StringMessage> split(Strings.StringMessage request) {
String[] parts = request.getText().split(" ");
ResponseHelper.stream(observer, Stream.of(parts).map(this::newMessage));
return Stream.of(parts).map(this::newMessage);
}

/**
* Join a stream of messages using spaces.
*
* @param observer stream of messages
* @return single message as a stream
*/
@Grpc.ClientStreaming("Join")
public StreamObserver<Strings.StringMessage> join(StreamObserver<Strings.StringMessage> observer) {
return CollectingObserver.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,52 @@

package io.helidon.examples.microprofile.grpc;

import java.util.stream.Stream;

import io.helidon.grpc.api.Grpc;

import io.grpc.stub.StreamObserver;

/**
* A client for a {@link StringService}.
*/
@Grpc.GrpcService("StringService")
@Grpc.GrpcChannel("string-channel") // see application.yaml
public interface StringServiceClient {

/**
* Uppercase a string.
*
* @param request string message
* @return string message
*/
@Grpc.Unary("Upper")
Strings.StringMessage upper(Strings.StringMessage request);

/**
* Lowercase a string.
*
* @param request string message
* @return string message
*/
@Grpc.Unary("Lower")
Strings.StringMessage lower(Strings.StringMessage request);

/**
* Split a string using space delimiters.
*
* @param request string message
* @return stream of string messages
*/
@Grpc.ServerStreaming("Split")
void split(Strings.StringMessage request, StreamObserver<Strings.StringMessage> observer);
Stream<Strings.StringMessage> split(Strings.StringMessage request);

/**
* Join a stream of messages using spaces.
*
* @param observer stream of messages
* @return single message as a stream
*/
@Grpc.ClientStreaming("Join")
StreamObserver<Strings.StringMessage> join(StreamObserver<Strings.StringMessage> observer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.examples.microprofile.grpc;
package io.helidon.examples.microprofile.grpc;
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import io.helidon.grpc.api.Grpc;
import io.helidon.microprofile.grpc.client.GrpcClientCdiExtension;
Expand Down Expand Up @@ -57,10 +58,9 @@ void testUnaryLower() {
}

@Test
void testServerStreamingSplit() throws InterruptedException {
ListObserver<Strings.StringMessage> response = new ListObserver<>();
client.split(newMessage("hello world"), response);
List<Strings.StringMessage> value = response.value();
void testServerStreamingSplit() {
Stream<Strings.StringMessage> stream = client.split(newMessage("hello world"));
List<Strings.StringMessage> value = stream.toList();
assertThat(value, hasSize(2));
assertThat(value, contains(newMessage("hello"), newMessage("world")));
}
Expand Down

0 comments on commit 025935d

Please sign in to comment.