Handling failed requests properly for the streaming client. Added request id to the output log.
gangmuk committed Feb 24, 2025
1 parent fcc0896 commit 1b8805f
Showing 1 changed file with 75 additions and 58 deletions.
133 changes: 75 additions & 58 deletions benchmarks/client/client.py
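
For context, the records that the updated client writes to the JSONL output file take roughly the following shapes. This is an illustrative sketch assembled from the fields added in this diff; the values shown are placeholders, not output from a real run.

# Success record (one JSON object per line in the output file)
success_record = {
    "request_id": 0,            # placeholder id
    "status": "success",
    "input": "...",             # the prompt that was sent
    "output": "...",            # concatenated streamed text
    "prompt_tokens": 0,         # placeholder token counts
    "output_tokens": 0,
    "total_tokens": 0,
    "latency": 0.0,             # seconds, end_time - start_time
    "throughput": 0.0,          # output_tokens / latency
    "start_time": 0.0,
    "end_time": 0.0,
    "ttft": 0.0,                # time to first token (None if no token arrived)
    "tpot": 0.0,                # time per output token (None if not computable)
}

# Error record written when a request fails
error_record = {
    "request_id": 0,
    "status": "error",
    "error_type": "APIConnectionError",   # exception class name (example)
    "error_message": "...",
    "error_traceback": "...",
    "input": "...",
    "latency": 0.0,
    "start_time": 0.0,
    "end_time": 0.0,
}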
@@ -13,15 +13,18 @@

 logging.basicConfig(level=logging.INFO)
 
 
 async def send_request_streaming(client: openai.AsyncOpenAI,
-                                 model: str,
-                                 endpoint: str,
-                                 prompt: str,
-                                 output_file: str,
-                                 ):
+                                 model: str,
+                                 endpoint: str,
+                                 prompt: str,
+                                 output_file: str,
+                                 request_id: int):
     start_time = asyncio.get_event_loop().time()
     first_response_time = None
 
     try:
+        logging.info(f"Request {request_id}: Starting streaming request to {endpoint}")
         stream = await client.chat.completions.create(
             model=model,
             messages=prompt,
@@ -30,74 +33,86 @@ async def send_request_streaming(client: openai.AsyncOpenAI,
             stream=True,
             stream_options={"include_usage": True},
         )
 
         text_chunks = []
         prompt_tokens = 0
         output_tokens = 0
         total_tokens = 0
 
-        async for chunk in stream:
-            if chunk.choices:
-                if chunk.choices[0].delta.content is not None:
-                    if not first_response_time:
-                        first_response_time = asyncio.get_event_loop().time()
-                    output_text = chunk.choices[0].delta.content
-                    text_chunks.append(output_text)
-            prompt_tokens = chunk.usage.prompt_tokens
-            output_tokens = chunk.usage.completion_tokens
-            total_tokens = chunk.usage.total_tokens
-        response = "".join(text_chunks)
-        logging.info(result)
-        if response.status_code == 200:
-            response_time = asyncio.get_event_loop().time()
-            latency = response_time - start_time
-            throughput = output_tokens / latency
-            ttft = first_response_time - start_time
-            tpot = (response_time - first_response_time) / output_tokens
-            result = {
-                "status_code": response.status_code,
-                "input": prompt,
-                "output": response,
-                "prompt_tokens": prompt_tokens,
-                "output_tokens": output_tokens,
-                "total_tokens": total_tokens,
-                "latency": latency,
-                "throughput": throughput,
-                "start_time": start_time,
-                "current_time": asyncio.get_event_loop().time(),
-                "ttft": ttft,
-                "tpot": tpot,
-            }
-        else:
-            logging.error(f"Request failed status-code: {response.status_code}, raw response: {response.text}")
-            result = {
-                "status_code": response.status_code,
-                "input": prompt,
-                "output": response,
-                "prompt_tokens": prompt_tokens,
-                "output_tokens": None,
-                "total_tokens": total_tokens,
-                "latency": latency,
-                "throughput": None,
-                "start_time": start_time,
-                "current_time": asyncio.get_event_loop().time(),
-                "ttft": None,
-                "tpot": None,
-            }
+        try:
+            async for chunk in stream:
+                if chunk.choices:
+                    if chunk.choices[0].delta.content is not None:
+                        if not first_response_time:
+                            first_response_time = asyncio.get_event_loop().time()
+                        output_text = chunk.choices[0].delta.content
+                        text_chunks.append(output_text)
+                prompt_tokens = chunk.usage.prompt_tokens
+                output_tokens = chunk.usage.completion_tokens
+                total_tokens = chunk.usage.total_tokens
+        except Exception as stream_error:
+            # Handle errors during streaming
+            logging.error(f"Request {request_id}: Stream interrupted: {type(stream_error).__name__}: {str(stream_error)}")
+            # Still try to use what we've received so far
+            if not text_chunks:
+                raise  # Re-raise if we got nothing at all
 
+        response_text = "".join(text_chunks)
+        response_time = asyncio.get_event_loop().time()
+        latency = response_time - start_time
+        throughput = output_tokens / latency if output_tokens > 0 else 0
+        ttft = first_response_time - start_time if first_response_time else None
+        tpot = (response_time - first_response_time) / output_tokens if first_response_time and output_tokens > 0 else None
 
+        result = {
+            "request_id": request_id,
+            "status": "success",
+            "input": prompt,
+            "output": response_text,
+            "prompt_tokens": prompt_tokens,
+            "output_tokens": output_tokens,
+            "total_tokens": total_tokens,
+            "latency": latency,
+            "throughput": throughput,
+            "start_time": start_time,
+            "end_time": response_time,
+            "ttft": ttft,
+            "tpot": tpot,
+        }
 
+        # Write result to JSONL file
+        logging.info(f"Request {request_id}: Completed successfully. Tokens: {total_tokens}, Latency: {latency:.2f}s")
+        output_file.write(json.dumps(result) + "\n")
+        output_file.flush()  # Ensure data is written immediately to the file
+        return result
 
     except Exception as e:
-        logging.error(f"Error sending request to at {endpoint}: {str(e)}")
-        traceback.print_exc()
-        return None
+        error_time = asyncio.get_event_loop().time()
+        # Determine error type based on exception class
+        error_type = type(e).__name__
 
+        error_result = {
+            "request_id": request_id,
+            "status": "error",
+            "error_type": error_type,
+            "error_message": str(e),
+            "error_traceback": traceback.format_exc(),
+            "input": prompt,
+            "latency": error_time - start_time,
+            "start_time": start_time,
+            "end_time": error_time
+        }
+        logging.error(f"Request {request_id}: Error ({error_type}): {str(e)}")
+        output_file.write(json.dumps(error_result) + "\n")
+        output_file.flush()
+        return error_result

 async def benchmark_streaming(client: openai.AsyncOpenAI,
                               endpoint: str,
                               model: str,
                               load_struct: List,
                               output_file: io.TextIOWrapper):
 
+    request_id = 0
     batch_tasks = []
     base_time = time.time()
     num_requests = 0
@@ -116,8 +131,10 @@ async def benchmark_streaming(client: openai.AsyncOpenAI,
                 model = model,
                 endpoint = endpoint,
                 prompt = formatted_prompt,
-                output_file = output_file)
+                output_file = output_file,
+                request_id = request_id)
             )
+            request_id += 1
             batch_tasks.append(task)
         num_requests += len(requests)
     await asyncio.gather(*batch_tasks)
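
With this change, every request, successful or not, leaves one JSON record per line in the output file, tagged with request_id and a status field. Below is a minimal sketch of how such a file could be post-processed; it is illustrative only and not part of this commit, and the summarize_results helper and the "output.jsonl" path are assumptions.

# Illustrative only -- not part of this commit.
import json

def summarize_results(path: str) -> None:
    """Print a small success/error summary for a client output JSONL file."""
    successes, errors = [], []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            record = json.loads(line)
            # Records written by this commit carry "status": "success" or "error".
            if record.get("status") == "success":
                successes.append(record)
            else:
                errors.append(record)

    print(f"requests: {len(successes) + len(errors)}")
    print(f"successes: {len(successes)}, errors: {len(errors)}")
    if successes:
        avg_latency = sum(r["latency"] for r in successes) / len(successes)
        print(f"avg latency: {avg_latency:.2f}s")
    for r in errors:
        print(f"request {r.get('request_id')}: {r.get('error_type')}: {r.get('error_message')}")

if __name__ == "__main__":
    summarize_results("output.jsonl")  # assumed output path; adjust as needed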