From a0df0fa14c427ca468659d8dda3d65004b9d64ee Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 16 Jul 2022 18:20:16 +0530 Subject: [PATCH 1/3] Update client package docs --- client/httpclient.go | 2 ++ client/internal/clientcommon.go | 15 +++++++++++++++ client/internal/clientstate.go | 5 +++-- client/internal/httpsender.go | 5 +++++ client/internal/nextmessage.go | 1 + client/internal/packagessyncer.go | 15 +++++++++++++++ client/internal/sender.go | 4 +++- client/internal/wsreceiver.go | 3 +++ client/internal/wssender.go | 2 ++ client/types/callbacks.go | 4 +++- client/wsclient.go | 1 + 11 files changed, 53 insertions(+), 4 deletions(-) diff --git a/client/httpclient.go b/client/httpclient.go index 96f89224..9b8bbda3 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -20,6 +20,7 @@ type httpClient struct { sender *internal.HTTPSender } +// NewHTTP creates a new OpAMP Client that uses HTTP transport. func NewHTTP(logger types.Logger) *httpClient { if logger == nil { logger = &sharedinternal.NopLogger{} @@ -33,6 +34,7 @@ func NewHTTP(logger types.Logger) *httpClient { return w } +// Start starts the client and runs until Stop is called. func (c *httpClient) Start(ctx context.Context, settings types.StartSettings) error { if err := c.common.PrepareStart(ctx, settings); err != nil { return err diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 313e2326..4df566bb 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -49,12 +49,15 @@ type ClientCommon struct { stoppedSignal chan struct{} } +// NewClientCommon creates a new ClientCommon. func NewClientCommon(logger types.Logger, sender Sender) ClientCommon { return ClientCommon{ Logger: logger, sender: sender, stoppedSignal: make(chan struct{}, 1), } } +// PrepareStart prepares the client state for the next Start() call. +// It returns an error if the client is already started, or if the settings are invalid. func (c *ClientCommon) PrepareStart( _ context.Context, settings types.StartSettings, ) error { @@ -112,6 +115,7 @@ func (c *ClientCommon) PrepareStart( return nil } +// Stop stops the client. It returns an error if the client is not started. func (c *ClientCommon) Stop(ctx context.Context) error { if !c.isStarted { return errCannotStopNotStarted @@ -133,12 +137,15 @@ func (c *ClientCommon) Stop(ctx context.Context) error { return nil } +// IsStopping returns true if Stop() was called. func (c *ClientCommon) IsStopping() bool { c.isStoppingMutex.RLock() defer c.isStoppingMutex.RUnlock() return c.isStoppingFlag } +// StartConnectAndRun initiates the connection with the Server and starts the +// background goroutine that handles the communication unitl client is stopped. func (c *ClientCommon) StartConnectAndRun(runner func(ctx context.Context)) { // Create a cancellable context. runCtx, runCancel := context.WithCancel(context.Background()) @@ -256,6 +263,10 @@ func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error { return nil } +// SetRemoteConfigStatus sends a status update to the Server if the new RemoteConfigStatus +// is different from the status we already have in the state. +// It also remembers the new RemoteConfigStatus in the client state so that it can be +// sent to the Server when the Server asks for it. func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error { if status.LastRemoteConfigHash == nil { return errLastRemoteConfigHashNil @@ -283,6 +294,10 @@ func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatu return nil } +// SetPackageStatuses sends a status update to the Server if the new PackageStatuses +// are different from the ones we already have in the state. +// It also remembers the new PackageStatuses in the client state so that it can be +// sent to the Server when the Server asks for it. func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error { if statuses.ServerProvidedAllPackagesHash == nil { return errServerProvidedAllPackagesHashNil diff --git a/client/internal/clientstate.go b/client/internal/clientstate.go index e0f65075..95a04f81 100644 --- a/client/internal/clientstate.go +++ b/client/internal/clientstate.go @@ -16,8 +16,8 @@ var ( ) // ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to -// have access to synchronize to the Server. 3 messages can be stored in this store: -// AgentDescription, RemoteConfigStatus and PackageStatuses. +// have access to synchronize to the Server. 4 messages can be stored in this store: +// AgentDescription, AgentHealth, RemoteConfigStatus and PackageStatuses. // // See OpAMP spec for more details on how state synchronization works: // https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#Agent-to-Server-state-synchronization @@ -82,6 +82,7 @@ func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescriptio return nil } +// SetHealth sets the AgentHealth in the state. func (s *ClientSyncedState) SetHealth(health *protobufs.AgentHealth) error { if health == nil { return ErrAgentHealthMissing diff --git a/client/internal/httpsender.go b/client/internal/httpsender.go index 4db1c244..d6b98095 100644 --- a/client/internal/httpsender.go +++ b/client/internal/httpsender.go @@ -41,6 +41,8 @@ type HTTPSender struct { receiveProcessor receivedProcessor } +// NewHTTPSender creates a new Sender that uses HTTP to send messages +// with default settings. func NewHTTPSender(logger types.Logger) *HTTPSender { h := &HTTPSender{ SenderCommon: NewSenderCommon(), @@ -102,6 +104,9 @@ func (h *HTTPSender) SetRequestHeader(header http.Header) { h.requestHeader.Set(headerContentType, contentTypeProtobuf) } +// makeOneRequestRoundtrip sends a request and receives a response. +// It will retry the request if the server responds with too many +// requests or unavailable status. func (h *HTTPSender) makeOneRequestRoundtrip(ctx context.Context) { resp, err := h.sendRequestWithRetries(ctx) if err != nil { diff --git a/client/internal/nextmessage.go b/client/internal/nextmessage.go index a8da82c4..e79c6780 100644 --- a/client/internal/nextmessage.go +++ b/client/internal/nextmessage.go @@ -18,6 +18,7 @@ type NextMessage struct { messageMutex sync.Mutex } +// NewNextMessage returns a new empty NextMessage. func NewNextMessage() NextMessage { return NextMessage{ nextMessage: &protobufs.AgentToServer{}, diff --git a/client/internal/packagessyncer.go b/client/internal/packagessyncer.go index 422c2179..8f76a2cc 100644 --- a/client/internal/packagessyncer.go +++ b/client/internal/packagessyncer.go @@ -23,6 +23,7 @@ type packagesSyncer struct { doneCh chan struct{} } +// NewPackagesSyncer creates a new packages syncer. func NewPackagesSyncer( logger types.Logger, available *protobufs.PackagesAvailable, @@ -40,6 +41,7 @@ func NewPackagesSyncer( } } +// Sync performs the package syncing process. func (s *packagesSyncer) Sync(ctx context.Context) error { defer func() { @@ -95,6 +97,7 @@ func (s *packagesSyncer) initStatuses() error { return nil } +// doSync performs the actual syncing process. func (s *packagesSyncer) doSync(ctx context.Context) { hash, err := s.localState.AllPackagesHash() if err != nil { @@ -136,6 +139,7 @@ func (s *packagesSyncer) doSync(ctx context.Context) { _ = s.reportStatuses(true) } +// syncPackage downloads the package from the server and installs it. func (s *packagesSyncer) syncPackage( ctx context.Context, pkgName string, @@ -214,6 +218,9 @@ func (s *packagesSyncer) syncPackage( return err } +// syncPackageFile downloads the package file from the server. +// If the file already exists and contents are +// unchanged, it is not downloaded again. func (s *packagesSyncer) syncPackageFile( ctx context.Context, pkgName string, file *protobufs.DownloadableFile, ) error { @@ -225,6 +232,7 @@ func (s *packagesSyncer) syncPackageFile( return err } +// shouldDownloadFile returns true if the file should be downloaded. func (s *packagesSyncer) shouldDownloadFile( packageName string, file *protobufs.DownloadableFile, @@ -246,6 +254,7 @@ func (s *packagesSyncer) shouldDownloadFile( return false, nil } +// downloadFile downloads the file from the server. func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file *protobufs.DownloadableFile) error { s.logger.Debugf("Downloading package %s file from %s", pkgName, file.DownloadUrl) @@ -274,6 +283,9 @@ func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file return nil } +// deleteUnneededLocalPackages deletes local packages that are not +// needed anymore. This is done by comparing the local package state +// with the server's package state. func (s *packagesSyncer) deleteUnneededLocalPackages() error { // Read the list of packages we have locally. localPackages, err := s.localState.Packages() @@ -303,6 +315,9 @@ func (s *packagesSyncer) deleteUnneededLocalPackages() error { return lastErr } +// reportStatuses saves the last reported statuses to provider and client state. +// If sendImmediately is true, the statuses are scheduled to be +// sent to the server. func (s *packagesSyncer) reportStatuses(sendImmediately bool) error { // Save it in the user-supplied state provider. if err := s.localState.SetLastReportedStatuses(s.statuses); err != nil { diff --git a/client/internal/sender.go b/client/internal/sender.go index fa928567..28a283d7 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -23,7 +23,7 @@ type Sender interface { SetInstanceUid(instanceUid string) error } -// SenderCommon is partial Sender implementation that is common WebSocket and plain +// SenderCommon is partial Sender implementation that is common between WebSocket and plain // HTTP transports. This struct is intended to be embedded in the WebSocket and // HTTP Sender implementations. type SenderCommon struct { @@ -34,6 +34,8 @@ type SenderCommon struct { nextMessage NextMessage } +// NewSenderCommon creates a new SenderCommon. This is intended to be used by +// the WebSocket and HTTP Sender implementations. func NewSenderCommon() SenderCommon { return SenderCommon{ hasPendingMessage: make(chan struct{}, 1), diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index 08bed682..55a4a1b1 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -20,6 +20,8 @@ type wsReceiver struct { processor receivedProcessor } +// NewWSReceiver creates a new Receiver that uses WebSocket to receive +// messages from the server. func NewWSReceiver( logger types.Logger, callbacks types.Callbacks, @@ -39,6 +41,7 @@ func NewWSReceiver( return w } +// ReceiverLoop runs the receiver loop. To stop the receiver cancel the context. func (r *wsReceiver) ReceiverLoop(ctx context.Context) { runContext, cancelFunc := context.WithCancel(ctx) diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 0ea8a490..9a7b6460 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -19,6 +19,8 @@ type WSSender struct { stopped chan struct{} } +// NewSender creates a new Sender that uses WebSocket to send +// messages to the server. func NewSender(logger types.Logger) *WSSender { return &WSSender{ logger: logger, diff --git a/client/types/callbacks.go b/client/types/callbacks.go index b08c388a..f7a2a39e 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -39,6 +39,8 @@ type MessageData struct { } type Callbacks interface { + // OnConnect is called when the connection is successfully established to the Server. + // May be called after Start() is called and every time connected to the Server. OnConnect() // OnConnectFailed is called when the connection to the Server cannot be established. @@ -63,7 +65,7 @@ type Callbacks interface { OnMessage(ctx context.Context, msg *MessageData) // OnOpampConnectionSettings is called when the Agent receives an OpAMP - // connection settings offer from the Server. Typically the settings can specify + // connection settings offer from the Server. Typically, the settings can specify // authorization headers or TLS certificate, potentially also a different // OpAMP destination to work with. // diff --git a/client/wsclient.go b/client/wsclient.go index 551a13eb..77c1616e 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -37,6 +37,7 @@ type wsClient struct { sender *internal.WSSender } +// NewWebSocket creates a new OpAMP Client that uses WebSocket transport. func NewWebSocket(logger types.Logger) *wsClient { if logger == nil { logger = &sharedinternal.NopLogger{} From 359f526f1be44bccbbf1bc5cc6e7beafe87a4f5d Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 21 Jul 2022 15:19:05 +0530 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Andy Keller --- client/types/callbacks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/types/callbacks.go b/client/types/callbacks.go index f7a2a39e..76d38018 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -40,7 +40,7 @@ type MessageData struct { type Callbacks interface { // OnConnect is called when the connection is successfully established to the Server. - // May be called after Start() is called and every time connected to the Server. + // May be called after Start() is called and every time a connection is established to the Server. OnConnect() // OnConnectFailed is called when the connection to the Server cannot be established. From 3d2f632347c4c2ed1283127e8987c076c172ec7e Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 23 Jul 2022 01:16:22 +0530 Subject: [PATCH 3/3] review suggestions --- client/httpclient.go | 9 ++++++++- client/types/callbacks.go | 2 ++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/client/httpclient.go b/client/httpclient.go index 9b8bbda3..429d7957 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -34,7 +34,7 @@ func NewHTTP(logger types.Logger) *httpClient { return w } -// Start starts the client and runs until Stop is called. +// Start implements OpAMPClient.Start. func (c *httpClient) Start(ctx context.Context, settings types.StartSettings) error { if err := c.common.PrepareStart(ctx, settings); err != nil { return err @@ -58,30 +58,37 @@ func (c *httpClient) Start(ctx context.Context, settings types.StartSettings) er return nil } +// Stop implements OpAMPClient.Stop. func (c *httpClient) Stop(ctx context.Context) error { return c.common.Stop(ctx) } +// AgentDescription implements OpAMPClient.AgentDescription. func (c *httpClient) AgentDescription() *protobufs.AgentDescription { return c.common.AgentDescription() } +// SetAgentDescription implements OpAMPClient.SetAgentDescription. func (c *httpClient) SetAgentDescription(descr *protobufs.AgentDescription) error { return c.common.SetAgentDescription(descr) } +// SetHealth implements OpAMPClient.SetHealth. func (c *httpClient) SetHealth(health *protobufs.AgentHealth) error { return c.common.SetHealth(health) } +// UpdateEffectiveConfig implements OpAMPClient.UpdateEffectiveConfig. func (c *httpClient) UpdateEffectiveConfig(ctx context.Context) error { return c.common.UpdateEffectiveConfig(ctx) } +// SetRemoteConfigStatus implements OpAMPClient.SetRemoteConfigStatus. func (c *httpClient) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error { return c.common.SetRemoteConfigStatus(status) } +// SetPackageStatuses implements OpAMPClient.SetPackageStatuses. func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error { return c.common.SetPackageStatuses(statuses) } diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 76d38018..57799f9f 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -41,6 +41,8 @@ type MessageData struct { type Callbacks interface { // OnConnect is called when the connection is successfully established to the Server. // May be called after Start() is called and every time a connection is established to the Server. + // For WebSocket clients this is called after the handshake is completed without any error. + // For HTTP clients this is called for any request if the response status is OK. OnConnect() // OnConnectFailed is called when the connection to the Server cannot be established.