diff --git a/api/protocol/outofstoremessagetypes/outofstoremessage.proto b/api/protocol/outofstoremessagetypes/outofstoremessage.proto new file mode 100644 index 00000000..643d40d8 --- /dev/null +++ b/api/protocol/outofstoremessagetypes/outofstoremessage.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package weshnet.outofstoremessage.v1; + +import "gogoproto/gogo.proto"; +import "protocoltypes.proto"; + +option go_package = "berty.tech/weshnet/pkg/outofstoremessagetypes"; + +option (gogoproto.goproto_enum_prefix_all) = false; +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.sizer_all) = true; + +// OutOfStoreMessageService is the service used to open out-of-store messages (e.g. push notifications) +// It is used to open messages with a lightweight protocol service for mobile backgroup processes. +service OutOfStoreMessageService { + // OutOfStoreReceive parses a payload received outside a synchronized store + rpc OutOfStoreReceive(weshnet.protocol.v1.OutOfStoreReceive.Request) returns (weshnet.protocol.v1.OutOfStoreReceive.Reply); +} diff --git a/outofstoremessage_test.go b/outofstoremessage_test.go new file mode 100644 index 00000000..bd6f458d --- /dev/null +++ b/outofstoremessage_test.go @@ -0,0 +1,154 @@ +package weshnet + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + + "berty.tech/weshnet/pkg/protocoltypes" + "berty.tech/weshnet/pkg/secretstore" + "berty.tech/weshnet/pkg/testutil" +) + +func Test_sealPushMessage_OutOfStoreReceive(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tp, cancel := NewTestingProtocol(ctx, t, &TestingOpts{}, nil) + defer cancel() + + g, _, err := NewGroupMultiMember() + require.NoError(t, err) + + s := tp.Service + + gPK, err := g.GetPubKey() + require.NoError(t, err) + + _, err = s.MultiMemberGroupJoin(ctx, &protocoltypes.MultiMemberGroupJoin_Request{Group: g}) + require.NoError(t, err) + + gPKRaw, err := gPK.Raw() + require.NoError(t, err) + + _, err = s.ActivateGroup(ctx, &protocoltypes.ActivateGroup_Request{GroupPK: gPKRaw}) + require.NoError(t, err) + + gc, err := s.(ServiceMethods).GetContextGroupForID(g.PublicKey) + require.NoError(t, err) + + otherSecretStore, cancel := createVirtualOtherPeerSecrets(t, ctx, gc) + defer cancel() + + testPayload := []byte("test payload") + + envBytes, err := otherSecretStore.SealEnvelope(ctx, g, testPayload) + require.NoError(t, err) + + env, headers, err := otherSecretStore.OpenEnvelopeHeaders(envBytes, g) + require.NoError(t, err) + + oosMsgEnv, err := otherSecretStore.SealOutOfStoreMessageEnvelope(cid.Undef, env, headers, g) + require.NoError(t, err) + oosMsgEnvBytes, err := oosMsgEnv.Marshal() + require.NoError(t, err) + + outOfStoreMessage, group, clearPayload, alreadyDecrypted, err := gc.SecretStore().OpenOutOfStoreMessage(ctx, oosMsgEnvBytes) + require.NoError(t, err) + + require.Equal(t, g, group) + require.Equal(t, []byte("test payload"), clearPayload) + require.False(t, alreadyDecrypted) + + require.Equal(t, headers.Counter, outOfStoreMessage.Counter) + require.Equal(t, headers.DevicePK, outOfStoreMessage.DevicePK) + require.Equal(t, headers.Sig, outOfStoreMessage.Sig) + require.Equal(t, env.Message, outOfStoreMessage.EncryptedPayload) +} + +func Test_OutOfStoreMessageFlow(t *testing.T) { + message := []byte("test message") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logger, cleanup := testutil.Logger(t) + defer cleanup() + + tp, cancel := NewTestingProtocol(ctx, t, &TestingOpts{Logger: logger}, nil) + defer cancel() + + g, _, err := NewGroupMultiMember() + require.NoError(t, err) + + s := tp.Service + + gPK, err := g.GetPubKey() + require.NoError(t, err) + + _, err = s.MultiMemberGroupJoin(ctx, &protocoltypes.MultiMemberGroupJoin_Request{Group: g}) + require.NoError(t, err) + + gPKRaw, err := gPK.Raw() + require.NoError(t, err) + + _, err = s.ActivateGroup(ctx, &protocoltypes.ActivateGroup_Request{GroupPK: gPKRaw}) + require.NoError(t, err) + + // send a message + sendReply, err := s.AppMessageSend(ctx, &protocoltypes.AppMessageSend_Request{ + GroupPK: gPKRaw, + Payload: message, + }) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // craft an out of store message + craftReply, err := s.OutOfStoreSeal(ctx, &protocoltypes.OutOfStoreSeal_Request{ + CID: sendReply.CID, + GroupPublicKey: gPKRaw, + }) + require.NoError(t, err) + + // verify the out of store message + openReply, err := s.OutOfStoreReceive(ctx, &protocoltypes.OutOfStoreReceive_Request{ + Payload: craftReply.Encrypted, + }) + require.NoError(t, err) + + encryptedMessage := protocoltypes.EncryptedMessage{} + err = encryptedMessage.Unmarshal(openReply.Cleartext) + require.NoError(t, err) + + require.Equal(t, message, encryptedMessage.Plaintext) +} + +func createVirtualOtherPeerSecrets(t testing.TB, ctx context.Context, gc *GroupContext) (secretstore.SecretStore, func()) { + secretStore, err := secretstore.NewInMemSecretStore(nil) + require.NoError(t, err) + + cleanup := func() { + _ = secretStore.Close() + } + + // Manually adding another member to the group + otherMD, err := secretStore.GetOwnMemberDeviceForGroup(gc.Group()) + _, err = MetadataStoreAddDeviceToGroup(ctx, gc.MetadataStore(), gc.Group(), otherMD) + require.NoError(t, err) + + memberDevice, err := gc.SecretStore().GetOwnMemberDeviceForGroup(gc.Group()) + require.NoError(t, err) + + ds, err := secretStore.GetShareableChainKey(ctx, gc.Group(), memberDevice.Member()) + + _, err = MetadataStoreSendSecret(ctx, gc.MetadataStore(), gc.Group(), otherMD, memberDevice.Member(), ds) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 200) + + return secretStore, cleanup +} diff --git a/pkg/outofstoremessagetypes/outofstoremessage.pb.go b/pkg/outofstoremessagetypes/outofstoremessage.pb.go new file mode 100644 index 00000000..6ba8ad38 --- /dev/null +++ b/pkg/outofstoremessagetypes/outofstoremessage.pb.go @@ -0,0 +1,44 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: outofstoremessagetypes/outofstoremessage.proto + +package outofstoremessagetypes + +import ( + _ "berty.tech/weshnet/pkg/protocoltypes" + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +func init() { + proto.RegisterFile("outofstoremessagetypes/outofstoremessage.proto", fileDescriptor_14aba72a66934192) +} + +var fileDescriptor_14aba72a66934192 = []byte{ + // 206 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0xcb, 0x2f, 0x2d, 0xc9, + 0x4f, 0x2b, 0x2e, 0xc9, 0x2f, 0x4a, 0xcd, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0x2d, 0xa9, 0x2c, + 0x48, 0x2d, 0xd6, 0xc7, 0x10, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x92, 0x29, 0x4f, 0x2d, + 0xce, 0xc8, 0x4b, 0x2d, 0xc1, 0xd4, 0xa7, 0x57, 0x66, 0x28, 0x25, 0x92, 0x9e, 0x9f, 0x9e, 0x0f, + 0x56, 0xa8, 0x0f, 0x62, 0x41, 0xf4, 0x48, 0x09, 0x83, 0xa9, 0xe4, 0xfc, 0x1c, 0xb0, 0xd1, 0x10, + 0x41, 0xa3, 0x5e, 0x46, 0x2e, 0x09, 0xff, 0xd2, 0x12, 0xff, 0xb4, 0x60, 0x90, 0x19, 0xbe, 0x10, + 0x33, 0x82, 0x53, 0x8b, 0xca, 0x32, 0x93, 0x53, 0x85, 0x0a, 0xb9, 0x04, 0x11, 0x72, 0x41, 0xa9, + 0xc9, 0xa9, 0x99, 0x65, 0xa9, 0x42, 0x7a, 0x7a, 0x30, 0xbb, 0x61, 0xe6, 0xe9, 0x95, 0x19, 0xea, + 0x61, 0xa8, 0xd3, 0x0b, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x91, 0xd2, 0x21, 0x5a, 0x7d, 0x41, + 0x4e, 0xa5, 0x93, 0xfd, 0x85, 0x87, 0x72, 0x0c, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, + 0xf8, 0xe0, 0x91, 0x1c, 0x63, 0x94, 0x6e, 0x52, 0x6a, 0x51, 0x49, 0xa5, 0x5e, 0x49, 0x6a, 0x72, + 0x86, 0x3e, 0xd4, 0x24, 0xfd, 0x82, 0xec, 0x74, 0x7d, 0xec, 0x21, 0x96, 0xc4, 0x06, 0xb6, 0xc5, + 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xea, 0xbf, 0x6c, 0x90, 0x52, 0x01, 0x00, 0x00, +} diff --git a/pkg/outofstoremessagetypes/outofstoremessage.pb.gw.go b/pkg/outofstoremessagetypes/outofstoremessage.pb.gw.go new file mode 100644 index 00000000..40f480d0 --- /dev/null +++ b/pkg/outofstoremessagetypes/outofstoremessage.pb.gw.go @@ -0,0 +1,170 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: outofstoremessagetypes/outofstoremessage.proto + +/* +Package outofstoremessagetypes is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package outofstoremessagetypes + +import ( + "context" + "io" + "net/http" + + "berty.tech/weshnet/pkg/protocoltypes" + "github.com/golang/protobuf/descriptor" + "github.com/golang/protobuf/proto" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/grpc-ecosystem/grpc-gateway/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// Suppress "imported and not used" errors +var _ codes.Code +var _ io.Reader +var _ status.Status +var _ = runtime.String +var _ = utilities.NewDoubleArray +var _ = descriptor.ForMessage +var _ = metadata.Join + +func request_OutOfStoreMessageService_OutOfStoreReceive_0(ctx context.Context, marshaler runtime.Marshaler, client OutOfStoreMessageServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq protocoltypes.OutOfStoreReceive_Request + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := client.OutOfStoreReceive(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_OutOfStoreMessageService_OutOfStoreReceive_0(ctx context.Context, marshaler runtime.Marshaler, server OutOfStoreMessageServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq protocoltypes.OutOfStoreReceive_Request + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := server.OutOfStoreReceive(ctx, &protoReq) + return msg, metadata, err + +} + +// RegisterOutOfStoreMessageServiceHandlerServer registers the http handlers for service OutOfStoreMessageService to "mux". +// UnaryRPC :call OutOfStoreMessageServiceServer directly. +// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterOutOfStoreMessageServiceHandlerFromEndpoint instead. +func RegisterOutOfStoreMessageServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux, server OutOfStoreMessageServiceServer) error { + + mux.Handle("POST", pattern_OutOfStoreMessageService_OutOfStoreReceive_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_OutOfStoreMessageService_OutOfStoreReceive_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_OutOfStoreMessageService_OutOfStoreReceive_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +// RegisterOutOfStoreMessageServiceHandlerFromEndpoint is same as RegisterOutOfStoreMessageServiceHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterOutOfStoreMessageServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.Dial(endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + + return RegisterOutOfStoreMessageServiceHandler(ctx, mux, conn) +} + +// RegisterOutOfStoreMessageServiceHandler registers the http handlers for service OutOfStoreMessageService to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterOutOfStoreMessageServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterOutOfStoreMessageServiceHandlerClient(ctx, mux, NewOutOfStoreMessageServiceClient(conn)) +} + +// RegisterOutOfStoreMessageServiceHandlerClient registers the http handlers for service OutOfStoreMessageService +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "OutOfStoreMessageServiceClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "OutOfStoreMessageServiceClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "OutOfStoreMessageServiceClient" to call the correct interceptors. +func RegisterOutOfStoreMessageServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux, client OutOfStoreMessageServiceClient) error { + + mux.Handle("POST", pattern_OutOfStoreMessageService_OutOfStoreReceive_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_OutOfStoreMessageService_OutOfStoreReceive_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_OutOfStoreMessageService_OutOfStoreReceive_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +var ( + pattern_OutOfStoreMessageService_OutOfStoreReceive_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"weshnet.outofstoremessage.v1.OutOfStoreMessageService", "OutOfStoreReceive"}, "", runtime.AssumeColonVerbOpt(true))) +) + +var ( + forward_OutOfStoreMessageService_OutOfStoreReceive_0 = runtime.ForwardResponseMessage +) diff --git a/pkg/outofstoremessagetypes/outofstoremessage_grpc.pb.go b/pkg/outofstoremessagetypes/outofstoremessage_grpc.pb.go new file mode 100644 index 00000000..7ec721da --- /dev/null +++ b/pkg/outofstoremessagetypes/outofstoremessage_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package outofstoremessagetypes + +import ( + protocoltypes "berty.tech/weshnet/pkg/protocoltypes" + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// OutOfStoreMessageServiceClient is the client API for OutOfStoreMessageService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type OutOfStoreMessageServiceClient interface { + // OutOfStoreReceive parses a payload received outside a synchronized store + OutOfStoreReceive(ctx context.Context, in *protocoltypes.OutOfStoreReceive_Request, opts ...grpc.CallOption) (*protocoltypes.OutOfStoreReceive_Reply, error) +} + +type outOfStoreMessageServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewOutOfStoreMessageServiceClient(cc grpc.ClientConnInterface) OutOfStoreMessageServiceClient { + return &outOfStoreMessageServiceClient{cc} +} + +func (c *outOfStoreMessageServiceClient) OutOfStoreReceive(ctx context.Context, in *protocoltypes.OutOfStoreReceive_Request, opts ...grpc.CallOption) (*protocoltypes.OutOfStoreReceive_Reply, error) { + out := new(protocoltypes.OutOfStoreReceive_Reply) + err := c.cc.Invoke(ctx, "/weshnet.outofstoremessage.v1.OutOfStoreMessageService/OutOfStoreReceive", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// OutOfStoreMessageServiceServer is the server API for OutOfStoreMessageService service. +// All implementations must embed UnimplementedOutOfStoreMessageServiceServer +// for forward compatibility +type OutOfStoreMessageServiceServer interface { + // OutOfStoreReceive parses a payload received outside a synchronized store + OutOfStoreReceive(context.Context, *protocoltypes.OutOfStoreReceive_Request) (*protocoltypes.OutOfStoreReceive_Reply, error) + mustEmbedUnimplementedOutOfStoreMessageServiceServer() +} + +// UnimplementedOutOfStoreMessageServiceServer must be embedded to have forward compatible implementations. +type UnimplementedOutOfStoreMessageServiceServer struct { +} + +func (UnimplementedOutOfStoreMessageServiceServer) OutOfStoreReceive(context.Context, *protocoltypes.OutOfStoreReceive_Request) (*protocoltypes.OutOfStoreReceive_Reply, error) { + return nil, status.Errorf(codes.Unimplemented, "method OutOfStoreReceive not implemented") +} +func (UnimplementedOutOfStoreMessageServiceServer) mustEmbedUnimplementedOutOfStoreMessageServiceServer() { +} + +// UnsafeOutOfStoreMessageServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to OutOfStoreMessageServiceServer will +// result in compilation errors. +type UnsafeOutOfStoreMessageServiceServer interface { + mustEmbedUnimplementedOutOfStoreMessageServiceServer() +} + +func RegisterOutOfStoreMessageServiceServer(s grpc.ServiceRegistrar, srv OutOfStoreMessageServiceServer) { + s.RegisterService(&OutOfStoreMessageService_ServiceDesc, srv) +} + +func _OutOfStoreMessageService_OutOfStoreReceive_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(protocoltypes.OutOfStoreReceive_Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(OutOfStoreMessageServiceServer).OutOfStoreReceive(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/weshnet.outofstoremessage.v1.OutOfStoreMessageService/OutOfStoreReceive", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(OutOfStoreMessageServiceServer).OutOfStoreReceive(ctx, req.(*protocoltypes.OutOfStoreReceive_Request)) + } + return interceptor(ctx, in, info, handler) +} + +// OutOfStoreMessageService_ServiceDesc is the grpc.ServiceDesc for OutOfStoreMessageService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var OutOfStoreMessageService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "weshnet.outofstoremessage.v1.OutOfStoreMessageService", + HandlerType: (*OutOfStoreMessageServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "OutOfStoreReceive", + Handler: _OutOfStoreMessageService_OutOfStoreReceive_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "outofstoremessagetypes/outofstoremessage.proto", +} diff --git a/service_outofstoremessage.go b/service_outofstoremessage.go new file mode 100644 index 00000000..7a12809b --- /dev/null +++ b/service_outofstoremessage.go @@ -0,0 +1,271 @@ +package weshnet + +import ( + "context" + "fmt" + "io" + "time" + + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + ipfs_interface "github.com/ipfs/interface-go-ipfs-core" + "go.uber.org/zap" + "google.golang.org/grpc" + + "berty.tech/weshnet/pkg/errcode" + "berty.tech/weshnet/pkg/grpcutil" + "berty.tech/weshnet/pkg/outofstoremessagetypes" + "berty.tech/weshnet/pkg/protocoltypes" + "berty.tech/weshnet/pkg/secretstore" +) + +type OOSMService interface { + outofstoremessagetypes.OutOfStoreMessageServiceServer +} + +var _ OOSMService = (*oosmService)(nil) + +type oosmService struct { + logger *zap.Logger + rootDatastore ds.Datastore + secretStore secretstore.SecretStore + + outofstoremessagetypes.UnimplementedOutOfStoreMessageServiceServer +} + +type OOSMServiceClient interface { + outofstoremessagetypes.OutOfStoreMessageServiceClient + + io.Closer +} + +type oosmServiceClient struct { + OOSMServiceClient + + service OOSMService + server *grpc.Server +} + +type OOSMOption func(*oosmService) error + +// NewOutOfStoreMessageServiceClient creates a new Wesh protocol service and returns a gRPC +// ServiceClient which uses a direct in-memory connection. When finished, you must call Close(). +// This opens or creates a Wesh account where the datastore location is specified by the path argument. +// The service will not start any network stuff, it will only use the filesystem to store or get data. +func NewOutOfStoreMessageServiceClient(opts ...OOSMOption) (OOSMServiceClient, error) { + svc, err := NewOutOfStoreMessageService(opts...) + if err != nil { + return nil, err + } + + s := grpc.NewServer() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + c, err := newClientFromService(ctx, s, svc) + if err != nil { + return nil, fmt.Errorf("uanble to create client from server: %w", err) + } + + return &oosmServiceClient{ + OOSMServiceClient: c, + server: s, + service: svc, + }, nil +} + +type oosmClient struct { + outofstoremessagetypes.OutOfStoreMessageServiceClient + + l *grpcutil.BufListener + cc *grpc.ClientConn +} + +func (c *oosmClient) Close() error { + err := c.cc.Close() + _ = c.l.Close() + return err +} + +func newClientFromService(ctx context.Context, s *grpc.Server, svc OOSMService, opts ...grpc.DialOption) (OOSMServiceClient, error) { + bl := grpcutil.NewBufListener(ClientBufferSize) + cc, err := bl.NewClientConn(ctx, opts...) + if err != nil { + return nil, err + } + + outofstoremessagetypes.RegisterOutOfStoreMessageServiceServer(s, svc) + go func() { + // we dont need to log the error + _ = s.Serve(bl) + }() + + return &oosmClient{ + OutOfStoreMessageServiceClient: outofstoremessagetypes.NewOutOfStoreMessageServiceClient(cc), + cc: cc, + l: bl, + }, nil +} + +func NewOutOfStoreMessageService(opts ...OOSMOption) (OOSMService, error) { + svc := &oosmService{} + + withDefaultOpts := make([]OOSMOption, len(opts)) + copy(withDefaultOpts, opts) + withDefaultOpts = append(withDefaultOpts, WithFallbackDefaults) + for _, opt := range withDefaultOpts { + if err := opt(svc); err != nil { + return nil, err + } + } + + return svc, nil +} + +func (s *oosmService) Close() error { + return nil +} + +func (s *oosmService) Status() (Status, error) { + return Status{}, nil +} + +func (s *oosmService) IpfsCoreAPI() ipfs_interface.CoreAPI { + return nil +} + +func (s *oosmService) OutOfStoreReceive(ctx context.Context, request *protocoltypes.OutOfStoreReceive_Request) (*protocoltypes.OutOfStoreReceive_Reply, error) { + outOfStoreMessage, group, clearPayload, alreadyDecrypted, err := s.secretStore.OpenOutOfStoreMessage(ctx, request.Payload) + if err != nil { + return nil, errcode.ErrCryptoDecrypt.Wrap(err) + } + + return &protocoltypes.OutOfStoreReceive_Reply{ + Message: outOfStoreMessage, + Cleartext: clearPayload, + GroupPublicKey: group.PublicKey, + AlreadyReceived: alreadyDecrypted, + }, nil +} + +// FallBackOption is a structure that permit to fallback to a default option if the option is not set. +type FallBackOption struct { + fallback func(s *oosmService) bool + opt OOSMOption +} + +// WithLogger set the given logger. +var WithLogger = func(l *zap.Logger) OOSMOption { + return func(s *oosmService) error { + s.logger = l + return nil + } +} + +// WithDefaultLogger init a noop logger. +var WithDefaultLogger OOSMOption = func(s *oosmService) error { + s.logger = zap.NewNop() + return nil +} + +var fallbackLogger = FallBackOption{ + fallback: func(s *oosmService) bool { return s.logger == nil }, + opt: WithDefaultLogger, +} + +// WithFallbackLogger set the logger if no logger is set. +var WithFallbackLogger OOSMOption = func(s *oosmService) error { + if fallbackLogger.fallback(s) { + return fallbackLogger.opt(s) + } + return nil +} + +// WithRootDatastore set the root datastore. +var WithRootDatastore = func(ds ds.Datastore) OOSMOption { + return func(s *oosmService) error { + s.rootDatastore = ds + return nil + } +} + +// WithDefaultRootDatastore init a in-memory datastore. +var WithDefaultRootDatastore OOSMOption = func(s *oosmService) error { + s.rootDatastore = ds_sync.MutexWrap(ds.NewMapDatastore()) + return nil +} + +var fallbackRootDatastore = FallBackOption{ + fallback: func(s *oosmService) bool { return s.rootDatastore == nil }, + opt: WithDefaultRootDatastore, +} + +// WithFallbackRootDatastore set the root datastore if no root datastore is set. +var WithFallbackRootDatastore OOSMOption = func(s *oosmService) error { + if fallbackRootDatastore.fallback(s) { + return fallbackRootDatastore.opt(s) + } + return nil +} + +// WithSecretStore set the secret store. +var WithSecretStore = func(ss secretstore.SecretStore) OOSMOption { + return func(s *oosmService) error { + s.secretStore = ss + return nil + } +} + +// WithDefaultSecretStore init a new secret store. +// Call WithRootDatastore before this option if you want to use your datastore. +// Call WithLogger before this option if you want to use your logger. +var WithDefaultSecretStore OOSMOption = func(s *oosmService) error { + // dependency + if err := WithFallbackRootDatastore(s); err != nil { + return err + } + if err := WithFallbackLogger(s); err != nil { + return err + } + + var err error + s.secretStore, err = secretstore.NewSecretStore(s.rootDatastore, &secretstore.NewSecretStoreOptions{ + Logger: s.logger, + }) + return err +} + +var fallbackSecretStore = FallBackOption{ + fallback: func(s *oosmService) bool { return s.secretStore == nil }, + opt: WithDefaultSecretStore, +} + +// WithFallbackSecretStore set the secret store if no secret store is set. +// Call WithRootDatastore before this option if you want to use your datastore if a new secret store is created. +// Call WithLogger before this option if you want to use your logger if a new secret store is created. +var WithFallbackSecretStore OOSMOption = func(s *oosmService) error { + if fallbackSecretStore.fallback(s) { + return fallbackSecretStore.opt(s) + } + return nil +} + +var defaults = []FallBackOption{ + fallbackLogger, + fallbackRootDatastore, + fallbackSecretStore, +} + +// WithFallbackDefaults set the default options if no option is set. +var WithFallbackDefaults OOSMOption = func(s *oosmService) error { + for _, def := range defaults { + if !def.fallback(s) { + continue + } + if err := def.opt(s); err != nil { + return err + } + } + return nil +}