feat: expose prometheus metrics from operator #138

Merged 2 commits on Feb 2, 2024
263 changes: 107 additions & 156 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 3 additions & 7 deletions Cargo.toml
@@ -15,12 +15,8 @@ keramik-common = { path = "./common/", default-features = false }
multiaddr = "0.17"
multibase = "0.9.1"
multihash = "0.18"
opentelemetry = { version = "0.18", features = [
"metrics",
"trace",
"rt-tokio",
] }
opentelemetry-otlp = { version = "0.11", features = [
opentelemetry = { version = "0.21", features = ["metrics", "trace"] }
opentelemetry-otlp = { version = "0.14", features = [
"metrics",
"trace",
"tokio",
@@ -32,7 +28,7 @@ serde_json = "1"
tokio = { version = "1", features = ["full", "tracing"] }
tonic = { version = "0.8" }
tracing = "0.1.37"
tracing-opentelemetry = "0.18"
tracing-opentelemetry = "0.22"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
tracing-log = "0.1.3"

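These bumps track the opentelemetry 0.21 line, where the metrics API no longer threads an explicit `Context` through instrument calls; the same change shows up later in this diff in runner/src/main.rs. A before/after sketch of the counter call under the two API versions:

```rust
// opentelemetry 0.18 (before): counter calls took an explicit Context.
//     let cx = Context::current();
//     runs.add(&cx, 1, &[KeyValue::new("command", args.command.name())]);

// opentelemetry 0.21 (after): the context argument is gone.
//     runs.add(1, &[KeyValue::new("command", args.command.name())]);
```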
17 changes: 16 additions & 1 deletion common/Cargo.toml
@@ -7,11 +7,16 @@ edition = "2021"
[features]
default = []
telemetry = [
"dep:hyper",
"dep:opentelemetry",
"dep:opentelemetry-otlp",
"dep:opentelemetry-prometheus",
"dep:opentelemetry_sdk",
"dep:prometheus",
"dep:tokio",
"dep:tracing",
"dep:tracing-opentelemetry",
"dep:tracing-subscriber",
"dep:tracing",
]
tokio-console = ["telemetry", "dep:console-subscriber"]

@@ -26,3 +31,13 @@ serde.workspace = true
tracing-opentelemetry = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
opentelemetry-prometheus = { version = "0.14.1", optional = true, features = [
"prometheus-encoding",
] }
prometheus = { version = "0.13.3", optional = true }
opentelemetry_sdk = { version = "0.21.2", optional = true, features = [
"metrics",
"rt-tokio",
] }
hyper = { version = "0.14", features = ["full"], optional = true }
tokio = { workspace = true, optional = true }
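All of the new metrics dependencies (hyper, prometheus, opentelemetry-prometheus, opentelemetry_sdk, tokio) are optional and only pulled in by the `telemetry` feature. As a minimal sketch — the binary and endpoint value below are hypothetical, not part of this PR — a dependent crate that forwards the feature would gate its setup the same way:

```rust
// Sketch only: telemetry setup gated on keramik-common's `telemetry`
// feature. The OTLP endpoint value here is a hypothetical default.
#[cfg(feature = "telemetry")]
use keramik_common::telemetry;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    #[cfg(feature = "telemetry")]
    telemetry::init_tracing("http://localhost:4317".to_owned()).await?;

    // ... application logic ...
    Ok(())
}
```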
152 changes: 112 additions & 40 deletions common/src/telemetry.rs
@@ -1,65 +1,49 @@
//! Provides helper functions for initializing telemetry collection and publication.
use std::time::Duration;
use std::{convert::Infallible, net::SocketAddr, sync::OnceLock, time::Duration};

use anyhow::Result;
use opentelemetry::{
runtime,
sdk::{
export::metrics::aggregation,
metrics::{controllers::BasicController, selectors},
},
use hyper::{
body::Body,
http::HeaderValue,
service::{make_service_fn, service_fn},
Request, Response,
};
use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
metrics::{
reader::{DefaultAggregationSelector, DefaultTemporalitySelector},
MeterProvider,
},
runtime, trace, Resource,
};
use prometheus::{Encoder, TextEncoder};
use tokio::{sync::oneshot, task::JoinHandle};
use tracing_subscriber::{filter::LevelFilter, prelude::*, EnvFilter, Registry};

/// Initialize tracing and metrics
pub async fn init(otlp_endpoint: String) -> Result<BasicController> {
// create a new prometheus registry
static PROM_REGISTRY: OnceLock<prometheus::Registry> = OnceLock::new();

/// Initialize tracing
pub async fn init_tracing(otlp_endpoint: String) -> Result<()> {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_endpoint.clone()),
)
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
opentelemetry::sdk::Resource::new(vec![
.with_trace_config(trace::config().with_resource(Resource::new(vec![
opentelemetry::KeyValue::new(
"hostname",
gethostname::gethostname()
.into_string()
.expect("hostname should be valid utf-8"),
),
opentelemetry::KeyValue::new(
"service.name",
"keramik",
)]),
))
opentelemetry::KeyValue::new("service.name", "keramik"),
])))
.install_batch(runtime::Tokio)?;

let meter = opentelemetry_otlp::new_pipeline()
.metrics(
selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
aggregation::cumulative_temporality_selector(),
runtime::Tokio,
)
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_endpoint),
)
.with_resource(opentelemetry::sdk::Resource::new(vec![
opentelemetry::KeyValue::new(
"hostname",
gethostname::gethostname()
.into_string()
.expect("hostname should be valid utf-8"),
),
opentelemetry::KeyValue::new("service.name", "keramik"),
]))
.with_period(Duration::from_secs(10))
// Build starts the meter and sets it as the global meter provider
.build()?;

// Setup filters
// Default to INFO if no env is specified
let log_filter = EnvFilter::builder()
@@ -90,5 +74,93 @@ pub async fn init(otlp_endpoint: String) -> Result<BasicController> {
// Initialize tracing
tracing::subscriber::set_global_default(collector)?;

Ok(())
}

/// Initialize metrics such that metrics are pushed to the otlp_endpoint.
pub async fn init_metrics_otlp(otlp_endpoint: String) -> Result<MeterProvider> {
// configure OpenTelemetry to use this registry
let meter = opentelemetry_otlp::new_pipeline()
.metrics(runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otlp_endpoint),
)
.with_resource(Resource::new(vec![
opentelemetry::KeyValue::new(
"hostname",
gethostname::gethostname()
.into_string()
.expect("hostname should be valid utf-8"),
),
opentelemetry::KeyValue::new("service.name", "keramik"),
]))
.with_period(Duration::from_secs(10))
.with_aggregation_selector(DefaultAggregationSelector::new())
.with_temporality_selector(DefaultTemporalitySelector::new())
// Build starts the meter and sets it as the global meter provider
.build()?;

Ok(meter)
}

type MetricsServerShutdown = oneshot::Sender<()>;
type MetricsServerJoin = JoinHandle<Result<(), hyper::Error>>;

/// Initialize metrics such that metrics are exposed via a Prometheus scrape endpoint.
/// A send on the MetricsServerShutdown channel will cause the server to shut down gracefully.
pub async fn init_metrics_prom(
addr: &SocketAddr,
) -> Result<(MeterProvider, MetricsServerShutdown, MetricsServerJoin)> {
let prom_registry = prometheus::Registry::default();
let prom_exporter = opentelemetry_prometheus::exporter()
.with_registry(prom_registry.clone())
.build()?;
PROM_REGISTRY
.set(prom_registry)
.expect("should be able to set PROM_REGISTRY");
let meter = MeterProvider::builder().with_reader(prom_exporter).build();
global::set_meter_provider(meter.clone());

let (shutdown, join) = start_metrics_server(addr);
Ok((meter, shutdown, join))
}

// /metrics scrape endpoin
[Review comment from Contributor]: comment spelling, "endpoint" 😇

async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
let mut data = Vec::new();
let encoder = TextEncoder::new();
let metric_families = PROM_REGISTRY
.get()
.expect("PROM_REGISTRY should be initialized")
.gather();
encoder.encode(&metric_families, &mut data).unwrap();

// Add the EOF marker; it is part of the OpenMetrics spec but not Prometheus's,
// so we have to add it ourselves.
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#overall-structure
data.extend(b"# EOF\n");

let mut resp = Response::new(Body::from(data));
resp.headers_mut().insert(
"Content-Type",
// Use OpenMetrics content type so prometheus knows to parse it accordingly
HeaderValue::from_static("application/openmetrics-text"),
);
Ok(resp)
}

// Start metrics server.
// Sending on the returned channel will cause the server to shut down gracefully.
fn start_metrics_server(addr: &SocketAddr) -> (MetricsServerShutdown, MetricsServerJoin) {
let (tx, rx) = oneshot::channel::<()>();
let service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

let server = hyper::Server::bind(addr)
.serve(service)
.with_graceful_shutdown(async {
rx.await.ok();
});
(tx, tokio::spawn(server))
}
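Taken together, `init_metrics_prom` registers the Prometheus exporter against a process-wide registry and `start_metrics_server` serves it over hyper. A minimal sketch of scraping the endpoint from Rust, assuming the hyper 0.14 client API (the port mirrors the `OPERATOR_PROM_BIND` default of 0.0.0.0:9464 introduced below):

```rust
// Sketch: fetch the metrics endpoint served by start_metrics_server.
// Assumes hyper 0.14 with its default client; port 9464 is the
// OPERATOR_PROM_BIND default from this PR.
use hyper::{body::HttpBody as _, Client, Uri};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uri: Uri = "http://127.0.0.1:9464/metrics".parse()?;
    let mut resp = Client::new().get(uri).await?;
    println!("status: {}", resp.status());

    // The body is OpenMetrics text, terminated by the "# EOF" marker
    // that the handler appends.
    let mut body = Vec::new();
    while let Some(chunk) = resp.body_mut().data().await {
        body.extend_from_slice(&chunk?);
    }
    print!("{}", String::from_utf8_lossy(&body));
    Ok(())
}
```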
7 changes: 7 additions & 0 deletions k8s/operator/manifests/operator.yaml
@@ -70,6 +70,10 @@ spec:
targetPort: 8080
protocol: TCP
name: http
- port: 9464
targetPort: 9464
protocol: TCP
name: metrics
selector:
app: keramik-operator
---
@@ -119,6 +123,9 @@ spec:
- name: http
containerPort: 8080
protocol: TCP
- name: metrics
containerPort: 9464
protocol: TCP
env:
# We are pointing to tempo or grafana tracing agent's otlp grpc receiver port
- name: OPERATOR_OTLP_ENDPOINT
19 changes: 15 additions & 4 deletions operator/src/main.rs
@@ -2,7 +2,8 @@
#![deny(missing_docs)]
use anyhow::Result;
use clap::{command, Parser, Subcommand};
use opentelemetry::{global::shutdown_tracer_provider, Context};
use keramik_common::telemetry;
use opentelemetry::global::{shutdown_meter_provider, shutdown_tracer_provider};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
@@ -16,6 +17,9 @@ struct Cli {
default_value = "http://localhost:4317"
)]
otlp_endpoint: String,

#[arg(long, env = "OPERATOR_PROM_BIND", default_value = "0.0.0.0:9464")]
prom_bind: String,
}

/// Available Subcommands
@@ -30,7 +34,9 @@ async fn main() -> Result<()> {
tracing_log::LogTracer::init()?;

let args = Cli::parse();
let metrics_controller = keramik_common::telemetry::init(args.otlp_endpoint.clone()).await?;
telemetry::init_tracing(args.otlp_endpoint.clone()).await?;
let (metrics_controller, metrics_server_shutdown, metrics_server_join) =
telemetry::init_metrics_prom(&args.prom_bind.parse()?).await?;

match args.command {
Command::Daemon => {
@@ -41,10 +47,15 @@ async fn main() -> Result<()> {
}
};

// Shutdown the metrics server
let _ = metrics_server_shutdown.send(());
metrics_server_join.await??;

// Flush traces and metrics before shutdown
shutdown_tracer_provider();
let cx = Context::default();
metrics_controller.stop(&cx)?;
metrics_controller.force_flush()?;
drop(metrics_controller);
shutdown_meter_provider();

Ok(())
}
8 changes: 8 additions & 0 deletions operator/src/network/controller.rs
@@ -31,6 +31,7 @@ use kube::{
},
Resource,
};
use opentelemetry::global;
use rand::RngCore;
use tracing::{debug, error, info, trace, warn};

@@ -202,6 +203,13 @@
network: Arc<Network>,
cx: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Result<Action, Error> {
let meter = global::meter("keramik");
let runs = meter
.u64_counter("network_reconcile_count")
.with_description("Number of network reconciles")
.init();
runs.add(1, &[]);

let spec = network.spec();
debug!(?spec, "reconcile");

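One design note: the counter above (and the matching one in the simulation controller below) is rebuilt on every reconcile. Instruments are cheap, name-resolved handles in opentelemetry 0.21, so that works; building the handle once and reusing it is a common alternative. A hedged sketch — the `OnceLock` helper here is hypothetical, not part of this PR:

```rust
use std::sync::OnceLock;

use opentelemetry::{global, metrics::Counter};

// Hypothetical alternative: build the instrument once, on first use,
// and reuse the same handle across reconcile invocations.
static RECONCILE_RUNS: OnceLock<Counter<u64>> = OnceLock::new();

fn reconcile_runs() -> &'static Counter<u64> {
    RECONCILE_RUNS.get_or_init(|| {
        global::meter("keramik")
            .u64_counter("network_reconcile_count")
            .with_description("Number of network reconciles")
            .init()
    })
}

// In reconcile: reconcile_runs().add(1, &[]);
```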
8 changes: 8 additions & 0 deletions operator/src/simulation/controller.rs
@@ -21,6 +21,7 @@ use kube::{
},
Resource, ResourceExt,
};
use opentelemetry::global;
use rand::{distributions::Alphanumeric, thread_rng, Rng, RngCore};

use tracing::{debug, error, info};
@@ -128,6 +129,13 @@
simulation: Arc<Simulation>,
cx: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Result<Action, Error> {
let meter = global::meter("keramik");
let runs = meter
.u64_counter("simulation_reconcile_count")
.with_description("Number of simulation reconciles")
.init();
runs.add(1, &[]);

let spec = simulation.spec();

let status = if let Some(status) = &simulation.status {
12 changes: 7 additions & 5 deletions runner/src/main.rs
@@ -10,8 +10,8 @@ use keramik_common::telemetry;

use anyhow::Result;
use clap::{Parser, Subcommand};
use opentelemetry::global::{shutdown_meter_provider, shutdown_tracer_provider};
use opentelemetry::{global, KeyValue};
use opentelemetry::{global::shutdown_tracer_provider, Context};
use tracing::info;

use crate::{bootstrap::bootstrap, simulate::simulate};
@@ -68,16 +68,16 @@ async fn main() -> Result<()> {
tracing_log::LogTracer::init()?;

let args = Cli::parse();
let cx = Context::current();
let metrics_controller = telemetry::init(args.otlp_endpoint.clone()).await?;
telemetry::init_tracing(args.otlp_endpoint.clone()).await?;
let metrics_controller = telemetry::init_metrics_otlp(args.otlp_endpoint.clone()).await?;

let meter = global::meter("keramik");
let runs = meter
.u64_counter("runner_runs")
.with_description("Number of runs of the runner")
.init();

runs.add(&cx, 1, &[KeyValue::new("command", args.command.name())]);
runs.add(1, &[KeyValue::new("command", args.command.name())]);

info!(?args.command, ?args.otlp_endpoint, "starting runner");
let success = match args.command {
@@ -88,7 +88,9 @@

// Flush traces and metrics before shutdown
shutdown_tracer_provider();
metrics_controller.stop(&cx)?;
metrics_controller.force_flush()?;
drop(metrics_controller);
shutdown_meter_provider();

// This fixes lost metrics, not sure why :(
// Seems to be related to the inflight gRPC request getting cancelled