Skip to content

Commit

Permalink
Merge pull request #14 from fujiwara/json
Browse files Browse the repository at this point in the history
add -json flag
  • Loading branch information
fujiwara authored Aug 18, 2024
2 parents 779f85e + 7d24429 commit 9898064
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 61 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.23"
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v4
with:
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ jobs:
strategy:
matrix:
go:
- "1.21"
- "1.22"
- "1.23"
name: Build
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
.envrc
cmd/tracer/tracer
dist/
tracer
./tracer
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ tracer [options] [cluster] [task-id]
-duration duration
fetch logs duration from created / before stopping (default 1m0s)
-json
output as JSON lines
-sns string
SNS topic ARN
-stdout
output to stdout (default true)
-version
show the version
show the version
```

Environment variable `AWS_REGION` is required.
Expand Down
8 changes: 7 additions & 1 deletion cmd/tracer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"log/slog"
"os"
"strings"
"time"
Expand Down Expand Up @@ -43,9 +44,14 @@ func main() {
flag.BoolVar(&showVersion, "version", false, "show the version")
flag.BoolVar(&opt.Stdout, "stdout", true, "output to stdout")
flag.StringVar(&opt.SNSTopicArn, "sns", "", "SNS topic ARN")
flag.BoolVar(&opt.JSON, "json", false, "output as JSON lines")
flag.VisitAll(envToFlag)
flag.Parse()

if opt.JSON {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, nil)))
}

if showVersion {
fmt.Println("tracer", Version)
return
Expand All @@ -60,7 +66,7 @@ func main() {
copy(args, flag.Args())

if err := t.Run(ctx, args[0], args[1], &opt); err != nil {
fmt.Fprintln(os.Stderr, err)
slog.Error(err.Error())
os.Exit(1)
}
}
Expand Down
4 changes: 2 additions & 2 deletions lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package tracer

import (
"context"
"fmt"
"log/slog"
"strings"

"github.com/aws/aws-sdk-go-v2/aws/arn"
)

func (t *Tracer) LambdaHandlerFunc(opt *RunOption) func(ctx context.Context, event *ECSTaskEvent) error {
return func(ctx context.Context, event *ECSTaskEvent) error {
fmt.Println(event.String())
slog.Info("event", "payload", event.String())
lastStatus := event.Detail.LastStatus
if lastStatus != "STOPPED" {
return nil
Expand Down
3 changes: 2 additions & 1 deletion lambda/function.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
FunctionName: 'tracer',
MemorySize: 128,
Handler: 'index.handler',
Role: 'arn:aws:iam::{account_id}:role/{role_name}',
// Role: 'arn:aws:iam::{account_id}:role/{role_name}',
Role: 'arn:aws:iam::314472643515:role/tracer',
Runtime: 'provided.al2',
}
165 changes: 125 additions & 40 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package tracer
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"sort"
"strings"
Expand Down Expand Up @@ -71,7 +73,7 @@ func (tl *Timeline) Add(event *TimelineEvent) {
tl.events = append(tl.events, event)
}

func (tl *Timeline) Print(w io.Writer) (int, error) {
func (tl *Timeline) Print(w io.Writer, json bool) (int, error) {
tl.mu.Lock()
defer tl.mu.Unlock()

Expand All @@ -81,16 +83,23 @@ func (tl *Timeline) Print(w io.Writer) (int, error) {
return tls[i].Timestamp.Before(tls[j].Timestamp)
})
n := 0
toString := func(e *TimelineEvent) string {
if json {
return e.JSON()
}
return e.String()
}
for _, e := range tls {
s := e.String()
if !tl.seen[s] {
l, err := fmt.Fprint(w, e.String())
if err != nil {
return n, err
}
n += l
tl.seen[s] = true
s := toString(e)
if tl.seen[s] {
continue
}
l, err := fmt.Fprint(w, s)
if err != nil {
return n, err
}
n += l
tl.seen[s] = true
}
return n, nil
}
Expand All @@ -106,6 +115,20 @@ func (e *TimelineEvent) String() string {
return fmt.Sprintf("%s\t%s\t%s\n", ts.Format(TimeFormat), e.Source, e.Message)
}

func (e *TimelineEvent) JSON() string {
ts := e.Timestamp.In(time.Local)
b, _ := json.Marshal(struct {
Time string `json:"time"`
Source string `json:"src"`
Message string `json:"msg"`
}{
Time: ts.Format(TimeFormat),
Source: e.Source,
Message: e.Message,
})
return string(b) + "\n"
}

func New(ctx context.Context) (*Tracer, error) {
region := os.Getenv("AWS_REGION")
awscfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region))
Expand Down Expand Up @@ -138,6 +161,7 @@ type RunOption struct {
Stdout bool
SNSTopicArn string
Duration time.Duration
JSON bool
}

func (t *Tracer) SetOutput(w io.Writer) {
Expand All @@ -151,7 +175,7 @@ func (t *Tracer) Run(ctx context.Context, cluster string, taskID string, opt *Ru
defer func() { t.report(ctx, cluster, taskID) }()

if cluster == "" {
return t.listClusters(ctx)
return t.listClusters(ctx, opt)
}

if taskID == "" {
Expand All @@ -162,6 +186,12 @@ func (t *Tracer) Run(ctx context.Context, cluster string, taskID string, opt *Ru
if err != nil {
return err
}

defer func() {
if _, err := t.timeline.Print(t.buf, opt.JSON); err != nil {
slog.Error("failed to print timeline", "error", err)
}
}()
if err := t.traceLogs(ctx, task); err != nil {
return err
}
Expand All @@ -172,14 +202,19 @@ func (t *Tracer) Run(ctx context.Context, cluster string, taskID string, opt *Ru
func (t *Tracer) report(ctx context.Context, cluster, taskID string) {
opt := t.option
if opt.Stdout {
fmt.Fprintln(t.w, subject(cluster, taskID))
sub := &subject{cluster, taskID}
if opt.JSON {
fmt.Fprintln(t.w, sub.JSON())
} else {
fmt.Fprintln(t.w, sub.String())
}
if _, err := t.WriteTo(t.w); err != nil {
fmt.Fprintln(os.Stderr, err)
slog.Error("failed to write to output", "error", err)
}
}
if opt.SNSTopicArn != "" {
if err := t.Publish(ctx, opt.SNSTopicArn, cluster, taskID); err != nil {
fmt.Fprintln(os.Stderr, err)
slog.Error("failed to publish to SNS", "error", err)
}
}
}
Expand All @@ -189,19 +224,29 @@ func (t *Tracer) WriteTo(w io.Writer) (int64, error) {
return int64(n), err
}

func subject(cluster, taskID string) string {
s := "Tracer:"
if taskID != "" {
s += " " + taskID
} else if cluster != "" {
s += " tasks"
type subject struct {
Cluster string `json:"cluster"`
TaskID string `json:"task_id"`
}

func (s *subject) JSON() string {
b, _ := json.Marshal(s)
return string(b)
}

func (s *subject) String() string {
str := "Tracer:"
if s.TaskID != "" {
str += " " + s.TaskID
} else if s.Cluster != "" {
str += " tasks"
}
if cluster != "" {
s += " on " + cluster
if s.Cluster != "" {
str += " on " + s.Cluster
} else {
s += " clusters"
str += " clusters"
}
return s
return str
}

const (
Expand All @@ -215,7 +260,7 @@ func (t *Tracer) Publish(ctx context.Context, topicArn, cluster, taskID string)
msg = msg[:snsMaxPayloadSize]
}

s := subject(cluster, taskID)
s := (&subject{cluster, taskID}).String()
if len(s) > snsSubjectLimitLength {
s = s[0:snsSubjectLimitLength-len(ellipsisString)] + ellipsisString
}
Expand All @@ -236,7 +281,7 @@ func (t *Tracer) traceTask(ctx context.Context, cluster string, taskID string) (
return nil, fmt.Errorf("failed to describe tasks: %w", err)
}
if len(res.Tasks) == 0 {
return nil, fmt.Errorf("no tasks found: %w", err)
return nil, fmt.Errorf("no tasks found. cluster: %s, task_id: %s", cluster, taskID)
}
task := res.Tasks[0]

Expand Down Expand Up @@ -278,8 +323,6 @@ func (t *Tracer) traceTask(ctx context.Context, cluster string, taskID string) (
}

func (t *Tracer) traceLogs(ctx context.Context, task *ecsTypes.Task) error {
defer t.timeline.Print(t.buf)

res, err := t.ecs.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{
TaskDefinition: task.TaskDefinitionArn,
})
Expand Down Expand Up @@ -393,7 +436,7 @@ func (t *Tracer) listAllTasks(ctx context.Context, cluster string) error {
return nil
}

func (t *Tracer) listClusters(ctx context.Context) error {
func (t *Tracer) listClusters(ctx context.Context, opt *RunOption) error {
res, err := t.ecs.ListClusters(ctx, &ecs.ListClustersInput{})
if err != nil {
return err
Expand All @@ -403,6 +446,17 @@ func (t *Tracer) listClusters(ctx context.Context) error {
clusters = append(clusters, arnToName(c))
}
sort.Strings(clusters)
if opt.JSON {
err := json.NewEncoder(t.buf).Encode(
struct {
Clusters []string `json:"clusters"`
}{clusters},
)
if err != nil {
return fmt.Errorf("failed to encode JSON: %w", err)
}
return nil
}
for _, c := range clusters {
t.buf.WriteString(c)
t.buf.WriteByte('\n')
Expand Down Expand Up @@ -431,9 +485,13 @@ func (t *Tracer) listTasks(ctx context.Context, cluster string, status ecsTypes.
if err != nil {
return fmt.Errorf("failed to describe tasks: %w", err)
}
for _, task := range res.Tasks {
t.buf.WriteString(strings.Join(taskToColumns(&task), "\t"))
t.buf.WriteRune('\n')
for _, ts := range res.Tasks {
task := newTask(&ts)
if t.option.JSON {
t.buf.WriteString(task.JSON())
} else {
t.buf.WriteString(task.String())
}
}
if nextToken = listRes.NextToken; nextToken == nil {
break
Expand Down Expand Up @@ -480,14 +538,41 @@ func arnToName(arn string) string {
return arn[strings.LastIndex(arn, "/")+1:]
}

func taskToColumns(task *ecsTypes.Task) []string {
return []string{
arnToName(*task.TaskArn),
arnToName(*task.TaskDefinitionArn),
aws.ToString(task.LastStatus),
aws.ToString(task.DesiredStatus),
task.CreatedAt.In(time.Local).Format(time.RFC3339),
aws.ToString(task.Group),
string(task.LaunchType),
type task struct {
Arn string `json:"arn"`
TaskDefinition string `json:"task_definition"`
LastStatus string `json:"last_status"`
DesiredStatus string `json:"desired_status"`
CreatedAt string `json:"created_at"`
Group string `json:"group"`
LaunchType string `json:"launch_type"`
}

func newTask(t *ecsTypes.Task) *task {
return &task{
Arn: arnToName(*t.TaskArn),
TaskDefinition: arnToName(*t.TaskDefinitionArn),
LastStatus: aws.ToString(t.LastStatus),
DesiredStatus: aws.ToString(t.DesiredStatus),
CreatedAt: t.CreatedAt.In(time.Local).Format(time.RFC3339),
Group: aws.ToString(t.Group),
LaunchType: string(t.LaunchType),
}
}

func (t *task) String() string {
return strings.Join([]string{
t.Arn,
t.TaskDefinition,
t.LastStatus,
t.DesiredStatus,
t.CreatedAt,
t.Group,
t.LaunchType,
}, "\t") + "\n"
}

func (t *task) JSON() string {
b, _ := json.Marshal(t)
return string(b) + "\n"
}
Loading

0 comments on commit 9898064

Please sign in to comment.