diff --git a/.circleci/config.yml b/.circleci/config.yml index 61ba406..dda8e0c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1,23 +1,33 @@ -version: 2 +version: 2.1 jobs: - redis5: + redis: + parameters: + redis-image: + type: string + go-image: + type: string docker: - - image: circleci/golang:1.16 + - image: << parameters.go-image >> environment: - GO111MODULE=on - - image: redis:5-alpine + - image: << parameters.redis-image >> working_directory: /go/src/github.com/taylorchu/work steps: - checkout - run: dockerize -wait tcp://:6379 - run: go test -p 1 -v ./... - redis5-cluster: + redis-cluster: + parameters: + redis-image: + type: string + go-image: + type: string docker: - - image: circleci/golang:1.16 + - image: << parameters.go-image >> environment: - GO111MODULE=on - REDIS_ADDR=:7000,:7001,:7002 - - image: redis:5-alpine + - image: << parameters.redis-image >> command: - --cluster-enabled - "yes" @@ -25,7 +35,7 @@ jobs: - "7000" - --cluster-config-file - 7000.conf - - image: redis:5-alpine + - image: << parameters.redis-image >> command: - --cluster-enabled - "yes" @@ -33,7 +43,7 @@ jobs: - "7001" - --cluster-config-file - 7001.conf - - image: redis:5-alpine + - image: << parameters.redis-image >> command: - --cluster-enabled - "yes" @@ -45,74 +55,42 @@ jobs: steps: - checkout - run: dockerize -wait tcp://:7000 -wait tcp://:7001 -wait tcp://:7002 - - run: sudo apt-get install redis-tools + - run: sudo apt-get update && sudo apt-get install redis-tools - run: echo yes | redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 - run: go test -p 1 -v ./... - redis6: + fossa: docker: - - image: circleci/golang:1.16 - environment: - - GO111MODULE=on - - image: redis:6-alpine + - image: alpine working_directory: /go/src/github.com/taylorchu/work steps: - checkout - - run: dockerize -wait tcp://:6379 - - run: go test -p 1 -v ./... - redis6-cluster: - docker: - - image: circleci/golang:1.16 - environment: - - GO111MODULE=on - - REDIS_ADDR=:7000,:7001,:7002 - - image: redis:6-alpine - command: - - --cluster-enabled - - "yes" - - --port - - "7000" - - --cluster-config-file - - 7000.conf - - image: redis:6-alpine - command: - - --cluster-enabled - - "yes" - - --port - - "7001" - - --cluster-config-file - - 7001.conf - - image: redis:6-alpine - command: - - --cluster-enabled - - "yes" - - --port - - "7002" - - --cluster-config-file - - 7002.conf - working_directory: /go/src/github.com/taylorchu/work - steps: - - checkout - - run: dockerize -wait tcp://:7000 -wait tcp://:7001 -wait tcp://:7002 - - run: sudo apt-get install redis-tools - - run: echo yes | redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 - - run: go test -p 1 -v ./... - keydb: - docker: - - image: circleci/golang:1.16 - environment: - - GO111MODULE=on - - image: eqalpha/keydb - working_directory: /go/src/github.com/taylorchu/work - steps: - - checkout - - run: dockerize -wait tcp://:6379 - - run: go test -p 1 -v ./... + - run: apk add --no-cache curl bash + - run: | + curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/fossas/fossa-cli/master/install-latest.sh | bash + - run: fossa analyze + - run: fossa test workflows: version: 2 build_and_test: jobs: - - redis5 - - redis6 - - redis5-cluster - - redis6-cluster - - keydb + - redis: + matrix: + parameters: + go-image: + - circleci/golang:1.17 + - circleci/golang:1.16 + redis-image: + - redis:5-alpine + - redis:6-alpine + - eqalpha/keydb + - redis-cluster: + matrix: + parameters: + go-image: + - circleci/golang:1.17 + - circleci/golang:1.16 + redis-image: + - redis:5-alpine + - redis:6-alpine + - eqalpha/keydb + - fossa diff --git a/README.md b/README.md index 614a995..27f81d6 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![GoDoc](https://godoc.org/github.com/taylorchu/work?status.png)](https://godoc.org/github.com/taylorchu/work) [![Go Report Card](https://goreportcard.com/badge/github.com/taylorchu/work)](https://goreportcard.com/report/github.com/taylorchu/work) -[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Ftaylorchu%2Fwork.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Ftaylorchu%2Fwork?ref=badge_shield) +[![FOSSA Status](https://app.fossa.com/api/projects/custom%2B4257%2Fgit%40github.com%3Ataylorchu%2Fwork.git.svg?type=shield)](https://app.fossa.com/projects/custom%2B4257%2Fgit%40github.com%3Ataylorchu%2Fwork.git?ref=badge_shield) [![CircleCI](https://circleci.com/gh/taylorchu/work.svg?style=svg)](https://circleci.com/gh/taylorchu/work) Please see `cmd/` for enqueuer and worker demo. @@ -10,35 +10,40 @@ Please see `cmd/` for enqueuer and worker demo. ## Improvements - [x] queue backend abstraction - - redis is still the default, but the new design allows custom queue implementation. + - redis is still the default, but the new design allows custom queue implementation. - [x] simplify the keyspace design of redis queue backend - - The new design uses 1 redis hash per job, and 1 redis sorted set for queue. - - [Interesting read](https://kirshatrov.com/2018/07/20/redis-job-queue/) + - The new design uses 1 redis hash per job, and 1 redis sorted set for queue. + - [Interesting read](https://kirshatrov.com/2018/07/20/redis-job-queue/) - [x] modular - - The core only catches panics, retries on failure, and waits if a queue is empty. - - All other [functionalities](https://kirshatrov.com/2019/01/03/state-of-background-jobs/) - are either removed or moved to separate middlewares. + - The core only catches panics, retries on failure, and waits if a queue is empty. + - All other [functionalities](https://kirshatrov.com/2019/01/03/state-of-background-jobs/) + are either removed or moved to separate middlewares. - [x] support binary payload/args with message pack. - [x] replace built-in UI with prometheus metrics (use grafana if you want dashboard). - [x] additional optimizations (alloc + bulk queue ops) - ```go - BenchmarkWorkerRunJob/work_v1_1-8 3000 515957 ns/op - BenchmarkWorkerRunJob/work_v2_1-8 5000 284516 ns/op - BenchmarkWorkerRunJob/work_v1_10-8 1000 2136546 ns/op - BenchmarkWorkerRunJob/work_v2_10-8 5000 367997 ns/op - BenchmarkWorkerRunJob/work_v1_100-8 100 18234023 ns/op - BenchmarkWorkerRunJob/work_v2_100-8 1000 1759186 ns/op - BenchmarkWorkerRunJob/work_v1_1000-8 10 162110100 ns/op - BenchmarkWorkerRunJob/work_v2_1000-8 100 12646080 ns/op - BenchmarkWorkerRunJob/work_v1_10000-8 1 1691287122 ns/op - BenchmarkWorkerRunJob/work_v2_10000-8 10 144923087 ns/op - BenchmarkWorkerRunJob/work_v1_100000-8 1 17515722574 ns/op - BenchmarkWorkerRunJob/work_v2_100000-8 1 1502468637 ns/op - PASS - ok github.com/taylorchu/work 87.901s - ``` -- [ ] http enqueuer - + ```go + BenchmarkWorkerRunJob/work_v1_1-8 3000 515957 ns/op + BenchmarkWorkerRunJob/work_v2_1-8 5000 284516 ns/op + BenchmarkWorkerRunJob/work_v1_10-8 1000 2136546 ns/op + BenchmarkWorkerRunJob/work_v2_10-8 5000 367997 ns/op + BenchmarkWorkerRunJob/work_v1_100-8 100 18234023 ns/op + BenchmarkWorkerRunJob/work_v2_100-8 1000 1759186 ns/op + BenchmarkWorkerRunJob/work_v1_1000-8 10 162110100 ns/op + BenchmarkWorkerRunJob/work_v2_1000-8 100 12646080 ns/op + BenchmarkWorkerRunJob/work_v1_10000-8 1 1691287122 ns/op + BenchmarkWorkerRunJob/work_v2_10000-8 10 144923087 ns/op + BenchmarkWorkerRunJob/work_v1_100000-8 1 17515722574 ns/op + BenchmarkWorkerRunJob/work_v2_100000-8 1 1502468637 ns/op + PASS + ok github.com/taylorchu/work 87.901s + ``` +- [x] http server + - [x] delete job + - [x] create job + - [x] get job status + - [x] get queue metrics (kubernetes autoscaler integration with [keda metrics api scaler](https://keda.sh/docs/2.5/scalers/metrics-api/)) + - [ ] OpenAPI spec ## License -[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Ftaylorchu%2Fwork.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Ftaylorchu%2Fwork?ref=badge_large) + +[![FOSSA Status](https://app.fossa.com/api/projects/custom%2B4257%2Fgit%40github.com%3Ataylorchu%2Fwork.git.svg?type=large)](https://app.fossa.com/projects/custom%2B4257%2Fgit%40github.com%3Ataylorchu%2Fwork.git?ref=badge_large) diff --git a/bench/go.mod b/bench/go.mod index 4a44c69..ed00263 100644 --- a/bench/go.mod +++ b/bench/go.mod @@ -1,12 +1,24 @@ module github.com/taylorchu/work/bench -go 1.16 +go 1.17 require ( - github.com/go-redis/redis/v8 v8.10.0 + github.com/go-redis/redis/v8 v8.11.4 github.com/gocraft/work v0.5.2-0.20180912175354-c85b71e20062 github.com/gomodule/redigo v1.8.5 - github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/testify v1.7.0 - github.com/taylorchu/work v0.1.12-0.20210619192221-d3325e033bdf + github.com/taylorchu/work v0.2.4-0.20220210165902-787ae01fa4ea +) + +require ( + github.com/cenkalti/backoff/v4 v4.1.2 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/robfig/cron v1.2.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) diff --git a/bench/go.sum b/bench/go.sum index 586cb8e..3391ad9 100644 --- a/bench/go.sum +++ b/bench/go.sum @@ -1,20 +1,7 @@ -cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg= -cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= -github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= -github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= -github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= -github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= -github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= +github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -23,231 +10,112 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= -github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk= -github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= -github.com/go-kit/log v0.1.0 h1:DGJh0Sm43HbOeYDNnVZFl8BvcYVvjD5bqYJvp0REbwQ= -github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= -github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= -github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih4= -github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-redis/redis/v8 v8.10.0 h1:OZwrQKuZqdJ4QIM8wn8rnuz868Li91xA3J2DEq+TPGA= -github.com/go-redis/redis/v8 v8.10.0/go.mod h1:vXLTvigok0VtUX0znvbcEW1SOt4OA9CU1ZfnOtKOaiM= -github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= -github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= +github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gocraft/work v0.5.2-0.20180912175354-c85b71e20062 h1:o6jrINvuif/AgRZwiXF32IOtzYCsdKH6Fzm/gFBxG/w= github.com/gocraft/work v0.5.2-0.20180912175354-c85b71e20062/go.mod h1:pc3n9Pb5FAESPPGfM0nL+7Q1xtgtRnF8rr/azzhQVlM= -github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= -github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= -github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/gomodule/redigo v1.8.5 h1:nRAxCa+SVsyjSBrtZmG/cqb6VbTmuRzpg/PoTFlpumc= github.com/gomodule/redigo v1.8.5/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= -github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= -github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= -github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= -github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= -github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= -github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= -github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= -github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= -github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= -github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= -github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= -github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= -github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4= -github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= +github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= -github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= -github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= +github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= -github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= -github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= -github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ= -github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= -github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= -github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= -github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= -github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= -github.com/prometheus/common v0.26.0 h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ= -github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= -github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= -github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= -github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= -github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= -github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= -github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/taylorchu/work v0.1.12-0.20210619192221-d3325e033bdf h1:THNm3CxSpAsAnpAAnB9GHDfC4F6YENyeGsjVr4oWggo= -github.com/taylorchu/work v0.1.12-0.20210619192221-d3325e033bdf/go.mod h1:Q1RmJn/ZbuC9ic+PEiEAn4EiSMo+WB57mEk/hA+wTH4= -github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= -github.com/vmihailenco/msgpack/v5 v5.3.4/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/taylorchu/work v0.2.4-0.20220210165902-787ae01fa4ea h1:bU4abITIYex+dscOowA1/KHwHP//sQrFNX5xqcqxUVY= +github.com/taylorchu/work v0.2.4-0.20220210165902-787ae01fa4ea/go.mod h1:xMoGkfLklPrd9fznMeUHuOaSZMjHM/xUe2QMN6m/6x4= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= -github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g= -go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= -go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8= -go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= -go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw= -go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= -go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52lqtnbw= -go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= -golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421 h1:Wo7BWFiOk0QRFMLYMqJGFMd9CgUAcGx7V+qEg/h5IBI= -golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= -golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q= -golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= -google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.26.0-rc.1 h1:7QnIQpGRHE5RnLKnESfDoxm2dTapTZua5a0kS0A+VXQ= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= -gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/bench/worker_bench_test.go b/bench/worker_bench_test.go index fb6277a..d919d87 100644 --- a/bench/worker_bench_test.go +++ b/bench/worker_bench_test.go @@ -31,8 +31,9 @@ func BenchmarkWorkerRunJob(b *testing.B) { for k := 1; k <= 1000; k *= 10 { b.Run(fmt.Sprintf("work_v1_%d", k), func(b *testing.B) { + b.StopTimer() + for n := 0; n < b.N; n++ { - b.StopTimer() require.NoError(b, redistest.Reset(client)) wp := work.NewWorkerPoolWithOptions( @@ -60,11 +61,13 @@ func BenchmarkWorkerRunJob(b *testing.B) { wp.Start() wg.Wait() wp.Stop() + b.StopTimer() } }) b.Run(fmt.Sprintf("work_v2_%d", k), func(b *testing.B) { + b.StopTimer() + for n := 0; n < b.N; n++ { - b.StopTimer() require.NoError(b, redistest.Reset(client)) queue := work2.NewRedisQueue(client) @@ -102,6 +105,7 @@ func BenchmarkWorkerRunJob(b *testing.B) { w.Start() wg.Wait() w.Stop() + b.StopTimer() } }) } diff --git a/go.mod b/go.mod index 4adda25..eb209a8 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,30 @@ module github.com/taylorchu/work -go 1.16 +go 1.17 require ( - github.com/cenkalti/backoff/v4 v4.1.1 - github.com/go-redis/redis/v8 v8.10.0 - github.com/google/uuid v1.2.0 + github.com/cenkalti/backoff/v4 v4.1.2 + github.com/go-redis/redis/v8 v8.11.4 + github.com/google/uuid v1.3.0 github.com/prometheus/client_golang v1.11.0 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 - github.com/vmihailenco/msgpack/v5 v5.3.4 + github.com/vmihailenco/msgpack/v5 v5.3.5 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.26.0 // indirect + github.com/prometheus/procfs v0.6.0 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect + google.golang.org/protobuf v1.26.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) diff --git a/go.sum b/go.sum index 24c7fd4..6dea827 100644 --- a/go.sum +++ b/go.sum @@ -8,16 +8,18 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= -github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= +github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -25,9 +27,10 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-redis/redis/v8 v8.10.0 h1:OZwrQKuZqdJ4QIM8wn8rnuz868Li91xA3J2DEq+TPGA= -github.com/go-redis/redis/v8 v8.10.0/go.mod h1:vXLTvigok0VtUX0znvbcEW1SOt4OA9CU1ZfnOtKOaiM= +github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= +github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -38,16 +41,20 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -58,8 +65,10 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -70,12 +79,16 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= +github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= +github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= +github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -109,21 +122,15 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= -github.com/vmihailenco/msgpack/v5 v5.3.4/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g= -go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= -go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8= -go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= -go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= -go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52lqtnbw= -go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -139,7 +146,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -163,19 +171,25 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7qvEEgHo3otj+HB5CM7Q= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -184,17 +198,22 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.26.0-rc.1 h1:7QnIQpGRHE5RnLKnESfDoxm2dTapTZua5a0kS0A+VXQ= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/http/duration.go b/http/duration.go new file mode 100644 index 0000000..74bb4f1 --- /dev/null +++ b/http/duration.go @@ -0,0 +1,31 @@ +package http + +import ( + "encoding/json" + "fmt" + "time" +) + +type duration time.Duration + +func (d duration) MarshalJSON() ([]byte, error) { + return json.Marshal(time.Duration(d).String()) +} + +func (d *duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch v := v.(type) { + case string: + tmp, err := time.ParseDuration(v) + if err != nil { + return err + } + *d = duration(tmp) + return nil + default: + return fmt.Errorf("invalid duration: %v", v) + } +} diff --git a/http/duration_test.go b/http/duration_test.go new file mode 100644 index 0000000..97e8bb8 --- /dev/null +++ b/http/duration_test.go @@ -0,0 +1,30 @@ +package http + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestDuration(t *testing.T) { + d := duration(time.Minute) + b, err := d.MarshalJSON() + require.NoError(t, err) + require.Equal(t, "\"1m0s\"", string(b)) + + var d2 duration + err = d2.UnmarshalJSON(b) + require.NoError(t, err) + require.Equal(t, time.Minute, time.Duration(d2)) + + var d3 duration + err = d3.UnmarshalJSON([]byte("1")) + require.Error(t, err) + require.Equal(t, "invalid duration: 1", err.Error()) + + var d4 duration + err = d4.UnmarshalJSON([]byte("\"bad\"")) + require.Error(t, err) + require.Equal(t, "time: invalid duration \"bad\"", err.Error()) +} diff --git a/http/server.go b/http/server.go new file mode 100644 index 0000000..6cf024d --- /dev/null +++ b/http/server.go @@ -0,0 +1,241 @@ +package http + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/taylorchu/work" +) + +// ServerOptions specifies how http server can manage work queues. +type ServerOptions struct { + Queue work.Queue +} + +func (opts *ServerOptions) deleteJob(rw http.ResponseWriter, r *http.Request) { + queue, ok := opts.Queue.(interface { + work.Queue + work.BulkJobFinder + }) + if !ok { + rw.WriteHeader(http.StatusNotFound) + return + } + namespace := r.URL.Query().Get("namespace") + queueID := r.URL.Query().Get("queue_id") + jobID := r.URL.Query().Get("job_id") + + job, err := func() (*work.Job, error) { + jobs, err := queue.BulkFind([]string{jobID}, &work.FindOptions{ + Namespace: namespace, + }) + if err != nil { + return nil, err + } + if len(jobs) == 1 && jobs[0] != nil { + err := queue.Ack(jobs[0], &work.AckOptions{ + Namespace: namespace, + QueueID: queueID, + }) + if err != nil { + return nil, err + } + return jobs[0], nil + } + return &work.Job{ + ID: jobID, + }, nil + }() + if err != nil { + rw.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(rw).Encode(struct { + Error string `json:"error"` + }{ + Error: err.Error(), + }) + return + } + json.NewEncoder(rw).Encode(struct { + Namespace string `json:"namespace"` + QueueID string `json:"queue_id"` + Job *work.Job `json:"job"` + }{ + Namespace: namespace, + QueueID: queueID, + Job: job, + }) +} + +func (opts *ServerOptions) getJob(rw http.ResponseWriter, r *http.Request) { + queue, ok := opts.Queue.(interface { + work.Queue + work.BulkJobFinder + }) + if !ok { + rw.WriteHeader(http.StatusNotFound) + return + } + namespace := r.URL.Query().Get("namespace") + jobID := r.URL.Query().Get("job_id") + + job, err := func() (*work.Job, error) { + jobs, err := queue.BulkFind([]string{jobID}, &work.FindOptions{ + Namespace: namespace, + }) + if err != nil { + return nil, err + } + if len(jobs) == 1 && jobs[0] != nil { + return jobs[0], nil + } + return &work.Job{ + ID: jobID, + }, nil + }() + if err != nil { + rw.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(rw).Encode(struct { + Error string `json:"error"` + }{ + Error: err.Error(), + }) + return + } + json.NewEncoder(rw).Encode(struct { + Namespace string `json:"namespace"` + Status string `json:"status"` + Job *work.Job `json:"job"` + }{ + Namespace: namespace, + Status: jobStatus(job), + Job: job, + }) +} + +func (opts *ServerOptions) createJob(rw http.ResponseWriter, r *http.Request) { + namespace := r.URL.Query().Get("namespace") + queueID := r.URL.Query().Get("queue_id") + + job, err := func() (*work.Job, error) { + var enqueueRequest struct { + ID string `json:"id"` + Payload json.RawMessage `json:"payload"` + Delay duration `json:"delay"` + } + err := json.NewDecoder(r.Body).Decode(&enqueueRequest) + if err != nil { + return nil, err + } + job := work.NewJob().Delay(time.Duration(enqueueRequest.Delay)) + if enqueueRequest.ID != "" { + job.ID = enqueueRequest.ID + } + job.Payload = enqueueRequest.Payload + err = opts.Queue.Enqueue(job, &work.EnqueueOptions{ + Namespace: namespace, + QueueID: queueID, + }) + if err != nil { + return nil, err + } + return job, nil + }() + if err != nil { + rw.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(rw).Encode(struct { + Error string `json:"error"` + }{ + Error: err.Error(), + }) + return + } + json.NewEncoder(rw).Encode(struct { + Namespace string `json:"namespace"` + QueueID string `json:"queue_id"` + Job *work.Job `json:"job"` + }{ + Namespace: namespace, + QueueID: queueID, + Job: job, + }) +} + +func (opts *ServerOptions) getMetrics(rw http.ResponseWriter, r *http.Request) { + queue, ok := opts.Queue.(interface { + work.Queue + work.MetricsExporter + }) + if !ok { + rw.WriteHeader(http.StatusNotFound) + return + } + namespace := r.URL.Query().Get("namespace") + queueID := r.URL.Query().Get("queue_id") + + metrics, err := queue.GetQueueMetrics(&work.QueueMetricsOptions{ + Namespace: namespace, + QueueID: queueID, + At: time.Now(), + }) + if err != nil { + rw.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(rw).Encode(struct { + Error string `json:"error"` + }{ + Error: err.Error(), + }) + return + } + json.NewEncoder(rw).Encode(struct { + Namespace string `json:"namespace"` + QueueID string `json:"queue_id"` + ReadyTotal int64 `json:"ready_total"` + ScheduledTotal int64 `json:"scheduled_total"` + Total int64 `json:"total"` + Latency time.Duration `json:"latency"` + }{ + Namespace: metrics.Namespace, + QueueID: metrics.QueueID, + ReadyTotal: metrics.ReadyTotal, + ScheduledTotal: metrics.ScheduledTotal, + Total: metrics.ReadyTotal + metrics.ScheduledTotal, + Latency: metrics.Latency, + }) +} + +// NewServer creates new http server that manages work queues. +func NewServer(opts *ServerOptions) http.Handler { + m := http.NewServeMux() + m.HandleFunc("/jobs", func(rw http.ResponseWriter, r *http.Request) { + switch r.Method { + case "DELETE": + opts.deleteJob(rw, r) + case "GET": + opts.getJob(rw, r) + case "POST": + opts.createJob(rw, r) + default: + rw.WriteHeader(http.StatusNotFound) + } + }) + m.HandleFunc("/metrics", func(rw http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + opts.getMetrics(rw, r) + default: + rw.WriteHeader(http.StatusNotFound) + } + }) + return m +} + +func jobStatus(job *work.Job) string { + if job.EnqueuedAt.IsZero() { + return "completed" + } + if job.EnqueuedAt.After(time.Now()) { + return "scheduled" + } + return "ready" +} diff --git a/http/server_test.go b/http/server_test.go new file mode 100644 index 0000000..f8fe17e --- /dev/null +++ b/http/server_test.go @@ -0,0 +1,207 @@ +package http + +import ( + "io" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/taylorchu/work" + "github.com/taylorchu/work/redistest" +) + +func TestServer(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := work.NewRedisQueue(client) + + srv := NewServer(&ServerOptions{ + Queue: q, + }) + + for _, test := range []struct { + reqMethod string + reqURL string + reqBody string + respCode int + respBody string + }{ + { + // bad route + reqMethod: "GET", + reqURL: "http://example.com/xxx", + respCode: 404, + respBody: "404 page not found\n", + }, + { + // bad method + reqMethod: "PUT", + reqURL: "http://example.com/jobs", + respCode: 404, + respBody: "", + }, + { + // bad method + reqMethod: "PUT", + reqURL: "http://example.com/metrics", + respCode: 404, + respBody: "", + }, + { + // bad url query + reqMethod: "GET", + reqURL: "http://example.com/jobs", + respCode: 500, + respBody: "{\"error\":\"work: empty namespace\"}\n", + }, + { + // bad url query + reqMethod: "DELETE", + reqURL: "http://example.com/jobs", + respCode: 500, + respBody: "{\"error\":\"work: empty namespace\"}\n", + }, + { + // bad body + reqMethod: "POST", + reqURL: "http://example.com/jobs", + respCode: 500, + respBody: "{\"error\":\"EOF\"}\n", + }, + { + // bad url query + reqMethod: "POST", + reqURL: "http://example.com/jobs", + reqBody: "{}", + respCode: 500, + respBody: "{\"error\":\"work: empty namespace\"}\n", + }, + { + // bad url query + reqMethod: "GET", + reqURL: "http://example.com/metrics", + respCode: 500, + respBody: "{\"error\":\"work: empty namespace\"}\n", + }, + { + reqMethod: "DELETE", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx", + respCode: 200, + respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", + }, + { + reqMethod: "GET", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=xxx", + respCode: 200, + respBody: "{\"namespace\":\"{ns1}\",\"status\":\"completed\",\"job\":{\"ID\":\"xxx\",\"CreatedAt\":\"0001-01-01T00:00:00Z\",\"UpdatedAt\":\"0001-01-01T00:00:00Z\",\"EnqueuedAt\":\"0001-01-01T00:00:00Z\",\"Payload\":null,\"Retries\":0,\"LastError\":\"\"}}\n", + }, + { + // bad duration + reqMethod: "POST", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1", + reqBody: `{ + "payload": "payload1", + "delay": 1 + }`, + respCode: 500, + respBody: "{\"error\":\"invalid duration: 1\"}\n", + }, + { + // bad payload + reqMethod: "POST", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1", + reqBody: `{`, + respCode: 500, + respBody: "{\"error\":\"unexpected EOF\"}\n", + }, + { + reqMethod: "POST", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1", + reqBody: `{ + "payload": "payload1", + "delay": "10s" + }`, + respCode: 200, + }, + { + reqMethod: "GET", + reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + respCode: 200, + respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":0,\"scheduled_total\":1,\"total\":1,\"latency\":0}\n", + }, + { + reqMethod: "POST", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1", + reqBody: `{ + "payload": "payload1" + }`, + respCode: 200, + }, + { + reqMethod: "GET", + reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + respCode: 200, + respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", + }, + { + reqMethod: "POST", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1", + reqBody: `{ + "id": "id1", + "payload": "payload1" + }`, + respCode: 200, + }, + { + reqMethod: "GET", + reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + respCode: 200, + respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":2,\"scheduled_total\":1,\"total\":3,\"latency\":[0-9]+}\n", + }, + { + reqMethod: "GET", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1", + respCode: 200, + }, + { + reqMethod: "DELETE", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&job_id=id1", + respCode: 500, + respBody: "{\"error\":\"work: empty queue id\"}\n", + }, + { + reqMethod: "DELETE", + reqURL: "http://example.com/jobs?namespace=%7Bns1%7D&queue_id=q1&job_id=id1", + respCode: 200, + }, + { + reqMethod: "GET", + reqURL: "http://example.com/metrics?namespace=%7Bns1%7D&queue_id=q1", + respCode: 200, + respBody: "{\"namespace\":\"{ns1}\",\"queue_id\":\"q1\",\"ready_total\":1,\"scheduled_total\":1,\"total\":2,\"latency\":[0-9]+}\n", + }, + } { + var reqBody io.Reader + if test.reqBody != "" { + reqBody = strings.NewReader(test.reqBody) + } + req := httptest.NewRequest(test.reqMethod, test.reqURL, reqBody) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + if !(test.respBody == "" && test.respCode == 200) { + require.Regexp(t, test.respBody, w.Body.String()) + } else { + t.Logf("response: %q", w.Body.String()) + } + require.Equal(t, test.respCode, w.Code) + } +} + +func TestJobStatus(t *testing.T) { + require.Equal(t, "completed", jobStatus(&work.Job{})) + require.Equal(t, "ready", jobStatus(work.NewJob())) + require.Equal(t, "scheduled", jobStatus(work.NewJob().Delay(10*time.Second))) +} diff --git a/job.go b/job.go index 4fd6c32..be4e221 100644 --- a/job.go +++ b/job.go @@ -99,15 +99,6 @@ func (j Job) Delay(d time.Duration) *Job { return &j } -// WithPayload adds payload to the job. -func (j Job) WithPayload(v interface{}) (*Job, error) { - err := j.MarshalPayload(v) - if err != nil { - return nil, err - } - return &j, nil -} - // options validation errors var ( ErrEmptyNamespace = errors.New("work: empty namespace") diff --git a/metrics.go b/metrics.go index 557f673..04f68aa 100644 --- a/metrics.go +++ b/metrics.go @@ -10,6 +10,8 @@ type QueueMetrics struct { ReadyTotal int64 // Total number of jobs that are scheduled to run in future. ScheduledTotal int64 + // Processing delay from oldest ready job + Latency time.Duration } // MetricsExporter can be implemented by Queue to report metrics. diff --git a/middleware/concurrent/dequeuer.go b/middleware/concurrent/dequeuer.go index 4b3d8f6..3da45bd 100644 --- a/middleware/concurrent/dequeuer.go +++ b/middleware/concurrent/dequeuer.go @@ -19,32 +19,65 @@ type DequeuerOptions struct { } // Dequeuer limits running job count from a queue. -func Dequeuer(copt *DequeuerOptions) work.DequeueMiddleware { +func Dequeuer(copt *DequeuerOptions) (work.DequeueMiddleware, work.HandleMiddleware) { + workerID := copt.workerID + if workerID == "" { + workerID = uuid.NewString() + } + redisKey := func(namespace, queueID string) string { + return fmt.Sprintf("%s:lock:%s", namespace, queueID) + } return func(f work.DequeueFunc) work.DequeueFunc { - workerID := copt.workerID - if workerID == "" { - workerID = uuid.NewString() - } - return func(opt *work.DequeueOptions) (*work.Job, error) { - lock := &redislock.Lock{ - Client: copt.Client, - Key: fmt.Sprintf("%s:lock:%s", opt.Namespace, opt.QueueID), - ID: workerID, - At: opt.At, - ExpireInSec: opt.InvisibleSec, - MaxAcquirers: copt.Max, + return func(opt *work.DequeueOptions) (*work.Job, error) { + lock := &redislock.Lock{ + Client: copt.Client, + Key: redisKey(opt.Namespace, opt.QueueID), + ID: workerID, + At: opt.At, + ExpireInSec: opt.InvisibleSec, + MaxAcquirers: copt.Max, + } + acquired, err := lock.Acquire() + if err != nil { + return nil, err + } + if !acquired { + return nil, work.ErrEmptyQueue + } + unlock := false + defer func() { + if copt.disableUnlock { + return + } + if !unlock { + return + } + lock.Release() + }() + job, err := f(opt) + if err != nil { + unlock = true + return nil, err + } + return job, nil } - acquired, err := lock.Acquire() - if err != nil { - return nil, err + }, func(f work.HandleFunc) work.HandleFunc { + return func(job *work.Job, opt *work.DequeueOptions) error { + lock := &redislock.Lock{ + Client: copt.Client, + Key: redisKey(opt.Namespace, opt.QueueID), + ID: workerID, + At: opt.At, + ExpireInSec: opt.InvisibleSec, + MaxAcquirers: copt.Max, + } + defer func() { + if copt.disableUnlock { + return + } + lock.Release() + }() + return f(job, opt) } - if !acquired { - return nil, work.ErrEmptyQueue - } - if !copt.disableUnlock { - defer lock.Release() - } - return f(opt) } - } } diff --git a/middleware/concurrent/dequeuer_test.go b/middleware/concurrent/dequeuer_test.go index 29034b4..f326e5e 100644 --- a/middleware/concurrent/dequeuer_test.go +++ b/middleware/concurrent/dequeuer_test.go @@ -17,26 +17,40 @@ func TestDequeuer(t *testing.T) { defer client.Close() require.NoError(t, redistest.Reset(client)) + var called int + h1 := func(*work.DequeueOptions) (*work.Job, error) { + called++ + return work.NewJob(), nil + } + h2 := func(*work.Job, *work.DequeueOptions) error { + return nil + } + runJob := func(m1 work.DequeueMiddleware, m2 work.HandleMiddleware, opt *work.DequeueOptions) error { + job, err := m1(h1)(opt) + if err != nil { + return err + } + err = m2(h2)(job, opt) + if err != nil { + return err + } + return nil + } + opt := &work.DequeueOptions{ Namespace: "{ns1}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, } - var called int - h := func(*work.DequeueOptions) (*work.Job, error) { - called++ - return work.NewJob(), nil - } for i := 0; i < 3; i++ { - deq := Dequeuer(&DequeuerOptions{ + m1, m2 := Dequeuer(&DequeuerOptions{ Client: client, Max: 2, workerID: fmt.Sprintf("w%d", i), }) - _, err := deq(h)(opt) - + err := runJob(m1, m2, opt) require.NoError(t, err) } require.Equal(t, 3, called) @@ -44,14 +58,13 @@ func TestDequeuer(t *testing.T) { // worker 0, 1 get the lock // worker 2 is locked for i := 0; i < 3; i++ { - deq := Dequeuer(&DequeuerOptions{ + m1, m2 := Dequeuer(&DequeuerOptions{ Client: client, Max: 2, workerID: fmt.Sprintf("w%d", i), disableUnlock: true, }) - _, err := deq(h)(opt) - + err := runJob(m1, m2, opt) if i <= 1 { require.NoError(t, err) } else { @@ -79,13 +92,13 @@ func TestDequeuer(t *testing.T) { optLater.At = opt.At.Add(10 * time.Second) // worker 0 is locked already for i := 0; i < 3; i++ { - deq := Dequeuer(&DequeuerOptions{ + m1, m2 := Dequeuer(&DequeuerOptions{ Client: client, Max: 2, workerID: "w0", disableUnlock: true, }) - _, err := deq(h)(&optLater) + err := runJob(m1, m2, &optLater) require.Equal(t, work.ErrEmptyQueue, err) } require.Equal(t, 5, called) @@ -107,13 +120,13 @@ func TestDequeuer(t *testing.T) { optExpired := *opt optExpired.At = opt.At.Add(60 * time.Second) for i := 3; i < 6; i++ { - deq := Dequeuer(&DequeuerOptions{ + m1, m2 := Dequeuer(&DequeuerOptions{ Client: client, Max: 2, workerID: fmt.Sprintf("w%d", i), disableUnlock: true, }) - _, err := deq(h)(&optExpired) + err := runJob(m1, m2, &optExpired) if i < 5 { require.NoError(t, err) } else { @@ -144,25 +157,40 @@ func BenchmarkConcurrency(b *testing.B) { defer client.Close() require.NoError(b, redistest.Reset(client)) + var called int + h1 := func(*work.DequeueOptions) (*work.Job, error) { + called++ + return work.NewJob(), nil + } + h2 := func(*work.Job, *work.DequeueOptions) error { + return nil + } + runJob := func(m1 work.DequeueMiddleware, m2 work.HandleMiddleware, opt *work.DequeueOptions) error { + job, err := m1(h1)(opt) + if err != nil { + return err + } + err = m2(h2)(job, opt) + if err != nil { + return err + } + return nil + } + opt := &work.DequeueOptions{ Namespace: "{ns1}", QueueID: "q1", At: time.Now(), InvisibleSec: 60, } - deq := Dequeuer(&DequeuerOptions{ + + m1, m2 := Dequeuer(&DequeuerOptions{ Client: client, Max: 1, }) - var called int - h := deq(func(*work.DequeueOptions) (*work.Job, error) { - called++ - return work.NewJob(), nil - }) - b.StartTimer() for n := 0; n < b.N; n++ { - h(opt) + runJob(m1, m2, opt) } b.StopTimer() require.Equal(b, b.N, called) diff --git a/middleware/concurrent/local_dequeuer.go b/middleware/concurrent/local_dequeuer.go new file mode 100644 index 0000000..2fbf1f3 --- /dev/null +++ b/middleware/concurrent/local_dequeuer.go @@ -0,0 +1,56 @@ +package concurrent + +import ( + "github.com/taylorchu/work" +) + +// LocalDequeuerOptions defines how many jobs on the same process can be running at the same time. +type LocalDequeuerOptions struct { + Max int64 + + disableUnlock bool // for testing +} + +// LocalDequeuer limits running job count from the same process. +func LocalDequeuer(copt *LocalDequeuerOptions) (work.DequeueMiddleware, work.HandleMiddleware) { + ch := make(chan struct{}, copt.Max) + return func(f work.DequeueFunc) work.DequeueFunc { + return func(opt *work.DequeueOptions) (*work.Job, error) { + acquired := false + select { + case ch <- struct{}{}: + acquired = true + default: + } + if !acquired { + return nil, work.ErrEmptyQueue + } + unlock := false + defer func() { + if copt.disableUnlock { + return + } + if !unlock { + return + } + <-ch + }() + job, err := f(opt) + if err != nil { + unlock = true + return nil, err + } + return job, nil + } + }, func(f work.HandleFunc) work.HandleFunc { + return func(job *work.Job, opt *work.DequeueOptions) error { + defer func() { + if copt.disableUnlock { + return + } + <-ch + }() + return f(job, opt) + } + } +} diff --git a/middleware/concurrent/local_dequeuer_test.go b/middleware/concurrent/local_dequeuer_test.go new file mode 100644 index 0000000..b3971a0 --- /dev/null +++ b/middleware/concurrent/local_dequeuer_test.go @@ -0,0 +1,63 @@ +package concurrent + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/taylorchu/work" +) + +func TestLocalDequeuer(t *testing.T) { + var called int + h1 := func(*work.DequeueOptions) (*work.Job, error) { + called++ + return work.NewJob(), nil + } + h2 := func(*work.Job, *work.DequeueOptions) error { + return nil + } + runJob := func(m1 work.DequeueMiddleware, m2 work.HandleMiddleware, opt *work.DequeueOptions) error { + job, err := m1(h1)(opt) + if err != nil { + return err + } + err = m2(h2)(job, opt) + if err != nil { + return err + } + return nil + } + + opt := &work.DequeueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, + } + + m1, m2 := LocalDequeuer(&LocalDequeuerOptions{ + Max: 2, + }) + for i := 0; i < 3; i++ { + err := runJob(m1, m2, opt) + require.NoError(t, err) + } + require.Equal(t, 3, called) + + // worker 0, 1 get the lock + // worker 2 is locked + m1, m2 = LocalDequeuer(&LocalDequeuerOptions{ + Max: 2, + disableUnlock: true, + }) + for i := 0; i < 3; i++ { + err := runJob(m1, m2, opt) + if i <= 1 { + require.NoError(t, err) + } else { + require.Equal(t, work.ErrEmptyQueue, err) + } + } + require.Equal(t, 5, called) +} diff --git a/middleware/prometheus/metrics.go b/middleware/prometheus/metrics.go index e5bbdb5..adade2b 100644 --- a/middleware/prometheus/metrics.go +++ b/middleware/prometheus/metrics.go @@ -56,6 +56,14 @@ var ( }, []string{"namespace", "queue"}, ) + jobLatencySeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "work", + Name: "job_latency_seconds", + Help: "Processing delay from oldest ready job", + }, + []string{"namespace", "queue"}, + ) ) func init() { @@ -67,6 +75,7 @@ func init() { prometheus.MustRegister(jobReady) prometheus.MustRegister(jobScheduled) + prometheus.MustRegister(jobLatencySeconds) } // HandleFuncMetrics adds prometheus metrics like executed job count. @@ -99,6 +108,14 @@ func EnqueueFuncMetrics(f work.EnqueueFunc) work.EnqueueFunc { } } +// ExportQueueMetrics adds prometheus metrics from work.QueueMetrics directly. +func ExportQueueMetrics(m *work.QueueMetrics) error { + jobReady.WithLabelValues(m.Namespace, m.QueueID).Set(float64(m.ReadyTotal)) + jobScheduled.WithLabelValues(m.Namespace, m.QueueID).Set(float64(m.ScheduledTotal)) + jobLatencySeconds.WithLabelValues(m.Namespace, m.QueueID).Observe(m.Latency.Seconds()) + return nil +} + // ExportWorkerMetrics adds prometheus metrics from Worker. func ExportWorkerMetrics(w *work.Worker) error { all, err := w.ExportMetrics() @@ -106,8 +123,10 @@ func ExportWorkerMetrics(w *work.Worker) error { return err } for _, m := range all.Queue { - jobReady.WithLabelValues(m.Namespace, m.QueueID).Set(float64(m.ReadyTotal)) - jobScheduled.WithLabelValues(m.Namespace, m.QueueID).Set(float64(m.ScheduledTotal)) + err := ExportQueueMetrics(m) + if err != nil { + return err + } } return nil } diff --git a/middleware/prometheus/metrics_test.go b/middleware/prometheus/metrics_test.go index fda8249..4348028 100644 --- a/middleware/prometheus/metrics_test.go +++ b/middleware/prometheus/metrics_test.go @@ -4,10 +4,12 @@ import ( "errors" "net/http/httptest" "testing" + "time" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/require" "github.com/taylorchu/work" + "github.com/taylorchu/work/redistest" ) func TestHandleFuncMetrics(t *testing.T) { @@ -69,3 +71,37 @@ func TestEnqueueFuncMetrics(t *testing.T) { require.Contains(t, r.Body.String(), m) } } + +func TestExportWorkerMetrics(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + + w := work.NewWorker(&work.WorkerOptions{ + Namespace: "{ns1}", + Queue: work.NewRedisQueue(client), + }) + err := w.Register("test", + func(*work.Job, *work.DequeueOptions) error { return nil }, + &work.JobOptions{ + MaxExecutionTime: time.Second, + IdleWait: time.Second, + NumGoroutines: 2, + }, + ) + require.NoError(t, err) + + err = ExportWorkerMetrics(w) + require.NoError(t, err) + + r := httptest.NewRecorder() + promhttp.Handler().ServeHTTP(r, httptest.NewRequest("GET", "/metrics", nil)) + + for _, m := range []string{ + `job_ready{`, + `job_scheduled{`, + `job_latency_seconds_bucket{`, + } { + require.Contains(t, r.Body.String(), m) + } +} diff --git a/middleware/recovery/catch_panic.go b/middleware/recovery/catch_panic.go index c1722a1..dc6d2c9 100644 --- a/middleware/recovery/catch_panic.go +++ b/middleware/recovery/catch_panic.go @@ -23,3 +23,5 @@ func CatchPanic(f work.HandleFunc) work.HandleFunc { return f(job, opt) } } + +var _ work.HandleMiddleware = CatchPanic diff --git a/middleware/unique/enqueuer.go b/middleware/unique/enqueuer.go index 3ab8354..74fd56b 100644 --- a/middleware/unique/enqueuer.go +++ b/middleware/unique/enqueuer.go @@ -17,7 +17,7 @@ type Func func(*work.Job, *work.EnqueueOptions) ([]byte, time.Duration, error) // EnqueuerOptions defines job unique key generation. type EnqueuerOptions struct { Client redis.UniversalClient - // If returned []byte is nil, uniqness check is bypassed. + // If returned []byte is nil, uniqueness check is bypassed. // Returned time.Duration controls how long the unique key exists. UniqueFunc Func } diff --git a/redis_queue.go b/redis_queue.go index 5f45791..4e7f2cc 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/go-redis/redis/v8" ) @@ -281,10 +282,23 @@ func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, e if err != nil { return nil, err } + z, err := q.client.ZRangeByScoreWithScores(context.Background(), queueKey, &redis.ZRangeBy{ + Min: "-inf", + Max: now, + Count: 1, + }).Result() + if err != nil { + return nil, err + } + var latency time.Duration + if len(z) > 0 { + latency = time.Since(time.Unix(int64(z[0].Score), 0)) + } return &QueueMetrics{ Namespace: opt.Namespace, QueueID: opt.QueueID, ReadyTotal: readyTotal, ScheduledTotal: scheduledTotal, + Latency: latency, }, nil } diff --git a/redis_queue_test.go b/redis_queue_test.go index b3d8cda..c5d9ff7 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -313,6 +313,7 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) { require.Equal(t, "q1", m.QueueID) require.EqualValues(t, 1, m.ReadyTotal) require.EqualValues(t, 0, m.ScheduledTotal) + require.True(t, 0 < m.Latency && m.Latency < time.Minute) m, err = q.GetQueueMetrics(&QueueMetricsOptions{ Namespace: "{ns1}", @@ -324,4 +325,5 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) { require.Equal(t, "q1", m.QueueID) require.EqualValues(t, 0, m.ReadyTotal) require.EqualValues(t, 1, m.ScheduledTotal) + require.True(t, m.Latency == 0) } diff --git a/sidekiq/queue_test.go b/sidekiq/queue_test.go index 7c6b694..2ca00da 100644 --- a/sidekiq/queue_test.go +++ b/sidekiq/queue_test.go @@ -406,6 +406,7 @@ func TestSidekiqQueueGetQueueMetrics(t *testing.T) { require.Equal(t, "low/q1", m.QueueID) require.EqualValues(t, 1, m.ReadyTotal) require.EqualValues(t, 0, m.ScheduledTotal) + require.True(t, 0 < m.Latency && m.Latency < time.Minute) m, err = q.GetQueueMetrics(&work.QueueMetricsOptions{ Namespace: "{ns1}", @@ -417,6 +418,7 @@ func TestSidekiqQueueGetQueueMetrics(t *testing.T) { require.Equal(t, "low/q1", m.QueueID) require.EqualValues(t, 0, m.ReadyTotal) require.EqualValues(t, 1, m.ScheduledTotal) + require.True(t, m.Latency == 0) } func TestSidekiqQueueEnqueueDuplicated(t *testing.T) { diff --git a/worker.go b/worker.go index adfeaf2..6cf3906 100644 --- a/worker.go +++ b/worker.go @@ -76,18 +76,6 @@ type JobOptions struct { HandleMiddleware []HandleMiddleware } -// AddDequeueMiddleware adds DequeueMiddleware. -func (opt *JobOptions) AddDequeueMiddleware(mw DequeueMiddleware) *JobOptions { - opt.DequeueMiddleware = append(opt.DequeueMiddleware, mw) - return opt -} - -// AddHandleMiddleware adds HandleMiddleware. -func (opt *JobOptions) AddHandleMiddleware(mw HandleMiddleware) *JobOptions { - opt.HandleMiddleware = append(opt.HandleMiddleware, mw) - return opt -} - // options validation error var ( ErrMaxExecutionTime = errors.New("work: max execution time should be > 0") @@ -126,12 +114,8 @@ func (w *Worker) RegisterWithContext(queueID string, h ContextHandleFunc, opt *J return err } w.handlerMap[queueID] = handler{ - QueueID: queueID, - HandleFunc: func(ctx context.Context, job *Job, o *DequeueOptions) error { - ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime) - defer cancel() - return h(ctx, job, o) - }, + QueueID: queueID, + HandleFunc: h, JobOptions: *opt, } return nil @@ -146,18 +130,6 @@ type OnceJobOptions struct { HandleMiddleware []HandleMiddleware } -// AddDequeueMiddleware adds DequeueMiddleware. -func (opt *OnceJobOptions) AddDequeueMiddleware(mw DequeueMiddleware) *OnceJobOptions { - opt.DequeueMiddleware = append(opt.DequeueMiddleware, mw) - return opt -} - -// AddHandleMiddleware adds HandleMiddleware. -func (opt *OnceJobOptions) AddHandleMiddleware(mw HandleMiddleware) *OnceJobOptions { - opt.HandleMiddleware = append(opt.HandleMiddleware, mw) - return opt -} - // Validate validates OnceJobOptions. func (opt *OnceJobOptions) Validate() error { if opt.MaxExecutionTime <= 0 { @@ -231,151 +203,52 @@ func (w *Worker) Start() { for _, h := range w.handlerMap { for i := int64(0); i < h.JobOptions.NumGoroutines; i++ { w.wg.Add(1) - go w.start(ctx, h) + go func(h handler) { + defer w.wg.Done() + + w.start(ctx, h) + }(h) } } } func (w *Worker) start(ctx context.Context, h handler) { - defer w.wg.Done() - - queue := w.opt.Queue - ns := w.opt.Namespace - // print errors by default so that problems are noticeable. errFunc := func(err error) { fmt.Println(err) } if w.opt.ErrorFunc != nil { errFunc = w.opt.ErrorFunc } - dequeue := getDequeueFunc(queue) - for _, mw := range h.JobOptions.DequeueMiddleware { - dequeue = mw(dequeue) - } - dequeue = idleWait(ctx, h.JobOptions.IdleWait)(dequeue) + var dequeueMiddleware []DequeueMiddleware + dequeueMiddleware = append(dequeueMiddleware, h.JobOptions.DequeueMiddleware...) + dequeueMiddleware = append(dequeueMiddleware, idleWait(ctx, h.JobOptions.IdleWait)) - handle := func(job *Job, o *DequeueOptions) error { - return h.HandleFunc(ctx, job, o) - } - for _, mw := range h.JobOptions.HandleMiddleware { - handle = mw(handle) - } - handle = catchPanic(handle) + var handleMiddleware []HandleMiddleware + handleMiddleware = append(handleMiddleware, h.JobOptions.HandleMiddleware...) + handleMiddleware = append(handleMiddleware, catchPanic, wrapHandlerError) - b := h.JobOptions.Backoff - if b == nil { - b = defaultBackoff() + opt := &OnceJobOptions{ + MaxExecutionTime: h.JobOptions.MaxExecutionTime, + Backoff: h.JobOptions.Backoff, + DequeueMiddleware: dequeueMiddleware, + HandleMiddleware: handleMiddleware, } - handle = retry(queue, b)(handle) - - // prepare bulk ack flush - var ackJobs []*Job - flush := func() error { - opt := &AckOptions{ - Namespace: ns, - QueueID: h.QueueID, - } - bulkDeq, ok := queue.(BulkDequeuer) - if ok { - err := bulkDeq.BulkAck(ackJobs, opt) - if err != nil { - return err - } - ackJobs = nil - return nil - } - for _, job := range ackJobs { - err := queue.Ack(job, opt) - if err != nil { - return err - } - } - ackJobs = nil - return nil - } - defer func() { - err := flush() - if err != nil { - errFunc(err) - } - }() - - const flushIntv = time.Second - flushTicker := time.NewTicker(flushIntv) - defer flushTicker.Stop() for { select { case <-ctx.Done(): return - case <-flushTicker.C: - err := flush() - if err != nil { - errFunc(err) - } default: - func() error { - opt := &DequeueOptions{ - Namespace: ns, - QueueID: h.QueueID, - At: time.Now(), - InvisibleSec: int64((h.JobOptions.MaxExecutionTime + flushIntv) / time.Second), - } - job, err := dequeue(opt) - if err != nil { - if !errors.Is(err, ErrEmptyQueue) { - errFunc(err) - } - return err - } - err = handle(job, opt) - if err != nil { - return err - } - ackJobs = append(ackJobs, job) - if len(ackJobs) >= 1000 { - // prevent un-acked job count to be too large - err := flush() - if err != nil { - errFunc(err) - return err - } - } - return nil - }() - } - } -} - -func getDequeueFunc(queue Queue) DequeueFunc { - bulkDeq, ok := queue.(BulkDequeuer) - if !ok { - return queue.Dequeue - } - - var jobs []*Job - return func(opt *DequeueOptions) (*Job, error) { - if len(jobs) == 0 { - // this is an optimization to reduce system calls. - // - // there could be an idle period on startup - // because worker previously pulls in too many jobs. - count := 60 / opt.InvisibleSec - if count <= 0 { - count = 1 - } - bulkOpt := *opt - bulkOpt.InvisibleSec *= count - - var err error - jobs, err = bulkDeq.BulkDequeue(count, &bulkOpt) + err := w.RunOnce(ctx, h.QueueID, h.HandleFunc, opt) if err != nil { - return nil, err + var wrappedError *wrappedHandlerError + if errors.As(err, &wrappedError) { + } else if errors.Is(err, ErrEmptyQueue) { + } else { + errFunc(err) + } } } - job := jobs[0] - jobs = jobs[1:] - return job, nil } } @@ -430,6 +303,28 @@ func idleWait(ctx context.Context, d time.Duration) DequeueMiddleware { } } +type wrappedHandlerError struct { + Err error +} + +func (e *wrappedHandlerError) Unwrap() error { + return e.Err +} + +func (e *wrappedHandlerError) Error() string { + return e.Err.Error() +} + +func wrapHandlerError(f HandleFunc) HandleFunc { + return func(job *Job, opt *DequeueOptions) error { + err := f(job, opt) + if err != nil { + return &wrappedHandlerError{Err: err} + } + return nil + } +} + func catchPanic(f HandleFunc) HandleFunc { return func(job *Job, opt *DequeueOptions) (err error) { defer func() { diff --git a/worker_test.go b/worker_test.go index 90d741d..0d3c172 100644 --- a/worker_test.go +++ b/worker_test.go @@ -419,6 +419,12 @@ func TestWorkerRunOnce(t *testing.T) { require.EqualValues(t, 1, count) } +func TestWrappedHandlerError(t *testing.T) { + errInner := errors.New("test") + errOuter := &wrappedHandlerError{Err: errInner} + require.True(t, errors.Is(errOuter, errInner)) +} + func TestRetry(t *testing.T) { client := redistest.NewClient() defer client.Close()