Skip to content

Commit

Permalink
Simplify visitor command api
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Chacin <[email protected]>
  • Loading branch information
pablochacin committed Oct 17, 2023
1 parent 5406ee2 commit 80ce265
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 113 deletions.
104 changes: 52 additions & 52 deletions pkg/disruptors/commads.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,127 +119,127 @@ func buildCleanupCmd() []string {
return []string{"xk6-disruptor-agent", "cleanup"}
}

// PodHTTPFaultCommandGenerator implements the AgentCommandGenerator interface for injecting
// PodHTTPFaultCommand implements the PodVisitCommands interface for injecting
// HttpFaults in a Pod
type PodHTTPFaultCommandGenerator struct {
type PodHTTPFaultCommand struct {
fault HTTPFault
duration time.Duration
options HTTPDisruptionOptions
}

// GetCommands return the VisitCommands for injecting a HttpFault in a Pod
func (i PodHTTPFaultCommandGenerator) GetCommands(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, i.fault.Port) {
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, i.fault.Port)
// Exec return the command for injecting a HttpFault in a Pod
func (c PodHTTPFaultCommand) Exec(pod corev1.Pod) ([]string, error) {
if !utils.HasPort(pod, c.fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, c.fault.Port)
}

if utils.HasHostNetwork(pod) {
return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
return nil, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitCommands{}, err
return nil, err
}

visitCommands := VisitCommands{
Exec: buildHTTPFaultCmd(targetAddress, i.fault, i.duration, i.options),
Cleanup: buildCleanupCmd(),
}
return buildHTTPFaultCmd(targetAddress, c.fault, c.duration, c.options), nil
}

return visitCommands, nil
// Cleanup defines the command to execute for cleaning up if command execution fails
func (c PodHTTPFaultCommand) Cleanup(_ corev1.Pod) []string {
return buildCleanupCmd()
}

// PodGrpcFaultCommandGenerator implements the AgentCommandGenerator interface for injecting GrpcFaults in a Pod
type PodGrpcFaultCommandGenerator struct {
// PodGrpcFaultCommand implements the PodVisitCommands interface for injecting GrpcFaults in a Pod
type PodGrpcFaultCommand struct {
fault GrpcFault
duration time.Duration
options GrpcDisruptionOptions
}

// GetCommands return the VisitCommands for injecting a GrpcFault in a Pod
func (i PodGrpcFaultCommandGenerator) GetCommands(pod corev1.Pod) (VisitCommands, error) {
if !utils.HasPort(pod, i.fault.Port) {
return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, i.fault.Port)
// Exec return the command for injecting a GrpcFault in a Pod
func (c PodGrpcFaultCommand) Exec(pod corev1.Pod) ([]string, error) {
if !utils.HasPort(pod, c.fault.Port) {
return nil, fmt.Errorf("pod %q does not expose port %d", pod.Name, c.fault.Port)
}

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitCommands{}, err
return nil, err
}

visitCommands := VisitCommands{
Exec: buildGrpcFaultCmd(targetAddress, i.fault, i.duration, i.options),
Cleanup: buildCleanupCmd(),
}
return buildGrpcFaultCmd(targetAddress, c.fault, c.duration, c.options), nil
}

return visitCommands, nil
// Cleanup defines the command to execute for cleaning up if command execution fails
func (c PodGrpcFaultCommand) Cleanup(_ corev1.Pod) []string {
return buildCleanupCmd()
}

// ServiceHTTPFaultCommandGenerator implements the AgentCommandGenerator interface for injecting HttpFaults in a Pod
type ServiceHTTPFaultCommandGenerator struct {
// ServiceHTTPFaultCommand implements the PodVisitCommands interface for injecting HttpFaults in a Pod
type ServiceHTTPFaultCommand struct {
service corev1.Service
fault HTTPFault
duration time.Duration
options HTTPDisruptionOptions
}

// GetCommands return the VisitCommands for injecting a HttpFault in a Service
func (i ServiceHTTPFaultCommandGenerator) GetCommands(pod corev1.Pod) (VisitCommands, error) {
port, err := utils.MapPort(i.service, i.fault.Port, pod)
// Exec return the command for injecting a HttpFault in a Service
func (c ServiceHTTPFaultCommand) Exec(pod corev1.Pod) ([]string, error) {
port, err := utils.MapPort(c.service, c.fault.Port, pod)
if err != nil {
return VisitCommands{}, err
return nil, err
}

if utils.HasHostNetwork(pod) {
return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
return nil, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name)
}

// copy fault to change target port for the pod
podFault := i.fault
podFault := c.fault
podFault.Port = port

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitCommands{}, err
return nil, err
}

visitCommands := VisitCommands{
Exec: buildHTTPFaultCmd(targetAddress, podFault, i.duration, i.options),
Cleanup: buildCleanupCmd(),
}
return buildHTTPFaultCmd(targetAddress, podFault, c.duration, c.options), nil
}

return visitCommands, nil
// Cleanup defines the command to execute for cleaning up if command execution fails
func (c ServiceHTTPFaultCommand) Cleanup(_ corev1.Pod) []string {
return buildCleanupCmd()
}

// ServiceGrpcFaultCommandGenerator implements the AgentCommandGenerator interface for injecting a
// ServiceGrpcFaultCommand implements the PodVisitCommands interface for injecting a
// GrpcFault in a Service
type ServiceGrpcFaultCommandGenerator struct {
type ServiceGrpcFaultCommand struct {
service corev1.Service
fault GrpcFault
duration time.Duration
options GrpcDisruptionOptions
}

// GetCommands return the VisitCommands for injecting a GrpcFault in a Pod
func (i ServiceGrpcFaultCommandGenerator) GetCommands(pod corev1.Pod) (VisitCommands, error) {
port, err := utils.MapPort(i.service, i.fault.Port, pod)
// Exec return the VisitCommands for injecting a GrpcFault in a Service
func (c ServiceGrpcFaultCommand) Exec(pod corev1.Pod) ([]string, error) {
port, err := utils.MapPort(c.service, c.fault.Port, pod)
if err != nil {
return VisitCommands{}, err
return nil, err
}

podFault := i.fault
podFault := c.fault
podFault.Port = port

targetAddress, err := utils.PodIP(pod)
if err != nil {
return VisitCommands{}, err
return nil, err
}

visitCommands := VisitCommands{
Exec: buildGrpcFaultCmd(targetAddress, podFault, i.duration, i.options),
Cleanup: buildCleanupCmd(),
}
return buildGrpcFaultCmd(targetAddress, podFault, c.duration, c.options), nil
}

return visitCommands, nil
// Cleanup defines the command to execute for cleaning up if command execution fails
func (c ServiceGrpcFaultCommand) Cleanup(_ corev1.Pod) []string {
return buildCleanupCmd()
}
15 changes: 6 additions & 9 deletions pkg/disruptors/commads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,13 @@ func Test_PodHTTPFaultCommandGenerator(t *testing.T) {
t.Run(tc.title, func(t *testing.T) {
t.Parallel()

visitor := PodHTTPFaultCommandGenerator{
cmd := PodHTTPFaultCommand{
fault: tc.fault,
duration: tc.duration,
options: tc.opts,
}

cmds, err := visitor.GetCommands(tc.target)

exec, err := cmd.Exec(tc.target)
if tc.expectError && err == nil {
t.Errorf("should had failed")
return
Expand All @@ -171,8 +170,7 @@ func Test_PodHTTPFaultCommandGenerator(t *testing.T) {
return
}

exec := strings.Join(cmds.Exec, " ")
if !command.AssertCmdEquals(exec, tc.expectedCmd) {
if !command.AssertCmdEquals(strings.Join(exec, " "), tc.expectedCmd) {
t.Errorf("expected command: %s got: %s", tc.expectedCmd, exec)
}
})
Expand Down Expand Up @@ -262,13 +260,13 @@ func Test_PodGrpcPFaultCommandGenerator(t *testing.T) {
t.Run(tc.title, func(t *testing.T) {
t.Parallel()

visitor := PodGrpcFaultCommandGenerator{
cmd := PodGrpcFaultCommand{
fault: tc.fault,
duration: tc.duration,
options: tc.opts,
}

cmds, err := visitor.GetCommands(tc.target)
exec, err := cmd.Exec(tc.target)

if tc.expectError && err == nil {
t.Errorf("should had failed")
Expand All @@ -280,8 +278,7 @@ func Test_PodGrpcPFaultCommandGenerator(t *testing.T) {
return
}

exec := strings.Join(cmds.Exec, " ")
if !command.AssertCmdEquals(exec, tc.expectedCmd) {
if !command.AssertCmdEquals(strings.Join(exec, " "), tc.expectedCmd) {
t.Errorf("expected command: %s got: %s", tc.expectedCmd, exec)
}
})
Expand Down
31 changes: 13 additions & 18 deletions pkg/disruptors/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,12 @@ import (
corev1 "k8s.io/api/core/v1"
)

// AgentCommandGenerator defines the interface for generating agent commands
type AgentCommandGenerator interface {
// GetCommands returns the VisitCommands for visiting the Pod
GetCommands(pod corev1.Pod) (VisitCommands, error)
}

// VisitCommands define the commands used for visiting a Pod
type VisitCommands struct {
// PodVisitCommand define the commands used for visiting a Pod
type PodVisitCommand interface {
// Exec defines the command to be executed
Exec []string
Exec(corev1.Pod) ([]string, error)
// Cleanup defines the command to execute for cleaning up if command execution fails
Cleanup []string
Cleanup(corev1.Pod) []string
}

// PodController defines the interface for controlling a set of target Pods
Expand All @@ -42,7 +36,7 @@ type PodAgentVisitorOptions struct {

// PodVisitor defines the methods for executing actions in a target Pod
type PodVisitor interface {
Visit(context.Context, corev1.Pod, AgentCommandGenerator) error
Visit(context.Context, corev1.Pod, PodVisitCommand) error
}

// PodAgentVisitor executes actions in a Pod using the Agent
Expand Down Expand Up @@ -107,26 +101,27 @@ func (c *PodAgentVisitor) injectDisruptorAgent(ctx context.Context, pod corev1.P
}

// Visit allows executing a different command on each target returned by a visiting function
func (c *PodAgentVisitor) Visit(ctx context.Context, pod corev1.Pod, commandGenerator AgentCommandGenerator) error {
func (c *PodAgentVisitor) Visit(ctx context.Context, pod corev1.Pod, commands PodVisitCommand) error {
err := c.injectDisruptorAgent(ctx, pod)
if err != nil {
return fmt.Errorf("injecting agent in the pod %q: %w", pod.Name, err)
}

// get the command to execute in the target
visitCommands, err := commandGenerator.GetCommands(pod)
execCommand, err := commands.Exec(pod)
if err != nil {
return fmt.Errorf("unable to get command for pod %q: %w", pod.Name, err)
}

_, stderr, err := c.helper.Exec(ctx, pod.Name, "xk6-agent", visitCommands.Exec, []byte{})
_, stderr, err := c.helper.Exec(ctx, pod.Name, "xk6-agent", execCommand, []byte{})

// if command failed, ensure the agent execution is terminated
if err != nil && visitCommands.Cleanup != nil {
cleanupCommand := commands.Cleanup(pod)
if err != nil && cleanupCommand != nil {
// we ignore errors because we are reporting the reason of the exec failure
// we use a fresh context because the context used in exec may have been cancelled or expired
//nolint:contextcheck
_, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", visitCommands.Cleanup, []byte{})
_, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", cleanupCommand, []byte{})
}

// if the context is cancelled, don't report error (we assume the caller is reporting this error)
Expand All @@ -146,7 +141,7 @@ func NewAgentController(targets []corev1.Pod, visitor PodVisitor) *PodController
}

// Visit allows executing a different command on each target returned by a visiting function
func (c *PodController) Visit(ctx context.Context, commandGen AgentCommandGenerator) error {
func (c *PodController) Visit(ctx context.Context, commands PodVisitCommand) error {
// if there are no targets, nothing to do
if len(c.targets) == 0 {
return nil
Expand All @@ -163,7 +158,7 @@ func (c *PodController) Visit(ctx context.Context, commandGen AgentCommandGenera
for _, pod := range c.targets {
wg.Add(1)
go func(pod corev1.Pod) {
if err := c.visitor.Visit(visitCtx, pod, commandGen); err != nil {
if err := c.visitor.Visit(visitCtx, pod, commands); err != nil {
errCh <- err
}

Expand Down
Loading

0 comments on commit 80ce265

Please sign in to comment.