Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Presence for pub/sub #212

Merged
merged 2 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,29 @@ type Cacheable interface {
ToCacheEntry() ([]byte, error)
}

// We can extend the presence read functionality in the future
// (e.g., add pagination, filtering, etc.)
type PresenceInfoOptions struct {
ReturnRecords bool `json:"return_records,omitempty"`
}

func NewPresenceInfoOptions() *PresenceInfoOptions {
return &PresenceInfoOptions{ReturnRecords: true}
}

type PresenceInfoOption func(*PresenceInfoOptions)

func WithPresenceInfoOptions(opts *PresenceInfoOptions) PresenceInfoOption {
return func(o *PresenceInfoOptions) {
if opts != nil {
*o = *opts
}
}
}

// Broker is responsible for:
// - Managing streams history.
// - Managing presence information.
// - Keeping client states for recovery.
// - Distributing broadcasts across nodes.
//
Expand Down Expand Up @@ -56,6 +77,20 @@ type Broker interface {
RestoreSession(from string) ([]byte, error)
// Marks session as finished (for cache expiration)
FinishSession(sid string) error

// Adds a new presence record for the stream. Returns true if that's the first
// presence record for the presence ID (pid, a unique user presence identifier).
PresenceAdd(stream string, sid string, pid string, info interface{}) (*common.PresenceEvent, error)

// Removes a presence record for the stream. Returns true if that was the last
// record for the presence ID (pid).
PresenceRemove(stream string, sid string) (*common.PresenceEvent, error)

// Retrieves presence information for the stream (counts, records, etc. depending on the options)
PresenceInfo(stream string, opts ...PresenceInfoOption) (*common.PresenceInfo, error)

// Marks presence record as finished (for cache expiration)
FinishPresence(sid string) error
}

// LocalBroker is a single-node broker that can used to store streams data locally
Expand Down Expand Up @@ -193,3 +228,19 @@ func (LegacyBroker) RestoreSession(from string) ([]byte, error) {
func (LegacyBroker) FinishSession(sid string) error {
return nil
}

func (LegacyBroker) PresenceAdd(stream string, sid string, pid string, info interface{}) (*common.PresenceEvent, error) {
return nil, errors.New("presence not supported")
}

func (LegacyBroker) PresenceRemove(stream string, sid string) (*common.PresenceEvent, error) {
return nil, errors.New("presence not supported")
}

func (LegacyBroker) PresenceInfo(stream string, opts ...PresenceInfoOption) (*common.PresenceInfo, error) {
return nil, errors.New("presence not supported")
}

func (LegacyBroker) FinishPresence(sid string) error {
return nil
}
7 changes: 7 additions & 0 deletions broker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type Config struct {
HistoryLimit int `toml:"history_limit"`
// Sessions cache TTL in seconds (after disconnect)
SessionsTTL int64 `toml:"sessions_ttl"`
// Presence expire TTL in seconds (after disconnect)
PresenceTTL int64 `toml:"presence_ttl"`
}

func NewConfig() Config {
Expand All @@ -24,6 +26,8 @@ func NewConfig() Config {
HistoryLimit: 100,
// 5 minutes by default
SessionsTTL: 5 * 60,
// 15 seconds by default
PresenceTTL: 15,
}
}

Expand All @@ -46,6 +50,9 @@ func (c Config) ToToml() string {
result.WriteString("# For how long to store sessions state for resumeability (seconds)\n")
result.WriteString(fmt.Sprintf("sessions_ttl = %d\n", c.SessionsTTL))

result.WriteString("# For how long to keep presence information after session disconnect (seconds)\n")
result.WriteString(fmt.Sprintf("presence_ttl = %d\n", c.PresenceTTL))

result.WriteString("\n")

return result.String()
Expand Down
Loading
Loading