Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more logs to tail and service status monitor #518

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions src/cmd/cli/command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"regexp"
"slices"
"strings"
"sync"
"time"

"github.com/AlecAivazis/survey/v2"
Expand Down Expand Up @@ -825,30 +824,25 @@ var composeUpCmd = &cobra.Command{
return nil
}

tailCtx, cancelTail := context.WithCancel(ctx)
tailCtx, cancelTail := contextWithServiceStatus(ctx, cli.ServiceStarted, serviceInfos)
defer cancelTail()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how can this happen? startTailing blocks? You do <-Done() so this cancel is just for cleanup, in which case the name is wrong.


var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// show users the current streaming logs
if err := startTailing(tailCtx, deploy.Etag, since); err != nil {
var cerr *cli.CancelError
if !errors.As(err, &cerr) {
term.Debugf("failed to start tailing: %v", err)
}
}
}()
if err := waitServiceStatus(ctx, cli.ServiceStarted, serviceInfos); err != nil && !errors.Is(err, context.Canceled) {
if !errors.Is(err, cli.ErrDryRun) && !errors.As(err, new(cliClient.ErrNotImplemented)) {
term.Warnf("failed to wait for service status: %v", err)
}
wg.Wait() // Wait until ctrl+c is pressed
// show users the current streaming logs
err = startTailing(tailCtx, deploy.Etag, since)

// Indicate to the user the log tailing failed
var cerr *cli.CancelError
if !errors.As(err, &cerr) {
term.Warnf("failed to start tailing, you will not be able to see logs: %v", err)
Comment on lines +835 to +836
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am on the fence about whether this warning is needed.

}
cancelTail()
wg.Wait() // Wait for tail to finish

// wait for service status to be ready even if tailing failed
<-tailCtx.Done()
printEndpoints(serviceInfos)

if err != nil {
return err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes the detach message regression

}
term.Info("Done.")
return nil
},
Expand Down
46 changes: 36 additions & 10 deletions src/cmd/cli/command/servicemonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,67 @@ package command

import (
"context"
"errors"
"fmt"

"github.com/DefangLabs/defang/src/pkg/cli"
cliClient "github.com/DefangLabs/defang/src/pkg/cli/client"
"github.com/DefangLabs/defang/src/pkg/term"
defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
)

func contextWithServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serviceInfos []*defangv1.ServiceInfo) (context.Context, context.CancelFunc) {
// New context to make sure tail is only cancelled when message about service status is printed
newCtx, cancel := context.WithCancel(context.Background())
go func() {
err := waitServiceStatus(ctx, targetStatus, serviceInfos)
if err == nil {
cancel()
return
}

if !errors.Is(err, context.Canceled) && !errors.Is(err, cli.ErrDryRun) && !errors.As(err, new(cliClient.ErrNotImplemented)) {
term.Info("Service status monitoring failed, we will continue tailing the logs. Press Ctrl+C to detach.")
}
term.Debugf("failed to wait for service status: %v", err)
Comment on lines +24 to +27
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code would be much simpler if we didn't need to log these, but then we would risk swallowing errors that are useful for debugging.


<-ctx.Done() // Don't cancel tail until the original context is cancelled
cancel()
}()
return newCtx, cancel

}
func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serviceInfos []*defangv1.ServiceInfo) error {
serviceList := []string{}
for _, serviceInfo := range serviceInfos {
serviceList = append(serviceList, serviceInfo.Service.Name)
}
term.Debugf("Waiting for services %v to reach state: %s", serviceList, targetStatus)

// set up service status subscription (non-blocking)
subscribeServiceStatusChan, err := cli.Subscribe(ctx, client, serviceList)
serviceStatusUpdateStream, err := cli.Subscribe(ctx, client, serviceList)
if err != nil {
term.Debugf("error subscribing to service status: %v", err)
return err
return fmt.Errorf("error subscribing to service status: %w", err)
}
defer serviceStatusUpdateStream.Close()

serviceStatus := make(map[string]string, len(serviceList))
for _, name := range serviceList {
serviceStatus[name] = string(cli.ServiceUnknown)
}

// monitor for when all services are completed to end this command
for newStatus := range subscribeServiceStatusChan {
if _, ok := serviceStatus[newStatus.Name]; !ok {
term.Debugf("unexpected service %s update", newStatus.Name)
continue
for {
statusUpdate, err := serviceStatusUpdateStream.ServerStatus()
if err != nil {
return fmt.Errorf("service state monitoring terminated without all services reaching desired state %q: %w", targetStatus, err)
}

serviceStatus[newStatus.Name] = newStatus.Status
if _, ok := serviceStatus[statusUpdate.Name]; !ok {
term.Debugf("unexpected service %s update", statusUpdate.Name)
continue
}
serviceStatus[statusUpdate.Name] = statusUpdate.Status

if allInStatus(targetStatus, serviceStatus) {
for _, sInfo := range serviceInfos {
Expand All @@ -43,8 +71,6 @@ func waitServiceStatus(ctx context.Context, targetStatus cli.ServiceStatus, serv
return nil
}
}

return fmt.Errorf("service state monitoring terminated without all services reaching desired state: %s", targetStatus)
}

func allInStatus(targetStatus cli.ServiceStatus, serviceStatuses map[string]string) bool {
Expand Down
99 changes: 46 additions & 53 deletions src/pkg/cli/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,76 +10,69 @@ import (
defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
)

type SubscribeServiceStatus struct {
type ServiceStatusUpdate struct {
Name string
Status string
}

func Subscribe(ctx context.Context, client client.Client, services []string) (<-chan SubscribeServiceStatus, error) {
// ServiceStatusStream pairs a subscribe response stream with the lookup table
// needed to report updates under the service names the caller supplied.
type ServiceStatusStream struct {
// serverStream is the underlying stream of subscribe responses.
serverStream client.ServerStream[defangv1.SubscribeResponse]
// normalizedServiceNameToServiceName maps a normalized service name, as it
// appears in subscribe responses, to the corresponding caller-supplied name.
normalizedServiceNameToServiceName map[string]string
}

// ServerStatus reads from the underlying stream until it finds a status update
// for a known service and returns that update.
//
// Messages that are nil, that carry no service information, or that name a
// service missing from normalizedServiceNameToServiceName are skipped. The
// returned update uses the name stored in that map rather than the name
// received on the stream.
//
// When the stream stops delivering messages, the stream's own error is
// returned. If the stream stopped without reporting an error, a non-nil error
// is still returned, so callers never receive a nil update together with a nil
// error.
func (s *ServiceStatusStream) ServerStatus() (*ServiceStatusUpdate, error) {
	for s.serverStream.Receive() {
		msg := s.serverStream.Msg()
		if msg == nil {
			continue
		}

		servInfo := msg.GetService()
		if servInfo == nil || servInfo.Service == nil {
			continue
		}

		serviceName, ok := s.normalizedServiceNameToServiceName[servInfo.Service.Name]
		if !ok {
			term.Debugf("Unknown service %s in subscribe response\n", servInfo.Service.Name)
			continue
		}

		status := ServiceStatusUpdate{
			Name:   serviceName,
			Status: servInfo.Status,
		}

		term.Debugf("service %s with status %s\n", serviceName, servInfo.Status)
		return &status, nil
	}

	if err := s.serverStream.Err(); err != nil {
		return nil, err
	}

	// The stream ended without an error. Returning (nil, nil) here would make
	// callers that only check the error dereference a nil update, so report the
	// end of the stream explicitly.
	return nil, fmt.Errorf("service status stream closed")
}

// Close closes the underlying server stream and returns whatever error that
// close reports.
func (s *ServiceStatusStream) Close() error {
	closeErr := s.serverStream.Close()
	return closeErr
}

func Subscribe(ctx context.Context, client client.Client, services []string) (*ServiceStatusStream, error) {
if len(services) == 0 {
return nil, fmt.Errorf("no services specified")
}

normalizedServiceNameToServiceName := make(map[string]string, len(services))
if DoDryRun {
return nil, ErrDryRun
}

normalizedServiceNameToServiceName := make(map[string]string, len(services))
for i, service := range services {
services[i] = compose.NormalizeServiceName(service)
normalizedServiceNameToServiceName[services[i]] = service
}

statusChan := make(chan SubscribeServiceStatus, len(services))
if DoDryRun {
defer close(statusChan)
return statusChan, ErrDryRun
}

serverStream, err := client.Subscribe(ctx, &defangv1.SubscribeRequest{Services: services})
if err != nil {
return nil, err
}

go func() {
defer serverStream.Close()
defer close(statusChan)
for {

// handle cancel from caller
select {
case <-ctx.Done():
term.Debug("Context Done - exiting Subscribe goroutine")
return
default:
}

if !serverStream.Receive() {
term.Debug("Subscribe Stream closed", serverStream.Err())
return
}

msg := serverStream.Msg()
if msg == nil {
continue
}

servInfo := msg.GetService()
if servInfo == nil || servInfo.Service == nil {
continue
}

serviceName, ok := normalizedServiceNameToServiceName[servInfo.Service.Name]
if !ok {
term.Debugf("Unknown service %s in subscribe response\n", servInfo.Service.Name)
continue
}
status := SubscribeServiceStatus{
Name: serviceName,
Status: servInfo.Status,
}

statusChan <- status
term.Debugf("service %s with status %s\n", serviceName, servInfo.Status)
}
}()

return statusChan, nil
return &ServiceStatusStream{serverStream, normalizedServiceNameToServiceName}, nil
}
8 changes: 8 additions & 0 deletions src/pkg/clouds/aws/ecs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/DefangLabs/defang/src/pkg/clouds/aws"
"github.com/DefangLabs/defang/src/pkg/clouds/aws/region"
"github.com/DefangLabs/defang/src/pkg/term"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/aws/aws-sdk-go-v2/service/ecs"
Expand Down Expand Up @@ -140,7 +141,13 @@ type LogGroupInput struct {
LogEventFilterPattern string
}

// String formats the log group ARN, stream names, stream name prefix and event
// filter pattern of the input as a single comma-separated line.
func (l LogGroupInput) String() string {
	const layout = "LogGroupARN: %s, LogStreamNames: %v, LogStreamNamePrefix: %s, LogEventFilterPattern: %s"
	fields := []any{
		l.LogGroupARN,
		l.LogStreamNames,
		l.LogStreamNamePrefix,
		l.LogEventFilterPattern,
	}
	return fmt.Sprintf(layout, fields...)
}

func TailLogGroup(ctx context.Context, input LogGroupInput) (EventStream, error) {
term.Debug("Trying to tail log, getting event stream for:", input)
var pattern *string
if input.LogEventFilterPattern != "" {
pattern = &input.LogEventFilterPattern
Expand Down Expand Up @@ -318,6 +325,7 @@ type collectionStream struct {
}

func (c *collectionStream) addAndStart(s EventStream, since time.Time, lgi LogGroupInput) {
term.Debug("Started to tail log event stream:", lgi)
c.lock.Lock()
defer c.lock.Unlock()
c.streams = append(c.streams, s)
Expand Down
Loading