Skip to content

Commit

Permalink
ARCO-182: send callbacks to the same url one by one (#563)
Browse files Browse the repository at this point in the history
* implement callbacks dispatcher
* make CallbackSender.Send() function blocking
* update README and CHANGELOG
* bump moq => 0.4.0
  • Loading branch information
arkadiuszos4chain authored Aug 27, 2024
1 parent c3df49e commit 47d739c
Show file tree
Hide file tree
Showing 18 changed files with 554 additions and 111 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ All notable changes to this project will be documented in this file. The format

## [Unreleased]

### Changed
- Callbacks are sent one by one to the same URL. In the previous implementation, each callback request created a new goroutine to send the callback, which could result in a potential DDoS of the callback receiver. The new approach sends callbacks to the same receiver in a serial manner. Note that URLs are not locked by the `callbacker` instance, so serial sends occur only within a single instance. In other words, the level of parallelism is determined by the number of `callbacker` instances.

## [1.3.0] - 2024-08-21

### Changed
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ install_gen:
go install google.golang.org/protobuf/cmd/[email protected]
go install google.golang.org/grpc/cmd/[email protected]
go install github.com/oapi-codegen/oapi-codegen/v2/cmd/[email protected]
go install github.com/matryer/moq@v0.3.4
go install github.com/matryer/moq@v0.4.0

.PHONY: docs
docs:
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ARC is a transaction processor for Bitcoin that keeps track of the life cycle of
- [ZMQ](#zmq)
- [BlockTx](#blocktx)
- [BlockTx stores](#blocktx-stores)
- [Callbacker](#callbacker)
- [Message Queue](#message-queue)
- [K8s-Watcher](#k8s-watcher)
- [Broadcaster-cli](#broadcaster-cli)
Expand Down Expand Up @@ -92,6 +93,9 @@ where options are:
-k8s-watcher=<true|false>
whether to start k8s-watcher (default=true)
-callbacker=<true|false>
whether to start callbacker (default=true)
-config=/location
directory to look for config.yaml (default='')
Expand Down Expand Up @@ -248,6 +252,17 @@ Metamorph publishes new transactions to the message queue and BlockTx subscribes

![Message Queue](./doc/message_queue.png)

### Callbacker

Callbacker is a microservice that sends callbacks to a specified URL.

Callbacker is designed to be horizontally scalable, with each instance operating independently. As a result, they do not communicate with each other and remain unaware of each other's existence.

You can run callbacker like this:

```shell
go run main.go -callbacker=true
```

## K8s-Watcher

Expand Down
26 changes: 16 additions & 10 deletions cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,40 @@ import (
"google.golang.org/grpc/reflection"
)

func StartCallbacker(logger *slog.Logger, config *config.ArcConfig) (func(), error) {
func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), error) {
logger = logger.With(slog.String("service", "callbacker"))
logger.Info("Starting")

callbackSrv, err := callbacker.NewSender(&http.Client{Timeout: 5 * time.Second}, logger)
config := appConfig.Callbacker

callbackSender, err := callbacker.NewSender(&http.Client{Timeout: 5 * time.Second}, logger)
if err != nil {
return nil, fmt.Errorf("callbacker failed: %v", err)
}

srvOpts := []callbacker.ServerOption{
callbacker.WithLogger(logger.With(slog.String("module", "callbacker-server"))),
}
server := callbacker.NewServer(callbackSrv, srvOpts...)
err = server.Serve(config.Callbacker.ListenAddr, config.GrpcMessageSize, config.PrometheusEndpoint)
callbackDispatcher := callbacker.NewCallbackDispatcher(callbackSender, config.Pause)

server := callbacker.NewServer(callbackDispatcher, callbacker.WithLogger(logger.With(slog.String("module", "server"))))
err = server.Serve(config.ListenAddr, appConfig.GrpcMessageSize, appConfig.PrometheusEndpoint)
if err != nil {
return nil, fmt.Errorf("GRPCServer failed: %v", err)
}

healthServer, err := StartHealthServerCallbacker(server, config.Callbacker.Health, logger)
healthServer, err := StartHealthServerCallbacker(server, config.Health, logger)
if err != nil {
return nil, fmt.Errorf("failed to start health server: %v", err)
}

stopFn := func() {
logger.Info("Shutting down callbacker")

server.Shutdown()
callbackSrv.GracefulStop()
// dispose of dependencies in the correct order:
// 1. server - ensure no new callbacks will be received
// 2. dispatcher - ensure all already accepted callbacks are proccessed
// 3. sender - finally, stop the sender as there are no callbacks left to send.
server.GracefulStop()
callbackDispatcher.GracefulStop()
callbackSender.GracefulStop()

healthServer.Stop()

Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,5 @@ type CallbackerConfig struct {
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
}
1 change: 1 addition & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,6 @@ func getCallbackerConfig() *CallbackerConfig {
Health: &HealthConfig{
SeverDialAddr: "localhost:8025",
},
Pause: 0,
}
}
3 changes: 2 additions & 1 deletion config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,5 @@ callbacker:
listenAddr: localhost:8021 # address space for callbacker to listen on. Can be for example localhost:8021 or :8021 for listening on all addresses
dialAddr: localhost:8021 # address for other services to dial callbacker service
health:
serverDialAddr: localhost:8025 # address at which the grpc health server is exposed
serverDialAddr: localhost:8025 # address at which the grpc health server is exposed
pause: 0s # pause between sending next callback to the same receiver
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,16 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.24.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand All @@ -402,8 +402,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA=
golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -425,22 +425,22 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
123 changes: 123 additions & 0 deletions internal/callbacker/callbacker_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions internal/callbacker/callbacker_mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package callbacker

// from callbacker.go
//go:generate moq -out ./callbacker_mock.go ./ CallbackerI
Loading

0 comments on commit 47d739c

Please sign in to comment.