Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.18] Add custom thresholds mechanism in APIResponsivenessPrometheus measurement #1524

Merged
merged 1 commit into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
"k8s.io/klog"

"k8s.io/perf-tests/clusterloader2/pkg/errors"
Expand Down Expand Up @@ -70,7 +71,7 @@ const (

latencyWindowSize = 5 * time.Minute

// Number of metrics with highest latency to print. If the latency exceeeds SLO threshold, a metric is printed regardless.
// Number of metrics with highest latency to print. If the latency exceeds SLO threshold, a metric is printed regardless.
topToPrint = 5
)

Expand All @@ -95,6 +96,20 @@ type apiCallMetrics struct {
metrics map[string]*apiCallMetric
}

// customThresholdEntry is a single user-supplied latency threshold override
// for one API call, identified by resource, subresource, verb and scope.
//
// Entries are decoded from the "customThresholds" measurement parameter with
// gopkg.in/yaml.v2, which only honours `yaml` struct tags. The `yaml` tags
// make the expected keys explicit instead of relying on yaml.v2's default
// lower-casing of field names; the `json` tags are kept for compatibility
// with any JSON-based decoding.
//
// Threshold is the maximum accepted latency for the API call; it is written
// as a duration string such as "500ms".
type customThresholdEntry struct {
	Resource    string        `json:"resource" yaml:"resource"`
	Subresource string        `json:"subresource" yaml:"subresource"`
	Verb        string        `json:"verb" yaml:"verb"`
	Scope       string        `json:"scope" yaml:"scope"`
	Threshold   time.Duration `json:"threshold" yaml:"threshold"`
}

// customThresholds maps an API call key, as produced by buildKey, to the
// latency threshold that replaces the default SLO threshold for that call.
type customThresholds map[string]time.Duration

// getKey returns the key identifying the API call this entry applies to,
// built from the entry's resource, subresource, verb and scope.
func (cte *customThresholdEntry) getKey() string {
	key := buildKey(
		cte.Resource,
		cte.Subresource,
		cte.Verb,
		cte.Scope,
	)
	return key
}

// apiResponsivenessGatherer is a stateless type whose Gather method collects
// API call metrics through a QueryExecutor and validates them against latency
// thresholds.
type apiResponsivenessGatherer struct{}

func (a *apiResponsivenessGatherer) Gather(executor QueryExecutor, startTime time.Time, config *measurement.MeasurementConfig) (measurement.Summary, error) {
Expand All @@ -118,7 +133,12 @@ func (a *apiResponsivenessGatherer) Gather(executor QueryExecutor, startTime tim
return nil, err
}

badMetrics := a.validateAPICalls(config.Identifier, allowedSlowCalls, apiCalls)
customThresholds, err := getCustomThresholds(config, apiCalls)
if err != nil {
return nil, err
}

badMetrics := a.validateAPICalls(config.Identifier, allowedSlowCalls, apiCalls, customThresholds)
if len(badMetrics) > 0 {
err = errors.NewMetricViolationError("top latency metric", fmt.Sprintf("there should be no high-latency requests, but: %v", badMetrics))
}
Expand Down Expand Up @@ -202,13 +222,44 @@ func (a *apiResponsivenessGatherer) gatherAPICalls(executor QueryExecutor, start
return newFromSamples(latencySamples, countSamples, countSlowSamples)
}

func (a *apiResponsivenessGatherer) validateAPICalls(identifier string, allowedSlowCalls int, metrics *apiCallMetrics) []error {
// getCustomThresholds parses the optional "customThresholds" measurement
// parameter into a map from API call key to latency threshold.
//
// The parameter is a string holding a YAML list of customThresholdEntry
// items; when it is absent or empty the returned map is empty. Entries whose
// key does not match any gathered API call metric are logged and skipped
// rather than treated as errors.
//
// An error is returned when reading the parameter fails, when it is not valid
// YAML for a list of entries, or when an entry has a threshold that is not
// strictly positive (zero, unset or negative).
func getCustomThresholds(config *measurement.MeasurementConfig, metrics *apiCallMetrics) (customThresholds, error) {
	thresholdsString, err := util.GetStringOrDefault(config.Params, "customThresholds", "")
	if err != nil {
		return nil, err
	}
	var entries []customThresholdEntry
	if err := yaml.Unmarshal([]byte(thresholdsString), &entries); err != nil {
		return nil, fmt.Errorf("parsing customThresholds: %v", err)
	}

	// Named "result" so the local does not shadow the customThresholds type.
	result := make(customThresholds, len(entries))
	for _, entry := range entries {
		key := entry.getKey()
		// A negative duration is as meaningless as a missing one: every call
		// would violate it, so reject anything that is not strictly positive.
		if entry.Threshold <= 0 {
			return nil, fmt.Errorf("custom threshold must be set to a positive time duration (API call key: %v, threshold: %v)", key, entry.Threshold)
		}
		if _, ok := metrics.metrics[key]; !ok {
			klog.Infof("WARNING: unrecognized custom threshold API call key: %v", key)
			continue
		}
		result[key] = entry.Threshold
	}
	return result, nil
}

func (a *apiResponsivenessGatherer) validateAPICalls(identifier string, allowedSlowCalls int, metrics *apiCallMetrics, customThresholds customThresholds) []error {
badMetrics := make([]error, 0)
top := topToPrint

for _, apiCall := range metrics.sorted() {
var threshold time.Duration
if customThreshold, ok := customThresholds[apiCall.getKey()]; ok {
threshold = customThreshold
} else {
threshold = apiCall.getSLOThreshold()
}
var err error
if err = apiCall.Validate(allowedSlowCalls); err != nil {
if err = apiCall.Validate(allowedSlowCalls, threshold); err != nil {
badMetrics = append(badMetrics, err)
}
if top > 0 || err != nil {
Expand All @@ -217,7 +268,7 @@ func (a *apiResponsivenessGatherer) validateAPICalls(identifier string, allowedS
if err != nil {
prefix = "WARNING "
}
klog.Infof("%s: %vTop latency metric: %v", identifier, prefix, apiCall)
klog.Infof("%s: %vTop latency metric: %+v; threshold: %v", identifier, prefix, *apiCall, threshold)
}
}
return badMetrics
Expand Down Expand Up @@ -257,7 +308,7 @@ func newFromSamples(latencySamples, countSamples, countSlowSamples []*model.Samp
}

func (m *apiCallMetrics) getAPICall(resource, subresource, verb, scope string) *apiCallMetric {
key := m.buildKey(resource, subresource, verb, scope)
key := buildKey(resource, subresource, verb, scope)
call, exists := m.metrics[key]
if !exists {
call = &apiCallMetric{
Expand Down Expand Up @@ -327,12 +378,15 @@ func (m *apiCallMetrics) sorted() []*apiCallMetric {
return all
}

func (m *apiCallMetrics) buildKey(resource, subresource, verb, scope string) string {
// buildKey composes the map key identifying an API call: the four components
// joined with "|" in the order resource, subresource, verb, scope.
func buildKey(resource, subresource, verb, scope string) string {
	const sep = "|"
	// All operands are strings, so fmt.Sprint concatenates them without
	// inserting any spaces.
	return fmt.Sprint(resource, sep, subresource, sep, verb, sep, scope)
}

func (ap *apiCallMetric) Validate(allowedSlowCalls int) error {
threshold := ap.getSLOThreshold()
// getKey returns the key identifying this API call metric, built from its
// resource, subresource, verb and scope.
func (ap *apiCallMetric) getKey() string {
	key := buildKey(
		ap.Resource,
		ap.Subresource,
		ap.Verb,
		ap.Scope,
	)
	return key
}

func (ap *apiCallMetric) Validate(allowedSlowCalls int, threshold time.Duration) error {
if err := ap.Latency.VerifyThreshold(threshold); err != nil {
// TODO(oxddr): remove allowedSlowCalls guard once it's stable
if allowedSlowCalls > 0 && ap.SlowCount <= allowedSlowCalls {
Expand All @@ -352,7 +406,3 @@ func (ap *apiCallMetric) getSLOThreshold() time.Duration {
}
return namespaceThreshold
}

// String renders the metric with field names, followed by the SLO threshold
// returned by getSLOThreshold.
func (ap *apiCallMetric) String() string {
	threshold := ap.getSLOThreshold()
	// Format the dereferenced value, not the pointer, so that fmt does not
	// call this String method again.
	return fmt.Sprintf("%+v; threshold: %v", *ap, threshold)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ import (
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
)

var (
// klogLogToStderr records whether klog is still logging to stderr in
// this test binary. It starts as true and is set to false by
// turnOffLoggingToStderrInKlog, which uses it to do its work only once.
//
// klogv1 allows users to turn on/off logging to stderr only through
// the use of flag. This prevents us from having control over which
// of the test functions have that mechanism turned off when we run
// go test command.
// TODO(#1286): refactor api_responsiveness_prometheus.go to make
// testing of logging easier and remove this hack in the end.
klogLogToStderr = true
)

// turnOffLoggingToStderrInKlog registers the klog flags, sets "logtostderr"
// to "false" and parses the command-line flags. The work is done on the first
// call only; later calls return immediately.
func turnOffLoggingToStderrInKlog() {
	if !klogLogToStderr {
		// Already switched off by an earlier call.
		return
	}
	klog.InitFlags(nil)
	flag.Set("logtostderr", "false")
	flag.Parse()
	klogLogToStderr = false
}

type sample struct {
resource string
subresource string
Expand Down Expand Up @@ -64,7 +83,7 @@ func (ex *fakeQueryExecutor) Query(query string, queryTime time.Time) ([]*model.
sample := &model.Sample{
Metric: model.Metric{
"resource": model.LabelValue(s.resource),
"subresoruce": model.LabelValue(s.subresource),
"subresource": model.LabelValue(s.subresource),
"verb": model.LabelValue(s.verb),
"scope": model.LabelValue(s.scope),
},
Expand Down Expand Up @@ -531,9 +550,7 @@ func TestLogging(t *testing.T) {
},
}

klog.InitFlags(nil)
flag.Set("logtostderr", "false")
flag.Parse()
turnOffLoggingToStderrInKlog()

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -556,3 +573,167 @@ func TestLogging(t *testing.T) {
})
}
}

// TestAPIResponsivenessCustomThresholds runs Gather against a fake query
// executor with various "customThresholds" parameter values. For each case it
// checks whether Gather returns an error and that the captured klog output
// contains the expected messages.
func TestAPIResponsivenessCustomThresholds(t *testing.T) {
	// joinLines assembles a multi-line YAML document from its lines.
	joinLines := func(yamlLines []string) string {
		return strings.Join(yamlLines, "\n")
	}

	cases := []struct {
		name             string
		config           *measurement.MeasurementConfig
		samples          []*sample
		hasError         bool
		expectedMessages []string
	}{
		{
			name: "simple_slo_threshold_override_success",
			config: &measurement.MeasurementConfig{
				Params: map[string]interface{}{
					// Continuation lines are indented by two spaces so that
					// they align with the mapping opened by "- verb:".
					"customThresholds": joinLines([]string{
						"- verb: PUT",
						"  resource: leases",
						"  scope: namespace",
						"  threshold: 600ms",
					}),
				},
			},
			samples: []*sample{
				{
					resource: "leases",
					verb:     "PUT",
					scope:    "namespace",
					latency:  0.5,
				},
			},
			hasError: false,
		},
		{
			name: "simple_slo_threshold_override_failure",
			config: &measurement.MeasurementConfig{
				Params: map[string]interface{}{
					"customThresholds": joinLines([]string{
						"- verb: PUT",
						"  resource: leases",
						"  scope: namespace",
						"  threshold: 400ms",
					}),
				},
			},
			samples: []*sample{
				{
					resource: "leases",
					verb:     "PUT",
					scope:    "namespace",
					latency:  0.5,
				},
			},
			hasError: true,
			expectedMessages: []string{
				"WARNING Top latency metric",
			},
		},
		{
			name: "empty_custom_thresholds_field",
			config: &measurement.MeasurementConfig{
				Params: map[string]interface{}{
					"customThresholds": "",
				},
			},
			samples: []*sample{
				{
					resource: "leases",
					verb:     "PUT",
					scope:    "namespace",
					latency:  0.5,
				},
			},
			hasError: false,
		},
		{
			name: "no_custom_thresholds_field",
			config: &measurement.MeasurementConfig{
				Params: map[string]interface{}{},
			},
			samples: []*sample{
				{
					resource: "leases",
					verb:     "PUT",
					scope:    "namespace",
					latency:  0.5,
				},
			},
			hasError: false,
		},
		{
			name: "unrecognized_metric",
			config: &measurement.MeasurementConfig{
				Params: map[string]interface{}{
					"customThresholds": joinLines([]string{
						"- verb: POST",
						"  resource: pod",
						"  scope: namespace",
						"  threshold: 500ms",
					}),
				},
			},
			samples: []*sample{
				{
					resource: "leases",
					verb:     "PUT",
					scope:    "namespace",
					latency:  0.2,
				},
			},
			hasError: false,
			expectedMessages: []string{
				"unrecognized custom threshold API call key",
			},
		},
		{
			name: "non_unmarshallable_custom_thresholds",
			config: &measurement.MeasurementConfig{
				Params: map[string]interface{}{
					// A mapping, not a list of entries, so decoding must fail.
					"customThresholds": joinLines([]string{
						"im: not",
						"a: good",
						"yaml: array",
					}),
				},
			},
			samples: []*sample{
				{
					resource: "pod",
					verb:     "POST",
					scope:    "namespace",
					latency:  0.2,
				},
			},
			hasError: true,
		},
	}

	turnOffLoggingToStderrInKlog()

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Capture klog output so the expected messages can be checked.
			buf := bytes.NewBuffer(nil)
			klog.SetOutput(buf)

			executor := &fakeQueryExecutor{samples: tc.samples}
			gatherer := &apiResponsivenessGatherer{}

			_, err := gatherer.Gather(executor, time.Now(), tc.config)
			// Flush before inspecting the buffer so pending log lines are written.
			klog.Flush()
			if tc.hasError {
				assert.NotNil(t, err, "expected an error, but got none")
			} else {
				assert.Nil(t, err, "expected no error, but got %v", err)
			}

			for _, msg := range tc.expectedMessages {
				assert.Contains(t, buf.String(), msg)
			}
		})
	}
}
2 changes: 2 additions & 0 deletions clusterloader2/testing/load/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
{{$ENABLE_RESTART_COUNT_CHECK := DefaultParam .ENABLE_RESTART_COUNT_CHECK false}}
{{$RESTART_COUNT_THRESHOLD_OVERRIDES:= DefaultParam .RESTART_COUNT_THRESHOLD_OVERRIDES ""}}
{{$ALLOWED_SLOW_API_CALLS := DefaultParam .CL2_ALLOWED_SLOW_API_CALLS 0}}
{{$CUSTOM_API_CALL_THRESHOLDS := DefaultParam .CUSTOM_API_CALL_THRESHOLDS ""}}
#Variables
{{$namespaces := DivideInt .Nodes $NODES_PER_NAMESPACE}}
{{$totalPods := MultiplyInt $namespaces $NODES_PER_NAMESPACE $PODS_PER_NODE}}
Expand Down Expand Up @@ -769,6 +770,7 @@ steps:
Params:
action: gather
allowedSlowCalls: {{$ALLOWED_SLOW_API_CALLS}}
customThresholds: {{YamlQuote $CUSTOM_API_CALL_THRESHOLDS 4}}
{{end}}
- Identifier: PodStartupLatency
Method: PodStartupLatency
Expand Down
11 changes: 11 additions & 0 deletions clusterloader2/testing/load/golang/custom_api_call_thresholds.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Custom latency thresholds for individual API calls in the load test.
# The value is a string containing a YAML list; the load config passes it to
# the measurement as its "customThresholds" parameter. Each entry identifies
# one API call by verb, resource, subresource and scope, and gives the latency
# threshold to use for it instead of the default one.
CUSTOM_API_CALL_THRESHOLDS: |
  - verb: PUT
    resource: leases
    subresource: ''
    scope: namespace
    threshold: 500ms
  - verb: DELETE
    resource: pods
    subresource: ''
    scope: namespace
    threshold: 700ms