Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
feat: presence PoC
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Dec 19, 2024
1 parent 844027d commit 0f3098b
Show file tree
Hide file tree
Showing 9 changed files with 569 additions and 25 deletions.
47 changes: 47 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,32 @@ type Cacheable interface {
ToCacheEntry() ([]byte, error)
}

type PresenceInfo struct {
// Total number of present clients (uniq)
Total int
// Presence records
Records []interface{}
}

// We can extend the presence read functionality in the future
// (e.g., add pagination, filtering, etc.)
type PresenceInfoOptions struct {
ReturnRecords bool
}

type PresenceInfoOption func(*PresenceInfoOptions)

func WithPresenceInfoOptions(opts *PresenceInfoOptions) PresenceInfoOption {
return func(o *PresenceInfoOptions) {
if opts != nil {
*o = *opts
}
}
}

// Broker is responsible for:
// - Managing streams history.
// - Managing presence information.
// - Keeping client states for recovery.
// - Distributing broadcasts across nodes.
//
Expand Down Expand Up @@ -56,6 +80,17 @@ type Broker interface {
RestoreSession(from string) ([]byte, error)
// Marks session as finished (for cache expiration)
FinishSession(sid string) error

// Adds a new presence record for the stream. Returns true if that's the first
// presence record for the presence ID (pid, a unique user presence identifier).
PresenceAdd(stream string, sid string, pid string, info interface{}) error

// Removes a presence record for the stream. Returns true if that was the last
// record for the presence ID (pid).
PresenceRemove(stream string, sid string, pid string) error

// Retrieves presence information for the stream (counts, records, etc. depending on the options)
PresenceInfo(stream string, opts ...PresenceInfoOption) (*PresenceInfo, error)
}

// LocalBroker is a single-node broker that can used to store streams data locally
Expand Down Expand Up @@ -193,3 +228,15 @@ func (LegacyBroker) RestoreSession(from string) ([]byte, error) {
func (LegacyBroker) FinishSession(sid string) error {
return nil
}

func (LegacyBroker) PresenceAdd(stream string, sid string, pid string, info interface{}) error {
return errors.New("presence not supported")
}

func (LegacyBroker) PresenceRemove(stream string, sid string, pid string) error {
return errors.New("presence not supported")
}

func (LegacyBroker) PresenceInfo(stream string, opts ...PresenceInfoOption) (*PresenceInfo, error) {
return nil, errors.New("presence not supported")
}
7 changes: 7 additions & 0 deletions broker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type Config struct {
HistoryLimit int `toml:"history_limit"`
// Sessions cache TTL in seconds (after disconnect)
SessionsTTL int64 `toml:"sessions_ttl"`
// Presence expire TTL in seconds (after disconnect)
PresenceTTL int64 `toml:"presence_ttl"`
}

func NewConfig() Config {
Expand All @@ -24,6 +26,8 @@ func NewConfig() Config {
HistoryLimit: 100,
// 5 minutes by default
SessionsTTL: 5 * 60,
// 15 seconds by default
PresenceTTL: 15,
}
}

Expand All @@ -46,6 +50,9 @@ func (c Config) ToToml() string {
result.WriteString("# For how long to store sessions state for resumeability (seconds)\n")
result.WriteString(fmt.Sprintf("sessions_ttl = %d\n", c.SessionsTTL))

result.WriteString("# For how long to keep presence information after session disconnect (seconds)\n")
result.WriteString(fmt.Sprintf("presence_ttl = %d\n", c.PresenceTTL))

result.WriteString("\n")

return result.String()
Expand Down
228 changes: 226 additions & 2 deletions broker/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,50 @@ type expireSessionEntry struct {
sid string
}

type presenceSessionEntry struct {
// stream -> pid
streams map[string]string
deadline int64
}

type presenceEntry struct {
info interface{}
sessions []string
}

func (pe *presenceEntry) remove(sid string) bool {
i := -1

for idx, s := range pe.sessions {
if s == sid {
i = idx
break
}
}

if i == -1 {
return false
}

pe.sessions = append(pe.sessions[:i], pe.sessions[i+1:]...)

return len(pe.sessions) == 0
}

type presenceState struct {
streams map[string]map[string]*presenceEntry
sessions map[string]*presenceSessionEntry

mu sync.RWMutex
}

func newPresenceState() *presenceState {
return &presenceState{
streams: make(map[string]map[string]*presenceEntry),
sessions: make(map[string]*presenceSessionEntry),
}
}

type Memory struct {
broadcaster Broadcaster
config *Config
Expand All @@ -179,6 +223,8 @@ type Memory struct {
sessions map[string]*sessionEntry
expireSessions []*expireSessionEntry

presence *presenceState

streamsMu sync.RWMutex
sessionsMu sync.RWMutex
epochMu sync.RWMutex
Expand All @@ -195,6 +241,7 @@ func NewMemoryBroker(node Broadcaster, config *Config) *Memory {
tracker: NewStreamsTracker(),
streams: make(map[string]*memstream),
sessions: make(map[string]*sessionEntry),
presence: newPresenceState(),
epoch: epoch,
}
}
Expand Down Expand Up @@ -376,18 +423,137 @@ func (b *Memory) RestoreSession(from string) ([]byte, error) {

func (b *Memory) FinishSession(sid string) error {
b.sessionsMu.Lock()
defer b.sessionsMu.Unlock()

if _, ok := b.sessions[sid]; ok {
b.expireSessions = append(
b.expireSessions,
&expireSessionEntry{sid: sid, deadline: time.Now().Unix() + b.config.SessionsTTL},
)
}
b.sessionsMu.Unlock()

b.presence.mu.Lock()

if sp, ok := b.presence.sessions[sid]; ok {
sp.deadline = time.Now().Unix() + b.config.PresenceTTL
}

b.presence.mu.Unlock()

return nil
}

func (b *Memory) PresenceAdd(stream string, sid string, pid string, info interface{}) error {
b.presence.mu.Lock()
defer b.presence.mu.Unlock()

if _, ok := b.presence.streams[stream]; !ok {
b.presence.streams[stream] = make(map[string]*presenceEntry)
}

streamPresence := b.presence.streams[stream]

if _, ok := streamPresence[pid]; !ok {
streamPresence[pid] = &presenceEntry{
info: info,
sessions: []string{},
}
}

streamSessionPresence := streamPresence[pid]

newPresence := len(streamSessionPresence.sessions) == 0

streamSessionPresence.sessions = append(
streamSessionPresence.sessions,
sid,
)

if _, ok := b.presence.sessions[sid]; !ok {
b.presence.sessions[sid] = &presenceSessionEntry{
streams: make(map[string]string),
}
}

b.presence.sessions[sid].streams[stream] = pid

if newPresence {
b.broadcaster.Broadcast(&common.StreamMessage{
Stream: stream,
Data: common.PresenceJoinMessage(pid, info),
})
}

return nil
}

func (b *Memory) PresenceRemove(stream string, sid string, pid string) error {
b.presence.mu.Lock()
defer b.presence.mu.Unlock()

if _, ok := b.presence.streams[stream]; !ok {
return nil
}

streamPresence := b.presence.streams[stream]

if _, ok := streamPresence[pid]; !ok {
return nil
}

streamSessionPresence := streamPresence[pid]

empty := streamSessionPresence.remove(sid)

if empty {
delete(streamPresence, pid)
}

if len(streamPresence) == 0 {
delete(b.presence.streams, stream)
}

if _, ok := b.presence.sessions[sid]; ok {
delete(b.presence.sessions[sid].streams, stream)
}

if empty {
b.broadcaster.Broadcast(&common.StreamMessage{
Stream: stream,
Data: common.PresenceLeaveMessage(pid),
})
}

return nil
}

func (b *Memory) PresenceInfo(stream string, opts ...PresenceInfoOption) (*PresenceInfo, error) {
options := &PresenceInfoOptions{}
for _, opt := range opts {
opt(options)
}

b.presence.mu.RLock()
defer b.presence.mu.RUnlock()

if _, ok := b.presence.streams[stream]; !ok {
return &PresenceInfo{Total: 0}, nil
}

streamPresence := b.presence.streams[stream]

info := &PresenceInfo{Total: len(streamPresence)}

if options.ReturnRecords {
info.Records = make([]interface{}, 0, len(streamPresence))

for _, entry := range streamPresence {
info.Records = append(info.Records, entry.info)
}
}

return info, nil
}

func (b *Memory) add(name string, data string) uint64 {
b.streamsMu.Lock()

Expand Down Expand Up @@ -457,4 +623,62 @@ func (b *Memory) expire() {
b.expireSessions = b.expireSessions[i:]

b.sessionsMu.Unlock()

// presence expiration
b.expirePresence()
}

func (b *Memory) expirePresence() {
b.presence.mu.Lock()

now := time.Now().Unix()
toDelete := []string{}

for sid, sp := range b.presence.sessions {
if sp.deadline < now {
toDelete = append(toDelete, sid)
}
}

leaveMessages := []common.StreamMessage{}

for _, sid := range toDelete {
entry := b.presence.sessions[sid]

for stream, pid := range entry.streams {
if _, ok := b.presence.streams[stream]; !ok {
continue
}

if _, ok := b.presence.streams[stream][pid]; !ok {
continue
}

streamSessionPresence := b.presence.streams[stream][pid]

empty := streamSessionPresence.remove(sid)

if empty {
delete(b.presence.streams[stream], pid)

leaveMessages = append(leaveMessages, common.StreamMessage{
Stream: stream,
Data: common.PresenceLeaveMessage(pid),
})

if len(b.presence.streams[stream]) == 0 {
delete(b.presence.streams, stream)
}
}
}

delete(b.presence.sessions, sid)
}

b.presence.mu.Unlock()

// TODO: batch broadcast?
for _, msg := range leaveMessages {
b.broadcaster.Broadcast(&msg)
}
}
12 changes: 12 additions & 0 deletions broker/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,18 @@ func (n *NATS) Reset() error {
return nil
}

func (n *NATS) PresenceAdd(stream string, sid string, pid string, info interface{}) error {
return errors.New("presence not supported")
}

func (n *NATS) PresenceRemove(stream string, sid string, pid string) error {
return errors.New("presence not supported")
}

func (n *NATS) PresenceInfo(stream string, opts ...PresenceInfoOption) (*PresenceInfo, error) {
return nil, errors.New("presence not supported")
}

func (n *NATS) add(stream string, data string) (uint64, error) {
err := n.ensureStreamExists(stream)

Expand Down
Loading

0 comments on commit 0f3098b

Please sign in to comment.