From 09d5ee75ed1cf480e4a8c2663256a4b12db720da Mon Sep 17 00:00:00 2001
From: Isitha Subasinghe
Date: Fri, 10 Jan 2025 10:26:50 +1100
Subject: [PATCH] fix: ensure namespace parallelism and parallelism work together. Fixes #10985 (#14039)

Signed-off-by: isubasinghe
---
 docs/parallelism.md | 2 +
 workflow/controller/controller.go | 8 +-
 workflow/controller/controller_test.go | 1 +
 workflow/sync/chain_throttler.go | 41 -----
 workflow/sync/chain_throttler_test.go | 24 ---
 workflow/sync/multi_throttler.go | 241 +++++++++++++++++++++++++
 workflow/sync/multi_throttler_test.go | 204 +++++++++++++++++++++
 workflow/sync/throttler.go | 211 ----------------------
 workflow/sync/throttler_test.go | 153 ----------------
 9 files changed, 451 insertions(+), 434 deletions(-)
 delete mode 100644 workflow/sync/chain_throttler.go
 delete mode 100644 workflow/sync/chain_throttler_test.go
 create mode 100644 workflow/sync/multi_throttler.go
 create mode 100644 workflow/sync/multi_throttler_test.go
 delete mode 100644 workflow/sync/throttler.go
 delete mode 100644 workflow/sync/throttler_test.go

diff --git a/docs/parallelism.md b/docs/parallelism.md
index bae14e3c45fe..f94b68fccf14 100644
--- a/docs/parallelism.md
+++ b/docs/parallelism.md
@@ -18,6 +18,8 @@ data:
   namespaceParallelism: "4"
 ```
 
+When namespace parallelism is enabled, a lower priority workflow may be run before a higher priority one if the higher priority workflow's namespace is already at its namespace parallelism limit.
+
 !!! Note
     Workflows that are executing but restricted from running more nodes due to other mechanisms will still count toward parallelism limits.
 
diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index d396115a582a..fc36c5e4e393 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -242,11 +242,8 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
 }
 
 func (wfc *WorkflowController) newThrottler() sync.Throttler {
-	f := func(key string) { wfc.wfQueue.AddRateLimited(key) }
-	return sync.ChainThrottler{
-		sync.NewThrottler(wfc.Config.Parallelism, sync.SingleBucket, f),
-		sync.NewThrottler(wfc.Config.NamespaceParallelism, sync.NamespaceBucket, f),
-	}
+	f := func(key string) { wfc.wfQueue.Add(key) }
+	return sync.NewMultiThrottler(wfc.Config.Parallelism, wfc.Config.NamespaceParallelism, f)
 }
 
 // runGCcontroller runs the workflow garbage collector controller
@@ -482,6 +479,7 @@ func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap)
 			log.Warnf("received object from indexer %s is not an unstructured", indexes.SemaphoreConfigIndexName)
 			continue
 		}
+		log.Infof("Adding workflow %s/%s", un.GetNamespace(), un.GetName())
 		wfc.wfQueue.AddRateLimited(fmt.Sprintf("%s/%s", un.GetNamespace(), un.GetName()))
 	}
 }
diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go
index d9756765b858..61073963b3c0 100644
--- a/workflow/controller/controller_test.go
+++ b/workflow/controller/controller_test.go
@@ -953,6 +953,7 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) {
 	for i := 0; i < 3; i++ {
 		key, _ := controller.wfQueue.Get()
 		controller.wfQueue.Done(key)
+		controller.wfQueue.Forget(key)
 	}
 
 	assert.Equal(0, controller.wfQueue.Len())
diff --git a/workflow/sync/chain_throttler.go b/workflow/sync/chain_throttler.go
deleted file mode 100644
index 3ea22759e238..000000000000
--- a/workflow/sync/chain_throttler.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package sync
-
-import (
-	"time"
-
-	wfv1
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" -) - -type ChainThrottler []Throttler - -func (c ChainThrottler) Init(wfs []wfv1.Workflow) error { - for _, t := range c { - if err := t.Init(wfs); err != nil { - return err - } - } - return nil -} - -func (c ChainThrottler) Add(key Key, priority int32, creationTime time.Time) { - for _, t := range c { - t.Add(key, priority, creationTime) - } -} - -func (c ChainThrottler) Admit(key Key) bool { - for _, t := range c { - if !t.Admit(key) { - return false - } - } - return true -} - -func (c ChainThrottler) Remove(key Key) { - for _, t := range c { - t.Remove(key) - } -} - -var _ Throttler = ChainThrottler{} diff --git a/workflow/sync/chain_throttler_test.go b/workflow/sync/chain_throttler_test.go deleted file mode 100644 index c4ed1868ec26..000000000000 --- a/workflow/sync/chain_throttler_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package sync - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/argoproj/argo-workflows/v3/workflow/sync/mocks" -) - -func TestChainThrottler(t *testing.T) { - m := &mocks.Throttler{} - m.On("Add", "foo", int32(1), time.Time{}).Return() - m.On("Admit", "foo").Return(false) - m.On("Remove", "foo").Return() - - c := ChainThrottler{m} - c.Add("foo", 1, time.Time{}) - assert.False(t, c.Admit("foo")) - c.Remove("foo") - - assert.True(t, ChainThrottler{}.Admit("foo")) -} diff --git a/workflow/sync/multi_throttler.go b/workflow/sync/multi_throttler.go new file mode 100644 index 000000000000..434201e148e8 --- /dev/null +++ b/workflow/sync/multi_throttler.go @@ -0,0 +1,241 @@ +package sync + +import ( + "container/heap" + "sync" + "time" + + "k8s.io/client-go/tools/cache" + + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" +) + +//go:generate mockery --name=Throttler + +// Throttler allows the controller to limit number of items it is processing in parallel. +// Items are processed in priority order, and one processing starts, other items (including higher-priority items) +// will be kept pending until the processing is complete. +// Implementations should be idempotent. +type Throttler interface { + Init(wfs []wfv1.Workflow) error + Add(key Key, priority int32, creationTime time.Time) + // Admit returns if the item should be processed. 
+ Admit(key Key) bool + // Remove notifies throttler that item processing is no longer needed + Remove(key Key) +} + +type Key = string +type QueueFunc func(Key) + +// NewMultiThrottler creates a new multi throttler for throttling both namespace and global parallelism, a parallelism value of zero disables throttling +func NewMultiThrottler(parallelism int, namespaceParallelismLimit int, queue QueueFunc) Throttler { + namespaceParallelism := make(map[string]int) + return &multiThrottler{ + queue: queue, + namespaceParallelism: namespaceParallelism, + namespaceParallelismDefault: namespaceParallelismLimit, + totalParallelism: parallelism, + running: make(map[Key]bool), + pending: make(map[string]*priorityQueue), + lock: &sync.Mutex{}, + } +} + +type multiThrottler struct { + queue QueueFunc + namespaceParallelism map[string]int + namespaceParallelismDefault int + totalParallelism int + running map[Key]bool + pending map[string]*priorityQueue + lock *sync.Mutex +} + +func (m *multiThrottler) Init(wfs []wfv1.Workflow) error { + m.lock.Lock() + defer m.lock.Unlock() + + keys := []Key{} + for _, wf := range wfs { + if wf.Status.Phase != wfv1.WorkflowRunning { + continue + } + key, err := cache.MetaNamespaceKeyFunc(&wf) + if err != nil { + return err + } + keys = append(keys, key) + } + + for _, key := range keys { + m.running[key] = true + } + return nil +} + +func (m *multiThrottler) namespaceCount(namespace string) (int, int) { + setLimit, has := m.namespaceParallelism[namespace] + if !has { + m.namespaceParallelism[namespace] = m.namespaceParallelismDefault + setLimit = m.namespaceParallelismDefault + } + if setLimit == 0 { + // return count is no longer accurate, but preserves behaviour + return 0, 0 + } + count := 0 + for key := range m.running { + ns, _, _ := cache.SplitMetaNamespaceKey(key) + if ns == namespace { + count++ + } + } + return count, setLimit +} + +func (m *multiThrottler) namespaceAllows(namespace string) bool { + count, limit := m.namespaceCount(namespace) + return count < limit || limit == 0 +} + +func (m *multiThrottler) Add(key Key, priority int32, creationTime time.Time) { + m.lock.Lock() + defer m.lock.Unlock() + namespace, _, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + _, ok := m.pending[namespace] + if !ok { + m.pending[namespace] = &priorityQueue{itemByKey: make(map[string]*item)} + } + + m.pending[namespace].add(key, priority, creationTime) + m.queueThrottled() +} + +func (m *multiThrottler) Admit(key Key) bool { + m.lock.Lock() + defer m.lock.Unlock() + + _, ok := m.running[key] + if ok { + return true + } + m.queueThrottled() + return false +} + +func (m *multiThrottler) Remove(key Key) { + m.lock.Lock() + defer m.lock.Unlock() + + namespace, _, _ := cache.SplitMetaNamespaceKey(key) + delete(m.running, key) + m.pending[namespace].remove(key) + m.queueThrottled() +} + +func (m *multiThrottler) queueThrottled() { + if m.totalParallelism != 0 && len(m.running) >= m.totalParallelism { + return + } + + minPq := &priorityQueue{itemByKey: make(map[string]*item)} + + for _, pq := range m.pending { + if len(pq.items) == 0 { + continue + } + currItem := pq.peek() + + namespace, _, err := cache.SplitMetaNamespaceKey(currItem.key) + if err != nil { + return + } + if !m.namespaceAllows(namespace) { + continue + } + + minPq.add(currItem.key, currItem.priority, currItem.creationTime) + } + if len(minPq.items) > 0 { + bestItem := minPq.pop() + bestNamespace, _, _ := cache.SplitMetaNamespaceKey(bestItem.key) + m.pending[bestNamespace].pop() + 
m.running[bestItem.key] = true + m.queue(bestItem.key) + } +} + +type item struct { + key string + creationTime time.Time + priority int32 + index int +} + +type priorityQueue struct { + items []*item + itemByKey map[string]*item +} + +func (pq *priorityQueue) pop() *item { + return heap.Pop(pq).(*item) +} + +func (pq *priorityQueue) peek() *item { + return pq.items[0] +} + +func (pq *priorityQueue) add(key Key, priority int32, creationTime time.Time) { + if res, ok := pq.itemByKey[key]; ok { + if res.priority != priority { + res.priority = priority + heap.Fix(pq, res.index) + } + } else { + heap.Push(pq, &item{key: key, priority: priority, creationTime: creationTime}) + } +} + +func (pq *priorityQueue) remove(key Key) { + if item, ok := pq.itemByKey[key]; ok { + heap.Remove(pq, item.index) + delete(pq.itemByKey, key) + } +} + +func (pq priorityQueue) Len() int { return len(pq.items) } + +func (pq priorityQueue) Less(i, j int) bool { + if pq.items[i].priority == pq.items[j].priority { + return pq.items[i].creationTime.Before(pq.items[j].creationTime) + } + return pq.items[i].priority > pq.items[j].priority +} + +func (pq priorityQueue) Swap(i, j int) { + pq.items[i], pq.items[j] = pq.items[j], pq.items[i] + pq.items[i].index = i + pq.items[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(pq.items) + item := x.(*item) + item.index = n + pq.items = append(pq.items, item) + pq.itemByKey[item.key] = item +} + +func (pq *priorityQueue) Pop() interface{} { + old := pq.items + n := len(old) + item := old[n-1] + item.index = -1 + pq.items = old[0 : n-1] + delete(pq.itemByKey, item.key) + return item +} diff --git a/workflow/sync/multi_throttler_test.go b/workflow/sync/multi_throttler_test.go new file mode 100644 index 000000000000..d321c31c0766 --- /dev/null +++ b/workflow/sync/multi_throttler_test.go @@ -0,0 +1,204 @@ +package sync + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + fakewfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" +) + +func TestMultiNoParallelismSamePriority(t *testing.T) { + throttler := NewMultiThrottler(0, 0, func(Key) {}) + + throttler.Add("default/c", 0, time.Now().Add(2*time.Hour)) + throttler.Add("default/b", 0, time.Now().Add(1*time.Hour)) + throttler.Add("default/a", 0, time.Now()) + + assert.True(t, throttler.Admit("default/a")) + assert.True(t, throttler.Admit("default/b")) + assert.True(t, throttler.Admit("default/c")) +} + +func TestMultiNoParallelismMultipleBuckets(t *testing.T) { + throttler := NewMultiThrottler(1, 1, func(Key) {}) + throttler.Add("a/0", 0, time.Now()) + throttler.Add("a/1", 0, time.Now().Add(-1*time.Second)) + throttler.Add("b/0", 0, time.Now().Add(-2*time.Second)) + throttler.Add("b/1", 0, time.Now().Add(-3*time.Second)) + + assert.True(t, throttler.Admit("a/0")) + assert.False(t, throttler.Admit("a/1")) + assert.False(t, throttler.Admit("b/0")) + assert.False(t, throttler.Admit("b/1")) + throttler.Remove("a/0") + assert.True(t, throttler.Admit("b/1")) +} + +func TestMultiWithParallelismLimitAndPriority(t *testing.T) { + queuedKey := "" + throttler := NewMultiThrottler(2, 0, func(key string) { queuedKey = key }) + + throttler.Add("default/a", 1, time.Now()) + throttler.Add("default/b", 2, time.Now()) + throttler.Add("default/c", 3, time.Now()) + 
throttler.Add("default/d", 4, time.Now()) + + assert.True(t, throttler.Admit("default/a"), "is started, even though low priority") + assert.True(t, throttler.Admit("default/b"), "is started, even though low priority") + assert.False(t, throttler.Admit("default/c"), "cannot start") + assert.False(t, throttler.Admit("default/d"), "cannot start") + assert.Equal(t, "default/b", queuedKey) + queuedKey = "" + + throttler.Remove("default/a") + assert.True(t, throttler.Admit("default/b"), "stays running") + assert.True(t, throttler.Admit("default/d"), "top priority") + assert.False(t, throttler.Admit("default/c")) + assert.Equal(t, "default/d", queuedKey) + queuedKey = "" + + throttler.Remove("default/b") + assert.True(t, throttler.Admit("default/d"), "top priority") + assert.True(t, throttler.Admit("default/c"), "now running too") + assert.Equal(t, "default/c", queuedKey) +} + +func TestMultiInitWithWorkflows(t *testing.T) { + queuedKey := "" + throttler := NewMultiThrottler(1, 1, func(key string) { queuedKey = key }) + ctx := context.Background() + + wfclientset := fakewfclientset.NewSimpleClientset( + wfv1.MustUnmarshalWorkflow(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + labels: + workflows.argoproj.io/phase: Running + name: a + namespace: default +spec: + entrypoint: whalesay + templates: + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +status: + phase: Running + startedAt: "2020-06-19T17:37:05Z" +`), + wfv1.MustUnmarshalWorkflow(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + labels: + workflows.argoproj.io/phase: Running + name: b + namespace: default +spec: + entrypoint: whalesay + templates: + - name: whalesay + container: + image: docker/whalesay:latest + command: [cowsay] + args: ["hello world"] +status: + phase: Running + startedAt: "2020-06-19T17:37:05Z" +`)) + wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) + require.NoError(t, err) + err = throttler.Init(wfList.Items) + require.NoError(t, err) + assert.True(t, throttler.Admit("default/a")) + assert.True(t, throttler.Admit("default/b")) + + throttler.Add("default/c", 0, time.Now()) + throttler.Add("default/d", 0, time.Now()) + assert.False(t, throttler.Admit("default/c")) + assert.False(t, throttler.Admit("default/d")) + + throttler.Remove("default/a") + assert.Equal(t, "", queuedKey) + assert.False(t, throttler.Admit("default/c")) + assert.False(t, throttler.Admit("default/d")) + + queuedKey = "" + throttler.Remove("default/b") + assert.Equal(t, "default/c", queuedKey) + assert.True(t, throttler.Admit("default/c")) + assert.False(t, throttler.Admit("default/d")) + + queuedKey = "" + throttler.Remove("default/c") + assert.Equal(t, "default/d", queuedKey) + assert.True(t, throttler.Admit("default/d")) +} + +func TestTotalAllowNamespaceLimit(t *testing.T) { + namespaceLimits := make(map[string]int) + namespaceLimits["a"] = 2 + namespaceLimits["b"] = 1 + throttler := &multiThrottler{ + queue: func(key Key) {}, + namespaceParallelism: namespaceLimits, + namespaceParallelismDefault: 6, + totalParallelism: 4, + running: make(map[Key]bool), + pending: make(map[string]*priorityQueue), + lock: &sync.Mutex{}, + } + throttler.Add("a/0", 1, time.Now()) + throttler.Add("b/0", 2, time.Now()) + throttler.Add("a/1", 3, time.Now()) + throttler.Add("a/2", 4, time.Now()) + throttler.Add("a/3", 5, time.Now()) + throttler.Add("a/4", 6, time.Now()) + throttler.Add("b/1", 7, time.Now()) + + assert.True(t, 
throttler.Admit("a/0")) + assert.True(t, throttler.Admit("b/0")) + assert.True(t, throttler.Admit("a/1")) + + assert.False(t, throttler.Admit("a/2")) + assert.False(t, throttler.Admit("a/3")) + assert.False(t, throttler.Admit("a/4")) + assert.False(t, throttler.Admit("b/1")) + + throttler.Add("c/0", 8, time.Now()) + assert.True(t, throttler.Admit("c/0")) +} + +func TestPriorityAcrossNamespaces(t *testing.T) { + throttler := NewMultiThrottler(3, 1, func(Key) {}) + throttler.Add("a/0", 0, time.Now()) + throttler.Add("a/1", 0, time.Now()) + throttler.Add("a/2", 0, time.Now()) + throttler.Add("b/0", 1, time.Now()) + throttler.Add("b/1", 1, time.Now()) + throttler.Add("b/2", 1, time.Now()) + + assert.True(t, throttler.Admit("a/0")) + assert.True(t, throttler.Admit("b/0")) + assert.False(t, throttler.Admit("a/1")) + assert.False(t, throttler.Admit("a/2")) + assert.True(t, throttler.Admit("b/0")) + assert.False(t, throttler.Admit("b/1")) + assert.False(t, throttler.Admit("b/2")) + throttler.Remove("a/0") + assert.False(t, throttler.Admit("b/1")) + assert.True(t, throttler.Admit("a/1")) + throttler.Remove("b/0") + assert.True(t, throttler.Admit("b/1")) + assert.False(t, throttler.Admit("a/2")) +} diff --git a/workflow/sync/throttler.go b/workflow/sync/throttler.go deleted file mode 100644 index 674cff254908..000000000000 --- a/workflow/sync/throttler.go +++ /dev/null @@ -1,211 +0,0 @@ -package sync - -import ( - "container/heap" - "sync" - "time" - - "k8s.io/client-go/tools/cache" - - wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" -) - -//go:generate mockery --name=Throttler - -// Throttler allows the controller to limit number of items it is processing in parallel. -// Items are processed in priority order, and one processing starts, other items (including higher-priority items) -// will be kept pending until the processing is complete. -// Implementations should be idempotent. -type Throttler interface { - Init(wfs []wfv1.Workflow) error - Add(key Key, priority int32, creationTime time.Time) - // Admit returns if the item should be processed. - Admit(key Key) bool - // Remove notifies throttler that item processing is no longer needed - Remove(key Key) -} - -type Key = string -type QueueFunc func(Key) - -type BucketKey = string -type BucketFunc func(Key) BucketKey - -var SingleBucket BucketFunc = func(key Key) BucketKey { return "" } -var NamespaceBucket BucketFunc = func(key Key) BucketKey { - namespace, _, _ := cache.SplitMetaNamespaceKey(key) - return namespace -} - -type throttler struct { - queue QueueFunc - bucketFunc BucketFunc - inProgress buckets - pending map[BucketKey]*priorityQueue - lock *sync.Mutex - parallelism int -} - -type bucket map[Key]bool -type buckets map[BucketKey]bucket - -// NewThrottler returns a throttle that only runs `parallelism` items at once. When an item may need processing, -// `queue` is invoked. 
-func NewThrottler(parallelism int, bucketFunc BucketFunc, queue QueueFunc) Throttler { - return &throttler{ - queue: queue, - bucketFunc: bucketFunc, - inProgress: make(buckets), - pending: make(map[BucketKey]*priorityQueue), - lock: &sync.Mutex{}, - parallelism: parallelism, - } -} - -func (t *throttler) Init(wfs []wfv1.Workflow) error { - t.lock.Lock() - defer t.lock.Unlock() - if t.parallelism == 0 { - return nil - } - - for _, wf := range wfs { - key, err := cache.MetaNamespaceKeyFunc(&wf) - if err != nil { - return err - } - if wf.Status.Phase == wfv1.WorkflowRunning { - bucketKey := t.bucketFunc(key) - if _, ok := t.inProgress[bucketKey]; !ok { - t.inProgress[bucketKey] = make(bucket) - } - t.inProgress[bucketKey][key] = true - } - } - return nil -} - -func (t *throttler) Add(key Key, priority int32, creationTime time.Time) { - t.lock.Lock() - defer t.lock.Unlock() - if t.parallelism == 0 { - return - } - bucketKey := t.bucketFunc(key) - if _, ok := t.pending[bucketKey]; !ok { - t.pending[bucketKey] = &priorityQueue{itemByKey: make(map[string]*item)} - } - t.pending[bucketKey].add(key, priority, creationTime) - t.queueThrottled(bucketKey) -} - -func (t *throttler) Admit(key Key) bool { - t.lock.Lock() - defer t.lock.Unlock() - if t.parallelism == 0 { - return true - } - bucketKey := t.bucketFunc(key) - if x, ok := t.inProgress[bucketKey]; ok && x[key] { - return true - } - t.queueThrottled(bucketKey) - return false -} - -func (t *throttler) Remove(key Key) { - t.lock.Lock() - defer t.lock.Unlock() - bucketKey := t.bucketFunc(key) - if x, ok := t.inProgress[bucketKey]; ok { - delete(x, key) - } - if x, ok := t.pending[bucketKey]; ok { - x.remove(key) - } - t.queueThrottled(bucketKey) -} - -func (t *throttler) queueThrottled(bucketKey BucketKey) { - if _, ok := t.inProgress[bucketKey]; !ok { - t.inProgress[bucketKey] = make(bucket) - } - inProgress := t.inProgress[bucketKey] - pending, ok := t.pending[bucketKey] - for ok && pending.Len() > 0 && t.parallelism > len(inProgress) { - key := pending.pop().key - inProgress[key] = true - t.queue(key) - } -} - -type item struct { - key string - creationTime time.Time - priority int32 - index int -} - -type priorityQueue struct { - items []*item - itemByKey map[string]*item -} - -func (pq *priorityQueue) pop() *item { - return heap.Pop(pq).(*item) -} - -func (pq *priorityQueue) peek() *item { - return pq.items[0] -} - -func (pq *priorityQueue) add(key Key, priority int32, creationTime time.Time) { - if res, ok := pq.itemByKey[key]; ok { - if res.priority != priority { - res.priority = priority - heap.Fix(pq, res.index) - } - } else { - heap.Push(pq, &item{key: key, priority: priority, creationTime: creationTime}) - } -} - -func (pq *priorityQueue) remove(key Key) { - if item, ok := pq.itemByKey[key]; ok { - heap.Remove(pq, item.index) - delete(pq.itemByKey, key) - } -} - -func (pq priorityQueue) Len() int { return len(pq.items) } - -func (pq priorityQueue) Less(i, j int) bool { - if pq.items[i].priority == pq.items[j].priority { - return pq.items[i].creationTime.Before(pq.items[j].creationTime) - } - return pq.items[i].priority > pq.items[j].priority -} - -func (pq priorityQueue) Swap(i, j int) { - pq.items[i], pq.items[j] = pq.items[j], pq.items[i] - pq.items[i].index = i - pq.items[j].index = j -} - -func (pq *priorityQueue) Push(x interface{}) { - n := len(pq.items) - item := x.(*item) - item.index = n - pq.items = append(pq.items, item) - pq.itemByKey[item.key] = item -} - -func (pq *priorityQueue) Pop() interface{} { - old := pq.items - n 
:= len(old) - item := old[n-1] - item.index = -1 - pq.items = old[0 : n-1] - delete(pq.itemByKey, item.key) - return item -} diff --git a/workflow/sync/throttler_test.go b/workflow/sync/throttler_test.go deleted file mode 100644 index 4337eb8bdf7c..000000000000 --- a/workflow/sync/throttler_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package sync - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" - - wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - fakewfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" -) - -func Test_NamespaceBucket(t *testing.T) { - assert.Equal(t, "a", NamespaceBucket("a/b")) -} - -func TestNoParallelismSamePriority(t *testing.T) { - throttler := NewThrottler(0, SingleBucket, nil) - - throttler.Add("c", 0, time.Now().Add(2*time.Hour)) - throttler.Add("b", 0, time.Now().Add(1*time.Hour)) - throttler.Add("a", 0, time.Now()) - - assert.True(t, throttler.Admit("a")) - assert.True(t, throttler.Admit("b")) - assert.True(t, throttler.Admit("c")) -} - -func TestNoParallelismMultipleBuckets(t *testing.T) { - throttler := NewThrottler(1, func(key Key) BucketKey { - namespace, _, _ := cache.SplitMetaNamespaceKey(key) - return namespace - }, func(key string) {}) - - throttler.Add("a/0", 0, time.Now()) - throttler.Add("a/1", 0, time.Now()) - throttler.Add("b/0", 0, time.Now()) - throttler.Add("b/1", 0, time.Now()) - - assert.True(t, throttler.Admit("a/0")) - assert.False(t, throttler.Admit("a/1")) - assert.True(t, throttler.Admit("b/0")) - throttler.Remove("a/0") - assert.True(t, throttler.Admit("a/1")) -} - -func TestWithParallelismLimitAndPriority(t *testing.T) { - queuedKey := "" - throttler := NewThrottler(2, SingleBucket, func(key string) { queuedKey = key }) - - throttler.Add("a", 1, time.Now()) - throttler.Add("b", 2, time.Now()) - throttler.Add("c", 3, time.Now()) - throttler.Add("d", 4, time.Now()) - - assert.True(t, throttler.Admit("a"), "is started, even though low priority") - assert.True(t, throttler.Admit("b"), "is started, even though low priority") - assert.False(t, throttler.Admit("c"), "cannot start") - assert.False(t, throttler.Admit("d"), "cannot start") - assert.Equal(t, "b", queuedKey) - queuedKey = "" - - throttler.Remove("a") - assert.True(t, throttler.Admit("b"), "stays running") - assert.True(t, throttler.Admit("d"), "top priority") - assert.False(t, throttler.Admit("c")) - assert.Equal(t, "d", queuedKey) - queuedKey = "" - - throttler.Remove("b") - assert.True(t, throttler.Admit("d"), "top priority") - assert.True(t, throttler.Admit("c"), "now running too") - assert.Equal(t, "c", queuedKey) -} - -func TestInitWithWorkflows(t *testing.T) { - queuedKey := "" - throttler := NewThrottler(1, SingleBucket, func(key string) { queuedKey = key }) - ctx := context.Background() - - wfclientset := fakewfclientset.NewSimpleClientset( - wfv1.MustUnmarshalWorkflow(` -apiVersion: argoproj.io/v1alpha1 -kind: Workflow -metadata: - labels: - workflows.argoproj.io/phase: Running - name: a - namespace: default -spec: - entrypoint: whalesay - templates: - - name: whalesay - container: - image: docker/whalesay:latest - command: [cowsay] - args: ["hello world"] -status: - phase: Running - startedAt: "2020-06-19T17:37:05Z" -`), - wfv1.MustUnmarshalWorkflow(` -apiVersion: argoproj.io/v1alpha1 -kind: Workflow -metadata: - labels: - workflows.argoproj.io/phase: Running - 
name: b - namespace: default -spec: - entrypoint: whalesay - templates: - - name: whalesay - container: - image: docker/whalesay:latest - command: [cowsay] - args: ["hello world"] -status: - phase: Running - startedAt: "2020-06-19T17:37:05Z" -`)) - wfList, err := wfclientset.ArgoprojV1alpha1().Workflows("default").List(ctx, metav1.ListOptions{}) - require.NoError(t, err) - err = throttler.Init(wfList.Items) - require.NoError(t, err) - assert.True(t, throttler.Admit("default/a")) - assert.True(t, throttler.Admit("default/b")) - - throttler.Add("default/c", 0, time.Now()) - throttler.Add("default/d", 0, time.Now()) - assert.False(t, throttler.Admit("default/c")) - assert.False(t, throttler.Admit("default/d")) - - throttler.Remove("default/a") - assert.Equal(t, "", queuedKey) - assert.False(t, throttler.Admit("default/c")) - assert.False(t, throttler.Admit("default/d")) - - queuedKey = "" - throttler.Remove("default/b") - assert.Equal(t, "default/c", queuedKey) - assert.True(t, throttler.Admit("default/c")) - assert.False(t, throttler.Admit("default/d")) - - queuedKey = "" - throttler.Remove("default/c") - assert.Equal(t, "default/d", queuedKey) - assert.True(t, throttler.Admit("default/d")) -}
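
To illustrate how the new throttler combines the global and per-namespace limits, the following test-style sketch exercises the API added by this patch (NewMultiThrottler, Add, Admit, Remove). It is an illustrative example only, not part of the change set: it assumes placement in the workflow/sync package (for instance alongside multi_throttler_test.go), and the namespace and workflow names are hypothetical.

package sync

import (
	"fmt"
	"time"
)

// Illustrative sketch (not part of this patch): with a global limit of 2 and a
// namespace limit of 1, at most one workflow runs per namespace and at most
// two run overall; pending workflows are admitted as slots free up.
func ExampleNewMultiThrottler() {
	throttler := NewMultiThrottler(2, 1, func(key Key) {})

	throttler.Add("ns-a/wf-1", 0, time.Now())
	throttler.Add("ns-a/wf-2", 0, time.Now())
	throttler.Add("ns-b/wf-1", 0, time.Now())

	fmt.Println(throttler.Admit("ns-a/wf-1")) // true: first slot in ns-a
	fmt.Println(throttler.Admit("ns-a/wf-2")) // false: ns-a is already at its limit of 1
	fmt.Println(throttler.Admit("ns-b/wf-1")) // true: second global slot, first in ns-b

	// Finishing the running ns-a workflow frees that namespace's slot, so the
	// queued ns-a workflow is admitted on the next scheduling pass.
	throttler.Remove("ns-a/wf-1")
	fmt.Println(throttler.Admit("ns-a/wf-2")) // true

	// Output:
	// true
	// false
	// true
	// true
}

The behaviour follows from queueThrottled in multi_throttler.go: whenever capacity may have freed up, the throttler takes the highest priority pending workflow from each namespace, discards candidates whose namespace is already at its limit, and starts the best remaining one, ordered by priority and then creation time.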