Skip to content

Commit

Permalink
wip: idempotency
Browse files Browse the repository at this point in the history
  • Loading branch information
lionello committed Jul 5, 2024
1 parent 1442845 commit e0b86c6
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 251 deletions.
3 changes: 3 additions & 0 deletions src/gen/io/defang/v1/fabric_connect.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export declare const FabricController: {
readonly I: typeof Empty,
readonly O: typeof Empty,
readonly kind: MethodKind.Unary,
readonly idempotency: MethodIdempotency.Idempotent,
},
/**
* @generated from rpc io.defang.v1.FabricController.Tail
Expand All @@ -62,6 +63,7 @@ export declare const FabricController: {
readonly I: typeof TailRequest,
readonly O: typeof TailResponse,
readonly kind: MethodKind.ServerStreaming,
readonly idempotency: MethodIdempotency.NoSideEffects,
},
/**
* @generated from rpc io.defang.v1.FabricController.Update
Expand Down Expand Up @@ -119,6 +121,7 @@ export declare const FabricController: {
readonly I: typeof SubscribeRequest,
readonly O: typeof SubscribeResponse,
readonly kind: MethodKind.ServerStreaming,
readonly idempotency: MethodIdempotency.NoSideEffects,
},
/**
* rpc Promote(google.protobuf.Empty) returns (google.protobuf.Empty);
Expand Down
3 changes: 3 additions & 0 deletions src/gen/io/defang/v1/fabric_connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export const FabricController = {
I: Empty,
O: Empty,
kind: MethodKind.Unary,
idempotency: MethodIdempotency.Idempotent,
},
/**
* @generated from rpc io.defang.v1.FabricController.Tail
Expand All @@ -62,6 +63,7 @@ export const FabricController = {
I: TailRequest,
O: TailResponse,
kind: MethodKind.ServerStreaming,
idempotency: MethodIdempotency.NoSideEffects,
},
/**
* @generated from rpc io.defang.v1.FabricController.Update
Expand Down Expand Up @@ -119,6 +121,7 @@ export const FabricController = {
I: SubscribeRequest,
O: SubscribeResponse,
kind: MethodKind.ServerStreaming,
idempotency: MethodIdempotency.NoSideEffects,
},
/**
* rpc Promote(google.protobuf.Empty) returns (google.protobuf.Empty);
Expand Down
4 changes: 2 additions & 2 deletions src/gen/io/defang/v1/fabric_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ export declare class DeployRequest extends Message<DeployRequest> {
project: string;

/**
* @generated from field: google.protobuf.Struct compose = 3;
* @generated from field: string compose_yaml = 3;
*/
compose?: Struct;
composeYaml: string;

constructor(data?: PartialMessage<DeployRequest>);

Expand Down
2 changes: 1 addition & 1 deletion src/gen/io/defang/v1/fabric_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export const DeployRequest = /*@__PURE__*/ proto3.makeMessageType(
() => [
{ no: 1, name: "services", kind: "message", T: Service, repeated: true },
{ no: 2, name: "project", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 3, name: "compose", kind: "message", T: Struct },
{ no: 3, name: "compose_yaml", kind: "scalar", T: 9 /* ScalarType.STRING */ },
],
);

Expand Down
2 changes: 1 addition & 1 deletion src/pkg/cli/client/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Retrier struct{}
func (Retrier) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
res, err := next(ctx, req)
if connect.CodeOf(err) == connect.CodeUnavailable {
if connect.CodeOf(err) == connect.CodeUnavailable || req.Spec().IdempotencyLevel == connect.IdempotencyNoSideEffects {
// Retry once after a 1 second sleep
pkg.SleepWithContext(ctx, 1*time.Second)
res, err = next(ctx, req)
Expand Down
67 changes: 54 additions & 13 deletions src/pkg/cli/client/retrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"net/http/httptest"
"testing"
"time"

defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
"github.com/DefangLabs/defang/src/protos/io/defang/v1/defangv1connect"
Expand All @@ -13,18 +14,28 @@ import (

type grpcMockHandler struct {
defangv1connect.UnimplementedFabricControllerHandler
tries int
getTries int
deployTries int
tailTries int
}

func (g *grpcMockHandler) Get(context.Context, *connect.Request[defangv1.ServiceID]) (*connect.Response[defangv1.ServiceInfo], error) {
println(time.Now().Format(time.RFC3339Nano), "Get")
g.getTries++
return nil, connect.NewError(connect.CodeUnavailable, errors.New("unavailable"))
}

func (g *grpcMockHandler) Deploy(context.Context, *connect.Request[defangv1.DeployRequest]) (*connect.Response[defangv1.DeployResponse], error) {
g.tries++
println(time.Now().Format(time.RFC3339Nano), "Deploy")
g.deployTries++
return nil, connect.NewError(connect.CodeUnavailable, errors.New("unavailable"))
}

// func (g *grpcMockHandler) Tail(ctx context.Context, r *connect.Request[defangv1.TailRequest], s *connect.ServerStream[defangv1.TailResponse]) error {
// g.tries++
// return connect.NewError(connect.CodeUnavailable, errors.New("unavailable"))
// }
func (g *grpcMockHandler) Tail(ctx context.Context, r *connect.Request[defangv1.TailRequest], s *connect.ServerStream[defangv1.TailResponse]) error {
println(time.Now().Format(time.RFC3339Nano), "Tail")
g.tailTries++
return connect.NewError(connect.CodeUnavailable, errors.New("unavailable"))
}

func TestRetrier(t *testing.T) {
fabricServer := &grpcMockHandler{}
Expand All @@ -35,11 +46,41 @@ func TestRetrier(t *testing.T) {

fabricClient := defangv1connect.NewFabricControllerClient(server.Client(), server.URL, connect.WithGRPC(), connect.WithInterceptors(Retrier{}))

_, err := fabricClient.Deploy(context.Background(), connect.NewRequest(&defangv1.DeployRequest{}))
if err == nil {
t.Fatal("expected error")
}
if fabricServer.tries != 2 {
t.Fatalf("expected 2 tries, got %d", fabricServer.tries)
}
t.Run("Unary idempotent", func(t *testing.T) {
_, err := fabricClient.Get(context.Background(), connect.NewRequest(&defangv1.ServiceID{}))
if err == nil {
t.Fatal("expected error")
}
if fabricServer.getTries != 2 {
t.Fatalf("expected 2 tries, got %d", fabricServer.getTries)
}
})

t.Run("Unary", func(t *testing.T) {
_, err := fabricClient.Deploy(context.Background(), connect.NewRequest(&defangv1.DeployRequest{}))
if err == nil {
t.Fatal("expected error")
}
if fabricServer.deployTries != 2 {
t.Fatalf("expected 2 tries, got %d", fabricServer.deployTries)
}
})

t.Run("Streaming", func(t *testing.T) {
ss, err := fabricClient.Tail(context.Background(), connect.NewRequest(&defangv1.TailRequest{}))
if err != nil {
t.Fatal(err)
}
defer ss.Close()
if ss.Receive() == true {
t.Error("expected false")
}
if ss.Err() == nil {
t.Errorf("expected error")
}
// TODO: implement retries for streaming calls
if fabricServer.tailTries != 1 {
t.Fatalf("expected 1 tries, got %d", fabricServer.tailTries)
}
})
}
18 changes: 12 additions & 6 deletions src/protos/io/defang/v1/defangv1connect/fabric.connect.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e0b86c6

Please sign in to comment.