Skip to content

Commit

Permalink
refactor(connector): use dyn for connection (#20253)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Jan 22, 2025
1 parent 49432f1 commit f5aed12
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 85 deletions.
18 changes: 9 additions & 9 deletions src/connector/src/connector_common/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,17 @@ use with_options::WithOptions;
use crate::connector_common::{
AwsAuthProps, IcebergCommon, KafkaConnectionProps, KafkaPrivateLinkCommon,
};
use crate::deserialize_optional_bool_from_string;
use crate::error::ConnectorResult;
use crate::schema::schema_registry::Client as ConfluentSchemaRegistryClient;
use crate::source::build_connection;
use crate::source::kafka::{KafkaContextCommon, RwConsumerContext};
use crate::{deserialize_optional_bool_from_string, dispatch_connection_impl, ConnectionImpl};

pub const SCHEMA_REGISTRY_CONNECTION_TYPE: &str = "schema_registry";

#[async_trait]
pub trait Connection {
async fn test_connection(&self) -> ConnectorResult<()>;
pub trait Connection: Send {
async fn validate_connection(&self) -> ConnectorResult<()>;
}

#[serde_as]
Expand All @@ -64,9 +65,8 @@ pub async fn validate_connection(connection: &PbConnection) -> ConnectorResult<(
let secret_refs = cp.secret_refs.clone().into_iter().collect();
let props_secret_resolved =
LocalSecretManager::global().fill_secrets(options, secret_refs)?;
let connection_impl =
ConnectionImpl::from_proto(cp.connection_type(), props_secret_resolved)?;
dispatch_connection_impl!(connection_impl, inner, inner.test_connection().await?)
let connection = build_connection(cp.connection_type(), props_secret_resolved)?;
connection.validate_connection().await?
}
risingwave_pb::catalog::connection::Info::PrivateLinkService(_) => unreachable!(),
}
Expand All @@ -76,7 +76,7 @@ pub async fn validate_connection(connection: &PbConnection) -> ConnectorResult<(

#[async_trait]
impl Connection for KafkaConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
async fn validate_connection(&self) -> ConnectorResult<()> {
let client = self.build_client().await?;
// describe cluster here
client.fetch_metadata(None, Duration::from_secs(10)).await?;
Expand Down Expand Up @@ -175,7 +175,7 @@ pub struct IcebergConnection {

#[async_trait]
impl Connection for IcebergConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
async fn validate_connection(&self) -> ConnectorResult<()> {
let info = match &self.warehouse_path {
Some(warehouse_path) => {
let url = Url::parse(warehouse_path);
Expand Down Expand Up @@ -298,7 +298,7 @@ pub struct ConfluentSchemaRegistryConnection {

#[async_trait]
impl Connection for ConfluentSchemaRegistryConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
async fn validate_connection(&self) -> ConnectorResult<()> {
// GET /config to validate the connection
let client = ConfluentSchemaRegistryClient::try_from(self)?;
client.validate_connection().await?;
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ pub mod parser;
pub mod schema;
pub mod sink;
pub mod source;
pub use source::ConnectionImpl;

pub mod connector_common;

Expand Down
88 changes: 13 additions & 75 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,83 +228,21 @@ macro_rules! dispatch_split_impl {
}};
}

#[macro_export]
macro_rules! dispatch_connection_impl {
($impl:expr, $inner_name:ident, $body:expr) => {
$crate::dispatch_connection_enum! { $impl, $inner_name, $body }
};
}

#[macro_export]
macro_rules! dispatch_connection_enum {
($impl:expr, $inner_name:ident, $body:expr) => {{
$crate::for_all_connections! {
$crate::dispatch_connection_impl_inner,
$impl,
$inner_name,
$body
}
}};
}

#[macro_export]
macro_rules! dispatch_connection_impl_inner {
(
{ $({$conn_variant_name:ident, $connection:ty, $pb_variant_type:ty }),* },
$impl:expr,
$inner_name:ident,
$body:expr
) => {{
match $impl {
$(
ConnectionImpl::$conn_variant_name($inner_name) => {
$body
}
),*
}
}};
}

#[macro_export]
macro_rules! impl_connection {
({$({ $variant_name:ident, $connection:ty, $pb_connection_path:path }),*}) => {
#[derive(Debug, Clone, EnumAsInner, PartialEq)]
pub enum ConnectionImpl {
$(
$variant_name(Box<$connection>),
)*
}

$(
impl TryFrom<ConnectionImpl> for $connection {
type Error = $crate::error::ConnectorError;

fn try_from(connection: ConnectionImpl) -> std::result::Result<Self, Self::Error> {
match connection {
ConnectionImpl::$variant_name(inner) => Ok(Box::into_inner(inner)),
other => risingwave_common::bail!("expect {} but get {:?}", stringify!($connection), other),
}
}
}

impl From<$connection> for ConnectionImpl {
fn from(connection: $connection) -> ConnectionImpl {
ConnectionImpl::$variant_name(Box::new(connection))
}
}

)*

impl ConnectionImpl {
pub fn from_proto(pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType, value_secret_filled: std::collections::BTreeMap<String, String>) -> $crate::error::ConnectorResult<Self> {
match pb_connection_type {
$(
<$pb_connection_path>::$variant_name => {
Ok(serde_json::from_value(json!(value_secret_filled)).map(ConnectionImpl::$variant_name).map_err($crate::error::ConnectorError::from)?)
},
)*
risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
}
({$({ $variant_name:ident, $connection_type:ty, $pb_connection_path:path }),*}) => {
pub fn build_connection(
pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType,
value_secret_filled: std::collections::BTreeMap<String, String>
) -> $crate::error::ConnectorResult<Box<dyn $crate::connector_common::Connection>> {
match pb_connection_type {
$(
<$pb_connection_path>::$variant_name => {
let c: Box<$connection_type> = serde_json::from_value(json!(value_secret_filled)).map_err($crate::error::ConnectorError::from)?;
Ok(c)
},
)*
risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
}
}
}
Expand Down

0 comments on commit f5aed12

Please sign in to comment.