Skip to content

Commit

Permalink
Nicer logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Aug 13, 2024
1 parent d57140d commit c1e9d64
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 75 deletions.
7 changes: 3 additions & 4 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func RootCmd() *cobra.Command {
rootCmd := &cobra.Command{
Use: "omq",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
log.Setup()
if cfg.Size < 12 {
_, _ = fmt.Fprintf(os.Stderr, "ERROR: size can't be less than 12 bytes\n")
os.Exit(1)
Expand Down Expand Up @@ -180,8 +181,6 @@ func RootCmd() *cobra.Command {
}
metricsServer := metrics.GetMetricsServer()
metricsServer.Start()
log.Setup()

},
PersistentPostRun: func(cmd *cobra.Command, args []string) {
},
Expand Down Expand Up @@ -311,11 +310,11 @@ func start(cfg config.Config, publisherProto common.Protocol, consumerProto comm

func setUris(cfg *config.Config, command string) {
if cfg.PublisherUri == nil {
println("setting publisher uri to ", defaultUri(strings.Split(command, "-")[0]))
log.Debug("setting default publisher uri", "uri", defaultUri(strings.Split(command, "-")[0]))
(*cfg).PublisherUri = []string{defaultUri(strings.Split(command, "-")[0])}
}
if cfg.ConsumerUri == nil {
println("setting consumer uri to ", defaultUri(strings.Split(command, "-")[1]))
log.Debug("setting default consumer uri", "uri", defaultUri(strings.Split(command, "-")[1]))
(*cfg).ConsumerUri = []string{defaultUri(strings.Split(command, "-")[1])}
}
}
Expand Down
14 changes: 12 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/Azure/go-amqp v1.0.5
github.com/charmbracelet/log v0.4.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/prometheus/client_golang v1.19.1
github.com/relvacode/iso8601 v1.4.0
Expand All @@ -13,20 +14,29 @@ require (
)

require (
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/charmbracelet/lipgloss v0.12.1 // indirect
github.com/charmbracelet/x/ansi v0.1.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/sys v0.24.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
34 changes: 28 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU=
github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/charmbracelet/lipgloss v0.12.1 h1:/gmzszl+pedQpjCOH+wFkZr/N90Snz40J/NR7A0zQcs=
github.com/charmbracelet/lipgloss v0.12.1/go.mod h1:V2CiwIuhx9S1S1ZlADfOj9HmxeMAORuz5izHb0zGbB8=
github.com/charmbracelet/log v0.4.0 h1:G9bQAcx8rWA2T3pWvx7YtPTPwgqpk7D68BX21IRW8ZM=
github.com/charmbracelet/log v0.4.0/go.mod h1:63bXt/djrizTec0l11H20t8FDSvA4CRZJ1KH22MdptM=
github.com/charmbracelet/x/ansi v0.1.4 h1:IEU3D6+dWwPSgZ6HBH+v6oUuZ/nVawMiWj5831KfiLM=
github.com/charmbracelet/x/ansi v0.1.4/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-stomp/stomp/v3 v3.1.2 h1:kmrNek021BsFUO8rxDhbkOYslRomKO/JIrUCIqyL0r8=
Expand All @@ -33,6 +43,14 @@ github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3x
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo=
github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
Expand All @@ -51,6 +69,9 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/relvacode/iso8601 v1.4.0 h1:GsInVSEJfkYuirYFxa80nMLbH2aydgZpIf52gYZXUJs=
github.com/relvacode/iso8601 v1.4.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand All @@ -69,8 +90,8 @@ github.com/thediveo/success v1.0.1/go.mod h1:AZ8oUArgbIsCuDEWrzWNQHdKnPbDOLQsWOF
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa h1:ELnwvuAXPNtPk1TJRuGkI9fDTwym6AYBu0qzT8AcHdI=
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand All @@ -86,8 +107,9 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand All @@ -96,8 +118,8 @@ golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
26 changes: 13 additions & 13 deletions pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,18 @@ func (c *Amqp10Consumer) Connect(ctx context.Context) {
},
})
if err != nil {
log.Error("consumer failed to connect", "protocol", "amqp-1.0", "consumerId", c.Id, "error", err.Error())
log.Error("consumer failed to connect", "consumerId", c.Id, "error", err.Error())
time.Sleep(1 * time.Second)
} else {
log.Debug("consumer connected", "protocol", "amqp-1.0", "consumerId", c.Id, "uri", uri)
log.Debug("consumer connected", "consumerId", c.Id, "uri", uri)
c.Connection = conn
}
}

for c.Session == nil {
session, err := c.Connection.NewSession(context.TODO(), nil)
if err != nil {
log.Error("consumer failed to create a session", "protocol", "amqp-1.0", "consumerId", c.Id, "error", err.Error())
log.Error("consumer failed to create a session", "consumerId", c.Id, "error", err.Error())
time.Sleep(1 * time.Second)
} else {
c.Session = session
Expand All @@ -112,7 +112,7 @@ func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) {
for c.Receiver == nil {
receiver, err := c.Session.NewReceiver(ctx, c.Topic, &amqp.ReceiverOptions{SourceDurability: durability, Credit: int32(c.Config.ConsumerCredits), Properties: buildLinkProperties(c.Config), Filters: buildLinkFilters(c.Config)})
if err != nil {
log.Error("consumer failed to create a receiver", "protocol", "amqp-1.0", "consumerId", c.Id, "error", err.Error())
log.Error("consumer failed to create a receiver", "consumerId", c.Id, "error", err.Error())
time.Sleep(1 * time.Second)
} else {
c.Receiver = receiver
Expand All @@ -125,13 +125,13 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {

c.CreateReceiver(ctx)
close(subscribed)
log.Info("consumer started", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
log.Info("consumer started", "consumerId", c.Id, "terminus", c.Topic)
previousMessageTimeSent := time.Unix(0, 0)

for i := 1; i <= c.Config.ConsumeCount; {
if c.Receiver == nil {
c.CreateReceiver(ctx)
log.Debug("consumer subscribed", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
log.Debug("consumer subscribed", "consumerId", c.Id, "terminus", c.Topic)
}

select {
Expand All @@ -141,7 +141,7 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
default:
msg, err := c.Receiver.Receive(ctx, nil)
if err != nil {
log.Error("failed to receive a message", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
log.Error("failed to receive a message", "consumerId", c.Id, "terminus", c.Topic)
c.Connect(ctx)
continue
}
Expand All @@ -157,30 +157,30 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
}
previousMessageTimeSent = timeSent

log.Debug("message received", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic, "size", len(payload), "priority", priority, "latency", latency)
log.Debug("message received", "consumerId", c.Id, "terminus", c.Topic, "size", len(payload), "priority", priority, "latency", latency)

if c.Config.ConsumerLatency > 0 {
log.Debug("consumer latency", "protocol", "amqp-1.0", "consumerId", c.Id, "latency", c.Config.ConsumerLatency)
log.Debug("consumer latency", "consumerId", c.Id, "latency", c.Config.ConsumerLatency)
time.Sleep(c.Config.ConsumerLatency)
}

err = c.Receiver.AcceptMessage(ctx, msg)
if err != nil {
log.Error("message NOT accepted", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
log.Error("message NOT accepted", "consumerId", c.Id, "terminus", c.Topic)
} else {
metrics.MessagesConsumed.With(prometheus.Labels{"protocol": "amqp-1.0", "priority": priority}).Inc()
i++
log.Debug("message accepted", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
log.Debug("message accepted", "consumerId", c.Id, "terminus", c.Topic)
}
}
}

c.Stop("message count reached")
log.Debug("consumer finished", "protocol", "amqp-1.0", "consumerId", c.Id)
log.Debug("consumer finished", "consumerId", c.Id)
}

func (c *Amqp10Consumer) Stop(reason string) {
log.Debug("closing connection", "protocol", "amqp-1.0", "consumerId", c.Id, "reason", reason)
log.Debug("closing connection", "consumerId", c.Id, "reason", reason)
_ = c.Connection.Close()
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/amqp10_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ func (p *Amqp10Publisher) Connect() {
})

if err != nil {
log.Error("connection failed", "protocol", "amqp-1.0", "publisherId", p.Id, "error", err.Error())
log.Error("connection failed", "publisherId", p.Id, "error", err.Error())
time.Sleep(1 * time.Second)
} else {
log.Debug("connection established", "protocol", "amqp-1.0", "publisherId", p.Id, "uri", uri)
log.Debug("connection established", "publisherId", p.Id, "uri", uri)
p.Connection = conn
}
}

for p.Session == nil {
session, err := p.Connection.NewSession(context.TODO(), nil)
if err != nil {
log.Error("publisher failed to create a session", "protocol", "amqp-1.0", "publisherId", p.Id, "error", err.Error())
log.Error("publisher failed to create a session", "publisherId", p.Id, "error", err.Error())
time.Sleep(1 * time.Second)
p.Connect()
} else {
Expand Down Expand Up @@ -121,7 +121,7 @@ func (p *Amqp10Publisher) CreateSender() {
SettlementMode: settleMode,
TargetDurability: durability})
if err != nil {
log.Error("publisher failed to create a sender", "protocol", "amqp-1.0", "publisherId", p.Id, "error", err.Error())
log.Error("publisher failed to create a sender", "publisherId", p.Id, "error", err.Error())
time.Sleep(1 * time.Second)
p.Connect()
} else {
Expand All @@ -146,11 +146,11 @@ func (p *Amqp10Publisher) Start(ctx context.Context) {
p.StartRateLimited(ctx)
}

log.Debug("publisher completed", "protocol", "amqp-1.0", "publisherId", p.Id)
log.Debug("publisher completed", "publisherId", p.Id)
}

func (p *Amqp10Publisher) StartFullSpeed(ctx context.Context) {
log.Info("publisher started", "protocol", "AMQP-1.0", "publisherId", p.Id, "rate", "unlimited", "destination", p.Terminus)
log.Info("publisher started", "publisherId", p.Id, "rate", "unlimited", "destination", p.Terminus)

for i := 1; i <= p.Config.PublishCount; {
select {
Expand All @@ -168,13 +168,13 @@ func (p *Amqp10Publisher) StartFullSpeed(ctx context.Context) {
}

func (p *Amqp10Publisher) StartIdle(ctx context.Context) {
log.Info("publisher started", "protocol", "AMQP-1.0", "publisherId", p.Id, "rate", "-", "destination", p.Terminus)
log.Info("publisher started", "publisherId", p.Id, "rate", "-", "destination", p.Terminus)

_ = ctx.Done()
}

func (p *Amqp10Publisher) StartRateLimited(ctx context.Context) {
log.Info("publisher started", "protocol", "AMQP-1.0", "publisherId", p.Id, "rate", p.Config.Rate, "destination", p.Terminus)
log.Info("publisher started", "publisherId", p.Id, "rate", p.Config.Rate, "destination", p.Terminus)
ticker := time.NewTicker(time.Duration(1000/float64(p.Config.Rate)) * time.Millisecond)

msgSent := 0
Expand Down Expand Up @@ -221,15 +221,15 @@ func (p *Amqp10Publisher) Send() error {
err := p.Sender.Send(context.TODO(), msg, nil)
timer.ObserveDuration()
if err != nil {
log.Error("message sending failure", "protocol", "amqp-1.0", "publisherId", p.Id, "error", err.Error())
log.Error("message sending failure", "publisherId", p.Id, "error", err.Error())
return err
}
metrics.MessagesPublished.With(prometheus.Labels{"protocol": "amqp-1.0"}).Inc()
log.Debug("message sent", "protocol", "amqp-1.0", "publisherId", p.Id)
log.Debug("message sent", "publisherId", p.Id)
return nil
}

func (p *Amqp10Publisher) Stop(reason string) {
log.Debug("closing connection", "protocol", "amqp-1.0", "publisherId", p.Id, "reason", reason)
log.Debug("closing connection", "publisherId", p.Id, "reason", reason)
_ = p.Connection.Close()
}
19 changes: 13 additions & 6 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,30 @@ import (
"os"

"log/slog"

"github.com/charmbracelet/log"
)

var logger *slog.Logger

var Levels = map[slog.Level][]string{
slog.LevelDebug: {"debug"},
slog.LevelInfo: {"info"},
slog.LevelError: {"error"},
var Levels = map[log.Level][]string{
log.DebugLevel: {"debug"},
log.InfoLevel: {"info"},
log.ErrorLevel: {"error"},
}

var Level slog.Level
var Level log.Level

func Setup() {
logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: Level}))
handler := log.New(os.Stderr)
handler.SetLevel(Level)
logger = slog.New(handler)
}

func Debug(format string, v ...interface{}) {
if logger == nil {
Setup()
}
logger.Debug(format, v...)
}

Expand Down
Loading

0 comments on commit c1e9d64

Please sign in to comment.