Skip to content

Commit

Permalink
feat: aiproxy modelinfo and dashboard and model rpm limit (#5291)
Browse files Browse the repository at this point in the history
* feat: model info

* fix: model config vision

* feat: aiproxy dashboard api

* fix: two week and pg hour format

* fix: model tag name

* feat: model rpm limit

* fix: ci

* feat: search log with code type

* feat: resp detail buf use pool

* feat: no need init client, use ctx

* fix: lint

* feat: admin api log filed

* feat: log usage

* feat: auto retry

* fix: retry channel exhausted, use first channel

* feat: init monitor

* feat: auto ban error rate and auto test unban

* fix: getChannelWithFallback

* feat: support google thinking

* fix: monitor

* feat: get log detail

* feat: no need channel config

* feat: key validate

* feat: add model error auto ban option

* feat: gemini tool

* feat: gemini openai sdk

* fix: option keys

* feat: do not save access at

* fix: del no use options

* fix: del no use options

* fix: auto test banned models needs to return when an error happens getting from redis

* fix: remove channel db hook

* chore: clean detail only after insert it

* fix: err print on debug

* fix: cache update

* feat: group consume level rpm ratio

* fix: error return

* feat: decode svg

* fix: check is image

* fix: reply raw 429 message

* feat: req and resp body max size limit

* fix: _ import lint

* fix: get token encoder log

* fix: sum used amount

* fix: delete no need cache

* feat: dashboard rpm

* feat: dashboard tpm

* feat: step modelinfo

* feat: yi

* fix: yi

* feat: debug banned

* chore: bump go mod

* chore: bump go mod

* fix: save model time parse

* feat: fill dash carts gaps

* feat: fill dash carts gaps

* chore: go mod tidy

* feat: dashboard timespan

* feat: dashboard timespan from query

* feat: decouple request paths

* feat: group model tmp limit

* feat: decoupling url paths

* fix: check balance

* refactor: relay handler

* refactor: post relay

* feat: fill gaps before and after point

* fix: qwen long tokens

* feat: get rpm from redis

* fix: fill gaps

* fix: log error

* fix: token not fount err log

* fix: if err resp is not json, replay raw content

* fix: do not save same response body and content

* fix: save resp json or empty

* feat: sort distinct values

* fix: token models

* feat: redis clean expired cache

* feat: atomic model cache

* feat: consume

* feat: group custom model rpm tpm

* fix: models

* fix: v1 route

* fix: cros

* feat: rate limit err log record

* fix: rpush

* fix: dashboard time span

* feat: group model list adjusted tpm rpm

* feat: baichuan model config

* fix: rpm limit recore ignore empty channel id

* feat: disable model config

* feat: internal token

* fix: lint

* fix: recore req to redis

* feat: option from env

* fix: internal token option key

* fix: ignore redis ping error

* fix: ignore redis ping error

* fix: subscription

* fix: subscription

* feat: precheck group balance

* fix: consume nil pointer

* feat: log balance

* feat: ip log

* fix: group disable

* fix: non stream context cancel

* feat: amount log

* fix: balance and amount log format

* fix: do not skip empty

* fix: reason system prompt

* feat: doubao and moonshot model

* feat: disable model config can load existed model

* chore: add shutdown timeout duration to 600 sec

* feat: dashboard data build whit concurrent

* feat: logs data build whit concurrent

* fix: monitor remove banned model

* feat: split think

* fix: skip enpty think

* fix: do not store large resp

* fix: reat limit script

* fix: reat limit use micro second

* fix: ignore gemini input count error

* feat: calude model config

* fix: claude stream usage resp

* fix: claude stream usage resp

* fix: claude stream usage resp

* feat: auto create sqlite dir

* feat: log detail body truncated

* chore: add body conv commend

* feat: monitor ignore error rate compute when is success request

* feat: ollama usage support

* feat: baseurl embed v1 prefix

* feat: limit detail record size

* feat: split think config

* feat: channel default priority

* fix: rate limit message

* feat: channel meta api

* feat: add channel key validate help message

* fix: channel config update

* fix: split think

* fix: claude api

* fix: record total tokens

* chore: bump go mod

* chore: bump go mod

* feat: qwen open source vl models

* fix: qwen2.5 vl tool choice

* feat: stt audio duration

* feat: ali paraformer price

* fix: stt usage

* feat: qwen mt

* fix: render when split skip

* feat: sealos realname check

* feat: gemini usage support

* fix: lint

* fix: error message

* fix: lint

* fix: search token

* fix: no real name limit han message

* feat: gemini model config

* fix: get group error hans message

* fix: get group dashboard models

* feat: channel and token model search

* feat: support ali completions

* feat: internal group and search optimize

* feat: conv gemini tool choice

* fix: gemini empty tool parameters

* chore: env readme

* fix: ci lint
  • Loading branch information
zijiren233 authored Feb 20, 2025
1 parent c67b4ef commit 6641cd9
Show file tree
Hide file tree
Showing 187 changed files with 8,494 additions and 4,945 deletions.
16 changes: 12 additions & 4 deletions service/aiproxy/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
FROM gcr.io/distroless/static:nonroot
FROM alpine:latest

ARG TARGETARCH
COPY bin/service-aiproxy-$TARGETARCH /manager
COPY bin/service-aiproxy-$TARGETARCH /aiproxy

ENV PUID=0 PGID=0 UMASK=022

ENV FFPROBE_ENABLED=true

EXPOSE 3000
USER 65532:65532

ENTRYPOINT ["/manager"]
RUN apk add --no-cache ca-certificates tzdata ffmpeg && \
rm -rf /var/cache/apk/*

ENTRYPOINT ["/aiproxy"]
10 changes: 10 additions & 0 deletions service/aiproxy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ sealos run ghcr.io/labring/sealos-cloud-aiproxy-service:latest \
-e cloudDomain=<cloud-domain> \
-e LOG_SQL_DSN=""
```

# Envs

- `ADMIN_KEY`: The admin key for the AI Proxy Service, used to access the admin API and the relay API, default is empty
- `SEALOS_JWT_KEY`: The JWT key used for the Sealos balance service, default is empty
- `SQL_DSN`: The database connection string, default is empty
- `LOG_SQL_DSN`: The log database connection string, default is empty
- `REDIS_CONN_STRING`: The redis connection string, default is empty
- `BALANCE_SEALOS_CHECK_REAL_NAME_ENABLE`: Whether to check real name, default is `false`
- `BALANCE_SEALOS_NO_REAL_NAME_USED_AMOUNT_LIMIT`: The used-amount limit for users who have not completed real-name verification (only applies when the real name check is enabled), default is `1`
76 changes: 76 additions & 0 deletions service/aiproxy/common/audio/audio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package audio

import (
"errors"
"io"
"os/exec"
"strconv"
"strings"

"github.com/labring/sealos/service/aiproxy/common/config"
)

var ErrAudioDurationNAN = errors.New("audio duration is N/A")

// GetAudioDuration pipes audio into ffprobe's standard input and returns the
// duration ffprobe reports for the first audio stream.
//
// When config.FfprobeEnabled is false the probe is skipped and (0, nil) is
// returned. ErrAudioDurationNAN is returned when ffprobe prints nothing or
// "N/A"; any ffprobe execution or number parsing failure is returned as-is.
func GetAudioDuration(audio io.Reader) (float64, error) {
	if !config.FfprobeEnabled {
		return 0, nil
	}

	probe := exec.Command(
		"ffprobe",
		"-v", "error",
		"-select_streams", "a:0",
		"-show_entries", "stream=duration",
		"-of", "default=noprint_wrappers=1:nokey=1",
		"-i", "-",
	)
	// ffprobe reads the audio payload from stdin ("-i -").
	probe.Stdin = audio

	raw, err := probe.Output()
	if err != nil {
		return 0, err
	}

	switch text := strings.TrimSpace(string(raw)); text {
	case "", "N/A":
		// ffprobe could not determine a duration for this stream.
		return 0, ErrAudioDurationNAN
	default:
		value, parseErr := strconv.ParseFloat(text, 64)
		if parseErr != nil {
			return 0, parseErr
		}
		return value, nil
	}
}

// GetAudioDurationFromFilePath runs ffprobe against the file at filePath and
// returns the container-level ("format") duration it reports.
//
// When config.FfprobeEnabled is false the probe is skipped and (0, nil) is
// returned. ErrAudioDurationNAN is returned when ffprobe prints nothing or
// "N/A"; any ffprobe execution or number parsing failure is returned as-is.
func GetAudioDurationFromFilePath(filePath string) (float64, error) {
	if !config.FfprobeEnabled {
		return 0, nil
	}

	raw, err := exec.Command(
		"ffprobe",
		"-v", "error",
		"-select_streams", "a:0",
		"-show_entries", "format=duration",
		"-of", "default=noprint_wrappers=1:nokey=1",
		"-i", filePath,
	).Output()
	if err != nil {
		return 0, err
	}

	text := strings.TrimSpace(string(raw))
	if text == "N/A" || text == "" {
		// ffprobe could not determine a duration for this file.
		return 0, ErrAudioDurationNAN
	}

	value, parseErr := strconv.ParseFloat(text, 64)
	if parseErr != nil {
		return 0, parseErr
	}
	return value, nil
}
18 changes: 14 additions & 4 deletions service/aiproxy/common/balance/balance.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package balance

import "context"
import (
"context"

"github.com/labring/sealos/service/aiproxy/model"
)

type GroupBalance interface {
GetGroupRemainBalance(ctx context.Context, group string) (float64, PostGroupConsumer, error)
GetGroupRemainBalance(ctx context.Context, group model.GroupCache) (float64, PostGroupConsumer, error)
}

type PostGroupConsumer interface {
PostGroupConsume(ctx context.Context, tokenName string, usage float64) (float64, error)
GetBalance(ctx context.Context) (float64, error)
}

var Default GroupBalance = NewMockGroupBalance()
var (
mock GroupBalance = NewMockGroupBalance()
Default = mock
)

// MockGetGroupRemainBalance forwards the lookup to the package-level mock
// GroupBalance and returns its result unchanged. It always calls the mock
// variable, not Default.
func MockGetGroupRemainBalance(ctx context.Context, group model.GroupCache) (float64, PostGroupConsumer, error) {
return mock.GetGroupRemainBalance(ctx, group)
}
12 changes: 6 additions & 6 deletions service/aiproxy/common/balance/mock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package balance

import "context"
import (
"context"

"github.com/labring/sealos/service/aiproxy/model"
)

var _ GroupBalance = (*MockGroupBalance)(nil)

Expand All @@ -14,14 +18,10 @@ func NewMockGroupBalance() *MockGroupBalance {
return &MockGroupBalance{}
}

func (q *MockGroupBalance) GetGroupRemainBalance(_ context.Context, _ string) (float64, PostGroupConsumer, error) {
func (q *MockGroupBalance) GetGroupRemainBalance(_ context.Context, _ model.GroupCache) (float64, PostGroupConsumer, error) {
return mockBalance, q, nil
}

func (q *MockGroupBalance) PostGroupConsume(_ context.Context, _ string, usage float64) (float64, error) {
return usage, nil
}

func (q *MockGroupBalance) GetBalance(_ context.Context) (float64, error) {
return mockBalance, nil
}
120 changes: 104 additions & 16 deletions service/aiproxy/common/balance/sealos.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/labring/sealos/service/aiproxy/common"
"github.com/labring/sealos/service/aiproxy/common/conv"
"github.com/labring/sealos/service/aiproxy/common/env"
"github.com/labring/sealos/service/aiproxy/model"
"github.com/redis/go-redis/v9"
"github.com/shopspring/decimal"
log "github.com/sirupsen/logrus"
Expand All @@ -25,6 +26,7 @@ const (
appType = "LLM-TOKEN"
sealosRequester = "sealos-admin"
sealosGroupBalanceKey = "sealos:balance:%s"
sealosUserRealNameKey = "sealos:realName:%s"
getBalanceRetry = 3
)

Expand All @@ -38,6 +40,11 @@ var (
sealosCacheExpire = 3 * time.Minute
)

var (
sealosCheckRealNameEnable = env.Bool("BALANCE_SEALOS_CHECK_REAL_NAME_ENABLE", false)
sealosNoRealNameUsedAmountLimit = env.Float64("BALANCE_SEALOS_NO_REAL_NAME_USED_AMOUNT_LIMIT", 1)
)

type Sealos struct {
accountURL string
}
Expand Down Expand Up @@ -145,12 +152,20 @@ func cacheDecreaseGroupBalance(ctx context.Context, group string, amount int64)
return decreaseGroupBalanceScript.Run(ctx, common.RDB, []string{fmt.Sprintf(sealosGroupBalanceKey, group)}, amount).Err()
}

func (s *Sealos) GetGroupRemainBalance(ctx context.Context, group string) (float64, PostGroupConsumer, error) {
var ErrNoRealNameUsedAmountLimit = errors.New("达到未实名用户使用额度限制,请实名认证")

func (s *Sealos) GetGroupRemainBalance(ctx context.Context, group model.GroupCache) (float64, PostGroupConsumer, error) {
var errs []error
for i := 0; ; i++ {
balance, consumer, err := s.getGroupRemainBalance(ctx, group)
balance, userUID, err := s.getGroupRemainBalance(ctx, group.ID)
if err == nil {
return balance, consumer, nil
if sealosCheckRealNameEnable &&
group.UsedAmount > sealosNoRealNameUsedAmountLimit &&
!s.checkRealName(ctx, userUID) {
return 0, nil, ErrNoRealNameUsedAmountLimit
}
return decimal.NewFromInt(balance).Div(decimalBalancePrecision).InexactFloat64(),
newSealosPostGroupConsumer(s.accountURL, group.ID, userUID), nil
}
errs = append(errs, err)
if i == getBalanceRetry-1 {
Expand All @@ -160,26 +175,105 @@ func (s *Sealos) GetGroupRemainBalance(ctx context.Context, group string) (float
}
}

// cacheGetUserRealName reads the cached real-name flag for userUID from redis.
//
// When redis or the sealos redis cache is disabled it returns (true, redis.Nil)
// so callers treat the result as a cache miss. The redis read is bounded by a
// 5 second timeout; any read or bool-conversion error is returned with false.
func cacheGetUserRealName(ctx context.Context, userUID string) (bool, error) {
	if !(common.RedisEnabled && sealosRedisCacheEnable) {
		return true, redis.Nil
	}

	timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	key := fmt.Sprintf(sealosUserRealNameKey, userUID)
	verified, err := common.RDB.Get(timeoutCtx, key).Bool()
	if err != nil {
		return false, err
	}
	return verified, nil
}

// cacheSetUserRealName stores the real-name flag for userUID in redis.
//
// It is a no-op returning nil when redis or the sealos redis cache is
// disabled. A true flag is kept for 12 hours, a false flag for only 1 minute.
// The redis write is bounded by a 5 second timeout.
func cacheSetUserRealName(ctx context.Context, userUID string, realName bool) error {
	if !(common.RedisEnabled && sealosRedisCacheEnable) {
		return nil
	}

	timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Default to the short TTL used for a false flag; extend it for a true one.
	ttl := time.Minute * 1
	if realName {
		ttl = time.Hour * 12
	}

	key := fmt.Sprintf(sealosUserRealNameKey, userUID)
	return common.RDB.Set(timeoutCtx, key, realName, ttl).Err()
}

// checkRealName reports whether userUID is real-name verified.
//
// The redis cache is consulted first; on a miss (or a logged cache error) the
// account service is queried and the answer is written back to the cache.
// If the account service lookup fails, the error is logged and true is
// returned, so a failed lookup is treated as verified.
func (s *Sealos) checkRealName(ctx context.Context, userUID string) bool {
	cached, cacheErr := cacheGetUserRealName(ctx, userUID)
	if cacheErr == nil {
		return cached
	}
	// redis.Nil is a plain cache miss and is not worth logging.
	if !errors.Is(cacheErr, redis.Nil) {
		log.Errorf("get user (%s) real name cache failed: %s", userUID, cacheErr)
	}

	verified, fetchErr := s.fetchRealNameFromAPI(ctx, userUID)
	if fetchErr != nil {
		log.Errorf("fetch user (%s) real name failed: %s", userUID, fetchErr)
		return true
	}

	// A failed cache write is logged only; the fetched value is still returned.
	if setErr := cacheSetUserRealName(ctx, userUID, verified); setErr != nil {
		log.Errorf("set user (%s) real name cache failed: %s", userUID, setErr)
	}

	return verified
}

// sealosGetRealNameInfoResp is the JSON body decoded from the account
// service's real-name-info endpoint response.
type sealosGetRealNameInfoResp struct {
// IsRealName is the real-name status returned to the caller on success.
IsRealName bool `json:"isRealName"`
// Error, when non-empty, causes the lookup to fail with this message.
Error string `json:"error"`
}

// fetchRealNameFromAPI asks the account service's real-name-info endpoint
// whether userUID is real-name verified.
//
// The request is bounded by a 5 second timeout derived from ctx and sends
// jwtToken as a bearer token. An error is returned when the request cannot be
// built or sent, when the response body cannot be decoded, when the status
// code is not 200, or when the response carries a non-empty error field.
func (s *Sealos) fetchRealNameFromAPI(ctx context.Context, userUID string) (bool, error) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		fmt.Sprintf("%s/admin/v1alpha1/real-name-info?userUID=%s", s.accountURL, userUID), nil)
	if err != nil {
		return false, err
	}

	req.Header.Set("Authorization", "Bearer "+jwtToken)
	resp, err := sealosHTTPClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	var sealosResp sealosGetRealNameInfoResp
	if err := json.NewDecoder(resp.Body).Decode(&sealosResp); err != nil {
		// The body is decoded before the status check, so a non-JSON error
		// page would otherwise surface as a bare decode error; keep the
		// status code and user in the message so the failure is diagnosable.
		return false, fmt.Errorf("decode user (%s) real name response with status code %d: %w", userUID, resp.StatusCode, err)
	}

	if resp.StatusCode != http.StatusOK || sealosResp.Error != "" {
		return false, fmt.Errorf("get user (%s) real name failed with status code %d, error: %s", userUID, resp.StatusCode, sealosResp.Error)
	}

	return sealosResp.IsRealName, nil
}

// GroupBalance interface implementation
func (s *Sealos) getGroupRemainBalance(ctx context.Context, group string) (float64, PostGroupConsumer, error) {
func (s *Sealos) getGroupRemainBalance(ctx context.Context, group string) (int64, string, error) {
if cache, err := cacheGetGroupBalance(ctx, group); err == nil && cache.UserUID != "" {
return decimal.NewFromInt(cache.Balance).Div(decimalBalancePrecision).InexactFloat64(),
newSealosPostGroupConsumer(s.accountURL, group, cache.UserUID, cache.Balance), nil
return cache.Balance, cache.UserUID, nil
} else if err != nil && !errors.Is(err, redis.Nil) {
log.Errorf("get group (%s) balance cache failed: %s", group, err)
}

balance, userUID, err := s.fetchBalanceFromAPI(ctx, group)
if err != nil {
return 0, nil, err
return 0, "", err
}

if err := cacheSetGroupBalance(ctx, group, balance, userUID); err != nil {
log.Errorf("set group (%s) balance cache failed: %s", group, err)
}

return decimal.NewFromInt(balance).Div(decimalBalancePrecision).InexactFloat64(),
newSealosPostGroupConsumer(s.accountURL, group, userUID, balance), nil
return balance, userUID, nil
}

func (s *Sealos) fetchBalanceFromAPI(ctx context.Context, group string) (balance int64, userUID string, err error) {
Expand Down Expand Up @@ -218,22 +312,16 @@ type SealosPostGroupConsumer struct {
accountURL string
group string
uid string
balance int64
}

func newSealosPostGroupConsumer(accountURL, group, uid string, balance int64) *SealosPostGroupConsumer {
func newSealosPostGroupConsumer(accountURL, group, uid string) *SealosPostGroupConsumer {
return &SealosPostGroupConsumer{
accountURL: accountURL,
group: group,
uid: uid,
balance: balance,
}
}

func (s *SealosPostGroupConsumer) GetBalance(_ context.Context) (float64, error) {
return decimal.NewFromInt(s.balance).Div(decimalBalancePrecision).InexactFloat64(), nil
}

func (s *SealosPostGroupConsumer) PostGroupConsume(ctx context.Context, tokenName string, usage float64) (float64, error) {
amount := s.calculateAmount(usage)

Expand Down
Loading

0 comments on commit 6641cd9

Please sign in to comment.