diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index dea6bd6c..da20c7e8 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -1527,6 +1527,11 @@ func TestUpdatePackages(t *testing.T) { errorOnCallback.errorOnCallback = true tests = append(tests, errorOnCallback) + // Check that the downloading status is sent + downloading := createPackageTestCase("download status set", downloadSrv) + downloading.expectedStatus.Packages["package1"].Status = protobufs.PackageStatusEnum_PackageStatusEnum_Downloading + tests = append(tests, downloading) + for _, test := range tests { t.Run(test.name, func(t *testing.T) { verifyUpdatePackages(t, test) diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 283e3b01..40abe08c 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "google.golang.org/protobuf/proto" @@ -59,6 +60,8 @@ type ClientCommon struct { // Indicates that the Client is fully stopped. stoppedSignal chan struct{} + + downloadReporterInterval time.Duration } // NewClientCommon creates a new ClientCommon. @@ -144,6 +147,12 @@ func (c *ClientCommon) PrepareStart( return err } + if settings.DownloadReporterInterval != nil && settings.DownloadReporterInterval < time.Second { + c.downloadReporterInterval = time.Second + } else if settings.DownloadReporterInterval != nil { + c.downloadReporterInterval = settings.DownloadReporterInterval + } + return nil } diff --git a/client/internal/httpsender.go b/client/internal/httpsender.go index a97e1311..6c638403 100644 --- a/client/internal/httpsender.go +++ b/client/internal/httpsender.go @@ -96,7 +96,7 @@ func (h *HTTPSender) Run( ) { h.url = url h.callbacks = callbacks - h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex) + h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex, c.common.downloadReporterInterval) // we need to detect if the redirect was ever set, if not, we want default behaviour if callbacks.CheckRedirect != nil { diff --git a/client/internal/inmempackagestore.go b/client/internal/inmempackagestore.go index dc719e7c..720324b3 100644 --- a/client/internal/inmempackagestore.go +++ b/client/internal/inmempackagestore.go @@ -3,6 +3,7 @@ package internal import ( "context" "io" + "maps" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" @@ -92,7 +93,7 @@ func (l *InMemPackagesStore) SetAllPackagesHash(hash []byte) error { } func (l *InMemPackagesStore) GetContent() map[string][]byte { - return l.fileContents + return maps.Clone(l.fileContents) } func (l *InMemPackagesStore) GetSignature() map[string][]byte { diff --git a/client/internal/package_download_details_reporter.go b/client/internal/package_download_details_reporter.go new file mode 100644 index 00000000..cba22685 --- /dev/null +++ b/client/internal/package_download_details_reporter.go @@ -0,0 +1,68 @@ +package internal + +import ( + "context" + "sync/atomic" + "time" +) + +const downloadReporterDefaultInterval = time.Second * 10 + +type downloadReporter struct { + start time.Time + interval time.Duration + packageLength float64 + + downloaded atomic.Uint64 + + done chan struct{} +} + +func newDownloadReporter(interval time.Duration, length int) *downloadReporter { + if interval <= 0 { + interval = downloadReporterDefaultInterval + } + return &downloadReporter{ + start: time.Now(), + interval: interval, + packageLength: float64(length), + done: make(chan struct{}), + } +} + +// Write tracks the number of bytes downloaded. It will never return an error. +func (p *downloadReporter) Write(b []byte) (int, error) { + n := len(b) + p.downloaded.Add(uint64(n)) + return n, nil +} + +// report periodically calls the passed function to with the download percent and rate to update the status of a package. +func (p *downloadReporter) report(ctx context.Context, updateFn func(context.Context, float, float) error) { + go func() { + ticker := time.NewTicker(p.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-p.done: + return + case <-ticker.C: + downloadTime := time.Now().Sub(p.start) + downloaded := float64(p.downloaded.Load()) + bps := downloaded / float64(downloadTime/time.Second) + var downloadPercent float64 + if p.packageLength > 0 { + downloadPercent = downloaded / p.packageLength * 100 + } + _ = updateFn(ctx, downloadPercent, bps) + } + } + }() +} + +// stop the downloadReporter report goroutine +func (p *downloadReporter) stop() { + close(p.done) +} diff --git a/client/internal/packagessyncer.go b/client/internal/packagessyncer.go index c02828d4..fea88b3e 100644 --- a/client/internal/packagessyncer.go +++ b/client/internal/packagessyncer.go @@ -5,8 +5,11 @@ import ( "context" "errors" "fmt" + "io" "net/http" + "strconv" "sync" + "time" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" @@ -19,6 +22,7 @@ type packagesSyncer struct { clientSyncedState *ClientSyncedState localState types.PackagesStateProvider sender Sender + reporterInterval time.Duration statuses *protobufs.PackageStatuses mux *sync.Mutex @@ -33,6 +37,7 @@ func NewPackagesSyncer( clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, mux *sync.Mutex, + reporterInterval time.Duration, ) *packagesSyncer { return &packagesSyncer{ logger: logger, @@ -42,6 +47,7 @@ func NewPackagesSyncer( localState: packagesStateProvider, doneCh: make(chan struct{}), mux: mux, + reporterInterval: reporterInterval, } } @@ -273,6 +279,10 @@ func (s *packagesSyncer) shouldDownloadFile(ctx context.Context, // downloadFile downloads the file from the server. func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file *protobufs.DownloadableFile) error { + status := s.statuses.Packages[pkgName] + status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_Downloading + _ = s.reportStatuses(ctx, true) + s.logger.Debugf(ctx, "Downloading package %s file from %s", pkgName, file.DownloadUrl) req, err := http.NewRequestWithContext(ctx, "GET", file.DownloadUrl, nil) @@ -290,13 +300,37 @@ func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file return fmt.Errorf("cannot download file from %s, HTTP response=%v", file.DownloadUrl, resp.StatusCode) } - err = s.localState.UpdateContent(ctx, pkgName, resp.Body, file.ContentHash, file.Signature) + // Package length is required to be able to report download percent. + packageLength := -1 + if contentLength := resp.Header.Get("Content-Length"); contentLength != "" { + if length, err := strconv.Atoi(contentLength); err == nil { + packageLength = length + } + } + // start the download reporter + detailsReporter := newDownloadReporter(s.reporterInterval, packageLength) + detailsReporter.report(ctx, s.getDownloadDetailsFn(pkgName)) + defer detailsReporter.stop() + + tr := io.TeeReader(resp.Body, detailsReporter) + err = s.localState.UpdateContent(ctx, pkgName, tr, file.ContentHash, file.Signature) if err != nil { return fmt.Errorf("failed to install/update the package %s downloaded from %s: %v", pkgName, file.DownloadUrl, err) } return nil } +func (s *packagesSyncer) getDownloadDetailsFn(pkgName string) func(context.Context, float64, float64) error { + return func(ctx context.Context, percent, rate float64) error { + status := s.statuses.Packages[pkgName] + status.DownloadDetails = &protobufs.PackageDownloadDetails{ + DownloadPercent: percent, + DownloadBytesPerSecond: rate, + } + return s.reportStatuses(ctx, true) + } +} + // deleteUnneededLocalPackages deletes local packages that are not // needed anymore. This is done by comparing the local package state // with the server's package state. diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index aee05a81..1644b302 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -31,6 +31,10 @@ type receivedProcessor struct { // Agent's capabilities defined at Start() time. capabilities protobufs.AgentCapabilities + + // Download reporter interval value + // a negative number indicates that the default should be used instead. + reporterInterval time.Duration } func newReceivedProcessor( @@ -41,6 +45,7 @@ func newReceivedProcessor( packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, packageSyncMutex *sync.Mutex, + reporterInt time.Duration, ) receivedProcessor { return receivedProcessor{ logger: logger, @@ -50,6 +55,7 @@ func newReceivedProcessor( packagesStateProvider: packagesStateProvider, capabilities: capabilities, packageSyncMutex: packageSyncMutex, + reporterInterval: reporterInt, } } @@ -129,6 +135,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro r.clientSyncedState, r.packagesStateProvider, r.packageSyncMutex, + r.reporterInterval, ) } else { r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability") diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index bb9c90bc..3b2d482e 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/gorilla/websocket" "github.com/open-telemetry/opamp-go/client/types" @@ -34,13 +35,14 @@ func NewWSReceiver( packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, packageSyncMutex *sync.Mutex, + reporterInterval time.Duration, ) *wsReceiver { w := &wsReceiver{ conn: conn, logger: logger, sender: sender, callbacks: callbacks, - processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex), + processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities, packageSyncMutex, reporterInterval), stopped: make(chan struct{}), } diff --git a/client/types/startsettings.go b/client/types/startsettings.go index 6184d575..c8ba7c19 100644 --- a/client/types/startsettings.go +++ b/client/types/startsettings.go @@ -64,4 +64,9 @@ type StartSettings struct { // // If the ReportsHeartbeat capability is disabled, this option has no effect. HeartbeatInterval *time.Duration + + // Optional DownloadReporterInterval to configure how often a client reports the status of a package that is being downloaded. + // If nil, the default reporter interval (10s) will be used. + // If specified a minimum value of 1s will be enforced. + DownloadReporterInterval *time.Duration } diff --git a/client/wsclient.go b/client/wsclient.go index f19d8ab4..425a41c7 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -359,6 +359,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.PackagesStateProvider, c.common.Capabilities, &c.common.PackageSyncMutex, + c.common.downloadReporterInterval, ) // When the wsclient is closed, the context passed to runOneCycle will be canceled.