diff --git a/x/report/report.go b/x/report/report.go new file mode 100644 index 00000000..b785f4f2 --- /dev/null +++ b/x/report/report.go @@ -0,0 +1,251 @@ +// Package report provides functionality for collecting and sending reports. +// +// It defines the behavior of a report collector and provides implementations of the Collector interface. +// It also defines a report type and a HasSuccess interface that is implemented by the report type. +// The report type is used to represent a connectivity test report. +// The HasSuccess interface is used to determine the success status of a report. This will be used to control SamplingCollector behavior. +// The report package also defines a BadRequestError type that is used to represent an error that occurs when a sending the report to remote collector fails. +package report + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "math/rand" + "net/http" + "net/url" + "time" +) + +// BadRequestError represents an error that occurs when a request fails. +type BadRequestError struct { + Err error +} + +// Error returns the error message associated with the BadRequestError. +func (e BadRequestError) Error() string { + return e.Err.Error() +} + +// Unwrap returns the underlying error wrapped by the BadRequestError. +func (e BadRequestError) Unwrap() error { + return e.Err +} + +// ConnectivityReport represents a report containing connectivity information. +type ConnectivityReport struct { + // Connection setup + Connection interface{} `json:"connection"` + // Observations + Time time.Time `json:"time"` + DurationMs int64 `json:"durationMs"` + // Connectivity error, if any + Error interface{} `json:"error"` +} + +// Report is an alias for any type of report. +type Report any + +// HasSuccess is an interface that represents an object that has a success status. +type HasSuccess interface { + IsSuccess() bool +} + +// ConnectivityReport implements the [HasSuccess] interface. +func (r ConnectivityReport) IsSuccess() bool { + if r.Error == nil { + return true + } else { + return false + } +} + +// Collector is an interface that defines the behavior of a report collector. +// Implementations of this interface should be able to collect a report in a given context. +type Collector interface { + Collect(context.Context, Report) error +} + +// RemoteCollector represents a collector that communicates with a remote endpoint. +type RemoteCollector struct { + httpClient *http.Client + collectorEndpoint *url.URL +} + +// Collect sends the given report to the remote collector. +// It marshals the report into JSON format and sends it using the sendReport method. +// If there is an error encoding the JSON or sending the report, it returns the error. +// Otherwise, it returns nil. +func (c *RemoteCollector) Collect(ctx context.Context, report Report) error { + jsonData, err := json.Marshal(report) + if err != nil { + return err + } + err = c.sendReport(ctx, jsonData) + if err != nil { + return err + } + return nil +} + +// SamplingCollector represents a collector that randomly samples and collects a report. +type SamplingCollector struct { + collector Collector + successFraction float64 + failureFraction float64 +} + +// Collect collects the given report based on the sampling rate defined in the [SamplingCollector]. +// It checks if the report implements the [HasSuccess] interface and determines the sampling rate based on the success status. +// If the randomly generated number is less than the sampling rate, the report is collected using the underlying collector. +// Otherwise, the report is not sent. +// It returns an error if there is an issue collecting the report. +// Sampling rate of 1.0 means report is always sent, and 0.0 means report is never sent. +func (c *SamplingCollector) Collect(ctx context.Context, report Report) error { + var samplingRate float64 + hs, ok := report.(HasSuccess) + if !ok { + return nil + } + if hs.IsSuccess() { + samplingRate = c.successFraction + } else { + samplingRate = c.failureFraction + } + // Generate a random float64 number between 0 and 1 + random := rand.Float64() + if random < samplingRate { + err := c.collector.Collect(ctx, report) + if err != nil { + return err + } + return nil + } else { + return nil + } +} + +// RetryCollector represents a collector that supports retrying failed operations. +type RetryCollector struct { + collector Collector + maxRetry int + waitBetweenRetry time.Duration +} + +// Collect collects the report by making multiple attempts with retries. +// It uses the provided context and report to call the underlying collector's [Collect] method. +// If a [BadRequestError] is encountered during the collection, it breaks the retry loop. +// It sleeps for a specified duration between retries. +// Returns an error if the maximum number of retries is exceeded. +func (c *RetryCollector) Collect(ctx context.Context, report Report) error { + var e *BadRequestError + for i := 0; i < c.maxRetry; i++ { + err := c.collector.Collect(ctx, report) + if err != nil { + if errors.As(err, &e) { + break + } else { + time.Sleep(c.waitBetweenRetry) + } + } else { + return nil + } + } + return errors.New("max retry exceeded") +} + +// MutltiCollector represents a collector that combines multiple collectors. +type MutltiCollector struct { + collectors []Collector +} + +// Collect implements Collector interface on [MutltiCollector] type. +// It collects the report using all the provided collectors in the MultiCollector. +// It returns an error if all collectors fail to collect the report. +func (c *MutltiCollector) Collect(ctx context.Context, report Report) error { + success := false + for i := range c.collectors { + err := c.collectors[i].Collect(ctx, report) + if err != nil { + success = success || false + } else { + success = success || true + } + } + if success { + // At least one collector succeeded + return nil + } + return errors.New("all collectors failed") +} + +// FallbackCollector is a type that represents a collector that falls back to multiple collectors. +type FallbackCollector struct { + collectors []Collector +} + +// Collect implements Collector interface on [FallbackCollector] type that collects a report using the provided context and report data. +// It iterates over a list of collectors and attempts to collect the report using each collector. +// If any of the collectors succeeds in collecting the report, operation aborts, and it returns nil. +// If all collectors fail to collect the report, it returns an error indicating the failure. +func (c *FallbackCollector) Collect(ctx context.Context, report Report) error { + for i := range c.collectors { + err := c.collectors[i].Collect(ctx, report) + if err == nil { + return nil + } + } + return errors.New("all collectors failed") +} + +// sendReport sends a report to the remote collector. +// It takes a context.Context object for cancellation and deadline propagation, +// and a []byte containing the JSON data to be sent. +// It returns an error if there was a problem sending the report or reading the response. +func (c *RemoteCollector) sendReport(ctx context.Context, jsonData []byte) error { + // TODO: return status code of HTTP response + req, err := http.NewRequest("POST", c.collectorEndpoint.String(), bytes.NewReader(jsonData)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json; charset=utf-8") + resp, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + _, err = io.ReadAll(resp.Body) + if err != nil { + return err + } + if 400 <= resp.StatusCode && resp.StatusCode < 500 { + return &BadRequestError{ + Err: fmt.Errorf("http request failed with status code %d", resp.StatusCode), + } + } + return nil +} + +// WriteCollector represents a collector that writes the report to an io.Writer. +type WriteCollector struct { + writer io.Writer +} + +// Collect writes the report to the underlying io.Writer. +// It returns an error if there was a problem writing the report. +func (c *WriteCollector) Collect(ctx context.Context, report Report) error { + jsonData, err := json.Marshal(report) + if err != nil { + return err + } + _, err = c.writer.Write(jsonData) + if err != nil { + return err + } + fmt.Println("Report written") + return nil +} diff --git a/x/reporter/reporter_test.go b/x/report/report_test.go similarity index 69% rename from x/reporter/reporter_test.go rename to x/report/report_test.go index ef3559b6..359062c2 100644 --- a/x/reporter/reporter_test.go +++ b/x/report/report_test.go @@ -12,16 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package reporter +package report import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" "net/http/httptest" "net/url" + "os" "testing" "time" ) @@ -89,6 +91,7 @@ func TestSendReportSuccessfully(t *testing.T) { } c := RemoteCollector{ collectorEndpoint: u, + httpClient: &http.Client{Timeout: 10 * time.Second}, } err = c.Collect(context.Background(), r) if err != nil { @@ -102,6 +105,7 @@ func TestSendReportUnsuccessfully(t *testing.T) { Time: time.Now().UTC().Truncate(time.Second), DurationMs: 1, } + var e *BadRequestError var r Report = testReport v, ok := r.(HasSuccess) if ok { @@ -113,15 +117,16 @@ func TestSendReportUnsuccessfully(t *testing.T) { } c := RemoteCollector{ collectorEndpoint: u, + httpClient: &http.Client{Timeout: 10 * time.Second}, } err = c.Collect(context.Background(), r) if err == nil { t.Errorf("Expected 405 error no error occurred!") } else { - if err, ok := err.(BadRequestErr); ok { - if err.StatusCode != 405 { - t.Errorf("Expected 405 error no error occurred!") - } + if errors.As(err, &e) { + fmt.Printf("Error was expected: %v\n", err) + } else { + t.Errorf("Expected 405 error, but got: %v", err) } } } @@ -144,6 +149,7 @@ func TestSamplingCollector(t *testing.T) { c := SamplingCollector{ collector: &RemoteCollector{ collectorEndpoint: u, + httpClient: &http.Client{Timeout: 10 * time.Second}, }, successFraction: 0.5, failureFraction: 0.1, @@ -186,6 +192,7 @@ func TestSendJSONToServer(t *testing.T) { var r Report = testReport c := RemoteCollector{ collectorEndpoint: u, + httpClient: &http.Client{Timeout: 10 * time.Second}, } err = c.Collect(context.Background(), r) if err != nil { @@ -216,9 +223,11 @@ func TestFallbackCollector(t *testing.T) { collectors: []Collector{ &RemoteCollector{ collectorEndpoint: u1, + httpClient: &http.Client{Timeout: 10 * time.Second}, }, &RemoteCollector{ collectorEndpoint: u2, + httpClient: &http.Client{Timeout: 10 * time.Second}, }, }, } @@ -227,3 +236,80 @@ func TestFallbackCollector(t *testing.T) { t.Errorf("Expected no error, but got: %v", err) } } + +func TestRetryCollector(t *testing.T) { + var testReport = ConnectivityReport{ + Connection: nil, + Time: time.Now().UTC().Truncate(time.Second), + DurationMs: 1, + } + var r Report = testReport + v, ok := r.(HasSuccess) + if ok { + fmt.Printf("The test report shows success: %v\n", v.IsSuccess()) + } + u, err := url.Parse("https://google.com") + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + c := RetryCollector{ + collector: &RemoteCollector{ + collectorEndpoint: u, + httpClient: &http.Client{Timeout: 10 * time.Second}, + }, + maxRetry: 3, + waitBetweenRetry: 1 * time.Second, + } + err = c.Collect(context.Background(), r) + if err == nil { + t.Errorf("max retry error expcted not got none!") + } else { + fmt.Printf("Error was expected: %v\n", err) + } +} + +func TestWriteCollector(t *testing.T) { + var testReport = ConnectivityReport{ + Connection: nil, + Time: time.Now().UTC().Truncate(time.Second), + DurationMs: 1, + } + var r Report = testReport + v, ok := r.(HasSuccess) + if ok { + fmt.Printf("The test report shows success: %v\n", v.IsSuccess()) + } + c := WriteCollector{ + writer: io.Discard, + } + err := c.Collect(context.Background(), r) + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } +} + +// TestWriteCollectorToFile that opens a file and collects to a temp file +func TestWriteCollectorToFile(t *testing.T) { + var testReport = ConnectivityReport{ + Connection: nil, + Time: time.Now().UTC().Truncate(time.Second), + DurationMs: 1, + } + var r Report = testReport + v, ok := r.(HasSuccess) + if ok { + fmt.Printf("The test report shows success: %v\n", v.IsSuccess()) + } + f, err := os.CreateTemp("", "test") + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } + defer os.Remove(f.Name()) // clean up + c := WriteCollector{ + writer: f, + } + err = c.Collect(context.Background(), r) + if err != nil { + t.Errorf("Expected no error, but got: %v", err) + } +} diff --git a/x/reporter/reporter.go b/x/reporter/reporter.go deleted file mode 100644 index e46a3ca5..00000000 --- a/x/reporter/reporter.go +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2023 Jigsaw Operations LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package reporter - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "log" - "math/rand" - "net/http" - "net/url" - "time" -) - -var debugLog log.Logger = *log.New(io.Discard, "", 0) -var httpClient = &http.Client{} - -type BadRequestErr struct { - StatusCode int - Message string -} - -func (e BadRequestErr) Error() string { - return e.Message -} - -type ConnectivityReport struct { - // Connection setup - Connection interface{} `json:"connection"` - // Observations - Time time.Time `json:"time"` - DurationMs int64 `json:"durationMs"` - Error interface{} `json:"error"` -} - -type Report any - -type HasSuccess interface { - IsSuccess() bool -} - -// ConnectivityReport implements the HasSuccess interface -func (r ConnectivityReport) IsSuccess() bool { - if r.Error == nil { - return true - } else { - return false - } -} - -type Collector interface { - Collect(context.Context, Report) error -} - -type RemoteCollector struct { - collectorEndpoint *url.URL -} - -func (c *RemoteCollector) Collect(ctx context.Context, report Report) error { - jsonData, err := json.Marshal(report) - if err != nil { - log.Printf("Error encoding JSON: %s\n", err) - return err - } - err = sendReport(ctx, jsonData, c.collectorEndpoint) - if err != nil { - log.Printf("Send report failed: %v", err) - return err - } - fmt.Println("Report sent") - return nil -} - -type SamplingCollector struct { - collector Collector - successFraction float64 - failureFraction float64 -} - -func (c *SamplingCollector) Collect(ctx context.Context, report Report) error { - var samplingRate float64 - hs, ok := report.(HasSuccess) - if !ok { - log.Printf("Report does not implement HasSuccess interface") - return nil - } - if hs.IsSuccess() { - samplingRate = c.successFraction - } else { - samplingRate = c.failureFraction - } - // Generate a random float64 number between 0 and 1 - random := rand.Float64() - if random < samplingRate { - err := c.collector.Collect(ctx, report) - if err != nil { - log.Printf("Error collecting report: %v", err) - return err - } - return nil - } else { - fmt.Println("Report was not sent this time") - return nil - } -} - -type RetryCollector struct { - collector Collector - maxRetry int - waitBetweenRetry time.Duration -} - -func (c *RetryCollector) Collect(ctx context.Context, report Report) error { - for i := 0; i < c.maxRetry; i++ { - err := c.collector.Collect(ctx, report) - if err != nil { - if _, ok := err.(BadRequestErr); ok { - break - } else { - time.Sleep(c.waitBetweenRetry) - } - } else { - fmt.Println("Report sent") - return nil - } - } - return errors.New("max retry exceeded") -} - -type MutltiCollector struct { - collectors []Collector -} - -// Collects reports through multiple collectors -func (c *MutltiCollector) Collect(ctx context.Context, report Report) error { - success := false - for i := range c.collectors { - err := c.collectors[i].Collect(ctx, report) - if err != nil { - log.Printf("Error collecting report: %v", err) - success = success || false - } else { - success = success || true - } - } - if success { - // At least one collector succeeded - return nil - } - return errors.New("all collectors failed") -} - -type FallbackCollector struct { - collectors []Collector -} - -// Collects reports through multiple collectors -func (c *FallbackCollector) Collect(ctx context.Context, report Report) error { - for i := range c.collectors { - err := c.collectors[i].Collect(ctx, report) - if err == nil { - debugLog.Println("Report sent!") - return nil - } - } - return errors.New("all collectors failed") -} - -func sendReport(ctx context.Context, jsonData []byte, remote *url.URL) error { - // TODO: return status code of HTTP response - req, err := http.NewRequest("POST", remote.String(), bytes.NewReader(jsonData)) - if err != nil { - debugLog.Printf("Error creating the HTTP request: %s\n", err) - return err - } - - req.Header.Set("Content-Type", "application/json; charset=utf-8") - resp, err := httpClient.Do(req.WithContext(ctx)) - if err != nil { - log.Printf("Error sending the HTTP request: %s\n", err) - return err - } - defer resp.Body.Close() - // Access the HTTP response status code - fmt.Printf("HTTP Response Status Code: %d\n", resp.StatusCode) - respBody, err := io.ReadAll(resp.Body) - if err != nil { - debugLog.Printf("Error reading the HTTP response body: %s\n", err) - return err - } - if resp.StatusCode >= 400 { - debugLog.Printf("Error sending the report: %s\n", respBody) - return BadRequestErr{ - StatusCode: resp.StatusCode, - Message: fmt.Sprintf("HTTP request failed with status code %d", resp.StatusCode), - } - } - debugLog.Printf("Response: %s\n", respBody) - return nil -}