Skip to content

Commit

Permalink
feat:vlogs server add pod list. (#5360)
Browse files Browse the repository at this point in the history
* feat:add pod list.
  • Loading branch information
bearslyricattack authored Feb 10, 2025
1 parent 5cfeb59 commit a6e8c9c
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 37 deletions.
27 changes: 18 additions & 9 deletions service/pkg/api/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,24 @@ type VlogsRequest struct {
Time string `json:"time"`
Namespace string `json:"namespace"`
App string `json:"app"`
Limit string `json:"limit"`
JSONMode string `json:"jsonMode"`
StderrMode string `json:"stderrMode"`
NumberMode string `json:"numberMode"`
NumberLevel string `json:"numberLevel"`
Pod []string `json:"pod"`
Container []string `json:"container"`
Keyword string `json:"keyword"`
JSONQuery []JSONQuery `json:"jsonQuery"`
Limit string `json:"limit,omitempty"`
JSONMode string `json:"jsonMode,omitempty"`
StderrMode string `json:"stderrMode,omitempty"`
NumberMode string `json:"numberMode,omitempty"`
NumberLevel string `json:"numberLevel,omitempty"`
Pod []string `json:"pod,omitempty"`
Container []string `json:"container,omitempty"`
Keyword string `json:"keyword,omitempty"`
JSONQuery []JSONQuery `json:"jsonQuery,omitempty"`
PodQuery string `json:"podQuery,omitempty"`
}

type VlogsResponse struct {
Time string `json:"_time"`
Message string `json:"_msg"`
Container string `json:"container"`
Pod string `json:"pod"`
Stream string `json:"stream"`
}

var (
Expand Down
17 changes: 5 additions & 12 deletions service/vlogs/request/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package request
import (
"crypto/tls"
"fmt"
"io"
"net/http"
"net/url"
)
Expand All @@ -25,7 +24,7 @@ func generateReq(path string, username string, password string, query string) (*
return req, nil
}

func QueryLogsByParams(path string, username string, password string, query string, rw http.ResponseWriter) error {
func QueryLogsByParams(path string, username string, password string, query string) (*http.Response, error) {
httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Expand All @@ -35,20 +34,14 @@ func QueryLogsByParams(path string, username string, password string, query stri
}
req, err := generateReq(path, username, password, query)
if err != nil {
return err
return nil, err
}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("HTTP req error: %v", err)
return nil, fmt.Errorf("HTTP req error: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("res error,err info: %+v", resp)
}
_, err = io.Copy(rw, resp.Body)
if err != nil {
return err
return nil, fmt.Errorf("res error,err info: %+v", resp)
}
return nil
return resp, nil
}
117 changes: 101 additions & 16 deletions service/vlogs/server/server.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package server

import (
"bufio"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"strings"

"github.com/labring/sealos/service/pkg/api"
"github.com/labring/sealos/service/pkg/auth"
"github.com/labring/sealos/service/vlogs/request"

"log"
"net/http"
"net/url"
"strings"
)

type VLogsServer struct {
Expand All @@ -33,32 +34,104 @@ func NewVLogsServer(config *Config) (*VLogsServer, error) {
}

func (vl *VLogsServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/queryLogsByParams" {
err := vl.queryLogsByParams(rw, req)
if err != nil {
http.Error(rw, fmt.Sprintf("query logs error: %s", err), http.StatusInternalServerError)
log.Printf("query logs error: %s", err)
}
query, err := vl.queryConvert(req)
if err != nil {
http.Error(rw, fmt.Sprintf("query %s error: %s", req.URL.Path, err), http.StatusInternalServerError)
return
}
err = query(rw, req)
if err != nil {
http.Error(rw, fmt.Sprintf("query %s error: %s", req.URL.Path, err), http.StatusInternalServerError)
slog.Error("%s error: %s", req.URL.Path, err)
return
}
http.Error(rw, "Not found", http.StatusNotFound)
}

func (vl *VLogsServer) queryLogsByParams(rw http.ResponseWriter, req *http.Request) error {
func (vl *VLogsServer) queryConvert(req *http.Request) (func(rw http.ResponseWriter, req *http.Request) error, error) {
switch req.URL.Path {
case "/queryLogsByParams":
return vl.queryLogsByParams, nil
case "/queryPodList":
return vl.queryPodList, nil
default:
return nil, fmt.Errorf("unknown url path")
}
}

func (vl *VLogsServer) authenticate(req *http.Request) (string, error) {
kubeConfig, namespace, query, err := vl.generateParamsRequest(req)
if err != nil {
return fmt.Errorf("bad request (%s)", err)
return "", fmt.Errorf("bad request (%s)", err)
}

err = auth.Authenticate(namespace, kubeConfig)
if err != nil {
return fmt.Errorf("authentication failed (%s)", err)
return "", fmt.Errorf("authentication failed (%s)", err)
}
return query, nil
}

func (vl *VLogsServer) queryLogsByParams(rw http.ResponseWriter, req *http.Request) error {
query, err := vl.authenticate(req)
if err != nil {
return err
}
resp, err := request.QueryLogsByParams(vl.path, vl.username, vl.password, query)
if err != nil {
return fmt.Errorf("query failed (%s)", err)
}
defer resp.Body.Close()
_, err = io.Copy(rw, resp.Body)
if err != nil {
return err
}
return nil
}

err = request.QueryLogsByParams(vl.path, vl.username, vl.password, query, rw)
func (vl *VLogsServer) queryPodList(rw http.ResponseWriter, req *http.Request) error {
query, err := vl.authenticate(req)
if err != nil {
return err
}
resp, err := request.QueryLogsByParams(vl.path, vl.username, vl.password, query)
if err != nil {
return fmt.Errorf("query failed (%s)", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %v", err)
}
if len(body) == 0 {
return fmt.Errorf("response body is empty")
}

scanner := bufio.NewScanner(strings.NewReader(string(body)))
var logs []api.VlogsResponse

for scanner.Scan() {
var entry api.VlogsResponse
line := scanner.Text()
err := json.Unmarshal([]byte(line), &entry)
if err != nil {
continue
}
logs = append(logs, entry)
}

uniquePods := make(map[string]struct{})
for _, log := range logs {
uniquePods[log.Pod] = struct{}{}
}
var podList []string
for pod := range uniquePods {
podList = append(podList, pod)
}

rw.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(rw).Encode(podList); err != nil {
return fmt.Errorf("failed to write response: %v", err)
}
return nil
}

Expand Down Expand Up @@ -91,6 +164,10 @@ type VLogsQuery struct {
}

func (v *VLogsQuery) getQuery(req *api.VlogsRequest) (string, error) {
if req.PodQuery == modeTrue {
query := v.generatePodListQuery(req)
return query, nil
}
v.generateKeywordQuery(req)
v.generateStreamQuery(req)
v.generateCommonQuery(req)
Expand All @@ -104,6 +181,14 @@ func (v *VLogsQuery) getQuery(req *api.VlogsRequest) (string, error) {
return v.query, nil
}

func (v *VLogsQuery) generatePodListQuery(req *api.VlogsRequest) string {
var builder strings.Builder
item := fmt.Sprintf(`{namespace="%s"} _time:%s app:="%s" | Drop _stream_id,_stream,app,job,namespace,node`, req.Namespace, req.Time, req.App)
builder.WriteString(item)
v.query += builder.String()
return v.query
}

func (v *VLogsQuery) generateKeywordQuery(req *api.VlogsRequest) {
var builder strings.Builder
builder.WriteString(req.Keyword)
Expand Down

0 comments on commit a6e8c9c

Please sign in to comment.