From b5d3813ad5ce5137fa302459bb955245dfbd826e Mon Sep 17 00:00:00 2001 From: pablochacin Date: Thu, 19 Oct 2023 11:39:41 +0200 Subject: [PATCH] Refactor agent controller (#355) * Refactor agent controller * Inject Agent on each visit * Rename PodVisitor to avoid repetitive names * Change naming to better reflect functions * Simplify visitor command API * Refactor PodController interface Signed-off-by: Pablo Chacin --- pkg/disruptors/cmd_builders.go | 119 -------- pkg/disruptors/commads.go | 242 +++++++++++++++ .../{visitor_test.go => commads_test.go} | 171 +---------- pkg/disruptors/controller.go | 274 ++++++++--------- pkg/disruptors/controller_test.go | 285 ++++++++---------- pkg/disruptors/pod.go | 37 ++- pkg/disruptors/pod_test.go | 151 ++++++++++ pkg/disruptors/service.go | 37 ++- pkg/disruptors/visitor.go | 132 -------- 9 files changed, 712 insertions(+), 736 deletions(-) delete mode 100644 pkg/disruptors/cmd_builders.go create mode 100644 pkg/disruptors/commads.go rename pkg/disruptors/{visitor_test.go => commads_test.go} (62%) create mode 100644 pkg/disruptors/pod_test.go delete mode 100644 pkg/disruptors/visitor.go diff --git a/pkg/disruptors/cmd_builders.go b/pkg/disruptors/cmd_builders.go deleted file mode 100644 index bea2da71..00000000 --- a/pkg/disruptors/cmd_builders.go +++ /dev/null @@ -1,119 +0,0 @@ -package disruptors - -import ( - "fmt" - "time" - - "github.com/grafana/xk6-disruptor/pkg/utils" -) - -func buildGrpcFaultCmd( - targetAddress string, - fault GrpcFault, - duration time.Duration, - options GrpcDisruptionOptions, -) []string { - cmd := []string{ - "xk6-disruptor-agent", - "grpc", - "-d", utils.DurationSeconds(duration), - "-t", fmt.Sprint(fault.Port), - } - - // TODO: make port mandatory - if fault.Port != 0 { - cmd = append(cmd, "-t", fmt.Sprint(fault.Port)) - } - - if fault.AverageDelay > 0 { - cmd = append( - cmd, - "-a", - utils.DurationMillSeconds(fault.AverageDelay), - "-v", - utils.DurationMillSeconds(fault.DelayVariation), - ) - } - - if fault.ErrorRate > 0 { - cmd = append( - cmd, - "-s", - fmt.Sprint(fault.StatusCode), - "-r", - fmt.Sprint(fault.ErrorRate), - ) - if fault.StatusMessage != "" { - cmd = append(cmd, "-m", fault.StatusMessage) - } - } - - if len(fault.Exclude) > 0 { - cmd = append(cmd, "-x", fault.Exclude) - } - - if options.ProxyPort != 0 { - cmd = append(cmd, "-p", fmt.Sprint(options.ProxyPort)) - } - - cmd = append(cmd, "--upstream-host", targetAddress) - - return cmd -} - -func buildHTTPFaultCmd( - targetAddress string, - fault HTTPFault, - duration time.Duration, - options HTTPDisruptionOptions, -) []string { - cmd := []string{ - "xk6-disruptor-agent", - "http", - "-d", utils.DurationSeconds(duration), - } - - // TODO: make port mandatory - if fault.Port != 0 { - cmd = append(cmd, "-t", fmt.Sprint(fault.Port)) - } - - if fault.AverageDelay > 0 { - cmd = append( - cmd, - "-a", - utils.DurationMillSeconds(fault.AverageDelay), - "-v", - utils.DurationMillSeconds(fault.DelayVariation), - ) - } - - if fault.ErrorRate > 0 { - cmd = append( - cmd, - "-e", - fmt.Sprint(fault.ErrorCode), - "-r", - fmt.Sprint(fault.ErrorRate), - ) - if fault.ErrorBody != "" { - cmd = append(cmd, "-b", fault.ErrorBody) - } - } - - if len(fault.Exclude) > 0 { - cmd = append(cmd, "-x", fault.Exclude) - } - - if options.ProxyPort != 0 { - cmd = append(cmd, "-p", fmt.Sprint(options.ProxyPort)) - } - - cmd = append(cmd, "--upstream-host", targetAddress) - - return cmd -} - -func buildCleanupCmd() []string { - return []string{"xk6-disruptor-agent", "cleanup"} -} diff --git a/pkg/disruptors/commads.go b/pkg/disruptors/commads.go new file mode 100644 index 00000000..add27303 --- /dev/null +++ b/pkg/disruptors/commads.go @@ -0,0 +1,242 @@ +package disruptors + +import ( + "fmt" + "time" + + "github.com/grafana/xk6-disruptor/pkg/utils" + corev1 "k8s.io/api/core/v1" +) + +func buildGrpcFaultCmd( + targetAddress string, + fault GrpcFault, + duration time.Duration, + options GrpcDisruptionOptions, +) []string { + cmd := []string{ + "xk6-disruptor-agent", + "grpc", + "-d", utils.DurationSeconds(duration), + "-t", fmt.Sprint(fault.Port), + } + + // TODO: make port mandatory + if fault.Port != 0 { + cmd = append(cmd, "-t", fmt.Sprint(fault.Port)) + } + + if fault.AverageDelay > 0 { + cmd = append( + cmd, + "-a", + utils.DurationMillSeconds(fault.AverageDelay), + "-v", + utils.DurationMillSeconds(fault.DelayVariation), + ) + } + + if fault.ErrorRate > 0 { + cmd = append( + cmd, + "-s", + fmt.Sprint(fault.StatusCode), + "-r", + fmt.Sprint(fault.ErrorRate), + ) + if fault.StatusMessage != "" { + cmd = append(cmd, "-m", fault.StatusMessage) + } + } + + if len(fault.Exclude) > 0 { + cmd = append(cmd, "-x", fault.Exclude) + } + + if options.ProxyPort != 0 { + cmd = append(cmd, "-p", fmt.Sprint(options.ProxyPort)) + } + + cmd = append(cmd, "--upstream-host", targetAddress) + + return cmd +} + +func buildHTTPFaultCmd( + targetAddress string, + fault HTTPFault, + duration time.Duration, + options HTTPDisruptionOptions, +) []string { + cmd := []string{ + "xk6-disruptor-agent", + "http", + "-d", utils.DurationSeconds(duration), + } + + // TODO: make port mandatory + if fault.Port != 0 { + cmd = append(cmd, "-t", fmt.Sprint(fault.Port)) + } + + if fault.AverageDelay > 0 { + cmd = append( + cmd, + "-a", + utils.DurationMillSeconds(fault.AverageDelay), + "-v", + utils.DurationMillSeconds(fault.DelayVariation), + ) + } + + if fault.ErrorRate > 0 { + cmd = append( + cmd, + "-e", + fmt.Sprint(fault.ErrorCode), + "-r", + fmt.Sprint(fault.ErrorRate), + ) + if fault.ErrorBody != "" { + cmd = append(cmd, "-b", fault.ErrorBody) + } + } + + if len(fault.Exclude) > 0 { + cmd = append(cmd, "-x", fault.Exclude) + } + + if options.ProxyPort != 0 { + cmd = append(cmd, "-p", fmt.Sprint(options.ProxyPort)) + } + + cmd = append(cmd, "--upstream-host", targetAddress) + + return cmd +} + +func buildCleanupCmd() []string { + return []string{"xk6-disruptor-agent", "cleanup"} +} + +// PodHTTPFaultCommand implements the PodVisitCommands interface for injecting +// HttpFaults in a Pod +type PodHTTPFaultCommand struct { + fault HTTPFault + duration time.Duration + options HTTPDisruptionOptions +} + +// Commands return the command for injecting a HttpFault in a Pod +func (c PodHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { + if !utils.HasPort(pod, c.fault.Port) { + return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, c.fault.Port) + } + + if utils.HasHostNetwork(pod) { + return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) + } + + targetAddress, err := utils.PodIP(pod) + if err != nil { + return VisitCommands{}, err + } + + return VisitCommands{ + Exec: buildHTTPFaultCmd(targetAddress, c.fault, c.duration, c.options), + Cleanup: buildCleanupCmd(), + }, nil +} + +// PodGrpcFaultCommand implements the PodVisitCommands interface for injecting GrpcFaults in a Pod +type PodGrpcFaultCommand struct { + fault GrpcFault + duration time.Duration + options GrpcDisruptionOptions +} + +// Commands return the command for injecting a GrpcFault in a Pod +func (c PodGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { + if !utils.HasPort(pod, c.fault.Port) { + return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, c.fault.Port) + } + + targetAddress, err := utils.PodIP(pod) + if err != nil { + return VisitCommands{}, err + } + + return VisitCommands{ + Exec: buildGrpcFaultCmd(targetAddress, c.fault, c.duration, c.options), + Cleanup: buildCleanupCmd(), + }, nil +} + +// ServiceHTTPFaultCommand implements the PodVisitCommands interface for injecting HttpFaults in a Pod +type ServiceHTTPFaultCommand struct { + service corev1.Service + fault HTTPFault + duration time.Duration + options HTTPDisruptionOptions +} + +// Commands return the command for injecting a HttpFault in a Service +func (c ServiceHTTPFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { + port, err := utils.MapPort(c.service, c.fault.Port, pod) + if err != nil { + return VisitCommands{}, err + } + + if utils.HasHostNetwork(pod) { + return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) + } + + // copy fault to change target port for the pod + podFault := c.fault + podFault.Port = port + + targetAddress, err := utils.PodIP(pod) + if err != nil { + return VisitCommands{}, err + } + + return VisitCommands{ + Exec: buildHTTPFaultCmd(targetAddress, podFault, c.duration, c.options), + Cleanup: buildCleanupCmd(), + }, nil +} + +// Cleanup defines the command to execute for cleaning up if command execution fails +func (c ServiceHTTPFaultCommand) Cleanup(_ corev1.Pod) []string { + return buildCleanupCmd() +} + +// ServiceGrpcFaultCommand implements the PodVisitCommands interface for injecting a +// GrpcFault in a Service +type ServiceGrpcFaultCommand struct { + service corev1.Service + fault GrpcFault + duration time.Duration + options GrpcDisruptionOptions +} + +// Commands return the VisitCommands for injecting a GrpcFault in a Service +func (c ServiceGrpcFaultCommand) Commands(pod corev1.Pod) (VisitCommands, error) { + port, err := utils.MapPort(c.service, c.fault.Port, pod) + if err != nil { + return VisitCommands{}, err + } + + podFault := c.fault + podFault.Port = port + + targetAddress, err := utils.PodIP(pod) + if err != nil { + return VisitCommands{}, err + } + + return VisitCommands{ + Exec: buildGrpcFaultCmd(targetAddress, podFault, c.duration, c.options), + Cleanup: buildCleanupCmd(), + }, nil +} diff --git a/pkg/disruptors/visitor_test.go b/pkg/disruptors/commads_test.go similarity index 62% rename from pkg/disruptors/visitor_test.go rename to pkg/disruptors/commads_test.go index e9b71c6f..850b49d5 100644 --- a/pkg/disruptors/visitor_test.go +++ b/pkg/disruptors/commads_test.go @@ -1,20 +1,14 @@ package disruptors import ( - "context" - "sort" "strings" "testing" "time" - "github.com/google/go-cmp/cmp" - "github.com/grafana/xk6-disruptor/pkg/kubernetes" "github.com/grafana/xk6-disruptor/pkg/testutils/command" "github.com/grafana/xk6-disruptor/pkg/testutils/kubernetes/builders" corev1 "k8s.io/api/core/v1" - k8sruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/fake" ) func buildPodWithPort(name string, portName string, port int32) corev1.Pod { @@ -31,7 +25,7 @@ func buildPodWithPort(name string, portName string, port int32) corev1.Pod { return pod } -func Test_PodHTTPFaultVisitor(t *testing.T) { +func Test_PodHTTPFaultCommandGenerator(t *testing.T) { t.Parallel() testCases := []struct { @@ -159,14 +153,13 @@ func Test_PodHTTPFaultVisitor(t *testing.T) { t.Run(tc.title, func(t *testing.T) { t.Parallel() - visitor := PodHTTPFaultVisitor{ + cmd := PodHTTPFaultCommand{ fault: tc.fault, duration: tc.duration, options: tc.opts, } - cmds, err := visitor.Visit(tc.target) - + cmds, err := cmd.Commands(tc.target) if tc.expectError && err == nil { t.Errorf("should had failed") return @@ -177,15 +170,14 @@ func Test_PodHTTPFaultVisitor(t *testing.T) { return } - exec := strings.Join(cmds.Exec, " ") - if !command.AssertCmdEquals(exec, tc.expectedCmd) { - t.Errorf("expected command: %s got: %s", tc.expectedCmd, exec) + if !command.AssertCmdEquals(strings.Join(cmds.Exec, " "), tc.expectedCmd) { + t.Errorf("expected command: %s got: %s", tc.expectedCmd, cmds.Exec) } }) } } -func Test_PodGrpcPFaultVisitor(t *testing.T) { +func Test_PodGrpcPFaultCommandGenerator(t *testing.T) { t.Parallel() testCases := []struct { @@ -268,13 +260,13 @@ func Test_PodGrpcPFaultVisitor(t *testing.T) { t.Run(tc.title, func(t *testing.T) { t.Parallel() - visitor := PodGrpcFaultVisitor{ + cmd := PodGrpcFaultCommand{ fault: tc.fault, duration: tc.duration, options: tc.opts, } - cmds, err := visitor.Visit(tc.target) + cmds, err := cmd.Commands(tc.target) if tc.expectError && err == nil { t.Errorf("should had failed") @@ -286,151 +278,8 @@ func Test_PodGrpcPFaultVisitor(t *testing.T) { return } - exec := strings.Join(cmds.Exec, " ") - if !command.AssertCmdEquals(exec, tc.expectedCmd) { - t.Errorf("expected command: %s got: %s", tc.expectedCmd, exec) - } - }) - } -} - -func Test_NewPodDisruptor(t *testing.T) { - t.Parallel() - - testCases := []struct { - title string - name string - namespace string - pods []corev1.Pod - selector PodSelector - expectError bool - expected []string - }{ - { - title: "matching pods", - name: "test-svc", - namespace: "test-ns", - pods: []corev1.Pod{ - builders.NewPodBuilder("pod-1"). - WithNamespace("test-ns"). - WithLabel("app", "test"). - WithIP("192.0.2.6"). - Build(), - }, - selector: PodSelector{ - Namespace: "test-ns", - Select: PodAttributes{Labels: map[string]string{ - "app": "test", - }}, - }, - expectError: false, - expected: []string{"pod-1"}, - }, - { - title: "no matching pods", - name: "test-svc", - namespace: "test-ns", - pods: []corev1.Pod{}, - selector: PodSelector{ - Namespace: "test-ns", - Select: PodAttributes{Labels: map[string]string{ - "app": "test", - }}, - }, - expectError: true, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.title, func(t *testing.T) { - t.Parallel() - - var objs []k8sruntime.Object - for p := range tc.pods { - objs = append(objs, &tc.pods[p]) - } - - client := fake.NewSimpleClientset(objs...) - k, _ := kubernetes.NewFakeKubernetes(client) - - d, err := NewPodDisruptor( - context.TODO(), - k, - tc.selector, - PodDisruptorOptions{InjectTimeout: -1}, // Disable waiting for injected container to become Running. - ) - - if tc.expectError && err != nil { - return - } - - if !tc.expectError && err != nil { - t.Errorf("unexpected error creating pod disruptor: %v", err) - return - } - - if tc.expectError && err == nil { - t.Errorf("should had failed creating service disruptor") - return - } - - targets, _ := d.Targets(context.TODO()) - sort.Strings(targets) - if diff := cmp.Diff(targets, tc.expected); diff != "" { - t.Errorf("expected targets dot not match returned\n%s", diff) - return - } - }) - } -} - -func Test_PodSelectorString(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - selector PodSelector - expected string - }{ - { - name: "Empty selector", - expected: `all pods in ns "default"`, - }, - { - name: "Only inclusions", - selector: PodSelector{ - Namespace: "testns", - Select: PodAttributes{map[string]string{"foo": "bar"}}, - }, - expected: `pods including(foo=bar) in ns "testns"`, - }, - { - name: "Only exclusions", - selector: PodSelector{ - Namespace: "testns", - Exclude: PodAttributes{map[string]string{"foo": "bar"}}, - }, - expected: `pods excluding(foo=bar) in ns "testns"`, - }, - { - name: "Both inclusions and exclusions", - selector: PodSelector{ - Namespace: "testns", - Select: PodAttributes{map[string]string{"foo": "bar"}}, - Exclude: PodAttributes{map[string]string{"boo": "baa"}}, - }, - expected: `pods including(foo=bar), excluding(boo=baa) in ns "testns"`, - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - output := tc.selector.String() - if tc.expected != output { - t.Errorf("expected string does not match output string:\n%s\n%s", tc.expected, output) + if !command.AssertCmdEquals(strings.Join(cmds.Exec, " "), tc.expectedCmd) { + t.Errorf("expected command: %s got: %s", tc.expectedCmd, cmds.Exec) } }) } diff --git a/pkg/disruptors/controller.go b/pkg/disruptors/controller.go index 2c127cec..7aa327f3 100644 --- a/pkg/disruptors/controller.go +++ b/pkg/disruptors/controller.go @@ -14,40 +14,109 @@ import ( corev1 "k8s.io/api/core/v1" ) -// PodVisitor defines the interface for visiting Pods -type PodVisitor interface { - // Visit returns the VisitComands for visiting the Pod - Visit(pod corev1.Pod) (VisitCommands, error) +// PodController uses a PodVisitor to perform a certain action (Visit) on a list of pods. +// The PodVisitor is responsible for executing the action in one target pod, while the PorController +// is responsible for coordinating the action of the PodVisitor on multiple target pods +type PodController struct { + targets []corev1.Pod } -// VisitCommands define the commands used for visiting a Pod +// NewPodController creates a new controller for a collection of pods +func NewPodController(targets []corev1.Pod) *PodController { + return &PodController{ + targets: targets, + } +} + +// Visit allows executing a different command on each target returned by a visiting function +func (c *PodController) Visit(ctx context.Context, visitor PodVisitor) error { + // if there are no targets, nothing to do + if len(c.targets) == 0 { + return nil + } + + // create context for the visit, that can be cancelled in case of error + visitCtx, cancelVisit := context.WithCancel(ctx) + defer cancelVisit() + + // make space to prevent blocking go routines + errCh := make(chan error, len(c.targets)) + + wg := sync.WaitGroup{} + for _, pod := range c.targets { + wg.Add(1) + go func(pod corev1.Pod) { + if err := visitor.Visit(visitCtx, pod); err != nil { + errCh <- err + } + + wg.Done() + }(pod) + } + + wg.Wait() + + select { + case e := <-errCh: + return e + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } +} + +// Targets return the name of the targets +func (c *PodController) Targets(_ context.Context) ([]string, error) { + names := []string{} + for _, pod := range c.targets { + names = append(names, pod.Name) + } + + return names, nil +} + +// VisitCommands contains the commands to be executed when visiting a pod type VisitCommands struct { - // Exec defines the command to be executed - Exec []string - // Cleanup defines the command to execute for cleaning up if command execution fails + Exec []string Cleanup []string } -// AgentController defines the interface for controlling agents in a set of targets -type AgentController interface { - // InjectDisruptorAgent injects the Disruptor agent in the target pods - InjectDisruptorAgent(ctx context.Context) error - // Targets retrieves the names of the target of the controller - Targets(ctx context.Context) ([]string, error) - // Visit allows executing a different command on each target returned by a visiting function - Visit(ctx context.Context, visitor PodVisitor) error +// PodVisitor is the interface implemented by objects that perform actions on a Pod +type PodVisitor interface { + Visit(context.Context, corev1.Pod) error +} + +// PodAgentVisitor implements PodVisitor, performing actions in a Pod by means of running a PodVisitCommand on the pod. +type PodAgentVisitor struct { + helper helpers.PodHelper + options PodAgentVisitorOptions + command PodVisitCommand } -// AgentController controls de agents in a set of target pods -type agentController struct { - helper helpers.PodHelper - namespace string - targets []corev1.Pod - timeout time.Duration +// NewPodAgentVisitor creates a new pod visitor +func NewPodAgentVisitor( + helper helpers.PodHelper, + options PodAgentVisitorOptions, + command PodVisitCommand, +) *PodAgentVisitor { + // FIXME: handling timeout < 0 is required only to allow tests to skip waiting for the agent injection + if options.Timeout == 0 { + options.Timeout = 30 * time.Second + } + if options.Timeout < 0 { + options.Timeout = 0 + } + + return &PodAgentVisitor{ + helper: helper, + options: options, + command: command, + } } -// InjectDisruptorAgent injects the Disruptor agent in the target pods -func (c *agentController) InjectDisruptorAgent(ctx context.Context) error { +// injectDisruptorAgent injects the Disruptor agent in the target pods +func (c *PodAgentVisitor) injectDisruptorAgent(ctx context.Context, pod corev1.Pod) error { var ( rootUser = int64(0) rootGroup = int64(0) @@ -72,135 +141,56 @@ func (c *agentController) InjectDisruptorAgent(ctx context.Context) error { }, } - var wg sync.WaitGroup - // ensure errors channel has enough space to avoid blocking gorutines - errors := make(chan error, len(c.targets)) - for _, pod := range c.targets { - wg.Add(1) - // attach each container asynchronously - go func(podName string) { - defer wg.Done() - - err := c.helper.AttachEphemeralContainer( - ctx, - podName, - agentContainer, - helpers.AttachOptions{ - Timeout: c.timeout, - IgnoreIfExists: true, - }, - ) - if err != nil { - errors <- err - } - }(pod.Name) - } - - wg.Wait() - - select { - case err := <-errors: - return err - default: - return nil - } + return c.helper.AttachEphemeralContainer( + ctx, + pod.Name, + agentContainer, + helpers.AttachOptions{ + Timeout: c.options.Timeout, + IgnoreIfExists: true, + }, + ) } // Visit allows executing a different command on each target returned by a visiting function -func (c *agentController) Visit(ctx context.Context, visitor PodVisitor) error { - // if there are no targets, nothing to do - if len(c.targets) == 0 { - return nil +func (c *PodAgentVisitor) Visit(ctx context.Context, pod corev1.Pod) error { + err := c.injectDisruptorAgent(ctx, pod) + if err != nil { + return fmt.Errorf("injecting agent in the pod %q: %w", pod.Name, err) } - execContext, cancel := context.WithCancel(context.Background()) - defer cancel() - - // ensure errCh channel has enough space to avoid blocking gorutines - errCh := make(chan error, len(c.targets)) - for _, pod := range c.targets { - pod := pod - // visit each target asynchronously - go func() { - errCh <- func(pod corev1.Pod) error { - // get the command to execute in the target - visitCommands, err := visitor.Visit(pod) - if err != nil { - return fmt.Errorf("unable to get command for pod %q: %w", pod.Name, err) - } - - _, stderr, err := c.helper.Exec(execContext, pod.Name, "xk6-agent", visitCommands.Exec, []byte{}) - - // if command failed, ensure the agent execution is terminated - if err != nil && visitCommands.Cleanup != nil { - // we ignore errors because k6 was cancelled, so there's no point in reporting - // use a fresh context because the exec context may have been cancelled or expired - //nolint:contextcheck - _, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", visitCommands.Cleanup, []byte{}) - } - - // if the context is cancelled, it is reported in the main loop - if err != nil && !errors.Is(err, context.Canceled) { - return fmt.Errorf("failed command execution for pod %q: %w \n%s", pod.Name, err, string(stderr)) - } - - return nil - }(pod) - }() + // get the command to execute in the target + commands, err := c.command.Commands(pod) + if err != nil { + return fmt.Errorf("unable to get command for pod %q: %w", pod.Name, err) } - var err error - pending := len(c.targets) - for { - select { - case e := <-errCh: - pending-- - if e != nil { - // cancel ongoing commands - cancel() - // Save first received error as reason for ending execution - err = e - } + _, stderr, err := c.helper.Exec(ctx, pod.Name, "xk6-agent", commands.Exec, []byte{}) - if pending == 0 { - return err - } - case <-ctx.Done(): - // cancel ongoing commands - cancel() - // save the reason for ending execution - err = ctx.Err() - } + if err != nil && commands.Cleanup != nil { + // we ignore errors because we are reporting the reason of the exec failure + // we use a fresh context because the context used in exec may have been cancelled or expired + //nolint:contextcheck + _, _, _ = c.helper.Exec(context.TODO(), pod.Name, "xk6-agent", commands.Cleanup, []byte{}) } -} -// Targets retrieves the list of names of the target pods -func (c *agentController) Targets(_ context.Context) ([]string, error) { - names := []string{} - for _, p := range c.targets { - names = append(names, p.Name) + // if the context is cancelled, don't report error (we assume the caller is reporting this error) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("failed command execution for pod %q: %w \n%s", pod.Name, err, string(stderr)) } - return names, nil + + return nil } -// NewAgentController creates a new controller for a list of target pods -func NewAgentController( - _ context.Context, - helper helpers.PodHelper, - namespace string, - targets []corev1.Pod, - timeout time.Duration, -) AgentController { - if timeout == 0 { - timeout = 30 * time.Second - } - if timeout < 0 { - timeout = 0 - } - return &agentController{ - helper: helper, - namespace: namespace, - targets: targets, - timeout: timeout, - } +// PodAgentVisitorOptions defines the options for the PodVisitor +type PodAgentVisitorOptions struct { + // Defines the timeout for injecting the agent + Timeout time.Duration +} + +// PodVisitCommand is a command that can be run on a given pod. +// Implementations build the VisitCommands according to properties of the pod where it is going to run +type PodVisitCommand interface { + // Commands defines the command to be executed, and optionally a cleanup command + Commands(corev1.Pod) (VisitCommands, error) } diff --git a/pkg/disruptors/controller_test.go b/pkg/disruptors/controller_test.go index 26002826..ec78471b 100644 --- a/pkg/disruptors/controller_test.go +++ b/pkg/disruptors/controller_test.go @@ -2,15 +2,13 @@ package disruptors import ( "context" + "errors" "fmt" - "sort" "strings" "testing" "time" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" "github.com/google/go-cmp/cmp" @@ -18,47 +16,91 @@ import ( "github.com/grafana/xk6-disruptor/pkg/testutils/kubernetes/builders" ) -func Test_InjectAgent(t *testing.T) { +type fakeCommand struct { + err error + exec []string + cleanup []string +} + +func (f fakeCommand) Commands(_ corev1.Pod) (VisitCommands, error) { + return VisitCommands{ + Exec: f.exec, + Cleanup: f.cleanup, + }, f.err +} + +func visitCommands() PodVisitCommand { + return fakeCommand{ + exec: []string{"command"}, + cleanup: []string{"cleanup"}, + } +} + +func Test_PodAgentVisitor(t *testing.T) { t.Parallel() testCases := []struct { - title string - namespace string - pods []corev1.Pod - // Set timeout to -1 to prevent waiting the ephemeral container to be ready, - // as the fake client will not update its status - timeout time.Duration + title string + namespace string + pod corev1.Pod + visitCmds PodVisitCommand + err error + stdout []byte + stderr []byte + options PodAgentVisitorOptions expectError bool + expected []helpers.Command }{ { - title: "Inject ephemeral container", + title: "successful execution", namespace: "test-ns", - pods: []corev1.Pod{ - builders.NewPodBuilder("pod1"). - WithNamespace("test-ns"). - WithIP("192.0.2.6"). - Build(), - builders.NewPodBuilder("pod2"). - WithNamespace("test-ns"). - WithIP("192.0.2.6"). - Build(), + pod: builders.NewPodBuilder("pod1"). + WithNamespace("test-ns"). + WithIP("192.0.2.6"). + Build(), + visitCmds: visitCommands(), + err: nil, + options: PodAgentVisitorOptions{ + Timeout: -1, }, - timeout: -1, expectError: false, + expected: []helpers.Command{ + {Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, + }, + }, + { + title: "failed execution", + namespace: "test-ns", + pod: builders.NewPodBuilder("pod1"). + WithNamespace("test-ns"). + WithIP("192.0.2.6"). + Build(), + visitCmds: visitCommands(), + err: fmt.Errorf("fake error"), + stderr: []byte("error output"), + options: PodAgentVisitorOptions{ + Timeout: -1, + }, + expectError: true, + expected: []helpers.Command{ + {Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, + {Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"cleanup"}, Stdin: []byte{}}, + }, }, { title: "ephemeral container not ready", namespace: "test-ns", - pods: []corev1.Pod{ - builders.NewPodBuilder("pod1"). - WithNamespace("test-ns"). - Build(), - builders.NewPodBuilder("pod2"). - WithNamespace("test-ns"). - Build(), + pod: builders.NewPodBuilder("pod1"). + WithNamespace("test-ns"). + WithIP("192.0.2.6"). + Build(), + visitCmds: visitCommands(), + err: nil, + options: PodAgentVisitorOptions{ + Timeout: 1, }, - timeout: 1, - expectError: true, // should fail because fake client will not update status + expectError: true, + expected: nil, }, } @@ -68,125 +110,108 @@ func Test_InjectAgent(t *testing.T) { t.Run(tc.title, func(t *testing.T) { t.Parallel() - objs := []runtime.Object{} - for p := range tc.pods { - objs = append(objs, &tc.pods[p]) - } - - client := fake.NewSimpleClientset(objs...) + client := fake.NewSimpleClientset(&tc.pod) executor := helpers.NewFakePodCommandExecutor() helper := helpers.NewPodHelper(client, executor, tc.namespace) - controller := NewAgentController( - context.TODO(), + visitor := NewPodAgentVisitor( helper, - tc.namespace, - tc.pods, - tc.timeout, + tc.options, + tc.visitCmds, ) - err := controller.InjectDisruptorAgent(context.TODO()) + executor.SetResult(tc.stdout, tc.stderr, tc.err) + err := visitor.Visit(context.TODO(), tc.pod) if tc.expectError && err == nil { - t.Errorf("should had failed") - return + t.Fatalf("should had failed") } - if !tc.expectError && err != nil { - t.Errorf("failed: %v", err) + if tc.expectError && err != nil { + // error expected return } - if tc.expectError && err != nil { - return + if !tc.expectError && err != nil { + t.Fatalf("failed unexpectedly: %v", err) } - for _, p := range tc.pods { - pod, err := client.CoreV1(). - Pods(tc.namespace). - Get(context.TODO(), p.Name, metav1.GetOptions{}) - if err != nil { - t.Errorf("failed: %v", err) - return + if tc.expectError && err != nil { + if !strings.Contains(err.Error(), string(tc.stderr)) { + t.Fatalf("returned error message should contain stderr (%q)", string(tc.stderr)) } + } - if len(pod.Spec.EphemeralContainers) == 0 { - t.Errorf("agent container is not attached") - return - } + if diff := cmp.Diff(tc.expected, executor.GetHistory()); diff != "" { + t.Errorf("Expected command did not match returned:\n%s", diff) } }) } } -type fakeVisitor struct { - cmds VisitCommands - err error +type fakePodVisitor struct { + delay time.Duration + err error } -func (v fakeVisitor) Visit(_ corev1.Pod) (VisitCommands, error) { - return v.cmds, v.err +func (f fakePodVisitor) Visit(_ context.Context, _ corev1.Pod) error { + time.Sleep(f.delay) + return f.err } -func Test_VisitPod(t *testing.T) { +var errFailed = errors.New("failed") + +func Test_PodController(t *testing.T) { t.Parallel() testCases := []struct { title string - namespace string - pods []corev1.Pod - visitCmds VisitCommands - err error - stdout []byte - stderr []byte - timeout time.Duration - expectError bool - expected []helpers.Command + targets []corev1.Pod + visitor PodVisitor + expectError error }{ { - title: "successful execution", - namespace: "test-ns", - pods: []corev1.Pod{ + title: "visit pods", + targets: []corev1.Pod{ builders.NewPodBuilder("pod1"). WithNamespace("test-ns"). + WithIP("192.0.2.6"). Build(), builders.NewPodBuilder("pod2"). WithNamespace("test-ns"). + WithIP("192.0.2.7"). Build(), }, - visitCmds: VisitCommands{ - Exec: []string{"command"}, - Cleanup: []string{"cleanup"}, - }, - err: nil, - expectError: false, - expected: []helpers.Command{ - {Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, - {Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, - }, + visitor: fakePodVisitor{}, + expectError: nil, }, { - title: "failed execution", - namespace: "test-ns", - pods: []corev1.Pod{ + title: "failed visit command", + targets: []corev1.Pod{ builders.NewPodBuilder("pod1"). WithNamespace("test-ns"). + WithIP("192.0.2.6"). Build(), builders.NewPodBuilder("pod2"). WithNamespace("test-ns"). + WithIP("192.0.2.7"). Build(), }, - visitCmds: VisitCommands{ - Exec: []string{"command"}, - Cleanup: []string{"cleanup"}, - }, - err: fmt.Errorf("fake error"), - stderr: []byte("error output"), - expectError: true, - expected: []helpers.Command{ - {Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, - {Pod: "pod1", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"cleanup"}, Stdin: []byte{}}, - {Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"command"}, Stdin: []byte{}}, - {Pod: "pod2", Container: "xk6-agent", Namespace: "test-ns", Command: []string{"cleanup"}, Stdin: []byte{}}, + visitor: fakePodVisitor{err: errFailed}, + expectError: errFailed, + }, + { + title: "context expired", + targets: []corev1.Pod{ + builders.NewPodBuilder("pod1"). + WithNamespace("test-ns"). + WithIP("192.0.2.6"). + Build(), + builders.NewPodBuilder("pod2"). + WithNamespace("test-ns"). + WithIP("192.0.2.7"). + Build(), }, + visitor: fakePodVisitor{delay: 2 * time.Second}, + expectError: context.DeadlineExceeded, }, } @@ -196,58 +221,14 @@ func Test_VisitPod(t *testing.T) { t.Run(tc.title, func(t *testing.T) { t.Parallel() - objs := []runtime.Object{} + controller := NewPodController(tc.targets) - targets := []corev1.Pod{} - for p := range tc.pods { - objs = append(objs, &tc.pods[p]) - targets = append(targets, tc.pods[p]) - } - client := fake.NewSimpleClientset(objs...) - executor := helpers.NewFakePodCommandExecutor() - helper := helpers.NewPodHelper(client, executor, tc.namespace) - controller := NewAgentController( - context.TODO(), - helper, - tc.namespace, - targets, - tc.timeout, - ) + ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Second) + defer cancel() - executor.SetResult(tc.stdout, tc.stderr, tc.err) - visitor := fakeVisitor{ - cmds: tc.visitCmds, - } - err := controller.Visit(context.TODO(), visitor) - if tc.expectError && err == nil { - t.Fatalf("should had failed") - } - - if !tc.expectError && err != nil { - t.Fatalf("failed unexpectedly: %v", err) - } - - if tc.expectError && err != nil { - if !strings.Contains(err.Error(), string(tc.stderr)) { - t.Fatalf("returned error message should contain stderr (%q)", string(tc.stderr)) - } - } - - // At this point, we either expected no error and got no error, or we got the error we expected. - // In either case, we check the expected commands have been executed. - - sort.Slice(tc.expected, func(i, j int) bool { - return tc.expected[i].Pod < tc.expected[j].Pod - }) - - history := executor.GetHistory() - - sort.Slice(history, func(i, j int) bool { - return history[i].Pod < history[j].Pod - }) - - if diff := cmp.Diff(tc.expected, history); diff != "" { - t.Errorf("Expected command did not match returned:\n%s", diff) + err := controller.Visit(ctx, tc.visitor) + if !errors.Is(err, tc.expectError) { + t.Fatalf("expected %v got %v", tc.expectError, err) } }) } diff --git a/pkg/disruptors/pod.go b/pkg/disruptors/pod.go index de229b72..53b4be5b 100644 --- a/pkg/disruptors/pod.go +++ b/pkg/disruptors/pod.go @@ -35,9 +35,11 @@ type PodDisruptorOptions struct { InjectTimeout time.Duration `js:"injectTimeout"` } -// podDisruptor is an instance of a PodDisruptor initialized with a list of target pods +// podDisruptor is an instance of a PodDisruptor that uses a PodController to interact with target pods type podDisruptor struct { - controller AgentController + helper helpers.PodHelper + options PodDisruptorOptions + controller *PodController } // PodSelector defines the criteria for selecting a pod for disruption @@ -133,19 +135,11 @@ func NewPodDisruptor( return nil, fmt.Errorf("finding pods matching '%s': %w", selector, ErrSelectorNoPods) } - controller := NewAgentController( - ctx, - helper, - namespace, - targets, - options.InjectTimeout, - ) - err = controller.InjectDisruptorAgent(ctx) - if err != nil { - return nil, err - } + controller := NewPodController(targets) return &podDisruptor{ + helper: helper, + options: options, controller: controller, }, nil } @@ -166,11 +160,18 @@ func (d *podDisruptor) InjectHTTPFaults( fault.Port = DefaultTargetPort } - visitor := PodHTTPFaultVisitor{ + command := PodHTTPFaultCommand{ fault: fault, duration: duration, options: options, } + + visitor := NewPodAgentVisitor( + d.helper, + PodAgentVisitorOptions{Timeout: d.options.InjectTimeout}, + command, + ) + return d.controller.Visit(ctx, visitor) } @@ -181,11 +182,17 @@ func (d *podDisruptor) InjectGrpcFaults( duration time.Duration, options GrpcDisruptionOptions, ) error { - visitor := PodGrpcFaultVisitor{ + command := PodGrpcFaultCommand{ fault: fault, duration: duration, options: options, } + visitor := NewPodAgentVisitor( + d.helper, + PodAgentVisitorOptions{Timeout: d.options.InjectTimeout}, + command, + ) + return d.controller.Visit(ctx, visitor) } diff --git a/pkg/disruptors/pod_test.go b/pkg/disruptors/pod_test.go new file mode 100644 index 00000000..2c936a98 --- /dev/null +++ b/pkg/disruptors/pod_test.go @@ -0,0 +1,151 @@ +package disruptors + +import ( + "context" + "sort" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/grafana/xk6-disruptor/pkg/kubernetes" + "github.com/grafana/xk6-disruptor/pkg/testutils/kubernetes/builders" + + corev1 "k8s.io/api/core/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" +) + +func Test_NewPodDisruptor(t *testing.T) { + t.Parallel() + + testCases := []struct { + title string + pods []corev1.Pod + selector PodSelector + expectError bool + expected []string + }{ + { + title: "matching pods", + pods: []corev1.Pod{ + builders.NewPodBuilder("pod-1"). + WithNamespace("test-ns"). + WithLabel("app", "test"). + WithIP("192.0.2.6"). + Build(), + }, + selector: PodSelector{ + Namespace: "test-ns", + Select: PodAttributes{Labels: map[string]string{ + "app": "test", + }}, + }, + expectError: false, + expected: []string{"pod-1"}, + }, + { + title: "no matching pods", + pods: []corev1.Pod{}, + selector: PodSelector{ + Namespace: "test-ns", + Select: PodAttributes{Labels: map[string]string{ + "app": "test", + }}, + }, + expectError: true, + }, + } + + for _, tc := range testCases { + tc := tc + + t.Run(tc.title, func(t *testing.T) { + t.Parallel() + + var objs []k8sruntime.Object + for p := range tc.pods { + objs = append(objs, &tc.pods[p]) + } + + client := fake.NewSimpleClientset(objs...) + k, _ := kubernetes.NewFakeKubernetes(client) + + d, err := NewPodDisruptor( + context.TODO(), + k, + tc.selector, + PodDisruptorOptions{InjectTimeout: -1}, // Disable waiting for injected container to become Running. + ) + + if tc.expectError && err != nil { + return + } + + if !tc.expectError && err != nil { + t.Errorf("unexpected error creating pod disruptor: %v", err) + return + } + + if tc.expectError && err == nil { + t.Errorf("should had failed creating service disruptor") + return + } + + targets, _ := d.Targets(context.TODO()) + sort.Strings(targets) + if diff := cmp.Diff(targets, tc.expected); diff != "" { + t.Errorf("expected targets dot not match returned\n%s", diff) + return + } + }) + } +} + +func Test_PodSelectorString(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + selector PodSelector + expected string + }{ + { + name: "Empty selector", + expected: `all pods in ns "default"`, + }, + { + name: "Only inclusions", + selector: PodSelector{ + Namespace: "testns", + Select: PodAttributes{map[string]string{"foo": "bar"}}, + }, + expected: `pods including(foo=bar) in ns "testns"`, + }, + { + name: "Only exclusions", + selector: PodSelector{ + Namespace: "testns", + Exclude: PodAttributes{map[string]string{"foo": "bar"}}, + }, + expected: `pods excluding(foo=bar) in ns "testns"`, + }, + { + name: "Both inclusions and exclusions", + selector: PodSelector{ + Namespace: "testns", + Select: PodAttributes{map[string]string{"foo": "bar"}}, + Exclude: PodAttributes{map[string]string{"boo": "baa"}}, + }, + expected: `pods including(foo=bar), excluding(boo=baa) in ns "testns"`, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + output := tc.selector.String() + if tc.expected != output { + t.Errorf("expected string does not match output string:\n%s\n%s", tc.expected, output) + } + }) + } +} diff --git a/pkg/disruptors/service.go b/pkg/disruptors/service.go index 9a4ae4b2..d2938e2e 100644 --- a/pkg/disruptors/service.go +++ b/pkg/disruptors/service.go @@ -7,6 +7,7 @@ import ( "time" "github.com/grafana/xk6-disruptor/pkg/kubernetes" + "github.com/grafana/xk6-disruptor/pkg/kubernetes/helpers" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,7 +32,9 @@ type ServiceDisruptorOptions struct { // serviceDisruptor is an instance of a ServiceDisruptor type serviceDisruptor struct { service corev1.Service - controller AgentController + helper helpers.PodHelper + options ServiceDisruptorOptions + controller *PodController } // NewServiceDisruptor creates a new instance of a ServiceDisruptor that targets the given service @@ -62,22 +65,14 @@ func NewServiceDisruptor( return nil, fmt.Errorf("creating disruptor for service %s/%s: %w", service, namespace, ErrServiceNoTargets) } - ph := k8s.PodHelper(namespace) - controller := NewAgentController( - ctx, - ph, - namespace, - targets, - options.InjectTimeout, - ) + helper := k8s.PodHelper(namespace) - err = controller.InjectDisruptorAgent(ctx) - if err != nil { - return nil, err - } + controller := NewPodController(targets) return &serviceDisruptor{ service: *svc, + helper: helper, + options: options, controller: controller, }, nil } @@ -88,13 +83,19 @@ func (d *serviceDisruptor) InjectHTTPFaults( duration time.Duration, options HTTPDisruptionOptions, ) error { - visitor := ServiceHTTPFaultVisitor{ + command := ServiceHTTPFaultCommand{ service: d.service, fault: fault, duration: duration, options: options, } + visitor := NewPodAgentVisitor( + d.helper, + PodAgentVisitorOptions{Timeout: d.options.InjectTimeout}, + command, + ) + return d.controller.Visit(ctx, visitor) } @@ -104,13 +105,19 @@ func (d *serviceDisruptor) InjectGrpcFaults( duration time.Duration, options GrpcDisruptionOptions, ) error { - visitor := ServiceGrpcFaultVisitor{ + command := ServiceGrpcFaultCommand{ service: d.service, fault: fault, duration: duration, options: options, } + visitor := NewPodAgentVisitor( + d.helper, + PodAgentVisitorOptions{Timeout: d.options.InjectTimeout}, + command, + ) + return d.controller.Visit(ctx, visitor) } diff --git a/pkg/disruptors/visitor.go b/pkg/disruptors/visitor.go deleted file mode 100644 index 6a83cbcc..00000000 --- a/pkg/disruptors/visitor.go +++ /dev/null @@ -1,132 +0,0 @@ -package disruptors - -import ( - "fmt" - "time" - - "github.com/grafana/xk6-disruptor/pkg/utils" - corev1 "k8s.io/api/core/v1" -) - -// PodHTTPFaultVisitor implements the Visitor interface for injecting HttpFaults in a Pod -type PodHTTPFaultVisitor struct { - fault HTTPFault - duration time.Duration - options HTTPDisruptionOptions -} - -// Visit return the VisitCommands for injecting a HttpFault in a Pod -func (i PodHTTPFaultVisitor) Visit(pod corev1.Pod) (VisitCommands, error) { - if !utils.HasPort(pod, i.fault.Port) { - return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, i.fault.Port) - } - - if utils.HasHostNetwork(pod) { - return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) - } - - targetAddress, err := utils.PodIP(pod) - if err != nil { - return VisitCommands{}, err - } - - visitCommands := VisitCommands{ - Exec: buildHTTPFaultCmd(targetAddress, i.fault, i.duration, i.options), - Cleanup: buildCleanupCmd(), - } - - return visitCommands, nil -} - -// PodGrpcFaultVisitor implements the Visitor interface for injecting GrpcFaults in a Pod -type PodGrpcFaultVisitor struct { - fault GrpcFault - duration time.Duration - options GrpcDisruptionOptions -} - -// Visit return the VisitCommands for injecting a GrpcFault in a Pod -func (i PodGrpcFaultVisitor) Visit(pod corev1.Pod) (VisitCommands, error) { - if !utils.HasPort(pod, i.fault.Port) { - return VisitCommands{}, fmt.Errorf("pod %q does not expose port %d", pod.Name, i.fault.Port) - } - - targetAddress, err := utils.PodIP(pod) - if err != nil { - return VisitCommands{}, err - } - - visitCommands := VisitCommands{ - Exec: buildGrpcFaultCmd(targetAddress, i.fault, i.duration, i.options), - Cleanup: buildCleanupCmd(), - } - - return visitCommands, nil -} - -// ServiceHTTPFaultVisitor implements the Visitor interface for injecting HttpFaults in a Pod -type ServiceHTTPFaultVisitor struct { - service corev1.Service - fault HTTPFault - duration time.Duration - options HTTPDisruptionOptions -} - -// Visit return the VisitCommands for injecting a HttpFault in a Service -func (i ServiceHTTPFaultVisitor) Visit(pod corev1.Pod) (VisitCommands, error) { - port, err := utils.MapPort(i.service, i.fault.Port, pod) - if err != nil { - return VisitCommands{}, err - } - - if utils.HasHostNetwork(pod) { - return VisitCommands{}, fmt.Errorf("pod %q cannot be safely injected as it has hostNetwork set to true", pod.Name) - } - - // copy fault to change target port for the pod - podFault := i.fault - podFault.Port = port - - targetAddress, err := utils.PodIP(pod) - if err != nil { - return VisitCommands{}, err - } - - visitCommands := VisitCommands{ - Exec: buildHTTPFaultCmd(targetAddress, podFault, i.duration, i.options), - Cleanup: buildCleanupCmd(), - } - - return visitCommands, nil -} - -// ServiceGrpcFaultVisitor implements the Visitor interface for injecting a GrpcFault in a Service -type ServiceGrpcFaultVisitor struct { - service corev1.Service - fault GrpcFault - duration time.Duration - options GrpcDisruptionOptions -} - -// Visit return the VisitCommands for injecting a GrpcFault in a Pod -func (i ServiceGrpcFaultVisitor) Visit(pod corev1.Pod) (VisitCommands, error) { - port, err := utils.MapPort(i.service, i.fault.Port, pod) - if err != nil { - return VisitCommands{}, err - } - - podFault := i.fault - podFault.Port = port - - targetAddress, err := utils.PodIP(pod) - if err != nil { - return VisitCommands{}, err - } - - visitCommands := VisitCommands{ - Exec: buildGrpcFaultCmd(targetAddress, podFault, i.duration, i.options), - Cleanup: buildCleanupCmd(), - } - - return visitCommands, nil -}