Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
nyergler committed Sep 14, 2021
2 parents 9988670 + 6a41d8f commit f84f9f2
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 35 deletions.
63 changes: 63 additions & 0 deletions middleware/heartbeat/heartbeater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package heartbeat

import (
"context"
"time"

"github.com/taylorchu/work"
)

// HeartbeaterOptions defines Heartbeater periodically refreshes InvisibleSec for long-running jobs.
type HeartbeaterOptions struct {
// Queue is where the current job is currently in.
Queue work.Queue
// After the job is dequeued, no other dequeuer can see this job for a while.
// InvisibleSec controls how long this period is.
InvisibleSec int64
// IntervalSec controls how often job InvisibleSec is refreshed.
IntervalSec int64
}

// Heartbeater refreshes InvisibleSec for long-running jobs periodically.
// As a result, if the job is lost, it won't take a long time before it is retried.
func Heartbeater(hopts *HeartbeaterOptions) work.HandleMiddleware {
return func(f work.HandleFunc) work.HandleFunc {
return func(job *work.Job, opt *work.DequeueOptions) error {
refresh := func() error {
now := time.Now()
copiedJob := *job
copiedJob.UpdatedAt = now
copiedJob.EnqueuedAt = now.Add(time.Duration(hopts.InvisibleSec) * time.Second)
return hopts.Queue.Enqueue(&copiedJob, &work.EnqueueOptions{
Namespace: opt.Namespace,
QueueID: opt.QueueID,
})
}

done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())

go func() {
defer close(done)

ticker := time.NewTicker(time.Duration(hopts.IntervalSec) * time.Second)
defer ticker.Stop()

refresh()
for {
select {
case <-ticker.C:
refresh()
case <-ctx.Done():
return
}
}
}()

err := f(job, opt)
cancel()
<-done
return err
}
}
}
68 changes: 68 additions & 0 deletions middleware/heartbeat/heartbeater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package heartbeat

import (
"context"
"testing"
"time"

"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/require"
"github.com/taylorchu/work"
"github.com/taylorchu/work/redistest"
)

func TestHeartbeater(t *testing.T) {
client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))

job := work.NewJob()
opt := &work.DequeueOptions{
Namespace: "{ns1}",
QueueID: "q1",
At: time.Now(),
InvisibleSec: 60,
}

hb := Heartbeater(&HeartbeaterOptions{
Queue: work.NewRedisQueue(client),
InvisibleSec: 30,
IntervalSec: 1,
})

h := hb(func(*work.Job, *work.DequeueOptions) error {
return nil
})

err := h(job, opt)
require.NoError(t, err)
require.Equal(t, job.CreatedAt, job.UpdatedAt)
require.Equal(t, job.CreatedAt, job.EnqueuedAt)

z, err := client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
}).Result()
require.NoError(t, err)
require.Len(t, z, 1)
require.EqualValues(t, job.EnqueuedAt.Unix()+30, z[0].Score)

err = h(job, opt)
require.NoError(t, err)
require.Equal(t, job.CreatedAt, job.UpdatedAt)
require.Equal(t, job.CreatedAt, job.EnqueuedAt)

z, err = client.ZRangeByScoreWithScores(
context.Background(),
"{ns1}:queue:q1",
&redis.ZRangeBy{
Min: "-inf",
Max: "+inf",
}).Result()
require.NoError(t, err)
require.Len(t, z, 1)
require.EqualValues(t, job.EnqueuedAt.Unix()+30, z[0].Score)
}
2 changes: 1 addition & 1 deletion middleware/prometheus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func EnqueueFuncMetrics(f work.EnqueueFunc) work.EnqueueFunc {
}
}

// ExportWorkerMetrics adds prometheus metrics from work.Worker.
// ExportWorkerMetrics adds prometheus metrics from Worker.
func ExportWorkerMetrics(w *work.Worker) error {
all, err := w.ExportMetrics()
if err != nil {
Expand Down
18 changes: 10 additions & 8 deletions redis_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,17 @@ func scriptKey(ns, queueID string) []string {
return []string{strings.Join([]string{ns, "queue", queueID}, ":")}
}

// RedisQueue implements Queue with other additional capabilities
type RedisQueue interface {
Queue
BulkEnqueuer
BulkDequeuer
BulkJobFinder
MetricsExporter
}

// NewRedisQueue creates a new queue stored in redis.
func NewRedisQueue(client redis.UniversalClient) Queue {
func NewRedisQueue(client redis.UniversalClient) RedisQueue {
enqueueScript := redis.NewScript(`
local ns = ARGV[1]
local queue_id = ARGV[2]
Expand Down Expand Up @@ -257,13 +266,6 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error)
return jobs, nil
}

var (
_ MetricsExporter = (*redisQueue)(nil)
_ BulkEnqueuer = (*redisQueue)(nil)
_ BulkDequeuer = (*redisQueue)(nil)
_ BulkJobFinder = (*redisQueue)(nil)
)

func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, error) {
err := opt.Validate()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions redis_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestRedisQueueEnqueue(t *testing.T) {
"msgpack": string(jobm),
}, h)

jobs, err := q.(BulkJobFinder).BulkFind([]string{job.ID, "not-exist-id"}, &FindOptions{
jobs, err := q.BulkFind([]string{job.ID, "not-exist-id"}, &FindOptions{
Namespace: "{ns1}",
})
require.NoError(t, err)
Expand All @@ -56,7 +56,7 @@ func TestRedisQueueEnqueue(t *testing.T) {
QueueID: "q1",
})
require.NoError(t, err)
jobs, err = q.(BulkJobFinder).BulkFind([]string{job.ID}, &FindOptions{
jobs, err = q.BulkFind([]string{job.ID}, &FindOptions{
Namespace: "{ns1}",
})
require.NoError(t, err)
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) {
})
require.NoError(t, err)

m, err := q.(MetricsExporter).GetQueueMetrics(&QueueMetricsOptions{
m, err := q.GetQueueMetrics(&QueueMetricsOptions{
Namespace: "{ns1}",
QueueID: "q1",
At: job.EnqueuedAt,
Expand All @@ -314,7 +314,7 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) {
require.EqualValues(t, 1, m.ReadyTotal)
require.EqualValues(t, 0, m.ScheduledTotal)

m, err = q.(MetricsExporter).GetQueueMetrics(&QueueMetricsOptions{
m, err = q.GetQueueMetrics(&QueueMetricsOptions{
Namespace: "{ns1}",
QueueID: "q1",
At: job.EnqueuedAt.Add(-time.Second),
Expand Down
5 changes: 0 additions & 5 deletions sidekiq/external_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,3 @@ func (q *sidekiqQueue) externalBulkEnqueueIn(jobs []*work.Job, opt *work.Enqueue
}
return q.enqueueInScript.Run(context.Background(), q.client, nil, args...).Err()
}

var (
_ work.ExternalEnqueuer = (*sidekiqQueue)(nil)
_ work.ExternalBulkEnqueuer = (*sidekiqQueue)(nil)
)
19 changes: 6 additions & 13 deletions sidekiq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type sidekiqQueue struct {
redisQueue
work.RedisQueue
client redis.UniversalClient
enqueueScript *redis.Script
enqueueInScript *redis.Script
Expand All @@ -26,20 +26,13 @@ var (
ErrInvalidQueueID = errors.New("sidekiq: queue id should have format: SIDEKIQ_QUEUE/SIDEKIQ_CLASS")
)

type redisQueue interface {
work.Queue
work.BulkEnqueuer
work.BulkDequeuer
work.BulkJobFinder
work.MetricsExporter
}

// Queue extends redisQueue, and allows job pulling from sidekiq-compatible queue.
// Queue extends RedisQueue, and allows job pulling from sidekiq-compatible queue.
type Queue interface {
redisQueue
work.RedisQueue
JobPuller
work.ExternalEnqueuer
work.ExternalBulkEnqueuer
schedule(string, time.Time) error
}

// NewQueue creates a new queue stored in redis with sidekiq-compatible format.
Expand Down Expand Up @@ -151,7 +144,7 @@ func NewQueue(client redis.UniversalClient) Queue {
`)

return &sidekiqQueue{
redisQueue: work.NewRedisQueue(client).(redisQueue),
RedisQueue: work.NewRedisQueue(client),
client: client,
enqueueScript: enqueueScript,
enqueueInScript: enqueueInScript,
Expand Down Expand Up @@ -233,7 +226,7 @@ func (q *sidekiqQueue) Pull(opt *PullOptions) error {
if err != nil {
return err
}
err = q.redisQueue.Enqueue(job, &work.EnqueueOptions{
err = q.RedisQueue.Enqueue(job, &work.EnqueueOptions{
Namespace: opt.Namespace,
QueueID: FormatQueueID(sqJob.Queue, sqJob.Class),
})
Expand Down
8 changes: 4 additions & 4 deletions sidekiq/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestSidekiqQueueDequeue(t *testing.T) {

now := job.EnqueuedAt.Add(123 * time.Second)

err = q.(*sidekiqQueue).schedule("{ns1}", now)
err = q.schedule("{ns1}", now)
require.NoError(t, err)
err = q.Pull(&PullOptions{
Namespace: "{ns1}",
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestSidekiqQueueDequeue(t *testing.T) {
require.Equal(t, jobKey, z[0].Member)
require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score)

err = q.(*sidekiqQueue).schedule("{ns1}", now)
err = q.schedule("{ns1}", now)
require.NoError(t, err)
jobDequeued, err = q.Dequeue(&work.DequeueOptions{
Namespace: "{ns1}",
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestSidekiqQueueDequeue(t *testing.T) {
require.EqualValues(t, now.Unix()+60, z[0].Score)

// empty
err = q.(*sidekiqQueue).schedule("{ns1}", now)
err = q.schedule("{ns1}", now)
require.NoError(t, err)
_, err = q.Dequeue(&work.DequeueOptions{
Namespace: "{ns1}",
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestSidekiqQueueEnqueueDuplicated(t *testing.T) {

now := job.EnqueuedAt

err = q.(*sidekiqQueue).schedule("{ns1}", now)
err = q.schedule("{ns1}", now)
require.NoError(t, err)
err = q.Pull(&PullOptions{
Namespace: "{ns1}",
Expand Down

0 comments on commit f84f9f2

Please sign in to comment.