Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client reports download status and download details #341

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,11 @@ func TestUpdatePackages(t *testing.T) {
errorOnCallback.errorOnCallback = true
tests = append(tests, errorOnCallback)

// Check that the downloading status is sent
downloading := createPackageTestCase("download status set", downloadSrv)
downloading.expectedStatus.Packages["package1"].Status = protobufs.PackageStatusEnum_PackageStatusEnum_Downloading
tests = append(tests, downloading)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
verifyUpdatePackages(t, test)
Expand Down
9 changes: 9 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"errors"
"fmt"
"sync"
"time"

"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -59,6 +60,8 @@

// Indicates that the Client is fully stopped.
stoppedSignal chan struct{}

downloadReporterInterval time.Duration
}

// NewClientCommon creates a new ClientCommon.
Expand Down Expand Up @@ -144,9 +147,15 @@
return err
}

if settings.DownloadReporterInterval != nil && settings.DownloadReporterInterval < time.Second {
c.downloadReporterInterval = time.Second
} else if settings.DownloadReporterInterval != nil {
c.downloadReporterInterval = settings.DownloadReporterInterval
}

Check failure on line 155 in client/internal/clientcommon.go

View workflow job for this annotation

GitHub Actions / build-and-test

invalid operation: settings.DownloadReporterInterval < time.Second (mismatched types *time.Duration and time.Duration)

Check failure on line 155 in client/internal/clientcommon.go

View workflow job for this annotation

GitHub Actions / test-coverage

invalid operation: settings.DownloadReporterInterval < time.Second (mismatched types *time.Duration and time.Duration)
return nil
}

Check failure on line 158 in client/internal/clientcommon.go

View workflow job for this annotation

GitHub Actions / build-and-test

cannot use settings.DownloadReporterInterval (variable of type *time.Duration) as time.Duration value in assignment

Check failure on line 158 in client/internal/clientcommon.go

View workflow job for this annotation

GitHub Actions / test-coverage

cannot use settings.DownloadReporterInterval (variable of type *time.Duration) as time.Duration value in assignment
// Stop stops the client. It returns an error if the client is not started.
func (c *ClientCommon) Stop(ctx context.Context) error {
if !c.isStarted {
Expand Down
2 changes: 1 addition & 1 deletion client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
) {
h.url = url
h.callbacks = callbacks
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex)
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex, c.common.downloadReporterInterval)

Check failure on line 99 in client/internal/httpsender.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: c

Check failure on line 99 in client/internal/httpsender.go

View workflow job for this annotation

GitHub Actions / test-coverage

undefined: c

// we need to detect if the redirect was ever set, if not, we want default behaviour
if callbacks.CheckRedirect != nil {
Expand Down
3 changes: 2 additions & 1 deletion client/internal/inmempackagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"context"
"io"
"maps"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand Down Expand Up @@ -92,7 +93,7 @@ func (l *InMemPackagesStore) SetAllPackagesHash(hash []byte) error {
}

func (l *InMemPackagesStore) GetContent() map[string][]byte {
return l.fileContents
return maps.Clone(l.fileContents)
}

func (l *InMemPackagesStore) GetSignature() map[string][]byte {
Expand Down
68 changes: 68 additions & 0 deletions client/internal/package_download_details_reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package internal

import (
"context"
"sync/atomic"
"time"
)

const downloadReporterDefaultInterval = time.Second * 10

type downloadReporter struct {
start time.Time
interval time.Duration
packageLength float64

downloaded atomic.Uint64

done chan struct{}
}

func newDownloadReporter(interval time.Duration, length int) *downloadReporter {
if interval <= 0 {
interval = downloadReporterDefaultInterval
}
return &downloadReporter{
start: time.Now(),
interval: interval,
packageLength: float64(length),
done: make(chan struct{}),
}
}

// Write tracks the number of bytes downloaded. It will never return an error.
func (p *downloadReporter) Write(b []byte) (int, error) {
n := len(b)
p.downloaded.Add(uint64(n))
return n, nil
}

// report periodically calls the passed function to with the download percent and rate to update the status of a package.
func (p *downloadReporter) report(ctx context.Context, updateFn func(context.Context, float, float) error) {

Check failure on line 41 in client/internal/package_download_details_reporter.go

View workflow job for this annotation

GitHub Actions / build-and-test

undefined: float

Check failure on line 41 in client/internal/package_download_details_reporter.go

View workflow job for this annotation

GitHub Actions / test-coverage

undefined: float
go func() {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-p.done:
return
case <-ticker.C:
downloadTime := time.Now().Sub(p.start)
downloaded := float64(p.downloaded.Load())
bps := downloaded / float64(downloadTime/time.Second)
var downloadPercent float64
if p.packageLength > 0 {
downloadPercent = downloaded / p.packageLength * 100
}
_ = updateFn(ctx, downloadPercent, bps)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we change updateFn to take a struct { DownloadPercent float, Bps float } so that it is clear which of the 2 floats is which.

}
}
}()
}

// stop the downloadReporter report goroutine
func (p *downloadReporter) stop() {
close(p.done)
}
36 changes: 35 additions & 1 deletion client/internal/packagessyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
"context"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -19,6 +22,7 @@
clientSyncedState *ClientSyncedState
localState types.PackagesStateProvider
sender Sender
reporterInterval time.Duration

statuses *protobufs.PackageStatuses
mux *sync.Mutex
Expand All @@ -33,6 +37,7 @@
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
mux *sync.Mutex,
reporterInterval time.Duration,
) *packagesSyncer {
return &packagesSyncer{
logger: logger,
Expand All @@ -42,6 +47,7 @@
localState: packagesStateProvider,
doneCh: make(chan struct{}),
mux: mux,
reporterInterval: reporterInterval,
}
}

Expand Down Expand Up @@ -273,6 +279,10 @@

// downloadFile downloads the file from the server.
func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file *protobufs.DownloadableFile) error {
status := s.statuses.Packages[pkgName]
status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_Downloading
_ = s.reportStatuses(ctx, true)

s.logger.Debugf(ctx, "Downloading package %s file from %s", pkgName, file.DownloadUrl)

req, err := http.NewRequestWithContext(ctx, "GET", file.DownloadUrl, nil)
Expand All @@ -290,13 +300,37 @@
return fmt.Errorf("cannot download file from %s, HTTP response=%v", file.DownloadUrl, resp.StatusCode)
}

err = s.localState.UpdateContent(ctx, pkgName, resp.Body, file.ContentHash, file.Signature)
// Package length is required to be able to report download percent.
packageLength := -1
if contentLength := resp.Header.Get("Content-Length"); contentLength != "" {
if length, err := strconv.Atoi(contentLength); err == nil {
packageLength = length
}
}
// start the download reporter
detailsReporter := newDownloadReporter(s.reporterInterval, packageLength)
detailsReporter.report(ctx, s.getDownloadDetailsFn(pkgName))
defer detailsReporter.stop()

tr := io.TeeReader(resp.Body, detailsReporter)
err = s.localState.UpdateContent(ctx, pkgName, tr, file.ContentHash, file.Signature)
if err != nil {
return fmt.Errorf("failed to install/update the package %s downloaded from %s: %v", pkgName, file.DownloadUrl, err)
}
return nil
}

func (s *packagesSyncer) getDownloadDetailsFn(pkgName string) func(context.Context, float64, float64) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this func updates the download details, so perhaps a more fitting name is updateDownloadDetails?

return func(ctx context.Context, percent, rate float64) error {
status := s.statuses.Packages[pkgName]
status.DownloadDetails = &protobufs.PackageDownloadDetails{
DownloadPercent: percent,
DownloadBytesPerSecond: rate,

Check failure on line 328 in client/internal/packagessyncer.go

View workflow job for this annotation

GitHub Actions / build-and-test

cannot use rate (variable of type float64) as uint64 value in struct literal

Check failure on line 328 in client/internal/packagessyncer.go

View workflow job for this annotation

GitHub Actions / test-coverage

cannot use rate (variable of type float64) as uint64 value in struct literal
}
return s.reportStatuses(ctx, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will call SetLastReportedStatuses. I think to be safe we need to document SetLastReportedStatuses and require that it must be prepared to be called concurrently.

Reading the current code, it appears we are NOT calling it concurrently. We call once at the start of downloadFile and then periodically, while detailsReporter is active. However, it still makes me uncomfortable, since these calls are made from different goroutines and it is very easy to accidentally change the code so that we end up calling reportStatuses concurrently.

Just to future-proof it I think it make sense to caution implementors of SetLastReportedStatuses and ask them to guard any mutations they have against concurrent call. We can also demonstrate it on InMemPackagesStore implementation (should be trivial to add a mutex that protects the updates).

What do you think?

}
}

// deleteUnneededLocalPackages deletes local packages that are not
// needed anymore. This is done by comparing the local package state
// with the server's package state.
Expand Down
7 changes: 7 additions & 0 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type receivedProcessor struct {

// Agent's capabilities defined at Start() time.
capabilities protobufs.AgentCapabilities

// Download reporter interval value
// a negative number indicates that the default should be used instead.
reporterInterval time.Duration
}

func newReceivedProcessor(
Expand All @@ -41,6 +45,7 @@ func newReceivedProcessor(
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
reporterInt time.Duration,
) receivedProcessor {
return receivedProcessor{
logger: logger,
Expand All @@ -50,6 +55,7 @@ func newReceivedProcessor(
packagesStateProvider: packagesStateProvider,
capabilities: capabilities,
packageSyncMutex: packageSyncMutex,
reporterInterval: reporterInt,
}
}

Expand Down Expand Up @@ -129,6 +135,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
r.clientSyncedState,
r.packagesStateProvider,
r.packageSyncMutex,
r.reporterInterval,
)
} else {
r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability")
Expand Down
4 changes: 3 additions & 1 deletion client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/open-telemetry/opamp-go/client/types"
Expand Down Expand Up @@ -34,13 +35,14 @@ func NewWSReceiver(
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
packageSyncMutex *sync.Mutex,
reporterInterval time.Duration,
) *wsReceiver {
w := &wsReceiver{
conn: conn,
logger: logger,
sender: sender,
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex),
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex, reporterInterval),
stopped: make(chan struct{}),
}

Expand Down
5 changes: 5 additions & 0 deletions client/types/startsettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,9 @@ type StartSettings struct {
//
// If the ReportsHeartbeat capability is disabled, this option has no effect.
HeartbeatInterval *time.Duration

// Optional DownloadReporterInterval to configure how often a client reports the status of a package that is being downloaded.
// If nil, the default reporter interval (10s) will be used.
// If specified a minimum value of 1s will be enforced.
DownloadReporterInterval *time.Duration
}
1 change: 1 addition & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
c.common.PackagesStateProvider,
c.common.Capabilities,
&c.common.PackageSyncMutex,
c.common.downloadReporterInterval,
)

// When the wsclient is closed, the context passed to runOneCycle will be canceled.
Expand Down
Loading