Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make IP & port config consistent across all node connections #358

Merged
merged 2 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions resources/example_configs/EXAMPLE_NCTL_CONFIG.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
[rpc_server.main_server]
enable_server = true
address = "0.0.0.0:11102"
ip_address = "0.0.0.0"
port = 11102
qps_limit = 100
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.speculative_exec_server]
enable_server = true
address = "0.0.0.0:25102"
ip_address = "0.0.0.0"
port = 25102
qps_limit = 1
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.node_client]
address = "0.0.0.0:28102"
ip_address = "0.0.0.0"
port = 28102
max_message_size_bytes = 4194304
request_limit = 3
request_buffer_size = 16
Expand Down
11 changes: 7 additions & 4 deletions resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
[rpc_server.main_server]
enable_server = true
address = "0.0.0.0:11102"
ip_address = "0.0.0.0"
port = 11102
qps_limit = 100
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.speculative_exec_server]
enable_server = true
address = "0.0.0.0:25102"
ip_address = "0.0.0.0"
port = 25102
qps_limit = 1
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.node_client]
address = "0.0.0.0:28102"
ip_address = "0.0.0.0"
port = 28102
max_message_size_bytes = 4194304
request_limit = 3
request_buffer_size = 16
Expand Down Expand Up @@ -89,4 +92,4 @@ max_requests_per_second = 50
[admin_api_server]
port = 18887
max_concurrent_requests = 1
max_requests_per_second = 1
max_requests_per_second = 1
9 changes: 6 additions & 3 deletions resources/example_configs/EXAMPLE_NODE_CONFIG.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
[rpc_server.main_server]
enable_server = true
address = "0.0.0.0:7777"
ip_address = "0.0.0.0"
port = 7777
qps_limit = 100
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.speculative_exec_server]
enable_server = true
address = "0.0.0.0:7778"
ip_address = "0.0.0.0"
port = 7778
qps_limit = 1
max_body_bytes = 2621440
cors_origin = ""

[rpc_server.node_client]
address = "3.20.57.210:7777"
ip_address = "3.20.57.210"
port = 7777
max_message_size_bytes = 4194304
request_limit = 10
request_buffer_size = 50
Expand Down
9 changes: 6 additions & 3 deletions resources/example_configs/default_debian_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ enable_server = true
# the JSON-RPC HTTP server will not run, but the node will be otherwise unaffected.
#
# The actual bound address will be reported via a log line if logging is enabled.
address = '0.0.0.0:7777'
ip_address = '0.0.0.0'
port= 7777

# The global max rate of requests (per second) before they are limited.
# Request will be delayed to the next 1 second bucket once limited.
Expand Down Expand Up @@ -44,7 +45,8 @@ enable_server = true
# but the node will be otherwise unaffected.
#
# The actual bound address will be reported via a log line if logging is enabled.
address = '0.0.0.0:7778'
ip_address = '0.0.0.0'
port = 7778

# The global max rate of requests (per second) before they are limited.
# Request will be delayed to the next 1 second bucket once limited.
Expand All @@ -65,7 +67,8 @@ cors_origin = ''
# =========================================
[rpc_server.node_client]
# The address of the node to connect to.
address = '127.0.0.1:7779'
ip_address = '127.0.0.1'
port = 7779
# Maximum size of a message in bytes.
max_message_size_bytes = 4_194_304
# Maximum number of in-flight node requests.
Expand Down
9 changes: 6 additions & 3 deletions resources/example_configs/default_rpc_only_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ enable_server = true
# the JSON-RPC HTTP server will not run, but the node will be otherwise unaffected.
#
# The actual bound address will be reported via a log line if logging is enabled.
address = '0.0.0.0:7777'
ip_address = '0.0.0.0'
port = 7777

# The global max rate of requests (per second) before they are limited.
# Request will be delayed to the next 1 second bucket once limited.
Expand Down Expand Up @@ -44,7 +45,8 @@ enable_server = true
# but the node will be otherwise unaffected.
#
# The actual bound address will be reported via a log line if logging is enabled.
address = '0.0.0.0:7778'
ip_address = '0.0.0.0'
port = 7778

# The global max rate of requests (per second) before they are limited.
# Request will be delayed to the next 1 second bucket once limited.
Expand All @@ -65,7 +67,8 @@ cors_origin = ''
# =========================================
[rpc_server.node_client]
# The address of the node to connect to.
address = '127.0.0.1:7779'
ip_address = '127.0.0.1'
port = 7779
# Maximum size of a message in bytes.
max_message_size_bytes = 4_194_304
# Maximum number of in-flight node requests.
Expand Down
45 changes: 29 additions & 16 deletions rpc_sidecar/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
convert::{TryFrom, TryInto},
net::{IpAddr, Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr},
};

use datasize::DataSize;
Expand All @@ -12,7 +12,8 @@ use crate::SpeculativeExecConfig;
/// Default binding address for the JSON-RPC HTTP server.
///
/// Uses a fixed port per node, but binds on any interface.
const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
const DEFAULT_IP_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
const DEFAULT_PORT: u16 = 0;
/// Default rate limit in qps.
const DEFAULT_QPS_LIMIT: u64 = 100;
/// Default max body bytes. This is 2.5MB which should be able to accommodate the largest valid
Expand Down Expand Up @@ -73,8 +74,10 @@ pub struct RpcServerConfig {
pub struct RpcConfig {
/// Setting to enable the HTTP server.
pub enable_server: bool,
/// Address to bind JSON-RPC HTTP server to.
pub address: SocketAddr,
/// IP address to bind JSON-RPC HTTP server to.
pub ip_address: IpAddr,
/// TCP port to bind JSON-RPC HTTP server to.
pub port: u16,
/// Maximum rate limit in queries per second.
pub qps_limit: u64,
/// Maximum number of bytes to accept in a single request body.
Expand All @@ -88,7 +91,8 @@ impl RpcConfig {
pub fn new() -> Self {
RpcConfig {
enable_server: true,
address: DEFAULT_ADDRESS,
ip_address: DEFAULT_IP_ADDRESS,
port: DEFAULT_PORT,
qps_limit: DEFAULT_QPS_LIMIT,
max_body_bytes: DEFAULT_MAX_BODY_BYTES,
cors_origin: DEFAULT_CORS_ORIGIN.to_string(),
Expand All @@ -104,7 +108,8 @@ impl Default for RpcConfig {

/// Default address to connect to the node.
// Change this to SocketAddr, once SocketAddr::new is const stable.
const DEFAULT_NODE_CONNECT_ADDRESS: (IpAddr, u16) = (IpAddr::V4(Ipv4Addr::LOCALHOST), 28104);
const DEFAULT_NODE_CONNECT_IP_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
const DEFAULT_NODE_CONNECT_PORT: u16 = 28104;
/// Default maximum payload size.
const DEFAULT_MAX_PAYLOAD_SIZE: u32 = 4 * 1024 * 1024;
/// Default message timeout in seconds.
Expand All @@ -127,8 +132,10 @@ const DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT: u64 = 2;
// Disallow unknown fields to ensure config files and command-line overrides contain valid keys.
#[serde(deny_unknown_fields)]
pub struct NodeClientConfig {
/// Address of the node.
pub address: SocketAddr,
/// IP address of the node.
pub ip_address: IpAddr,
/// Port of the node.
pub port: u16,
/// Maximum size of a message in bytes.
pub max_message_size_bytes: u32,
/// Message transfer timeout in seconds.
Expand All @@ -148,7 +155,8 @@ impl NodeClientConfig {
/// Creates a default instance for `NodeClientConfig`.
pub fn new() -> Self {
NodeClientConfig {
address: DEFAULT_NODE_CONNECT_ADDRESS.into(),
ip_address: DEFAULT_NODE_CONNECT_IP_ADDRESS,
port: DEFAULT_NODE_CONNECT_PORT,
request_limit: DEFAULT_NODE_REQUEST_LIMIT,
max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE,
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
Expand All @@ -166,9 +174,10 @@ impl NodeClientConfig {
/// Creates an instance of `NodeClientConfig` with specified listening port.
#[cfg(any(feature = "testing", test))]
pub fn new_with_port(port: u16) -> Self {
let local_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
NodeClientConfig {
address: local_socket,
ip_address: localhost,
port,
request_limit: DEFAULT_NODE_REQUEST_LIMIT,
max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE,
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
Expand All @@ -187,9 +196,10 @@ impl NodeClientConfig {
/// of reconnection retries.
#[cfg(any(feature = "testing", test))]
pub fn new_with_port_and_retries(port: u16, num_of_retries: usize) -> Self {
let local_socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
NodeClientConfig {
address: local_socket,
ip_address: localhost,
port,
request_limit: DEFAULT_NODE_REQUEST_LIMIT,
max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE,
request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE,
Expand All @@ -216,8 +226,10 @@ impl Default for NodeClientConfig {
// Disallow unknown fields to ensure config files and command-line overrides contain valid keys.
#[serde(deny_unknown_fields)]
pub struct NodeClientConfigTarget {
/// Address of the node.
pub address: SocketAddr,
/// IP address of the node.
pub ip_address: IpAddr,
/// TCP port of the node
pub port: u16,
/// Maximum size of a message in bytes.
pub max_message_size_bytes: u32,
/// Message transfer timeout in seconds.
Expand Down Expand Up @@ -245,7 +257,8 @@ impl TryFrom<NodeClientConfigTarget> for NodeClientConfig {
error: e.to_string(),
})?;
Ok(NodeClientConfig {
address: value.address,
ip_address: value.ip_address,
port: value.port,
request_limit: value.request_limit,
max_message_size_bytes: value.max_message_size_bytes,
request_buffer_size: value.request_buffer_size,
Expand Down
4 changes: 2 additions & 2 deletions rpc_sidecar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn retype_future_vec(
async fn run_rpc(config: RpcConfig, node_client: Arc<dyn NodeClient>) -> Result<(), Error> {
run_rpc_server(
node_client,
start_listening(&config.address)?,
start_listening(&SocketAddr::new(config.ip_address, config.port))?,
config.qps_limit,
config.max_body_bytes,
config.cors_origin.clone(),
Expand All @@ -85,7 +85,7 @@ async fn run_speculative_exec(
) -> anyhow::Result<()> {
run_speculative_exec_server(
node_client,
start_listening(&config.address)?,
start_listening(&SocketAddr::new(config.ip_address, config.port))?,
config.qps_limit,
config.max_body_bytes,
config.cors_origin.clone(),
Expand Down
12 changes: 7 additions & 5 deletions rpc_sidecar/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use metrics::rpc::{inc_disconnect, observe_reconnect_time};
use serde::de::DeserializeOwned;
use std::{
convert::{TryFrom, TryInto},
net::SocketAddr,
sync::{
atomic::{AtomicU16, Ordering},
Arc,
Expand Down Expand Up @@ -948,9 +949,10 @@ impl FramedNodeClient {
} else {
&config.exponential_backoff.max_attempts
};
let tcp_socket = SocketAddr::new(config.ip_address, config.port);
let mut current_attempt = 1;
loop {
match TcpStream::connect(config.address).await {
match TcpStream::connect(tcp_socket).await {
Ok(stream) => {
return Ok(Framed::new(
stream,
Expand All @@ -962,11 +964,11 @@ impl FramedNodeClient {
if !max_attempts.can_attempt(current_attempt) {
anyhow::bail!(
"Couldn't connect to node {} after {} attempts",
config.address,
tcp_socket,
current_attempt - 1
);
}
warn!(%err, "failed to connect to node {}, waiting {wait}ms before retrying", config.address);
warn!(%err, "failed to connect to node {tcp_socket}, waiting {wait}ms before retrying");
tokio::time::sleep(Duration::from_millis(wait)).await;
wait = (wait * config.exponential_backoff.coefficient)
.min(config.exponential_backoff.max_delay_ms);
Expand Down Expand Up @@ -1025,7 +1027,7 @@ impl NodeClient for FramedNodeClient {
let result = self.send_request_internal(&req, &mut client).await;
if let Err(err) = &result {
warn!(
addr = %self.config.address,
addr = %self.config.ip_address,
err = display_error(&err),
"binary port client handler error"
);
Expand All @@ -1039,7 +1041,7 @@ impl NodeClient for FramedNodeClient {
Err(err) => {
warn!(
%err,
addr = %self.config.address,
addr = %self.config.ip_address,
"binary port client failed to reconnect"
);
// schedule standard reconnect process with multiple retries
Expand Down
14 changes: 9 additions & 5 deletions rpc_sidecar/src/speculative_exec_config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::net::{IpAddr, Ipv4Addr};

use datasize::DataSize;
use serde::Deserialize;

/// Default binding address for the speculative execution RPC HTTP server.
///
/// Uses a fixed port per node, but binds on any interface.
const DEFAULT_ADDRESS: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1);
const DEFAULT_IP_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
const DEFAULT_PORT: u16 = 1;
/// Default rate limit in qps.
const DEFAULT_QPS_LIMIT: u64 = 1;
/// Default max body bytes (2.5MB).
Expand All @@ -21,8 +22,10 @@ const DEFAULT_CORS_ORIGIN: &str = "";
pub struct Config {
/// Setting to enable the HTTP server.
pub enable_server: bool,
/// Address to bind JSON-RPC speculative execution server to.
pub address: SocketAddr,
/// IP address to bind JSON-RPC speculative execution server to.
pub ip_address: IpAddr,
/// Port to bind JSON-RPC speculative execution server to.
pub port: u16,
/// Maximum rate limit in queries per second.
pub qps_limit: u64,
/// Maximum number of bytes to accept in a single request body.
Expand All @@ -36,7 +39,8 @@ impl Config {
pub fn new() -> Self {
Config {
enable_server: false,
address: DEFAULT_ADDRESS,
ip_address: DEFAULT_IP_ADDRESS,
port: DEFAULT_PORT,
qps_limit: DEFAULT_QPS_LIMIT,
max_body_bytes: DEFAULT_MAX_BODY_BYTES,
cors_origin: DEFAULT_CORS_ORIGIN.to_string(),
Expand Down
Loading
Loading