Skip to content

Commit

Permalink
shadow
Browse files Browse the repository at this point in the history
  • Loading branch information
padraicbc committed Sep 25, 2023
1 parent 23dc9e7 commit fc4668a
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 46 deletions.
12 changes: 6 additions & 6 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {

if config.ProxyURL != "" {
// Should point to a registered tcp proxy URL.
httpProxyURI, err := url.Parse(config.ProxyURL)
if err != nil {
return nil, err
httpProxyURI, parseErr := url.Parse(config.ProxyURL)
if parseErr != nil {
return nil, parseErr
}

httpDialer, err := proxy.FromURL(httpProxyURI, proxy.Direct)
if err != nil {
return nil, err
httpDialer, fromErr := proxy.FromURL(httpProxyURI, proxy.Direct)
if fromErr != nil {
return nil, fromErr
}

c.Net.Proxy = struct {
Expand Down
83 changes: 43 additions & 40 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,23 +94,16 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
} else {
return nil, err
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}

if config.ProxyURL != "" {
httpProxyURI, err := url.Parse(config.ProxyURL)
if err != nil {
return nil, err
httpProxyURI, parseErr := url.Parse(config.ProxyURL)
if parseErr != nil {
return nil, parseErr
}

httpDialer, err := proxy.FromURL(httpProxyURI, proxy.Direct)
if err != nil {
return nil, err
httpDialer, fromErr := proxy.FromURL(httpProxyURI, proxy.Direct)
if fromErr != nil {
return nil, fromErr
}

c.Net.Proxy = struct {
Expand All @@ -122,6 +115,14 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
}
}

if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}

if err := kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
Expand Down Expand Up @@ -208,23 +209,16 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers
} else {
return nil, err
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}

if config.ProxyURL != "" {
httpProxyURI, err := url.Parse(config.ProxyURL)
if err != nil {
return nil, err
httpProxyURI, parseErr := url.Parse(config.ProxyURL)
if parseErr != nil {
return nil, parseErr
}

httpDialer, err := proxy.FromURL(httpProxyURI, proxy.Direct)
if err != nil {
return nil, err
httpDialer, fromErr := proxy.FromURL(httpProxyURI, proxy.Direct)
if fromErr != nil {
return nil, fromErr
}

c.Net.Proxy = struct {
Expand All @@ -237,6 +231,14 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers

}

if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}

if err := kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
Expand Down Expand Up @@ -322,24 +324,16 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma
if err != nil {
return nil, err
}
if config.ProtocolVersion != "" {
var version sarama.KafkaVersion
version, err = sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}

if config.ProxyURL != "" {
httpProxyURI, err := url.Parse(config.ProxyURL)
if err != nil {
return nil, err
httpProxyURI, parseErr := url.Parse(config.ProxyURL)
if parseErr != nil {
return nil, parseErr
}

httpDialer, err := proxy.FromURL(httpProxyURI, proxy.Direct)
if err != nil {
return nil, err
httpDialer, fromErr := proxy.FromURL(httpProxyURI, proxy.Direct)
if fromErr != nil {
return nil, fromErr
}

c.Net.Proxy = struct {
Expand All @@ -352,6 +346,15 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma

}

if config.ProtocolVersion != "" {
var version sarama.KafkaVersion
version, err = sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}

if err = kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
Expand Down

0 comments on commit fc4668a

Please sign in to comment.