diff --git a/src/jobs/equities.clj b/src/jobs/equities.clj index c93b530..301b585 100644 --- a/src/jobs/equities.clj +++ b/src/jobs/equities.clj @@ -310,9 +310,11 @@ :date util/joda-date->date-str)} query-params) - data (->> (concat alpha-vantage tiingo morningstar quandl) - (map #(api/get-data % query-params*)) + data (->> (concat #_alpha-vantage tiingo morningstar quandl) + util/tmap ;api/get-data query-params*)) + #_(map #(future (api/get-data % query-params*))) flatten)] - (execute! cxn data))) + (println data) + #_(execute! cxn data))) - (util/notify-healthchecks-io (-> :healthchecks-io-api-key env))) + #_(util/notify-healthchecks-io (-> :healthchecks-io-api-key env))) diff --git a/src/markets_etl/api.clj b/src/markets_etl/api.clj index bd7e130..633bbdb 100644 --- a/src/markets_etl/api.clj +++ b/src/markets_etl/api.clj @@ -57,7 +57,8 @@ (query-alpha-vantage-api! ticker {})) ([url ticker paramz] {:pre [(every? true? (allowed? paramz))]} - (Thread/sleep 5500) + (log/info "query-alpha-vantage-api! called") + (Thread/sleep 3500) (let [params (dissoc paramz :limit) response (try (http/get url) @@ -107,6 +108,7 @@ (query-tiingo! ticker {})) ([ticker paramz] {:pre [(every? true? (allowed? paramz))]} + (log/info "query-tiingo! called") (let [params (dissoc paramz :limit) url (str (:protocol tiingo-api) (:url tiingo-api) @@ -170,6 +172,7 @@ (query-morningstar! ticker {})) ([ticker paramz] {:pre [(every? true? (allowed? paramz))]} + (log/info "query-morningstar! called") (let [params (dissoc paramz :limit) url (str (:protocol morningstar-api) (:url morningstar-api) @@ -204,6 +207,7 @@ (query-quandl! dataset ticker {})) ([dataset ticker paramz] {:pre [(every? true? (allowed? paramz))]} + (log/info "query-quandl! called") (let [url (str (:protocol quandl-api) (:url quandl-api) (str dataset "/") diff --git a/src/markets_etl/util.clj b/src/markets_etl/util.clj index 240eb77..dbccecf 100644 --- a/src/markets_etl/util.clj +++ b/src/markets_etl/util.clj @@ -4,7 +4,9 @@ [clj-time.core :as time] [clj-time.format :as formatter] [clojure.pprint :as pprint] - [clojure.string :as string])) + [clojure.string :as string] + [markets-etl.api :as api]) + (:import (java.util.concurrent Executors))) ; -- dev ----------------------------------------------- (defn print-it [coll] @@ -83,6 +85,22 @@ (map string/join) (string/join "\n"))) +; -- parallelization ----------------------------------- +(defn tiled-pmap [grain-size f xs] + (->> xs + (partition-all grain-size) + (pmap (fn [pgroup] (doall (map f pgroup)))) (apply concat))) + +(defn tmap [data] ; threadmap + (let [pool (Executors/newFixedThreadPool 3) + query-params {:limit 500 + :start_date util/last-week + :end_date util/now} + tasks (map #(api/get-data % query-params) data)] + (doseq [future (.invokeAll pool tasks)] + (println (.get future))) + (.shutdown pool))) + ; -- alerts -------------------------------------------- (defn notify-healthchecks-io [api-key] (http/get (str "https://hchk.io/"