Skip to content

Commit

Permalink
feat: add jetstream publisher, reuse nats connection
Browse files Browse the repository at this point in the history
* closes #31
  • Loading branch information
kamikazechaser committed Oct 16, 2024
1 parent b1fd228 commit 7b159bb
Show file tree
Hide file tree
Showing 14 changed files with 321 additions and 121 deletions.
37 changes: 25 additions & 12 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

"github.com/grassrootseconomics/eth-custodial/internal/api"
"github.com/grassrootseconomics/eth-custodial/internal/gas"
"github.com/grassrootseconomics/eth-custodial/internal/jetstream"
"github.com/grassrootseconomics/eth-custodial/internal/pub"
"github.com/grassrootseconomics/eth-custodial/internal/store"
"github.com/grassrootseconomics/eth-custodial/internal/sub"
"github.com/grassrootseconomics/eth-custodial/internal/util"
Expand Down Expand Up @@ -78,13 +80,28 @@ func main() {
os.Exit(1)
}

js, err := jetstream.NewJetStream(jetstream.JetStreamOpts{
Logg: lo,
Endpoint: ko.MustString("jetstream.endpoint"),
JetStreamID: ko.MustString("jetstream.id"),
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour,
})
if err != nil {
lo.Error("could not initialize jetstream sub", "error", err)
os.Exit(1)
}
pub := pub.NewPub(pub.PubOpts{
JSCtx: js.JSCtx,
})

workerOpts := worker.WorkerOpts{
MaxWorkers: ko.Int("workers.max"),
ChainProvider: chainProvider,
CustodialRegistrationProxy: ko.MustString("chain.custodial_registration_proxy"),
MaxWorkers: ko.Int("workers.max"),
GasOracle: gasOracle,
Store: store,
Logg: lo,
Pub: pub,
ChainProvider: chainProvider,
}
if ko.Int("workers.max") <= 0 {
workerOpts.MaxWorkers = runtime.NumCPU() * 2
Expand All @@ -95,17 +112,13 @@ func main() {
os.Exit(1)
}

jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{
Logg: lo,
sub := sub.NewSub(sub.SubObts{
Store: store,
Pub: pub,
JSSub: js.JSSub,
Logg: lo,
WorkerContainer: workerContainer,
Endpoint: ko.MustString("jetstream.endpoint"),
JetStreamID: ko.MustString("jetstream.id"),
})
if err != nil {
lo.Error("could not initialize jetstream sub", "error", err)
os.Exit(1)
}

apiServer := api.New(api.APIOpts{
APIKey: ko.MustString("api.key"),
Expand Down Expand Up @@ -137,7 +150,7 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
jetStreamSub.Process()
sub.Process(ctx)
}()

<-ctx.Done()
Expand All @@ -147,7 +160,7 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
jetStreamSub.Close()
js.Close()
if err := apiServer.Stop(shutdownCtx); err != nil {
lo.Error("failed to stop HTTP server", "err", fmt.Sprintf("%T", err))
}
Expand Down
1 change: 1 addition & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ custodial_registration_proxy = "0xf282a3C68A2505a79Fc99f94CE43D9c83230CaE5"
[jetstream]
endpoint = "nats://127.0.0.1:4222"
id = "eth-custodial-1"
persist_duration_hrs = 48
85 changes: 85 additions & 0 deletions internal/jetstream/jetstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package jetstream

import (
"errors"
"log/slog"
"time"

"github.com/nats-io/nats.go"
)

type (
JetStreamOpts struct {
Logg *slog.Logger
JetStreamID string
Endpoint string
PersistDuration time.Duration
}

JetStream struct {
JSCtx nats.JetStreamContext
JSSub *nats.Subscription

natsConn *nats.Conn
logg *slog.Logger
}
)

const (
PushStream = "CUSTODIAL"

pullStream = "TRACKER"
pullSubject = "TRACKER.*"
)

var pushStreamSubjects = []string{
"CUSTODIAL.*",
}

func NewJetStream(o JetStreamOpts) (*JetStream, error) {
natsConn, err := nats.Connect(o.Endpoint)
if err != nil {
return nil, err
}

js, err := natsConn.JetStream()
if err != nil {
return nil, err
}

stream, err := js.StreamInfo(PushStream)
if err != nil && !errors.Is(err, nats.ErrStreamNotFound) {
return nil, err
}
if stream == nil {
_, err := js.AddStream(&nats.StreamConfig{
Name: PushStream,
MaxAge: o.PersistDuration,
Storage: nats.FileStorage,
Subjects: pushStreamSubjects,
Duplicates: time.Minute,
})
if err != nil {
return nil, err
}
o.Logg.Info("successfully created NATS JetStream stream", "stream_name", PushStream)
}

sub, err := js.PullSubscribe(pullSubject, o.JetStreamID, nats.AckExplicit())
if err != nil {
return nil, err
}

return &JetStream{
JSCtx: js,
JSSub: sub,
natsConn: natsConn,
logg: o.Logg,
}, nil
}

func (s *JetStream) Close() {
if s.natsConn != nil {
s.natsConn.Close()
}
}
43 changes: 43 additions & 0 deletions internal/pub/pub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package pub

import (
"context"
"fmt"

"github.com/grassrootseconomics/eth-custodial/internal/jetstream"
"github.com/grassrootseconomics/eth-custodial/pkg/event"
"github.com/nats-io/nats.go"
)

type (
PubOpts struct {
JSCtx nats.JetStreamContext
}

Pub struct {
jsCtx nats.JetStreamContext
}
)

func NewPub(o PubOpts) *Pub {
return &Pub{
jsCtx: o.JSCtx,
}
}

func (p *Pub) Send(_ context.Context, payload event.Event) error {
data, err := payload.Serialize()
if err != nil {
return err
}

_, err = p.jsCtx.Publish(
fmt.Sprintf("%s.%s", jetstream.PushStream, payload.TrackingID),
data,
)
if err != nil {
return err
}

return nil
}
7 changes: 6 additions & 1 deletion internal/sub/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"errors"

"github.com/grassrootseconomics/eth-custodial/internal/store"
custodialEvent "github.com/grassrootseconomics/eth-custodial/pkg/event"
"github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/jackc/pgx/v5"
)

func (s *JetStreamSub) processEvent(ctx context.Context, msgSubject string, msg []byte) error {
func (s *Sub) processEvent(ctx context.Context, msgSubject string, msg []byte) error {
s.logg.Debug("sub processing event", "subject", msgSubject, "data", string(msg))
var chainEvent event.Event

Expand Down Expand Up @@ -55,6 +56,10 @@ func (s *JetStreamSub) processEvent(ctx context.Context, msgSubject string, msg
if err := s.store.UpdateDispatchTxStatus(ctx, tx, updateDispatchStatus); err != nil {
return err
}
s.pub.Send(ctx, custodialEvent.Event{
TrackingID: otx.TrackingID,
Status: updateDispatchStatus.Status,
})

return tx.Commit(ctx)
}
105 changes: 0 additions & 105 deletions internal/sub/jetstream.go

This file was deleted.

Loading

0 comments on commit 7b159bb

Please sign in to comment.