Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Feb 23, 2022
1 parent 4edcf63 commit 6b35b68
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 239 deletions.
59 changes: 59 additions & 0 deletions vertx-grpc/src/main/java/io/vertx/grpc/server/GrpcMethodCall.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.vertx.grpc.server;

import io.grpc.ServerMethodDefinition;
import io.vertx.core.Handler;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;

import java.io.IOException;
import java.io.InputStream;

public class GrpcMethodCall<Req, Resp> {

final GrpcRequest request;
final ServerMethodDefinition<Req, Resp> def;
Handler<Req> handler;
Handler<Void> endHandler;

public GrpcMethodCall(GrpcRequest request, ServerMethodDefinition<Req, Resp> def) {
this.request = request;
this.def = def;
}

public GrpcMethodCall<Req, Resp> handler(Handler<Req> handler) {
this.handler = handler;
return this;
}

public GrpcMethodCall<Req, Resp> endHandler(Handler<Void> endHandler) {
this.endHandler = endHandler;
return this;
}

public void write(Resp message) {
request.write(encode(message));
}

public void end(Resp message) {
request.end(encode(message));
}

private Buffer encode(Resp resp) {
Buffer encoded = Buffer.buffer();
InputStream stream = def.getMethodDescriptor().streamResponse(resp);
byte[] tmp = new byte[256];
int i;
try {
while ((i = stream.read(tmp)) != -1) {
encoded.appendBytes(tmp, 0, i);
}
} catch (IOException e) {
throw new VertxException(e);
}
return encoded;
}

public void end() {
request.end();
}
}
75 changes: 75 additions & 0 deletions vertx-grpc/src/main/java/io/vertx/grpc/server/GrpcRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.vertx.grpc.server;

import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;

public class GrpcRequest {

final HttpServerRequest httpRequest;
Handler<GrpcMessage> messageHandler;
Handler<Void> endHandler;

public GrpcRequest(HttpServerRequest httpRequest) {
this.httpRequest = httpRequest;
}

public String fullMethodName() {
return httpRequest.path().substring(1);
}

public GrpcRequest messageHandler(Handler<GrpcMessage> messageHandler) {
this.messageHandler = messageHandler;
return this;
}

public GrpcRequest endHandler(Handler<Void> endHandler) {
this.endHandler = endHandler;
return this;
}

public void write(Buffer message) {
write(message, false);
}

public void end(Buffer message) {
write(message, true);
}

public void end() {
write(null, true);
}

private boolean headerSent;

private void write(Buffer message, boolean end) {
Buffer encoded;
if (message != null) {
encoded = Buffer.buffer(message.length());
encoded.appendByte((byte)0); // Compression
encoded.appendInt(message.length()); // Length
encoded.appendBuffer(message);
} else {
encoded = null;
}
if (!headerSent) {
headerSent = true;
MultiMap responseHeaders = httpRequest.response().headers();
responseHeaders.set("content-type", "application/grpc");
responseHeaders.set("grpc-encoding", "identity");
responseHeaders.set("grpc-accept-encoding", "gzip");
}
if (end) {
MultiMap responseTrailers = httpRequest.response().trailers();
responseTrailers.set("grpc-status", "0");
if (encoded != null) {
httpRequest.response().end(encoded);
} else {
httpRequest.response().end();
}
} else {
httpRequest.response().write(encoded);
}
}
}
171 changes: 59 additions & 112 deletions vertx-grpc/src/main/java/io/vertx/grpc/server/GrpcService.java
Original file line number Diff line number Diff line change
@@ -1,147 +1,94 @@
package io.vertx.grpc.server;

import io.grpc.MethodDescriptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;

import java.io.IOException;
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;

public class GrpcService implements Handler<HttpServerRequest> {

private Handler<GrpcServiceRequest> requestHandler;
private ServerServiceDefinition serviceDefinition;
private Handler<GrpcRequest> requestHandler;
private Map<String, MethodCallHandler<?, ?>> handlerMap = new HashMap<>();

@Override
public void handle(HttpServerRequest request) {
String methodName = request.path().substring(1);
ServerMethodDefinition<?, ?> method = serviceDefinition.getMethod(methodName);
if (method != null) {
MethodDescriptor desc = method.getMethodDescriptor();
MethodDescriptor.MethodType type = desc.getType();
switch (type) {
case UNARY:
case SERVER_STREAMING:
case CLIENT_STREAMING:
case BIDI_STREAMING:
GrpcServiceRequest grpcRequest = new GrpcServiceRequest(new GrpcResponseImpl(desc, request), method);
request.handler(envelope -> {
int idx = 0;
while (idx < envelope.length()) {
int len = envelope.getInt(idx + 1);
Buffer data = envelope.slice(idx + 5, idx + 5 + len);
GrpcMessage msg = new GrpcMessage(data);
Handler<GrpcMessage> msgHandler = grpcRequest.messageHandler;
if (msgHandler != null) {
msgHandler.handle(msg);
}
idx += 5 + len;
}
});
request.endHandler(v -> {
Handler<Void> handler = grpcRequest.endHandler;
if (handler != null) {
handler.handle(null);
}
});
handleUnary(grpcRequest);
break;
default:
request.response().setStatusCode(500).end();
break;
GrpcRequest grpcRequest = new GrpcRequest(request);
request.handler(envelope -> {
int idx = 0;
while (idx < envelope.length()) {
int len = envelope.getInt(idx + 1);
Buffer data = envelope.slice(idx + 5, idx + 5 + len);
GrpcMessage msg = new GrpcMessage(data);
Handler<GrpcMessage> msgHandler = grpcRequest.messageHandler;
if (msgHandler != null) {
msgHandler.handle(msg);
}
idx += 5 + len;
}
}
});
request.endHandler(v -> {
Handler<Void> handler = grpcRequest.endHandler;
if (handler != null) {
handler.handle(null);
}
});
handleUnary(grpcRequest);
}

private void handleUnary(GrpcServiceRequest request) {
Handler<GrpcServiceRequest> handler = requestHandler;
if (handler != null) {
private void handleUnary(GrpcRequest request) {
String fmn = request.fullMethodName();
MethodCallHandler<?, ?> method = handlerMap.get(fmn);
if (method != null) {
method.handle(request);
} else {
Handler<GrpcRequest> handler = requestHandler;
if (handler != null) {
handler.handle(request);
} else {
request.httpRequest.response().setStatusCode(500).end();
}
}
}

public GrpcService serviceDefinition(ServerServiceDefinition serviceDefinition) {
this.serviceDefinition = serviceDefinition;
public GrpcService requestHandler(Handler<GrpcRequest> requestHandler) {
this.requestHandler = requestHandler;
return this;
}

public GrpcService requestHandler(Handler<GrpcServiceRequest> requestHandler) {
this.requestHandler = requestHandler;
public <Req, Resp> GrpcService methodHandler(ServerMethodDefinition<Req, Resp> def, Handler<GrpcMethodCall<Req, Resp>> handler) {
handlerMap.put(def.getMethodDescriptor().getFullMethodName(), new MethodCallHandler<>(def, handler));
return this;
}

private static class GrpcResponseImpl implements GrpcServiceResponse {

private final MethodDescriptor desc;
private final HttpServerRequest request;

public GrpcResponseImpl(MethodDescriptor desc, HttpServerRequest request) {
this.desc = desc;
this.request = request;
}

@Override
public void write(Object message) {
write(message, false);
private static class MethodCallHandler<Req, Resp> implements Handler<GrpcRequest> {
final ServerMethodDefinition<Req, Resp> def;
final Handler<GrpcMethodCall<Req, Resp>> handler;
MethodCallHandler(ServerMethodDefinition<Req, Resp> def, Handler<GrpcMethodCall<Req, Resp>> handler) {
this.def = def;
this.handler = handler;
}

@Override
public void end(Object message) {
write(message, true);
}

@Override
public void end() {
write(null, true);
}

private boolean headerSent;

private void write(Object message, boolean end) {
Buffer encoded;
if (message != null) {
InputStream stream = desc.streamResponse(message);
byte[] tmp = new byte[256];
int i;
try {
encoded = Buffer.buffer();
encoded.appendByte((byte)0); // Compression
encoded.appendIntLE(0); // Length
while ((i = stream.read(tmp)) != -1) {
encoded.appendBytes(tmp, 0, i);
}
encoded.setInt(1, encoded.length() - 5);
} catch (IOException e) {
e.printStackTrace();
return;
public void handle(GrpcRequest request) {
GrpcMethodCall<Req, Resp> call = new GrpcMethodCall<>(request, def);
request.messageHandler(msg -> {
ByteArrayInputStream in = new ByteArrayInputStream(msg.data().getBytes());
Req obj = def.getMethodDescriptor().parseRequest(in);
Handler<Req> handler = call.handler;
if (handler != null) {
handler.handle(obj);
}
} else {
encoded = null;
}
if (!headerSent) {
headerSent = true;
MultiMap responseHeaders = request.response().headers();
responseHeaders.set("content-type", "application/grpc");
responseHeaders.set("grpc-encoding", "identity");
responseHeaders.set("grpc-accept-encoding", "gzip");
}
if (end) {
MultiMap responseTrailers = request.response().trailers();
responseTrailers.set("grpc-status", "0");
if (encoded != null) {
request.response().end(encoded);
} else {
request.response().end();
});
request.endHandler(v -> {
Handler<Void> handler = call.endHandler;
if (handler != null) {
handler.handle(null);
}
} else {
request.response().write(encoded);
}
});
handler.handle(call);
}
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 6b35b68

Please sign in to comment.