add MemberDowngradeUpgrade failpoint #19125

Draft · wants to merge 1 commit into main
140 changes: 115 additions & 25 deletions tests/robustness/failpoint/cluster.go
@@ -39,8 +39,9 @@ import (
)

var (
MemberReplace Failpoint = memberReplace{}
MemberDowngrade Failpoint = memberDowngrade{}
MemberReplace Failpoint = memberReplace{}
MemberDowngrade Failpoint = memberDowngrade{}
MemberDowngradeUpgrade Failpoint = memberDowngradeUpgrade{}
)

type memberReplace struct{}
@@ -148,14 +149,12 @@ func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e
type memberDowngrade struct{}

func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
currentVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
if err != nil {
return nil, err
}
targetVersion := semver.Version{Major: v.Major, Minor: v.Minor - 1}
lastVersion := &semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
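// downgrade a randomly chosen, non-empty subset of the cluster members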
numberOfMembersToDowngrade := rand.Int()%len(clus.Procs) + 1
membersToDowngrade := rand.Perm(len(clus.Procs))[:numberOfMembersToDowngrade]
lg.Info("Test downgrading members", zap.Any("members", membersToDowngrade))

member := clus.Procs[0]
endpoints := []string{member.EndpointsGRPC()[0]}
@@ -173,27 +172,12 @@ func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logge
// Need to wait health interval for cluster to accept changes
time.Sleep(etcdserver.HealthInterval)
lg.Info("Enable downgrade")
err = enableDowngrade(ctx, cc, &targetVersion)
err = enableDowngrade(ctx, cc, lastVersion)
if err != nil {
return nil, err
}
// Need to wait health interval for cluster to prepare for downgrade
time.Sleep(etcdserver.HealthInterval)

for _, memberID := range membersToDowngrade {
member = clus.Procs[memberID]
lg.Info("Downgrading member", zap.String("member", member.Config().Name))
if err = member.Stop(); err != nil {
return nil, err
}
member.Config().ExecPath = e2e.BinPath.EtcdLastRelease
lg.Info("Restarting member", zap.String("member", member.Config().Name))
err = member.Start(ctx)
if err != nil {
return nil, err
}
err = verifyVersion(t, clus, member, targetVersion)
}
err = downgradeUpgradeMembers(ctx, t, lg, clus, numberOfMembersToDowngrade, currentVersion, lastVersion)
time.Sleep(etcdserver.HealthInterval)
return nil, err
}
@@ -215,6 +199,64 @@ func (f memberDowngrade) Available(config e2e.EtcdProcessClusterConfig, member e
return v.Compare(v3_6) >= 0 && (config.Version == e2e.CurrentVersion && member.Config().ExecPath == e2e.BinPath.Etcd)
}

type memberDowngradeUpgrade struct{}

func (f memberDowngradeUpgrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
currentVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
if err != nil {
return nil, err
}
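// target the previous minor release of the binary under test (for example, 3.5 when the current binary is 3.6)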
lastVersion := &semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}

member := clus.Procs[0]
endpoints := []string{member.EndpointsGRPC()[0]}
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 10 * time.Second,
DialKeepAliveTimeout: 100 * time.Millisecond,
})
if err != nil {
return nil, err
}
defer cc.Close()

// Need to wait health interval for cluster to accept changes
time.Sleep(etcdserver.HealthInterval)
lg.Info("Enable downgrade")
err = enableDowngrade(ctx, cc, lastVersion)
if err != nil {
return nil, err
}
// downgrade all members first
err = downgradeUpgradeMembers(ctx, t, lg, clus, len(clus.Procs), currentVersion, lastVersion)
if err != nil {
return nil, err
}
// partially upgrade the cluster
numberOfMembersToUpgrade := rand.Int()%len(clus.Procs) + 1
err = downgradeUpgradeMembers(ctx, t, lg, clus, numberOfMembersToUpgrade, lastVersion, currentVersion)
time.Sleep(etcdserver.HealthInterval)
return nil, err
}

func (f memberDowngradeUpgrade) Name() string {
return "MemberDowngradeUpgrade"
}

func (f memberDowngradeUpgrade) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
return false
}
v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
if err != nil {
panic("Failed checking etcd version binary")
}
v3_6 := semver.Version{Major: 3, Minor: 6}
// only a cluster running the current version can be downgraded.
return v.Compare(v3_6) >= 0 && (config.Version == e2e.CurrentVersion && member.Config().ExecPath == e2e.BinPath.Etcd)
}

func getID(ctx context.Context, cc *clientv3.Client, name string) (id uint64, found bool, err error) {
// Ensure linearized MemberList by first making a linearized Get request from the same member.
// This is required for v3.4 support as it doesn't support linearized MemberList https://github.com/etcd-io/etcd/issues/18929
@@ -257,9 +299,57 @@ func enableDowngrade(ctx context.Context, cc *clientv3.Client, targetVersion *se
return err
}

func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedVersion semver.Version) error {
func downgradeUpgradeMembers(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
isDowngrade := targetVersion.LessThan(*currentVersion)
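// a downgrade restarts members on the last-release binary; an upgrade restarts them on the current one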
opString := "upgrading"
newExecPath := e2e.BinPath.Etcd
if isDowngrade {
opString = "downgrading"
newExecPath = e2e.BinPath.EtcdLastRelease
}
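// pick a random subset of members of the requested size; passing len(clus.Procs) changes every member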
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
lg.Info(fmt.Sprintf("Test %s members", opString), zap.Any("members", membersToChange))

// Need to wait health interval for cluster to prepare for downgrade/upgrade
time.Sleep(etcdserver.HealthInterval)

membersChanged := 0
for _, memberID := range membersToChange {
member := clus.Procs[memberID]
if member.Config().ExecPath == newExecPath {
return fmt.Errorf("member:%s is already running with the %s target binary - %s", member.Config().Name, opString, member.Config().ExecPath)
}
lg.Info(fmt.Sprintf("%s member", opString), zap.String("member", member.Config().Name))
if err := member.Stop(); err != nil {
return err
}
member.Config().ExecPath = newExecPath
lg.Info("Restarting member", zap.String("member", member.Config().Name))
err := member.Start(ctx)
if err != nil {
return err
}
membersChanged++
if isDowngrade {
err = verifyVersion(t, clus, member, targetVersion, targetVersion)
} else {
// for an upgrade, the cluster version only advances to the higher version after every member has been upgraded.
if membersChanged == len(clus.Procs) {
err = verifyVersion(t, clus, member, targetVersion, targetVersion)
} else {
err = verifyVersion(t, clus, member, targetVersion, currentVersion)
}
}
if err != nil {
return err
}
}
return nil
}

func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedServerVersion, expectedClusterVersion *semver.Version) error {
var err error
expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedVersion.Major, expectedVersion.Minor, expectedVersion.Major, expectedVersion.Minor)
expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedServerVersion.Major, expectedServerVersion.Minor, expectedClusterVersion.Major, expectedClusterVersion.Minor)
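// the member's /version endpoint reports both the server version ("etcdserver") and the cluster version ("etcdcluster"); match each on major.minor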
for i := 0; i < 35; i++ {
if err = e2e.CURLGetFromMember(clus, member, e2e.CURLReq{Endpoint: "/version", Expected: expect.ExpectedResponse{Value: expected, IsRegularExpr: true}}); err != nil {
t.Logf("#%d: v3 is not ready yet (%v)", i, err)
1 change: 1 addition & 0 deletions tests/robustness/failpoint/failpoint.go
@@ -47,6 +47,7 @@ var allFailpoints = []Failpoint{
BeforeApplyOneConfChangeSleep,
MemberReplace,
MemberDowngrade,
MemberDowngradeUpgrade,
DropPeerNetwork,
RaftBeforeSaveSleep,
RaftAfterSaveSleep,