Skip to content

Commit

Permalink
Use websocket notify
Browse files Browse the repository at this point in the history
  • Loading branch information
1aerostorm committed Dec 1, 2023
1 parent b52d9ee commit 694411a
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 40 deletions.
2 changes: 2 additions & 0 deletions app/components/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import golos from 'golos-lib-js';
import tt from 'counterpart';
import ChainFailure from 'app/components/elements/ChainFailure'
import DialogManager from 'app/components/elements/common/DialogManager';
import NotifyPolling from 'app/components/elements/NotifyPolling'
import { init as initAnchorHelper } from 'app/utils/anchorHelper';
import { authRegisterUrl, } from 'app/utils/AuthApiClient';
import { APP_ICON, VEST_TICKER, } from 'app/client_config';
Expand Down Expand Up @@ -325,6 +326,7 @@ class App extends React.Component {
{process.env.BROWSER ? <TooltipManager /> : null}
<GlobalStyle />
{process.env.IS_APP ? <URLLoader /> : null}
<NotifyPolling />
</div>

);
Expand Down
9 changes: 5 additions & 4 deletions app/components/elements/MarkNotificationRead.jsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import React from 'react';
import PropTypes from 'prop-types'
import {connect} from 'react-redux';
import { markNotificationRead } from 'app/utils/NotifyApiClient';

import { markNotificationReadWs } from 'app/utils/NotifyApiClient'

class MarkNotificationRead extends React.Component {

Expand All @@ -20,7 +21,7 @@ class MarkNotificationRead extends React.Component {
if (!this.interval) {
const { account, update } = this.props;
this.interval = setInterval(() => {
markNotificationRead(account, this.fields_array).then(nc => update(nc));
markNotificationReadWs(account, this.fields_array).then(nc => update(nc));
}, interval);
}
}
Expand All @@ -31,7 +32,7 @@ class MarkNotificationRead extends React.Component {
if (interval)
this._activateInterval(interval);
else
markNotificationRead(account, this.fields_array).then(nc => update(nc));
markNotificationReadWs(account, this.fields_array).then(nc => update(nc));
}

UNSAFE_componentWillReceiveProps(nextProps) {
Expand All @@ -52,5 +53,5 @@ class MarkNotificationRead extends React.Component {
}

export default connect(null, dispatch => ({
update: (payload) => { dispatch({type: 'UPDATE_NOTIFICOUNTERS', payload})},
update: (payload) => { /*dispatch({type: 'UPDATE_NOTIFICOUNTERS', payload}) */},
}))(MarkNotificationRead);
88 changes: 88 additions & 0 deletions app/components/elements/NotifyPolling.jsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import React from 'react'
import { connect } from 'react-redux'

import { counterSubscribeWs, getNotificationsWs } from 'app/utils/NotifyApiClient'

const delay = async (msec) => await new Promise(resolve => setTimeout(resolve, msec))

class NotifyPolling extends React.Component {
componentDidMount() {
const { username } = this.props
if (username) {
this.poll(username)
}
}

componentDidUpdate(prevProps) {
const { username } = this.props
if (username && username !== prevProps.username) {
this.poll(username)
}
}

async poll(username) {
let firstFilled = false

const { update } = this.props

await delay(500)

let subscribeRes
while (true) {
subscribeRes = await counterSubscribeWs(username, async function (err, res) {
if (err) {
console.error(err)
return
}

if (firstFilled) { // TODO: it is more reliably to use timestamps to check order
await update(res.counters)
}
})

if (subscribeRes.err) {
console.warning('counterSubscribeWs:', subscribeRes.err, ', retry...')
await delay(2000)
} else {
break
}
}

while (true) {
if (this.props.username !== username) {
return
}

let counters
try {
counters = await getNotificationsWs(username)
} catch (error) {
console.error('getNotificationsWs', error)
}

if (counters) {
firstFilled = true
await update(counters)
break
}
await delay(2000)
}
}

render() {
return null
}
}

export default connect(
(state, ownProps) => {
const current = state.user.get('current')
const username = current && current.get('username')
return { username }
},
dispatch => ({
update: (payload) => {
dispatch({type: 'UPDATE_NOTIFICOUNTERS', payload})
},
})
)(NotifyPolling)
4 changes: 2 additions & 2 deletions app/components/modules/UserWallet.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {numberWithCommas, toAsset, vestsToSteem, steemToVests, accuEmissionPerDa
import FoundationDropdownMenu from 'app/components/elements/FoundationDropdownMenu';
import LiteTooltip from 'app/components/elements/LiteTooltip'
import { blogsUrl } from 'app/utils/blogsUtils'
import { markNotificationRead } from 'app/utils/NotifyApiClient'
import { markNotificationReadWs } from 'app/utils/NotifyApiClient'
import shouldComponentUpdate from 'app/utils/shouldComponentUpdate';
import Tooltip from 'app/components/elements/Tooltip';
import Icon from 'app/components/elements/Icon';
Expand Down Expand Up @@ -89,7 +89,7 @@ class UserWallet extends React.Component {

readNotifications = (account) => {
setTimeout(() => {
markNotificationRead(account.get('name'), ['delegate_vs'])
markNotificationReadWs(account.get('name'), ['delegate_vs'])
}, 500)
}

Expand Down
31 changes: 0 additions & 31 deletions app/redux/PollDataSaga.js

This file was deleted.

3 changes: 0 additions & 3 deletions app/redux/RootSaga.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ import { sharedWatches } from 'app/redux/SagaShared';
import { userWatches } from 'app/redux/UserSaga';
import { authWatches } from 'app/redux/AuthSaga';
import { transactionWatches } from 'app/redux/TransactionSaga';
import PollDataSaga from 'app/redux/PollDataSaga';


export default function* rootSaga() {
yield fork(PollDataSaga);
yield fork(userWatches);
yield fork(fetchDataWatches)
yield fork(sharedWatches)
Expand Down
146 changes: 146 additions & 0 deletions app/utils/NotifyApiClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ const notifyAvailable = () => {
&& $STM_Config.notify_service && $STM_Config.notify_service.host;
};

const notifyWsAvailable = () => {
return notifyAvailable() && $STM_Config.notify_service.host_ws
}

const notifyUrl = (pathname) => {
return new URL(pathname, window.$STM_Config.notify_service.host).toString();
};
Expand All @@ -36,6 +40,73 @@ function saveSession(response) {
localStorage.setItem('X-Session', session);
}

async function connectNotifyWs() {
if (!window.notifyWs) {
window.notifyWsReq = { id: 0, requests: {}, callbacks: {} }
await new Promise((resolve, reject) => {
const notifyWs = new WebSocket($STM_Config.notify_service.host_ws)
window.notifyWs = notifyWs

notifyWs.addEventListener('open', () => {
notifyWs.isOpen = true
resolve()
})

notifyWs.addEventListener('сlose', () => {
if (!notifyWs.isOpen) {
const err = new Error('notifyWs - cannot connect')
reject(err)
}
})

notifyWs.addEventListener('message', (msg) => {
console.log('notifyWs message:', msg)
const data = JSON.parse(msg.data)
const id = data.id
const request = window.notifyWsReq.requests[id]
if (request) {
const cleanRequest = () => {
delete window.notifyWsReq.requests[id]
}

if (data.err) {
request.callback(new Error(data.err.code + ': ' + data.err.msg), data)
cleanRequest()
return
}
request.callback(null, data.data)
cleanRequest()
} else if (!id && data.data && data.data.event) {
const { event } = data.data
const callback = window.notifyWsReq.callbacks[event]
if (callback) {
callback.callback(null, data.data)
}
}
})
})
}
}

async function notifyWsSend(api, args, callback = null, eventCallback = null) {
await connectNotifyWs()
const id = window.notifyWsReq.id++
let msg = {
api,
args,
id
}
msg = JSON.stringify(msg)
if (callback) {
window.notifyWsReq.requests[id] = { callback }
}
if (eventCallback) {
const { event, callback } = eventCallback
window.notifyWsReq.callbacks[event] = { callback }
}
window.notifyWs.send(msg)
}

export function notifyApiLogin(account, authSession) {
if (!notifyAvailable()) return;
let request = Object.assign({}, request_base, {
Expand Down Expand Up @@ -71,6 +142,24 @@ export function getNotifications(account) {
});
}

export async function getNotificationsWs(account) {
if (!notifyWsAvailable()) {
console.error('getNotificationsWs - no notify_service.host_ws in config?')
return null
}
return await new Promise(async (resolve, reject) => {
await notifyWsSend('counters', {
account
}, (err, res) => {
if (err) {
reject(err)
return
}
resolve(res.counters)
})
})
}

export function markNotificationRead(account, fields) {
if (!notifyAvailable()) return Promise.resolve(null);
let request = Object.assign({}, request_base, {method: 'put', mode: 'cors'});
Expand All @@ -84,6 +173,59 @@ export function markNotificationRead(account, fields) {
});
}

export async function markNotificationReadWs(account, fields) {
if (!notifyWsAvailable()) return null
const xSession = notifySession()
const scopes = fields.join(',')
return await new Promise(async (resolve, reject) => {
await notifyWsSend('counters/read', {
account,
'X-Session': xSession,
scopes,
}, (err, res) => {
if (err) {
reject(err)
return
}
resolve(res.counters)
})
})
}

export async function counterSubscribeWs(account, callback) {
if (!notifyWsAvailable()) return null
const xSession = notifySession()
return await new Promise(async (resolve, reject) => {
await notifyWsSend('counters/subscribe', {
account,
'X-Session': xSession,
}, (err, res) => {
if (err) {
reject(err)
return
}
resolve(res)
}, { event: 'counter', callback})
})
}

export async function counterUnsubscribeWs(account) {
if (!notifyWsAvailable()) return null
const xSession = notifySession()
return await new Promise(async (resolve, reject) => {
await notifyWsSend('counters/unsubscribe', {
account,
'X-Session': xSession,
}, (err, res) => {
if (err) {
reject(err)
return
}
resolve(res)
})
})
}

export async function notificationSubscribe(account, scopes = 'message', sidKey = '__subscriber_id') {
if (!notifyAvailable()) return;
if (window[sidKey]) return;
Expand Down Expand Up @@ -172,7 +314,11 @@ export async function notificationTake(account, removeTaskIds, forEach, sidKey =

if (process.env.BROWSER) {
window.getNotifications = getNotifications;
window.getNotificationsWs = getNotificationsWs
window.markNotificationRead = markNotificationRead;
window.markNotificationReadWs = markNotificationReadWs
window.counterSubscribeWs = counterSubscribeWs
window.counterUnsubscribeWs = counterUnsubscribeWs
window.notificationSubscribe = notificationSubscribe;
window.notificationUnsubscribe = notificationUnsubscribe;
window.notificationTake = notificationTake;
Expand Down

0 comments on commit 694411a

Please sign in to comment.