Skip to content

Commit

Permalink
Test context cancelation and increase coverage (#89)
Browse files Browse the repository at this point in the history
* Add test coverage for uploadWorker and resetMetrics.

* Typo.

* Remove extra comment.

* Update comment.

* Update comment.

* Remove extra wg.Done()
  • Loading branch information
robertodauria authored Nov 29, 2021
1 parent 1941a0d commit db14a5d
Showing 1 changed file with 146 additions and 4 deletions.
150 changes: 146 additions & 4 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ import (
"text/template"
"time"

"github.com/m-lab/stats-pipeline/formatter"

"github.com/m-lab/stats-pipeline/output"

"cloud.google.com/go/bigquery"
"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
"github.com/m-lab/go/cloudtest/bqfake"
"github.com/m-lab/go/cloudtest/gcsfake"
"github.com/m-lab/go/prometheusx/promtest"
"github.com/m-lab/go/testingx"
"github.com/m-lab/go/uploader"
"github.com/m-lab/stats-pipeline/formatter"
"github.com/m-lab/stats-pipeline/output"
dto "github.com/prometheus/client_model/go"
"google.golang.org/api/iterator"
)

Expand Down Expand Up @@ -144,6 +143,21 @@ func (it *mockRowIterator) Reset() {
it.index = 0
}

type mockWriter struct {
mu *sync.Mutex
path string
content []byte
}

// Write updates the mockWriter fields in a thread-safe way.
func (writer *mockWriter) Write(ctx context.Context, path string, content []byte) error {
writer.mu.Lock()
writer.path = path
writer.content = content
writer.mu.Unlock()
return nil
}

func TestNew(t *testing.T) {
bq, err := bqfake.NewClient(context.Background(), "test")
testingx.Must(t, err, "cannot init bq client")
Expand Down Expand Up @@ -368,3 +382,131 @@ func TestJSONExporter_processQueryResults(t *testing.T) {
t.Errorf("wrong value for the year field: %v", string(row["year"]))
}
}

func Test_resetMetrics(t *testing.T) {
const table = "test"

// Set all the metrics to 1.
queryProcessedMetric.WithLabelValues(table).Set(1)
uploadedBytesMetric.WithLabelValues(table).Set(1)
bytesProcessedMetric.WithLabelValues(table).Set(1)
cacheHitMetric.WithLabelValues(table).Set(1)

// Reset the metrics.
resetMetrics(table)

// Write the metrics to a map and check that they are all 0.
metrics := map[string]*dto.Metric{}
metrics["query_processed"] = &dto.Metric{}
metrics["uploaded_bytes"] = &dto.Metric{}
metrics["bytes_processed"] = &dto.Metric{}
metrics["cache_hit"] = &dto.Metric{}
queryProcessedMetric.WithLabelValues(table).Write(metrics["query_processed"])
uploadedBytesMetric.WithLabelValues(table).Write(metrics["uploaded_bytes"])
bytesProcessedMetric.WithLabelValues(table).Write(metrics["bytes_processed"])
cacheHitMetric.WithLabelValues(table).Write(metrics["cache_hit"])
for _, v := range metrics {
if v.Gauge.GetValue() != 0 {
t.Errorf("metric is not 0: %v", v)
}
}
}

func TestJSONExporter_uploadWorker(t *testing.T) {
// Create an exporter writing to memory through a mocked writer.
writer := &mockWriter{mu: &sync.Mutex{}}
exporter := &JSONExporter{
output: writer,
uploadJobs: make(chan *UploadJob),
results: make(chan UploadResult),
}

wg := sync.WaitGroup{}
wg.Add(1)
go exporter.uploadWorker(context.Background(), &wg)

// Send a fake job to the uploadJobs channel.
exporter.uploadJobs <- &UploadJob{
table: "testtable",
objName: "testfile.json",
content: []byte("test"),
}

// Read result from the results channel and check its content.
result := <-exporter.results
if result.err != nil {
t.Errorf("uploadWorker returned an error: %v", result.err)
}
if result.objName != "testfile.json" {
t.Errorf("wrong object name: %v", result.objName)
}

// Close the channel to signal that there are no more upload jobs.
close(exporter.uploadJobs)

// Wait for the worker to finish.
wg.Wait()

// Check content and path written by the uploadWorker.
if string(writer.content) != "test" {
t.Errorf("Wrong file content: %v", string(writer.content))
}
if writer.path != "testfile.json" {
t.Errorf("Wrong file path: %v", writer.path)
}
}

func TestJSONExporter_uploadWorkerCancellation(t *testing.T) {
// Test termination of the worker when the context is canceled.
// We expect both the query worker and the upload worker to terminate
// on context cancellation. The fact that this test does not time out
// means that the workers terminated properly.
ctx, cancel := context.WithCancel(context.Background())
writer := &mockWriter{mu: &sync.Mutex{}}
exporter := &JSONExporter{
output: writer,
uploadJobs: make(chan *UploadJob),
results: make(chan UploadResult),
}

wg := sync.WaitGroup{}

// Run two uploadWorker goroutines (at least) to simulate actual usage.
wg.Add(2)
go exporter.uploadWorker(ctx, &wg)
go exporter.uploadWorker(ctx, &wg)

// Continuously send the same job to the uploadJobs channel via a fake
// "query worker".
go func() {
for {
select {
case <-ctx.Done():
// Close the channel to signal that there are no more upload jobs and
// decrease the wait group counter.
close(exporter.uploadJobs)
return
case exporter.uploadJobs <- &UploadJob{
table: "testtable",
objName: "testfile.json",
content: []byte("test"),
}:
}
}
}()

// Drain the results channel.
go func() {
for range exporter.results {
// NOTHING
}
}()

// Cancel the context.
cancel()

// Wait for the worker and the job-sending goroutine to finish.
wg.Wait()

close(exporter.results)
}

0 comments on commit db14a5d

Please sign in to comment.