Skip to content

Commit

Permalink
feat: sub's client option
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangliang committed Apr 6, 2022
1 parent 405e8c7 commit 4f7ce95
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 38 deletions.
2 changes: 1 addition & 1 deletion examples/hello/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func buildTeam(tick agent.Ticker) (behavior3go.Status, error) {
}

func NewSubscription() core.IBaseNode {
subscription := agent.NewGqlSubscription("", agent.WithLog(nil))
subscription := agent.NewGraphqlSubscription("", agent.WithLog(nil))
return subscription
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/LilithGames/agent-go

go 1.17
go 1.18

require (
github.com/AsynkronIT/protoactor-go v0.0.0-20211018041209-5fdd594ca443
Expand Down Expand Up @@ -64,5 +64,5 @@ require (

replace (
github.com/hasura/go-graphql-client => github.com/LilithGames/go-graphql-client v1.0.4
github.com/magicsea/behavior3go => github.com/LilithGames/behavior3go v1.0.3
github.com/magicsea/behavior3go => github.com/LilithGames/behavior3go v1.0.4
)
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ github.com/AsynkronIT/protoactor-go v0.0.0-20211018041209-5fdd594ca443/go.mod h1
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/LilithGames/behavior3go v1.0.3 h1:uyCJvrViEwlJQQy+9KMk43p43cbRfxA1y0uuo+zeg+4=
github.com/LilithGames/behavior3go v1.0.3/go.mod h1:bqTRjmUHkU8u5UmTRzPJWkZeGRj6AJaYBlXksX5lpBc=
github.com/LilithGames/behavior3go v1.0.4 h1:f4xZa2AMlebgOZ1C/fgXWEbtyXtfCLb6GN3HPn0GOco=
github.com/LilithGames/behavior3go v1.0.4/go.mod h1:bqTRjmUHkU8u5UmTRzPJWkZeGRj6AJaYBlXksX5lpBc=
github.com/LilithGames/go-graphql-client v1.0.4 h1:AUwNRlIBIdn4DW1dvqJgkM+mgkB1Vba6u1/TTXAW08c=
github.com/LilithGames/go-graphql-client v1.0.4/go.mod h1:78viDEV6ew82QUorDE/bxDsdtG2QM4uqdsn5L3ROcww=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
Expand All @@ -74,7 +74,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -225,7 +224,6 @@ github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pf
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graph-gophers/graphql-go v0.0.0-20201112095111-7a585a01e04c/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.10.1/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M=
Expand Down Expand Up @@ -325,7 +323,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/orcaman/concurrent-map v0.0.0-20190107190726-7ed82d9cb717 h1:2v7IYkog9ZFN04bv5hkwjpyHkc6wujPPOVYDPp2rfwA=
github.com/orcaman/concurrent-map v0.0.0-20190107190726-7ed82d9cb717/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
Expand Down
14 changes: 7 additions & 7 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package agent
import (
"context"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"strings"
_ "net/http/pprof"

"github.com/LilithGames/agent-go/tools/metric"
"github.com/spf13/viper"
Expand All @@ -30,7 +30,7 @@ type Agent struct {
endpoint string
view *ViewOpt
alert Alert
stream *proxyStream
stream *proxyStream
}

func NewAgent(engine *Engine, cfg *viper.Viper, opts ...Option) *Agent {
Expand All @@ -52,11 +52,11 @@ func NewAgent(engine *Engine, cfg *viper.Viper, opts ...Option) *Agent {
endpoint := cfg.GetString(masterAddr)
ctx, cancel := context.WithCancel(context.Background())
at := &Agent{
id:id,
ctx: ctx,
cancel: cancel,
engine: engine,
cfg: cfg,
id: id,
ctx: ctx,
cancel: cancel,
engine: engine,
cfg: cfg,
endpoint: endpoint,
}
for _, opt := range opts {
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ func pushData(outcomes []*transfer.Outcome) {
func pushError(errors map[string]*transfer.ErrorMark, err string) {
last := strings.LastIndex(err, "#")
first := strings.Index(err, "#")
var depiction, trace string
var depiction, trace string
if first != last && first != -1 && last != -1 {
trace = err[first+1:last]
trace = err[first+1 : last]
depiction = err[:first]
} else {
depiction, trace = err, ""
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Option func(agent *Agent)
func WithAlert(alert Alert) Option {
return func(agent *Agent) {
agent.alert = alert
}
}
}

func WithConfig(cfg *viper.Viper) Option {
Expand All @@ -74,4 +74,4 @@ func WithViewer(views ...*ViewOpt) Option {
return func(agent *Agent) {
agent.view = view
}
}
}
4 changes: 2 additions & 2 deletions pkg/agent/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ type proxyStream struct {

func newProxyFromAgent(agent *Agent, clients ...transfer.Courier_DeliverMailClient) *proxyStream {
proxy := &proxyStream{
id: agent.id,
id: agent.id,
viewOpt: agent.view,
ctx: agent.ctx,
ctx: agent.ctx,
}
if len(clients) == 1 {
proxy.client = clients[0]
Expand Down
45 changes: 29 additions & 16 deletions pkg/agent/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,61 @@ import (
"go.uber.org/zap"
)

type ClientOption func(client *graphql.SubscriptionClient)
type NewClientOption func(client *graphql.SubscriptionClient, tick core.Ticker)

type MessageCallback func(tick Ticker, message *json.RawMessage, err error) error

type MessageHandler func(message *json.RawMessage, err error) error

func WithLog(log func(args ...interface{})) ClientOption {
return func(client *graphql.SubscriptionClient) {
func WithLog(log func(args ...interface{})) NewClientOption {
return func(client *graphql.SubscriptionClient, _ core.Ticker) {
client.WithLog(log)
}
}

func WithOnErr(onError func(sc *graphql.SubscriptionClient, err error) error) ClientOption {
return func(client *graphql.SubscriptionClient) {
func WithOnErr(onError func(sc *graphql.SubscriptionClient, err error) error) NewClientOption {
return func(client *graphql.SubscriptionClient, _ core.Ticker) {
client.OnError(onError)
}
}

func WithOnConnected(fn func()) ClientOption {
return func(client *graphql.SubscriptionClient) {
func WithOnConnected(fn func()) NewClientOption {
return func(client *graphql.SubscriptionClient, _ core.Ticker) {
client.OnConnected(fn)
}
}

func WithOnDisconnected(fn func()) ClientOption {
return func(client *graphql.SubscriptionClient) {
func WithOnDisconnected(fn func()) NewClientOption {
return func(client *graphql.SubscriptionClient, _ core.Ticker) {
client.OnDisconnected(fn)
}
}

func WithoutLogTypes(types ...graphql.OperationMessageType) ClientOption {
return func(client *graphql.SubscriptionClient) {
func WithoutLogTypes(types ...graphql.OperationMessageType) NewClientOption {
return func(client *graphql.SubscriptionClient, _ core.Ticker) {
client.WithoutLogTypes(types...)
}
}

func WithTimeout(timeout time.Duration) ClientOption {
return func(client *graphql.SubscriptionClient) {
func WithTimeout(timeout time.Duration) NewClientOption {
return func(client *graphql.SubscriptionClient, _ core.Ticker) {
client.WithTimeout(timeout)
}
}

func WithNothing() NewClientOption {
return func(_ *graphql.SubscriptionClient, _ core.Ticker) {}
}

type InitFunction func(tick core.Ticker) NewClientOption

func WithInitFunc(init InitFunction) NewClientOption {
return func(client *graphql.SubscriptionClient, tick core.Ticker) {
option := init(tick)
option(client, tick)
}
}

type GqlSubscription struct {
composites.Subscription
token string
Expand All @@ -71,9 +84,9 @@ func (g *GqlSubscription) OnOpen(tick core.Ticker) {
}
}

func NewGqlSubscription(backend string, options ...ClientOption) *GqlSubscription {
func NewGraphqlSubscription(backend string, options ...NewClientOption) *GqlSubscription {
subscription := &GqlSubscription{}
subscription.ClientCreator = func() composites.SubClient {
subscription.ClientCreator = func(tick core.Ticker) composites.SubClient {
client := graphql.NewSubscriptionClient(backend)
client.WithWebSocket(newWebsocketConn)
if subscription.token != "" {
Expand All @@ -82,7 +95,7 @@ func NewGqlSubscription(backend string, options ...ClientOption) *GqlSubscriptio
})
}
for _, option := range options {
option(client)
option(client, tick)
}
return client
}
Expand Down
2 changes: 1 addition & 1 deletion tools/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func MetricsExport() *prometheus.Exporter {
aggregation.CumulativeTemporalitySelector(),
processor.WithMemory(true),
),
controller.WithCollectPeriod(time.Second * 5),
controller.WithCollectPeriod(time.Second*5),
)
if err := runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second * 5)); err != nil {
log.Panic("create runtime metric error", err)
Expand Down

0 comments on commit 4f7ce95

Please sign in to comment.