Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update client package docs #107

Merged
merged 3 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type httpClient struct {
sender *internal.HTTPSender
}

// NewHTTP creates a new OpAMP Client that uses HTTP transport.
func NewHTTP(logger types.Logger) *httpClient {
if logger == nil {
logger = &sharedinternal.NopLogger{}
Expand All @@ -33,6 +34,7 @@ func NewHTTP(logger types.Logger) *httpClient {
return w
}

// Start implements OpAMPClient.Start.
func (c *httpClient) Start(ctx context.Context, settings types.StartSettings) error {
if err := c.common.PrepareStart(ctx, settings); err != nil {
return err
Expand All @@ -56,30 +58,37 @@ func (c *httpClient) Start(ctx context.Context, settings types.StartSettings) er
return nil
}

// Stop implements OpAMPClient.Stop.
func (c *httpClient) Stop(ctx context.Context) error {
return c.common.Stop(ctx)
}

// AgentDescription implements OpAMPClient.AgentDescription.
func (c *httpClient) AgentDescription() *protobufs.AgentDescription {
return c.common.AgentDescription()
}

// SetAgentDescription implements OpAMPClient.SetAgentDescription.
func (c *httpClient) SetAgentDescription(descr *protobufs.AgentDescription) error {
return c.common.SetAgentDescription(descr)
}

// SetHealth implements OpAMPClient.SetHealth.
func (c *httpClient) SetHealth(health *protobufs.AgentHealth) error {
return c.common.SetHealth(health)
}

// UpdateEffectiveConfig implements OpAMPClient.UpdateEffectiveConfig.
func (c *httpClient) UpdateEffectiveConfig(ctx context.Context) error {
return c.common.UpdateEffectiveConfig(ctx)
}

// SetRemoteConfigStatus implements OpAMPClient.SetRemoteConfigStatus.
func (c *httpClient) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error {
return c.common.SetRemoteConfigStatus(status)
}

// SetPackageStatuses implements OpAMPClient.SetPackageStatuses.
func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error {
return c.common.SetPackageStatuses(statuses)
}
Expand Down
15 changes: 15 additions & 0 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ type ClientCommon struct {
stoppedSignal chan struct{}
}

// NewClientCommon creates a new ClientCommon.
func NewClientCommon(logger types.Logger, sender Sender) ClientCommon {
return ClientCommon{
Logger: logger, sender: sender, stoppedSignal: make(chan struct{}, 1),
}
}

// PrepareStart prepares the client state for the next Start() call.
// It returns an error if the client is already started, or if the settings are invalid.
func (c *ClientCommon) PrepareStart(
_ context.Context, settings types.StartSettings,
) error {
Expand Down Expand Up @@ -112,6 +115,7 @@ func (c *ClientCommon) PrepareStart(
return nil
}

// Stop stops the client. It returns an error if the client is not started.
func (c *ClientCommon) Stop(ctx context.Context) error {
if !c.isStarted {
return errCannotStopNotStarted
Expand All @@ -133,12 +137,15 @@ func (c *ClientCommon) Stop(ctx context.Context) error {
return nil
}

// IsStopping returns true if Stop() was called.
func (c *ClientCommon) IsStopping() bool {
c.isStoppingMutex.RLock()
defer c.isStoppingMutex.RUnlock()
return c.isStoppingFlag
}

// StartConnectAndRun initiates the connection with the Server and starts the
// background goroutine that handles the communication unitl client is stopped.
func (c *ClientCommon) StartConnectAndRun(runner func(ctx context.Context)) {
// Create a cancellable context.
runCtx, runCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -256,6 +263,10 @@ func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error {
return nil
}

// SetRemoteConfigStatus sends a status update to the Server if the new RemoteConfigStatus
// is different from the status we already have in the state.
// It also remembers the new RemoteConfigStatus in the client state so that it can be
// sent to the Server when the Server asks for it.
func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error {
if status.LastRemoteConfigHash == nil {
return errLastRemoteConfigHashNil
Expand Down Expand Up @@ -283,6 +294,10 @@ func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatu
return nil
}

// SetPackageStatuses sends a status update to the Server if the new PackageStatuses
// are different from the ones we already have in the state.
// It also remembers the new PackageStatuses in the client state so that it can be
// sent to the Server when the Server asks for it.
func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error {
if statuses.ServerProvidedAllPackagesHash == nil {
return errServerProvidedAllPackagesHashNil
Expand Down
5 changes: 3 additions & 2 deletions client/internal/clientstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ var (
)

// ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to
// have access to synchronize to the Server. 3 messages can be stored in this store:
// AgentDescription, RemoteConfigStatus and PackageStatuses.
// have access to synchronize to the Server. 4 messages can be stored in this store:
// AgentDescription, AgentHealth, RemoteConfigStatus and PackageStatuses.
//
// See OpAMP spec for more details on how state synchronization works:
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#Agent-to-Server-state-synchronization
Expand Down Expand Up @@ -82,6 +82,7 @@ func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescriptio
return nil
}

// SetHealth sets the AgentHealth in the state.
func (s *ClientSyncedState) SetHealth(health *protobufs.AgentHealth) error {
if health == nil {
return ErrAgentHealthMissing
Expand Down
5 changes: 5 additions & 0 deletions client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type HTTPSender struct {
receiveProcessor receivedProcessor
}

// NewHTTPSender creates a new Sender that uses HTTP to send messages
// with default settings.
func NewHTTPSender(logger types.Logger) *HTTPSender {
h := &HTTPSender{
SenderCommon: NewSenderCommon(),
Expand Down Expand Up @@ -102,6 +104,9 @@ func (h *HTTPSender) SetRequestHeader(header http.Header) {
h.requestHeader.Set(headerContentType, contentTypeProtobuf)
}

// makeOneRequestRoundtrip sends a request and receives a response.
// It will retry the request if the server responds with too many
// requests or unavailable status.
func (h *HTTPSender) makeOneRequestRoundtrip(ctx context.Context) {
resp, err := h.sendRequestWithRetries(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions client/internal/nextmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type NextMessage struct {
messageMutex sync.Mutex
}

// NewNextMessage returns a new empty NextMessage.
func NewNextMessage() NextMessage {
return NextMessage{
nextMessage: &protobufs.AgentToServer{},
Expand Down
15 changes: 15 additions & 0 deletions client/internal/packagessyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type packagesSyncer struct {
doneCh chan struct{}
}

// NewPackagesSyncer creates a new packages syncer.
func NewPackagesSyncer(
logger types.Logger,
available *protobufs.PackagesAvailable,
Expand All @@ -40,6 +41,7 @@ func NewPackagesSyncer(
}
}

// Sync performs the package syncing process.
func (s *packagesSyncer) Sync(ctx context.Context) error {

defer func() {
Expand Down Expand Up @@ -95,6 +97,7 @@ func (s *packagesSyncer) initStatuses() error {
return nil
}

// doSync performs the actual syncing process.
func (s *packagesSyncer) doSync(ctx context.Context) {
hash, err := s.localState.AllPackagesHash()
if err != nil {
Expand Down Expand Up @@ -136,6 +139,7 @@ func (s *packagesSyncer) doSync(ctx context.Context) {
_ = s.reportStatuses(true)
}

// syncPackage downloads the package from the server and installs it.
func (s *packagesSyncer) syncPackage(
ctx context.Context,
pkgName string,
Expand Down Expand Up @@ -214,6 +218,9 @@ func (s *packagesSyncer) syncPackage(
return err
}

// syncPackageFile downloads the package file from the server.
// If the file already exists and contents are
// unchanged, it is not downloaded again.
func (s *packagesSyncer) syncPackageFile(
ctx context.Context, pkgName string, file *protobufs.DownloadableFile,
) error {
Expand All @@ -225,6 +232,7 @@ func (s *packagesSyncer) syncPackageFile(
return err
}

// shouldDownloadFile returns true if the file should be downloaded.
func (s *packagesSyncer) shouldDownloadFile(
packageName string,
file *protobufs.DownloadableFile,
Expand All @@ -246,6 +254,7 @@ func (s *packagesSyncer) shouldDownloadFile(
return false, nil
}

// downloadFile downloads the file from the server.
func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file *protobufs.DownloadableFile) error {
s.logger.Debugf("Downloading package %s file from %s", pkgName, file.DownloadUrl)

Expand Down Expand Up @@ -274,6 +283,9 @@ func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file
return nil
}

// deleteUnneededLocalPackages deletes local packages that are not
// needed anymore. This is done by comparing the local package state
// with the server's package state.
func (s *packagesSyncer) deleteUnneededLocalPackages() error {
// Read the list of packages we have locally.
localPackages, err := s.localState.Packages()
Expand Down Expand Up @@ -303,6 +315,9 @@ func (s *packagesSyncer) deleteUnneededLocalPackages() error {
return lastErr
}

// reportStatuses saves the last reported statuses to provider and client state.
// If sendImmediately is true, the statuses are scheduled to be
// sent to the server.
func (s *packagesSyncer) reportStatuses(sendImmediately bool) error {
// Save it in the user-supplied state provider.
if err := s.localState.SetLastReportedStatuses(s.statuses); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion client/internal/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Sender interface {
SetInstanceUid(instanceUid string) error
}

// SenderCommon is partial Sender implementation that is common WebSocket and plain
// SenderCommon is partial Sender implementation that is common between WebSocket and plain
// HTTP transports. This struct is intended to be embedded in the WebSocket and
// HTTP Sender implementations.
type SenderCommon struct {
Expand All @@ -34,6 +34,8 @@ type SenderCommon struct {
nextMessage NextMessage
}

// NewSenderCommon creates a new SenderCommon. This is intended to be used by
// the WebSocket and HTTP Sender implementations.
func NewSenderCommon() SenderCommon {
return SenderCommon{
hasPendingMessage: make(chan struct{}, 1),
Expand Down
3 changes: 3 additions & 0 deletions client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type wsReceiver struct {
processor receivedProcessor
}

// NewWSReceiver creates a new Receiver that uses WebSocket to receive
// messages from the server.
func NewWSReceiver(
logger types.Logger,
callbacks types.Callbacks,
Expand All @@ -39,6 +41,7 @@ func NewWSReceiver(
return w
}

// ReceiverLoop runs the receiver loop. To stop the receiver cancel the context.
func (r *wsReceiver) ReceiverLoop(ctx context.Context) {
runContext, cancelFunc := context.WithCancel(ctx)

Expand Down
2 changes: 2 additions & 0 deletions client/internal/wssender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type WSSender struct {
stopped chan struct{}
}

// NewSender creates a new Sender that uses WebSocket to send
// messages to the server.
func NewSender(logger types.Logger) *WSSender {
return &WSSender{
logger: logger,
Expand Down
6 changes: 5 additions & 1 deletion client/types/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type MessageData struct {
}

type Callbacks interface {
// OnConnect is called when the connection is successfully established to the Server.
// May be called after Start() is called and every time a connection is established to the Server.
// For WebSocket clients this is called after the handshake is completed without any error.
// For HTTP clients this is called for any request if the response status is OK.
OnConnect()

// OnConnectFailed is called when the connection to the Server cannot be established.
Expand All @@ -63,7 +67,7 @@ type Callbacks interface {
OnMessage(ctx context.Context, msg *MessageData)

// OnOpampConnectionSettings is called when the Agent receives an OpAMP
// connection settings offer from the Server. Typically the settings can specify
// connection settings offer from the Server. Typically, the settings can specify
// authorization headers or TLS certificate, potentially also a different
// OpAMP destination to work with.
//
Expand Down
1 change: 1 addition & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type wsClient struct {
sender *internal.WSSender
}

// NewWebSocket creates a new OpAMP Client that uses WebSocket transport.
func NewWebSocket(logger types.Logger) *wsClient {
if logger == nil {
logger = &sharedinternal.NopLogger{}
Expand Down