Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize #37

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/jobs/equities.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(:require [clj-time.coerce :as coerce]
[clj-time.core :as time]
[clj-time.format :as format]
[clojure.core.reducers :as r]
[clojure.data.json :as json]
[clojure.java.jdbc :as jdbc]
[clojure.tools.cli :as cli]
Expand Down Expand Up @@ -289,11 +290,11 @@
(defn execute! [cxn data]
(jdbc/with-db-transaction [txn cxn]
(->> data
(map prepare-row)
flatten
(remove nil?)
(map #(update-or-insert! txn %))
doall)))
(r/map prepare-row)
r/flatten
(r/remove nil?)
(r/map #(update-or-insert! txn %))
(r/fold 1 r/cat r/append!))))

(defn -main [& args]
(error/set-default-error-handler)
Expand All @@ -311,8 +312,9 @@
util/joda-date->date-str)}
query-params)
data (->> (concat alpha-vantage tiingo morningstar quandl)
(map #(api/get-data % query-params*))
flatten)]
(execute! cxn data)))
vec
(r/map #(api/get-data % query-params*))
(r/fold 1 r/cat r/append!))]
(execute! cxn data)))

(util/notify-healthchecks-io (-> :healthchecks-io-api-key env)))
64 changes: 37 additions & 27 deletions src/markets_etl/api.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns markets-etl.api
(:require [clj-http.client :as http]
[clojure.core.reducers :as r]
[clojure.data.json :as json]
[clojure.set :as set]
[clojure.string :as string]
Expand Down Expand Up @@ -57,7 +58,8 @@
(query-alpha-vantage-api! ticker {}))
([url ticker paramz]
{:pre [(every? true? (allowed? paramz))]}
(Thread/sleep 5500)
(log/debug "query-alpha-vantage-api! called")
(Thread/sleep 3500) ; previous value: 5500
(let [params (dissoc paramz :limit)
response (try
(http/get url)
Expand All @@ -66,9 +68,9 @@
(ex-data e))
(ex-data e)))
{:keys [status body]} response
_ (log/debug ticker)
_ (log/debug params)
#__ #_(log/info body)]
_ (log/trace ticker)
_ (log/trace params)
#__ #_(log/trace body)]
(if (= 200 status)
(-> body
(json/read-str :key-fn (comp keyword
Expand Down Expand Up @@ -107,6 +109,7 @@
(query-tiingo! ticker {}))
([ticker paramz]
{:pre [(every? true? (allowed? paramz))]}
(log/debug "query-tiingo! called")
(let [params (dissoc paramz :limit)
url (str (:protocol tiingo-api)
(:url tiingo-api)
Expand All @@ -127,9 +130,9 @@
(ex-data e))
(ex-data e)))
{:keys [status body]} response
_ (log/debug ticker)
_ (log/debug params)
#__ #_(log/info body)]
_ (log/trace ticker)
_ (log/trace params)
#__ #_(log/trace body)]
(if (= 200 status)
(-> body
(json/read-str :key-fn (comp keyword string/lower-case)))
Expand Down Expand Up @@ -157,9 +160,9 @@
(ex-data e))
(ex-data e)))
{:keys [status body]} response
_ (log/debug ticker)
_ (log/debug params)
#__ #_(log/debug body)]
_ (log/trace ticker)
_ (log/trace params)
#__ #_(log/trace body)]
(if (= 200 status)
(-> body
(json/read-str :key-fn keyword))
Expand All @@ -170,6 +173,7 @@
(query-morningstar! ticker {}))
([ticker paramz]
{:pre [(every? true? (allowed? paramz))]}
(log/debug "query-morningstar! called")
(let [params (dissoc paramz :limit)
url (str (:protocol morningstar-api)
(:url morningstar-api)
Expand All @@ -189,10 +193,10 @@
{:keys [status body]} response
body' (-> body
(string/replace #"NaN" "null"))
_ (log/debug ticker)
_ (log/debug params)
_ (log/debug url)
#__ #_(log/debug body')]
_ (log/trace ticker)
_ (log/trace params)
_ (log/trace url)
#__ #_(log/trace body')]
(if (and (= 200 status) ((comp not empty?) body'))
(-> body'
(json/read-str :key-fn (comp keyword string/lower-case)))
Expand All @@ -204,6 +208,7 @@
(query-quandl! dataset ticker {}))
([dataset ticker paramz]
{:pre [(every? true? (allowed? paramz))]}
(log/debug "query-quandl! called")
(let [url (str (:protocol quandl-api)
(:url quandl-api)
(str dataset "/")
Expand All @@ -219,9 +224,9 @@
(ex-data e))
(ex-data e)))
{:keys [status body]} response
_ (log/debug ticker)
_ (log/debug params)
#__ #_(log/debug body)]
_ (log/trace ticker)
_ (log/trace params)
#__ #_(log/trace body)]
(if (= 200 status)
(-> body
(json/read-str :key-fn keyword)
Expand All @@ -236,27 +241,30 @@
ticker]}
query-params]
(->> ticker
(map (fn [tkr]
(r/map (fn [tkr]
(->> (query-tiingo! tkr
query-params)
(map #(assoc % :dataset dataset :ticker tkr)))))))
(map #(assoc % :dataset dataset :ticker tkr)))))
r/foldcat))

(defmethod get-data "MSTAR" [{:keys [dataset
ticker]}
query-params]
(->> ticker
(map (fn [tkr]
(r/map (fn [tkr]
(-> (query-morningstar! tkr
query-params)
(assoc :dataset dataset :ticker tkr))))))
(assoc :dataset dataset :ticker tkr))))
r/foldcat))

(defmethod get-data "INTRINIO" [{:keys [dataset
ticker]}
query-params]
(->> ticker
(map (fn [tkr]
(r/map (fn [tkr]
(-> (query-intrinio! tkr query-params)
(assoc :dataset dataset :ticker tkr))))))
(assoc :dataset dataset :ticker tkr))))
r/foldcat))

(defmethod get-data "ALPHA-VANTAGE" [{:keys [dataset
ticker]}
Expand All @@ -267,16 +275,18 @@
:currency
:equities)]
(->> ticker
(map (fn [tkr]
(r/map (fn [tkr]
(-> (query-alpha-vantage! {:endpoint alpha-vantage-dataset
:ticker tkr
:query-params query-params})
(assoc :dataset dataset :ticker tkr)))))))
(assoc :dataset dataset :ticker tkr))))
r/foldcat)))

(defmethod get-data :default [{:keys [dataset
ticker] :as m}
query-params]
(->> ticker
(map (fn [tkr]
(r/map (fn [tkr]
(-> (query-quandl! dataset tkr query-params)
(assoc :dataset dataset :ticker tkr))))))
(assoc :dataset dataset :ticker tkr))))
r/foldcat))