From f5aed12ec97f7f84c6dd99604398c809a9e3e36d Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 22 Jan 2025 12:47:20 +0800 Subject: [PATCH] refactor(connector): use dyn for connection (#20253) Signed-off-by: xxchan --- .../src/connector_common/connection.rs | 18 ++-- src/connector/src/lib.rs | 1 - src/connector/src/macros.rs | 88 +++---------------- 3 files changed, 22 insertions(+), 85 deletions(-) diff --git a/src/connector/src/connector_common/connection.rs b/src/connector/src/connector_common/connection.rs index 75a808a6f2eb9..3604b1bd2f99c 100644 --- a/src/connector/src/connector_common/connection.rs +++ b/src/connector/src/connector_common/connection.rs @@ -32,16 +32,17 @@ use with_options::WithOptions; use crate::connector_common::{ AwsAuthProps, IcebergCommon, KafkaConnectionProps, KafkaPrivateLinkCommon, }; +use crate::deserialize_optional_bool_from_string; use crate::error::ConnectorResult; use crate::schema::schema_registry::Client as ConfluentSchemaRegistryClient; +use crate::source::build_connection; use crate::source::kafka::{KafkaContextCommon, RwConsumerContext}; -use crate::{deserialize_optional_bool_from_string, dispatch_connection_impl, ConnectionImpl}; pub const SCHEMA_REGISTRY_CONNECTION_TYPE: &str = "schema_registry"; #[async_trait] -pub trait Connection { - async fn test_connection(&self) -> ConnectorResult<()>; +pub trait Connection: Send { + async fn validate_connection(&self) -> ConnectorResult<()>; } #[serde_as] @@ -64,9 +65,8 @@ pub async fn validate_connection(connection: &PbConnection) -> ConnectorResult<( let secret_refs = cp.secret_refs.clone().into_iter().collect(); let props_secret_resolved = LocalSecretManager::global().fill_secrets(options, secret_refs)?; - let connection_impl = - ConnectionImpl::from_proto(cp.connection_type(), props_secret_resolved)?; - dispatch_connection_impl!(connection_impl, inner, inner.test_connection().await?) + let connection = build_connection(cp.connection_type(), props_secret_resolved)?; + connection.validate_connection().await? } risingwave_pb::catalog::connection::Info::PrivateLinkService(_) => unreachable!(), } @@ -76,7 +76,7 @@ pub async fn validate_connection(connection: &PbConnection) -> ConnectorResult<( #[async_trait] impl Connection for KafkaConnection { - async fn test_connection(&self) -> ConnectorResult<()> { + async fn validate_connection(&self) -> ConnectorResult<()> { let client = self.build_client().await?; // describe cluster here client.fetch_metadata(None, Duration::from_secs(10)).await?; @@ -175,7 +175,7 @@ pub struct IcebergConnection { #[async_trait] impl Connection for IcebergConnection { - async fn test_connection(&self) -> ConnectorResult<()> { + async fn validate_connection(&self) -> ConnectorResult<()> { let info = match &self.warehouse_path { Some(warehouse_path) => { let url = Url::parse(warehouse_path); @@ -298,7 +298,7 @@ pub struct ConfluentSchemaRegistryConnection { #[async_trait] impl Connection for ConfluentSchemaRegistryConnection { - async fn test_connection(&self) -> ConnectorResult<()> { + async fn validate_connection(&self) -> ConnectorResult<()> { // GET /config to validate the connection let client = ConfluentSchemaRegistryClient::try_from(self)?; client.validate_connection().await?; diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 58877b557b52c..0e8501703136e 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -49,7 +49,6 @@ pub mod parser; pub mod schema; pub mod sink; pub mod source; -pub use source::ConnectionImpl; pub mod connector_common; diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 1a0872e5b09c3..ac498839ed67c 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -228,83 +228,21 @@ macro_rules! dispatch_split_impl { }}; } -#[macro_export] -macro_rules! dispatch_connection_impl { - ($impl:expr, $inner_name:ident, $body:expr) => { - $crate::dispatch_connection_enum! { $impl, $inner_name, $body } - }; -} - -#[macro_export] -macro_rules! dispatch_connection_enum { - ($impl:expr, $inner_name:ident, $body:expr) => {{ - $crate::for_all_connections! { - $crate::dispatch_connection_impl_inner, - $impl, - $inner_name, - $body - } - }}; -} - -#[macro_export] -macro_rules! dispatch_connection_impl_inner { - ( - { $({$conn_variant_name:ident, $connection:ty, $pb_variant_type:ty }),* }, - $impl:expr, - $inner_name:ident, - $body:expr - ) => {{ - match $impl { - $( - ConnectionImpl::$conn_variant_name($inner_name) => { - $body - } - ),* - } - }}; -} - #[macro_export] macro_rules! impl_connection { - ({$({ $variant_name:ident, $connection:ty, $pb_connection_path:path }),*}) => { - #[derive(Debug, Clone, EnumAsInner, PartialEq)] - pub enum ConnectionImpl { - $( - $variant_name(Box<$connection>), - )* - } - - $( - impl TryFrom for $connection { - type Error = $crate::error::ConnectorError; - - fn try_from(connection: ConnectionImpl) -> std::result::Result { - match connection { - ConnectionImpl::$variant_name(inner) => Ok(Box::into_inner(inner)), - other => risingwave_common::bail!("expect {} but get {:?}", stringify!($connection), other), - } - } - } - - impl From<$connection> for ConnectionImpl { - fn from(connection: $connection) -> ConnectionImpl { - ConnectionImpl::$variant_name(Box::new(connection)) - } - } - - )* - - impl ConnectionImpl { - pub fn from_proto(pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType, value_secret_filled: std::collections::BTreeMap) -> $crate::error::ConnectorResult { - match pb_connection_type { - $( - <$pb_connection_path>::$variant_name => { - Ok(serde_json::from_value(json!(value_secret_filled)).map(ConnectionImpl::$variant_name).map_err($crate::error::ConnectorError::from)?) - }, - )* - risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(), - } + ({$({ $variant_name:ident, $connection_type:ty, $pb_connection_path:path }),*}) => { + pub fn build_connection( + pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType, + value_secret_filled: std::collections::BTreeMap + ) -> $crate::error::ConnectorResult> { + match pb_connection_type { + $( + <$pb_connection_path>::$variant_name => { + let c: Box<$connection_type> = serde_json::from_value(json!(value_secret_filled)).map_err($crate::error::ConnectorError::from)?; + Ok(c) + }, + )* + risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(), } } }