Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#1468 from saschagrunert/interrupta…
Browse files Browse the repository at this point in the history
…ble-all

Make all RPC's interruptable
  • Loading branch information
k8s-ci-robot authored Jun 27, 2024
2 parents 198050c + 34b940c commit db407a2
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 31 deletions.
4 changes: 3 additions & 1 deletion cmd/crictl/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func Attach(ctx context.Context, client internalapi.RuntimeService, opts attachO
Stderr: !opts.tty,
}
logrus.Debugf("AttachRequest: %v", request)
r, err := client.Attach(ctx, request)
r, err := InterruptableRPC(ctx, func(ctx context.Context) (*pb.AttachResponse, error) {
return client.Attach(ctx, request)
})
logrus.Debugf("AttachResponse: %v", r)
if err != nil {
return err
Expand Down
40 changes: 30 additions & 10 deletions cmd/crictl/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,9 @@ var removeContainerCommand = &cli.Command{

ids := ctx.Args().Slice()
if ctx.Bool("all") {
r, err := runtimeClient.ListContainers(context.TODO(), nil)
r, err := InterruptableRPC(nil, func(ctx context.Context) ([]*pb.Container, error) {
return runtimeClient.ListContainers(ctx, nil)
})
if err != nil {
return err
}
Expand All @@ -395,7 +397,9 @@ var removeContainerCommand = &cli.Command{

errored := false
for _, id := range ids {
resp, err := runtimeClient.ContainerStatus(context.TODO(), id, false)
resp, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.ContainerStatusResponse, error) {
return runtimeClient.ContainerStatus(ctx, id, false)
})
if err != nil {
logrus.Error(err)
errored = true
Expand Down Expand Up @@ -778,7 +782,9 @@ func CreateContainer(
SandboxConfig: podConfig,
}
logrus.Debugf("CreateContainerRequest: %v", request)
r, err := rClient.CreateContainer(context.TODO(), opts.podID, config, podConfig)
r, err := InterruptableRPC(nil, func(ctx context.Context) (string, error) {
return rClient.CreateContainer(ctx, opts.podID, config, podConfig)
})
logrus.Debugf("CreateContainerResponse: %v", r)
if err != nil {
return "", err
Expand All @@ -792,7 +798,9 @@ func StartContainer(client internalapi.RuntimeService, id string) error {
if id == "" {
return errors.New("ID cannot be empty")
}
if err := client.StartContainer(context.TODO(), id); err != nil {
if _, err := InterruptableRPC(nil, func(ctx context.Context) (any, error) {
return nil, client.StartContainer(ctx, id)
}); err != nil {
return err
}
fmt.Println(id)
Expand Down Expand Up @@ -849,7 +857,9 @@ func UpdateContainerResources(client internalapi.RuntimeService, id string, opts
}
logrus.Debugf("UpdateContainerResourcesRequest: %v", request)
resources := &pb.ContainerResources{Linux: request.Linux}
if err := client.UpdateContainerResources(context.TODO(), id, resources); err != nil {
if _, err := InterruptableRPC(nil, func(ctx context.Context) (any, error) {
return nil, client.UpdateContainerResources(ctx, id, resources)
}); err != nil {
return err
}
fmt.Println(id)
Expand All @@ -862,7 +872,9 @@ func StopContainer(client internalapi.RuntimeService, id string, timeout int64)
if id == "" {
return errors.New("ID cannot be empty")
}
if err := client.StopContainer(context.TODO(), id, timeout); err != nil {
if _, err := InterruptableRPC(nil, func(ctx context.Context) (any, error) {
return nil, client.StopContainer(ctx, id, timeout)
}); err != nil {
return err
}
fmt.Println(id)
Expand All @@ -883,7 +895,9 @@ func CheckpointContainer(
Location: export,
}
logrus.Debugf("CheckpointContainerRequest: %v", request)
err := rClient.CheckpointContainer(context.TODO(), request)
_, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.ImageFsInfoResponse, error) {
return nil, rClient.CheckpointContainer(ctx, request)
})
if err != nil {
return err
}
Expand All @@ -897,7 +911,9 @@ func RemoveContainer(client internalapi.RuntimeService, id string) error {
if id == "" {
return errors.New("ID cannot be empty")
}
if err := client.RemoveContainer(context.TODO(), id); err != nil {
if _, err := InterruptableRPC(nil, func(ctx context.Context) (any, error) {
return nil, client.RemoveContainer(ctx, id)
}); err != nil {
return err
}
fmt.Println(id)
Expand Down Expand Up @@ -950,7 +966,9 @@ func ContainerStatus(client internalapi.RuntimeService, id, output string, tmplS
Verbose: verbose,
}
logrus.Debugf("ContainerStatusRequest: %v", request)
r, err := client.ContainerStatus(context.TODO(), id, verbose)
r, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.ContainerStatusResponse, error) {
return client.ContainerStatus(ctx, id, verbose)
})
logrus.Debugf("ContainerStatusResponse: %v", r)
if err != nil {
return err
Expand Down Expand Up @@ -1053,7 +1071,9 @@ func ListContainers(runtimeClient internalapi.RuntimeService, imageClient intern
if opts.labels != nil {
filter.LabelSelector = opts.labels
}
r, err := runtimeClient.ListContainers(context.TODO(), filter)
r, err := InterruptableRPC(nil, func(ctx context.Context) ([]*pb.Container, error) {
return runtimeClient.ListContainers(ctx, filter)
})
logrus.Debugf("ListContainerResponse: %v", r)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion cmd/crictl/container_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,9 @@ func (d containerStatsDisplayer) displayStats(ctx context.Context, client intern

func getContainerStats(ctx context.Context, client internalapi.RuntimeService, request *pb.ListContainerStatsRequest) (*pb.ListContainerStatsResponse, error) {
logrus.Debugf("ListContainerStatsRequest: %v", request)
r, err := client.ListContainerStats(context.TODO(), request.Filter)
r, err := InterruptableRPC(ctx, func(ctx context.Context) ([]*pb.ContainerStats, error) {
return client.ListContainerStats(ctx, request.Filter)
})
logrus.Debugf("ListContainerResponse: %v", r)
if err != nil {
return nil, err
Expand Down
19 changes: 15 additions & 4 deletions cmd/crictl/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,21 @@ func ExecSync(client internalapi.RuntimeService, opts execOptions) (int, error)
}
logrus.Debugf("ExecSyncRequest: %v", request)
timeoutDuration := time.Duration(opts.timeout) * time.Second
stdout, stderr, err := client.ExecSync(context.TODO(), opts.id, opts.cmd, timeoutDuration)
type stdio struct {
stdout, stderr []byte
}
io, err := InterruptableRPC(nil, func(ctx context.Context) (*stdio, error) {
stdout, stderr, err := client.ExecSync(ctx, opts.id, opts.cmd, timeoutDuration)
if err != nil {
return nil, err
}
return &stdio{stdout, stderr}, nil
})
if err != nil {
return 1, err
}
fmt.Println(string(stdout))
fmt.Println(string(stderr))
fmt.Println(string(io.stdout))
fmt.Println(string(io.stderr))
return 0, nil
}

Expand All @@ -150,7 +159,9 @@ func Exec(ctx context.Context, client internalapi.RuntimeService, opts execOptio
}

logrus.Debugf("ExecRequest: %v", request)
r, err := client.Exec(ctx, request)
r, err := InterruptableRPC(ctx, func(ctx context.Context) (*pb.ExecResponse, error) {
return client.Exec(ctx, request)
})
logrus.Debugf("ExecResponse: %v", r)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion cmd/crictl/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,9 @@ var removeImageCommand = &cli.Command{

// Add all available images to the ID selector
if all || prune {
r, err := imageClient.ListImages(context.TODO(), nil)
r, err := InterruptableRPC(nil, func(ctx context.Context) ([]*pb.Image, error) {
return imageClient.ListImages(ctx, nil)
})
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/crictl/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ var runtimeStatusCommand = &cli.Command{
func Info(cliContext *cli.Context, client internalapi.RuntimeService) error {
request := &pb.StatusRequest{Verbose: !cliContext.Bool("quiet")}
logrus.Debugf("StatusRequest: %v", request)
r, err := client.Status(context.TODO(), request.Verbose)
r, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.StatusResponse, error) {
return client.Status(context.TODO(), request.Verbose)
})
logrus.Debugf("StatusResponse: %v", r)
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion cmd/crictl/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/urfave/cli/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pb "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/cri-client/pkg/logs"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -100,7 +101,9 @@ var logsCommand = &cli.Command{
SinceTime: since,
Timestamps: timestamp,
}, time.Now())
status, err := runtimeService.ContainerStatus(context.TODO(), containerID, false)
status, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.ContainerStatusResponse, error) {
return runtimeService.ContainerStatus(ctx, containerID, false)
})
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/crictl/pod_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ func (p *podMetricsDisplayer) displayPodMetrics(
}

func podSandboxMetrics(client cri.RuntimeService) ([]*pb.PodSandboxMetrics, error) {
metrics, err := client.ListPodSandboxMetrics(context.TODO())
metrics, err := InterruptableRPC(nil, func(ctx context.Context) ([]*pb.PodSandboxMetrics, error) {
return client.ListPodSandboxMetrics(ctx)
})
if err != nil {
return nil, fmt.Errorf("list pod sandbox metrics: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/crictl/pod_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ func getPodSandboxStats(
) ([]*pb.PodSandboxStats, error) {
logrus.Debugf("PodSandboxStatsFilter: %v", filter)

stats, err := client.ListPodSandboxStats(context.TODO(), filter)
stats, err := InterruptableRPC(nil, func(ctx context.Context) ([]*pb.PodSandboxStats, error) {
return client.ListPodSandboxStats(ctx, filter)
})
if err != nil {
return nil, fmt.Errorf("list pod sandbox stats: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/crictl/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ func PortForward(client internalapi.RuntimeService, opts portforwardOptions) err
PodSandboxId: opts.id,
}
logrus.Debugf("PortForwardRequest: %v", request)
r, err := client.PortForward(context.TODO(), request)
r, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.PortForwardResponse, error) {
return client.PortForward(ctx, request)
})
logrus.Debugf("PortForwardResponse; %v", r)
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion cmd/crictl/runtime_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/urfave/cli/v2"
internalapi "k8s.io/cri-api/pkg/apis"
pb "k8s.io/cri-api/pkg/apis/runtime/v1"
)

var runtimeConfigCommand = &cli.Command{
Expand All @@ -44,7 +45,9 @@ var runtimeConfigCommand = &cli.Command{

// Attach sends an AttachRequest to server, and parses the returned AttachResponse
func runtimeConfig(client internalapi.RuntimeService) error {
resp, err := client.RuntimeConfig(context.TODO())
resp, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.RuntimeConfigResponse, error) {
return client.RuntimeConfig(ctx)
})
if err != nil {
return fmt.Errorf("call RuntimeConfig RPC: %w", err)
}
Expand Down
28 changes: 21 additions & 7 deletions cmd/crictl/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ var removePodCommand = &cli.Command{

ids := ctx.Args().Slice()
if ctx.Bool("all") {
r, err := runtimeClient.ListPodSandbox(context.TODO(), nil)
r, err := InterruptableRPC(nil, func(ctx context.Context) ([]*pb.PodSandbox, error) {
return runtimeClient.ListPodSandbox(ctx, nil)
})
if err != nil {
return err
}
Expand All @@ -160,7 +162,9 @@ var removePodCommand = &cli.Command{
for _, id := range ids {
podId := id
funcs = append(funcs, func() error {
resp, err := runtimeClient.PodSandboxStatus(context.TODO(), podId, false)
resp, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.PodSandboxStatusResponse, error) {
return runtimeClient.PodSandboxStatus(ctx, podId, false)
})
if err != nil {
return fmt.Errorf("getting sandbox status of pod %q: %w", podId, err)
}
Expand Down Expand Up @@ -327,7 +331,9 @@ func RunPodSandbox(client internalapi.RuntimeService, config *pb.PodSandboxConfi
RuntimeHandler: runtime,
}
logrus.Debugf("RunPodSandboxRequest: %v", request)
r, err := client.RunPodSandbox(context.TODO(), config, runtime)
r, err := InterruptableRPC(nil, func(ctx context.Context) (string, error) {
return client.RunPodSandbox(ctx, config, runtime)
})
logrus.Debugf("RunPodSandboxResponse: %v", r)
if err != nil {
return "", err
Expand All @@ -341,7 +347,9 @@ func StopPodSandbox(client internalapi.RuntimeService, id string) error {
if id == "" {
return errors.New("ID cannot be empty")
}
if err := client.StopPodSandbox(context.TODO(), id); err != nil {
if _, err := InterruptableRPC(nil, func(ctx context.Context) (any, error) {
return nil, client.StopPodSandbox(ctx, id)
}); err != nil {
return err
}

Expand All @@ -355,7 +363,9 @@ func RemovePodSandbox(client internalapi.RuntimeService, id string) error {
if id == "" {
return errors.New("ID cannot be empty")
}
if err := client.RemovePodSandbox(context.TODO(), id); err != nil {
if _, err := InterruptableRPC(nil, func(ctx context.Context) (any, error) {
return nil, client.RemovePodSandbox(ctx, id)
}); err != nil {
return err
}
fmt.Printf("Removed sandbox %s\n", id)
Expand Down Expand Up @@ -394,7 +404,9 @@ func PodSandboxStatus(client internalapi.RuntimeService, id, output string, quie
Verbose: verbose,
}
logrus.Debugf("PodSandboxStatusRequest: %v", request)
r, err := client.PodSandboxStatus(context.TODO(), id, verbose)
r, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.PodSandboxStatusResponse, error) {
return client.PodSandboxStatus(ctx, id, verbose)
})
logrus.Debugf("PodSandboxStatusResponse: %v", r)
if err != nil {
return err
Expand Down Expand Up @@ -483,7 +495,9 @@ func ListPodSandboxes(client internalapi.RuntimeService, opts listOptions) error
Filter: filter,
}
logrus.Debugf("ListPodSandboxRequest: %v", request)
r, err := client.ListPodSandbox(context.TODO(), filter)
r, err := InterruptableRPC(nil, func(ctx context.Context) ([]*pb.PodSandbox, error) {
return client.ListPodSandbox(ctx, filter)
})
logrus.Debugf("ListPodSandboxResponse: %v", r)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion cmd/crictl/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ var runtimeVersionCommand = &cli.Command{
func Version(client internalapi.RuntimeService, version string) error {
request := &pb.VersionRequest{Version: version}
logrus.Debugf("VersionRequest: %v", request)
r, err := client.Version(context.TODO(), version)
r, err := InterruptableRPC(nil, func(ctx context.Context) (*pb.VersionResponse, error) {
return client.Version(ctx, version)
})
logrus.Debugf("VersionResponse: %v", r)
if err != nil {
return err
Expand Down

0 comments on commit db407a2

Please sign in to comment.