go reports-service: Move OpenAi batch queue params to env vars
XxRoloxX committed Nov 2, 2024
1 parent fc772b5 commit 1bedafe
Showing 7 changed files with 43 additions and 255 deletions.
2 changes: 2 additions & 0 deletions .env.sample
@@ -47,6 +47,8 @@ REPORTS_PENDING_BATCH_REDIS_URL=reports-redis:6379
REPORTS_PENDING_BATCH_REDIS_PASSWORD=password
REPORTS_PENDING_BATCH_REDIS_DB=1
REPORTS_REDISINSIGHT_PORT=5540
REPORTS_MAX_IN_PRORESS_TOKENS=2000000
REPORTS_MAX_OPENAI_OUTPUT_COMPLETION_TOKENS=4096


# Logs ingestion service
3 changes: 3 additions & 0 deletions docker-compose.yml
@@ -67,6 +67,8 @@ services:
- REPORT_GENERATED_BROKER_TOPIC=${REPORT_GENERATED_BROKER_TOPIC}
- REPORT_REQUEST_FAILED_BROKER_TOPIC=${REPORT_REQUEST_FAILED_BROKER_TOPIC}
- REPORT_REQUESTED_BROKER_TOPIC=${REPORT_REQUESTED_BROKER_TOPIC}
- REPORTS_MAX_IN_PRORESS_TOKENS=${REPORTS_MAX_IN_PRORESS_TOKENS}
- REPORTS_MAX_OPENAI_OUTPUT_COMPLETION_TOKENS=${REPORTS_MAX_OPENAI_OUTPUT_COMPLETION_TOKENS}
- SWAGGER_HOST=${REPORTS_SERVICE_HOST}
- VIRTUAL_HOST=${REPORTS_PRODUCTION_HOST}
- LETSENCRYPT_HOST=${REPORTS_PRODUCTION_HOST}
@@ -132,6 +134,7 @@ services:
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL_DIFFERENT_HOST://0.0.0.0:9094,EXTERNAL_SAME_HOST://0.0.0.0:9095"
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL_DIFFERENT_HOST://${KAFKA_EXTERNAL_HOSTNAME}:9094,EXTERNAL_SAME_HOST://kafka:9095"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:SASL_PLAINTEXT,EXTERNAL_SAME_HOST:SASL_PLAINTEXT"
KAFKA_CFG_MESSAGE_MAX_BYTES: 5242880
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CLIENT_USERS: ${KAFKA_CLIENT_USERNAME}
KAFKA_CLIENT_PASSWORDS: ${KAFKA_CLIENT_PASSWORD}
11 changes: 3 additions & 8 deletions go/services/reports/cmd/reports/main.go
@@ -3,10 +3,6 @@ package main
import (
"context"
"fmt"
"net"
"net/http"
"os"

elasticsearch "github.com/Magpie-Monitor/magpie-monitor/pkg/elasticsearch"
sharedrepositories "github.com/Magpie-Monitor/magpie-monitor/pkg/repositories"
"github.com/Magpie-Monitor/magpie-monitor/pkg/routing"
@@ -22,6 +18,9 @@ import (
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
"go.uber.org/zap"
"net"
"net/http"
"os"
)

type ServerParams struct {
@@ -118,10 +117,6 @@ func main() {

openai.NewBatchPoller,

openai.ProvideAsPendingBatchRepository(
openai.NewRedisPendingBatchRepository,
),

elasticsearch.NewElasticSearchLogsDbClient,
sharedrepositories.ProvideAsNodeLogsRepository(
sharedrepositories.NewElasticSearchNodeLogsRepository,
67 changes: 0 additions & 67 deletions go/services/reports/internal/brokers/job_scheduled.go

This file was deleted.

39 changes: 29 additions & 10 deletions go/services/reports/pkg/openai/batch_poller.go
@@ -25,14 +25,13 @@ const (
OpenAiBatchStatus__Cancelled = "cancelled"
)
const (
CHARS_PER_OPENAI_TOKEN = 5
MAX_OPENAI_OUTPUT_COMPLETION_TOKENS = 4096
// MAX_IN_PROGRESS_TOKENS = 2000000
MAX_IN_PROGRESS_TOKENS = 20000
CHARS_PER_OPENAI_TOKEN = 5
)

const (
BATCH_AWAITING_INTERVAL_SECONDS_KEY = "REPORTS_BATCH_AWAITING_INTERVAL_SECONDS"
BATCH_AWAITING_INTERVAL_SECONDS_KEY = "REPORTS_BATCH_AWAITING_INTERVAL_SECONDS"
REPORTS_MAX_IN_PRORESS_TOKENS_KEY = "REPORTS_MAX_IN_PRORESS_TOKENS"
MAX_OPENAI_OUTPUT_COMPLETION_TOKENS_KEY = "REPORTS_MAX_OPENAI_OUTPUT_COMPLETION_TOKENS"
)

type BatchPoller struct {
@@ -41,13 +40,18 @@ type BatchPoller struct {
scheduledJobsRepository scheduledjobs.ScheduledJobRepository[*OpenAiJob]
pollingIntervalSeconds int
batchAwaitingIntervalSeconds int
maxInProgressTokens int
maxCompletionOutputTokens int
}

func NewBatchPoller(client *Client, scheduledJobsRepository scheduledjobs.ScheduledJobRepository[*OpenAiJob]) *BatchPoller {

envs.ValidateEnvs("Missing envs for openai batch poller",
[]string{POLLING_INTERVAL_SECONDS_KEY,
BATCH_AWAITING_INTERVAL_SECONDS_KEY})
BATCH_AWAITING_INTERVAL_SECONDS_KEY,
REPORTS_MAX_IN_PRORESS_TOKENS_KEY,
MAX_OPENAI_OUTPUT_COMPLETION_TOKENS_KEY,
})

pollingIntervalSeconds := os.Getenv(POLLING_INTERVAL_SECONDS_KEY)
pollingIntervalSecondsInt, err := strconv.Atoi(pollingIntervalSeconds)
@@ -61,12 +65,27 @@ func NewBatchPoller(client *Client, scheduledJobsRepository scheduledjobs.Schedu
panic(fmt.Sprintf("%s is not a number", BATCH_AWAITING_INTERVAL_SECONDS_KEY))
}

maxOutputTokens := os.Getenv(MAX_OPENAI_OUTPUT_COMPLETION_TOKENS_KEY)
maxOutputTokensInt, err := strconv.Atoi(maxOutputTokens)
if err != nil {
panic(fmt.Sprintf("%s is not a number", MAX_OPENAI_OUTPUT_COMPLETION_TOKENS_KEY))
}

maxInProgressTokens := os.Getenv(REPORTS_MAX_IN_PRORESS_TOKENS_KEY)
maxInProgressTokensInt, err := strconv.Atoi(maxInProgressTokens)
if err != nil {
panic(fmt.Sprintf("%s is not a number", REPORTS_MAX_IN_PRORESS_TOKENS_KEY))
}

return &BatchPoller{
batches: make(chan *Batch),
client: client,
scheduledJobsRepository: scheduledJobsRepository,
pollingIntervalSeconds: pollingIntervalSecondsInt,
batchAwaitingIntervalSeconds: batchAwaitingIntervalSecondsInt,
maxCompletionOutputTokens: maxOutputTokensInt,
maxInProgressTokens: maxInProgressTokensInt,
}
}
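
The constructor now repeats the same read-and-parse-or-panic step for each numeric env var. A minimal helper sketch that folds the pattern up (hypothetical, not part of this commit; it assumes the file's existing fmt, os, and strconv imports):

// mustAtoiEnv reads an environment variable that must hold an integer and
// panics with a descriptive message otherwise, mirroring the validation
// pattern used above. Hypothetical helper, not in this commit.
func mustAtoiEnv(key string) int {
	value, err := strconv.Atoi(os.Getenv(key))
	if err != nil {
		panic(fmt.Sprintf("%s is not a number", key))
	}
	return value
}

With it, each field reduces to a call such as mustAtoiEnv(REPORTS_MAX_IN_PRORESS_TOKENS_KEY).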

@@ -86,9 +105,11 @@ func (p *BatchPoller) tokensFromJobs(jobs []*OpenAiJob) (int64, error) {
return int64(completionTokens), nil
}

// Approximate the tokens for a job based on an average of chars per token and the maximum
// tokens that the model might output for each request
func (p *BatchPoller) tokensFromJob(job *OpenAiJob) (int64, error) {

completionTokens := MAX_OPENAI_OUTPUT_COMPLETION_TOKENS * len(job.CompletionRequests)
completionTokens := p.maxCompletionOutputTokens * len(job.CompletionRequests)

batchFile := bytes.NewBufferString("")
err := jsonl.NewJsonLinesEncoder(batchFile).Encode(job.CompletionRequests)
Expand All @@ -99,8 +120,6 @@ func (p *BatchPoller) tokensFromJob(job *OpenAiJob) (int64, error) {

completionTokens += batchFile.Len() / CHARS_PER_OPENAI_TOKEN

p.client.logger.Info("TOKENS FROM BATCH", zap.Any("tokens", completionTokens))

return int64(completionTokens), nil
}
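
For intuition on the estimate: with the sample configuration above (REPORTS_MAX_OPENAI_OUTPUT_COMPLETION_TOKENS=4096 and CHARS_PER_OPENAI_TOKEN=5), a job carrying 10 completion requests whose JSONL-encoded batch file comes to 100,000 characters is budgeted at 4096 * 10 + 100000 / 5 = 60,960 tokens. The request count and file size are illustrative figures, not values from this commit.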

Expand All @@ -121,7 +140,7 @@ func (p *BatchPoller) dequeScheduledJob(enqueuedJobs []*OpenAiJob, pendingJobs [
return err
}

if lastEnqueuedJobTokens+inProgressTokens >= MAX_IN_PROGRESS_TOKENS {
if lastEnqueuedJobTokens+inProgressTokens >= int64(p.maxInProgressTokens) {
p.client.logger.Info("Waiting for jobs to complete before enqueuing next one", zap.Any("newJob", enqueuedJobs[0]))
return nil
}
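
Continuing that illustration: with REPORTS_MAX_IN_PRORESS_TOKENS=2000000 from .env.sample, the 60,960-token job above is only dequeued while the estimated tokens of jobs already in progress stay below 2,000,000 - 60,960 = 1,939,040; otherwise the poller logs the waiting message and retries on the next polling tick.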
162 changes: 0 additions & 162 deletions go/services/reports/pkg/openai/pending_batches_repository.go

This file was deleted.

