diff --git a/README.md b/README.md index 27da058..546c1a4 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,11 @@ Use flag `--registry=mdns` - [plugins](plugins) - How to use plugins - [template](template) - Api, web and srv service templates generated with `micro new` +- [client](client) - Usage of the Client package to call a service. +- [broker](broker) - A example of using Broker for Publish and Subscribing. +- [server](server) - Use of the Server package directly to server requests. +- [service](service) - Example of the top level Service in go-micro. + ## External - [auth-srv](https://github.com/micro/auth-srv) - An Oauth2 authentication service diff --git a/broker/README.md b/broker/README.md new file mode 100644 index 0000000..67246f2 --- /dev/null +++ b/broker/README.md @@ -0,0 +1,7 @@ +# Broker + +## Contents + +- main.go - demonstrates simple runs pub-sub as two go routines running for 10 seconds. +- producer - publishes messages to the broker every second +- consumer - consumes any messages sent by the producer diff --git a/broker/consumer/consumer.go b/broker/consumer/consumer.go new file mode 100644 index 0000000..465a473 --- /dev/null +++ b/broker/consumer/consumer.go @@ -0,0 +1,51 @@ +package main + +import ( + "fmt" + "log" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/cmd" + // To enable rabbitmq plugin uncomment + //_ "github.com/micro/go-plugins/broker/rabbitmq" +) + +var ( + topic = "go.micro.topic.foo" +) + +// Example of a shared subscription which receives a subset of messages +func sharedSub() { + _, err := broker.Subscribe(topic, func(p broker.Publication) error { + fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) + return nil + }, broker.Queue("consumer")) + if err != nil { + fmt.Println(err) + } +} + +// Example of a subscription which receives all the messages +func sub() { + _, err := broker.Subscribe(topic, func(p broker.Publication) error { + fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) + return nil + }) + if err != nil { + fmt.Println(err) + } +} + +func main() { + cmd.Init() + + if err := broker.Init(); err != nil { + log.Fatalf("Broker Init error: %v", err) + } + if err := broker.Connect(); err != nil { + log.Fatalf("Broker Connect error: %v", err) + } + + sub() + select {} +} diff --git a/broker/main.go b/broker/main.go new file mode 100644 index 0000000..a1ff601 --- /dev/null +++ b/broker/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/cmd" +) + +var ( + topic = "go.micro.topic.foo" +) + +func pub() { + tick := time.NewTicker(time.Second) + i := 0 + for _ = range tick.C { + msg := &broker.Message{ + Header: map[string]string{ + "id": fmt.Sprintf("%d", i), + }, + Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), + } + if err := broker.Publish(topic, msg); err != nil { + log.Printf("[pub] failed: %v", err) + } else { + fmt.Println("[pub] pubbed message:", string(msg.Body)) + } + i++ + } +} + +func sub() { + _, err := broker.Subscribe(topic, func(p broker.Publication) error { + fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) + return nil + }) + if err != nil { + fmt.Println(err) + } +} + +func main() { + cmd.Init() + + if err := broker.Init(); err != nil { + log.Fatalf("Broker Init error: %v", err) + } + if err := broker.Connect(); err != nil { + log.Fatalf("Broker Connect error: %v", err) + } + + go pub() + go sub() + + <-time.After(time.Second * 10) +} diff --git a/broker/producer/producer.go b/broker/producer/producer.go new file mode 100644 index 0000000..ac724dc --- /dev/null +++ b/broker/producer/producer.go @@ -0,0 +1,49 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/cmd" + // To enable rabbitmq plugin uncomment + //_ "github.com/micro/go-plugins/broker/rabbitmq" +) + +var ( + topic = "go.micro.topic.foo" +) + +func pub() { + tick := time.NewTicker(time.Second) + i := 0 + for _ = range tick.C { + msg := &broker.Message{ + Header: map[string]string{ + "id": fmt.Sprintf("%d", i), + }, + Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), + } + if err := broker.Publish(topic, msg); err != nil { + log.Printf("[pub] failed: %v", err) + } else { + fmt.Println("[pub] pubbed message:", string(msg.Body)) + } + i++ + } +} + +func main() { + cmd.Init() + + if err := broker.Init(); err != nil { + log.Fatalf("Broker Init error: %v", err) + } + + if err := broker.Connect(); err != nil { + log.Fatalf("Broker Connect error: %v", err) + } + + pub() +} diff --git a/client/README.md b/client/README.md new file mode 100644 index 0000000..ead7486 --- /dev/null +++ b/client/README.md @@ -0,0 +1,12 @@ +# Client + +## Contents + +- main.go - calls each of the go.micro.srv.example handlers and includes the use of the streaming handler +- codegen - demonstrates how to use code generation to remove boilerplate code +- dc_filter - shows how to use Select filters inside a call wrapper for filtering to the local DC +- dc_selector - is the same as dc_filter but as a Selector implementation itself +- pub - publishes messages using the Publish method. By default encoding in protobuf +- selector - shows how to write and load your own Selector +- wrapper - provides examples for how to use client Wrappers (middleware) + diff --git a/client/codegen/README.md b/client/codegen/README.md new file mode 100644 index 0000000..d8069c9 --- /dev/null +++ b/client/codegen/README.md @@ -0,0 +1,150 @@ +# Code Generation [Experimental] + +We're experimenting with code generation to reduce the amount of boiler plate code written. + +## Example + +Going from this +```golang +req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ + Name: "John", +}) + +rsp := &example.Response{} + +if err := client.Call(context.Background(), req, rsp); err != nil { + return err +} +``` + +To + +```golang +rsp, err := cl.Call(context.Background(), &example.Request{Name: "John"}) +if err != nil { + return err +} +``` + +## Generation of stub code for the example service + +```shell +go get github.com/micro/protobuf/protoc-gen-go +cd examples/server/proto/example +protoc --go_out=plugins=micro:. example.proto +``` + +Look at examples/server/proto/example/example.pb.go +to see the generated code. + +## Guide + +### Download the protoc-gen-go code + +```shell +go get github.com/micro/protobuf/protoc-gen-go +``` + +### Define your proto service. + +hello.proto +```shell +syntax = "proto3"; + +// package name is used as the service name for discovery +// if service name is not passed in when initialising the +// client +package go.micro.srv.greeter; + +service Say { + rpc Hello(Request) returns (Response) {} +} + +message Request { + optional string name = 1; +} + +message Response { + optional string msg = 1; +} +``` + +**Note: Remember to set package name in the proto, it's used to generate +the service for discovery.** + +### Generate code + +```shell +protoc --go_out=plugins=micro:. hello.proto +``` + +### Generated code + +```shell +// Client API for Say service + +type SayClient interface { + Hello(ctx context.Context, in *Request) (*Response, error) +} + +type sayClient struct { + c client.Client + serviceName string +} + +func NewSayClient(serviceName string, c client.Client) SayClient { + if c == nil { + c = client.NewClient() + } + if len(serviceName) == 0 { + serviceName = "go.micro.srv.greeter" + } + return &sayClient{ + c: c, + serviceName: serviceName, + } +} + +func (c *sayClient) Hello(ctx context.Context, in *Request) (*Response, error) { + req := c.c.NewRequest(c.serviceName, "Say.Hello", in) + out := new(Response) + err := c.c.Call(ctx, req, out) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Say service + +type SayHandler interface { + Hello(context.Context, *Request, *Response) error +} + +func RegisterSayHandler(s server.Server, hdlr SayHandler) { + s.Handle(s.NewHandler(hdlr)) +} +``` + +### Use the client +```golang + +import ( + "fmt" + + "golang.org/x/net/context" + "github.com/micro/go-micro/client" + hello "path/to/hello/proto" +) + +func main() { + cl := hello.NewSayClient("go.micro.srv.greeter", client.DefaultClient) + // alternative initialisation + // cl := hello.NewSayClient("", nil) + + rsp, err := cl.Hello(contex.Background(), &hello.Request{"Name": "John"}) + if err != nil { + fmt.Println(err) + } +} +``` diff --git a/client/codegen/codegen.go b/client/codegen/codegen.go new file mode 100644 index 0000000..98c1e25 --- /dev/null +++ b/client/codegen/codegen.go @@ -0,0 +1,79 @@ +package main + +import ( + "fmt" + + example "github.com/micro/examples/server/proto/example" + "github.com/micro/go-micro/cmd" + "golang.org/x/net/context" +) + +var ( + cl = example.NewExampleClient("go.micro.srv.example", nil) +) + +func call(i int) { + rsp, err := cl.Call(context.Background(), &example.Request{Name: "John"}) + if err != nil { + fmt.Println("call err: ", err, rsp) + return + } + fmt.Println("Call:", i, "rsp:", rsp.Msg) +} + +func stream(i int) { + stream, err := cl.Stream(context.Background(), &example.StreamingRequest{Count: int64(i)}) + if err != nil { + fmt.Println("err:", err) + return + } + for j := 0; j < i; j++ { + rsp, err := stream.Recv() + if err != nil { + fmt.Println("err:", err) + break + } + fmt.Println("Stream: rsp:", rsp.Count) + } + if err := stream.Close(); err != nil { + fmt.Println("stream close err:", err) + } +} + +func pingPong(i int) { + stream, err := cl.PingPong(context.Background()) + if err != nil { + fmt.Println("err:", err) + return + } + for j := 0; j < i; j++ { + if err := stream.Send(&example.Ping{Stroke: int64(j)}); err != nil { + fmt.Println("err:", err) + return + } + rsp, err := stream.Recv() + if err != nil { + fmt.Println("recv err", err) + break + } + fmt.Printf("Sent ping %v got pong %v\n", j, rsp.Stroke) + } + if err := stream.Close(); err != nil { + fmt.Println("stream close err:", err) + } +} + +func main() { + cmd.Init() + + fmt.Println("\n--- Call example ---\n") + for i := 0; i < 10; i++ { + call(i) + } + + fmt.Println("\n--- Streamer example ---\n") + stream(10) + + fmt.Println("\n--- Ping Pong example ---\n") + pingPong(10) +} diff --git a/client/dc_filter/dc_filter.go b/client/dc_filter/dc_filter.go new file mode 100644 index 0000000..184b153 --- /dev/null +++ b/client/dc_filter/dc_filter.go @@ -0,0 +1,88 @@ +package main + +import ( + "fmt" + "math/rand" + "time" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/metadata" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" + "golang.org/x/net/context" + + example "github.com/micro/examples/server/proto/example" +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +// A Wrapper that creates a Datacenter Selector Option +type dcWrapper struct { + client.Client +} + +func (dc *dcWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + md, _ := metadata.FromContext(ctx) + + filter := func(services []*registry.Service) []*registry.Service { + for _, service := range services { + var nodes []*registry.Node + for _, node := range service.Nodes { + if node.Metadata["datacenter"] == md["datacenter"] { + nodes = append(nodes, node) + } + } + service.Nodes = nodes + } + return services + } + + callOptions := append(opts, client.WithSelectOption( + selector.WithFilter(filter), + )) + + fmt.Printf("[DC Wrapper] filtering for datacenter %s\n", md["datacenter"]) + return dc.Client.Call(ctx, req, rsp, callOptions...) +} + +func NewDCWrapper(c client.Client) client.Client { + return &dcWrapper{c} +} + +func call(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ + Name: "John", + }) + + // create context with metadata + ctx := metadata.NewContext(context.Background(), map[string]string{ + "datacenter": "local", + }) + + rsp := &example.Response{} + + // Call service + if err := client.Call(ctx, req, rsp); err != nil { + fmt.Println("call err: ", err, rsp) + return + } + + fmt.Println("Call:", i, "rsp:", rsp.Msg) +} + +func main() { + cmd.Init() + + client.DefaultClient = client.NewClient( + client.Wrap(NewDCWrapper), + ) + + fmt.Println("\n--- Call example ---\n") + for i := 0; i < 10; i++ { + call(i) + } +} diff --git a/client/dc_selector/dc_selector.go b/client/dc_selector/dc_selector.go new file mode 100644 index 0000000..3fca9d9 --- /dev/null +++ b/client/dc_selector/dc_selector.go @@ -0,0 +1,134 @@ +package main + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" + "golang.org/x/net/context" + + example "github.com/micro/examples/server/proto/example" +) + +// Built in random hashed node selector +type dcSelector struct { + opts selector.Options +} + +var ( + datacenter = "local" +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +func (n *dcSelector) Init(opts ...selector.Option) error { + for _, o := range opts { + o(&n.opts) + } + return nil +} + +func (n *dcSelector) Options() selector.Options { + return n.opts +} + +func (n *dcSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { + services, err := n.opts.Registry.GetService(service) + if err != nil { + return nil, err + } + + if len(services) == 0 { + return nil, selector.ErrNotFound + } + + var nodes []*registry.Node + + // Filter the nodes for datacenter + for _, service := range services { + for _, node := range service.Nodes { + if node.Metadata["datacenter"] == datacenter { + nodes = append(nodes, node) + } + } + } + + if len(nodes) == 0 { + return nil, selector.ErrNotFound + } + + var i int + var mtx sync.Mutex + + return func() (*registry.Node, error) { + mtx.Lock() + defer mtx.Unlock() + i++ + return nodes[i%len(nodes)], nil + }, nil +} + +func (n *dcSelector) Mark(service string, node *registry.Node, err error) { + return +} + +func (n *dcSelector) Reset(service string) { + return +} + +func (n *dcSelector) Close() error { + return nil +} + +func (n *dcSelector) String() string { + return "dc" +} + +// Return a new first node selector +func DCSelector(opts ...selector.Option) selector.Selector { + var sopts selector.Options + for _, opt := range opts { + opt(&sopts) + } + if sopts.Registry == nil { + sopts.Registry = registry.DefaultRegistry + } + return &dcSelector{sopts} +} + +func call(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ + Name: "John", + }) + + rsp := &example.Response{} + + // Call service + if err := client.Call(context.Background(), req, rsp); err != nil { + fmt.Println("call err: ", err, rsp) + return + } + + fmt.Println("Call:", i, "rsp:", rsp.Msg) +} + +func main() { + cmd.Init() + + client.DefaultClient = client.NewClient( + client.Selector(DCSelector()), + ) + + fmt.Println("\n--- Call example ---\n") + for i := 0; i < 10; i++ { + call(i) + } +} diff --git a/client/main.go b/client/main.go new file mode 100644 index 0000000..018e26c --- /dev/null +++ b/client/main.go @@ -0,0 +1,143 @@ +package main + +import ( + "fmt" + + example "github.com/micro/examples/server/proto/example" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/metadata" + "golang.org/x/net/context" +) + +// publishes a message +func pub() { + msg := client.NewPublication("topic.go.micro.srv.example", &example.Message{ + Say: "This is a publication", + }) + + // create context with metadata + ctx := metadata.NewContext(context.Background(), map[string]string{ + "X-User-Id": "john", + "X-From-Id": "script", + }) + + // publish message + if err := client.Publish(ctx, msg); err != nil { + fmt.Println("pub err: ", err) + return + } + + fmt.Printf("Published: %v\n", msg) +} + +func call(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ + Name: "John", + }) + + // create context with metadata + ctx := metadata.NewContext(context.Background(), map[string]string{ + "X-User-Id": "john", + "X-From-Id": "script", + }) + + rsp := &example.Response{} + + // Call service + if err := client.Call(ctx, req, rsp); err != nil { + fmt.Println("call err: ", err, rsp) + return + } + + fmt.Println("Call:", i, "rsp:", rsp.Msg) +} + +func stream(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + // Request can be empty as its actually ignored and merely used to call the handler + req := client.NewRequest("go.micro.srv.example", "Example.Stream", &example.StreamingRequest{}) + + stream, err := client.Stream(context.Background(), req) + if err != nil { + fmt.Println("err:", err) + return + } + if err := stream.Send(&example.StreamingRequest{Count: int64(i)}); err != nil { + fmt.Println("err:", err) + return + } + for stream.Error() == nil { + rsp := &example.StreamingResponse{} + err := stream.Recv(rsp) + if err != nil { + fmt.Println("recv err", err) + break + } + fmt.Println("Stream: rsp:", rsp.Count) + } + + if stream.Error() != nil { + fmt.Println("stream err:", err) + return + } + + if err := stream.Close(); err != nil { + fmt.Println("stream close err:", err) + } +} + +func pingPong(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + // Request can be empty as its actually ignored and merely used to call the handler + req := client.NewRequest("go.micro.srv.example", "Example.PingPong", &example.StreamingRequest{}) + + stream, err := client.Stream(context.Background(), req) + if err != nil { + fmt.Println("err:", err) + return + } + + for j := 0; j < i; j++ { + if err := stream.Send(&example.Ping{Stroke: int64(j + 1)}); err != nil { + fmt.Println("err:", err) + return + } + rsp := &example.Pong{} + err := stream.Recv(rsp) + if err != nil { + fmt.Println("recv err", err) + break + } + fmt.Printf("Sent ping %v got pong %v\n", j+1, rsp.Stroke) + } + + if stream.Error() != nil { + fmt.Println("stream err:", err) + return + } + + if err := stream.Close(); err != nil { + fmt.Println("stream close err:", err) + } +} + +func main() { + cmd.Init() + + fmt.Println("\n--- Publisher example ---\n") + pub() + + fmt.Println("\n--- Call example ---\n") + for i := 0; i < 10; i++ { + call(i) + } + + fmt.Println("\n--- Streamer example ---\n") + stream(10) + + fmt.Println("\n--- Ping Pong example ---\n") + pingPong(10) + +} diff --git a/client/pub/pub.go b/client/pub/pub.go new file mode 100644 index 0000000..1ee8d32 --- /dev/null +++ b/client/pub/pub.go @@ -0,0 +1,40 @@ +package main + +import ( + "fmt" + + example "github.com/micro/examples/server/proto/example" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/metadata" + "golang.org/x/net/context" +) + +// publishes a message +func pub(i int) { + msg := client.NewPublication("topic.go.micro.srv.example", &example.Message{ + Say: fmt.Sprintf("This is a publication %d", i), + }) + + // create context with metadata + ctx := metadata.NewContext(context.Background(), map[string]string{ + "X-User-Id": "john", + "X-From-Id": "script", + }) + + // publish message + if err := client.Publish(ctx, msg); err != nil { + fmt.Println("pub err: ", err) + return + } + + fmt.Printf("Published %d: %v\n", i, msg) +} + +func main() { + cmd.Init() + fmt.Println("\n--- Publisher example ---\n") + for i := 0; i < 10; i++ { + pub(i) + } +} diff --git a/client/selector/selector.go b/client/selector/selector.go new file mode 100644 index 0000000..1e29eeb --- /dev/null +++ b/client/selector/selector.go @@ -0,0 +1,124 @@ +package main + +import ( + "fmt" + "math/rand" + "time" + + example "github.com/micro/examples/server/proto/example" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" + "golang.org/x/net/context" +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +// Built in random hashed node selector +type firstNodeSelector struct { + opts selector.Options +} + +func (n *firstNodeSelector) Init(opts ...selector.Option) error { + for _, o := range opts { + o(&n.opts) + } + return nil +} + +func (n *firstNodeSelector) Options() selector.Options { + return n.opts +} + +func (n *firstNodeSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { + services, err := n.opts.Registry.GetService(service) + if err != nil { + return nil, err + } + + if len(services) == 0 { + return nil, selector.ErrNotFound + } + + var sopts selector.SelectOptions + for _, opt := range opts { + opt(&sopts) + } + + for _, filter := range sopts.Filters { + services = filter(services) + } + + if len(services) == 0 { + return nil, selector.ErrNotFound + } + + if len(services[0].Nodes) == 0 { + return nil, selector.ErrNotFound + } + + return func() (*registry.Node, error) { + return services[0].Nodes[0], nil + }, nil +} + +func (n *firstNodeSelector) Mark(service string, node *registry.Node, err error) { + return +} + +func (n *firstNodeSelector) Reset(service string) { + return +} + +func (n *firstNodeSelector) Close() error { + return nil +} + +func (n *firstNodeSelector) String() string { + return "first" +} + +// Return a new first node selector +func FirstNodeSelector(opts ...selector.Option) selector.Selector { + var sopts selector.Options + for _, opt := range opts { + opt(&sopts) + } + if sopts.Registry == nil { + sopts.Registry = registry.DefaultRegistry + } + return &firstNodeSelector{sopts} +} + +func call(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ + Name: "John", + }) + + rsp := &example.Response{} + + // Call service + if err := client.Call(context.Background(), req, rsp); err != nil { + fmt.Println("call err: ", err, rsp) + return + } + + fmt.Println("Call:", i, "rsp:", rsp.Msg) +} + +func main() { + cmd.Init() + + client.DefaultClient = client.NewClient( + client.Selector(FirstNodeSelector()), + ) + + fmt.Println("\n--- Call example ---\n") + for i := 0; i < 10; i++ { + call(i) + } +} diff --git a/client/wrapper/wrapper.go b/client/wrapper/wrapper.go new file mode 100644 index 0000000..616d0b7 --- /dev/null +++ b/client/wrapper/wrapper.go @@ -0,0 +1,109 @@ +package main + +import ( + "fmt" + "time" + + example "github.com/micro/examples/server/proto/example" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/metadata" + "golang.org/x/net/context" +) + +// wrapper example code + +// log wrapper logs every time a request is made +type logWrapper struct { + client.Client +} + +func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + md, _ := metadata.FromContext(ctx) + fmt.Printf("[Log Wrapper] ctx: %v service: %s method: %s\n", md, req.Service(), req.Method()) + return l.Client.Call(ctx, req, rsp) +} + +// trace wrapper attaches a unique trace ID - timestamp +type traceWrapper struct { + client.Client +} + +func (t *traceWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + ctx = metadata.NewContext(ctx, map[string]string{ + "X-Trace-Id": fmt.Sprintf("%d", time.Now().Unix()), + }) + return t.Client.Call(ctx, req, rsp) +} + +// Implements client.Wrapper as logWrapper +func logWrap(c client.Client) client.Client { + return &logWrapper{c} +} + +// Implements client.Wrapper as traceWrapper +func traceWrap(c client.Client) client.Client { + return &traceWrapper{c} +} + +func metricsWrap(cf client.CallFunc) client.CallFunc { + return func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { + t := time.Now() + err := cf(ctx, addr, req, rsp, opts) + fmt.Printf("[Metrics Wrapper] called: %s %s.%s duration: %v\n", addr, req.Service(), req.Method(), time.Since(t)) + return err + } +} + +func call(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ + Name: "John", + }) + + // create context with metadata + ctx := metadata.NewContext(context.Background(), map[string]string{ + "X-User-Id": "john", + "X-From-Id": "script", + }) + + rsp := &example.Response{} + + // Call service + if err := client.Call(ctx, req, rsp); err != nil { + fmt.Println("call err: ", err, rsp) + return + } + + fmt.Println("Call:", i, "rsp:", rsp.Msg) +} + +func main() { + cmd.Init() + + fmt.Println("\n--- Log Wrapper example ---\n") + + // Wrap the default client + client.DefaultClient = logWrap(client.DefaultClient) + + call(0) + + fmt.Println("\n--- Log+Trace Wrapper example ---\n") + + // Wrap using client.Wrap option + client.DefaultClient = client.NewClient( + client.Wrap(traceWrap), + client.Wrap(logWrap), + ) + + call(1) + + fmt.Println("\n--- Metrics Wrapper example ---\n") + + // Wrap using client.Wrap option + client.DefaultClient = client.NewClient( + client.WrapCall(metricsWrap), + ) + + call(2) +} diff --git a/server/Dockerfile b/server/Dockerfile new file mode 100644 index 0000000..c24b628 --- /dev/null +++ b/server/Dockerfile @@ -0,0 +1,5 @@ +# Build binary with the following command +# CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-w' -o server ./main.go +FROM alpine:3.2 +ADD server / +ENTRYPOINT [ "/server" ] diff --git a/server/README.md b/server/README.md new file mode 100644 index 0000000..12c76cf --- /dev/null +++ b/server/README.md @@ -0,0 +1,38 @@ +# Service + +An example Go service running with go-micro + +## Contents + +- main.go - initialises and runs the the server +- handler - is an example RPC request handler for the Server +- proto - contains the protobuf defintion for the Server API +- subscriber - is a handler for subscribing via the Server +- wrapper - demonstrates use of a server HandlerWrapper +- codegen - shows how to use codegenerated registration to reduce boilerplate + +## Usage + +### Prerequisites + +Install Consul +[https://www.consul.io/intro/getting-started/install.html](https://www.consul.io/intro/getting-started/install.html) + +Run Consul +``` +$ consul agent -dev -advertise=127.0.0.1 +``` + +Run Service +``` +$ go run server/main.go +I0525 18:06:14.471489 83304 server.go:117] Starting server go.micro.srv.example id go.micro.srv.example-59b6e0ab-0300-11e5-b696-68a86d0d36b6 +I0525 18:06:14.474960 83304 rpc_server.go:126] Listening on [::]:62216 +I0525 18:06:14.474997 83304 server.go:99] Registering node: go.micro.srv.example-59b6e0ab-0300-11e5-b696-68a86d0d36b6 +``` + +Test Service +``` +$ go run client/main.go +go.micro.srv.example-59b6e0ab-0300-11e5-b696-68a86d0d36b6: Hello John +``` diff --git a/server/codegen/codegen.go b/server/codegen/codegen.go new file mode 100644 index 0000000..6205cbd --- /dev/null +++ b/server/codegen/codegen.go @@ -0,0 +1,83 @@ +package main + +import ( + "log" + + "github.com/micro/examples/server/subscriber" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/server" + "golang.org/x/net/context" + + example "github.com/micro/examples/server/proto/example" +) + +type Example struct{} + +func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { + log.Print("Received Example.Call request") + rsp.Msg = server.DefaultOptions().Id + ": Hello " + req.Name + return nil +} + +func (e *Example) Stream(ctx context.Context, req *example.StreamingRequest, stream example.Example_StreamStream) error { + log.Printf("Received Example.Stream request with count: %d", req.Count) + + for i := 0; i < int(req.Count); i++ { + log.Printf("Responding: %d", i) + if err := stream.Send(&example.StreamingResponse{ + Count: int64(i), + }); err != nil { + return err + } + } + + return nil +} + +func (e *Example) PingPong(ctx context.Context, stream example.Example_PingPongStream) error { + for { + req, err := stream.Recv() + if err != nil { + return err + } + log.Printf("Got ping %v", req.Stroke) + if err := stream.Send(&example.Pong{Stroke: req.Stroke}); err != nil { + return err + } + } +} + +func main() { + // optionally setup command line usage + cmd.Init() + + // Initialise Server + server.Init( + server.Name("go.micro.srv.example"), + ) + + // Register Subscribers + server.Subscribe( + server.NewSubscriber( + "topic.go.micro.srv.example", + new(subscriber.Example), + ), + ) + + server.Subscribe( + server.NewSubscriber( + "topic.go.micro.srv.example", + subscriber.Handler, + ), + ) + + // Register Handler + example.RegisterExampleHandler( + server.DefaultServer, new(Example), + ) + + // Run server + if err := server.Run(); err != nil { + log.Fatal(err) + } +} diff --git a/server/handler/example.go b/server/handler/example.go new file mode 100644 index 0000000..cec5452 --- /dev/null +++ b/server/handler/example.go @@ -0,0 +1,58 @@ +package handler + +import ( + "log" + + example "github.com/micro/examples/server/proto/example" + "github.com/micro/go-micro/metadata" + "github.com/micro/go-micro/server" + + "golang.org/x/net/context" +) + +type Example struct{} + +func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { + md, _ := metadata.FromContext(ctx) + log.Printf("Received Example.Call request with metadata: %v", md) + rsp.Msg = server.DefaultOptions().Id + ": Hello " + req.Name + return nil +} + +func (e *Example) Stream(ctx context.Context, stream server.Streamer) error { + log.Print("Executing streaming handler") + req := &example.StreamingRequest{} + + // We just want to receive 1 request and then process here + if err := stream.Recv(req); err != nil { + log.Printf("Error receiving streaming request: %v", err) + return err + } + + log.Printf("Received Example.Stream request with count: %d", req.Count) + + for i := 0; i < int(req.Count); i++ { + log.Printf("Responding: %d", i) + + if err := stream.Send(&example.StreamingResponse{ + Count: int64(i), + }); err != nil { + return err + } + } + + return nil +} + +func (e *Example) PingPong(ctx context.Context, stream server.Streamer) error { + for { + req := &example.Ping{} + if err := stream.Recv(req); err != nil { + return err + } + log.Printf("Got ping %v", req.Stroke) + if err := stream.Send(&example.Pong{Stroke: req.Stroke}); err != nil { + return err + } + } +} diff --git a/server/main.go b/server/main.go new file mode 100644 index 0000000..e918527 --- /dev/null +++ b/server/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "log" + + "github.com/micro/examples/server/handler" + "github.com/micro/examples/server/subscriber" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/server" +) + +func main() { + // optionally setup command line usage + cmd.Init() + + // Initialise Server + server.Init( + server.Name("go.micro.srv.example"), + ) + + // Register Handlers + server.Handle( + server.NewHandler( + new(handler.Example), + ), + ) + + // Register Subscribers + if err := server.Subscribe( + server.NewSubscriber( + "topic.go.micro.srv.example", + new(subscriber.Example), + ), + ); err != nil { + log.Fatal(err) + } + + if err := server.Subscribe( + server.NewSubscriber( + "topic.go.micro.srv.example", + subscriber.Handler, + ), + ); err != nil { + log.Fatal(err) + } + + // Run server + if err := server.Run(); err != nil { + log.Fatal(err) + } +} diff --git a/server/proto/example/example.pb.go b/server/proto/example/example.pb.go new file mode 100644 index 0000000..238ac67 --- /dev/null +++ b/server/proto/example/example.pb.go @@ -0,0 +1,355 @@ +// Code generated by protoc-gen-go. +// source: go-micro/examples/server/proto/example/example.proto +// DO NOT EDIT! + +/* +Package go_micro_srv_example is a generated protocol buffer package. + +It is generated from these files: + go-micro/examples/server/proto/example/example.proto + +It has these top-level messages: + Message + Request + Response + StreamingRequest + StreamingResponse + Ping + Pong +*/ +package go_micro_srv_example + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" + context "golang.org/x/net/context" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type Message struct { + Say string `protobuf:"bytes,1,opt,name=say" json:"say,omitempty"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type Request struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} +func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +type Response struct { + Msg string `protobuf:"bytes,1,opt,name=msg" json:"msg,omitempty"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +type StreamingRequest struct { + Count int64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"` +} + +func (m *StreamingRequest) Reset() { *m = StreamingRequest{} } +func (m *StreamingRequest) String() string { return proto.CompactTextString(m) } +func (*StreamingRequest) ProtoMessage() {} +func (*StreamingRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +type StreamingResponse struct { + Count int64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"` +} + +func (m *StreamingResponse) Reset() { *m = StreamingResponse{} } +func (m *StreamingResponse) String() string { return proto.CompactTextString(m) } +func (*StreamingResponse) ProtoMessage() {} +func (*StreamingResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +type Ping struct { + Stroke int64 `protobuf:"varint,1,opt,name=stroke" json:"stroke,omitempty"` +} + +func (m *Ping) Reset() { *m = Ping{} } +func (m *Ping) String() string { return proto.CompactTextString(m) } +func (*Ping) ProtoMessage() {} +func (*Ping) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +type Pong struct { + Stroke int64 `protobuf:"varint,1,opt,name=stroke" json:"stroke,omitempty"` +} + +func (m *Pong) Reset() { *m = Pong{} } +func (m *Pong) String() string { return proto.CompactTextString(m) } +func (*Pong) ProtoMessage() {} +func (*Pong) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + +func init() { + proto.RegisterType((*Message)(nil), "go.micro.srv.example.Message") + proto.RegisterType((*Request)(nil), "go.micro.srv.example.Request") + proto.RegisterType((*Response)(nil), "go.micro.srv.example.Response") + proto.RegisterType((*StreamingRequest)(nil), "go.micro.srv.example.StreamingRequest") + proto.RegisterType((*StreamingResponse)(nil), "go.micro.srv.example.StreamingResponse") + proto.RegisterType((*Ping)(nil), "go.micro.srv.example.Ping") + proto.RegisterType((*Pong)(nil), "go.micro.srv.example.Pong") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Example service + +type ExampleClient interface { + Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) + Stream(ctx context.Context, in *StreamingRequest, opts ...client.CallOption) (Example_StreamClient, error) + PingPong(ctx context.Context, opts ...client.CallOption) (Example_PingPongClient, error) +} + +type exampleClient struct { + c client.Client + serviceName string +} + +func NewExampleClient(serviceName string, c client.Client) ExampleClient { + if c == nil { + c = client.NewClient() + } + if len(serviceName) == 0 { + serviceName = "go.micro.srv.example" + } + return &exampleClient{ + c: c, + serviceName: serviceName, + } +} + +func (c *exampleClient) Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) { + req := c.c.NewRequest(c.serviceName, "Example.Call", in) + out := new(Response) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *exampleClient) Stream(ctx context.Context, in *StreamingRequest, opts ...client.CallOption) (Example_StreamClient, error) { + req := c.c.NewRequest(c.serviceName, "Example.Stream", &StreamingRequest{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + if err := stream.Send(in); err != nil { + return nil, err + } + return &exampleStreamClient{stream}, nil +} + +type Example_StreamClient interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Recv() (*StreamingResponse, error) +} + +type exampleStreamClient struct { + stream client.Streamer +} + +func (x *exampleStreamClient) Close() error { + return x.stream.Close() +} + +func (x *exampleStreamClient) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *exampleStreamClient) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *exampleStreamClient) Recv() (*StreamingResponse, error) { + m := new(StreamingResponse) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +func (c *exampleClient) PingPong(ctx context.Context, opts ...client.CallOption) (Example_PingPongClient, error) { + req := c.c.NewRequest(c.serviceName, "Example.PingPong", &Ping{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + return &examplePingPongClient{stream}, nil +} + +type Example_PingPongClient interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Ping) error + Recv() (*Pong, error) +} + +type examplePingPongClient struct { + stream client.Streamer +} + +func (x *examplePingPongClient) Close() error { + return x.stream.Close() +} + +func (x *examplePingPongClient) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *examplePingPongClient) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *examplePingPongClient) Send(m *Ping) error { + return x.stream.Send(m) +} + +func (x *examplePingPongClient) Recv() (*Pong, error) { + m := new(Pong) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Example service + +type ExampleHandler interface { + Call(context.Context, *Request, *Response) error + Stream(context.Context, *StreamingRequest, Example_StreamStream) error + PingPong(context.Context, Example_PingPongStream) error +} + +func RegisterExampleHandler(s server.Server, hdlr ExampleHandler) { + s.Handle(s.NewHandler(&Example{hdlr})) +} + +type Example struct { + ExampleHandler +} + +func (h *Example) Call(ctx context.Context, in *Request, out *Response) error { + return h.ExampleHandler.Call(ctx, in, out) +} + +func (h *Example) Stream(ctx context.Context, stream server.Streamer) error { + m := new(StreamingRequest) + if err := stream.Recv(m); err != nil { + return err + } + return h.ExampleHandler.Stream(ctx, m, &exampleStreamStream{stream}) +} + +type Example_StreamStream interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*StreamingResponse) error +} + +type exampleStreamStream struct { + stream server.Streamer +} + +func (x *exampleStreamStream) Close() error { + return x.stream.Close() +} + +func (x *exampleStreamStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *exampleStreamStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *exampleStreamStream) Send(m *StreamingResponse) error { + return x.stream.Send(m) +} + +func (h *Example) PingPong(ctx context.Context, stream server.Streamer) error { + return h.ExampleHandler.PingPong(ctx, &examplePingPongStream{stream}) +} + +type Example_PingPongStream interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Pong) error + Recv() (*Ping, error) +} + +type examplePingPongStream struct { + stream server.Streamer +} + +func (x *examplePingPongStream) Close() error { + return x.stream.Close() +} + +func (x *examplePingPongStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *examplePingPongStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *examplePingPongStream) Send(m *Pong) error { + return x.stream.Send(m) +} + +func (x *examplePingPongStream) Recv() (*Ping, error) { + m := new(Ping) + if err := x.stream.Recv(m); err != nil { + return nil, err + } + return m, nil +} + +var fileDescriptor0 = []byte{ + // 270 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x91, 0x5f, 0x4b, 0xc3, 0x30, + 0x14, 0xc5, 0x17, 0x56, 0xdb, 0x79, 0xfd, 0x83, 0x06, 0x99, 0x52, 0x50, 0x34, 0x0f, 0xba, 0x17, + 0xd3, 0xa1, 0x7e, 0x03, 0x11, 0x7d, 0x11, 0x64, 0x3e, 0xfb, 0x10, 0xc7, 0x25, 0x0c, 0x9b, 0xa6, + 0xe6, 0x66, 0x43, 0x3f, 0xbb, 0x2f, 0x6e, 0x69, 0x3b, 0xc6, 0xec, 0xf0, 0x29, 0x70, 0x7e, 0xe7, + 0x5c, 0xce, 0x21, 0x70, 0xa7, 0xed, 0xb5, 0x99, 0x8c, 0x9d, 0xcd, 0xf0, 0x4b, 0x99, 0x32, 0x47, + 0xca, 0x08, 0xdd, 0x0c, 0x5d, 0x56, 0x3a, 0xeb, 0x97, 0x6a, 0xf3, 0xca, 0xa0, 0xf2, 0x23, 0x6d, + 0x65, 0x48, 0x49, 0x72, 0x33, 0x59, 0x33, 0xd1, 0x87, 0xe4, 0x19, 0x89, 0x94, 0x46, 0xbe, 0x03, + 0x5d, 0x52, 0xdf, 0x27, 0xec, 0x9c, 0x0d, 0xb6, 0xc5, 0x31, 0x24, 0x23, 0xfc, 0x9c, 0x22, 0x79, + 0xbe, 0x0b, 0x51, 0xa1, 0x0c, 0x2e, 0x41, 0x6f, 0x84, 0x54, 0xda, 0x82, 0x42, 0xc2, 0x90, 0xae, + 0xc1, 0x05, 0x1c, 0xbc, 0x7a, 0x87, 0xca, 0x4c, 0x0a, 0xdd, 0x44, 0xf7, 0x60, 0x6b, 0x6c, 0xa7, + 0x85, 0x0f, 0x96, 0xae, 0x10, 0x70, 0xb8, 0x62, 0xa9, 0x8f, 0xac, 0x79, 0xfa, 0x10, 0xbd, 0xcc, + 0x31, 0xdf, 0x87, 0x98, 0xbc, 0xb3, 0x1f, 0xb8, 0xa2, 0xdb, 0xbf, 0xfa, 0xcd, 0x0f, 0x83, 0xe4, + 0xa1, 0x1a, 0xc3, 0x1f, 0x21, 0xba, 0x57, 0x79, 0xce, 0x4f, 0x65, 0xdb, 0x56, 0x59, 0xb7, 0x4a, + 0xcf, 0x36, 0xe1, 0xaa, 0x91, 0xe8, 0xf0, 0x37, 0x88, 0xab, 0xa2, 0xfc, 0xb2, 0xdd, 0xbb, 0xbe, + 0x34, 0xbd, 0xfa, 0xd7, 0xd7, 0x1c, 0x1f, 0x32, 0xfe, 0x04, 0xbd, 0xc5, 0xc6, 0xb0, 0x27, 0x6d, + 0x0f, 0x2e, 0x78, 0xba, 0x89, 0xcd, 0x73, 0xa2, 0x33, 0x60, 0x43, 0xf6, 0x1e, 0x87, 0xbf, 0xbd, + 0xfd, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x53, 0xb5, 0xeb, 0x31, 0x13, 0x02, 0x00, 0x00, +} diff --git a/server/proto/example/example.proto b/server/proto/example/example.proto new file mode 100644 index 0000000..48c687e --- /dev/null +++ b/server/proto/example/example.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +package go.micro.srv.example; + +service Example { + rpc Call(Request) returns (Response) {} + rpc Stream(StreamingRequest) returns (stream StreamingResponse) {} + rpc PingPong(stream Ping) returns (stream Pong) {} +} + +message Message { + string say = 1; +} + +message Request { + string name = 1; +} + +message Response { + string msg = 1; +} + +message StreamingRequest { + int64 count = 1; +} + +message StreamingResponse { + int64 count = 1; +} + +message Ping { + int64 stroke = 1; +} + +message Pong { + int64 stroke = 1; +} diff --git a/server/subscriber/subscriber.go b/server/subscriber/subscriber.go new file mode 100644 index 0000000..0bb3f7e --- /dev/null +++ b/server/subscriber/subscriber.go @@ -0,0 +1,20 @@ +package subscriber + +import ( + "log" + + example "github.com/micro/examples/server/proto/example" + "golang.org/x/net/context" +) + +type Example struct{} + +func (e *Example) Handle(ctx context.Context, msg *example.Message) error { + log.Print("Handler Received message: ", msg.Say) + return nil +} + +func Handler(ctx context.Context, msg *example.Message) error { + log.Print("Function Received message: ", msg.Say) + return nil +} diff --git a/server/wrapper/main.go b/server/wrapper/main.go new file mode 100644 index 0000000..dbdca75 --- /dev/null +++ b/server/wrapper/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "log" + + "github.com/micro/examples/server/handler" + "github.com/micro/examples/server/subscriber" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/server" + "golang.org/x/net/context" +) + +func logWrapper(fn server.HandlerFunc) server.HandlerFunc { + return func(ctx context.Context, req server.Request, rsp interface{}) error { + log.Printf("[Log Wrapper] Before serving request method: %v", req.Method()) + err := fn(ctx, req, rsp) + log.Printf("[Log Wrapper] After serving request") + return err + } +} + +func logSubWrapper(fn server.SubscriberFunc) server.SubscriberFunc { + return func(ctx context.Context, req server.Publication) error { + log.Printf("[Log Sub Wrapper] Before serving publication topic: %v", req.Topic()) + err := fn(ctx, req) + log.Printf("[Log Sub Wrapper] After serving publication") + return err + } +} + +func main() { + // optionally setup command line usage + cmd.Init() + + md := server.DefaultOptions().Metadata + md["datacenter"] = "local" + + server.DefaultServer = server.NewServer( + server.WrapHandler(logWrapper), + server.WrapSubscriber(logSubWrapper), + server.Metadata(md), + ) + + // Initialise Server + server.Init( + server.Name("go.micro.srv.example"), + ) + + // Register Handlers + server.Handle( + server.NewHandler( + new(handler.Example), + ), + ) + + // Register Subscribers + if err := server.Subscribe( + server.NewSubscriber( + "topic.go.micro.srv.example", + new(subscriber.Example), + ), + ); err != nil { + log.Fatal(err) + } + + if err := server.Subscribe( + server.NewSubscriber( + "topic.go.micro.srv.example", + subscriber.Handler, + ), + ); err != nil { + log.Fatal(err) + } + + // Run server + if err := server.Run(); err != nil { + log.Fatal(err) + } +} diff --git a/service/README.md b/service/README.md new file mode 100644 index 0000000..2eaa090 --- /dev/null +++ b/service/README.md @@ -0,0 +1,43 @@ +# Service + +This is an example of creating a micro service. + +## Contents + +- main.go - is the main definition of the service, handler and client +- proto - contains the protobuf definition of the API +- wrapper - demonstrates the use of Client and Server Wrappers + +## Prereqs + +Micro services need a discovery system so they can find each other. Micro uses consul by default but +its easily swapped out with etcd, kubernetes, or various other systems. We'll run consul for convenience. + +Install consul +```shell +brew install consul +``` + +Alternative instructions - [https://www.consul.io/intro/getting-started/install.html](https://www.consul.io/intro/getting-started/install.html) + +Run Consul + +```shell +consul agent -dev -advertise=127.0.0.1 +``` + +## Run the example + +Run the service + +```shell +go run main.go +``` + +Run the client + +```shell +go run main.go --run_client +``` + +And that's all there is to it. diff --git a/service/main.go b/service/main.go new file mode 100644 index 0000000..2987e1b --- /dev/null +++ b/service/main.go @@ -0,0 +1,86 @@ +package main + +import ( + "fmt" + "os" + + "github.com/micro/cli" + proto "github.com/micro/examples/service/proto" + "github.com/micro/go-micro" + "golang.org/x/net/context" +) + +/* + +Example usage of top level service initialisation + +*/ + +type Greeter struct{} + +func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error { + rsp.Greeting = "Hello " + req.Name + return nil +} + +// Setup and the client +func runClient(service micro.Service) { + // Create new greeter client + greeter := proto.NewGreeterClient("greeter", service.Client()) + + // Call the greeter + rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"}) + if err != nil { + fmt.Println(err) + return + } + + // Print response + fmt.Println(rsp.Greeting) +} + +func main() { + // Create a new service. Optionally include some options here. + service := micro.NewService( + micro.Name("greeter"), + micro.Version("latest"), + micro.Metadata(map[string]string{ + "type": "helloworld", + }), + + // Setup some flags. Specify --run_client to run the client + + // Add runtime flags + // We could do this below too + micro.Flags(cli.BoolFlag{ + Name: "run_client", + Usage: "Launch the client", + }), + ) + + // Init will parse the command line flags. Any flags set will + // override the above settings. Options defined here will + // override anything set on the command line. + service.Init( + // Add runtime action + // We could actually do this above + micro.Action(func(c *cli.Context) { + if c.Bool("run_client") { + runClient(service) + os.Exit(0) + } + }), + ) + + // By default we'll run the server unless the flags catch us + + // Setup the server + + // Register handler + proto.RegisterGreeterHandler(service.Server(), new(Greeter)) + + // Run the server + if err := service.Run(); err != nil { + fmt.Println(err) + } +} diff --git a/service/proto/greeter.pb.go b/service/proto/greeter.pb.go new file mode 100644 index 0000000..20fd7cf --- /dev/null +++ b/service/proto/greeter.pb.go @@ -0,0 +1,124 @@ +// Code generated by protoc-gen-go. +// source: go-micro/examples/service/proto/greeter.proto +// DO NOT EDIT! + +/* +Package greeter is a generated protocol buffer package. + +It is generated from these files: + go-micro/examples/service/proto/greeter.proto + +It has these top-level messages: + HelloRequest + HelloResponse +*/ +package greeter + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" + context "golang.org/x/net/context" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type HelloRequest struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` +} + +func (m *HelloRequest) Reset() { *m = HelloRequest{} } +func (m *HelloRequest) String() string { return proto.CompactTextString(m) } +func (*HelloRequest) ProtoMessage() {} +func (*HelloRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type HelloResponse struct { + Greeting string `protobuf:"bytes,2,opt,name=greeting" json:"greeting,omitempty"` +} + +func (m *HelloResponse) Reset() { *m = HelloResponse{} } +func (m *HelloResponse) String() string { return proto.CompactTextString(m) } +func (*HelloResponse) ProtoMessage() {} +func (*HelloResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func init() { + proto.RegisterType((*HelloRequest)(nil), "HelloRequest") + proto.RegisterType((*HelloResponse)(nil), "HelloResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Greeter service + +type GreeterClient interface { + Hello(ctx context.Context, in *HelloRequest, opts ...client.CallOption) (*HelloResponse, error) +} + +type greeterClient struct { + c client.Client + serviceName string +} + +func NewGreeterClient(serviceName string, c client.Client) GreeterClient { + if c == nil { + c = client.NewClient() + } + if len(serviceName) == 0 { + serviceName = "greeter" + } + return &greeterClient{ + c: c, + serviceName: serviceName, + } +} + +func (c *greeterClient) Hello(ctx context.Context, in *HelloRequest, opts ...client.CallOption) (*HelloResponse, error) { + req := c.c.NewRequest(c.serviceName, "Greeter.Hello", in) + out := new(HelloResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Greeter service + +type GreeterHandler interface { + Hello(context.Context, *HelloRequest, *HelloResponse) error +} + +func RegisterGreeterHandler(s server.Server, hdlr GreeterHandler) { + s.Handle(s.NewHandler(&Greeter{hdlr})) +} + +type Greeter struct { + GreeterHandler +} + +func (h *Greeter) Hello(ctx context.Context, in *HelloRequest, out *HelloResponse) error { + return h.GreeterHandler.Hello(ctx, in, out) +} + +var fileDescriptor0 = []byte{ + // 153 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xd2, 0x4d, 0xcf, 0xd7, 0xcd, + 0xcd, 0x4c, 0x2e, 0xca, 0xd7, 0x4f, 0xad, 0x48, 0xcc, 0x2d, 0xc8, 0x49, 0x2d, 0xd6, 0x2f, 0x4e, + 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0xd7, 0x4f, 0x2f, 0x4a, 0x4d, + 0x2d, 0x49, 0x2d, 0xd2, 0x03, 0xf3, 0x94, 0x64, 0xb8, 0x78, 0x3c, 0x52, 0x73, 0x72, 0xf2, 0x83, + 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x78, 0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x18, + 0x15, 0x18, 0x35, 0x38, 0x95, 0x14, 0xb9, 0x78, 0xa1, 0xb2, 0xc5, 0x05, 0xf9, 0x79, 0xc5, 0xa9, + 0x42, 0x02, 0x5c, 0x1c, 0x60, 0xfd, 0x99, 0x79, 0xe9, 0x12, 0x4c, 0x20, 0x25, 0x46, 0xc6, 0x5c, + 0xec, 0xee, 0x10, 0x13, 0x85, 0x34, 0xb8, 0x58, 0xc1, 0xaa, 0x85, 0x78, 0xf5, 0x90, 0xcd, 0x94, + 0xe2, 0xd3, 0x43, 0x31, 0x44, 0x89, 0x21, 0x89, 0x0d, 0x6c, 0xb9, 0x31, 0x20, 0x00, 0x00, 0xff, + 0xff, 0x0f, 0xa9, 0x59, 0xb3, 0xad, 0x00, 0x00, 0x00, +} diff --git a/service/proto/greeter.proto b/service/proto/greeter.proto new file mode 100644 index 0000000..4ff347a --- /dev/null +++ b/service/proto/greeter.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +service Greeter { + rpc Hello(HelloRequest) returns (HelloResponse) {} +} + +message HelloRequest { + string name = 1; +} + +message HelloResponse { + string greeting = 2; +} diff --git a/service/wrapper/README.md b/service/wrapper/README.md new file mode 100644 index 0000000..049e722 --- /dev/null +++ b/service/wrapper/README.md @@ -0,0 +1,3 @@ +# Client/Server Wrapper + +This is an example using wrappers or middleware diff --git a/service/wrapper/main.go b/service/wrapper/main.go new file mode 100644 index 0000000..67627fe --- /dev/null +++ b/service/wrapper/main.go @@ -0,0 +1,116 @@ +package main + +import ( + "fmt" + "os" + + "github.com/micro/cli" + proto "github.com/micro/examples/service/proto" + "github.com/micro/go-micro" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/metadata" + "github.com/micro/go-micro/server" + "golang.org/x/net/context" +) + +/* + +Example usage of top level service initialisation including wrappers + +*/ + +type logWrapper struct { + client.Client +} + +func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + md, _ := metadata.FromContext(ctx) + fmt.Printf("[Log Wrapper] ctx: %v service: %s method: %s\n", md, req.Service(), req.Method()) + return l.Client.Call(ctx, req, rsp) +} + +// Implements client.Wrapper as logWrapper +func logWrap(c client.Client) client.Client { + return &logWrapper{c} +} + +// Implements the server.HandlerWrapper +func logHandlerWrapper(fn server.HandlerFunc) server.HandlerFunc { + return func(ctx context.Context, req server.Request, rsp interface{}) error { + fmt.Printf("[Log Wrapper] Before serving request method: %v\n", req.Method()) + err := fn(ctx, req, rsp) + fmt.Println("[Log Wrapper] After serving request") + return err + } +} + +type Greeter struct{} + +func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error { + rsp.Greeting = "Hello " + req.Name + return nil +} + +// Setup and the client +func runClient(service micro.Service) { + // Create new greeter client + greeter := proto.NewGreeterClient("greeter", service.Client()) + + // Call the greeter + rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"}) + if err != nil { + fmt.Println(err) + return + } + + // Print response + fmt.Println(rsp.Greeting) +} + +func main() { + // Create a new service. Optionally include some options here. + service := micro.NewService( + micro.Client(client.NewClient(client.Wrap(logWrap))), + micro.Server(server.NewServer(server.WrapHandler(logHandlerWrapper))), + micro.Name("greeter"), + micro.Version("latest"), + micro.Metadata(map[string]string{ + "type": "helloworld", + }), + + // Setup some flags. Specify --run_client to run the client + + // Add runtime flags + // We could do this below too + micro.Flags(cli.BoolFlag{ + Name: "run_client", + Usage: "Launch the client", + }), + ) + + // Init will parse the command line flags. Any flags set will + // override the above settings. Options defined here will + // override anything set on the command line. + service.Init( + // Add runtime action + // We could actually do this above + micro.Action(func(c *cli.Context) { + if c.Bool("run_client") { + runClient(service) + os.Exit(0) + } + }), + ) + + // By default we'll run the server unless the flags catch us + + // Setup the server + + // Register handler + proto.RegisterGreeterHandler(service.Server(), new(Greeter)) + + // Run the server + if err := service.Run(); err != nil { + fmt.Println(err) + } +}