Skip to content

Commit

Permalink
-some generics
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmy Moore <[email protected]>
  • Loading branch information
jimmyaxod committed Jan 20, 2025
1 parent 56e8056 commit 61c6b27
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 71 deletions.
3 changes: 0 additions & 3 deletions cmd/drafter-mounter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,9 +510,6 @@ l:
OnAllDevicesSent: func() {
log.Println("Sent all devices")
},
OnAllMigrationsCompleted: func() {
log.Println("Completed all device migrations")
},
OnProgress: func(p map[string]*migrator.MigrationProgress) {
totalSize := 0
totalDone := 0
Expand Down
2 changes: 1 addition & 1 deletion cmd/drafter-peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func main() {
panic(err)
}

p, err := peer.StartPeer[struct{}, ipc.AgentServerRemote[struct{}]](
p, err := peer.StartPeer(
goroutineManager.Context(),
context.Background(), // Never give up on rescue operations

Expand Down
15 changes: 2 additions & 13 deletions pkg/mounter/migratable_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ type MounterMigrateToHooks struct {

OnProgress func(p map[string]*migrator.MigrationProgress)

OnAllDevicesSent func()
OnAllMigrationsCompleted func()
OnAllDevicesSent func()
}

func (migratableMounter *MigratableMounter) MigrateTo(
Expand All @@ -56,15 +55,5 @@ func (migratableMounter *MigratableMounter) MigrateTo(
}()

vmStateMgr := common.NewDummyVMStateMgr(ctx)
err := common.MigrateToPipe(ctx, readers, writers, migratableMounter.Dg, concurrency, hooks.OnProgress, vmStateMgr, devices, nil)

if err != nil {
return err
}

if hooks.OnAllMigrationsCompleted != nil {
hooks.OnAllMigrationsCompleted()
}

return
return common.MigrateToPipe(ctx, readers, writers, migratableMounter.Dg, concurrency, hooks.OnProgress, vmStateMgr, devices, nil)
}
22 changes: 11 additions & 11 deletions pkg/peer/migrated_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/loopholelabs/silo/pkg/storage/devicegroup"
)

type MigratedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct {
type MigratedPeer struct {
cancelCtx context.CancelFunc

log types.Logger
Expand All @@ -28,13 +28,13 @@ type MigratedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] str
dgIncoming bool

devices []common.MigrateFromDevice
runner *runner.Runner[L, R, G]
runner *runner.Runner[struct{}, ipc.AgentServerRemote[struct{}], struct{}]

alreadyClosed bool
alreadyWaited bool
}

func (migratedPeer *MigratedPeer[L, R, G]) Close() error {
func (migratedPeer *MigratedPeer) Close() error {
if migratedPeer.log != nil {
migratedPeer.log.Debug().Msg("migratedPeer.Close")
}
Expand All @@ -56,7 +56,7 @@ func (migratedPeer *MigratedPeer[L, R, G]) Close() error {
return migratedPeer.closeDG()
}

func (migratedPeer *MigratedPeer[L, R, G]) Wait() error {
func (migratedPeer *MigratedPeer) Wait() error {
if migratedPeer.log != nil {
migratedPeer.log.Debug().Msg("migratedPeer.Wait")
}
Expand Down Expand Up @@ -92,14 +92,14 @@ func (migratedPeer *MigratedPeer[L, R, G]) Wait() error {
return nil
}

func (migratedPeer *MigratedPeer[L, R, G]) setDG(dg *devicegroup.DeviceGroup, incoming bool) {
func (migratedPeer *MigratedPeer) setDG(dg *devicegroup.DeviceGroup, incoming bool) {
migratedPeer.dgLock.Lock()
migratedPeer.dg = dg
migratedPeer.dgIncoming = incoming
migratedPeer.dgLock.Unlock()
}

func (migratedPeer *MigratedPeer[L, R, G]) closeDG() error {
func (migratedPeer *MigratedPeer) closeDG() error {
var err error
migratedPeer.dgLock.Lock()
if migratedPeer.dg != nil {
Expand All @@ -110,18 +110,18 @@ func (migratedPeer *MigratedPeer[L, R, G]) closeDG() error {
return err
}

func (migratedPeer *MigratedPeer[L, R, G]) Resume(
func (migratedPeer *MigratedPeer) Resume(
ctx context.Context,

resumeTimeout,
rescueTimeout time.Duration,

agentServerLocal L,
agentServerHooks ipc.AgentServerAcceptHooks[R, G],
agentServerLocal struct{},
agentServerHooks ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}],

snapshotLoadConfiguration runner.SnapshotLoadConfiguration,
) (*ResumedPeer[L, R, G], error) {
resumedPeer := &ResumedPeer[L, R, G]{
) (*ResumedPeer, error) {
resumedPeer := &ResumedPeer{
dg: migratedPeer.dg,
log: migratedPeer.log,
}
Expand Down
35 changes: 11 additions & 24 deletions pkg/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ import (
"github.com/loopholelabs/silo/pkg/storage/metrics"
)

type Peer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct {
type Peer struct {
log types.Logger

VMPath string
VMPid int

hypervisorCtx context.Context

runner *runner.Runner[L, R, G]
runner *runner.Runner[struct{}, ipc.AgentServerRemote[struct{}], struct{}]

alreadyClosed bool
alreadyWaited bool
}

func (peer Peer[L, R, G]) Close() error {
func (peer Peer) Close() error {
if peer.log != nil {
peer.log.Debug().Msg("Peer.Wait")
}
Expand All @@ -51,7 +51,7 @@ func (peer Peer[L, R, G]) Close() error {
return nil
}

func (peer Peer[L, R, G]) Wait() error {
func (peer Peer) Wait() error {
if peer.log != nil {
peer.log.Debug().Msg("Peer.Wait")
}
Expand All @@ -70,23 +70,16 @@ func (peer Peer[L, R, G]) Wait() error {
return nil
}

func StartPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any](
hypervisorCtx context.Context,
rescueCtx context.Context,

func StartPeer(hypervisorCtx context.Context, rescueCtx context.Context,
hypervisorConfiguration snapshotter.HypervisorConfiguration,

stateName string,
memoryName string,
log types.Logger,
) (*Peer[L, R, G], error) {
peer := &Peer[L, R, G]{
stateName string, memoryName string, log types.Logger) (*Peer, error) {
peer := &Peer{
hypervisorCtx: hypervisorCtx,
log: log,
}

var err error
peer.runner, err = runner.StartRunner[L, R](
peer.runner, err = runner.StartRunner[struct{}, ipc.AgentServerRemote[struct{}]](
hypervisorCtx,
rescueCtx,
hypervisorConfiguration,
Expand All @@ -107,17 +100,11 @@ func StartPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any](
if log != nil {
log.Info().Str("vmpath", peer.VMPath).Int("vmpid", peer.VMPid).Msg("started peer runner")
}

return peer, nil
}

func (peer *Peer[L, R, G]) MigrateFrom(
ctx context.Context,
devices []common.MigrateFromDevice,
readers []io.Reader,
writers []io.Writer,
hooks mounter.MigrateFromHooks,
) (*MigratedPeer[L, R, G], error) {
func (peer *Peer) MigrateFrom(ctx context.Context, devices []common.MigrateFromDevice,
readers []io.Reader, writers []io.Writer, hooks mounter.MigrateFromHooks) (*MigratedPeer, error) {

if peer.log != nil {
peer.log.Info().Msg("started MigrateFrom")
Expand Down Expand Up @@ -153,7 +140,7 @@ func (peer *Peer[L, R, G]) MigrateFrom(
return schema
}

migratedPeer := &MigratedPeer[L, R, G]{
migratedPeer := &MigratedPeer{
devices: devices,
runner: peer.runner,
log: peer.log,
Expand Down
29 changes: 10 additions & 19 deletions pkg/peer/resumed_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ import (
"github.com/loopholelabs/silo/pkg/storage/migrator"
)

type ResumedPeer[L ipc.AgentServerLocal, R ipc.AgentServerRemote[G], G any] struct {
type ResumedPeer struct {
dg *devicegroup.DeviceGroup
Remote R
resumedRunner *runner.ResumedRunner[L, R, G]
Remote ipc.AgentServerRemote[struct{}]
resumedRunner *runner.ResumedRunner[struct{}, ipc.AgentServerRemote[struct{}], struct{}]

log types.Logger

alreadyClosed bool
alreadyWaited bool
}

func (resumedPeer *ResumedPeer[L, R, G]) Wait() error {
func (resumedPeer *ResumedPeer) Wait() error {
if resumedPeer.log != nil {
resumedPeer.log.Debug().Msg("resumedPeer.Wait")
}
Expand All @@ -42,7 +42,7 @@ func (resumedPeer *ResumedPeer[L, R, G]) Wait() error {
return nil
}

func (resumedPeer *ResumedPeer[L, R, G]) Close() error {
func (resumedPeer *ResumedPeer) Close() error {
if resumedPeer.log != nil {
resumedPeer.log.Debug().Msg("resumedPeer.Close")
}
Expand All @@ -60,7 +60,7 @@ func (resumedPeer *ResumedPeer[L, R, G]) Close() error {
return nil
}

func (resumedPeer *ResumedPeer[L, R, G]) SuspendAndCloseAgentServer(ctx context.Context, resumeTimeout time.Duration) error {
func (resumedPeer *ResumedPeer) SuspendAndCloseAgentServer(ctx context.Context, resumeTimeout time.Duration) error {
if resumedPeer.log != nil {
resumedPeer.log.Debug().Msg("resumedPeer.SuspendAndCloseAgentServer")
}
Expand Down Expand Up @@ -90,15 +90,10 @@ type MigrateToHooks struct {
*
*
*/
func (resumedPeer *ResumedPeer[L, R, G]) MigrateTo(
ctx context.Context,
devices []common.MigrateToDevice,
suspendTimeout time.Duration,
concurrency int,
readers []io.Reader,
writers []io.Writer,
hooks MigrateToHooks,
) error {
func (resumedPeer *ResumedPeer) MigrateTo(ctx context.Context, devices []common.MigrateToDevice,
suspendTimeout time.Duration, concurrency int, readers []io.Reader, writers []io.Writer,
hooks MigrateToHooks) error {

if resumedPeer.log != nil {
resumedPeer.log.Info().Msg("resumedPeer.MigrateTo")
}
Expand All @@ -120,10 +115,6 @@ func (resumedPeer *ResumedPeer[L, R, G]) MigrateTo(
return err
}

if hooks.OnAllMigrationsCompleted != nil {
hooks.OnAllMigrationsCompleted()
}

if resumedPeer.log != nil {
resumedPeer.log.Info().Msg("resumedPeer.MigrateTo completed successfuly")
}
Expand Down

0 comments on commit 61c6b27

Please sign in to comment.