From d1f940bf6b06abdb79cfa61ec7a1d5d04bd49da8 Mon Sep 17 00:00:00 2001 From: Edward J Date: Sun, 30 Jun 2024 22:41:22 -0700 Subject: [PATCH 1/5] Add more logs to tail and service status monitor Slight restructure of the service monitor and tail code to improve readbablity --- src/cmd/cli/command/commands.go | 33 ++++++++++++++------------- src/cmd/cli/command/servicemonitor.go | 3 +-- src/pkg/clouds/aws/ecs/logs.go | 8 +++++++ 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/cmd/cli/command/commands.go b/src/cmd/cli/command/commands.go index 28d8b617d..c9caf735f 100644 --- a/src/cmd/cli/command/commands.go +++ b/src/cmd/cli/command/commands.go @@ -11,7 +11,6 @@ import ( "regexp" "slices" "strings" - "sync" "time" "github.com/AlecAivazis/survey/v2" @@ -827,26 +826,28 @@ var composeUpCmd = &cobra.Command{ tailCtx, cancelTail := context.WithCancel(ctx) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - // show users the current streaming logs - if err := startTailing(tailCtx, deploy.Etag, since); err != nil { - var cerr *cli.CancelError - if !errors.As(err, &cerr) { - term.Debugf("failed to start tailing: %v", err) + go func() { // Cancel the tailing if the service is ready + if err := waitServiceStatus(ctx, cli.ServiceStarted, serviceInfos); err != nil { + if !errors.Is(err, context.Canceled) && + !errors.Is(err, cli.ErrDryRun) && + !errors.As(err, new(cliClient.ErrNotImplemented)) { + term.Warnf("failed to wait for service status: %v", err) + } else { + term.Debugf("failed to wait for service status: %v", err) } + term.Info("Service status monitoring failed, we will continue tailing the logs. Press Ctrl+C to detach.") + return // Do not cancel the tailing if we are unable to wait for the service status } + cancelTail() }() - if err := waitServiceStatus(ctx, cli.ServiceStarted, serviceInfos); err != nil && !errors.Is(err, context.Canceled) { - if !errors.Is(err, cli.ErrDryRun) && !errors.As(err, new(cliClient.ErrNotImplemented)) { - term.Warnf("failed to wait for service status: %v", err) + + // show users the current streaming logs + if err := startTailing(tailCtx, deploy.Etag, since); err != nil { + var cerr *cli.CancelError + if !errors.As(err, &cerr) { + term.Debugf("failed to start tailing: %v", err) } - wg.Wait() // Wait until ctrl+c is pressed } - cancelTail() - wg.Wait() // Wait for tail to finish printEndpoints(serviceInfos) term.Info("Done.") diff --git a/src/cmd/cli/command/servicemonitor.go b/src/cmd/cli/command/servicemonitor.go index f33ace47b..85dd549e8 100644 --- a/src/cmd/cli/command/servicemonitor.go +++ b/src/cmd/cli/command/servicemonitor.go @@ -18,8 +18,7 @@ func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serv // set up service status subscription (non-blocking) subscribeServiceStatusChan, err := cli.Subscribe(ctx, client, serviceList) if err != nil { - term.Debugf("error subscribing to service status: %v", err) - return err + return fmt.Errorf("error subscribing to service status: %w", err) } serviceStatus := make(map[string]string, len(serviceList)) diff --git a/src/pkg/clouds/aws/ecs/logs.go b/src/pkg/clouds/aws/ecs/logs.go index 3aeac9dde..e5d11668b 100644 --- a/src/pkg/clouds/aws/ecs/logs.go +++ b/src/pkg/clouds/aws/ecs/logs.go @@ -11,6 +11,7 @@ import ( "github.com/DefangLabs/defang/src/pkg/clouds/aws" "github.com/DefangLabs/defang/src/pkg/clouds/aws/region" + "github.com/DefangLabs/defang/src/pkg/term" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" "github.com/aws/aws-sdk-go-v2/service/ecs" @@ -140,7 +141,13 @@ type LogGroupInput struct { LogEventFilterPattern string } +func (l LogGroupInput) String() string { + return fmt.Sprintf("LogGroupARN: %s, LogStreamNames: %v, LogStreamNamePrefix: %s, LogEventFilterPattern: %s", + l.LogGroupARN, l.LogStreamNames, l.LogStreamNamePrefix, l.LogEventFilterPattern) +} + func TailLogGroup(ctx context.Context, input LogGroupInput) (EventStream, error) { + term.Debug("Trying to tail log, getting event stream for:", input) var pattern *string if input.LogEventFilterPattern != "" { pattern = &input.LogEventFilterPattern @@ -318,6 +325,7 @@ type collectionStream struct { } func (c *collectionStream) addAndStart(s EventStream, since time.Time, lgi LogGroupInput) { + term.Debug("Started to tail log event stream:", lgi) c.lock.Lock() defer c.lock.Unlock() c.streams = append(c.streams, s) From 6b840c754c90082e258adf6687f3b029c65548e1 Mon Sep 17 00:00:00 2001 From: Edward J Date: Sun, 30 Jun 2024 22:53:58 -0700 Subject: [PATCH 2/5] One more log --- src/cmd/cli/command/servicemonitor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cmd/cli/command/servicemonitor.go b/src/cmd/cli/command/servicemonitor.go index 85dd549e8..f3c1abd43 100644 --- a/src/cmd/cli/command/servicemonitor.go +++ b/src/cmd/cli/command/servicemonitor.go @@ -14,6 +14,7 @@ func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serv for _, serviceInfo := range serviceInfos { serviceList = append(serviceList, serviceInfo.Service.Name) } + term.Debugf("Waiting for services %v to reach state: %s", serviceList, targetStatus) // set up service status subscription (non-blocking) subscribeServiceStatusChan, err := cli.Subscribe(ctx, client, serviceList) From 686c82d38d39645da173aff19e0c58b44d9a7530 Mon Sep 17 00:00:00 2001 From: Edward J Date: Tue, 2 Jul 2024 10:48:36 -0700 Subject: [PATCH 3/5] Wait for service monitor to finish even if log tail fails --- src/cmd/cli/command/commands.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/cmd/cli/command/commands.go b/src/cmd/cli/command/commands.go index c9caf735f..a0cfbacec 100644 --- a/src/cmd/cli/command/commands.go +++ b/src/cmd/cli/command/commands.go @@ -11,6 +11,7 @@ import ( "regexp" "slices" "strings" + "sync" "time" "github.com/AlecAivazis/survey/v2" @@ -826,6 +827,8 @@ var composeUpCmd = &cobra.Command{ tailCtx, cancelTail := context.WithCancel(ctx) + var wg sync.WaitGroup + wg.Add(1) go func() { // Cancel the tailing if the service is ready if err := waitServiceStatus(ctx, cli.ServiceStarted, serviceInfos); err != nil { if !errors.Is(err, context.Canceled) && @@ -845,10 +848,13 @@ var composeUpCmd = &cobra.Command{ if err := startTailing(tailCtx, deploy.Etag, since); err != nil { var cerr *cli.CancelError if !errors.As(err, &cerr) { - term.Debugf("failed to start tailing: %v", err) + term.Warnf("failed to start tailing, you will not be able to see logs: %v", err) + } else { + term.Debugf("tailing cancelled: %v", err) } } + wg.Wait() // Wait for the service status monitoring to finish printEndpoints(serviceInfos) term.Info("Done.") return nil From 3517f1c1660969c4db24baba67df548a62bbf6ea Mon Sep 17 00:00:00 2001 From: Edward J Date: Thu, 4 Jul 2024 14:22:04 -0700 Subject: [PATCH 4/5] Add wg.Done() --- src/cmd/cli/command/commands.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cmd/cli/command/commands.go b/src/cmd/cli/command/commands.go index a0cfbacec..fbe8b7adf 100644 --- a/src/cmd/cli/command/commands.go +++ b/src/cmd/cli/command/commands.go @@ -830,6 +830,7 @@ var composeUpCmd = &cobra.Command{ var wg sync.WaitGroup wg.Add(1) go func() { // Cancel the tailing if the service is ready + defer wg.Done() if err := waitServiceStatus(ctx, cli.ServiceStarted, serviceInfos); err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, cli.ErrDryRun) && From 304462d0a12beabcdaa97af296a8e32f6a699103 Mon Sep 17 00:00:00 2001 From: Edward J Date: Thu, 4 Jul 2024 22:49:30 -0700 Subject: [PATCH 5/5] Restructure service status update, fix log fixes: https://github.com/DefangLabs/defang-mvp/issues/790 --- src/cmd/cli/command/commands.go | 42 ++++-------- src/cmd/cli/command/servicemonitor.go | 42 +++++++++--- src/pkg/cli/subscribe.go | 99 +++++++++++++-------------- 3 files changed, 94 insertions(+), 89 deletions(-) diff --git a/src/cmd/cli/command/commands.go b/src/cmd/cli/command/commands.go index fbe8b7adf..d47fd6e4f 100644 --- a/src/cmd/cli/command/commands.go +++ b/src/cmd/cli/command/commands.go @@ -11,7 +11,6 @@ import ( "regexp" "slices" "strings" - "sync" "time" "github.com/AlecAivazis/survey/v2" @@ -825,38 +824,25 @@ var composeUpCmd = &cobra.Command{ return nil } - tailCtx, cancelTail := context.WithCancel(ctx) - - var wg sync.WaitGroup - wg.Add(1) - go func() { // Cancel the tailing if the service is ready - defer wg.Done() - if err := waitServiceStatus(ctx, cli.ServiceStarted, serviceInfos); err != nil { - if !errors.Is(err, context.Canceled) && - !errors.Is(err, cli.ErrDryRun) && - !errors.As(err, new(cliClient.ErrNotImplemented)) { - term.Warnf("failed to wait for service status: %v", err) - } else { - term.Debugf("failed to wait for service status: %v", err) - } - term.Info("Service status monitoring failed, we will continue tailing the logs. Press Ctrl+C to detach.") - return // Do not cancel the tailing if we are unable to wait for the service status - } - cancelTail() - }() + tailCtx, cancelTail := contextWithServiceStatus(ctx, cli.ServiceStarted, serviceInfos) + defer cancelTail() // show users the current streaming logs - if err := startTailing(tailCtx, deploy.Etag, since); err != nil { - var cerr *cli.CancelError - if !errors.As(err, &cerr) { - term.Warnf("failed to start tailing, you will not be able to see logs: %v", err) - } else { - term.Debugf("tailing cancelled: %v", err) - } + err = startTailing(tailCtx, deploy.Etag, since) + + // Indicate to the user the log tailing failed + var cerr *cli.CancelError + if !errors.As(err, &cerr) { + term.Warnf("failed to start tailing, you will not be able to see logs: %v", err) } - wg.Wait() // Wait for the service status monitoring to finish + // wait for service status to be ready even if tailing failed + <-tailCtx.Done() printEndpoints(serviceInfos) + + if err != nil { + return err + } term.Info("Done.") return nil }, diff --git a/src/cmd/cli/command/servicemonitor.go b/src/cmd/cli/command/servicemonitor.go index f3c1abd43..2f2ab5773 100644 --- a/src/cmd/cli/command/servicemonitor.go +++ b/src/cmd/cli/command/servicemonitor.go @@ -2,13 +2,36 @@ package command import ( "context" + "errors" "fmt" "github.com/DefangLabs/defang/src/pkg/cli" + cliClient "github.com/DefangLabs/defang/src/pkg/cli/client" "github.com/DefangLabs/defang/src/pkg/term" defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1" ) +func contextWithServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serviceInfos []*defangv1.ServiceInfo) (context.Context, context.CancelFunc) { + // New context to make sure tail is only cancelled when message about service status is printed + newCtx, cancel := context.WithCancel(context.Background()) + go func() { + err := waitServiceStatus(ctx, targetStatus, serviceInfos) + if err == nil { + cancel() + return + } + + if !errors.Is(err, context.Canceled) && !errors.Is(err, cli.ErrDryRun) && !errors.As(err, new(cliClient.ErrNotImplemented)) { + term.Info("Service status monitoring failed, we will continue tailing the logs. Press Ctrl+C to detach.") + } + term.Debugf("failed to wait for service status: %v", err) + + <-ctx.Done() // Don't cancel tail until the original context is cancelled + cancel() + }() + return newCtx, cancel + +} func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serviceInfos []*defangv1.ServiceInfo) error { serviceList := []string{} for _, serviceInfo := range serviceInfos { @@ -17,10 +40,11 @@ func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serv term.Debugf("Waiting for services %v to reach state: %s", serviceList, targetStatus) // set up service status subscription (non-blocking) - subscribeServiceStatusChan, err := cli.Subscribe(ctx, client, serviceList) + serviceStatusUpdateStream, err := cli.Subscribe(ctx, client, serviceList) if err != nil { return fmt.Errorf("error subscribing to service status: %w", err) } + defer serviceStatusUpdateStream.Close() serviceStatus := make(map[string]string, len(serviceList)) for _, name := range serviceList { @@ -28,13 +52,17 @@ func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serv } // monitor for when all services are completed to end this command - for newStatus := range subscribeServiceStatusChan { - if _, ok := serviceStatus[newStatus.Name]; !ok { - term.Debugf("unexpected service %s update", newStatus.Name) - continue + for { + statusUpdate, err := serviceStatusUpdateStream.ServerStatus() + if err != nil { + return fmt.Errorf("service state monitoring terminated without all services reaching desired state %q: %w", targetStatus, err) } - serviceStatus[newStatus.Name] = newStatus.Status + if _, ok := serviceStatus[statusUpdate.Name]; !ok { + term.Debugf("unexpected service %s update", statusUpdate.Name) + continue + } + serviceStatus[statusUpdate.Name] = statusUpdate.Status if allInStatus(targetStatus, serviceStatus) { for _, sInfo := range serviceInfos { @@ -43,8 +71,6 @@ func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serv return nil } } - - return fmt.Errorf("service state monitoring terminated without all services reaching desired state: %s", targetStatus) } func allInStatus(targetStatus cli.ServiceStatus, serviceStatuses map[string]string) bool { diff --git a/src/pkg/cli/subscribe.go b/src/pkg/cli/subscribe.go index b73034b2c..62bae2656 100644 --- a/src/pkg/cli/subscribe.go +++ b/src/pkg/cli/subscribe.go @@ -10,76 +10,69 @@ import ( defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1" ) -type SubscribeServiceStatus struct { +type ServiceStatusUpdate struct { Name string Status string } -func Subscribe(ctx context.Context, client client.Client, services []string) (<-chan SubscribeServiceStatus, error) { +type ServiceStatusStream struct { + serverStream client.ServerStream[defangv1.SubscribeResponse] + normalizedServiceNameToServiceName map[string]string +} + +func (s *ServiceStatusStream) ServerStatus() (*ServiceStatusUpdate, error) { + for s.serverStream.Receive() { + msg := s.serverStream.Msg() + if msg == nil { + continue + } + + servInfo := msg.GetService() + if servInfo == nil || servInfo.Service == nil { + continue + } + + serviceName, ok := s.normalizedServiceNameToServiceName[servInfo.Service.Name] + if !ok { + term.Debugf("Unknown service %s in subscribe response\n", servInfo.Service.Name) + continue + } + + status := ServiceStatusUpdate{ + Name: serviceName, + Status: servInfo.Status, + } + + term.Debugf("service %s with status %s\n", serviceName, servInfo.Status) + return &status, nil + } + + return nil, s.serverStream.Err() +} + +func (s *ServiceStatusStream) Close() error { + return s.serverStream.Close() +} + +func Subscribe(ctx context.Context, client client.Client, services []string) (*ServiceStatusStream, error) { if len(services) == 0 { return nil, fmt.Errorf("no services specified") } - normalizedServiceNameToServiceName := make(map[string]string, len(services)) + if DoDryRun { + return nil, ErrDryRun + } + normalizedServiceNameToServiceName := make(map[string]string, len(services)) for i, service := range services { services[i] = compose.NormalizeServiceName(service) normalizedServiceNameToServiceName[services[i]] = service } - statusChan := make(chan SubscribeServiceStatus, len(services)) - if DoDryRun { - defer close(statusChan) - return statusChan, ErrDryRun - } - serverStream, err := client.Subscribe(ctx, &defangv1.SubscribeRequest{Services: services}) if err != nil { return nil, err } - go func() { - defer serverStream.Close() - defer close(statusChan) - for { - - // handle cancel from caller - select { - case <-ctx.Done(): - term.Debug("Context Done - exiting Subscribe goroutine") - return - default: - } - - if !serverStream.Receive() { - term.Debug("Subscribe Stream closed", serverStream.Err()) - return - } - - msg := serverStream.Msg() - if msg == nil { - continue - } - - servInfo := msg.GetService() - if servInfo == nil || servInfo.Service == nil { - continue - } - - serviceName, ok := normalizedServiceNameToServiceName[servInfo.Service.Name] - if !ok { - term.Debugf("Unknown service %s in subscribe response\n", servInfo.Service.Name) - continue - } - status := SubscribeServiceStatus{ - Name: serviceName, - Status: servInfo.Status, - } - - statusChan <- status - term.Debugf("service %s with status %s\n", serviceName, servInfo.Status) - } - }() - - return statusChan, nil + return &ServiceStatusStream{serverStream, normalizedServiceNameToServiceName}, nil }