Skip to content

Commit

Permalink
Restructure service status update, fix log
Browse files Browse the repository at this point in the history
  • Loading branch information
edw-defang committed Jul 5, 2024
1 parent 3517f1c commit 304462d
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 89 deletions.
42 changes: 14 additions & 28 deletions src/cmd/cli/command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"regexp"
"slices"
"strings"
"sync"
"time"

"github.com/AlecAivazis/survey/v2"
Expand Down Expand Up @@ -825,38 +824,25 @@ var composeUpCmd = &cobra.Command{
return nil
}

tailCtx, cancelTail := context.WithCancel(ctx)

var wg sync.WaitGroup
wg.Add(1)
go func() { // Cancel the tailing if the service is ready
defer wg.Done()
if err := waitServiceStatus(ctx, cli.ServiceStarted, serviceInfos); err != nil {
if !errors.Is(err, context.Canceled) &&
!errors.Is(err, cli.ErrDryRun) &&
!errors.As(err, new(cliClient.ErrNotImplemented)) {
term.Warnf("failed to wait for service status: %v", err)
} else {
term.Debugf("failed to wait for service status: %v", err)
}
term.Info("Service status monitoring failed, we will continue tailing the logs. Press Ctrl+C to detach.")
return // Do not cancel the tailing if we are unable to wait for the service status
}
cancelTail()
}()
tailCtx, cancelTail := contextWithServiceStatus(ctx, cli.ServiceStarted, serviceInfos)
defer cancelTail()

// show users the current streaming logs
if err := startTailing(tailCtx, deploy.Etag, since); err != nil {
var cerr *cli.CancelError
if !errors.As(err, &cerr) {
term.Warnf("failed to start tailing, you will not be able to see logs: %v", err)
} else {
term.Debugf("tailing cancelled: %v", err)
}
err = startTailing(tailCtx, deploy.Etag, since)

// Indicate to the user the log tailing failed
var cerr *cli.CancelError
if !errors.As(err, &cerr) {
term.Warnf("failed to start tailing, you will not be able to see logs: %v", err)
}

wg.Wait() // Wait for the service status monitoring to finish
// wait for service status to be ready even if tailing failed
<-tailCtx.Done()
printEndpoints(serviceInfos)

if err != nil {
return err
}
term.Info("Done.")
return nil
},
Expand Down
42 changes: 34 additions & 8 deletions src/cmd/cli/command/servicemonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,36 @@ package command

import (
"context"
"errors"
"fmt"

"github.com/DefangLabs/defang/src/pkg/cli"
cliClient "github.com/DefangLabs/defang/src/pkg/cli/client"
"github.com/DefangLabs/defang/src/pkg/term"
defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
)

func contextWithServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serviceInfos []*defangv1.ServiceInfo) (context.Context, context.CancelFunc) {
// New context to make sure tail is only cancelled when message about service status is printed
newCtx, cancel := context.WithCancel(context.Background())
go func() {
err := waitServiceStatus(ctx, targetStatus, serviceInfos)
if err == nil {
cancel()
return
}

if !errors.Is(err, context.Canceled) && !errors.Is(err, cli.ErrDryRun) && !errors.As(err, new(cliClient.ErrNotImplemented)) {
term.Info("Service status monitoring failed, we will continue tailing the logs. Press Ctrl+C to detach.")
}
term.Debugf("failed to wait for service status: %v", err)

<-ctx.Done() // Don't cancel tail until the original context is cancelled
cancel()
}()
return newCtx, cancel

}
func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serviceInfos []*defangv1.ServiceInfo) error {
serviceList := []string{}
for _, serviceInfo := range serviceInfos {
Expand All @@ -17,24 +40,29 @@ func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serv
term.Debugf("Waiting for services %v to reach state: %s", serviceList, targetStatus)

// set up service status subscription (non-blocking)
subscribeServiceStatusChan, err := cli.Subscribe(ctx, client, serviceList)
serviceStatusUpdateStream, err := cli.Subscribe(ctx, client, serviceList)
if err != nil {
return fmt.Errorf("error subscribing to service status: %w", err)
}
defer serviceStatusUpdateStream.Close()

serviceStatus := make(map[string]string, len(serviceList))
for _, name := range serviceList {
serviceStatus[name] = string(cli.ServiceUnknown)
}

// monitor for when all services are completed to end this command
for newStatus := range subscribeServiceStatusChan {
if _, ok := serviceStatus[newStatus.Name]; !ok {
term.Debugf("unexpected service %s update", newStatus.Name)
continue
for {
statusUpdate, err := serviceStatusUpdateStream.ServerStatus()
if err != nil {
return fmt.Errorf("service state monitoring terminated without all services reaching desired state %q: %w", targetStatus, err)
}

serviceStatus[newStatus.Name] = newStatus.Status
if _, ok := serviceStatus[statusUpdate.Name]; !ok {
term.Debugf("unexpected service %s update", statusUpdate.Name)
continue
}
serviceStatus[statusUpdate.Name] = statusUpdate.Status

if allInStatus(targetStatus, serviceStatus) {
for _, sInfo := range serviceInfos {
Expand All @@ -43,8 +71,6 @@ func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serv
return nil
}
}

return fmt.Errorf("service state monitoring terminated without all services reaching desired state: %s", targetStatus)
}

func allInStatus(targetStatus cli.ServiceStatus, serviceStatuses map[string]string) bool {
Expand Down
99 changes: 46 additions & 53 deletions src/pkg/cli/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,76 +10,69 @@ import (
defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
)

type SubscribeServiceStatus struct {
type ServiceStatusUpdate struct {
Name string
Status string
}

func Subscribe(ctx context.Context, client client.Client, services []string) (<-chan SubscribeServiceStatus, error) {
type ServiceStatusStream struct {
serverStream client.ServerStream[defangv1.SubscribeResponse]
normalizedServiceNameToServiceName map[string]string
}

func (s *ServiceStatusStream) ServerStatus() (*ServiceStatusUpdate, error) {
for s.serverStream.Receive() {
msg := s.serverStream.Msg()
if msg == nil {
continue
}

servInfo := msg.GetService()
if servInfo == nil || servInfo.Service == nil {
continue
}

serviceName, ok := s.normalizedServiceNameToServiceName[servInfo.Service.Name]
if !ok {
term.Debugf("Unknown service %s in subscribe response\n", servInfo.Service.Name)
continue
}

status := ServiceStatusUpdate{
Name: serviceName,
Status: servInfo.Status,
}

term.Debugf("service %s with status %s\n", serviceName, servInfo.Status)
return &status, nil
}

return nil, s.serverStream.Err()
}

func (s *ServiceStatusStream) Close() error {
return s.serverStream.Close()
}

func Subscribe(ctx context.Context, client client.Client, services []string) (*ServiceStatusStream, error) {
if len(services) == 0 {
return nil, fmt.Errorf("no services specified")
}

normalizedServiceNameToServiceName := make(map[string]string, len(services))
if DoDryRun {
return nil, ErrDryRun
}

normalizedServiceNameToServiceName := make(map[string]string, len(services))
for i, service := range services {
services[i] = compose.NormalizeServiceName(service)
normalizedServiceNameToServiceName[services[i]] = service
}

statusChan := make(chan SubscribeServiceStatus, len(services))
if DoDryRun {
defer close(statusChan)
return statusChan, ErrDryRun
}

serverStream, err := client.Subscribe(ctx, &defangv1.SubscribeRequest{Services: services})
if err != nil {
return nil, err
}

go func() {
defer serverStream.Close()
defer close(statusChan)
for {

// handle cancel from caller
select {
case <-ctx.Done():
term.Debug("Context Done - exiting Subscribe goroutine")
return
default:
}

if !serverStream.Receive() {
term.Debug("Subscribe Stream closed", serverStream.Err())
return
}

msg := serverStream.Msg()
if msg == nil {
continue
}

servInfo := msg.GetService()
if servInfo == nil || servInfo.Service == nil {
continue
}

serviceName, ok := normalizedServiceNameToServiceName[servInfo.Service.Name]
if !ok {
term.Debugf("Unknown service %s in subscribe response\n", servInfo.Service.Name)
continue
}
status := SubscribeServiceStatus{
Name: serviceName,
Status: servInfo.Status,
}

statusChan <- status
term.Debugf("service %s with status %s\n", serviceName, servInfo.Status)
}
}()

return statusChan, nil
return &ServiceStatusStream{serverStream, normalizedServiceNameToServiceName}, nil
}

0 comments on commit 304462d

Please sign in to comment.