From c6e3a1863052cb61121bc58f47be41baf26fc9ce Mon Sep 17 00:00:00 2001
From: Siyuan Zhang
Date: Fri, 3 Jan 2025 11:42:52 -0800
Subject: [PATCH] add MemberDowngradeUpgrade failpoint

Signed-off-by: Siyuan Zhang
---
 tests/robustness/failpoint/cluster.go   | 140 +++++++++++++++++++-----
 tests/robustness/failpoint/failpoint.go |   1 +
 2 files changed, 116 insertions(+), 25 deletions(-)

diff --git a/tests/robustness/failpoint/cluster.go b/tests/robustness/failpoint/cluster.go
index 95b36260991..93ae2d593cf 100644
--- a/tests/robustness/failpoint/cluster.go
+++ b/tests/robustness/failpoint/cluster.go
@@ -39,8 +39,9 @@ import (
 )

 var (
-	MemberReplace   Failpoint = memberReplace{}
-	MemberDowngrade Failpoint = memberDowngrade{}
+	MemberReplace          Failpoint = memberReplace{}
+	MemberDowngrade        Failpoint = memberDowngrade{}
+	MemberDowngradeUpgrade Failpoint = memberDowngradeUpgrade{}
 )

 type memberReplace struct{}
@@ -148,14 +149,12 @@ func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e
 type memberDowngrade struct{}

 func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
-	v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	currentVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
 	if err != nil {
 		return nil, err
 	}
-	targetVersion := semver.Version{Major: v.Major, Minor: v.Minor - 1}
+	lastVersion := &semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
 	numberOfMembersToDowngrade := rand.Int()%len(clus.Procs) + 1
-	membersToDowngrade := rand.Perm(len(clus.Procs))[:numberOfMembersToDowngrade]
-	lg.Info("Test downgrading members", zap.Any("members", membersToDowngrade))

 	member := clus.Procs[0]
 	endpoints := []string{member.EndpointsGRPC()[0]}
@@ -173,27 +172,12 @@ func (f memberDowngrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logge
 	// Need to wait health interval for cluster to accept changes
 	time.Sleep(etcdserver.HealthInterval)
 	lg.Info("Enable downgrade")
-	err = enableDowngrade(ctx, cc, &targetVersion)
+	err = enableDowngrade(ctx, cc, lastVersion)
 	if err != nil {
 		return nil, err
 	}
-	// Need to wait health interval for cluster to prepare for downgrade
-	time.Sleep(etcdserver.HealthInterval)
-	for _, memberID := range membersToDowngrade {
-		member = clus.Procs[memberID]
-		lg.Info("Downgrading member", zap.String("member", member.Config().Name))
-		if err = member.Stop(); err != nil {
-			return nil, err
-		}
-		member.Config().ExecPath = e2e.BinPath.EtcdLastRelease
-		lg.Info("Restarting member", zap.String("member", member.Config().Name))
-		err = member.Start(ctx)
-		if err != nil {
-			return nil, err
-		}
-		err = verifyVersion(t, clus, member, targetVersion)
-	}
+	err = downgradeUpgradeMembers(ctx, t, lg, clus, numberOfMembersToDowngrade, currentVersion, lastVersion)
 	time.Sleep(etcdserver.HealthInterval)
 	return nil, err
 }

@@ -215,6 +199,64 @@ func (f memberDowngrade) Available(config e2e.EtcdProcessClusterConfig, member e
 	return v.Compare(v3_6) >= 0 && (config.Version == e2e.CurrentVersion && member.Config().ExecPath == e2e.BinPath.Etcd)
 }

+type memberDowngradeUpgrade struct{}
+
+func (f memberDowngradeUpgrade) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) {
+	currentVersion, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	if err != nil {
+		return nil, err
+	}
+	lastVersion := &semver.Version{Major: currentVersion.Major, Minor: currentVersion.Minor - 1}
+
+	member := clus.Procs[0]
+	endpoints := []string{member.EndpointsGRPC()[0]}
+	cc, err := clientv3.New(clientv3.Config{
+		Endpoints:            endpoints,
+		Logger:               zap.NewNop(),
+		DialKeepAliveTime:    10 * time.Second,
+		DialKeepAliveTimeout: 100 * time.Millisecond,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer cc.Close()
+
+	// Need to wait health interval for cluster to accept changes
+	time.Sleep(etcdserver.HealthInterval)
+	lg.Info("Enable downgrade")
+	err = enableDowngrade(ctx, cc, lastVersion)
+	if err != nil {
+		return nil, err
+	}
+	// downgrade all members first
+	err = downgradeUpgradeMembers(ctx, t, lg, clus, len(clus.Procs), currentVersion, lastVersion)
+	if err != nil {
+		return nil, err
+	}
+	// partially upgrade the cluster
+	numberOfMembersToUpgrade := rand.Int()%len(clus.Procs) + 1
+	err = downgradeUpgradeMembers(ctx, t, lg, clus, numberOfMembersToUpgrade, lastVersion, currentVersion)
+	time.Sleep(etcdserver.HealthInterval)
+	return nil, err
+}
+
+func (f memberDowngradeUpgrade) Name() string {
+	return "MemberDowngradeUpgrade"
+}
+
+func (f memberDowngradeUpgrade) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
+	if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) {
+		return false
+	}
+	v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
+	if err != nil {
+		panic("Failed checking etcd version binary")
+	}
+	v3_6 := semver.Version{Major: 3, Minor: 6}
+	// only a current version cluster can be downgraded.
+	return v.Compare(v3_6) >= 0 && (config.Version == e2e.CurrentVersion && member.Config().ExecPath == e2e.BinPath.Etcd)
+}
+
 func getID(ctx context.Context, cc *clientv3.Client, name string) (id uint64, found bool, err error) {
 	// Ensure linearized MemberList by first making a linearized Get request from the same member.
 	// This is required for v3.4 support as it doesn't support linearized MemberList https://github.com/etcd-io/etcd/issues/18929
@@ -257,9 +299,57 @@ func enableDowngrade(ctx context.Context, cc *clientv3.Client, targetVersion *se
 	return err
 }

-func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedVersion semver.Version) error {
+func downgradeUpgradeMembers(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
+	isDowngrade := targetVersion.LessThan(*currentVersion)
+	opString := "upgrading"
+	newExecPath := e2e.BinPath.Etcd
+	if isDowngrade {
+		opString = "downgrading"
+		newExecPath = e2e.BinPath.EtcdLastRelease
+	}
+	membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
+	lg.Info(fmt.Sprintf("Test %s members", opString), zap.Any("members", membersToChange))
+
+	// Need to wait health interval for cluster to prepare for downgrade/upgrade
+	time.Sleep(etcdserver.HealthInterval)
+
+	membersChanged := 0
+	for _, memberID := range membersToChange {
+		member := clus.Procs[memberID]
+		if member.Config().ExecPath == newExecPath {
+			return fmt.Errorf("member %s is already running the %s target binary: %s", member.Config().Name, opString, member.Config().ExecPath)
+		}
+		lg.Info(fmt.Sprintf("%s member", opString), zap.String("member", member.Config().Name))
+		if err := member.Stop(); err != nil {
+			return err
+		}
+		member.Config().ExecPath = newExecPath
+		lg.Info("Restarting member", zap.String("member", member.Config().Name))
+		err := member.Start(ctx)
+		if err != nil {
+			return err
+		}
+		membersChanged++
+		if isDowngrade {
+			err = verifyVersion(t, clus, member, targetVersion, targetVersion)
+		} else {
+			// for an upgrade, the cluster version only moves to the higher version once all members have finished upgrading.
+			if membersChanged == len(clus.Procs) {
+				err = verifyVersion(t, clus, member, targetVersion, targetVersion)
+			} else {
+				err = verifyVersion(t, clus, member, targetVersion, currentVersion)
+			}
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func verifyVersion(t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess, expectedServerVersion, expectedClusterVersion *semver.Version) error {
 	var err error
-	expected := fmt.Sprintf(`"etcdserver":"%d.%d\..*"etcdcluster":"%d\.%d\.`, expectedVersion.Major, expectedVersion.Minor, expectedVersion.Major, expectedVersion.Minor)
+	expected := fmt.Sprintf(`"etcdserver":"%d\.%d\..*"etcdcluster":"%d\.%d\.`, expectedServerVersion.Major, expectedServerVersion.Minor, expectedClusterVersion.Major, expectedClusterVersion.Minor)
 	for i := 0; i < 35; i++ {
 		if err = e2e.CURLGetFromMember(clus, member, e2e.CURLReq{Endpoint: "/version", Expected: expect.ExpectedResponse{Value: expected, IsRegularExpr: true}}); err != nil {
 			t.Logf("#%d: v3 is not ready yet (%v)", i, err)
diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go
index 17c0d11b8e7..6522b6942da 100644
--- a/tests/robustness/failpoint/failpoint.go
+++ b/tests/robustness/failpoint/failpoint.go
@@ -47,6 +47,7 @@ var allFailpoints = []Failpoint{
 	BeforeApplyOneConfChangeSleep,
 	MemberReplace,
 	MemberDowngrade,
+	MemberDowngradeUpgrade,
 	DropPeerNetwork,
 	RaftBeforeSaveSleep,
 	RaftAfterSaveSleep,
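
Two details of the member selection in this patch are easy to misread: rand.Int()%len(clus.Procs) + 1 picks a count uniformly between 1 and the cluster size, and rand.Perm(len(clus.Procs))[:k] takes the first k entries of a random permutation, yielding k distinct member indices. The following is a minimal standalone sketch of that selection logic, using only math/rand and hypothetical member names; it is not part of the patch itself.

	package main

	import (
		"fmt"
		"math/rand"
	)

	func main() {
		// Hypothetical member names standing in for clus.Procs.
		procs := []string{"etcd-0", "etcd-1", "etcd-2"}

		// Choose how many members to change: uniform over [1, len(procs)].
		numberOfMembersToChange := rand.Int()%len(procs) + 1

		// rand.Perm returns a random permutation of 0..len(procs)-1, so the
		// first k entries form a random subset of distinct member indices.
		membersToChange := rand.Perm(len(procs))[:numberOfMembersToChange]

		for _, i := range membersToChange {
			fmt.Println("would downgrade/upgrade:", procs[i])
		}
	}

Because the permutation has no repeats, a member is never picked twice within one call, which is why downgradeUpgradeMembers can treat "already running the target binary" as an error rather than a member it happened to select again.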
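verifyVersion polls each restarted member's /version endpoint until the response matches a regular expression built from the expected server and cluster versions. The sketch below exercises the same pattern against illustrative /version payloads (the concrete patch versions such as 3.5.17 are made up; the etcdserver/etcdcluster field names come from the patch). It shows why the upgrade branch of downgradeUpgradeMembers needs two expected versions: a partially upgraded member reports the new server version while the cluster version stays at the lower minor until every member has been upgraded.

	package main

	import (
		"fmt"
		"regexp"
	)

	// buildVersionRegex mirrors the pattern constructed in verifyVersion: it
	// anchors on the major.minor prefixes of the server and cluster versions
	// in the JSON returned by the /version endpoint.
	func buildVersionRegex(serverMajor, serverMinor, clusterMajor, clusterMinor int64) *regexp.Regexp {
		expected := fmt.Sprintf(`"etcdserver":"%d\.%d\..*"etcdcluster":"%d\.%d\.`,
			serverMajor, serverMinor, clusterMajor, clusterMinor)
		return regexp.MustCompile(expected)
	}

	func main() {
		// Illustrative payload from a member downgraded to v3.5 after the
		// cluster version has moved down as well.
		downgraded := `{"etcdserver":"3.5.17","etcdcluster":"3.5.0"}`
		fmt.Println(buildVersionRegex(3, 5, 3, 5).MatchString(downgraded)) // true

		// A partially upgraded member: the server binary is 3.6 but the
		// cluster version stays at 3.5 until all members are upgraded.
		partial := `{"etcdserver":"3.6.0","etcdcluster":"3.5.0"}`
		fmt.Println(buildVersionRegex(3, 6, 3, 5).MatchString(partial)) // true
		fmt.Println(buildVersionRegex(3, 6, 3, 6).MatchString(partial)) // false
	}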