Skip to content

Commit

Permalink
fix: Add nil safety to IsClosed and Close eventbus connection methods (
Browse files Browse the repository at this point in the history
…#2839)

Signed-off-by: gokulav137 <[email protected]>
Signed-off-by: gokulav137 <[email protected]>
  • Loading branch information
gokulav137 authored Oct 5, 2023
1 parent 4f01b34 commit 1b13a0c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 1 deletion.
11 changes: 11 additions & 0 deletions eventbus/jetstream/eventsource/source_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,14 @@ func (jsc *JetstreamSourceConn) Publish(ctx context.Context,
jsc.Logger.Debugf("published message to subject %s", subject)
return err
}

func (conn *JetstreamSourceConn) IsClosed() bool {
return conn == nil || conn.JetstreamConnection.IsClosed()
}

func (conn *JetstreamSourceConn) Close() error {
if conn == nil {
return fmt.Errorf("can't close Jetstream source connection, JetstreamSourceConn is nil")
}
return conn.JetstreamConnection.Close()
}
11 changes: 11 additions & 0 deletions eventbus/jetstream/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ func NewJetstreamTriggerConn(conn *jetstreambase.JetstreamConnection,
return connection, nil
}

func (conn *JetstreamTriggerConn) IsClosed() bool {
return conn == nil || conn.JetstreamConnection.IsClosed()
}

func (conn *JetstreamTriggerConn) Close() error {
if conn == nil {
return fmt.Errorf("can't close Jetstream trigger connection, JetstreamTriggerConn is nil")
}
return conn.JetstreamConnection.Close()
}

func (conn *JetstreamTriggerConn) String() string {
if conn == nil {
return ""
Expand Down
5 changes: 4 additions & 1 deletion eventbus/kafka/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ func (c *KafkaTriggerConnection) String() string {
}

func (c *KafkaTriggerConnection) Close() error {
if c.close == nil {
return fmt.Errorf("can't close Kafka trigger connection, close function is nil")
}
return c.close()
}

func (c *KafkaTriggerConnection) IsClosed() bool {
return c.isClosed()
return c.isClosed == nil || c.isClosed()
}

func (c *KafkaTriggerConnection) Subscribe(
Expand Down
11 changes: 11 additions & 0 deletions eventbus/stan/eventsource/source_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,14 @@ func (n *STANSourceConn) Publish(ctx context.Context,
}
return n.STANConn.Publish(n.subject, msg.Body)
}

func (conn *STANSourceConn) IsClosed() bool {
return conn == nil || conn.STANConnection.IsClosed()
}

func (conn *STANSourceConn) Close() error {
if conn == nil {
return fmt.Errorf("can't close STAN source connection, STANSourceConn is nil")
}
return conn.STANConnection.Close()
}
11 changes: 11 additions & 0 deletions eventbus/stan/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ func (n *STANTriggerConn) String() string {
return fmt.Sprintf("STANTriggerConn{ClientID:%s,Sensor:%s,Trigger:%s}", n.ClientID, n.sensorName, n.triggerName)
}

func (conn *STANTriggerConn) IsClosed() bool {
return conn == nil || conn.STANConnection.IsClosed()
}

func (conn *STANTriggerConn) Close() error {
if conn == nil {
return fmt.Errorf("can't close STAN trigger connection, STANTriggerConn is nil")
}
return conn.STANConnection.Close()
}

// Subscribe is used to subscribe to multiple event source dependencies
// Parameter - ctx, context
// Parameter - conn, eventbus connection
Expand Down

0 comments on commit 1b13a0c

Please sign in to comment.