Skip to content

Commit

Permalink
feat: add round robin in remote module
Browse files Browse the repository at this point in the history
  • Loading branch information
xjlgod committed Nov 18, 2023
1 parent 4330268 commit 9f4a51c
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/remoting/loadbalance/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Sessio
return RandomLoadBalance(sessions, xid)
case xidLoadBalance:
return XidLoadBalance(sessions, xid)
case roundRobinLoadBalance:
return RoundRobinLoadBalance(sessions, xid)
default:
return RandomLoadBalance(sessions, xid)
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/remoting/loadbalance/round_robin_loadbalance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance

import (
"math"
"sort"
"sync"
"sync/atomic"

getty "github.com/apache/dubbo-getty"
)

var sequence int32

func RoundRobinLoadBalance(sessions *sync.Map, s string) getty.Session {
// collect sync.Map adderToSession
// filter out closed session instance
adderToSession := make(map[string]getty.Session, 0)
// map has no sequence, we should sort it to make sure the sequence is always the same
adders := make([]string, 0)
sessions.Range(func(key, value interface{}) bool {
session := key.(getty.Session)
if session.IsClosed() {
sessions.Delete(key)
} else {
adderToSession[session.RemoteAddr()] = session
adders = append(adders, session.RemoteAddr())
}
return true
})
sort.Strings(adders)
// adderToSession eq 0 means there are no available session
if len(adderToSession) == 0 {
return nil
}
index := getPositiveSequence() % len(adderToSession)
return adderToSession[adders[index]]
}

func getPositiveSequence() int {
for {
current := atomic.LoadInt32(&sequence)
next := current

Check failure on line 59 in pkg/remoting/loadbalance/round_robin_loadbalance.go

View workflow job for this annotation

GitHub Actions / build (1.18)

ineffectual assignment to next (ineffassign)
if current == math.MaxInt32 {
next = 0
} else {
next = current + 1
}
if atomic.CompareAndSwapInt32(&sequence, current, next) {
return int(current)
}
}
}
100 changes: 100 additions & 0 deletions pkg/remoting/loadbalance/round_robin_loadbalance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance

import (
"fmt"
"math"
"sync"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/seata/seata-go/pkg/remoting/mock"
)

func TestRoundRobinLoadBalance_Normal(t *testing.T) {
ctrl := gomock.NewController(t)
sessions := &sync.Map{}

for i := 0; i < 10; i++ {
session := mock.NewMockTestSession(ctrl)
session.EXPECT().IsClosed().Return(i == 2).AnyTimes()
session.EXPECT().RemoteAddr().Return(fmt.Sprintf("%d", i)).AnyTimes()
sessions.Store(session, fmt.Sprintf("session-%d", i+1))
}

for i := 0; i < 10; i++ {
if i == 2 {
continue
}
result := RoundRobinLoadBalance(sessions, "some_xid")
assert.Equal(t, fmt.Sprintf("%d", i), result.RemoteAddr())
assert.NotNil(t, result)
assert.False(t, result.IsClosed())
}
}

func TestRoundRobinLoadBalance_OverSequence(t *testing.T) {
ctrl := gomock.NewController(t)
sessions := &sync.Map{}
sequence = math.MaxInt32

for i := 0; i < 10; i++ {
session := mock.NewMockTestSession(ctrl)
session.EXPECT().IsClosed().Return(false).AnyTimes()
session.EXPECT().RemoteAddr().Return(fmt.Sprintf("%d", i)).AnyTimes()
sessions.Store(session, fmt.Sprintf("session-%d", i+1))
}

for i := 0; i < 10; i++ {
// over sequence here
if i == 0 {
result := RoundRobinLoadBalance(sessions, "some_xid")
assert.Equal(t, "7", result.RemoteAddr())
assert.NotNil(t, result)
assert.False(t, result.IsClosed())
continue
}
result := RoundRobinLoadBalance(sessions, "some_xid")
assert.Equal(t, fmt.Sprintf("%d", i-1), result.RemoteAddr())
assert.NotNil(t, result)
assert.False(t, result.IsClosed())
}
}

func TestRoundRobinLoadBalance_All_Closed(t *testing.T) {
ctrl := gomock.NewController(t)
sessions := &sync.Map{}
for i := 0; i < 10; i++ {
session := mock.NewMockTestSession(ctrl)
session.EXPECT().IsClosed().Return(true).AnyTimes()
sessions.Store(session, fmt.Sprintf("session-%d", i+1))
}
if result := RoundRobinLoadBalance(sessions, "some_xid"); result != nil {
t.Errorf("Expected nil, actual got %+v", result)
}
}

func TestRoundRobinLoadBalance_Empty(t *testing.T) {
sessions := &sync.Map{}
if result := RoundRobinLoadBalance(sessions, "some_xid"); result != nil {
t.Errorf("Expected nil, actual got %+v", result)
}
}

0 comments on commit 9f4a51c

Please sign in to comment.