Skip to content

Commit

Permalink
Parse vertex object with model spec
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Feb 3, 2025
1 parent ec8ccd0 commit 5e7089a
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions rust/serving/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt::Debug;

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use numaflow_models::models::Vertex;
use rcgen::{generate_simple_self_signed, Certificate, CertifiedKey, KeyPair};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -157,34 +158,31 @@ impl TryFrom<HashMap<String, String>> for Settings {
.decode(source_spec_encoded.as_bytes())
.map_err(|e| ParseConfig(format!("decoding {ENV_VERTEX_OBJ}: {e:?}")))?;

#[derive(Deserialize)]
struct Source {
serving: Serving,
}
#[derive(Deserialize)]
struct Spec {
source: Source,
}

#[derive(Deserialize)]
struct VertexObject {
spec: Spec,
}

let vertex_obj = serde_json::from_slice::<VertexObject>(&source_spec_decoded)
let vertex_obj = serde_json::from_slice::<Vertex>(&source_spec_decoded)
.map_err(|e| ParseConfig(format!("parsing {ENV_VERTEX_OBJ}: {e:?}")))?;

let serving_spec = vertex_obj
.spec
.source
.ok_or_else(|| {
ParseConfig(format!("parsing {ENV_VERTEX_OBJ}: source can not be empty"))
})?
.serving
.ok_or_else(|| {
ParseConfig(format!(
"parsing {ENV_VERTEX_OBJ}: Serving source spec is not found"
))
})?;
// Update tid_header from source_spec
if let Some(msg_id_header_key) = vertex_obj.spec.source.serving.msg_id_header_key {
settings.tid_header = msg_id_header_key;
}
settings.tid_header = serving_spec.msg_id_header_key;

// Update redis.addr from source_spec, currently we only support redis as callback storage
settings.redis.addr = vertex_obj.spec.source.serving.callback_storage.url;
settings.redis.addr = serving_spec.store.url;
settings.redis.ttl_secs = settings.redis.ttl_secs;

if let Some(auth) = vertex_obj.spec.source.serving.auth {
let auth_token = get_secret_from_volume(&auth.token.name, &auth.token.key)
if let Some(auth) = serving_spec.auth {
let token = auth.token.unwrap();
let auth_token = get_secret_from_volume(&token.name, &token.key)
.map_err(|e| ParseConfig(e.to_string()))?;
settings.api_auth_token = Some(auth_token);
}
Expand Down

0 comments on commit 5e7089a

Please sign in to comment.