Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aligning with node changes #382

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 80 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion event_sidecar/src/event_stream_server/sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ use casper_event_types::{
sse_data::{EventFilter, SseData},
Filter as SseFilter,
};
use casper_types::{ProtocolVersion, Transaction};
use casper_types::ProtocolVersion;
#[cfg(test)]
use casper_types::Transaction;
use futures::{future, Stream, StreamExt};
use http::StatusCode;
use hyper::Body;
#[cfg(test)]
use serde::Serialize;
#[cfg(test)]
use serde_json::Value;
Expand Down Expand Up @@ -97,6 +100,7 @@ type UrlProps = (
IsLegacyFilter,
);

#[cfg(test)]
#[derive(Serialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct TransactionAccepted {
Expand Down
4 changes: 2 additions & 2 deletions event_sidecar/src/event_stream_server/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,11 +598,11 @@ async fn fetch_text(
///
/// The expected order is:
/// * data:<JSON-encoded ApiVersion> (note, no ID line follows this first event)
/// then the following three repeated for as many events as are applicable to that stream:
/// then the following three repeated for as many events as are applicable to that stream:
/// * data:<JSON-encoded event>
/// * id:<integer>
/// * empty line
/// then finally, repeated keepalive lines until the server is shut down.
/// then finally, repeated keepalive lines until the server is shut down.
#[allow(clippy::too_many_lines)]
fn parse_response(response_text: String, client_id: &str) -> Vec<ReceivedEvent> {
let mut received_events = Vec::new();
Expand Down
4 changes: 0 additions & 4 deletions event_sidecar/src/testing/simple_sse_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ pub(crate) mod tests {
pub routes: HashMap<Vec<String>, CacheAndData>,
}

#[derive(Debug)]
struct Nope;
impl warp::reject::Reject for Nope {}

type ShutdownCallbacks = Arc<Mutex<Vec<broadcast::Sender<Option<(Option<String>, String)>>>>>;

impl SimpleSseServer {
Expand Down
26 changes: 23 additions & 3 deletions event_sidecar/src/types/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,29 @@ pub enum DatabaseWriteError {
Unhandled(anyhow::Error),
}

impl ToString for DatabaseWriteError {
fn to_string(&self) -> String {
format!("{:?}", self)
impl Display for DatabaseWriteError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DatabaseWriteError::Serialisation(error) => {
write!(f, "DatabaseWriteError::Serialisation: {}", error)
}
DatabaseWriteError::SqlConstruction(error) => {
write!(f, "DatabaseWriteError::SqlConstruction: {}", error)
}
DatabaseWriteError::UniqueConstraint(unique_constraint_error) => {
write!(
f,
"DatabaseWriteError::UniqueConstraint: table: {}, error: {}",
unique_constraint_error.table, unique_constraint_error.error
)
}
DatabaseWriteError::Database(error) => {
write!(f, "DatabaseWriteError::Database: {}", error)
}
DatabaseWriteError::Unhandled(error) => {
write!(f, "DatabaseWriteError::Unhandled: {}", error)
}
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions event_sidecar/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ pub mod tests {
let infix_str = infix.as_str();
data.iter().any(|x| x.contains(infix_str))
}

#[allow(dead_code)]
pub struct MockNodeTestProperties {
pub testing_config: TestingConfig,
pub temp_storage_dir: TempDir,
Expand Down
1 change: 1 addition & 0 deletions json_rpc/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl Request {
/// * `allow_unknown_fields` is `false` and extra fields exist
///
/// Returns a `Rejection` if the "id" field is `None`.
#[allow(clippy::result_large_err)]
pub(super) fn new(
mut request: Map<String, Value>,
allow_unknown_fields: bool,
Expand Down
1 change: 1 addition & 0 deletions json_rpc/src/request/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum Params {
}

impl Params {
#[allow(clippy::result_large_err)]
pub(super) fn try_from(request_id: &Value, params: Value) -> Result<Self, ErrorOrRejection> {
let err_invalid_request = |additional_info: &str| {
let error = Error::new(ReservedErrorCode::InvalidRequest, additional_info);
Expand Down
1 change: 0 additions & 1 deletion listener/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ impl DefaultConnectionManagerBuilder {
impl DefaultConnectionManager {
/// Start handling traffic from nodes endpoint. This function is blocking, it will return a
/// ConnectionManagerError result if something went wrong while processing.

async fn connect(
&mut self,
) -> Result<Pin<Box<dyn Stream<Item = EventResult> + Send + 'static>>, ConnectionManagerError>
Expand Down
13 changes: 5 additions & 8 deletions listener/src/sse_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use eventsource_stream::{Event, EventStream, EventStreamError, Eventsource};
use futures::StreamExt;
use reqwest::Client;
use std::pin::Pin;
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{fmt::Debug, time::Duration};
use tokio::select;
use tokio_stream::Stream;
use tracing::debug;
Expand All @@ -17,7 +17,7 @@ use url::Url;
#[derive(Clone, Debug)]
pub enum SseDataStreamingError {
NoDataTimeout(),
ConnectionError(Arc<Error>),
ConnectionError(),
}

pub type EventResult = Result<Event, EventStreamError<SseDataStreamingError>>;
Expand Down Expand Up @@ -84,8 +84,8 @@ impl SseConnection {
monitor.tick().await;
yield Ok(bytes);
},
Err(err) => {
yield Err(SseDataStreamingError::ConnectionError(Arc::new(Error::from(err))));
Err(_) => {
yield Err(SseDataStreamingError::ConnectionError());
break;
}
}
Expand Down Expand Up @@ -177,7 +177,6 @@ pub mod tests {
use std::{
convert::Infallible,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::mpsc::channel;
Expand Down Expand Up @@ -225,9 +224,7 @@ pub mod tests {
}

pub fn build_failing_on_message() -> Self {
let e = SseDataStreamingError::ConnectionError(Arc::new(Error::msg(
"Some error on message",
)));
let e = SseDataStreamingError::ConnectionError();
MockSseConnection {
data: vec![],
failure_on_connection: None,
Expand Down
10 changes: 2 additions & 8 deletions listener/src/version_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,11 @@ fn try_resolve_version(raw_response: &Value) -> Result<ProtocolVersion, Error> {
let raw = build_version_value
.as_str()
.context("build_version_value should be a string")
.map_err(|e| {
count_error("version_value_not_a_string");
e
})?
.inspect_err(|_| count_error("version_value_not_a_string"))?
.split('-')
.next()
.context("splitting build_version_value should always return at least one slice")
.map_err(|e| {
count_error("incomprehensible_build_version_form");
e
})?;
.inspect_err(|_| count_error("incomprehensible_build_version_form"))?;
ProtocolVersion::from_str(raw).map_err(|error| {
count_error("failed_parsing_protocol_version");
anyhow!("failed parsing build version from '{}': {}", raw, error)
Expand Down
8 changes: 5 additions & 3 deletions metrics/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::{Display, Formatter};

use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, Opts, Registry};

Expand Down Expand Up @@ -25,9 +27,9 @@ pub struct MetricCollectionError {
reason: String,
}

impl ToString for MetricCollectionError {
fn to_string(&self) -> String {
format!("MetricCollectionError: {}", self.reason)
impl Display for MetricCollectionError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "MetricCollectionError: {}", self.reason)
}
}

Expand Down
2 changes: 1 addition & 1 deletion metrics/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn inc_method_call(method: &str) {
pub fn observe_response_time(method: &str, status: &str, response_time: Duration) {
let response_time = response_time.as_secs_f64() * 1000.0;
RESPONSE_TIMES_MS
.with_label_values(&[method, &status])
.with_label_values(&[method, status])
.observe(response_time);
}

Expand Down
14 changes: 7 additions & 7 deletions resources/test/rpc_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -5511,10 +5511,10 @@
"description": "A version 1 transfer.",
"type": "object",
"required": [
"LegacyTransfer"
"Transfer"
],
"properties": {
"LegacyTransfer": {
"Transfer": {
"$ref": "#/components/schemas/TransferV1"
}
},
Expand Down Expand Up @@ -5683,14 +5683,14 @@
"additionalProperties": false
},
{
"description": "A reservation record.",
"description": "A prepayment record.",
"type": "object",
"required": [
"Prepaid"
"Prepayment"
],
"properties": {
"Prepaid": {
"$ref": "#/components/schemas/PrepaidKind"
"Prepayment": {
"$ref": "#/components/schemas/PrepaymentKind"
}
},
"additionalProperties": false
Expand Down Expand Up @@ -6560,7 +6560,7 @@
}
}
},
"PrepaidKind": {
"PrepaymentKind": {
"description": "Container for bytes recording location, type and data for a gas pre payment",
"type": "object",
"required": [
Expand Down
14 changes: 7 additions & 7 deletions resources/test/speculative_rpc_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1543,10 +1543,10 @@
"description": "A version 1 transfer.",
"type": "object",
"required": [
"LegacyTransfer"
"Transfer"
],
"properties": {
"LegacyTransfer": {
"Transfer": {
"$ref": "#/components/schemas/TransferV1"
}
},
Expand Down Expand Up @@ -1715,14 +1715,14 @@
"additionalProperties": false
},
{
"description": "A reservation record.",
"description": "A prepayment record.",
"type": "object",
"required": [
"Prepaid"
"Prepayment"
],
"properties": {
"Prepaid": {
"$ref": "#/components/schemas/PrepaidKind"
"Prepayment": {
"$ref": "#/components/schemas/PrepaymentKind"
}
},
"additionalProperties": false
Expand Down Expand Up @@ -3529,7 +3529,7 @@
}
}
},
"PrepaidKind": {
"PrepaymentKind": {
"description": "Container for bytes recording location, type and data for a gas pre payment",
"type": "object",
"required": [
Expand Down
14 changes: 7 additions & 7 deletions rpc_sidecar/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,9 +520,9 @@ pub enum InvalidTransactionOrDeploy {
/// Entry point cannot be 'call'
#[error("entry point cannot be 'call'")]
InvalidTransactionEntryPointCannotBeCall,
/// Invalid transaction kind
#[error("invalid transaction kind")]
InvalidTransactionInvalidTransactionKind,
/// Invalid transaction lane
#[error("invalid transaction lane")]
InvalidTransactionInvalidTransactionLane,
}

impl From<ErrorCode> for InvalidTransactionOrDeploy {
Expand Down Expand Up @@ -627,8 +627,8 @@ impl From<ErrorCode> for InvalidTransactionOrDeploy {
ErrorCode::InvalidTransactionEntryPointCannotBeCall => {
Self::InvalidTransactionEntryPointCannotBeCall
}
ErrorCode::InvalidTransactionInvalidTransactionKind => {
Self::InvalidTransactionInvalidTransactionKind
ErrorCode::InvalidTransactionInvalidTransactionLane => {
Self::InvalidTransactionInvalidTransactionLane
}
ErrorCode::InvalidTransactionUnspecified => Self::TransactionUnspecified,
ErrorCode::InvalidTransactionOrDeployUnspecified => {
Expand Down Expand Up @@ -766,7 +766,7 @@ impl Error {
| ErrorCode::DeployMissingTransferTarget
| ErrorCode::DeployMissingModuleBytes
| ErrorCode::InvalidTransactionEntryPointCannotBeCall
| ErrorCode::InvalidTransactionInvalidTransactionKind
| ErrorCode::InvalidTransactionInvalidTransactionLane
| ErrorCode::InvalidTransactionUnspecified
| ErrorCode::InvalidTransactionOrDeployUnspecified),
) => Self::InvalidTransaction(InvalidTransactionOrDeploy::from(err)),
Expand Down Expand Up @@ -1206,7 +1206,7 @@ where
#[derive(Clone, Copy, Debug)]
pub(crate) struct ErrFormatter<'a, T>(pub &'a T);

impl<'a, T> Display for ErrFormatter<'a, T>
impl<T> Display for ErrFormatter<'_, T>
where
T: std::error::Error,
{
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[toolchain]
channel = "1.77.2"
channel = "1.83.0"
components = [ "rustfmt", "clippy" ]
targets = [ "wasm32-unknown-unknown" ]
profile = "minimal"
4 changes: 2 additions & 2 deletions sidecar/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use config::{SidecarConfig, SidecarConfigTarget};
use run::run;
use std::{
env, fmt, io,
panic::{self, PanicInfo},
panic::{self, PanicHookInfo},
process::{self, ExitCode},
};
#[cfg(not(target_env = "msvc"))]
Expand Down Expand Up @@ -68,7 +68,7 @@ pub fn read_config(config_path: &str) -> Result<SidecarConfigTarget, Error> {
toml::from_str(&toml_content).context("Error parsing config into TOML format")
}

fn panic_hook(info: &PanicInfo) {
fn panic_hook(info: &PanicHookInfo) {
let backtrace = Backtrace::new();

eprintln!("{:?}", backtrace);
Expand Down
4 changes: 2 additions & 2 deletions types/src/legacy_sse_data/translate_execution_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn maybe_tanslate_stored_value(stored_value: &StoredValue) -> Option<TransformKi
StoredValue::ContractWasm(_) => Some(TransformKindV1::WriteContractWasm),
StoredValue::Contract(_) => Some(TransformKindV1::WriteContract),
StoredValue::ContractPackage(_) => Some(TransformKindV1::WriteContractPackage),
StoredValue::LegacyTransfer(transfer_v1) => {
StoredValue::Transfer(transfer_v1) => {
Some(TransformKindV1::WriteTransfer(transfer_v1.clone()))
}
StoredValue::DeployInfo(deploy_info) => {
Expand Down Expand Up @@ -162,7 +162,7 @@ fn maybe_tanslate_stored_value(stored_value: &StoredValue) -> Option<TransformKi
StoredValue::ByteCode(_) => None,
StoredValue::MessageTopic(_) => None,
StoredValue::Message(_) => None,
StoredValue::Prepaid(_) => None,
StoredValue::Prepayment(_) => None,
StoredValue::EntryPoint(_) => None,
StoredValue::RawBytes(_) => None,
}
Expand Down
Loading