-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
176 lines (148 loc) · 5.03 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package automergendjsonsync
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"github.com/automerge/automerge-go"
)
// messageGenerator is an io.ReadCloser which runs generateMessagesToWriter in a goroutine once the first read is
// requested. This uses an IO pipe so messages are only generated at the rate that they are being read off the reader.
// Some extra context and synchronisation elements are stored in the struct in order to optimise things.
// Calling Close before the first Read prevents the goroutine from ever being started.
type messageGenerator struct {
// ctx is handed to generateMessagesToWriter by the background goroutine.
ctx context.Context
// state is the sync state handed to generateMessagesToWriter.
state *automerge.SyncState
// hints is a receive-only channel handed to generateMessagesToWriter.
hints <-chan bool
// writer and reader are the two ends of the pipe; both remain nil until start has run.
writer *io.PipeWriter
reader *io.PipeReader
// wg is incremented by start and released when the background goroutine returns.
wg *sync.WaitGroup
// once guards start so it runs at most once; Close also consumes it so a later Read cannot start the goroutine.
once sync.Once
}
// newMessageGenerator builds a messageGenerator around the given context, sync state, hint channel and wait
// group. No pipe or goroutine is created here; that is deferred until the first Read.
func newMessageGenerator(ctx context.Context, state *automerge.SyncState, hints <-chan bool, wg *sync.WaitGroup) *messageGenerator {
	gen := new(messageGenerator)
	gen.ctx = ctx
	gen.state = state
	gen.hints = hints
	gen.wg = wg
	return gen
}
// background is the body of the generator goroutine. It writes messages into the pipe until
// generateMessagesToWriter returns, then closes the write end: cleanly when there was no error or the error is
// a context cancellation, and with the error attached otherwise so that the reader observes it.
func (mg *messageGenerator) background() {
	defer mg.wg.Done()

	genErr := generateMessagesToWriter(mg.ctx, mg.state, mg.hints, mg.writer, false)
	if genErr == nil || errors.Is(genErr, context.Canceled) {
		// The close result is deliberately ignored, as in the error branch below.
		_ = mg.writer.Close()
		return
	}
	_ = mg.writer.CloseWithError(genErr)
}
// start creates the pipe and launches the background goroutine, registering it on the wait group first.
// It is only invoked through mg.once from Read.
func (mg *messageGenerator) start() {
	pipeReader, pipeWriter := io.Pipe()
	mg.reader, mg.writer = pipeReader, pipeWriter
	mg.wg.Add(1)
	go mg.background()
}
// Close closes the read end of the pipe if the generator was started. If no Read has happened yet, it consumes
// the once with a no-op so that a later Read cannot start the goroutine, and returns nil.
func (mg *messageGenerator) Close() error {
	mg.once.Do(func() {})
	if pr := mg.reader; pr != nil {
		return pr.Close()
	}
	return nil
}
// Read starts the background generator on first use and then reads from the pipe. If Close was called before
// the first Read, the pipe was never created and io.ErrClosedPipe is returned.
func (mg *messageGenerator) Read(p []byte) (int, error) {
	mg.once.Do(mg.start)
	if pr := mg.reader; pr != nil {
		return pr.Read(p)
	}
	return 0, io.ErrClosedPipe
}
// Compile-time assertion that *messageGenerator satisfies io.ReadCloser.
var _ io.ReadCloser = (*messageGenerator)(nil)
// HttpDoer is the single-method HTTP client abstraction used to execute the sync request.
// *http.Client satisfies it.
type HttpDoer interface {
	// Do executes request and returns the response or an error.
	Do(request *http.Request) (*http.Response, error)
}
// HttpDoerFunc adapts a plain function with the right signature so it can be used wherever a Do method
// taking a request and returning a response and error is required.
type HttpDoerFunc func(*http.Request) (*http.Response, error)

// Do calls the underlying function with req and returns its results unchanged.
func (fn HttpDoerFunc) Do(req *http.Request) (*http.Response, error) {
	res, err := fn(req)
	return res, err
}
// clientOptions collects the settings for HttpPushPullChanges that can be customised through ClientOption values.
type clientOptions struct {
// client executes the HTTP request; newClientOptions defaults it to http.DefaultClient.
client HttpDoer
// state is the sync state used for the exchange; when left nil, HttpPushPullChanges creates a new one for the document.
state *automerge.SyncState
// terminationCheck is passed to consumeMessagesFromReader; newClientOptions defaults it to NoTerminationCheck.
terminationCheck TerminationCheck
// reqEditors are called in order on the outgoing request after the standard headers have been set.
reqEditors []func(r *http.Request)
}
// ClientOption is a functional option that mutates the clientOptions used by HttpPushPullChanges.
type ClientOption func(*clientOptions)
// newClientOptions returns the default options (http.DefaultClient and NoTerminationCheck) with each of the
// given opts applied in order, so later options override earlier ones.
func newClientOptions(opts ...ClientOption) *clientOptions {
	cfg := &clientOptions{
		client:           http.DefaultClient,
		terminationCheck: NoTerminationCheck,
	}
	for i := range opts {
		opts[i](cfg)
	}
	return cfg
}
// WithHttpClient returns an option that replaces the HTTP client used to perform the request.
func WithHttpClient(client HttpDoer) ClientOption {
	apply := func(cfg *clientOptions) {
		cfg.client = client
	}
	return apply
}
// WithClientSyncState returns an option that supplies the sync state to use, instead of having
// HttpPushPullChanges create a new one.
func WithClientSyncState(state *automerge.SyncState) ClientOption {
	apply := func(cfg *clientOptions) {
		cfg.state = state
	}
	return apply
}
// WithClientTerminationCheck returns an option that replaces the termination check passed to the response
// message consumer.
func WithClientTerminationCheck(check TerminationCheck) ClientOption {
	apply := func(cfg *clientOptions) {
		cfg.terminationCheck = check
	}
	return apply
}
// WithClientRequestEditor returns an option that appends f to the list of request editors. Editors are called
// on the outgoing request in the order they were added.
func WithClientRequestEditor(f func(r *http.Request)) ClientOption {
	apply := func(cfg *clientOptions) {
		editors := append(cfg.reqEditors, f)
		cfg.reqEditors = editors
	}
	return apply
}
// HttpPushPullChanges is the HTTP client function to synchronise a local document with a remote server. This uses
// either HTTP2 or HTTP1.1 depending on the remote server - HTTP2 is preferred since it has better understood
// bidirectional body capabilities.
//
// It sends a PUT request to url whose body is generated on demand from the local sync state, and feeds the
// response body to consumeMessagesFromReader. An error is returned if the request cannot be built or sent, if
// the response status is not 200, if the response declares an unsuitable content type, or if consuming the
// response fails. The response body is closed on every return path once a response has been received.
func (b *SharedDoc) HttpPushPullChanges(ctx context.Context, url string, opts ...ClientOption) error {
	log := Logger(ctx)
	o := newClientOptions(opts...)
	if o.state == nil {
		o.state = automerge.NewSyncState(b.Doc())
	}

	// We use the PUT method here because we are modifying a document in place.
	r, err := http.NewRequestWithContext(ctx, http.MethodPut, url, nil)
	if err != nil {
		return fmt.Errorf("failed to setup request: %w", err)
	}
	r.Header.Set("Content-Type", ContentTypeWithCharset)
	r.Header.Set("Accept", ContentType)
	r.Header.Set("Cache-Control", "no-store")
	// We don't need to send the body content if the server will reject it, so we can notify that expect-continue is supported.
	r.Header.Set("Expect", "100-continue")
	for _, editor := range o.reqEditors {
		editor(r)
	}

	// Deferred calls run in reverse order: the derived context is cancelled and the subscription released
	// before we wait for any generator goroutine to finish.
	wg := new(sync.WaitGroup)
	defer wg.Wait()
	sub, fin := b.SubscribeToReceivedChanges()
	defer fin()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// We use a special body generator that runs in a goroutine on demand in order to generate new messages.
	r.Body = newMessageGenerator(ctx, o.state, sub, wg)
	r.GetBody = func() (io.ReadCloser, error) {
		return newMessageGenerator(ctx, o.state, sub, wg), nil
	}

	res, err := o.client.Do(r)
	if err != nil {
		return fmt.Errorf("http request failed: %w", err)
	}
	// Register the close before any early return below, so a rejected status or content type does not leak
	// the response body.
	defer func() {
		if err := res.Body.Close(); err != nil {
			log.ErrorContext(ctx, "failed to close response body", slog.Any("err", err))
		}
	}()

	log.InfoContext(ctx, "received http sync response", slog.String("proto", res.Proto), slog.String("target", fmt.Sprintf("%s %s", http.MethodPut, url)), slog.Int("status", res.StatusCode))
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("http request failed with status %d", res.StatusCode)
	}
	// A missing Content-Type header is tolerated; only an explicitly unsuitable one is rejected.
	if v := res.Header.Get("Content-Type"); v != "" && isNotSuitableContentType(v) {
		return fmt.Errorf("http request returned a response with an unsuitable content type %s", v)
	}
	if _, err := b.consumeMessagesFromReader(ctx, o.state, res.Body, NoReadPredicate, o.terminationCheck); err != nil {
		return err
	}
	return nil
}