From 61c6b271fab875d612b21503ecc367ababc7c7d5 Mon Sep 17 00:00:00 2001 From: Jimmy Moore Date: Mon, 20 Jan 2025 23:26:50 +0000 Subject: [PATCH] -some generics Signed-off-by: Jimmy Moore --- cmd/drafter-mounter/main.go | 3 --- cmd/drafter-peer/main.go | 2 +- pkg/mounter/migratable_mounter.go | 15 ++----------- pkg/peer/migrated_peer.go | 22 +++++++++---------- pkg/peer/peer.go | 35 ++++++++++--------------------- pkg/peer/resumed_peer.go | 29 +++++++++---------------- 6 files changed, 35 insertions(+), 71 deletions(-) diff --git a/cmd/drafter-mounter/main.go b/cmd/drafter-mounter/main.go index 069d226..5946b7b 100644 --- a/cmd/drafter-mounter/main.go +++ b/cmd/drafter-mounter/main.go @@ -510,9 +510,6 @@ l: OnAllDevicesSent: func() { log.Println("Sent all devices") }, - OnAllMigrationsCompleted: func() { - log.Println("Completed all device migrations") - }, OnProgress: func(p map[string]*migrator.MigrationProgress) { totalSize := 0 totalDone := 0 diff --git a/cmd/drafter-peer/main.go b/cmd/drafter-peer/main.go index 0874477..6cf9163 100644 --- a/cmd/drafter-peer/main.go +++ b/cmd/drafter-peer/main.go @@ -109,7 +109,7 @@ func main() { panic(err) } - p, err := peer.StartPeer[struct{}, ipc.AgentServerRemote[struct{}]]( + p, err := peer.StartPeer( goroutineManager.Context(), context.Background(), // Never give up on rescue operations diff --git a/pkg/mounter/migratable_mounter.go b/pkg/mounter/migratable_mounter.go index 0784462..696e88a 100644 --- a/pkg/mounter/migratable_mounter.go +++ b/pkg/mounter/migratable_mounter.go @@ -31,8 +31,7 @@ type MounterMigrateToHooks struct { OnProgress func(p map[string]*migrator.MigrationProgress) - OnAllDevicesSent func() - OnAllMigrationsCompleted func() + OnAllDevicesSent func() } func (migratableMounter *MigratableMounter) MigrateTo( @@ -56,15 +55,5 @@ func (migratableMounter *MigratableMounter) MigrateTo( }() vmStateMgr := common.NewDummyVMStateMgr(ctx) - err := common.MigrateToPipe(ctx, readers, writers, migratableMounter.Dg, concurrency, hooks.OnProgress, vmStateMgr, devices, nil) - - if err != nil { - return err - } - - if hooks.OnAllMigrationsCompleted != nil { - hooks.OnAllMigrationsCompleted() - } - - return + return common.MigrateToPipe(ctx, readers, writers, migratableMounter.Dg, concurrency, hooks.OnProgress, vmStateMgr, devices, nil) } diff --git a/pkg/peer/migrated_peer.go b/pkg/peer/migrated_peer.go index b1a1493..80d29cc 100644 --- a/pkg/peer/migrated_peer.go +++ b/pkg/peer/migrated_peer.go @@ -18,7 +18,7 @@ import ( "github.com/loopholelabs/silo/pkg/storage/devicegroup" ) -type MigratedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { +type MigratedPeer struct { cancelCtx context.CancelFunc log types.Logger @@ -28,13 +28,13 @@ type MigratedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] str dgIncoming bool devices []common.MigrateFromDevice - runner *runner.Runner[L, R, G] + runner *runner.Runner[struct{}, ipc.AgentServerRemote[struct{}], struct{}] alreadyClosed bool alreadyWaited bool } -func (migratedPeer *MigratedPeer[L, R, G]) Close() error { +func (migratedPeer *MigratedPeer) Close() error { if migratedPeer.log != nil { migratedPeer.log.Debug().Msg("migratedPeer.Close") } @@ -56,7 +56,7 @@ func (migratedPeer *MigratedPeer[L, R, G]) Close() error { return migratedPeer.closeDG() } -func (migratedPeer *MigratedPeer[L, R, G]) Wait() error { +func (migratedPeer *MigratedPeer) Wait() error { if migratedPeer.log != nil { migratedPeer.log.Debug().Msg("migratedPeer.Wait") } @@ -92,14 +92,14 @@ func (migratedPeer *MigratedPeer[L, R, G]) Wait() error { return nil } -func (migratedPeer *MigratedPeer[L, R, G]) setDG(dg *devicegroup.DeviceGroup, incoming bool) { +func (migratedPeer *MigratedPeer) setDG(dg *devicegroup.DeviceGroup, incoming bool) { migratedPeer.dgLock.Lock() migratedPeer.dg = dg migratedPeer.dgIncoming = incoming migratedPeer.dgLock.Unlock() } -func (migratedPeer *MigratedPeer[L, R, G]) closeDG() error { +func (migratedPeer *MigratedPeer) closeDG() error { var err error migratedPeer.dgLock.Lock() if migratedPeer.dg != nil { @@ -110,18 +110,18 @@ func (migratedPeer *MigratedPeer[L, R, G]) closeDG() error { return err } -func (migratedPeer *MigratedPeer[L, R, G]) Resume( +func (migratedPeer *MigratedPeer) Resume( ctx context.Context, resumeTimeout, rescueTimeout time.Duration, - agentServerLocal L, - agentServerHooks ipc.AgentServerAcceptHooks[R, G], + agentServerLocal struct{}, + agentServerHooks ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}], snapshotLoadConfiguration runner.SnapshotLoadConfiguration, -) (*ResumedPeer[L, R, G], error) { - resumedPeer := &ResumedPeer[L, R, G]{ +) (*ResumedPeer, error) { + resumedPeer := &ResumedPeer{ dg: migratedPeer.dg, log: migratedPeer.log, } diff --git a/pkg/peer/peer.go b/pkg/peer/peer.go index 50a6aac..d8ea98d 100644 --- a/pkg/peer/peer.go +++ b/pkg/peer/peer.go @@ -14,7 +14,7 @@ import ( "github.com/loopholelabs/silo/pkg/storage/metrics" ) -type Peer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { +type Peer struct { log types.Logger VMPath string @@ -22,13 +22,13 @@ type Peer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { hypervisorCtx context.Context - runner *runner.Runner[L, R, G] + runner *runner.Runner[struct{}, ipc.AgentServerRemote[struct{}], struct{}] alreadyClosed bool alreadyWaited bool } -func (peer Peer[L, R, G]) Close() error { +func (peer Peer) Close() error { if peer.log != nil { peer.log.Debug().Msg("Peer.Wait") } @@ -51,7 +51,7 @@ func (peer Peer[L, R, G]) Close() error { return nil } -func (peer Peer[L, R, G]) Wait() error { +func (peer Peer) Wait() error { if peer.log != nil { peer.log.Debug().Msg("Peer.Wait") } @@ -70,23 +70,16 @@ func (peer Peer[L, R, G]) Wait() error { return nil } -func StartPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any]( - hypervisorCtx context.Context, - rescueCtx context.Context, - +func StartPeer(hypervisorCtx context.Context, rescueCtx context.Context, hypervisorConfiguration snapshotter.HypervisorConfiguration, - - stateName string, - memoryName string, - log types.Logger, -) (*Peer[L, R, G], error) { - peer := &Peer[L, R, G]{ + stateName string, memoryName string, log types.Logger) (*Peer, error) { + peer := &Peer{ hypervisorCtx: hypervisorCtx, log: log, } var err error - peer.runner, err = runner.StartRunner[L, R]( + peer.runner, err = runner.StartRunner[struct{}, ipc.AgentServerRemote[struct{}]]( hypervisorCtx, rescueCtx, hypervisorConfiguration, @@ -107,17 +100,11 @@ func StartPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any]( if log != nil { log.Info().Str("vmpath", peer.VMPath).Int("vmpid", peer.VMPid).Msg("started peer runner") } - return peer, nil } -func (peer *Peer[L, R, G]) MigrateFrom( - ctx context.Context, - devices []common.MigrateFromDevice, - readers []io.Reader, - writers []io.Writer, - hooks mounter.MigrateFromHooks, -) (*MigratedPeer[L, R, G], error) { +func (peer *Peer) MigrateFrom(ctx context.Context, devices []common.MigrateFromDevice, + readers []io.Reader, writers []io.Writer, hooks mounter.MigrateFromHooks) (*MigratedPeer, error) { if peer.log != nil { peer.log.Info().Msg("started MigrateFrom") @@ -153,7 +140,7 @@ func (peer *Peer[L, R, G]) MigrateFrom( return schema } - migratedPeer := &MigratedPeer[L, R, G]{ + migratedPeer := &MigratedPeer{ devices: devices, runner: peer.runner, log: peer.log, diff --git a/pkg/peer/resumed_peer.go b/pkg/peer/resumed_peer.go index 0c288a9..829d9db 100644 --- a/pkg/peer/resumed_peer.go +++ b/pkg/peer/resumed_peer.go @@ -13,10 +13,10 @@ import ( "github.com/loopholelabs/silo/pkg/storage/migrator" ) -type ResumedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct { +type ResumedPeer struct { dg *devicegroup.DeviceGroup - Remote R - resumedRunner *runner.ResumedRunner[L, R, G] + Remote ipc.AgentServerRemote[struct{}] + resumedRunner *runner.ResumedRunner[struct{}, ipc.AgentServerRemote[struct{}], struct{}] log types.Logger @@ -24,7 +24,7 @@ type ResumedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] stru alreadyWaited bool } -func (resumedPeer *ResumedPeer[L, R, G]) Wait() error { +func (resumedPeer *ResumedPeer) Wait() error { if resumedPeer.log != nil { resumedPeer.log.Debug().Msg("resumedPeer.Wait") } @@ -42,7 +42,7 @@ func (resumedPeer *ResumedPeer[L, R, G]) Wait() error { return nil } -func (resumedPeer *ResumedPeer[L, R, G]) Close() error { +func (resumedPeer *ResumedPeer) Close() error { if resumedPeer.log != nil { resumedPeer.log.Debug().Msg("resumedPeer.Close") } @@ -60,7 +60,7 @@ func (resumedPeer *ResumedPeer[L, R, G]) Close() error { return nil } -func (resumedPeer *ResumedPeer[L, R, G]) SuspendAndCloseAgentServer(ctx context.Context, resumeTimeout time.Duration) error { +func (resumedPeer *ResumedPeer) SuspendAndCloseAgentServer(ctx context.Context, resumeTimeout time.Duration) error { if resumedPeer.log != nil { resumedPeer.log.Debug().Msg("resumedPeer.SuspendAndCloseAgentServer") } @@ -90,15 +90,10 @@ type MigrateToHooks struct { * * */ -func (resumedPeer *ResumedPeer[L, R, G]) MigrateTo( - ctx context.Context, - devices []common.MigrateToDevice, - suspendTimeout time.Duration, - concurrency int, - readers []io.Reader, - writers []io.Writer, - hooks MigrateToHooks, -) error { +func (resumedPeer *ResumedPeer) MigrateTo(ctx context.Context, devices []common.MigrateToDevice, + suspendTimeout time.Duration, concurrency int, readers []io.Reader, writers []io.Writer, + hooks MigrateToHooks) error { + if resumedPeer.log != nil { resumedPeer.log.Info().Msg("resumedPeer.MigrateTo") } @@ -120,10 +115,6 @@ func (resumedPeer *ResumedPeer[L, R, G]) MigrateTo( return err } - if hooks.OnAllMigrationsCompleted != nil { - hooks.OnAllMigrationsCompleted() - } - if resumedPeer.log != nil { resumedPeer.log.Info().Msg("resumedPeer.MigrateTo completed successfuly") }