Skip to content

Commit

Permalink
Merge pull request #353 from teonite/parse_socket_addrs
Browse files Browse the repository at this point in the history
Add config typing for IP/socket addresses
  • Loading branch information
wojcik91 authored Nov 13, 2024
2 parents a630bc4 + 4a2767a commit 5cb7b00
Show file tree
Hide file tree
Showing 15 changed files with 52 additions and 49 deletions.
2 changes: 1 addition & 1 deletion event_sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ homepage = "https://github.com/casper-network/casper-sidecar/"
repository = "https://github.com/casper-network/casper-sidecar/"

[features]
additional-metrics = ["casper-event-types/additional-metrics"]
additional-metrics = ["casper-event-types/additional-metrics", "metrics/additional-metrics"]
testing = []

[dependencies]
Expand Down
4 changes: 2 additions & 2 deletions event_sidecar/src/database/writer_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use anyhow::Context;
use async_trait::async_trait;
use casper_types::AsymmetricType;
#[cfg(feature = "additional-metrics")]
use casper_event_types::metrics;
use metrics::db::DB_OPERATION_TIMES;
use itertools::Itertools;
use tokio::sync::Mutex;
use $crate::{
Expand Down Expand Up @@ -469,7 +469,7 @@ async fn save_event_log(
#[cfg(feature = "additional-metrics")]
fn observe_db_operation_time(operation_name: &str, start: Instant) {
let duration = start.elapsed();
metrics::DB_OPERATION_TIMES
DB_OPERATION_TIMES
.with_label_values(&[operation_name])
.observe(duration.as_nanos() as f64);
}
Expand Down
7 changes: 5 additions & 2 deletions event_sidecar/src/event_stream_server/sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ fn stream_to_client(
stream_filter,
event_filter,
is_legacy_filter,
#[cfg(feature = "additional-metrics")]
metrics_sender,
)
}

Expand All @@ -617,6 +619,7 @@ fn build_combined_events_stream(
stream_filter: &'static Endpoint,
event_filter: &'static [EventFilter],
is_legacy_filter: bool,
#[cfg(feature = "additional-metrics")] metrics_sender: Sender<()>,
) -> impl Stream<Item = Result<WarpServerSentEvent, RecvError>> + 'static {
UnboundedReceiverStream::new(initial_events)
.map(move |event| {
Expand All @@ -642,7 +645,7 @@ fn build_combined_events_stream(
)
.await;
#[cfg(feature = "additional-metrics")]
if let Some(_) = fitlered_data {
if fitlered_data.is_some() {
let _ = sender.clone().send(()).await;
}
#[allow(clippy::let_and_return)]
Expand Down Expand Up @@ -973,7 +976,7 @@ mod tests {

let stream_filter = path_to_filter(path_filter, true).unwrap();
#[cfg(feature = "additional-metrics")]
let (tx, rx) = channel(1000);
let (tx, _rx) = channel(1000);
let (filter, is_legacy_filter) = get_filter(path_filter, true).unwrap();
// Collect the events emitted by `stream_to_client()` - should not contain duplicates.
let received_events: Vec<Result<WarpServerSentEvent, RecvError>> = stream_to_client(
Expand Down
4 changes: 2 additions & 2 deletions event_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod utils;
use std::collections::HashMap;
use std::process::ExitCode;
use std::sync::Arc;
use std::{net::IpAddr, path::PathBuf, str::FromStr, time::Duration};
use std::{path::PathBuf, time::Duration};

use crate::types::config::LegacySseApiTag;
use crate::{
Expand Down Expand Up @@ -256,7 +256,7 @@ fn builder(
inbound_sse_data_sender: Sender<SseEvent>,
) -> Result<EventListenerBuilder, Error> {
let node_interface = NodeConnectionInterface {
ip_address: IpAddr::from_str(&connection.ip_address)?,
ip_address: connection.ip_address,
sse_port: connection.sse_port,
rest_port: connection.rest_port,
};
Expand Down
9 changes: 6 additions & 3 deletions event_sidecar/src/testing/testing_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(test)]
use portpicker::Port;
use std::sync::{Arc, Mutex};
use std::{
net::{IpAddr, Ipv4Addr},
sync::{Arc, Mutex},
};
use tempfile::TempDir;

use crate::types::config::{Connection, RestApiServerConfig, SseEventServerConfig, StorageConfig};
Expand Down Expand Up @@ -83,14 +86,14 @@ impl TestingConfig {

pub(crate) fn add_connection(
&mut self,
ip_address: Option<String>,
ip_address: Option<IpAddr>,
sse_port: Option<u16>,
rest_port: Option<u16>,
) -> Port {
let random_port_for_sse = get_port();
let random_port_for_rest = get_port();
let connection = Connection {
ip_address: ip_address.unwrap_or_else(|| "127.0.0.1".to_string()),
ip_address: ip_address.unwrap_or_else(|| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))),
sse_port: sse_port.unwrap_or(random_port_for_sse),
rest_port: rest_port.unwrap_or(random_port_for_rest),
max_attempts: 2,
Expand Down
13 changes: 8 additions & 5 deletions event_sidecar/src/types/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde::Deserialize;
use std::net::IpAddr;
use std::string::ToString;
use std::vec;
use std::{
Expand Down Expand Up @@ -71,7 +72,7 @@ impl SseEventServerConfig {

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
pub struct Connection {
pub ip_address: String,
pub ip_address: IpAddr,
pub sse_port: u16,
pub rest_port: u16,
pub max_attempts: usize,
Expand Down Expand Up @@ -347,12 +348,14 @@ impl Default for AdminApiServerConfig {

#[cfg(any(feature = "testing", test))]
mod tests {
use std::net::Ipv4Addr;

use super::*;

impl Connection {
pub fn example_connection_1() -> Connection {
Connection {
ip_address: "127.0.0.1".to_string(),
ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
sse_port: 18101,
rest_port: 14101,
max_attempts: 10,
Expand All @@ -367,7 +370,7 @@ mod tests {

pub fn example_connection_2() -> Connection {
Connection {
ip_address: "127.0.0.1".to_string(),
ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
sse_port: 18102,
rest_port: 14102,
max_attempts: 10,
Expand All @@ -382,7 +385,7 @@ mod tests {

pub fn example_connection_3() -> Connection {
Connection {
ip_address: "127.0.0.1".to_string(),
ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
sse_port: 18103,
rest_port: 14103,
max_attempts: 10,
Expand All @@ -399,7 +402,7 @@ mod tests {
impl Default for Connection {
fn default() -> Self {
Self {
ip_address: "127.0.0.1".to_string(),
ip_address: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
sse_port: 18101,
rest_port: 14101,
allow_partial_connection: false,
Expand Down
4 changes: 2 additions & 2 deletions event_sidecar/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(feature = "additional-metrics")]
use crate::metrics::EVENTS_PROCESSED_PER_SECOND;
use metrics::db::EVENTS_PROCESSED_PER_SECOND;
#[cfg(feature = "additional-metrics")]
use std::sync::Arc;
#[cfg(feature = "additional-metrics")]
Expand Down Expand Up @@ -136,7 +136,7 @@ pub fn start_metrics_thread(module_name: String) -> Sender<()> {
let metrics_data_for_thread = metrics_data.clone();
tokio::spawn(async move {
let metrics_data = metrics_data_for_thread;
while let Some(_) = metrics_queue_rx.recv().await {
while metrics_queue_rx.recv().await.is_some() {
let mut guard = metrics_data.lock().await;
guard.observed_events += 1;
drop(guard);
Expand Down
5 changes: 4 additions & 1 deletion metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ repository = "https://github.com/casper-network/casper-sidecar/"

[dependencies]
once_cell = { workspace = true }
prometheus = { version = "0.13.3", features = ["process"] }
prometheus = { version = "0.13.3", features = ["process"] }

[features]
additional-metrics = []
2 changes: 2 additions & 0 deletions metrics/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::REGISTRY;
use once_cell::sync::Lazy;
#[cfg(feature = "additional-metrics")]
use prometheus::GaugeVec;
use prometheus::{HistogramOpts, HistogramVec, Opts};

const RAW_DATA_SIZE_BUCKETS: &[f64; 8] = &[
Expand Down
6 changes: 3 additions & 3 deletions rpc_sidecar/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::SpeculativeExecConfig;
/// Default binding address for the JSON-RPC HTTP server.
///
/// Uses a fixed port per node, but binds on any interface.
const DEFAULT_ADDRESS: &str = "0.0.0.0:0";
const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
/// Default rate limit in qps.
const DEFAULT_QPS_LIMIT: u64 = 100;
/// Default max body bytes. This is 2.5MB which should be able to accommodate the largest valid
Expand Down Expand Up @@ -74,7 +74,7 @@ pub struct RpcConfig {
/// Setting to enable the HTTP server.
pub enable_server: bool,
/// Address to bind JSON-RPC HTTP server to.
pub address: String,
pub address: SocketAddr,
/// Maximum rate limit in queries per second.
pub qps_limit: u64,
/// Maximum number of bytes to accept in a single request body.
Expand All @@ -88,7 +88,7 @@ impl RpcConfig {
pub fn new() -> Self {
RpcConfig {
enable_server: true,
address: DEFAULT_ADDRESS.to_string(),
address: DEFAULT_ADDRESS,
qps_limit: DEFAULT_QPS_LIMIT,
max_body_bytes: DEFAULT_MAX_BODY_BYTES,
cors_origin: DEFAULT_CORS_ORIGIN.to_string(),
Expand Down
23 changes: 3 additions & 20 deletions rpc_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ use node_client::FramedNodeClient;
pub use node_client::{Error as ClientError, NodeClient};
pub use speculative_exec_config::Config as SpeculativeExecConfig;
pub use speculative_exec_server::run as run_speculative_exec_server;
use std::process::ExitCode;
use std::{
net::{SocketAddr, ToSocketAddrs},
sync::Arc,
};
use std::{net::SocketAddr, process::ExitCode, sync::Arc};
use tracing::warn;

/// Minimal casper protocol version supported by this sidecar.
Expand Down Expand Up @@ -98,26 +94,13 @@ async fn run_speculative_exec(
Ok(())
}

fn start_listening(address: &str) -> anyhow::Result<ServerBuilder<AddrIncoming>> {
let address = resolve_address(address).map_err(|error| {
warn!(%error, %address, "failed to start HTTP server, cannot parse address");
error
})?;

Server::try_bind(&address).map_err(|error| {
fn start_listening(address: &SocketAddr) -> anyhow::Result<ServerBuilder<AddrIncoming>> {
Server::try_bind(address).map_err(|error| {
warn!(%error, %address, "failed to start HTTP server");
error.into()
})
}

/// Parses a network address from a string, with DNS resolution.
fn resolve_address(address: &str) -> anyhow::Result<SocketAddr> {
address
.to_socket_addrs()?
.next()
.ok_or_else(|| anyhow::anyhow!("failed to resolve address"))
}

fn encode_request(req: &BinaryRequest, id: u16) -> Result<Vec<u8>, bytesrepr::Error> {
let header = BinaryRequestHeader::new(SUPPORTED_PROTOCOL_VERSION, req.tag(), id);
let mut bytes = Vec::with_capacity(header.serialized_length() + req.serialized_length());
Expand Down
2 changes: 1 addition & 1 deletion rpc_sidecar/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ impl FramedNodeClient {
current_attempt - 1
);
}
warn!(%err, "failed to connect to the node, waiting {wait}ms before retrying");
warn!(%err, "failed to connect to node {}, waiting {wait}ms before retrying", config.address);
tokio::time::sleep(Duration::from_millis(wait)).await;
wait = (wait * config.exponential_backoff.coefficient)
.min(config.exponential_backoff.max_delay_ms);
Expand Down
8 changes: 5 additions & 3 deletions rpc_sidecar/src/speculative_exec_config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use datasize::DataSize;
use serde::Deserialize;

/// Default binding address for the speculative execution RPC HTTP server.
///
/// Uses a fixed port per node, but binds on any interface.
const DEFAULT_ADDRESS: &str = "0.0.0.0:1";
const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1);
/// Default rate limit in qps.
const DEFAULT_QPS_LIMIT: u64 = 1;
/// Default max body bytes (2.5MB).
Expand All @@ -20,7 +22,7 @@ pub struct Config {
/// Setting to enable the HTTP server.
pub enable_server: bool,
/// Address to bind JSON-RPC speculative execution server to.
pub address: String,
pub address: SocketAddr,
/// Maximum rate limit in queries per second.
pub qps_limit: u64,
/// Maximum number of bytes to accept in a single request body.
Expand All @@ -34,7 +36,7 @@ impl Config {
pub fn new() -> Self {
Config {
enable_server: false,
address: DEFAULT_ADDRESS.to_string(),
address: DEFAULT_ADDRESS,
qps_limit: DEFAULT_QPS_LIMIT,
max_body_bytes: DEFAULT_MAX_BODY_BYTES,
cors_origin: DEFAULT_CORS_ORIGIN.to_string(),
Expand Down
10 changes: 7 additions & 3 deletions sidecar/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ impl Component for RpcApiComponent {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
};

use super::*;
use crate::config::SidecarConfig;
Expand Down Expand Up @@ -379,15 +382,16 @@ mod tests {
let mut config = all_components_all_enabled();
config.rpc_server.as_mut().unwrap().node_client =
NodeClientConfig::new_with_port_and_retries(port, 1);
config.rpc_server.as_mut().unwrap().main_server.address = format!("0.0.0.0:{}", port);
config.rpc_server.as_mut().unwrap().main_server.address =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
config
.rpc_server
.as_mut()
.unwrap()
.speculative_exec_server
.as_mut()
.unwrap()
.address = format!("0.0.0.0:{}", port);
.address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
let res = component.prepare_component_task(&config).await;
assert!(res.is_ok());
assert!(res.unwrap().is_some());
Expand Down
2 changes: 1 addition & 1 deletion types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"
description = "Types for casper-event-listener library"
license-file = "../LICENSE"
documentation = "README.md"
homepage = "https://github.com/casper-network/casper-sidecar/"
homepage = "https://github.com/casper-network/casper-sidecar/"
repository = "https://github.com/casper-network/casper-sidecar/"

[dependencies]
Expand Down

0 comments on commit 5cb7b00

Please sign in to comment.