Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/EV-38] Remove opentracing code #275

Merged
merged 14 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -425,38 +425,6 @@ and different timeout values.
:enable [true :bool]}}}}}
```

## Tracing

[Open Tracing](https://opentracing.io/docs/overview/) enables to identify the amount of time spent in various stages of the work flow.

Currently, the execution of the handler function is traced. If the message consumed has the corresponding tracing headers, then the E2E life time of the message from the time of production till the time of consumption can be traced.

Tracing has been added to the following flows:

1. Normal basic consume
2. Retry via rabbitmq
3. Produce to rabbitmq channel
4. Produce to another kafka topic

By default, tracing is done via [Jaeger](https://www.jaegertracing.io/) based on the env configs. Please refer [Jaeger Configuration](https://github.com/jaegertracing/jaeger-client-java/tree/master/jaeger-core#configuration-via-environment)
and [Jaeger Architecture](https://www.jaegertracing.io/docs/1.13/architecture/) to set the respective env variables.
To enable custom tracer, a custom tracer provider function name can be set in `:custom-provider`. The corresponding function will be executed in runtime to create a tracer. In the event of any errors while executing the custom tracer provider, a Noop tracer will be created.

To enable tracing, the following config needs to be added to the `config.edn` under `:ziggurat` key.

```clojure
:tracer {:enabled [true :bool]
:custom-provider ""}
```

Example Jaeger Env Config:

```
JAEGER_SERVICE_NAME: "service-name"
JAEGER_AGENT_HOST: "localhost"
JAEGER_AGENT_PORT: 6831
```

## Deprecation Notice
* Sentry has been deprecated.

Expand Down
6 changes: 0 additions & 6 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
[mount "0.1.16"]
[io.jaegertracing/jaeger-core "1.6.0"]
[io.jaegertracing/jaeger-client "1.6.0"]
[io.opentracing/opentracing-api "0.33.0"]
[io.opentracing/opentracing-mock "0.33.0"]
[io.opentracing/opentracing-noop "0.33.0"]
[io.opentracing.contrib/opentracing-kafka-streams "0.1.15" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.apache.kafka/kafka-streams org.slf4j/slf4j-api org.xerial.snappy/snappy-java]]
[io.opentracing.contrib/opentracing-kafka-client "0.1.15" :exclusions [org.lz4/lz4-java com.github.luben/zstd-jni org.slf4j/slf4j-api org.xerial.snappy/snappy-java]]
[io.opentracing.contrib/opentracing-rabbitmq-client "0.1.11" :exclusions [com.rabbitmq/amqp-client]]
[org.apache.httpcomponents/fluent-hc "4.5.13"]
[org.apache.kafka/kafka-clients "2.8.2" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-streams "2.8.2" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
Expand Down
2 changes: 1 addition & 1 deletion resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
:default-api-timeout-ms-config [60000 :int]
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
:tracer {:enabled [true :bool]
:tracer {:enabled [false :bool]
:custom-provider ""}
:new-relic {:report-errors false}
:log-format "text"}}
7 changes: 2 additions & 5 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.tracer :as tracer]
[ziggurat.util.java-util :as util])
(:gen-class
:methods [^{:static true} [init [java.util.Map] void]]
Expand Down Expand Up @@ -150,15 +149,13 @@
(defn start-common-states []
(start* #{#'metrics/statsd-reporter
#'sentry-reporter
#'nrepl-server/server
#'tracer/tracer}))
#'nrepl-server/server}))

(defn stop-common-states []
(mount/stop #'config/config
#'sentry-reporter
#'metrics/statsd-reporter
#'nrepl-server/server
#'tracer/tracer))
#'nrepl-server/server))

(defn start
"Starts up Ziggurat's config, reporters, actor fn, rabbitmq connection and then streams, server etc"
Expand Down
11 changes: 3 additions & 8 deletions src/ziggurat/messaging/connection_helper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
[ziggurat.config :refer [ziggurat-config]]
[ziggurat.sentry :refer [sentry-reporter]]
[ziggurat.channel :refer [get-keys-for-topic]]
[ziggurat.tracer :refer [tracer]]
[ziggurat.messaging.util :as util]
[ziggurat.util.error :refer [report-error]])
(:import [com.rabbitmq.client ShutdownListener ConnectionFactory AddressResolver]
[java.util.concurrent Executors ExecutorService]
[io.opentracing.contrib.rabbitmq TracingConnectionFactory]
[com.rabbitmq.client.impl DefaultCredentialsProvider]))

(defn is-connection-required? []
Expand Down Expand Up @@ -48,10 +46,8 @@
(util/create-address-resolver rabbitmq-config)
(:connection-name rabbitmq-config))))

(defn create-connection [config tracer-enabled]
(if tracer-enabled
(create-rmq-connection (TracingConnectionFactory. tracer) config)
(create-rmq-connection (ConnectionFactory.) config)))
(defn create-connection [config]
(create-rmq-connection (ConnectionFactory.) config))

(defn- get-connection-config
[is-producer?]
Expand All @@ -70,8 +66,7 @@
(when (is-connection-required?)
(try
(let
[is-tracer-enabled? (get-in (ziggurat-config) [:tracer :enabled])
connection (create-connection (get-connection-config is-producer?) is-tracer-enabled?)]
[connection (create-connection (get-connection-config is-producer?))]
(log/info "Connection created " connection)
(doto connection
(.addShutdownListener
Expand Down
11 changes: 4 additions & 7 deletions src/ziggurat/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@
(:require [clojure.tools.logging :as log]
[mount.core :refer [defstate]]
[ziggurat.config :refer [build-producer-config-properties ziggurat-config]]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.java-util :refer [get-key]])
(:import (io.opentracing.contrib.kafka TracingKafkaProducer)
(org.apache.kafka.clients.producer KafkaProducer ProducerRecord))
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord))
(:gen-class
:methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future]
^{:static true} [send [String String int Object Object] java.util.concurrent.Future]]
Expand All @@ -70,9 +68,8 @@
(do (log/info "Starting Kafka producers ...")
(reduce (fn [producers [stream-config-key properties]]
(log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
(let [kp (KafkaProducer. properties)
tkp (TracingKafkaProducer. kp tracer)]
(assoc producers stream-config-key tkp)))
(let [kp (KafkaProducer. properties)]
(assoc producers stream-config-key kp)))
{}
(seq (producer-properties-map))))
(log/info "No producers found. Can not initiate start."))
Expand All @@ -85,7 +82,7 @@
(.flush)
(.close)))
(seq kafka-producers))))
(log/info "No producers found.n Can not initiate stop.")))
(log/info "No producers found. Can not initiate stop.")))

(defn send
"A wrapper around `org.apache.kafka.clients.producer.KafkaProducer#send` which enables
Expand Down
36 changes: 11 additions & 25 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@
[ziggurat.message-payload :refer [->MessagePayload]]
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :as timestamp-transformer]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.map :as umap]
[cambium.core :as clog])
(:import [io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.tag Tags]
[java.time Duration]
(:import [java.time Duration]
[java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.common.errors TimeoutException]
Expand Down Expand Up @@ -126,22 +122,13 @@
(doseq [[topic-entity stream] streams]
(close-stream topic-entity stream)))

(defn- traced-handler-fn [handler-fn channels message topic-entity]
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer)
span (as-> tracer t
(.buildSpan t "Message-Handler")
(.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER)
(.withTag t (.getKey Tags/COMPONENT) "ziggurat")
(if (nil? parent-ctx)
t
(.asChildOf t parent-ctx))
(.start t))]
(try
((mapper-func handler-fn channels) (-> (->MessagePayload (:value message) topic-entity)
(assoc :headers (:headers message))
(assoc :metadata (:metadata message))))
(finally
(.finish span)))))
(defn- mapped-handler-fn [handler-fn channels message topic-entity]
(try
((mapper-func handler-fn channels)
(-> (->MessagePayload (:value message) topic-entity)
(assoc :headers (:headers message))
(assoc :metadata (:metadata message))))
(finally)))

(defn- join-streams
[oldest-processed-message-in-s topic-entity stream-1 stream-2]
Expand Down Expand Up @@ -187,7 +174,7 @@
{stream :stream} (reduce (partial join-streams oldest-processed-message-in-s topic-entity) stream-map)]
(->> stream
(header-transform-values)
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(map-values #(mapped-handler-fn handler-fn channels % topic-entity)))
(.build builder))))

(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels]
Expand All @@ -198,7 +185,7 @@
(timestamp-transform-values topic-entity-name oldest-processed-message-in-s)
(header-transform-values)
(map-values #(log-and-report-metrics topic-entity-name %))
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(map-values #(mapped-handler-fn handler-fn channels % topic-entity)))
(.build builder)))

(defn- start-stream* [handler-fn stream-config topic-entity channels]
Expand All @@ -209,8 +196,7 @@

(when-not (nil? top)
(KafkaStreams. ^Topology top
^Properties (properties stream-config)
(new TracingKafkaClientSupplier tracer)))))
^Properties (properties stream-config)))))

(defn- merge-consumer-type-config
[config]
Expand Down
92 changes: 0 additions & 92 deletions src/ziggurat/tracer.clj

This file was deleted.

15 changes: 2 additions & 13 deletions test/ziggurat/fixtures.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics]
[ziggurat.producer :as producer]
[ziggurat.server :refer [server]]
[ziggurat.tracer :as tracer])
(:import (io.opentracing.mock MockTracer)
(java.util Properties)
[ziggurat.server :refer [server]])
(:import (java.util Properties)
(org.apache.kafka.clients.consumer ConsumerConfig)
(org.apache.kafka.clients.producer ProducerConfig))
(:gen-class
Expand Down Expand Up @@ -67,14 +65,8 @@
(f)
(mount/stop #'metrics/statsd-reporter))

(defn mount-tracer []
(with-redefs [tracer/create-tracer (fn [] (MockTracer.))]
(-> (mount/only [#'tracer/tracer])
(mount/start))))

(defn mount-config-with-tracer [f]
(mount-config)
(mount-tracer)
(f)
(mount/stop))

Expand Down Expand Up @@ -119,8 +111,6 @@
(let [stream-routes {:default {:handler-fn #(constantly nil)
:channel-1 #(constantly nil)}}]
(mount-config)
(mount-tracer)

(->
(mount/only [#'producer-connection #'consumer-connection #'channel-pool])
(mount/with-args {:stream-routes stream-routes})
Expand Down Expand Up @@ -163,7 +153,6 @@

(defn mount-producer-with-config-and-tracer [f]
(mount-config)
(mount-tracer)
(mount-producer)
(binding [*bootstrap-servers* (get-in (config/ziggurat-config) [:stream-router :default :bootstrap-servers])]
(binding [*consumer-properties* (doto (Properties.)
Expand Down
Loading