diff --git a/broker.go b/broker.go index 1586010038..61bfdd336f 100644 --- a/broker.go +++ b/broker.go @@ -77,6 +77,7 @@ type Broker struct { unicasts map[string]bool // "/client_id" => true unicastsMux sync.RWMutex // mutex for tracking unicast routes keepAppLive bool + clientsByID map[string]*Client } func newBroker(site *Site, editable, noStore, noLog, keepAppLive, debug bool) *Broker { @@ -96,6 +97,7 @@ func newBroker(site *Site, editable, noStore, noLog, keepAppLive, debug bool) *B make(map[string]bool), sync.RWMutex{}, keepAppLive, + make(map[string]*Client), } } @@ -263,6 +265,7 @@ func (b *Broker) addClient(route string, client *Client) { b.unicastsMux.Lock() b.unicasts["/"+client.id] = true + b.clientsByID[client.id] = client b.unicastsMux.Unlock() echo(Log{"t": "ui_add", "addr": client.addr, "route": route}) @@ -291,6 +294,7 @@ func (b *Broker) dropClient(client *Client) { b.unicastsMux.Lock() delete(b.unicasts, "/"+client.id) + delete(b.clientsByID, client.id) b.unicastsMux.Unlock() echo(Log{"t": "ui_drop", "addr": client.addr}) diff --git a/client.go b/client.go index 02e052ab63..85c1120018 100644 --- a/client.go +++ b/client.go @@ -54,23 +54,29 @@ type BootMsg struct { // Client represent a websocket (UI) client. type Client struct { - id string // unique id - auth *Auth // auth provider, might be nil - addr string // remote IP:port, used for logging only - session *Session // end-user session - broker *Broker // broker - conn *websocket.Conn // connection - routes []string // watched routes - data chan []byte // send data - editable bool // allow editing? // TODO move to user; tie to role - baseURL string // URL prefix of the Wave server - header *http.Header // forwarded headers from the WS connection - appPath string // path of the app this client is connected to, doesn't change throughout WS lifetime - pingInterval time.Duration + id string // unique id + auth *Auth // auth provider, might be nil + addr string // remote IP:port, used for logging only + session *Session // end-user session + broker *Broker // broker + conn *websocket.Conn // connection + routes []string // watched routes + data chan []byte // send data + editable bool // allow editing? // TODO move to user; tie to role + baseURL string // URL prefix of the Wave server + header *http.Header // forwarded headers from the WS connection + appPath string // path of the app this client is connected to, doesn't change throughout WS lifetime + pingInterval time.Duration + isReconnect bool + cancel context.CancelFunc + reconnectTimeout time.Duration } -func newClient(addr string, auth *Auth, session *Session, broker *Broker, conn *websocket.Conn, editable bool, baseURL string, header *http.Header, pingInterval time.Duration) *Client { - return &Client{uuid.New().String(), auth, addr, session, broker, conn, nil, make(chan []byte, 256), editable, baseURL, header, "", pingInterval} +// TODO: Refactor some of the params into a Config struct. +func newClient(addr string, auth *Auth, session *Session, broker *Broker, conn *websocket.Conn, editable bool, + baseURL string, header *http.Header, pingInterval time.Duration, isReconnect bool, reconnectTimeout time.Duration) *Client { + id := uuid.New().String() + return &Client{id, auth, addr, session, broker, conn, nil, make(chan []byte, 256), editable, baseURL, header, "", pingInterval, isReconnect, nil, reconnectTimeout} } func (c *Client) refreshToken() error { @@ -90,15 +96,26 @@ func (c *Client) refreshToken() error { func (c *Client) listen() { defer func() { - app := c.broker.getApp(c.appPath) - if app != nil { - app.forward(c.id, c.session, disconnectMsg) - if err := app.disconnect(c.id); err != nil { - echo(Log{"t": "disconnect", "client": c.addr, "route": c.appPath, "err": err.Error()}) + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + go func(ctx context.Context) { + select { + // Send disconnect message only if client doesn't reconnect within the specified timeframe. + case <-time.After(c.reconnectTimeout): + app := c.broker.getApp(c.appPath) + if app != nil { + app.forward(c.id, c.session, disconnectMsg) + if err := app.disconnect(c.id); err != nil { + echo(Log{"t": "disconnect", "client": c.addr, "route": c.appPath, "err": err.Error()}) + } + } + + c.broker.unsubscribe <- c + case <-ctx.Done(): + return } - } + }(ctx) - c.broker.unsubscribe <- c c.conn.Close() }() // Time allowed to read the next pong message from the peer. Must be greater than ping interval. @@ -157,8 +174,10 @@ func (c *Client) listen() { c.broker.sendAll(c.broker.clients[app.route], clearStateMsg) } case watchMsgT: - c.subscribe(m.addr) // subscribe even if page is currently NA - + if c.isReconnect { + continue + } + c.subscribe(m.addr) // subscribe even if page is currently NA if app := c.broker.getApp(m.addr); app != nil { // do we have an app handling this route? c.appPath = m.addr switch app.mode { diff --git a/cmd/wave/main.go b/cmd/wave/main.go index d05097cbdb..9b70b537c5 100644 --- a/cmd/wave/main.go +++ b/cmd/wave/main.go @@ -181,6 +181,7 @@ func main() { panic(err) } + // TODO: Handle this at the config parser level. if authConf.SessionExpiry, err = time.ParseDuration(conf.SessionExpiry); err != nil { panic(err) } @@ -189,6 +190,10 @@ func main() { panic(err) } + if serverConf.ReconnectTimeout, err = time.ParseDuration(conf.ReconnectTimeout); err != nil { + panic(err) + } + serverConf.WebDir, _ = filepath.Abs(conf.WebDir) serverConf.DataDir, _ = filepath.Abs(conf.DataDir) serverConf.Version = Version @@ -227,6 +232,8 @@ func main() { authConf.URLParameters = append(authConf.URLParameters, kv) } } + + // TODO: Handle this at the config parser level. if authConf.SessionExpiry, err = time.ParseDuration(conf.SessionExpiry); err != nil { panic(err) } diff --git a/conf.go b/conf.go index 1b9a5dd716..78f368c46a 100644 --- a/conf.go +++ b/conf.go @@ -52,6 +52,7 @@ type ServerConf struct { ForwardedHeaders map[string]bool KeepAppLive bool PingInterval time.Duration + ReconnectTimeout time.Duration } type AuthConf struct { @@ -112,4 +113,5 @@ type Conf struct { SkipLogin bool `cfg:"oidc-skip-login" env:"H2O_WAVE_OIDC_SKIP_LOGIN" cfgDefault:"false" cfgHelper:"do not display the login form during OIDC authorization"` KeepAppLive bool `cfg:"keep-app-live" env:"H2O_WAVE_KEEP_APP_LIVE" cfgDefault:"false" cfgHelper:"do not unregister unresponsive apps"` Conf string `cfg:"conf" env:"H2O_WAVE_CONF" cfgDefault:".env" cfgHelper:"path to configuration file"` + ReconnectTimeout string `cfg:"reconnect-timeout" env:"H2O_WAVE_RECONNECT_TIMEOUT" cfgDefault:"2s" cfgHelper:"Time to wait for reconnect before dropping the client"` } diff --git a/protocol.go b/protocol.go index 2bfe537def..6efc6d3181 100644 --- a/protocol.go +++ b/protocol.go @@ -23,6 +23,7 @@ type OpsD struct { E string `json:"e,omitempty"` // error M *Meta `json:"m,omitempty"` // metadata C int `json:"c,omitempty"` // clear UI state + I string `json:"i,omitempty"` // client id } // Meta represents metadata unrelated to commands diff --git a/server.go b/server.go index d700c132e6..84aa683f66 100644 --- a/server.go +++ b/server.go @@ -107,7 +107,7 @@ func Run(conf ServerConf) { handle("_auth/refresh", newRefreshHandler(auth, conf.Keychain)) } - handle("_s/", newSocketServer(broker, auth, conf.Editable, conf.BaseURL, conf.ForwardedHeaders, conf.PingInterval)) // XXX terminate sockets when logged out + handle("_s/", newSocketServer(broker, auth, conf.Editable, conf.BaseURL, conf.ForwardedHeaders, conf.PingInterval, conf.ReconnectTimeout)) fileDir := filepath.Join(conf.DataDir, "f") handle("_f/", newFileServer(fileDir, conf.Keychain, auth, conf.BaseURL+"_f")) diff --git a/socket.go b/socket.go index 5b9e755dd0..c2668bd307 100644 --- a/socket.go +++ b/socket.go @@ -31,10 +31,11 @@ type SocketServer struct { baseURL string forwardedHeaders map[string]bool pingInterval time.Duration + reconnectTimeout time.Duration } -func newSocketServer(broker *Broker, auth *Auth, editable bool, baseURL string, forwardedHeaders map[string]bool, pingInterval time.Duration) *SocketServer { - return &SocketServer{broker, auth, editable, baseURL, forwardedHeaders, pingInterval} +func newSocketServer(broker *Broker, auth *Auth, editable bool, baseURL string, forwardedHeaders map[string]bool, pingInterval, reconnectTimeout time.Duration) *SocketServer { + return &SocketServer{broker, auth, editable, baseURL, forwardedHeaders, pingInterval, reconnectTimeout} } func (s *SocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -70,8 +71,31 @@ func (s *SocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } } + clientID := r.URL.Query().Get("client-id") + client, ok := s.broker.clientsByID[clientID] + if ok { + client.conn = conn + client.isReconnect = true + if client.cancel != nil { + client.cancel() + } + if s.broker.debug { + echo(Log{"t": "socket_reconnect", "client_id": clientID, "addr": getRemoteAddr(r)}) + } + } else { + client = newClient(getRemoteAddr(r), s.auth, session, s.broker, conn, s.editable, s.baseURL, &header, s.pingInterval, false, s.reconnectTimeout) + } + + if msg, err := json.Marshal(OpsD{I: client.id}); err == nil { + sw, err := conn.NextWriter(websocket.TextMessage) + if err != nil { + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + sw.Write(msg) + sw.Close() + } - client := newClient(getRemoteAddr(r), s.auth, session, s.broker, conn, s.editable, s.baseURL, &header, s.pingInterval) go client.flush() go client.listen() } diff --git a/ts/index.ts b/ts/index.ts index 8e3286e90e..2b24ec3b4d 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -195,6 +195,7 @@ interface OpsD { e: B // can the user edit pages? } c?: U // clear UI state + i?: S // client id } interface OpD { k?: S @@ -938,14 +939,21 @@ export const let _socket: WebSocket | null = null, _page: XPage | null = null, - _backoff = 1 + _backoff = 1, + _reconnectFailures = 0, + _clientID = '' const slug = window.location.pathname, reconnect = (address: S) => { + if (_clientID && !address.includes('?client-id')) { + address = `${address}?${new URLSearchParams({ 'client-id': _clientID })}` + } + const retry = () => reconnect(address) const socket = new WebSocket(address) socket.onopen = () => { + _reconnectFailures = 0 _socket = socket handle(connectEvent) _backoff = 1 @@ -954,11 +962,17 @@ export const } socket.onclose = () => { const refreshRate = refreshRateB() - if (refreshRate === 0) return - // TODO handle refreshRate > 0 case + if (refreshRate === 0) return _socket = null + + // If on unstable network, retry immediately if we haven't failed before. + if (!_reconnectFailures) { + retry() + return + } + _page = null _backoff *= 2 if (_backoff > 16) _backoff = 16 @@ -994,6 +1008,8 @@ export const } else if (msg.m) { const { u: username, e: editable } = msg.m handle({ t: WaveEventType.Config, username, editable }) + } else if (msg.i) { + _clientID = msg.i } } catch (error) { console.error(error) @@ -1003,10 +1019,15 @@ export const } socket.onerror = () => { handle(dataEvent) + _reconnectFailures++ } }, - push = (data: any) => { - if (!_socket) return + push = (data: unknown) => { + if (!_socket) { + // Maybe currently reconnecting. Try again in 500ms. + if (!_reconnectFailures) setTimeout(() => push(data), 500) + return + } _socket.send(`@ ${slug} ${JSON.stringify(data || {})}`) }, fork = (): ChangeSet => { diff --git a/website/docs/configuration.md b/website/docs/configuration.md index 16b668de99..780c4d5a5e 100644 --- a/website/docs/configuration.md +++ b/website/docs/configuration.md @@ -12,7 +12,7 @@ Wave allows starting Wave server in 2 ways: Wave can be configured via configuration (`.env`) file, environment variables or command line arguments with the following priority: `cmd arg > env var > config > default`. - + | ENV var or config (wave run or waved) | CLI args (waved) | Description | |----------------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | H2O_WAVE_ACCESS_KEY_ID | -access-key-id string | default API access key ID (default "access_key_id") | @@ -58,6 +58,7 @@ Wave can be configured via configuration (`.env`) file, environment variables or | H2O_WAVE_WEB_DIR | -web-dir string | directory to serve web assets from (default "./www") | | H2O_WAVE_CONF | -conf string | path to a configuration file (default ".env") | | H2O_WAVE_PING_INTERVAL | -ping-interval string | how often should ping messages be sent (e.g. 60s or 1m or 0.1h) to keep the websocket connection alive (default "50s") | +| H2O_WAVE_RECONNECT_TIMEOUT | -reconnect-timeout string | Time to wait for reconnect before dropping the client (default "2s") | [^1]: `1`, `t`, `true` to enable; `0`, `f`, `false` to disable (case insensitive). [^2]: Use OS-specific path list separator to specify multiple arguments - `:` for Linux/OSX and `;` for Windows. For example, `H2O_WAVE_PUBLIC_DIR=/images/@./files/images:/downloads/@./files/downloads`.