forked from go-kivik/kivik
-
Notifications
You must be signed in to change notification settings - Fork 0
/
updates.go
91 lines (77 loc) · 2.3 KB
/
updates.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package kivik
import (
"context"
"errors"
"net/http"
"github.com/go-kivik/kivik/driver"
)
// DBUpdates provides access to database updates.
type DBUpdates struct {
*iter
updatesi driver.DBUpdates
}
// Next returns the next DBUpdate from the feed. This function will block
// until an event is received. If an error occurs, it will be returned and
// the feed closed. If the feed was closed normally, io.EOF will be returned
// when there are no more events in the buffer.
func (f *DBUpdates) Next() bool {
return f.iter.Next()
}
// Close closes the feed. Any unread updates will still be accessible via
// Next().
func (f *DBUpdates) Close() error {
return f.iter.Close()
}
// Err returns the error, if any, that was encountered during iteration. Err
// may be called after an explicit or implicit Close.
func (f *DBUpdates) Err() error {
return f.iter.Err()
}
type updatesIterator struct{ driver.DBUpdates }
var _ iterator = &updatesIterator{}
func (r *updatesIterator) Next(i interface{}) error { return r.DBUpdates.Next(i.(*driver.DBUpdate)) }
func newDBUpdates(ctx context.Context, updatesi driver.DBUpdates) *DBUpdates {
return &DBUpdates{
iter: newIterator(ctx, &updatesIterator{updatesi}, &driver.DBUpdate{}),
updatesi: updatesi,
}
}
// DBName returns the database name for the current update.
func (f *DBUpdates) DBName() string {
runlock, err := f.rlock()
if err != nil {
return ""
}
defer runlock()
return f.curVal.(*driver.DBUpdate).DBName
}
// Type returns the type of the current update.
func (f *DBUpdates) Type() string {
runlock, err := f.rlock()
if err != nil {
return ""
}
defer runlock()
return f.curVal.(*driver.DBUpdate).Type
}
// Seq returns the update sequence of the current update.
func (f *DBUpdates) Seq() string {
runlock, err := f.rlock()
if err != nil {
return ""
}
defer runlock()
return f.curVal.(*driver.DBUpdate).Seq
}
// DBUpdates begins polling for database updates.
func (c *Client) DBUpdates(ctx context.Context) (*DBUpdates, error) {
updater, ok := c.driverClient.(driver.DBUpdater)
if !ok {
return nil, &Error{HTTPStatus: http.StatusNotImplemented, Err: errors.New("kivik: driver does not implement DBUpdater")}
}
updatesi, err := updater.DBUpdates(ctx)
if err != nil {
return nil, err
}
return newDBUpdates(context.Background(), updatesi), nil
}