Skip to content

Commit

Permalink
update APM query interface to support time range
Browse files Browse the repository at this point in the history
  • Loading branch information
lgfa29 committed Sep 15, 2020
1 parent cec7ca3 commit 298b5c0
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 23 deletions.
22 changes: 15 additions & 7 deletions plugins/apm/apm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@ import (

plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad-autoscaler/plugins/base"
"github.com/hashicorp/nomad-autoscaler/sdk"
)

type APM interface {
Query(q string) (float64, error)
Query(q string, r sdk.TimeRange) (sdk.TimestampedMetrics, error)
PluginInfo() (*base.PluginInfo, error)
SetConfig(config map[string]string) error
}

// QueryRPCReq is the argument passed to the "Plugin.Query" RPC call. It bundles
// the query string and the time range so both travel as a single RPC argument.
type QueryRPCReq struct {
// Query is the query string forwarded to the APM implementation.
Query string
// Range is the time range forwarded alongside Query.
Range sdk.TimeRange
}

// RPC is a plugin implementation that talks over net/rpc
type RPC struct {
client *rpc.Client
Expand All @@ -27,11 +33,13 @@ func (r *RPC) SetConfig(config map[string]string) error {
return resp
}

func (r *RPC) Query(q string) (float64, error) {
var resp float64
err := r.client.Call("Plugin.Query", q, &resp)
func (r *RPC) Query(q string, rng sdk.TimeRange) (sdk.TimestampedMetrics, error) {
req := QueryRPCReq{Query: q, Range: rng}
var resp sdk.TimestampedMetrics

err := r.client.Call("Plugin.Query", req, &resp)
if err != nil {
return 0, err
return nil, err
}
return resp, nil
}
Expand All @@ -56,8 +64,8 @@ func (s *RPCServer) SetConfig(config map[string]string, resp *error) error {
return err
}

func (s *RPCServer) Query(q string, resp *float64) error {
r, err := s.Impl.Query(q)
func (s *RPCServer) Query(req QueryRPCReq, resp *sdk.TimestampedMetrics) error {
r, err := s.Impl.Query(req.Query, req.Range)
if err != nil {
return err
}
Expand Down
14 changes: 11 additions & 3 deletions policy/nomad/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func parseChecks(cs interface{}) []*sdk.ScalingPolicyCheck {
// | check "name" { |
// | source = "source" |
// | query = "query" |
// | query_window = "5m" |
// | strategy "strategy" { ... } |
// | } |
// +--------------------------------+
Expand Down Expand Up @@ -136,10 +137,17 @@ func parseCheck(c interface{}) *sdk.ScalingPolicyCheck {
query, _ := checkMap[keyQuery].(string)
source, _ := checkMap[keySource].(string)

// Parse query_window ignoring errors since we assume policy has been validated.
var queryWindow time.Duration
if queryWindowStr, ok := checkMap[keyQueryWindow].(string); ok {
queryWindow, _ = time.ParseDuration(queryWindowStr)
}

return &sdk.ScalingPolicyCheck{
Query: query,
Source: source,
Strategy: strategy,
Query: query,
QueryWindow: queryWindow,
Source: source,
Strategy: strategy,
}
}

Expand Down
1 change: 1 addition & 0 deletions policy/nomad/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
keySource = "source"
keyQuery = "query"
keyQueryWindow = "query_window"
keyEvaluationInterval = "evaluation_interval"
keyTarget = "target"
keyChecks = "check"
Expand Down
22 changes: 18 additions & 4 deletions policy/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"time"

metrics "github.com/armon/go-metrics"
Expand Down Expand Up @@ -255,15 +256,23 @@ func (h *checkHandler) start(ctx context.Context) {
}

// Query check's APM.
h.checkEval.Metric, err = h.runAPMQuery(apmInst)
h.checkEval.Metrics, err = h.runAPMQuery(apmInst)
if err != nil {
result.err = fmt.Errorf("failed to query source: %v", err)
h.resultCh <- result
return
}

// Make sure metrics are sorted consistently.
sort.Sort(h.checkEval.Metrics)

if len(h.checkEval.Metrics) == 0 {
h.logger.Info("no metrics available")
return
}

// Calculate new count using check's Strategy.
h.logger.Info("calculating new count", "count", currentStatus.Count, "metric", h.checkEval.Metric)
h.logger.Info("calculating new count", "count", currentStatus.Count)
runResp, err := h.runStrategyRun(strategyInst, currentStatus.Count)
if err != nil {
result.err = fmt.Errorf("failed to execute strategy: %v", err)
Expand Down Expand Up @@ -386,15 +395,20 @@ func (h *checkHandler) runTargetScale(targetImpl target.Target, action sdk.Scali
}

// runAPMQuery wraps the apm.Query call to provide operational functionality.
func (h *checkHandler) runAPMQuery(apmImpl apm.APM) (float64, error) {
func (h *checkHandler) runAPMQuery(apmImpl apm.APM) (sdk.TimestampedMetrics, error) {

h.logger.Info("querying source", "query", h.checkEval.Check.Query, "source", h.checkEval.Check.Source)

// Trigger a metric measure to track latency of the call.
labels := []metrics.Label{{Name: "plugin_name", Value: h.checkEval.Check.Source}, {Name: "policy_id", Value: h.policy.ID}}
defer metrics.MeasureSinceWithLabels([]string{"plugin", "apm", "query", "invoke_ms"}, time.Now(), labels)

return apmImpl.Query(h.checkEval.Check.Query)
// Calculate query range from the query window defined in the check.
to := time.Now()
from := to.Add(-h.checkEval.Check.QueryWindow)
r := sdk.TimeRange{From: from, To: to}

return apmImpl.Query(h.checkEval.Check.Query, r)
}

// runStrategyRun wraps the strategy.Run call to provide operational functionality.
Expand Down
29 changes: 29 additions & 0 deletions sdk/apm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package sdk

import "time"

// TimestampedMetric pairs one metric Value with the Timestamp it is associated
// with.
type TimestampedMetric struct {
	Timestamp time.Time
	Value     float64
}

// TimestampedMetrics is a slice of TimestampedMetric values. It implements
// sort.Interface so that the entries can be ordered by ascending Timestamp.
type TimestampedMetrics []TimestampedMetric

// Len returns the number of entries; it is part of sort.Interface.
func (t TimestampedMetrics) Len() int {
	return len(t)
}

// Less reports whether entry i has a Timestamp strictly before that of entry
// j; it is part of sort.Interface.
func (t TimestampedMetrics) Less(i, j int) bool {
	older, newer := t[i].Timestamp, t[j].Timestamp
	return newer.After(older)
}

// Swap exchanges entries i and j in place; it is part of sort.Interface.
func (t TimestampedMetrics) Swap(i, j int) {
	tmp := t[i]
	t[i] = t[j]
	t[j] = tmp
}

// TimeRange describes a span of time delimited by two instants: From marks
// where the range starts and To marks where it ends.
type TimeRange struct {
	From, To time.Time
}
2 changes: 1 addition & 1 deletion sdk/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ScalingCheckEvaluation struct {
Check *ScalingPolicyCheck

// Metrics is the set of timestamped metrics resulting from querying the APM.
Metric float64
Metrics TimestampedMetrics

// Action is the calculated desired state and is populated by strategy.Run.
Action *ScalingAction
Expand Down
51 changes: 43 additions & 8 deletions sdk/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,22 @@ type ScalingPolicyCheck struct {

// Name is a human readable name for this check and allows operators to
// create clearly identified policy checks.
Name string `hcl:"name,label"`
Name string

// Source is the APM plugin that should be used to perform the query and
// obtain the metric that will be used to perform a calculation.
Source string `hcl:"source,optional"`
Source string

// Query is run against the Source in order to receive a metric response.
Query string `hcl:"query"`
Query string

// QueryWindow is used to define how far back in time to query for
// metrics.
QueryWindow time.Duration

// Strategy is the ScalingPolicyStrategy to use when performing the
// ScalingPolicyCheck evaluation.
Strategy *ScalingPolicyStrategy `hcl:"strategy,block"`
Strategy *ScalingPolicyStrategy
}

// ScalingPolicyStrategy contains the plugin and configuration details for
Expand Down Expand Up @@ -119,9 +123,18 @@ type FileDecodePolicyDoc struct {
Cooldown time.Duration
CooldownHCL string `hcl:"cooldown,optional"`
EvaluationInterval time.Duration
EvaluationIntervalHCL string `hcl:"evaluation_interval,optional"`
Checks []*ScalingPolicyCheck `hcl:"check,block"`
Target *ScalingPolicyTarget `hcl:"target,block"`
EvaluationIntervalHCL string `hcl:"evaluation_interval,optional"`
Checks []*FileDecodePolicyCheckDoc `hcl:"check,block"`
Target *ScalingPolicyTarget `hcl:"target,block"`
}

// FileDecodePolicyCheckDoc is the HCL decoding target for a single "check"
// block of a file-based scaling policy. Its values are copied into a
// ScalingPolicyCheck by Translate.
type FileDecodePolicyCheckDoc struct {
	// Name is taken from the label of the check block.
	Name string `hcl:"name,label"`

	// Source is the optional APM plugin name.
	Source string `hcl:"source,optional"`

	// Query is the required query to run against the Source.
	Query string `hcl:"query"`

	// QueryWindow carries no hcl tag, so it is not decoded directly.
	// NOTE(review): presumably filled in by parsing QueryWindowHCL elsewhere;
	// that code is not visible here.
	QueryWindow time.Duration

	// QueryWindowHCL is the raw query_window string. It is optional so that
	// policies written without query_window keep decoding, matching the other
	// duration-style attributes (cooldown, evaluation_interval).
	QueryWindowHCL string `hcl:"query_window,optional"`

	// Strategy is the strategy block nested inside the check.
	Strategy *ScalingPolicyStrategy `hcl:"strategy,block"`
}

// Translate all values from the decoded policy file into our internal policy
Expand All @@ -132,6 +145,28 @@ func (fpd *FileDecodeScalingPolicy) Translate(p *ScalingPolicy) {
p.Enabled = fpd.Enabled
p.Cooldown = fpd.Doc.Cooldown
p.EvaluationInterval = fpd.Doc.EvaluationInterval
p.Checks = fpd.Doc.Checks
p.Target = fpd.Doc.Target

fpd.translateChecks(p)
}

// translateChecks converts every decoded check in fpd.Doc.Checks into a
// ScalingPolicyCheck and stores the resulting slice in p.Checks. When there
// are no decoded checks, p.Checks is set to nil.
func (fpd *FileDecodeScalingPolicy) translateChecks(p *ScalingPolicy) {
	var checks []*ScalingPolicyCheck
	for _, c := range fpd.Doc.Checks {
		// Translate writes through its argument, so it must receive an
		// allocated value; a nil pointer here would panic on the first field
		// assignment.
		check := &ScalingPolicyCheck{}
		c.Translate(check)
		checks = append(checks, check)
	}

	p.Checks = checks
}

// Translate copies the decoded check's Name, Source, Query, QueryWindow and
// Strategy into the internal policy check c. Other fields of c are left
// untouched.
func (fdc *FileDecodePolicyCheckDoc) Translate(c *ScalingPolicyCheck) {
	c.Name, c.Source, c.Query = fdc.Name, fdc.Source, fdc.Query
	c.QueryWindow, c.Strategy = fdc.QueryWindow, fdc.Strategy
}

0 comments on commit 298b5c0

Please sign in to comment.