diff --git a/Makefile b/Makefile index d6e8476dd..4f509fee2 100644 --- a/Makefile +++ b/Makefile @@ -60,14 +60,6 @@ gen: --go-grpc_opt=paths=source_relative \ metamorph/metamorph_api/metamorph_api.proto - protoc \ - --proto_path=. \ - --go_out=. \ - --go_opt=paths=source_relative \ - --go-grpc_out=. \ - --go-grpc_opt=paths=source_relative \ - metamorph/health/grpc_health_check.proto - protoc \ --proto_path=. \ --go_out=. \ diff --git a/blocktx/client.go b/blocktx/client.go index 10406fedd..28a4088eb 100644 --- a/blocktx/client.go +++ b/blocktx/client.go @@ -21,6 +21,7 @@ type ClientI interface { GetBlock(ctx context.Context, blockHash *chainhash.Hash) (*blocktx_api.Block, error) GetLastProcessedBlock(ctx context.Context) (*blocktx_api.Block, error) GetMinedTransactionsForBlock(ctx context.Context, blockAndSource *blocktx_api.BlockAndSource) (*blocktx_api.MinedTransactions, error) + Health(ctx context.Context) error } const ( @@ -116,6 +117,15 @@ func (btc *Client) GetMinedTransactionsForBlock(ctx context.Context, blockAndSou return mt, nil } +func (btc *Client) Health(ctx context.Context) error { + _, err := btc.client.Health(ctx, &emptypb.Empty{}) + if err != nil { + return err + } + + return nil +} + func DialGRPC(address string) (*grpc.ClientConn, error) { opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), diff --git a/cmd/metamorph.go b/cmd/metamorph.go index aa4c9607e..14d71dce9 100644 --- a/cmd/metamorph.go +++ b/cmd/metamorph.go @@ -26,9 +26,11 @@ import ( "github.com/bitcoin-sv/arc/metamorph/store/sqlite" "github.com/libsv/go-p2p" "github.com/ordishs/go-bitcoin" - "github.com/ordishs/go-utils" "github.com/ordishs/go-utils/safemap" "github.com/spf13/viper" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" ) const ( @@ -81,32 +83,6 @@ func StartMetamorph(logger *slog.Logger) (func(), error) { return nil, fmt.Errorf("no metamorph.listenAddr setting found") } - source := metamorphGRPCListenAddress - if viper.GetBool("metamorph.network.fixedIp") { - ip, port, err := net.SplitHostPort(metamorphGRPCListenAddress) - if err != nil { - return nil, fmt.Errorf("cannot parse ip address: %v", err) - } - - if ip != "" { - source = metamorphGRPCListenAddress - } else { - hint := viper.GetString("metamorph.network.ipAddressHint") - ips, err := utils.GetIPAddressesWithHint(hint) - if err != nil { - return nil, fmt.Errorf("cannot get local ip address") - } - - if len(ips) != 1 { - return nil, fmt.Errorf("cannot determine local ip address [%v]", ips) - } - - source = fmt.Sprintf("%s:%s", ips[0], port) - } - - logger.Info("Instance will register transactions with location", "source", source) - } - pm, statusMessageCh, err := initPeerManager(logger, s) if err != nil { return nil, err @@ -222,7 +198,7 @@ func StartMetamorph(logger *slog.Logger) (func(), error) { opts = append(opts, metamorph.WithBlocktxTimeout(btxTimeout)) } - serv := metamorph.NewServer(s, metamorphProcessor, btx, source, opts...) + serv := metamorph.NewServer(s, metamorphProcessor, btx, opts...) go func() { grpcMessageSize := viper.GetInt("grpcMessageSize") @@ -264,6 +240,13 @@ func StartMetamorph(logger *slog.Logger) (func(), error) { // pass all the started peers to the collector _ = metamorph.NewZMQCollector(zmqCollector) + go func() { + err = StartHealthServer(serv) + if err != nil { + logger.Error("failed to start health server", slog.String("err", err.Error())) + } + }() + return func() { logger.Info("Shutting down metamorph") @@ -276,6 +259,32 @@ func StartMetamorph(logger *slog.Logger) (func(), error) { }, nil } +func StartHealthServer(serv *metamorph.Server) error { + gs := grpc.NewServer() + defer gs.Stop() + + grpc_health_v1.RegisterHealthServer(gs, serv) // registration + // register your own services + reflection.Register(gs) + + address, err := config.GetString("metamorph.healthServerDialAddr") //"localhost:8005" + if err != nil { + return err + } + + listener, err := net.Listen("tcp", address) + if err != nil { + return err + } + + err = gs.Serve(listener) + if err != nil { + return err + } + + return nil +} + func NewStore(dbMode string, folder string) (s store.MetamorphStore, err error) { switch dbMode { case DbModePostgres: diff --git a/config.yaml b/config.yaml index 4fe9531d0..c0deaa6f4 100644 --- a/config.yaml +++ b/config.yaml @@ -58,11 +58,9 @@ metamorph: checkUtxos: false # force check each utxo for validity. If enabled ARC connects to bitcoin node using rpc for each utxo statsKeypress: false # enable stats keypress. If enabled pressing any key will print stats to stdout profilerAddr: localhost:9992 # address to start profiler server on - network: - fixedIp: true # Denotes if ARC is running with fixed IP addresses - ipAddressHint: ^172.28.* blocktxTimeout: 1s # timeout for blocktx service checkIfMinedInterval: 1m + healthServerDialAddr: localhost:8005 blocktx: listenAddr: localhost:8011 # address space for blocktx to listen on. Can be for example localhost:8011 or :8011 for listening on all addresses diff --git a/go.sum b/go.sum index 3a413634a..71d40bad1 100644 --- a/go.sum +++ b/go.sum @@ -456,8 +456,6 @@ github.com/libsv/go-bt v1.0.8 h1:nWLLcnUm0dxNO3exqrL5jvAcTGkl0dsnBuQqB6+M6vQ= github.com/libsv/go-bt v1.0.8/go.mod h1:yO023bNYLh5DwcOYl+ZqLAeTemoy6K+2UbQlIBMv+EQ= github.com/libsv/go-bt/v2 v2.2.5 h1:VoggBLMRW9NYoFujqe5bSYKqnw5y+fYfufgERSoubog= github.com/libsv/go-bt/v2 v2.2.5/go.mod h1:cV45+jDlPOLfhJLfpLmpQoWzrIvVth9Ao2ZO1f6CcqU= -github.com/libsv/go-p2p v0.1.5 h1:zbUE1UQ73J7LN/s3gruQBTh3Sz0DSSmj3cWhC1ZQVM0= -github.com/libsv/go-p2p v0.1.5/go.mod h1:9KhX8e+3oEmGiYQSeF/CrHj22YNHqiof3TH77VqcMCs= github.com/libsv/go-p2p v0.1.6 h1:YHlxAiuCSq7k+eyk7haD6M6v2+dTpo/F0pxCzOcFLI4= github.com/libsv/go-p2p v0.1.6/go.mod h1:9KhX8e+3oEmGiYQSeF/CrHj22YNHqiof3TH77VqcMCs= github.com/lmittmann/tint v1.0.3 h1:W5PHeA2D8bBJVvabNfQD/XW9HPLZK1XoPZH0cq8NouQ= diff --git a/metamorph/health/grpc_health_check.pb.go b/metamorph/health/grpc_health_check.pb.go deleted file mode 100644 index b003f3e7c..000000000 --- a/metamorph/health/grpc_health_check.pb.go +++ /dev/null @@ -1,287 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.31.0 -// protoc v4.25.1 -// source: metamorph/health/grpc_health_check.proto - -package healthcheck - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type HealthCheckResponse_ServingStatus int32 - -const ( - HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0 - HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1 - HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2 - HealthCheckResponse_SERVICE_UNKNOWN HealthCheckResponse_ServingStatus = 3 // Used only by the Watch method. -) - -// Enum value maps for HealthCheckResponse_ServingStatus. -var ( - HealthCheckResponse_ServingStatus_name = map[int32]string{ - 0: "UNKNOWN", - 1: "SERVING", - 2: "NOT_SERVING", - 3: "SERVICE_UNKNOWN", - } - HealthCheckResponse_ServingStatus_value = map[string]int32{ - "UNKNOWN": 0, - "SERVING": 1, - "NOT_SERVING": 2, - "SERVICE_UNKNOWN": 3, - } -) - -func (x HealthCheckResponse_ServingStatus) Enum() *HealthCheckResponse_ServingStatus { - p := new(HealthCheckResponse_ServingStatus) - *p = x - return p -} - -func (x HealthCheckResponse_ServingStatus) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (HealthCheckResponse_ServingStatus) Descriptor() protoreflect.EnumDescriptor { - return file_metamorph_health_grpc_health_check_proto_enumTypes[0].Descriptor() -} - -func (HealthCheckResponse_ServingStatus) Type() protoreflect.EnumType { - return &file_metamorph_health_grpc_health_check_proto_enumTypes[0] -} - -func (x HealthCheckResponse_ServingStatus) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use HealthCheckResponse_ServingStatus.Descriptor instead. -func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) { - return file_metamorph_health_grpc_health_check_proto_rawDescGZIP(), []int{1, 0} -} - -type HealthCheckRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` -} - -func (x *HealthCheckRequest) Reset() { - *x = HealthCheckRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_metamorph_health_grpc_health_check_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *HealthCheckRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*HealthCheckRequest) ProtoMessage() {} - -func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message { - mi := &file_metamorph_health_grpc_health_check_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead. -func (*HealthCheckRequest) Descriptor() ([]byte, []int) { - return file_metamorph_health_grpc_health_check_proto_rawDescGZIP(), []int{0} -} - -func (x *HealthCheckRequest) GetService() string { - if x != nil { - return x.Service - } - return "" -} - -type HealthCheckResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,proto3,enum=grpc.health.v1.HealthCheckResponse_ServingStatus" json:"status,omitempty"` -} - -func (x *HealthCheckResponse) Reset() { - *x = HealthCheckResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_metamorph_health_grpc_health_check_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *HealthCheckResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*HealthCheckResponse) ProtoMessage() {} - -func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message { - mi := &file_metamorph_health_grpc_health_check_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use HealthCheckResponse.ProtoReflect.Descriptor instead. -func (*HealthCheckResponse) Descriptor() ([]byte, []int) { - return file_metamorph_health_grpc_health_check_proto_rawDescGZIP(), []int{1} -} - -func (x *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus { - if x != nil { - return x.Status - } - return HealthCheckResponse_UNKNOWN -} - -var File_metamorph_health_grpc_health_check_proto protoreflect.FileDescriptor - -var file_metamorph_health_grpc_health_check_proto_rawDesc = []byte{ - 0x0a, 0x28, 0x6d, 0x65, 0x74, 0x61, 0x6d, 0x6f, 0x72, 0x70, 0x68, 0x2f, 0x68, 0x65, 0x61, 0x6c, - 0x74, 0x68, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x5f, 0x63, - 0x68, 0x65, 0x63, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x67, 0x72, 0x70, 0x63, - 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x22, 0x2e, 0x0a, 0x12, 0x48, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0xb1, 0x01, 0x0a, 0x13, 0x48, - 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x49, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x4f, 0x0a, - 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, - 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, - 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e, 0x4f, 0x54, 0x5f, - 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x52, - 0x56, 0x49, 0x43, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x03, 0x32, 0xae, - 0x01, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x50, 0x0a, 0x05, 0x43, 0x68, 0x65, - 0x63, 0x6b, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, - 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52, 0x0a, 0x05, 0x57, - 0x61, 0x74, 0x63, 0x68, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, - 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, - 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, - 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, - 0x0f, 0x5a, 0x0d, 0x2e, 0x3b, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x63, 0x68, 0x65, 0x63, 0x6b, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_metamorph_health_grpc_health_check_proto_rawDescOnce sync.Once - file_metamorph_health_grpc_health_check_proto_rawDescData = file_metamorph_health_grpc_health_check_proto_rawDesc -) - -func file_metamorph_health_grpc_health_check_proto_rawDescGZIP() []byte { - file_metamorph_health_grpc_health_check_proto_rawDescOnce.Do(func() { - file_metamorph_health_grpc_health_check_proto_rawDescData = protoimpl.X.CompressGZIP(file_metamorph_health_grpc_health_check_proto_rawDescData) - }) - return file_metamorph_health_grpc_health_check_proto_rawDescData -} - -var file_metamorph_health_grpc_health_check_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_metamorph_health_grpc_health_check_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_metamorph_health_grpc_health_check_proto_goTypes = []interface{}{ - (HealthCheckResponse_ServingStatus)(0), // 0: grpc.health.v1.HealthCheckResponse.ServingStatus - (*HealthCheckRequest)(nil), // 1: grpc.health.v1.HealthCheckRequest - (*HealthCheckResponse)(nil), // 2: grpc.health.v1.HealthCheckResponse -} -var file_metamorph_health_grpc_health_check_proto_depIdxs = []int32{ - 0, // 0: grpc.health.v1.HealthCheckResponse.status:type_name -> grpc.health.v1.HealthCheckResponse.ServingStatus - 1, // 1: grpc.health.v1.Health.Check:input_type -> grpc.health.v1.HealthCheckRequest - 1, // 2: grpc.health.v1.Health.Watch:input_type -> grpc.health.v1.HealthCheckRequest - 2, // 3: grpc.health.v1.Health.Check:output_type -> grpc.health.v1.HealthCheckResponse - 2, // 4: grpc.health.v1.Health.Watch:output_type -> grpc.health.v1.HealthCheckResponse - 3, // [3:5] is the sub-list for method output_type - 1, // [1:3] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_metamorph_health_grpc_health_check_proto_init() } -func file_metamorph_health_grpc_health_check_proto_init() { - if File_metamorph_health_grpc_health_check_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_metamorph_health_grpc_health_check_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*HealthCheckRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_metamorph_health_grpc_health_check_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*HealthCheckResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_metamorph_health_grpc_health_check_proto_rawDesc, - NumEnums: 1, - NumMessages: 2, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_metamorph_health_grpc_health_check_proto_goTypes, - DependencyIndexes: file_metamorph_health_grpc_health_check_proto_depIdxs, - EnumInfos: file_metamorph_health_grpc_health_check_proto_enumTypes, - MessageInfos: file_metamorph_health_grpc_health_check_proto_msgTypes, - }.Build() - File_metamorph_health_grpc_health_check_proto = out.File - file_metamorph_health_grpc_health_check_proto_rawDesc = nil - file_metamorph_health_grpc_health_check_proto_goTypes = nil - file_metamorph_health_grpc_health_check_proto_depIdxs = nil -} diff --git a/metamorph/health/grpc_health_check.proto b/metamorph/health/grpc_health_check.proto deleted file mode 100644 index 477ac1bd5..000000000 --- a/metamorph/health/grpc_health_check.proto +++ /dev/null @@ -1,25 +0,0 @@ -syntax = "proto3"; - -option go_package = ".;healthcheck"; - -package grpc.health.v1; - -message HealthCheckRequest { - string service = 1; -} - -message HealthCheckResponse { - enum ServingStatus { - UNKNOWN = 0; - SERVING = 1; - NOT_SERVING = 2; - SERVICE_UNKNOWN = 3; // Used only by the Watch method. - } - ServingStatus status = 1; -} - -service Health { - rpc Check(HealthCheckRequest) returns (HealthCheckResponse); - - rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); -} diff --git a/metamorph/health/grpc_health_check_grpc.pb.go b/metamorph/health/grpc_health_check_grpc.pb.go deleted file mode 100644 index a12be8bc8..000000000 --- a/metamorph/health/grpc_health_check_grpc.pb.go +++ /dev/null @@ -1,174 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.3.0 -// - protoc v4.25.1 -// source: metamorph/health/grpc_health_check.proto - -package healthcheck - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -const ( - Health_Check_FullMethodName = "/grpc.health.v1.Health/Check" - Health_Watch_FullMethodName = "/grpc.health.v1.Health/Watch" -) - -// HealthClient is the client API for Health service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type HealthClient interface { - Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) - Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) -} - -type healthClient struct { - cc grpc.ClientConnInterface -} - -func NewHealthClient(cc grpc.ClientConnInterface) HealthClient { - return &healthClient{cc} -} - -func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { - out := new(HealthCheckResponse) - err := c.cc.Invoke(ctx, Health_Check_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) { - stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], Health_Watch_FullMethodName, opts...) - if err != nil { - return nil, err - } - x := &healthWatchClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Health_WatchClient interface { - Recv() (*HealthCheckResponse, error) - grpc.ClientStream -} - -type healthWatchClient struct { - grpc.ClientStream -} - -func (x *healthWatchClient) Recv() (*HealthCheckResponse, error) { - m := new(HealthCheckResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// HealthServer is the server API for Health service. -// All implementations must embed UnimplementedHealthServer -// for forward compatibility -type HealthServer interface { - Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) - Watch(*HealthCheckRequest, Health_WatchServer) error - mustEmbedUnimplementedHealthServer() -} - -// UnimplementedHealthServer must be embedded to have forward compatible implementations. -type UnimplementedHealthServer struct { -} - -func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Check not implemented") -} -func (UnimplementedHealthServer) Watch(*HealthCheckRequest, Health_WatchServer) error { - return status.Errorf(codes.Unimplemented, "method Watch not implemented") -} -func (UnimplementedHealthServer) mustEmbedUnimplementedHealthServer() {} - -// UnsafeHealthServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to HealthServer will -// result in compilation errors. -type UnsafeHealthServer interface { - mustEmbedUnimplementedHealthServer() -} - -func RegisterHealthServer(s grpc.ServiceRegistrar, srv HealthServer) { - s.RegisterService(&Health_ServiceDesc, srv) -} - -func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(HealthCheckRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(HealthServer).Check(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Health_Check_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Health_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(HealthCheckRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(HealthServer).Watch(m, &healthWatchServer{stream}) -} - -type Health_WatchServer interface { - Send(*HealthCheckResponse) error - grpc.ServerStream -} - -type healthWatchServer struct { - grpc.ServerStream -} - -func (x *healthWatchServer) Send(m *HealthCheckResponse) error { - return x.ServerStream.SendMsg(m) -} - -// Health_ServiceDesc is the grpc.ServiceDesc for Health service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var Health_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "grpc.health.v1.Health", - HandlerType: (*HealthServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Check", - Handler: _Health_Check_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "Watch", - Handler: _Health_Watch_Handler, - ServerStreams: true, - }, - }, - Metadata: "metamorph/health/grpc_health_check.proto", -} diff --git a/metamorph/health_check.go b/metamorph/health_check.go index be82f19c6..429c31b45 100644 --- a/metamorph/health_check.go +++ b/metamorph/health_check.go @@ -4,21 +4,83 @@ import ( "context" "log/slog" + "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) +const ( + readiness = "readiness" +) + +//go:generate moq -pkg mocks -out ./mocks/health_watch_server_mock.go . HealthWatchServer +type HealthWatchServer interface { + Send(*grpc_health_v1.HealthCheckResponse) error + grpc.ServerStream +} + func (s *Server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { s.logger.Info("checking health", slog.String("service", req.Service)) + if req.Service == readiness { + err := s.store.Ping(ctx) + if err != nil { + s.logger.Error("no connection to DB", slog.String("err", err.Error())) + return &grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }, nil + } + + err = s.processor.Health() + if err != nil { + s.logger.Error("processor unhealthy", slog.String("err", err.Error())) + return &grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }, nil + } + + err = s.btc.Health(ctx) + if err != nil { + s.logger.Error("btc connection unhealthy", slog.String("err", err.Error())) + return &grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }, nil + } + } + return &grpc_health_v1.HealthCheckResponse{ Status: grpc_health_v1.HealthCheckResponse_SERVING, }, nil } func (s *Server) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_health_v1.Health_WatchServer) error { - s.logger.Info("watching health", slog.String("service", req.Service)) + ctx := context.Background() + if req.Service == readiness { + err := s.store.Ping(ctx) + if err != nil { + s.logger.Error("no connection to DB", slog.String("err", err.Error())) + return server.Send(&grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }) + } + + err = s.processor.Health() + if err != nil { + s.logger.Error("processor unhealthy", slog.String("err", err.Error())) + return server.Send(&grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }) + } + + err = s.btc.Health(ctx) + if err != nil { + s.logger.Error("btc connection unhealthy", slog.String("err", err.Error())) + return server.Send(&grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }) + } + } return server.Send(&grpc_health_v1.HealthCheckResponse{ Status: grpc_health_v1.HealthCheckResponse_SERVING, diff --git a/metamorph/health_check_test.go b/metamorph/health_check_test.go new file mode 100644 index 000000000..876b1bb64 --- /dev/null +++ b/metamorph/health_check_test.go @@ -0,0 +1,177 @@ +package metamorph_test + +import ( + "context" + "errors" + "testing" + + "github.com/bitcoin-sv/arc/metamorph" + "github.com/bitcoin-sv/arc/metamorph/mocks" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/health/grpc_health_v1" +) + +func TestCheck(t *testing.T) { + tt := []struct { + name string + service string + pingErr error + processorHealthErr error + clientHealthErr error + + expectedStatus grpc_health_v1.HealthCheckResponse_ServingStatus + }{ + { + name: "liveness - healthy", + service: "liveness", + + expectedStatus: grpc_health_v1.HealthCheckResponse_SERVING, + }, + { + name: "readiness - healthy", + service: "readiness", + + expectedStatus: grpc_health_v1.HealthCheckResponse_SERVING, + }, + { + name: "readiness - ping error", + service: "readiness", + pingErr: errors.New("no connection"), + + expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }, + { + name: "readiness - unhealthy processor", + service: "readiness", + processorHealthErr: errors.New("unhealthy processor"), + + expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }, + { + name: "readiness - unhealthy blocktx client connection", + service: "readiness", + clientHealthErr: errors.New("unhealthy blocktx client connection"), + + expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + + metamorphStore := &mocks.MetamorphStoreMock{ + PingFunc: func(ctx context.Context) error { + return tc.pingErr + }, + } + + req := &grpc_health_v1.HealthCheckRequest{ + Service: tc.service, + } + + processor := &mocks.ProcessorIMock{ + HealthFunc: func() error { + return tc.processorHealthErr + }, + } + + btx := &mocks.ClientIMock{ + HealthFunc: func(ctx context.Context) error { + return tc.clientHealthErr + }, + } + + server := metamorph.NewServer(metamorphStore, processor, btx) + + resp, err := server.Check(context.Background(), req) + require.NoError(t, err) + + require.Equal(t, tc.expectedStatus, resp.Status) + }) + } +} + +func TestWatch(t *testing.T) { + tt := []struct { + name string + service string + pingErr error + processorHealthErr error + clientHealthErr error + + expectedStatus grpc_health_v1.HealthCheckResponse_ServingStatus + }{ + { + name: "liveness - healthy", + service: "liveness", + + expectedStatus: grpc_health_v1.HealthCheckResponse_SERVING, + }, + { + name: "readiness - healty", + service: "readiness", + + expectedStatus: grpc_health_v1.HealthCheckResponse_SERVING, + }, + { + name: "readiness - ping error", + service: "readiness", + pingErr: errors.New("no connection"), + + expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }, + { + name: "readiness - unhealthy processor", + service: "readiness", + processorHealthErr: errors.New("unhealthy processor"), + + expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }, + { + name: "readiness - unhealthy blocktx client connection", + service: "readiness", + clientHealthErr: errors.New("unhealthy blocktx client connection"), + + expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + + metamorphStore := &mocks.MetamorphStoreMock{ + PingFunc: func(ctx context.Context) error { + return tc.pingErr + }, + } + + req := &grpc_health_v1.HealthCheckRequest{ + Service: tc.service, + } + + processor := &mocks.ProcessorIMock{ + HealthFunc: func() error { + return tc.processorHealthErr + }, + } + + btx := &mocks.ClientIMock{ + HealthFunc: func(ctx context.Context) error { + return tc.clientHealthErr + }, + } + + server := metamorph.NewServer(metamorphStore, processor, btx) + + watchServer := &mocks.HealthWatchServerMock{ + SendFunc: func(healthCheckResponse *grpc_health_v1.HealthCheckResponse) error { + require.Equal(t, tc.expectedStatus, healthCheckResponse.Status) + return nil + }, + } + + err := server.Watch(req, watchServer) + require.NoError(t, err) + }) + } +} diff --git a/metamorph/mocks/blocktx_mock.go b/metamorph/mocks/blocktx_mock.go index c7d14ab87..cf6fd7c34 100644 --- a/metamorph/mocks/blocktx_mock.go +++ b/metamorph/mocks/blocktx_mock.go @@ -39,6 +39,9 @@ var _ blocktx.ClientI = &ClientIMock{} // GetTransactionMerklePathFunc: func(ctx context.Context, transaction *blocktx_api.Transaction) (string, error) { // panic("mock out the GetTransactionMerklePath method") // }, +// HealthFunc: func(ctx context.Context) error { +// panic("mock out the Health method") +// }, // } // // // use mockedClientI in code that requires blocktx.ClientI @@ -64,6 +67,9 @@ type ClientIMock struct { // GetTransactionMerklePathFunc mocks the GetTransactionMerklePath method. GetTransactionMerklePathFunc func(ctx context.Context, transaction *blocktx_api.Transaction) (string, error) + // HealthFunc mocks the Health method. + HealthFunc func(ctx context.Context) error + // calls tracks calls to the methods. calls struct { // GetBlock holds details about calls to the GetBlock method. @@ -106,6 +112,11 @@ type ClientIMock struct { // Transaction is the transaction argument value. Transaction *blocktx_api.Transaction } + // Health holds details about calls to the Health method. + Health []struct { + // Ctx is the ctx argument value. + Ctx context.Context + } } lockGetBlock sync.RWMutex lockGetLastProcessedBlock sync.RWMutex @@ -113,6 +124,7 @@ type ClientIMock struct { lockGetTransactionBlock sync.RWMutex lockGetTransactionBlocks sync.RWMutex lockGetTransactionMerklePath sync.RWMutex + lockHealth sync.RWMutex } // GetBlock calls GetBlockFunc. @@ -326,3 +338,35 @@ func (mock *ClientIMock) GetTransactionMerklePathCalls() []struct { mock.lockGetTransactionMerklePath.RUnlock() return calls } + +// Health calls HealthFunc. +func (mock *ClientIMock) Health(ctx context.Context) error { + if mock.HealthFunc == nil { + panic("ClientIMock.HealthFunc: method is nil but ClientI.Health was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockHealth.Lock() + mock.calls.Health = append(mock.calls.Health, callInfo) + mock.lockHealth.Unlock() + return mock.HealthFunc(ctx) +} + +// HealthCalls gets all the calls that were made to Health. +// Check the length with: +// +// len(mockedClientI.HealthCalls()) +func (mock *ClientIMock) HealthCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockHealth.RLock() + calls = mock.calls.Health + mock.lockHealth.RUnlock() + return calls +} diff --git a/metamorph/mocks/health_watch_server_mock.go b/metamorph/mocks/health_watch_server_mock.go new file mode 100644 index 000000000..aa56b9806 --- /dev/null +++ b/metamorph/mocks/health_watch_server_mock.go @@ -0,0 +1,335 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package mocks + +import ( + "context" + "github.com/bitcoin-sv/arc/metamorph" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/metadata" + "sync" +) + +// Ensure, that HealthWatchServerMock does implement metamorph.HealthWatchServer. +// If this is not the case, regenerate this file with moq. +var _ metamorph.HealthWatchServer = &HealthWatchServerMock{} + +// HealthWatchServerMock is a mock implementation of metamorph.HealthWatchServer. +// +// func TestSomethingThatUsesHealthWatchServer(t *testing.T) { +// +// // make and configure a mocked metamorph.HealthWatchServer +// mockedHealthWatchServer := &HealthWatchServerMock{ +// ContextFunc: func() context.Context { +// panic("mock out the Context method") +// }, +// RecvMsgFunc: func(m interface{}) error { +// panic("mock out the RecvMsg method") +// }, +// SendFunc: func(healthCheckResponse *grpc_health_v1.HealthCheckResponse) error { +// panic("mock out the Send method") +// }, +// SendHeaderFunc: func(mD metadata.MD) error { +// panic("mock out the SendHeader method") +// }, +// SendMsgFunc: func(m interface{}) error { +// panic("mock out the SendMsg method") +// }, +// SetHeaderFunc: func(mD metadata.MD) error { +// panic("mock out the SetHeader method") +// }, +// SetTrailerFunc: func(mD metadata.MD) { +// panic("mock out the SetTrailer method") +// }, +// } +// +// // use mockedHealthWatchServer in code that requires metamorph.HealthWatchServer +// // and then make assertions. +// +// } +type HealthWatchServerMock struct { + // ContextFunc mocks the Context method. + ContextFunc func() context.Context + + // RecvMsgFunc mocks the RecvMsg method. + RecvMsgFunc func(m interface{}) error + + // SendFunc mocks the Send method. + SendFunc func(healthCheckResponse *grpc_health_v1.HealthCheckResponse) error + + // SendHeaderFunc mocks the SendHeader method. + SendHeaderFunc func(mD metadata.MD) error + + // SendMsgFunc mocks the SendMsg method. + SendMsgFunc func(m interface{}) error + + // SetHeaderFunc mocks the SetHeader method. + SetHeaderFunc func(mD metadata.MD) error + + // SetTrailerFunc mocks the SetTrailer method. + SetTrailerFunc func(mD metadata.MD) + + // calls tracks calls to the methods. + calls struct { + // Context holds details about calls to the Context method. + Context []struct { + } + // RecvMsg holds details about calls to the RecvMsg method. + RecvMsg []struct { + // M is the m argument value. + M interface{} + } + // Send holds details about calls to the Send method. + Send []struct { + // HealthCheckResponse is the healthCheckResponse argument value. + HealthCheckResponse *grpc_health_v1.HealthCheckResponse + } + // SendHeader holds details about calls to the SendHeader method. + SendHeader []struct { + // MD is the mD argument value. + MD metadata.MD + } + // SendMsg holds details about calls to the SendMsg method. + SendMsg []struct { + // M is the m argument value. + M interface{} + } + // SetHeader holds details about calls to the SetHeader method. + SetHeader []struct { + // MD is the mD argument value. + MD metadata.MD + } + // SetTrailer holds details about calls to the SetTrailer method. + SetTrailer []struct { + // MD is the mD argument value. + MD metadata.MD + } + } + lockContext sync.RWMutex + lockRecvMsg sync.RWMutex + lockSend sync.RWMutex + lockSendHeader sync.RWMutex + lockSendMsg sync.RWMutex + lockSetHeader sync.RWMutex + lockSetTrailer sync.RWMutex +} + +// Context calls ContextFunc. +func (mock *HealthWatchServerMock) Context() context.Context { + if mock.ContextFunc == nil { + panic("HealthWatchServerMock.ContextFunc: method is nil but HealthWatchServer.Context was just called") + } + callInfo := struct { + }{} + mock.lockContext.Lock() + mock.calls.Context = append(mock.calls.Context, callInfo) + mock.lockContext.Unlock() + return mock.ContextFunc() +} + +// ContextCalls gets all the calls that were made to Context. +// Check the length with: +// +// len(mockedHealthWatchServer.ContextCalls()) +func (mock *HealthWatchServerMock) ContextCalls() []struct { +} { + var calls []struct { + } + mock.lockContext.RLock() + calls = mock.calls.Context + mock.lockContext.RUnlock() + return calls +} + +// RecvMsg calls RecvMsgFunc. +func (mock *HealthWatchServerMock) RecvMsg(m interface{}) error { + if mock.RecvMsgFunc == nil { + panic("HealthWatchServerMock.RecvMsgFunc: method is nil but HealthWatchServer.RecvMsg was just called") + } + callInfo := struct { + M interface{} + }{ + M: m, + } + mock.lockRecvMsg.Lock() + mock.calls.RecvMsg = append(mock.calls.RecvMsg, callInfo) + mock.lockRecvMsg.Unlock() + return mock.RecvMsgFunc(m) +} + +// RecvMsgCalls gets all the calls that were made to RecvMsg. +// Check the length with: +// +// len(mockedHealthWatchServer.RecvMsgCalls()) +func (mock *HealthWatchServerMock) RecvMsgCalls() []struct { + M interface{} +} { + var calls []struct { + M interface{} + } + mock.lockRecvMsg.RLock() + calls = mock.calls.RecvMsg + mock.lockRecvMsg.RUnlock() + return calls +} + +// Send calls SendFunc. +func (mock *HealthWatchServerMock) Send(healthCheckResponse *grpc_health_v1.HealthCheckResponse) error { + if mock.SendFunc == nil { + panic("HealthWatchServerMock.SendFunc: method is nil but HealthWatchServer.Send was just called") + } + callInfo := struct { + HealthCheckResponse *grpc_health_v1.HealthCheckResponse + }{ + HealthCheckResponse: healthCheckResponse, + } + mock.lockSend.Lock() + mock.calls.Send = append(mock.calls.Send, callInfo) + mock.lockSend.Unlock() + return mock.SendFunc(healthCheckResponse) +} + +// SendCalls gets all the calls that were made to Send. +// Check the length with: +// +// len(mockedHealthWatchServer.SendCalls()) +func (mock *HealthWatchServerMock) SendCalls() []struct { + HealthCheckResponse *grpc_health_v1.HealthCheckResponse +} { + var calls []struct { + HealthCheckResponse *grpc_health_v1.HealthCheckResponse + } + mock.lockSend.RLock() + calls = mock.calls.Send + mock.lockSend.RUnlock() + return calls +} + +// SendHeader calls SendHeaderFunc. +func (mock *HealthWatchServerMock) SendHeader(mD metadata.MD) error { + if mock.SendHeaderFunc == nil { + panic("HealthWatchServerMock.SendHeaderFunc: method is nil but HealthWatchServer.SendHeader was just called") + } + callInfo := struct { + MD metadata.MD + }{ + MD: mD, + } + mock.lockSendHeader.Lock() + mock.calls.SendHeader = append(mock.calls.SendHeader, callInfo) + mock.lockSendHeader.Unlock() + return mock.SendHeaderFunc(mD) +} + +// SendHeaderCalls gets all the calls that were made to SendHeader. +// Check the length with: +// +// len(mockedHealthWatchServer.SendHeaderCalls()) +func (mock *HealthWatchServerMock) SendHeaderCalls() []struct { + MD metadata.MD +} { + var calls []struct { + MD metadata.MD + } + mock.lockSendHeader.RLock() + calls = mock.calls.SendHeader + mock.lockSendHeader.RUnlock() + return calls +} + +// SendMsg calls SendMsgFunc. +func (mock *HealthWatchServerMock) SendMsg(m interface{}) error { + if mock.SendMsgFunc == nil { + panic("HealthWatchServerMock.SendMsgFunc: method is nil but HealthWatchServer.SendMsg was just called") + } + callInfo := struct { + M interface{} + }{ + M: m, + } + mock.lockSendMsg.Lock() + mock.calls.SendMsg = append(mock.calls.SendMsg, callInfo) + mock.lockSendMsg.Unlock() + return mock.SendMsgFunc(m) +} + +// SendMsgCalls gets all the calls that were made to SendMsg. +// Check the length with: +// +// len(mockedHealthWatchServer.SendMsgCalls()) +func (mock *HealthWatchServerMock) SendMsgCalls() []struct { + M interface{} +} { + var calls []struct { + M interface{} + } + mock.lockSendMsg.RLock() + calls = mock.calls.SendMsg + mock.lockSendMsg.RUnlock() + return calls +} + +// SetHeader calls SetHeaderFunc. +func (mock *HealthWatchServerMock) SetHeader(mD metadata.MD) error { + if mock.SetHeaderFunc == nil { + panic("HealthWatchServerMock.SetHeaderFunc: method is nil but HealthWatchServer.SetHeader was just called") + } + callInfo := struct { + MD metadata.MD + }{ + MD: mD, + } + mock.lockSetHeader.Lock() + mock.calls.SetHeader = append(mock.calls.SetHeader, callInfo) + mock.lockSetHeader.Unlock() + return mock.SetHeaderFunc(mD) +} + +// SetHeaderCalls gets all the calls that were made to SetHeader. +// Check the length with: +// +// len(mockedHealthWatchServer.SetHeaderCalls()) +func (mock *HealthWatchServerMock) SetHeaderCalls() []struct { + MD metadata.MD +} { + var calls []struct { + MD metadata.MD + } + mock.lockSetHeader.RLock() + calls = mock.calls.SetHeader + mock.lockSetHeader.RUnlock() + return calls +} + +// SetTrailer calls SetTrailerFunc. +func (mock *HealthWatchServerMock) SetTrailer(mD metadata.MD) { + if mock.SetTrailerFunc == nil { + panic("HealthWatchServerMock.SetTrailerFunc: method is nil but HealthWatchServer.SetTrailer was just called") + } + callInfo := struct { + MD metadata.MD + }{ + MD: mD, + } + mock.lockSetTrailer.Lock() + mock.calls.SetTrailer = append(mock.calls.SetTrailer, callInfo) + mock.lockSetTrailer.Unlock() + mock.SetTrailerFunc(mD) +} + +// SetTrailerCalls gets all the calls that were made to SetTrailer. +// Check the length with: +// +// len(mockedHealthWatchServer.SetTrailerCalls()) +func (mock *HealthWatchServerMock) SetTrailerCalls() []struct { + MD metadata.MD +} { + var calls []struct { + MD metadata.MD + } + mock.lockSetTrailer.RLock() + calls = mock.calls.SetTrailer + mock.lockSetTrailer.RUnlock() + return calls +} diff --git a/metamorph/mocks/processor_mock.go b/metamorph/mocks/processor_mock.go index a9d616e0a..39aa6b2be 100644 --- a/metamorph/mocks/processor_mock.go +++ b/metamorph/mocks/processor_mock.go @@ -27,6 +27,9 @@ var _ metamorph.ProcessorI = &ProcessorIMock{} // GetStatsFunc: func(debugItems bool) *metamorph.ProcessorStats { // panic("mock out the GetStats method") // }, +// HealthFunc: func() error { +// panic("mock out the Health method") +// }, // LoadUnminedFunc: func() { // panic("mock out the LoadUnmined method") // }, @@ -58,6 +61,9 @@ type ProcessorIMock struct { // GetStatsFunc mocks the GetStats method. GetStatsFunc func(debugItems bool) *metamorph.ProcessorStats + // HealthFunc mocks the Health method. + HealthFunc func() error + // LoadUnminedFunc mocks the LoadUnmined method. LoadUnminedFunc func() @@ -86,6 +92,9 @@ type ProcessorIMock struct { // DebugItems is the debugItems argument value. DebugItems bool } + // Health holds details about calls to the Health method. + Health []struct { + } // LoadUnmined holds details about calls to the LoadUnmined method. LoadUnmined []struct { } @@ -129,6 +138,7 @@ type ProcessorIMock struct { } lockGetPeers sync.RWMutex lockGetStats sync.RWMutex + lockHealth sync.RWMutex lockLoadUnmined sync.RWMutex lockProcessTransaction sync.RWMutex lockSendStatusForTransaction sync.RWMutex @@ -196,6 +206,33 @@ func (mock *ProcessorIMock) GetStatsCalls() []struct { return calls } +// Health calls HealthFunc. +func (mock *ProcessorIMock) Health() error { + if mock.HealthFunc == nil { + panic("ProcessorIMock.HealthFunc: method is nil but ProcessorI.Health was just called") + } + callInfo := struct { + }{} + mock.lockHealth.Lock() + mock.calls.Health = append(mock.calls.Health, callInfo) + mock.lockHealth.Unlock() + return mock.HealthFunc() +} + +// HealthCalls gets all the calls that were made to Health. +// Check the length with: +// +// len(mockedProcessorI.HealthCalls()) +func (mock *ProcessorIMock) HealthCalls() []struct { +} { + var calls []struct { + } + mock.lockHealth.RLock() + calls = mock.calls.Health + mock.lockHealth.RUnlock() + return calls +} + // LoadUnmined calls LoadUnminedFunc. func (mock *ProcessorIMock) LoadUnmined() { if mock.LoadUnminedFunc == nil { diff --git a/metamorph/mocks/store_mock.go b/metamorph/mocks/store_mock.go index 82a36de70..abd9c9eb5 100644 --- a/metamorph/mocks/store_mock.go +++ b/metamorph/mocks/store_mock.go @@ -37,6 +37,9 @@ var _ store.MetamorphStore = &MetamorphStoreMock{} // GetUnminedFunc: func(contextMoqParam context.Context, callback func(s *store.StoreData)) error { // panic("mock out the GetUnmined method") // }, +// PingFunc: func(ctx context.Context) error { +// panic("mock out the Ping method") +// }, // RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { // panic("mock out the RemoveCallbacker method") // }, @@ -80,6 +83,9 @@ type MetamorphStoreMock struct { // GetUnminedFunc mocks the GetUnmined method. GetUnminedFunc func(contextMoqParam context.Context, callback func(s *store.StoreData)) error + // PingFunc mocks the Ping method. + PingFunc func(ctx context.Context) error + // RemoveCallbackerFunc mocks the RemoveCallbacker method. RemoveCallbackerFunc func(ctx context.Context, hash *chainhash.Hash) error @@ -136,6 +142,11 @@ type MetamorphStoreMock struct { // Callback is the callback argument value. Callback func(s *store.StoreData) } + // Ping holds details about calls to the Ping method. + Ping []struct { + // Ctx is the ctx argument value. + Ctx context.Context + } // RemoveCallbacker holds details about calls to the RemoveCallbacker method. RemoveCallbacker []struct { // Ctx is the ctx argument value. @@ -201,6 +212,7 @@ type MetamorphStoreMock struct { lockGet sync.RWMutex lockGetBlockProcessed sync.RWMutex lockGetUnmined sync.RWMutex + lockPing sync.RWMutex lockRemoveCallbacker sync.RWMutex lockSet sync.RWMutex lockSetBlockProcessed sync.RWMutex @@ -386,6 +398,38 @@ func (mock *MetamorphStoreMock) GetUnminedCalls() []struct { return calls } +// Ping calls PingFunc. +func (mock *MetamorphStoreMock) Ping(ctx context.Context) error { + if mock.PingFunc == nil { + panic("MetamorphStoreMock.PingFunc: method is nil but MetamorphStore.Ping was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockPing.Lock() + mock.calls.Ping = append(mock.calls.Ping, callInfo) + mock.lockPing.Unlock() + return mock.PingFunc(ctx) +} + +// PingCalls gets all the calls that were made to Ping. +// Check the length with: +// +// len(mockedMetamorphStore.PingCalls()) +func (mock *MetamorphStoreMock) PingCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockPing.RLock() + calls = mock.calls.Ping + mock.lockPing.RUnlock() + return calls +} + // RemoveCallbacker calls RemoveCallbackerFunc. func (mock *MetamorphStoreMock) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { if mock.RemoveCallbackerFunc == nil { diff --git a/metamorph/processor.go b/metamorph/processor.go index 0f6605e05..83fbd4c4e 100644 --- a/metamorph/processor.go +++ b/metamorph/processor.go @@ -199,7 +199,7 @@ func (p *Processor) processCheckIfMined() { continue } - p.logger.Debug("found blocks for transactions", slog.Int("number", len(blockTransactions.GetTransactionBlocks()))) + p.logger.Info("found blocks for transactions", slog.Int("number", len(blockTransactions.GetTransactionBlocks()))) for _, blockTxs := range blockTransactions.GetTransactionBlocks() { var blockHashString string @@ -506,3 +506,12 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques }, }) } + +func (p *Processor) Health() error { + connected, _ := p.GetPeers() + if len(connected) == 0 { + return errors.New("no connection to any peers") + } + + return nil +} diff --git a/metamorph/server.go b/metamorph/server.go index 9e11dc563..7f4e70d07 100644 --- a/metamorph/server.go +++ b/metamorph/server.go @@ -59,6 +59,7 @@ type ProcessorI interface { GetStats(debugItems bool) *ProcessorStats GetPeers() ([]string, []string) Shutdown() + Health() error } // Server type carries the zmqLogger within it @@ -70,7 +71,6 @@ type Server struct { timeout time.Duration grpcServer *grpc.Server btc blocktx.ClientI - source string bitcoinNode BitcoinNode forceCheckUtxos bool blocktxTimeout time.Duration @@ -98,14 +98,13 @@ func WithForceCheckUtxos(bitcoinNode BitcoinNode) func(*Server) { type ServerOption func(s *Server) // NewServer will return a server instance with the zmqLogger stored within it -func NewServer(s store.MetamorphStore, p ProcessorI, btc blocktx.ClientI, source string, opts ...ServerOption) *Server { +func NewServer(s store.MetamorphStore, p ProcessorI, btc blocktx.ClientI, opts ...ServerOption) *Server { server := &Server{ logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: LogLevelDefault})).With(slog.String("service", "mtm")), processor: p, store: s, timeout: responseTimeout, btc: btc, - source: source, forceCheckUtxos: false, blocktxTimeout: blocktxTimeout, } diff --git a/metamorph/server_test.go b/metamorph/server_test.go index 98e9af302..29e649269 100644 --- a/metamorph/server_test.go +++ b/metamorph/server_test.go @@ -27,8 +27,6 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -const source = "localhost:8000" - //go:generate moq -pkg mocks -out ./mocks/processor_mock.go . ProcessorI //go:generate moq -pkg mocks -out ./mocks/bitcon_mock.go . BitcoinNode @@ -72,7 +70,7 @@ func setStoreTestData(t *testing.T, s store.MetamorphStore) { func TestNewServer(t *testing.T) { t.Run("NewServer", func(t *testing.T) { - server := NewServer(nil, nil, nil, source) + server := NewServer(nil, nil, nil) assert.IsType(t, &Server{}, server) }) } @@ -99,7 +97,7 @@ func TestHealth(t *testing.T) { return []string{"peer1"}, []string{} } - server := NewServer(nil, processor, nil, source) + server := NewServer(nil, processor, nil) stats, err := server.Health(context.Background(), &emptypb.Empty{}) assert.NoError(t, err) assert.Equal(t, expectedStats.ChannelMapSize, stats.GetMapSize()) @@ -119,7 +117,7 @@ func TestPutTransaction(t *testing.T) { client := &ClientIMock{} - server := NewServer(s, processor, client, source) + server := NewServer(s, processor, client) server.SetTimeout(100 * time.Millisecond) var txStatus *metamorph_api.TransactionStatus @@ -143,7 +141,7 @@ func TestPutTransaction(t *testing.T) { }) t.Run("invalid request", func(t *testing.T) { - server := NewServer(nil, nil, nil, source) + server := NewServer(nil, nil, nil) txRequest := &metamorph_api.TransactionRequest{ CallbackUrl: "api.callback.com", @@ -160,7 +158,7 @@ func TestPutTransaction(t *testing.T) { processor := &ProcessorIMock{} btc := &ClientIMock{} - server := NewServer(s, processor, btc, source) + server := NewServer(s, processor, btc) var txStatus *metamorph_api.TransactionStatus txRequest := &metamorph_api.TransactionRequest{ @@ -187,7 +185,7 @@ func TestPutTransaction(t *testing.T) { processor := &ProcessorIMock{} btc := &ClientIMock{} - server := NewServer(s, processor, btc, source) + server := NewServer(s, processor, btc) var txStatus *metamorph_api.TransactionStatus txRequest := &metamorph_api.TransactionRequest{ @@ -316,7 +314,7 @@ func TestServer_GetTransactionStatus(t *testing.T) { }, } - server := NewServer(metamorphStore, nil, client, source) + server := NewServer(metamorphStore, nil, client) got, err := server.GetTransactionStatus(context.Background(), tt.req) if !tt.wantErr(t, err, fmt.Sprintf("GetTransactionStatus(%v)", tt.req)) { return @@ -564,7 +562,7 @@ func TestPutTransactions(t *testing.T) { } serverLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) - server := NewServer(nil, processor, nil, source, WithLogger(serverLogger)) + server := NewServer(nil, processor, nil, WithLogger(serverLogger)) server.SetTimeout(5 * time.Second) statuses, err := server.PutTransactions(context.Background(), tc.requests) @@ -623,7 +621,7 @@ func TestSetUnlockedbyName(t *testing.T) { }, } - server := NewServer(metamorphStore, nil, nil, source) + server := NewServer(metamorphStore, nil, nil) response, err := server.SetUnlockedByName(context.Background(), &metamorph_api.SetUnlockedByNameRequest{ Name: "test", }) @@ -666,7 +664,7 @@ func TestStartGRPCServer(t *testing.T) { processor := &ProcessorIMock{ ShutdownFunc: func() {}, } - server := NewServer(metamorphStore, processor, btc, source) + server := NewServer(metamorphStore, processor, btc) go func() { err := server.StartGRPCServer("localhost:7000", 10000) diff --git a/metamorph/store/Interface.go b/metamorph/store/Interface.go index 902a6a344..95823dc82 100644 --- a/metamorph/store/Interface.go +++ b/metamorph/store/Interface.go @@ -217,6 +217,8 @@ type MetamorphStore interface { Close(ctx context.Context) error GetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) (*time.Time, error) SetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) error + + Ping(ctx context.Context) error } func encodeTime(buf *bytes.Buffer, tm time.Time) error { diff --git a/metamorph/store/badger/badger.go b/metamorph/store/badger/badger.go index e50e90534..53137645b 100644 --- a/metamorph/store/badger/badger.go +++ b/metamorph/store/badger/badger.go @@ -402,3 +402,18 @@ func (s *Badger) SetBlockProcessed(ctx context.Context, blockHash *chainhash.Has return nil } + +func (s *Badger) Ping(ctx context.Context) error { + start := gocore.CurrentNanos() + defer func() { + gocore.NewStat("mtm_store_badger").NewStat("Ping").AddTime(start) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "badger:Ping") + defer span.Finish() + + if s.store == nil { + return errors.New("badger db store not found") + } + + return nil +} diff --git a/metamorph/store/dynamodb/dynamodb.go b/metamorph/store/dynamodb/dynamodb.go index 14c99cc45..dd6b4ca4e 100644 --- a/metamorph/store/dynamodb/dynamodb.go +++ b/metamorph/store/dynamodb/dynamodb.go @@ -660,3 +660,19 @@ func (ddb *DynamoDB) Close(ctx context.Context) error { ctx.Done() return nil } + +func (ddb *DynamoDB) Ping(ctx context.Context) error { + startNanos := ddb.now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_dynamodb").NewStat("Ping").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "dynamodb:Ping") + defer span.Finish() + + _, err := ddb.client.ListTables(ctx, &dynamodb.ListTablesInput{}) + if err != nil { + return err + } + + return nil +} diff --git a/metamorph/store/postgresql/postgres.go b/metamorph/store/postgresql/postgres.go index 034ee6225..d90b32e1b 100644 --- a/metamorph/store/postgresql/postgres.go +++ b/metamorph/store/postgresql/postgres.go @@ -651,3 +651,19 @@ func (p *PostgreSQL) Close(ctx context.Context) error { ctx.Done() return p.db.Close() } + +func (p *PostgreSQL) Ping(ctx context.Context) error { + startNanos := p.now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_sql").NewStat("Ping").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "sql:Ping") + defer span.Finish() + + _, err := p.db.QueryContext(ctx, "SELECT 1;") + if err != nil { + return err + } + + return nil +} diff --git a/metamorph/store/sqlite/sqlite.go b/metamorph/store/sqlite/sqlite.go index cee80c93b..2b4530875 100644 --- a/metamorph/store/sqlite/sqlite.go +++ b/metamorph/store/sqlite/sqlite.go @@ -607,3 +607,19 @@ func (s *SqLite) Close(ctx context.Context) error { ctx.Done() return s.db.Close() } + +func (s *SqLite) Ping(ctx context.Context) error { + startNanos := s.now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_sql").NewStat("Ping").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "sql:Ping") + defer span.Finish() + + _, err := s.db.QueryContext(ctx, "SELECT 1;") + if err != nil { + return err + } + + return nil +} diff --git a/test/config/config.yaml b/test/config/config.yaml index a67797e5a..8e37602bc 100644 --- a/test/config/config.yaml +++ b/test/config/config.yaml @@ -57,6 +57,7 @@ metamorph: statsKeypress: false # enable stats keypress. If enabled pressing any key will print stats to stdout profilerAddr: localhost:9992 # address to start profiler server on checkIfMinedInterval: 1s + healthServerDialAddr: localhost:8005 blocktx: listenAddr: 0.0.0.0:8011 # address space for blocktx to listen on. Can be for example localhost:8011 or :8011 for listening on all addresses