Skip to content

Commit

Permalink
enhance(rtc): try to restart rtc when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
RCmerci committed Dec 11, 2024
1 parent 19ba710 commit 6cc6a20
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/main/frontend/components/header.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[frontend.components.page-menu :as page-menu]
[frontend.components.plugins :as plugins]
[frontend.components.right-sidebar :as sidebar]
[frontend.components.rtc.flows :as rtc-flows]
[frontend.handler.db-based.rtc-flows :as rtc-flows]
[frontend.components.rtc.indicator :as rtc-indicator]
[frontend.components.server :as server]
[frontend.components.settings :as settings]
Expand Down
36 changes: 0 additions & 36 deletions src/main/frontend/components/rtc/flows.cljs

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/frontend/components/rtc/indicator.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"RTC state indicator"
(:require [cljs-time.core :as t]
[frontend.common.missionary-util :as c.m]
[frontend.components.rtc.flows :as rtc-flows]
[frontend.handler.db-based.rtc-flows :as rtc-flows]
[frontend.state :as state]
[frontend.ui :as ui]
[frontend.util :as util]
Expand Down
25 changes: 22 additions & 3 deletions src/main/frontend/handler/db_based/rtc.cljs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
(ns frontend.handler.db-based.rtc
"RTC handler"
(:require [frontend.config :as config]
(:require [cljs-time.core :as t]
[frontend.common.missionary-util :as c.m]
[frontend.config :as config]
[frontend.db :as db]
[frontend.handler.db-based.rtc-flows :as rtc-flows]
[frontend.handler.notification :as notification]
[frontend.handler.user :as user-handler]
[frontend.state :as state]
[logseq.common.util :as common-util]
[logseq.db :as ldb]
[logseq.db.sqlite.common-db :as sqlite-common-db]
[missionary.core :as m]
[promesa.core :as p]))

(defn <rtc-create-graph!
Expand Down Expand Up @@ -54,12 +59,12 @@
(.rtc-stop worker)))

(defn <rtc-start!
[repo]
[repo & {:keys [stop-before-start?] :or {stop-before-start? true}}]
(when-let [^js worker @state/*db-worker]
(when (ldb/get-graph-rtc-uuid (db/get-db repo))
(p/do!
(js/Promise. user-handler/task--ensure-id&access-token)
(<rtc-stop!)
(when stop-before-start? (<rtc-stop!))
(let [token (state/get-auth-id-token)]
(p/let [result (.rtc-start worker repo token)
start-ex (ldb/read-transit-str result)
Expand Down Expand Up @@ -117,3 +122,17 @@
(p/catch (fn [e]
(notification/show! (str "Something wrong, please try again.") :error)
(js/console.error e)))))))

;;; background task: try to restart rtc-loop when possible,
;;; triggered by `rtc-flows/rtc-try-restart-flow`
(c.m/run-background-task
::restart-rtc-task
(m/reduce
(constantly nil)
(m/ap
(let [{:keys [graph-uuid t]} (m/?> rtc-flows/rtc-try-restart-flow)]
(when (and graph-uuid t
(= graph-uuid (ldb/get-graph-rtc-uuid (db/get-db)))
(> 5000 (- (common-util/time-ms) t)))
(prn :trying-to-restart-rtc graph-uuid (t/now))
(c.m/<? (<rtc-start! (state/get-current-repo) :stop-before-start? false)))))))
68 changes: 68 additions & 0 deletions src/main/frontend/handler/db_based/rtc_flows.cljs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
(ns frontend.handler.db-based.rtc-flows
(:require [frontend.common.missionary-util :as c.m]
[frontend.state :as state]
[logseq.common.util :as common-util]
[missionary.core :as m]))

(def rtc-log-flow
(m/watch (:rtc/log @state/state)))

(def rtc-download-log-flow
(m/eduction
(filter #(= :rtc.log/download (:type %)))
rtc-log-flow))

(def rtc-upload-log-flow
(m/eduction
(filter #(= :rtc.log/upload (:type %)))
rtc-log-flow))

(def rtc-misc-log-flow
(m/eduction
(remove #(contains? #{:rtc.log/download :rtc.log/upload} (:type %)))
rtc-log-flow))

(def rtc-state-flow
(m/stream (m/watch (:rtc/state @state/state))))

(def rtc-online-users-flow
(c.m/throttle
500
(m/eduction
(map (fn [m]
(when (and (= :open (:ws-state (:rtc-state m)))
(:rtc-lock m))
(:online-users m))))
(dedupe)
rtc-state-flow)))

(def ^:private network-online-change-flow
(m/stream
(m/relieve
(m/observe
(fn ctor [emit!]
(let [origin-callback js/window.ononline]
(set! js/window.ononline emit!)
(emit! nil)
(fn dtor []
(set! js/window.ononline origin-callback))))))))

(def rtc-try-restart-flow
"emit an event when it's time to restart rtc loop.
conditions:
1. no rtc loop running now
2. last rtc stop-reason is websocket message timeout
3. current js/navigator.onLine=true
5. throttle 5000ms"
(->> (m/latest
(fn [rtc-state _] rtc-state)
(c.m/continue-flow rtc-state-flow) (c.m/continue-flow network-online-change-flow))
(m/eduction
(keep (fn [m]
(let [{:keys [rtc-lock last-stop-exception-ex-data graph-uuid]} m]
(when (and (some? graph-uuid)
(not rtc-lock) ; no rtc loop now
(= :rtc.exception/ws-timeout (:type last-stop-exception-ex-data))
(true? js/navigator.onLine))
{:graph-uuid graph-uuid :t (common-util/time-ms)})))))
(c.m/throttle 5000)))
23 changes: 15 additions & 8 deletions src/main/frontend/worker/rtc/core.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@
(m/ap
(m/amb
v
(let [_ (m/?< (->> flow
(m/reductions {} nil)
(m/latest identity)))]
(let [_ (m/?< (c.m/continue-flow flow))]
(try
(m/?< clock-flow)
(catch Cancelled _ (m/amb))))))))
Expand Down Expand Up @@ -218,7 +216,8 @@
:*rtc-auto-push? nil
:*online-users nil
:*rtc-lock nil
:canceler nil})
:canceler nil
:*last-stop-exception nil})

(defonce ^:private *rtc-loop-metadata (atom empty-rtc-loop-metadata))

Expand All @@ -235,7 +234,11 @@
date-formatter (common-config/get-date-formatter config)
{:keys [rtc-state-flow *rtc-auto-push? rtc-loop-task *online-users onstarted-task]}
(create-rtc-loop graph-uuid repo conn date-formatter token)
canceler (c.m/run-task rtc-loop-task :rtc-loop-task)
*last-stop-exception (atom nil)
canceler (c.m/run-task rtc-loop-task :rtc-loop-task
:fail (fn [e]
(reset! *last-stop-exception e)
(js/console.log :rtc-loop-task e)))
start-ex (m/? onstarted-task)]
(if-let [start-ex (:ex-data start-ex)]
(r.ex/->map start-ex)
Expand All @@ -246,7 +249,8 @@
:*rtc-auto-push? *rtc-auto-push?
:*online-users *online-users
:*rtc-lock *rtc-lock
:canceler canceler})
:canceler canceler
:*last-stop-exception *last-stop-exception})
nil)))
(r.ex/->map r.ex/ex-local-not-rtc-graph))
(r.ex/->map (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn
Expand Down Expand Up @@ -309,7 +313,9 @@
(def ^:private create-get-state-flow*
(let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
(m/ap
(let [{rtc-lock :*rtc-lock :keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push? *online-users]}
(let [{rtc-lock :*rtc-lock
:keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push? *online-users
*last-stop-exception]}
(m/?< rtc-loop-metadata-flow)]
(try
(when (and repo rtc-state-flow *rtc-auto-push? rtc-lock)
Expand All @@ -324,7 +330,8 @@
:rtc-state rtc-state
:rtc-lock rtc-lock
:auto-push? rtc-auto-push?
:online-users online-users})
:online-users online-users
:last-stop-exception-ex-data (some-> *last-stop-exception deref ex-data)})
rtc-state-flow (m/watch *rtc-auto-push?) (m/watch rtc-lock) (m/watch *online-users)
(client-op/create-pending-block-ops-count-flow repo)
(rtc-log-and-state/create-local-t-flow graph-uuid)
Expand Down
9 changes: 4 additions & 5 deletions src/main/frontend/worker/rtc/log_and_state.cljs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns frontend.worker.rtc.log-and-state
"Fns to generate rtc related logs"
(:require [frontend.common.schema-register :as sr]
(:require [frontend.common.missionary-util :as c.m]
[frontend.common.schema-register :as sr]
[frontend.worker.util :as worker-util]
[malli.core :as ma]
[missionary.core :as m]))
Expand Down Expand Up @@ -65,16 +66,14 @@
[graph-uuid]
(->> (m/watch *graph-uuid->local-t)
(m/eduction (keep (fn [m] (get m (ensure-uuid graph-uuid)))))
(m/reductions {} nil)
(m/latest identity)))
c.m/continue-flow))

(defn create-remote-t-flow
[graph-uuid]
{:pre [(some? graph-uuid)]}
(->> (m/watch *graph-uuid->remote-t)
(m/eduction (keep (fn [m] (get m (ensure-uuid graph-uuid)))))
(m/reductions {} nil)
(m/latest identity)))
c.m/continue-flow))

(defn update-local-t
[graph-uuid local-t]
Expand Down

0 comments on commit 6cc6a20

Please sign in to comment.