Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add debug logs to print COPY time for embedding jobs #360

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish-cli-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
type: string
description: "CLI version"
required: true
default: "0.4.0"
default: "0.5.1"
IMAGE_NAME:
type: string
description: "Container image name to tag"
Expand Down
2 changes: 1 addition & 1 deletion lantern_cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "lantern_cli"
version = "0.5.0"
version = "0.5.1"
edition = "2021"

[[bin]]
Expand Down
8 changes: 8 additions & 0 deletions lantern_cli/src/embeddings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,13 +512,18 @@ async fn db_exporter_worker(
// if job is run in streaming mode
// it will write results to target table each 10 seconds (if collected rows are
// more than 50) or if collected row count is more than 1000 rows
let copy_start = Instant::now();
if !buf.is_empty() {
writer_sink.send(buf.split().freeze()).await?;
}
writer_sink.as_mut().finish().await?;
transaction.batch_execute(&update_sql).await?;
transaction.commit().await?;
transaction = client.transaction().await?;

let duration = copy_start.elapsed().as_millis();
logger.debug(&format!("Copied {collected_row_cnt} rows in {duration}ms"));

writer_sink = Box::pin(
transaction
.copy_in(&format!("COPY {temp_table_name} FROM stdin"))
Expand Down Expand Up @@ -546,12 +551,15 @@ async fn db_exporter_worker(
return Ok(processed_row_cnt);
}

let copy_start = Instant::now();
if !buf.is_empty() {
writer_sink.send(buf.split().freeze()).await?
}
writer_sink.as_mut().finish().await?;
transaction.batch_execute(&update_sql).await?;
transaction.commit().await?;
let duration = copy_start.elapsed().as_millis();
logger.debug(&format!("Copied {collected_row_cnt} rows in {duration}ms"));
logger.info(&format!(
"Embeddings exported to table {} under column {}",
&table, &column
Expand Down
Loading