Skip to content

Commit

Permalink
Support for pagination in list catalog (#201)
Browse files Browse the repository at this point in the history
+ Tagserver now sends list and listRepositories in paginated format
+ TagClient support list and listRepositories with pagination
+ registryoverride support pagination for catalog listing
+ Test cases
  • Loading branch information
rmalpani-uber authored and evelynl94 committed Aug 26, 2019
1 parent 63cd7ff commit fdd1276
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 68 deletions.
124 changes: 104 additions & 20 deletions build-index/tagclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/url"
"strconv"
"time"

"github.com/uber/kraken/build-index/tagmodels"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/healthcheck"
"github.com/uber/kraken/utils/httputil"
Expand All @@ -40,7 +43,9 @@ type Client interface {
Get(tag string) (core.Digest, error)
Has(tag string) (bool, error)
List(prefix string) ([]string, error)
ListWithPagination(prefix string, filter ListFilter) (tagmodels.ListResponse, error)
ListRepository(repo string) ([]string, error)
ListRepositoryWithPagination(repo string, filter ListFilter) (tagmodels.ListResponse, error)
Replicate(tag string) error
Origin() (string, error)

Expand All @@ -54,6 +59,12 @@ type singleClient struct {
tls *tls.Config
}

// ListFilter contains filter request for list with pagination operations.
type ListFilter struct {
Offset string
Limit int
}

// NewSingleClient returns a Client scoped to a single tagserver instance.
func NewSingleClient(addr string, config *tls.Config) Client {
return &singleClient{addr, config}
Expand Down Expand Up @@ -112,37 +123,90 @@ func (c *singleClient) Has(tag string) (bool, error) {
return true, nil
}

func (c *singleClient) List(prefix string) ([]string, error) {
resp, err := httputil.Get(
fmt.Sprintf("http://%s/list/%s", c.addr, prefix),
func (c *singleClient) doListPaginated(urlFormat string, pathSub string,
filter ListFilter) (tagmodels.ListResponse, error) {

// Build query.
reqVal := url.Values{}
if filter.Offset != "" {
reqVal.Add(tagmodels.OffsetQ, filter.Offset)
}
if filter.Limit != 0 {
reqVal.Add(tagmodels.LimitQ, strconv.Itoa(filter.Limit))
}

// Fetch list response from server.
serverUrl := url.URL{
Scheme: "http",
Host: c.addr,
Path: fmt.Sprintf(urlFormat, pathSub),
RawQuery: reqVal.Encode(),
}
var resp tagmodels.ListResponse
httpResp, err := httputil.Get(
serverUrl.String(),
httputil.SendTimeout(60*time.Second),
httputil.SendTLS(c.tls))
if err != nil {
return nil, err
return resp, err
}
defer resp.Body.Close()
defer httpResp.Body.Close()
if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
return resp, fmt.Errorf("json decode: %s", err)
}

return resp, nil
}

func (c *singleClient) doList(pathSub string,
fn func(pathSub string, filter ListFilter) (tagmodels.ListResponse, error)) (
[]string, error) {

var names []string
if err := json.NewDecoder(resp.Body).Decode(&names); err != nil {
return nil, fmt.Errorf("json decode: %s", err)

offset := ""
for ok := true; ok; ok = (offset != "") {
filter := ListFilter{Offset: offset}
resp, err := fn(pathSub, filter)
if err != nil {
return nil, err
}
offset, err = resp.GetOffset()
if err != nil && err != io.EOF {
return nil, err
}
names = append(names, resp.Result...)
}
return names, nil
}

func (c *singleClient) List(prefix string) ([]string, error) {
return c.doList(prefix, func(prefix string, filter ListFilter) (
tagmodels.ListResponse, error) {

return c.ListWithPagination(prefix, filter)
})
}

func (c *singleClient) ListWithPagination(prefix string, filter ListFilter) (
tagmodels.ListResponse, error) {

return c.doListPaginated("list/%s", prefix, filter)
}

// XXX: Deprecated. Use List instead.
func (c *singleClient) ListRepository(repo string) ([]string, error) {
resp, err := httputil.Get(
fmt.Sprintf("http://%s/repositories/%s/tags", c.addr, url.PathEscape(repo)),
httputil.SendTimeout(60*time.Second),
httputil.SendTLS(c.tls))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var tags []string
if err := json.NewDecoder(resp.Body).Decode(&tags); err != nil {
return nil, fmt.Errorf("json decode: %s", err)
}
return tags, nil
return c.doList(repo, func(repo string, filter ListFilter) (
tagmodels.ListResponse, error) {

return c.ListRepositoryWithPagination(repo, filter)
})
}

func (c *singleClient) ListRepositoryWithPagination(repo string,
filter ListFilter) (tagmodels.ListResponse, error) {

return c.doListPaginated("repositories/%s/tags", url.PathEscape(repo), filter)
}

// ReplicateRequest defines a Replicate request body.
Expand Down Expand Up @@ -279,6 +343,16 @@ func (cc *clusterClient) List(prefix string) (tags []string, err error) {
return
}

func (cc *clusterClient) ListWithPagination(prefix string, filter ListFilter) (
resp tagmodels.ListResponse, err error) {

err = cc.do(func(c Client) error {
resp, err = c.ListWithPagination(prefix, filter)
return err
})
return
}

func (cc *clusterClient) ListRepository(repo string) (tags []string, err error) {
err = cc.do(func(c Client) error {
tags, err = c.ListRepository(repo)
Expand All @@ -287,6 +361,16 @@ func (cc *clusterClient) ListRepository(repo string) (tags []string, err error)
return
}

func (cc *clusterClient) ListRepositoryWithPagination(repo string,
filter ListFilter) (resp tagmodels.ListResponse, err error) {

err = cc.do(func(c Client) error {
resp, err = c.ListRepositoryWithPagination(repo, filter)
return err
})
return
}

func (cc *clusterClient) Replicate(tag string) error {
return cc.do(func(c Client) error { return c.Replicate(tag) })
}
Expand Down
59 changes: 59 additions & 0 deletions build-index/tagmodels/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tagmodels

import (
"fmt"
"io"
"net/url"
)

const (
// Filters.
LimitQ string = "limit"
OffsetQ string = "offset"
)

// List Response with pagination. Models tagserver reponse to list and
// listRepository.
type ListResponse struct {
Links struct {
Next string `json:"next"`
Self string `json:"self"`
}
Size int `json:"size"`
Result []string `json:"result"`
}

// GetOffset returns offset token from the ListResponse struct.
// Returns token if present, io.EOF if Next is empty, error otherwise.
func (resp ListResponse) GetOffset() (string, error) {
if resp.Links.Next == "" {
return "", io.EOF
}

nextUrl, err := url.Parse(resp.Links.Next)
if err != nil {
return "", err
}
val, err := url.ParseQuery(nextUrl.RawQuery)
if err != nil {
return "", err
}
offset := val.Get(OffsetQ)
if offset == "" {
return "", fmt.Errorf("invalid offset in %s", resp.Links.Next)
}
return offset, nil
}
62 changes: 26 additions & 36 deletions build-index/tagserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/uber/kraken/build-index/tagclient"
"github.com/uber/kraken/build-index/tagmodels"
"github.com/uber/kraken/build-index/tagstore"
"github.com/uber/kraken/build-index/tagtype"
"github.com/uber/kraken/core"
Expand All @@ -45,11 +46,6 @@ import (
"github.com/uber-go/tally"
)

const (
limitQ string = "limit"
offsetQ string = "offset"
)

// Server provides tag operations for the build-index.
type Server struct {
config Config
Expand All @@ -69,16 +65,6 @@ type Server struct {
depResolver tagtype.DependencyResolver
}

// List Response with pagination.
type ListResponse struct {
Links struct {
Next string `json:"next"`
Self string `json:"self"`
}
Size int `json:"size"`
Result []string `json:"result"`
}

// New creates a new Server.
func New(
config Config,
Expand Down Expand Up @@ -253,6 +239,8 @@ func (s *Server) hasTagHandler(w http.ResponseWriter, r *http.Request) error {
return nil
}

// listHandler handles list images request. Response model
// tagmodels.ListResponse.
func (s *Server) listHandler(w http.ResponseWriter, r *http.Request) error {
prefix := r.URL.Path[len("/list/"):]

Expand Down Expand Up @@ -282,6 +270,8 @@ func (s *Server) listHandler(w http.ResponseWriter, r *http.Request) error {
return nil
}

// listRepositoryHandler handles list images tag request. Response model
// tagmodels.ListResponse.
// TODO(codyg): Remove this.
func (s *Server) listRepositoryHandler(w http.ResponseWriter, r *http.Request) error {
repo, err := httputil.ParseParam(r, "repo")
Expand Down Expand Up @@ -455,7 +445,7 @@ func buildPaginationOptions(u *url.URL) ([]backend.ListOption, error) {
"invalid query %s:%s", k, v).Status(http.StatusBadRequest)
}
switch k {
case limitQ:
case tagmodels.LimitQ:
limitCount, err := strconv.Atoi(v[0])
if err != nil {
return nil, handler.Errorf(
Expand All @@ -466,7 +456,7 @@ func buildPaginationOptions(u *url.URL) ([]backend.ListOption, error) {
"invalid limit %d", limitCount).Status(http.StatusBadRequest)
}
opts = append(opts, backend.ListWithMaxKeys(limitCount))
case offsetQ:
case tagmodels.OffsetQ:
opts = append(opts, backend.ListWithContinuationToken(v[0]))
default:
return nil, handler.Errorf(
Expand All @@ -482,31 +472,31 @@ func buildPaginationOptions(u *url.URL) ([]backend.ListOption, error) {
}

func buildPaginationResponse(u *url.URL, continuationToken string,
result []string) (interface{}, error) {

if continuationToken == "" {
return result, nil
}
result []string) (*tagmodels.ListResponse, error) {

// Deep copy url.
nextUrl, err := url.Parse(u.String())
if err != nil {
return nil, handler.Errorf(
"invalid url string: %s", err).Status(http.StatusBadRequest)
}
v := url.Values{}
if limit := u.Query().Get(limitQ); limit != "" {
v.Add(limitQ, limit)
nextUrlString := ""
if continuationToken != "" {
// Deep copy url.
nextUrl, err := url.Parse(u.String())
if err != nil {
return nil, handler.Errorf(
"invalid url string: %s", err).Status(http.StatusBadRequest)
}
v := url.Values{}
if limit := u.Query().Get(tagmodels.LimitQ); limit != "" {
v.Add(tagmodels.LimitQ, limit)
}
// ContinuationToken cannot be empty here.
v.Add(tagmodels.OffsetQ, continuationToken)
nextUrl.RawQuery = v.Encode()
nextUrlString = nextUrl.String()
}
// ContinuationToken cannot be empty here.
v.Add(offsetQ, continuationToken)
nextUrl.RawQuery = v.Encode()

resp := ListResponse{
resp := tagmodels.ListResponse{
Size: len(result),
Result: result,
}
resp.Links.Next = nextUrl.String()
resp.Links.Next = nextUrlString
resp.Links.Self = u.String()

return &resp, nil
Expand Down
Loading

0 comments on commit fdd1276

Please sign in to comment.