Skip to content

Commit

Permalink
Add product entitlement check for Origin Inspector before enabling th…
Browse files Browse the repository at this point in the history
…e subscriber for origins. (#118)

* check if origins are available as a product before subscribing

* s/productCache.Fetch/productCache.Refresh/g

* refactor ProductCache

* consolidate Product structs

* add periodic producRefresh. Refactor manager

* manager will track subscribers by type rather than by service
* prodcut cache with refresh every 10m by default

* fix typos

* refactor manager key to be a struct

* use a bool for product access.

This way if access is removed the subscribers will be stopped

* fail open

* remove managedKeysWithLock func and move sorting to the tests

* fix the tests. fix the linting. fix the formatting

* remove unnecessary lookups to the managed map.
  • Loading branch information
leklund authored Dec 1, 2022
1 parent 7f51742 commit d4ec0f6
Show file tree
Hide file tree
Showing 8 changed files with 423 additions and 73 deletions.
48 changes: 47 additions & 1 deletion cmd/fastly-exporter/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package main is the entry point for the fastly-exporter.
package main

import (
Expand Down Expand Up @@ -39,6 +40,7 @@ func main() {
metricAllowlist stringslice
metricBlocklist stringslice
datacenterRefresh time.Duration
productRefresh time.Duration
serviceRefresh time.Duration
apiTimeout time.Duration
rtTimeout time.Duration
Expand All @@ -60,6 +62,7 @@ func main() {
fs.Var(&metricAllowlist, "metric-allowlist", "if set, only export metrics whose names match this regex (repeatable)")
fs.Var(&metricBlocklist, "metric-blocklist", "if set, don't export metrics whose names match this regex (repeatable)")
fs.DurationVar(&datacenterRefresh, "datacenter-refresh", 10*time.Minute, "how often to poll api.fastly.com for updated datacenter metadata (10m–1h)")
fs.DurationVar(&productRefresh, "product-refresh", 10*time.Minute, "how often to poll api.fastly.com for updated product metadata (10m–1h)")
fs.DurationVar(&serviceRefresh, "service-refresh", 1*time.Minute, "how often to poll api.fastly.com for updated service metadata (15s–10m)")
fs.DurationVar(&serviceRefresh, "api-refresh", 1*time.Minute, "DEPRECATED -- use service-refresh instead")
fs.DurationVar(&apiTimeout, "api-timeout", 15*time.Second, "HTTP client timeout for api.fastly.com requests (5–60s)")
Expand Down Expand Up @@ -127,6 +130,14 @@ func main() {
level.Warn(logger).Log("msg", "-datacenter-refresh cannot be longer than 1h; setting it to 1h")
datacenterRefresh = 1 * time.Hour
}
if productRefresh < 10*time.Minute {
level.Warn(logger).Log("msg", "-product-refresh cannot be shorter than 10m; setting it to 10m")
productRefresh = 10 * time.Minute
}
if productRefresh > 1*time.Hour {
level.Warn(logger).Log("msg", "-product-refresh cannot be longer than 1h; setting it to 1h")
productRefresh = 1 * time.Hour
}
if serviceRefresh < 15*time.Second {
level.Warn(logger).Log("msg", "-service-refresh cannot be shorter than 15s; setting it to 15s")
serviceRefresh = 15 * time.Second
Expand Down Expand Up @@ -261,6 +272,11 @@ func main() {
datacenterCache = api.NewDatacenterCache(apiClient, token)
}

var productCache *api.ProductCache
{
productCache = api.NewProductCache(apiClient, token, apiLogger)
}

{
var g errgroup.Group
g.Go(func() error {
Expand All @@ -275,6 +291,13 @@ func main() {
}
return nil
})
g.Go(func() error {
if err := productCache.Refresh(context.Background()); err != nil {
level.Warn(logger).Log("during", "initial fetch of products", "err", err, "msg", "products API unavailable, will retry")
}
return nil
})

g.Wait()
}

Expand Down Expand Up @@ -303,7 +326,7 @@ func main() {
rt.WithMetadataProvider(serviceCache),
}
)
manager = rt.NewManager(serviceCache, rtClient, token, registry, subscriberOptions, rtLogger)
manager = rt.NewManager(serviceCache, rtClient, token, registry, subscriberOptions, productCache, rtLogger)
manager.Refresh() // populate initial subscribers, based on the initial cache refresh
}

Expand Down Expand Up @@ -331,6 +354,29 @@ func main() {
cancel()
})
}
{
// Every productRefresh, ask the api.ProductCache to refresh
// data from the product entitlement endpoint.
var (
ctx, cancel = context.WithCancel(context.Background())
ticker = time.NewTicker(productRefresh)
)
g.Add(func() error {
for {
select {
case <-ticker.C:
if err := productCache.Refresh(ctx); err != nil {
level.Warn(apiLogger).Log("during", "product refresh", "err", err, "msg", "the product entitlement data may be stale")
}
case <-ctx.Done():
return ctx.Err()
}
}
}, func(error) {
ticker.Stop()
cancel()
})
}
{
// Every serviceRefresh, ask the api.ServiceCache to refresh the set of
// services we should be exporting data for. Then, ask the rt.Manager to
Expand Down
30 changes: 30 additions & 0 deletions pkg/api/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,33 @@ func (c paginatedResponseClient) Do(req *http.Request) (*http.Response, error) {
}).ServeHTTP(rec, req)
return rec.Result(), nil
}

//
//
//

type sequentialResponseClient struct {
responses []string
}

func newSequentialResponseClient(responses ...string) *sequentialResponseClient {
return &sequentialResponseClient{
responses: responses,
}
}

func (c *sequentialResponseClient) Do(req *http.Request) (*http.Response, error) {
var response string
if len(c.responses) <= 1 {
response = c.responses[0]
} else {
response, c.responses = c.responses[0], c.responses[1:]
}

rec := httptest.NewRecorder()
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, response)
}).ServeHTTP(rec, req)
return rec.Result(), nil
}
4 changes: 2 additions & 2 deletions pkg/api/link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func TestGetNextLink(t *testing.T) {
},

{
name: `RFC 5988 1`,
name: `RFC 5988 1`,
links: []string{` <http://example.com/TheBook/chapter2>; rel="previous"; title="previous chapter"`},
want: ``,
want: ``,
},
{
name: `RFC 5988 2`,
Expand Down
108 changes: 108 additions & 0 deletions pkg/api/product_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package api

import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

const (
// Default is the standard real-time stats available to all services
Default = "default"
// OriginInspector is the product name used to determine access to Origin Inspector via the entitlement API
OriginInspector = "origin_inspector"
)

// Products is the slice of available products supported by real-time stats.
var Products = []string{Default, OriginInspector}

// Product models the response from the Fastly Product Entitlement API.
type Product struct {
HasAccess bool `json:"has_access"`
Meta struct {
Name string `json:"id"`
} `json:"product"`
}

// ProductCache fetches product information from the Fastly Product Entitlement API
// and stores results in a local cache.
type ProductCache struct {
client HTTPClient
token string
logger log.Logger

mtx sync.Mutex
products map[string]bool
}

// NewProductCache returns an empty cache of Product information. Use the Refresh method
// to populate with data.
func NewProductCache(client HTTPClient, token string, logger log.Logger) *ProductCache {
return &ProductCache{
client: client,
token: token,
logger: logger,
products: make(map[string]bool),
}
}

// Refresh requests data from the Fastly API and stores data in the cache.
func (p *ProductCache) Refresh(ctx context.Context) error {
for _, product := range Products {
if product == Default {
continue
}
uri := fmt.Sprintf("https://api.fastly.com/entitled-products/%s", product)

req, err := http.NewRequestWithContext(ctx, "GET", uri, nil)
if err != nil {
return fmt.Errorf("error constructing API product request: %w", err)
}

req.Header.Set("Fastly-Key", p.token)
req.Header.Set("Accept", "application/json")
resp, err := p.client.Do(req)
if err != nil {
return fmt.Errorf("error executing API product request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return NewError(resp)
}

var response Product

if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return fmt.Errorf("error decoding API product response: %w", err)
}

level.Debug(p.logger).Log("product", response.Meta.Name, "hasAccess", response.HasAccess)

p.mtx.Lock()
p.products[response.Meta.Name] = response.HasAccess
p.mtx.Unlock()

}

return nil
}

// HasAccess takes a product as a string and returns a boolean
// based on the response from the Product API.
func (p *ProductCache) HasAccess(product string) bool {
if product == Default {
return true
}
p.mtx.Lock()
defer p.mtx.Unlock()
if v, ok := p.products[product]; ok {
return v
}
return true
}
88 changes: 88 additions & 0 deletions pkg/api/product_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package api_test

import (
"context"
"net/http"
"testing"

"github.com/fastly/fastly-exporter/pkg/api"
"github.com/go-kit/log"
"github.com/google/go-cmp/cmp"
)

func TestProductCache(t *testing.T) {
t.Parallel()

for _, testcase := range []struct {
name string
client api.HTTPClient
wantProds map[string]bool
wantErr error
}{
{
name: "success",
client: newSequentialResponseClient(productsResponseOne, productsResponseTwo),
wantErr: nil,
wantProds: map[string]bool{
"origin_inspector": true,
},
},
{
name: "error",
client: fixedResponseClient{code: http.StatusUnauthorized},
wantErr: &api.Error{Code: http.StatusUnauthorized},
wantProds: map[string]bool{},
},
} {
t.Run(testcase.name, func(t *testing.T) {
var (
ctx = context.Background()
client = testcase.client
cache = api.NewProductCache(client, "irrelevant token", log.NewNopLogger())
)

// err
if want, have := testcase.wantErr, cache.Refresh(ctx); !cmp.Equal(want, have) {
t.Fatal(cmp.Diff(want, have))
}

for k, v := range testcase.wantProds {
if v != cache.HasAccess(k) {
t.Fatalf("expected %v, got %v for %v", v, cache.HasAccess(k), k)
}
}
})
}
}

const productsResponseOne = `
{
"product": {
"id": "origin_inspector",
"object": "product"
},
"has_access": true,
"access_level": "Origin_Inspector",
"has_permission_to_enable": false,
"has_permission_to_disable": true,
"_links": {
"self": ""
}
}
`

const productsResponseTwo = `
{
"product": {
"id": "domain_inspector",
"object": "product"
},
"has_access": false,
"access_level": "Domain_Inspector",
"has_permission_to_enable": false,
"has_permission_to_disable": true,
"_links": {
"self": ""
}
}
`
31 changes: 31 additions & 0 deletions pkg/rt/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,37 @@ func (c *mockCache) Metadata(id string) (name string, version int, found bool) {
//
//

type mockProductCache struct {
mtx sync.RWMutex
products map[string]bool
}

func newMockProductCache() *mockProductCache {
return &mockProductCache{
products: make(map[string]bool),
}
}

func (c *mockProductCache) HasAccess(product string) bool {
c.mtx.Lock()
defer c.mtx.Unlock()
if v, ok := c.products[product]; ok {
return v
}
return true
}

func (c *mockProductCache) update(product string, hasAccess bool) {
c.mtx.Lock()
defer c.mtx.Unlock()

c.products[product] = hasAccess
}

//
//
//

type mockRealtimeClient struct {
responses []string
next chan struct{}
Expand Down
Loading

0 comments on commit d4ec0f6

Please sign in to comment.