Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple Flush method #195

Open
wants to merge 3 commits into
base: v3.0
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const Version = "3.0.0"
// provided by the package and provide a way to send messages via the HTTP API.
type Client interface {
io.Closer
http.Flusher

// Queues a message to be sent by the client when the conditions for a batch
// upload are met.
Expand All @@ -37,6 +38,7 @@ type Client interface {
// happens if the client was already closed at the time the method was
// called or if the message was malformed.
Enqueue(Message) error
EnqueueSync(Message) error
}

type client struct {
Expand All @@ -59,6 +61,12 @@ type client struct {
// This HTTP client is used to send requests to the backend, it uses the
// HTTP transport provided in the configuration.
http http.Client

// Used by the Flush method to send requests to the backend synchronously.
mq *messageQueue

// We can only run flush once at a time, so we use this mutex to synchronize.
mu *sync.Mutex
}

// Instantiate a new client that uses the write key passed as first argument to
Expand Down Expand Up @@ -89,6 +97,13 @@ func NewWithConfig(writeKey string, config Config) (cli Client, err error) {
http: makeHttpClient(config.Transport),
}

mq := messageQueue{
maxBatchSize: c.BatchSize,
maxBatchBytes: c.maxBatchBytes(),
}
c.mq = &mq
c.mu = &sync.Mutex{}

go c.loop()

cli = c
Expand Down Expand Up @@ -143,6 +158,12 @@ func dereferenceMessage(msg Message) Message {
}

func (c *client) Enqueue(msg Message) (err error) {
return c.enqueue(msg, true)
}
func (c *client) EnqueueSync(msg Message) (err error) {
return c.enqueue(msg, false)
}
func (c *client) enqueue(msg Message, async bool) (err error) {
msg = dereferenceMessage(msg)
if err = msg.Validate(); err != nil {
return
Expand Down Expand Up @@ -203,10 +224,24 @@ func (c *client) Enqueue(msg Message) (err error) {
}
}()

c.msgs <- msg
if async {
c.msgs <- msg
} else {
c.push(c.mq, msg, nil, nil)
}
return
}

// Flush flush metrics synchronously without closing the client.
func (c *client) Flush() {
c.mu.Lock()
defer c.mu.Unlock()
if msgs := c.mq.flush(); msgs != nil {
c.debugf("flushing %d messages synchronously", len(msgs))
c.send(msgs)
}
}

// Close and flush metrics.
func (c *client) Close() (err error) {
defer func() {
Expand Down Expand Up @@ -336,18 +371,13 @@ func (c *client) loop() {
ex := newExecutor(c.maxConcurrentRequests)
defer ex.close()

mq := messageQueue{
maxBatchSize: c.BatchSize,
maxBatchBytes: c.maxBatchBytes(),
}

for {
select {
case msg := <-c.msgs:
c.push(&mq, msg, wg, ex)
c.push(c.mq, msg, wg, ex)

case <-tick.C:
c.flush(&mq, wg, ex)
c.flush(c.mq, wg, ex)

case <-c.quit:
c.debugf("exit requested – draining messages")
Expand All @@ -356,10 +386,10 @@ func (c *client) loop() {
// messages can be pushed and otherwise the loop would never end.
close(c.msgs)
for msg := range c.msgs {
c.push(&mq, msg, wg, ex)
c.push(c.mq, msg, wg, ex)
}

c.flush(&mq, wg, ex)
c.flush(c.mq, wg, ex)
c.debugf("exit")
return
}
Expand All @@ -380,11 +410,15 @@ func (c *client) push(q *messageQueue, m Message, wg *sync.WaitGroup, ex *execut

if msgs := q.push(msg); msgs != nil {
c.debugf("exceeded messages batch limit with batch of %d messages – flushing", len(msgs))
c.sendAsync(msgs, wg, ex)
if wg != nil && ex != nil {
c.sendAsync(msgs, wg, ex)
}
}
}

func (c *client) flush(q *messageQueue, wg *sync.WaitGroup, ex *executor) {
c.mu.Lock()
defer c.mu.Unlock()
if msgs := q.flush(); msgs != nil {
c.debugf("flushing %d messages", len(msgs))
c.sendAsync(msgs, wg, ex)
Expand Down