Skip to content

Commit

Permalink
Merge pull request #2286 from barakmich/fix_migrations
Browse files Browse the repository at this point in the history
etcdserver: Canonicalize migrations
  • Loading branch information
barakmich committed Feb 12, 2015
2 parents 590205b + cd50f0e commit c6cc276
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 62 deletions.
46 changes: 1 addition & 45 deletions etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdhttp"
"github.com/coreos/etcd/pkg/cors"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/proxy"
Expand Down Expand Up @@ -88,13 +87,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
cfg.dir = fmt.Sprintf("%v.etcd", cfg.name)
log.Printf("no data-dir provided, using default data-dir ./%s", cfg.dir)
}
if err := makeMemberDir(cfg.dir); err != nil {
return nil, fmt.Errorf("cannot use /member sub-directory: %v", err)
}
membdir := path.Join(cfg.dir, "member")
if err := fileutil.IsDirWriteable(membdir); err != nil {
return nil, fmt.Errorf("cannot write to data directory: %v", err)
}

pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
Expand Down Expand Up @@ -149,7 +141,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
Name: cfg.name,
ClientURLs: cfg.acurls,
PeerURLs: cfg.apurls,
DataDir: membdir,
DataDir: cfg.dir,
SnapCount: cfg.snapCount,
MaxSnapFiles: cfg.maxSnapFiles,
MaxWALFiles: cfg.maxWalFiles,
Expand Down Expand Up @@ -336,42 +328,6 @@ func setupCluster(cfg *config) (*etcdserver.Cluster, error) {
return cls, err
}

// makeMemberDir prepares the "member" sub-directory of dir.
//
// If dir/member already exists nothing is done and nil is returned. Otherwise
// the directory is created with mode 0700 and the entries found directly in
// dir are brought into it: v1 files ("conf", "log", "snapshot") are
// symlinked, v2 entries ("wal", "snap") are renamed. Entries with any other
// name are ignored. The first error from Stat, MkdirAll, ReadDir, Symlink or
// Rename is returned unchanged.
func makeMemberDir(dir string) error {
	memberDir := path.Join(dir, "member")

	_, statErr := os.Stat(memberDir)
	if statErr == nil {
		// Already present: assume the layout was set up earlier.
		return nil
	}
	if !os.IsNotExist(statErr) {
		return statErr
	}

	if mkErr := os.MkdirAll(memberDir, 0700); mkErr != nil {
		return mkErr
	}

	legacyFiles := types.NewUnsafeSet("conf", "log", "snapshot")
	currentFiles := types.NewUnsafeSet("wal", "snap")

	entries, err := fileutil.ReadDir(dir)
	if err != nil {
		return err
	}

	for _, entry := range entries {
		src := path.Join(dir, entry)
		dst := path.Join(memberDir, entry)

		var moveErr error
		if legacyFiles.Contains(entry) {
			// Only link v1 files into the sub-directory; the originals stay
			// where they are so that a v0.4 etcd can still bootstrap from
			// them if the upgrade fails.
			moveErr = os.Symlink(src, dst)
		} else if currentFiles.Contains(entry) {
			moveErr = os.Rename(src, dst)
		}
		if moveErr != nil {
			return moveErr
		}
	}
	return nil
}

func genClusterString(name string, urls types.URLs) string {
addrs := make([]string, 0)
for _, u := range urls {
Expand Down
7 changes: 5 additions & 2 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
return nil
}

// WALDir returns the "wal" path located directly under c.DataDir.
func (c *ServerConfig) WALDir() string {
	return path.Join(c.DataDir, "wal")
}
// MemberDir returns the "member" path located directly under c.DataDir.
func (c *ServerConfig) MemberDir() string {
	return path.Join(c.DataDir, "member")
}

// SnapDir returns the "snap" path located directly under c.DataDir.
func (c *ServerConfig) SnapDir() string {
	return path.Join(c.DataDir, "snap")
}
// WALDir returns the "wal" path located under the directory reported by
// c.MemberDir().
func (c *ServerConfig) WALDir() string {
	return path.Join(c.MemberDir(), "wal")
}

// SnapDir returns the "snap" path located under the directory reported by
// c.MemberDir().
func (c *ServerConfig) SnapDir() string {
	return path.Join(c.MemberDir(), "snap")
}

// ShouldDiscover reports whether c.DiscoveryURL is non-empty.
func (c *ServerConfig) ShouldDiscover() bool {
	return c.DiscoveryURL != ""
}

Expand All @@ -99,6 +101,7 @@ func (c *ServerConfig) print(initial bool) {
log.Println("etcdserver: force new cluster")
}
log.Printf("etcdserver: data dir = %s", c.DataDir)
log.Printf("etcdserver: member dir = %s", c.MemberDir())
log.Printf("etcdserver: heartbeat = %dms", c.TickMs)
log.Printf("etcdserver: election = %dms", c.ElectionTicks*int(c.TickMs))
log.Printf("etcdserver: snapshot count = %d", c.SnapCount)
Expand Down
8 changes: 4 additions & 4 deletions etcdserver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func TestBootstrapConfigVerify(t *testing.T) {

func TestSnapDir(t *testing.T) {
tests := map[string]string{
"/": "/snap",
"/var/lib/etc": "/var/lib/etc/snap",
"/": "/member/snap",
"/var/lib/etc": "/var/lib/etc/member/snap",
}
for dd, w := range tests {
cfg := ServerConfig{
Expand All @@ -144,8 +144,8 @@ func TestSnapDir(t *testing.T) {

func TestWALDir(t *testing.T) {
tests := map[string]string{
"/": "/wal",
"/var/lib/etc": "/var/lib/etc/wal",
"/": "/member/wal",
"/var/lib/etc": "/var/lib/etc/member/wal",
}
for dd, w := range tests {
cfg := ServerConfig{
Expand Down
19 changes: 14 additions & 5 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
var n raft.Node
var s *raft.MemoryStorage
var id types.ID
var ss *snap.Snapshotter

walVersion, err := wal.DetectVersion(cfg.DataDir)
if err != nil {
return nil, err
Expand All @@ -153,7 +155,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
}
haveWAL := walVersion != wal.WALNotExist

ss := snap.New(cfg.SnapDir())
switch {
case !haveWAL && !cfg.NewCluster:
us := getOtherPeerURLs(cfg.Cluster, cfg.Name)
Expand Down Expand Up @@ -189,15 +190,23 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
cfg.PrintWithInitial()
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
case haveWAL:
if walVersion != wal.WALv0_5 {
if err := upgradeWAL(cfg, walVersion); err != nil {
return nil, err
}
// Run the migrations.
if err := upgradeWAL(cfg.DataDir, cfg.Name, walVersion); err != nil {
return nil, err
}

if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
return nil, fmt.Errorf("cannot write to data directory: %v", err)
}

if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err)
}

if cfg.ShouldDiscover() {
log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
}
ss := snap.New(cfg.SnapDir())
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
Expand Down
41 changes: 38 additions & 3 deletions etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package etcdserver

import (
"log"
"os"
"path"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/migrate"
Expand Down Expand Up @@ -91,14 +93,47 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,

// upgradeWAL converts an older version of the etcdServer data to the newest version.
// It must ensure that, after upgrading, the most recent version is present.
func upgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
if ver == wal.WALv0_4 {
func upgradeWAL(baseDataDir string, name string, ver wal.WalVersion) error {
switch ver {
case wal.WALv0_4:
log.Print("etcdserver: converting v0.4 log to v2.0")
err := migrate.Migrate4To2(cfg.DataDir, cfg.Name)
err := migrate.Migrate4To2(baseDataDir, name)
if err != nil {
log.Fatalf("etcdserver: failed migrating data-dir: %v", err)
return err
}
fallthrough
case wal.WALv2_0:
err := makeMemberDir(baseDataDir)
if err != nil {
return err
}
fallthrough
case wal.WALv2_0_1:
fallthrough
default:
log.Printf("datadir is valid for the 2.0.1 format")
}
return nil
}

// makeMemberDir moves the "snap" and "wal" entries of dir into dir/member.
//
// If dir/member already exists the function returns nil without touching
// anything. Otherwise the directory is created with mode 0700 and "snap" and
// then "wal" are renamed into it, in that order. The first error from Stat,
// MkdirAll or Rename is returned unchanged.
func makeMemberDir(dir string) error {
	memberDir := path.Join(dir, "member")

	_, statErr := os.Stat(memberDir)
	if statErr == nil {
		// Already present: nothing to move.
		return nil
	}
	if !os.IsNotExist(statErr) {
		return statErr
	}

	if mkErr := os.MkdirAll(memberDir, 0700); mkErr != nil {
		return mkErr
	}

	for _, sub := range [...]string{"snap", "wal"} {
		from := path.Join(dir, sub)
		to := path.Join(memberDir, sub)
		if mvErr := os.Rename(from, to); mvErr != nil {
			return mvErr
		}
	}
	return nil
}
15 changes: 13 additions & 2 deletions wal/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ const (
WALUnknown WalVersion = "Unknown WAL"
WALNotExist WalVersion = "No WAL"
WALv0_4 WalVersion = "0.4.x"
WALv0_5 WalVersion = "0.5.x"
WALv2_0 WalVersion = "2.0.0"
WALv2_0_1 WalVersion = "2.0.1"
)

func DetectVersion(dirpath string) (WalVersion, error) {
Expand All @@ -48,10 +49,20 @@ func DetectVersion(dirpath string) (WalVersion, error) {
return WALNotExist, nil
}
nameSet := types.NewUnsafeSet(names...)
if nameSet.Contains("member") {
ver, err := DetectVersion(path.Join(dirpath, "member"))
if ver == WALv2_0 {
return WALv2_0_1, nil
} else if ver == WALv0_4 {
// A v0.4 layout nested under member/ is never expected; report it as unknown.
return WALUnknown, nil
}
return ver, err
}
if nameSet.ContainsAll([]string{"snap", "wal"}) {
// An empty .../wal directory does not count as an existing WAL.
if Exist(path.Join(dirpath, "wal")) {
return WALv0_5, nil
return WALv2_0, nil
}
}
if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
Expand Down
3 changes: 2 additions & 1 deletion wal/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func TestDetectVersion(t *testing.T) {
wver WalVersion
}{
{[]string{}, WALNotExist},
{[]string{"snap/", "wal/", "wal/1"}, WALv0_5},
{[]string{"member/", "member/wal/", "member/wal/1", "member/snap/"}, WALv2_0_1},
{[]string{"snap/", "wal/", "wal/1"}, WALv2_0},
{[]string{"snapshot/", "conf", "log"}, WALv0_4},
{[]string{"weird"}, WALUnknown},
{[]string{"snap/", "wal/"}, WALUnknown},
Expand Down

0 comments on commit c6cc276

Please sign in to comment.