diff --git a/connector-common-http/src/main/java/com/redhat/cloud/notifications/connector/http/HttpExceptionProcessor.java b/connector-common-http/src/main/java/com/redhat/cloud/notifications/connector/http/HttpExceptionProcessor.java index bcacc2116e..ef816d5dcd 100644 --- a/connector-common-http/src/main/java/com/redhat/cloud/notifications/connector/http/HttpExceptionProcessor.java +++ b/connector-common-http/src/main/java/com/redhat/cloud/notifications/connector/http/HttpExceptionProcessor.java @@ -9,6 +9,7 @@ import org.apache.hc.client5.http.ConnectTimeoutException; import org.apache.hc.client5.http.HttpHostConnectException; import org.jboss.logging.Logger; +import org.jboss.resteasy.reactive.ClientWebApplicationException; import javax.net.ssl.SSLException; import javax.net.ssl.SSLHandshakeException; @@ -39,20 +40,10 @@ public class HttpExceptionProcessor extends ExceptionProcessor { @Override protected void process(Throwable t, Exchange exchange) { - if (t instanceof HttpOperationFailedException e) { - exchange.setProperty(HTTP_STATUS_CODE, e.getStatusCode()); - if (e.getStatusCode() >= 300 && e.getStatusCode() < 400) { - exchange.setProperty(HTTP_ERROR_TYPE, HTTP_3XX); - logHttpError(connectorConfig.getServerErrorLogLevel(), e, exchange); - } else if (e.getStatusCode() >= 400 && e.getStatusCode() < 500 && e.getStatusCode() != SC_TOO_MANY_REQUESTS) { - exchange.setProperty(HTTP_ERROR_TYPE, HTTP_4XX); - logHttpError(connectorConfig.getClientErrorLogLevel(), e, exchange); - } else if (e.getStatusCode() == SC_TOO_MANY_REQUESTS || e.getStatusCode() >= 500) { - exchange.setProperty(HTTP_ERROR_TYPE, HTTP_5XX); - logHttpError(connectorConfig.getServerErrorLogLevel(), e, exchange); - } else { - logHttpError(ERROR, e, exchange); - } + if (t instanceof ClientWebApplicationException e) { + manageReturnedStatusCode(exchange, e.getResponse().getStatus(), e.getResponse().readEntity(String.class)); + } else if (t instanceof HttpOperationFailedException e) { + manageReturnedStatusCode(exchange, e.getStatusCode(), e.getResponseBody()); } else if (t instanceof ConnectTimeoutException) { exchange.setProperty(HTTP_ERROR_TYPE, CONNECT_TIMEOUT); } else if (t instanceof SocketTimeoutException) { @@ -70,7 +61,23 @@ protected void process(Throwable t, Exchange exchange) { } } - private void logHttpError(Logger.Level level, HttpOperationFailedException e, Exchange exchange) { + private void manageReturnedStatusCode(Exchange exchange, int statusCode, String responseBody) { + exchange.setProperty(HTTP_STATUS_CODE, statusCode); + if (statusCode >= 300 && statusCode < 400) { + exchange.setProperty(HTTP_ERROR_TYPE, HTTP_3XX); + logHttpError(connectorConfig.getServerErrorLogLevel(), statusCode, responseBody, exchange); + } else if (statusCode >= 400 && statusCode < 500 && statusCode != SC_TOO_MANY_REQUESTS) { + exchange.setProperty(HTTP_ERROR_TYPE, HTTP_4XX); + logHttpError(connectorConfig.getClientErrorLogLevel(), statusCode, responseBody, exchange); + } else if (statusCode == SC_TOO_MANY_REQUESTS || statusCode >= 500) { + exchange.setProperty(HTTP_ERROR_TYPE, HTTP_5XX); + logHttpError(connectorConfig.getServerErrorLogLevel(), statusCode, responseBody, exchange); + } else { + logHttpError(ERROR, statusCode, responseBody, exchange); + } + } + + private void logHttpError(Logger.Level level, int statusCode, String responseBody, Exchange exchange) { Log.logf( level, HTTP_ERROR_LOG_MSG, @@ -78,8 +85,8 @@ private void logHttpError(Logger.Level level, HttpOperationFailedException e, Ex getOrgId(exchange), getExchangeId(exchange), getTargetUrl(exchange), - e.getStatusCode(), - e.getResponseBody() + statusCode, + responseBody ); } } diff --git a/connector-email/pom.xml b/connector-email/pom.xml index 366af3d5ae..6ab8aac64d 100644 --- a/connector-email/pom.xml +++ b/connector-email/pom.xml @@ -79,6 +79,13 @@ test + + + dev.failsafe + failsafe + ${failsafe.version} + + org.mock-server @@ -87,6 +94,16 @@ test + + io.quarkus + quarkus-junit5-mockito + test + + + io.rest-assured + rest-assured + test + diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java index 304c78fc54..0f2616cec6 100644 --- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java +++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailCloudEventDataExtractor.java @@ -44,6 +44,6 @@ public void extract(final Exchange exchange, final JsonObject cloudEventData) { exchange.setProperty(ExchangeProperty.EMAIL_RECIPIENTS, emails); exchange.setProperty(ExchangeProperty.EMAIL_SENDER, emailNotification.emailSender()); - exchange.setProperty(ExchangeProperty.USE_EMAIL_BOP_V1_SSL, emailConnectorConfig.isEnableBopServiceWithSslChecks()); + exchange.setProperty(ExchangeProperty.USE_SIMPLIFIED_EMAIL_ROUTE, emailConnectorConfig.useSimplifiedEmailRoute(emailNotification.orgId())); } } diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailManagementProcessor.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailManagementProcessor.java new file mode 100644 index 0000000000..9a3bc2026e --- /dev/null +++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailManagementProcessor.java @@ -0,0 +1,112 @@ +package com.redhat.cloud.notifications.connector.email; + +import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig; +import com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty; +import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings; +import com.redhat.cloud.notifications.connector.email.model.settings.User; +import com.redhat.cloud.notifications.connector.email.processors.bop.BOPManager; +import com.redhat.cloud.notifications.connector.email.processors.recipientsresolver.ExternalRecipientsResolver; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import io.quarkus.logging.Log; +import io.vertx.core.json.JsonObject; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static com.redhat.cloud.notifications.connector.ExchangeProperty.ID; +import static com.redhat.cloud.notifications.connector.ExchangeProperty.ORG_ID; +import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_RECIPIENTS_KEY; +import static java.util.stream.Collectors.toSet; + +@ApplicationScoped +public class EmailManagementProcessor implements Processor { + + @Inject + EmailConnectorConfig emailConnectorConfig; + + @Inject + ExternalRecipientsResolver externalRecipientsResolver; + + @Inject + BOPManager bopManager; + + @Inject + MeterRegistry meterRegistry; + + static final String BOP_RESPONSE_TIME_METRIC = "email.bop.response.time"; + static final String RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC = "email.recipients_resolver.response.time"; + + @Override + public void process(final Exchange exchange) { + // fetch recipients + Set recipientsList = fetchRecipients(exchange); + + if (recipientsList.isEmpty()) { + Log.infof("Skipped Email notification because the recipients list was empty [orgId=$%s, historyId=%s]", exchange.getProperty(ORG_ID, String.class), exchange.getProperty(ID, String.class)); + } else { + // send to bop + sendToBop(exchange, recipientsList); + } + } + + private void sendToBop(Exchange exchange, Set recipientsList) { + // split recipient list and send it ot BOP + List> packedRecipients = partition(recipientsList, emailConnectorConfig.getMaxRecipientsPerEmail() - 1); + final String subject = exchange.getProperty(ExchangeProperty.RENDERED_SUBJECT, String.class); + final String body = exchange.getProperty(ExchangeProperty.RENDERED_BODY, String.class); + final String sender = exchange.getProperty(ExchangeProperty.EMAIL_SENDER, String.class); + + for (int i = 0; i < packedRecipients.size(); i++) { + final Timer.Sample bopResponseTimeMetric = Timer.start(meterRegistry); + bopManager.sendToBop(packedRecipients.get(i), subject, body, sender); + bopResponseTimeMetric.stop(meterRegistry.timer(BOP_RESPONSE_TIME_METRIC)); + Log.infof("Sent Email notification %d/%d [orgId=%s, historyId=%s]", i + 1, packedRecipients.size(), exchange.getProperty(ORG_ID, String.class), exchange.getProperty(ID, String.class)); + } + } + + private static List> partition(Set collection, int n) { + AtomicInteger counter = new AtomicInteger(); + return collection.stream() + .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / n)) + .values().stream().collect(Collectors.toList()); + } + + private Set fetchRecipients(Exchange exchange) { + List recipientSettings = exchange.getProperty(ExchangeProperty.RECIPIENT_SETTINGS, List.class); + Set subscribers = exchange.getProperty(ExchangeProperty.SUBSCRIBERS, Set.class); + Set unsubscribers = exchange.getProperty(ExchangeProperty.UNSUBSCRIBERS, Set.class); + JsonObject recipientsAuthorizationCriterion = exchange.getProperty(ExchangeProperty.RECIPIENTS_AUTHORIZATION_CRITERION, JsonObject.class); + + boolean subscribedByDefault = exchange.getProperty(ExchangeProperty.SUBSCRIBED_BY_DEFAULT, boolean.class); + final String orgId = exchange.getProperty(ORG_ID, String.class); + + final Timer.Sample recipientsResolverResponseTimeMetric = Timer.start(meterRegistry); + Set recipientsList = externalRecipientsResolver.recipientUsers( + orgId, + Set.copyOf(recipientSettings), + subscribers, + unsubscribers, + subscribedByDefault, + recipientsAuthorizationCriterion) + .stream().map(User::getEmail).filter(email -> email != null && !email.isBlank()).collect(toSet()); + recipientsResolverResponseTimeMetric.stop(meterRegistry.timer(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC)); + + Set emails = exchange.getProperty(ExchangeProperty.EMAIL_RECIPIENTS, Set.of(), Set.class); + if (emailConnectorConfig.isEmailsInternalOnlyEnabled()) { + Set forbiddenEmail = emails.stream().filter(email -> !email.trim().toLowerCase().endsWith("@redhat.com")).collect(Collectors.toSet()); + if (!forbiddenEmail.isEmpty()) { + Log.warnf(" %s emails are forbidden for message historyId: %s ", forbiddenEmail, exchange.getProperty(com.redhat.cloud.notifications.connector.ExchangeProperty.ID, String.class)); + } + emails.removeAll(forbiddenEmail); + } + recipientsList.addAll(emails); + exchange.setProperty(TOTAL_RECIPIENTS_KEY, recipientsList.size()); + return recipientsList; + } +} diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilder.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilder.java index e1dedb5727..958428ec9e 100644 --- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilder.java +++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilder.java @@ -16,7 +16,6 @@ import org.apache.camel.support.jsse.KeyStoreParameters; import org.apache.camel.support.jsse.SSLContextParameters; import org.apache.camel.support.jsse.TrustManagersParameters; -import org.apache.http.conn.ssl.NoopHostnameVerifier; import java.util.Set; @@ -24,8 +23,7 @@ import static com.redhat.cloud.notifications.connector.ExchangeProperty.ID; import static com.redhat.cloud.notifications.connector.ExchangeProperty.ORG_ID; import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.FILTERED_USERS; -import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.USE_EMAIL_BOP_V1_SSL; -import static com.redhat.cloud.notifications.connector.http.SslTrustAllManager.getSslContextParameters; +import static com.redhat.cloud.notifications.connector.email.constants.ExchangeProperty.USE_SIMPLIFIED_EMAIL_ROUTE; import static org.apache.camel.LoggingLevel.DEBUG; import static org.apache.camel.LoggingLevel.INFO; import static org.apache.camel.builder.endpoint.dsl.HttpEndpointBuilderFactory.HttpEndpointBuilder; @@ -61,6 +59,9 @@ public class EmailRouteBuilder extends EngineToConnectorRouteBuilder { @Inject EmailMetricsProcessor emailMetricsProcessor; + @Inject + EmailManagementProcessor emailManagementProcessor; + /** * Configures the flow for this connector. */ @@ -74,23 +75,27 @@ public void configureRoutes() { .to(direct(ENTRYPOINT)); } - /* - * Prepares the payload accepted by BOP and sends the request to - * the service. - */ - final HttpEndpointBuilder bopEndpointV1 = setUpBOPEndpointV1(); - from(seda(ENGINE_TO_CONNECTOR)) .routeId(emailConnectorConfig.getConnectorName()) - .process(recipientsResolverRequestPreparer) - .to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START) - .to(setupRecipientResolverEndpoint()) - .to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START) - .process(recipientsResolverResponseProcessor) - .choice().when(shouldSkipEmail()) - .log(INFO, getClass().getName(), "Skipped Email notification because the recipients list was empty [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]") - .otherwise() - .to(direct(Routes.SPLIT_AND_SEND)) + .choice() + .when(shouldUseSimplifiedEmailManagement()) + .process(emailManagementProcessor) + .endChoice() + .otherwise() + .process(recipientsResolverRequestPreparer) + .to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START) + .to(setupRecipientResolverEndpoint()) + .to(RECIPIENTS_RESOLVER_RESPONSE_TIME_METRIC + TIMER_ACTION_START) + .process(recipientsResolverResponseProcessor) + .choice() + .when(shouldSkipEmail()) + .log(INFO, getClass().getName(), "Skipped Email notification because the recipients list was empty [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]") + .endChoice() + .otherwise() + .to(direct(Routes.SPLIT_AND_SEND)) + .endChoice() + .end() + .endChoice() .end() .to(direct(SUCCESS)); @@ -105,16 +110,10 @@ public void configureRoutes() { // Clear all the headers that may come from the previous route. .removeHeaders("*") .process(this.BOPRequestPreparer) - .choice().when(shouldUseBopEmailServiceWithSslChecks()) - .log(DEBUG, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "} using regular SSL checks on email service]") - .to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START) - .to(emailConnectorConfig.getBopURL()) - .to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP) - .otherwise() - .to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START) - .to(bopEndpointV1) - .to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP) - .end() + .log(DEBUG, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "} using regular SSL checks on email service]") + .to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_START) + .to(emailConnectorConfig.getBopURL()) + .to(BOP_RESPONSE_TIME_METRIC + TIMER_ACTION_STOP) .log(INFO, getClass().getName(), "Sent Email notification [orgId=${exchangeProperty." + ORG_ID + "}, historyId=${exchangeProperty." + ID + "}]") .process(emailMetricsProcessor); } @@ -123,27 +122,8 @@ private Predicate shouldSkipEmail() { return exchange -> exchange.getProperty(FILTERED_USERS, Set.class).isEmpty(); } - private Predicate shouldUseBopEmailServiceWithSslChecks() { - return exchange -> exchange.getProperty(USE_EMAIL_BOP_V1_SSL, Boolean.class); - } - - /** - * Creates the endpoint for the BOP service. It makes Apache Camel trust - * BOP service's certificate. - * @return the created endpoint. - */ - protected HttpEndpointBuilder setUpBOPEndpointV1() { - // Remove the schema from the url to avoid the - // "ResolveEndpointFailedException", which complaints about specifying - // the schema twice. - final String fullURL = this.emailConnectorConfig.getBopURL(); - if (fullURL.startsWith("https")) { - return https(fullURL.replace("https://", "")) - .sslContextParameters(getSslContextParameters()) - .x509HostnameVerifier(NoopHostnameVerifier.INSTANCE); - } else { - return http(fullURL.replace("http://", "")); - } + private Predicate shouldUseSimplifiedEmailManagement() { + return exchange -> exchange.getProperty(USE_SIMPLIFIED_EMAIL_ROUTE, false, Boolean.class); } private HttpEndpointBuilder setupRecipientResolverEndpoint() { diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/config/EmailConnectorConfig.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/config/EmailConnectorConfig.java index 0965bfedc8..911dca4799 100644 --- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/config/EmailConnectorConfig.java +++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/config/EmailConnectorConfig.java @@ -1,6 +1,7 @@ package com.redhat.cloud.notifications.connector.email.config; import com.redhat.cloud.notifications.connector.http.HttpConnectorConfig; +import com.redhat.cloud.notifications.unleash.UnleashContextBuilder; import io.quarkus.runtime.LaunchMode; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; @@ -74,13 +75,13 @@ public class EmailConnectorConfig extends HttpConnectorConfig { @ConfigProperty(name = RECIPIENTS_RESOLVER_TRUST_STORE_TYPE) Optional recipientsResolverTrustStoreType; - private String enableBopEmailServiceWithSslChecks; private String toggleKafkaIncomingHighVolumeTopic; + private String toggleUseSimplifiedEmailRoute; @PostConstruct void emailConnectorPostConstruct() { - enableBopEmailServiceWithSslChecks = toggleRegistry.register("enable-bop-service-ssl-checks", true); toggleKafkaIncomingHighVolumeTopic = toggleRegistry.register("kafka-incoming-high-volume-topic", true); + toggleUseSimplifiedEmailRoute = toggleRegistry.register("use-simplified-email-route", true); } @Override @@ -102,7 +103,7 @@ protected Map getLoggedConfiguration() { config.put(RECIPIENTS_RESOLVER_USER_SERVICE_URL, recipientsResolverServiceURL); config.put(MAX_RECIPIENTS_PER_EMAIL, maxRecipientsPerEmail); config.put(NOTIFICATIONS_EMAILS_INTERNAL_ONLY_ENABLED, emailsInternalOnlyEnabled); - config.put(enableBopEmailServiceWithSslChecks, isEnableBopServiceWithSslChecks()); + config.put(toggleUseSimplifiedEmailRoute, useSimplifiedEmailRoute(null)); config.put(toggleKafkaIncomingHighVolumeTopic, isIncomingKafkaHighVolumeTopicEnabled()); /* @@ -182,8 +183,12 @@ public Optional getRecipientsResolverTrustStoreType() { return recipientsResolverTrustStoreType; } - public boolean isEnableBopServiceWithSslChecks() { - return true; + public boolean useSimplifiedEmailRoute(String orgId) { + if (unleashEnabled) { + return unleash.isEnabled(toggleUseSimplifiedEmailRoute, UnleashContextBuilder.buildUnleashContextWithOrgId(orgId), false); + } else { + return false; + } } /** diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/constants/ExchangeProperty.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/constants/ExchangeProperty.java index b765f9ceb8..ddbb58a217 100644 --- a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/constants/ExchangeProperty.java +++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/constants/ExchangeProperty.java @@ -47,9 +47,9 @@ public class ExchangeProperty { public static final String RECIPIENTS_SIZE = "recipientsSize"; - public static final String USE_EMAIL_BOP_V1_SSL = "use_email_bop_V1_ssl"; - public static final String RECIPIENTS_AUTHORIZATION_CRITERION = "recipients_authorization_criterion"; public static final String ADDITIONAL_ERROR_DETAILS = "additionalErrorDetails"; + + public static final String USE_SIMPLIFIED_EMAIL_ROUTE = "use_simplified_email_route"; } diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPManager.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPManager.java new file mode 100644 index 0000000000..74dac09e8f --- /dev/null +++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPManager.java @@ -0,0 +1,84 @@ +package com.redhat.cloud.notifications.connector.email.processors.bop; + +import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig; +import com.redhat.cloud.notifications.connector.email.model.bop.Email; +import com.redhat.cloud.notifications.connector.email.model.bop.SendEmailsRequest; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import dev.failsafe.function.CheckedRunnable; +import io.quarkus.logging.Log; +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Set; + +@ApplicationScoped +public class BOPManager { + + public static final String NOTIFICATIONS_BOP_RETRY_MAX_ATTEMPTS = "notifications.bop.retry.max-attempts"; + public static final String NOTIFICATIONS_BOP_RETRY_INITIAL_BACKOFF = "notifications.bop.retry.initial-backoff"; + public static final String NOTIFICATIONS_BOP_RETRY_MAX_BACKOFF = "notifications.bop.retry.max-backoff"; + + @ConfigProperty(name = NOTIFICATIONS_BOP_RETRY_MAX_ATTEMPTS, defaultValue = "3") + int maxRetryAttempts; + + @ConfigProperty(name = NOTIFICATIONS_BOP_RETRY_INITIAL_BACKOFF, defaultValue = "0.1S") + Duration initialRetryBackoff; + + @ConfigProperty(name = NOTIFICATIONS_BOP_RETRY_MAX_BACKOFF, defaultValue = "1S") + Duration maxRetryBackoff; + + private RetryPolicy retryPolicy; + + @Inject + @RestClient + BOPService bopService; + + @Inject + EmailConnectorConfig emailConnectorConfig; + + @PostConstruct + public void postConstruct() { + retryPolicy = RetryPolicy.builder() + .handle(IOException.class) + .withBackoff(initialRetryBackoff, maxRetryBackoff) + .withMaxAttempts(maxRetryAttempts) + .onRetriesExceeded(event -> { + // All retry attempts failed, let's log a warning about the failure. + Log.warn("Sending email to BOP service failed", event.getException()); + }) + .build(); + } + + private void retryOnError(final CheckedRunnable usersServiceCall) { + Failsafe.with(retryPolicy).run(usersServiceCall); + } + + public void sendToBop(List recipients, String subject, String body, String sender) { + + // Prepare the email to be sent + final Email email = new Email( + subject, + body, + Set.copyOf(recipients) + ); + + final SendEmailsRequest request = new SendEmailsRequest( + Set.of(email), + sender, + sender + ); + + retryOnError(() -> + bopService.sendEmail(emailConnectorConfig.getBopApiToken(), + emailConnectorConfig.getBopClientId(), + emailConnectorConfig.getBopEnv(), + request) + ); + } +} diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPService.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPService.java new file mode 100644 index 0000000000..c8a5c58c49 --- /dev/null +++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPService.java @@ -0,0 +1,29 @@ +package com.redhat.cloud.notifications.connector.email.processors.bop; + +import com.redhat.cloud.notifications.connector.email.model.bop.SendEmailsRequest; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.HeaderParam; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import org.apache.camel.Body; +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +/** + * REST client for both BOP and MBOP services, which in turn, talk to the IT + * service. + */ +@RegisterRestClient(configKey = "bop") +public interface BOPService { + + @Path("/v1/sendEmails") + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + void sendEmail( + @HeaderParam(Constants.BOP_API_TOKEN_HEADER) String apiToken, + @HeaderParam(Constants.BOP_CLIENT_ID_HEADER) String clientId, + @HeaderParam(Constants.BOP_ENV_HEADER) String environment, + @Body SendEmailsRequest sendEmailsRequest); +} diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipientsresolver/ExternalRecipientsResolver.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipientsresolver/ExternalRecipientsResolver.java new file mode 100644 index 0000000000..758b566bbb --- /dev/null +++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipientsresolver/ExternalRecipientsResolver.java @@ -0,0 +1,73 @@ +package com.redhat.cloud.notifications.connector.email.processors.recipientsresolver; + +import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings; +import com.redhat.cloud.notifications.connector.email.model.settings.User; +import com.redhat.cloud.notifications.connector.email.processors.recipients.RecipientsQuery; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import dev.failsafe.function.CheckedSupplier; +import io.quarkus.logging.Log; +import io.vertx.core.json.JsonObject; +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import java.io.IOException; +import java.time.Duration; +import java.util.Set; + +@ApplicationScoped +public class ExternalRecipientsResolver { + + @Inject + @RestClient + RecipientsResolverService recipientsResolverService; + + public static final String NOTIFICATIONS_RECIPIENTS_RESOLVER_RETRY_MAX_ATTEMPTS = "notifications.recipients-resolver.retry.max-attempts"; + public static final String NOTIFICATIONS_RECIPIENTS_RESOLVER_RETRY_INITIAL_BACKOFF = "notifications.recipients-resolver.retry.initial-backoff"; + public static final String NOTIFICATIONS_RECIPIENTS_RESOLVER_RETRY_MAX_BACKOFF = "notifications.recipients-resolver.retry.max-backoff"; + + @ConfigProperty(name = NOTIFICATIONS_RECIPIENTS_RESOLVER_RETRY_MAX_ATTEMPTS, defaultValue = "3") + int maxRetryAttempts; + + @ConfigProperty(name = NOTIFICATIONS_RECIPIENTS_RESOLVER_RETRY_INITIAL_BACKOFF, defaultValue = "0.1S") + Duration initialRetryBackoff; + + @ConfigProperty(name = NOTIFICATIONS_RECIPIENTS_RESOLVER_RETRY_MAX_BACKOFF, defaultValue = "1S") + Duration maxRetryBackoff; + + private RetryPolicy retryPolicy; + + @PostConstruct + public void postConstruct() { + retryPolicy = RetryPolicy.builder() + .handle(IOException.class) + .withBackoff(initialRetryBackoff, maxRetryBackoff) + .withMaxAttempts(maxRetryAttempts) + .onRetriesExceeded(event -> { + // All retry attempts failed, let's log a warning about the failure. + Log.warn("Users fetching from external service failed", event.getException()); + }) + .build(); + } + + private T retryOnError(final CheckedSupplier usersServiceCall) { + return Failsafe.with(retryPolicy).get(usersServiceCall); + } + + public Set recipientUsers(String orgId, Set recipientSettings, Set subscribers, Set unsubscribers, boolean subscribedByDefault, JsonObject recipientsAuthorizationCriterion) { + RecipientsQuery recipientsQuery = new RecipientsQuery(); + recipientsQuery.subscribers = Set.copyOf(subscribers); + recipientsQuery.unsubscribers = Set.copyOf(unsubscribers); + recipientsQuery.orgId = orgId; + Set recipientSettingsSet = Set.copyOf(recipientSettings); + + recipientsQuery.recipientSettings = recipientSettingsSet; + recipientsQuery.subscribedByDefault = subscribedByDefault; + recipientsQuery.recipientsAuthorizationCriterion = recipientsAuthorizationCriterion; + Set recipientsList = retryOnError(() -> recipientsResolverService.getRecipients(recipientsQuery)); + return recipientsList; + } + +} diff --git a/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipientsresolver/RecipientsResolverService.java b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipientsresolver/RecipientsResolverService.java new file mode 100644 index 0000000000..a37a20b034 --- /dev/null +++ b/connector-email/src/main/java/com/redhat/cloud/notifications/connector/email/processors/recipientsresolver/RecipientsResolverService.java @@ -0,0 +1,22 @@ +package com.redhat.cloud.notifications.connector.email.processors.recipientsresolver; + + +import com.redhat.cloud.notifications.connector.email.model.settings.User; +import com.redhat.cloud.notifications.connector.email.processors.recipients.RecipientsQuery; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; +import java.util.Set; + +@RegisterRestClient(configKey = "recipients-resolver") +public interface RecipientsResolverService { + + @PUT + @Path("/internal/recipients-resolver") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + Set getRecipients(RecipientsQuery resolversQuery); +} diff --git a/connector-email/src/main/resources/application.properties b/connector-email/src/main/resources/application.properties index 3f88025f7a..f1d7558ab6 100644 --- a/connector-email/src/main/resources/application.properties +++ b/connector-email/src/main/resources/application.properties @@ -41,16 +41,24 @@ mp.messaging.high-volume.topic=platform.notifications.connector.email.high.volum mp.messaging.tocamel.topic=platform.notifications.tocamel mp.messaging.fromcamel.topic=platform.notifications.fromcamel -%test.quarkus.log.category."com.redhat.cloud.notifications".level=DEBUG - -# Configuration of the various user providers that the connector supports. +# Bop configuration +notifications.connector.user-provider.bop.url=https://backoffice-proxy.apps.ext.spoke.preprod.us-west-2.aws.paas.redhat.com +quarkus.rest-client.bop.url=${notifications.connector.user-provider.bop.url} +# For some environments, bop uses same certificates as recipients-resolver because they are hosted in the same cluster. +quarkus.rest-client.bop.trust-store=${clowder.endpoints.notifications-recipients-resolver-service.trust-store-path} +quarkus.rest-client.bop.trust-store-password=${clowder.endpoints.notifications-recipients-resolver-service.trust-store-password} +quarkus.rest-client.bop.trust-store-type=${clowder.endpoints.notifications-recipients-resolver-service.trust-store-type} notifications.connector.user-provider.bop.api_token=changeme notifications.connector.user-provider.bop.client_id=changeme notifications.connector.user-provider.bop.env=changeme -notifications.connector.user-provider.bop.url=https://backoffice-proxy.apps.ext.spoke.preprod.us-west-2.aws.paas.redhat.com +# The URL for the recipients-resolver using camel integration notifications.connector.recipients-resolver.url=${clowder.endpoints.notifications-recipients-resolver-service.url:http://localhost:9008} -%test.notifications.connector.max-recipients-per-email=4 +# The URL for the recipients-resolver using rest client integration +quarkus.rest-client.recipients-resolver.url=${clowder.endpoints.notifications-recipients-resolver-service.url:http://localhost:9008} +quarkus.rest-client.recipients-resolver.trust-store=${clowder.endpoints.notifications-recipients-resolver-service.trust-store-path} +quarkus.rest-client.recipients-resolver.trust-store-password=${clowder.endpoints.notifications-recipients-resolver-service.trust-store-password} +quarkus.rest-client.recipients-resolver.trust-store-type=${clowder.endpoints.notifications-recipients-resolver-service.trust-store-type} quarkus.unleash.active=false quarkus.unleash.url=http://localhost:4242 diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/BuildBopEndpointTest.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/BuildBopEndpointTest.java deleted file mode 100644 index 2f97467c7d..0000000000 --- a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/BuildBopEndpointTest.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.redhat.cloud.notifications.connector.email; - -import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig; -import com.redhat.cloud.notifications.connector.http.SslTrustAllManager; -import io.quarkus.test.junit.QuarkusTest; -import io.quarkus.test.junit.mockito.InjectSpy; -import jakarta.inject.Inject; -import org.apache.camel.Endpoint; -import org.apache.camel.quarkus.test.CamelQuarkusTestSupport; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import static org.mockito.Mockito.when; - -@QuarkusTest -public class BuildBopEndpointTest extends CamelQuarkusTestSupport { - - @InjectSpy - EmailConnectorConfig emailConnectorConfig; - - @Inject - EmailRouteBuilder emailRouteBuilder; - - @Override - public boolean isUseAdviceWith() { - return true; - } - - /** - * Disables the route builder to ensure that the Camel Context does not get - * started before the routes have been advised. More information is - * available at the dkulp's Apache Camel Test documentation page. - * @return {@code false} in order to stop the Camel Context from booting - * before the routes have been advised. - */ - @Override - public boolean isUseRouteBuilder() { - return false; - } - - /** - * Tests that the function under test creates the BOP endpoint with the - * {@link SslTrustAllManager} class as the SSL context parameters, and that - * that class is essentially a NOOP class. - * @throws Exception if the endpoint could not be created. - */ - @Test - void testBuildBOPEndpoint() throws Exception { - String initialBopUrl = emailConnectorConfig.getBopURL(); - when(emailConnectorConfig.getBopURL()).thenReturn("https://test.com"); - - Endpoint bopEndpoint = this.emailRouteBuilder.setUpBOPEndpointV1().resolve(this.context); - Assertions.assertEquals(this.emailConnectorConfig.getBopURL(), bopEndpoint.getEndpointBaseUri(), "the base URI of the endpoint is not the same as the one set through the properties"); - - final String bopEndpointURI = bopEndpoint.getEndpointUri(); - Assertions.assertTrue(bopEndpointURI.contains("trustManager%3Dcom.redhat.cloud.notifications.connector.http.SslTrustAllManager"), "the endpoint does not contain a reference to the SslTrustAllManager"); - Assertions.assertTrue(bopEndpointURI.contains("x509HostnameVerifier=NO_OP"), "the base URI does not contain a mention to the NO_OP hostname verifier"); - } -} diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderWithSimplifiedRouteTest.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderWithSimplifiedRouteTest.java new file mode 100644 index 0000000000..68d1b39f16 --- /dev/null +++ b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/EmailRouteBuilderWithSimplifiedRouteTest.java @@ -0,0 +1,336 @@ +package com.redhat.cloud.notifications.connector.email; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.redhat.cloud.notifications.MockServerLifecycleManager; +import com.redhat.cloud.notifications.connector.email.config.EmailConnectorConfig; +import com.redhat.cloud.notifications.connector.email.model.EmailNotification; +import com.redhat.cloud.notifications.connector.email.model.settings.RecipientSettings; +import com.redhat.cloud.notifications.connector.email.model.settings.User; +import com.redhat.cloud.notifications.connector.email.processors.bop.BOPManager; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.mockito.InjectSpy; +import io.vertx.core.json.JsonObject; +import jakarta.inject.Inject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.AdviceWithRouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.quarkus.test.CamelQuarkusTestSupport; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockserver.mock.action.ExpectationResponseCallback; +import org.mockserver.model.Delay; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.MediaType; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static com.redhat.cloud.notifications.connector.ConnectorRoutesTest.KAFKA_SOURCE_MOCK; +import static com.redhat.cloud.notifications.connector.ConnectorToEngineRouteBuilder.CONNECTOR_TO_ENGINE; +import static com.redhat.cloud.notifications.connector.EngineToConnectorRouteBuilder.ENGINE_TO_CONNECTOR; +import static com.redhat.cloud.notifications.connector.IncomingCloudEventFilter.X_RH_NOTIFICATIONS_CONNECTOR_HEADER; +import static com.redhat.cloud.notifications.connector.IncomingCloudEventProcessor.*; +import static com.redhat.cloud.notifications.connector.email.CloudEventHistoryBuilder.TOTAL_RECIPIENTS_KEY; +import static org.apache.camel.builder.AdviceWith.adviceWith; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockserver.model.HttpResponse.response; + +@QuarkusTest +@QuarkusTestResource(TestLifecycleManager.class) +public class EmailRouteBuilderWithSimplifiedRouteTest extends CamelQuarkusTestSupport { + @InjectSpy + EmailConnectorConfig emailConnectorConfig; + + @Inject + ObjectMapper objectMapper; + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + static boolean camelRoutesInitialised = false; + + static MockEndpoint kafkaConnectorToEngine; + static MockEndpoint kafkaEngineToConnector; + + @InjectSpy + BOPManager bopManager; + + @BeforeEach + void beforeEach() { + when(emailConnectorConfig.useSimplifiedEmailRoute(anyString())).thenReturn(true); + } + + void initCamelRoutes() throws Exception { + + adviceWith(emailConnectorConfig.getConnectorName(), context(), new AdviceWithRouteBuilder() { + @Override + public void configure() throws Exception { + mockEndpoints( + "direct:" + CONNECTOR_TO_ENGINE + ); + mockEndpointsAndSkip("kafka:" + emailConnectorConfig.getOutgoingKafkaTopic()); + } + }); + + adviceWith(CONNECTOR_TO_ENGINE, context(), new AdviceWithRouteBuilder() { + @Override + public void configure() throws Exception { + mockEndpointsAndSkip("kafka:" + emailConnectorConfig.getOutgoingKafkaTopic()); + } + }); + + adviceWith(ENGINE_TO_CONNECTOR, context(), new AdviceWithRouteBuilder() { + @Override + public void configure() { + replaceFromWith(KAFKA_SOURCE_MOCK); + } + }); + + kafkaConnectorToEngine = getMockEndpoint("mock:kafka:" + emailConnectorConfig.getOutgoingKafkaTopic()); + kafkaEngineToConnector = getMockEndpoint("mock:" + KAFKA_SOURCE_MOCK); + } + + @Deprecated + void initMocks(ExpectationResponseCallback verifyEmptyRequest, ExpectationResponseCallback bopResponse) throws Exception { + + MockServerLifecycleManager.getClient().reset(); + getMockHttpRequest("/internal/recipients-resolver", "PUT", verifyEmptyRequest); + getMockHttpRequest("/v1/sendEmails", "POST", bopResponse); + if (!camelRoutesInitialised) { + initCamelRoutes(); + camelRoutesInitialised = true; + } + } + + @Test + void testEmptyRecipients() throws Exception { + + ExpectationResponseCallback recipientsResolverResponse = req -> response().withBody("[]").withStatusCode(200); + ExpectationResponseCallback bopResponse = req -> response().withStatusCode(200); + initMocks(recipientsResolverResponse, bopResponse); + + kafkaConnectorToEngine.expectedMessageCount(1); + + buildCloudEventAndSendIt(null); + + kafkaConnectorToEngine.assertIsSatisfied(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testWithRecipients(boolean emailsInternalOnlyEnabled) throws Exception { + try { + emailConnectorConfig.setEmailsInternalOnlyEnabled(emailsInternalOnlyEnabled); + Set users = TestUtils.createUsers("user-1", "user-2", "user-3", "user-4", "user-5", "user-6", "user-7"); + String strUsers = objectMapper.writeValueAsString(users); + ExpectationResponseCallback recipientsResolverResponse = req -> response().withContentType(MediaType.APPLICATION_JSON).withBody(strUsers).withStatusCode(200); + ExpectationResponseCallback bopResponse = req -> response().withContentType(MediaType.APPLICATION_JSON).withStatusCode(200); + initMocks(recipientsResolverResponse, bopResponse); + + Set additionalEmails = Set.of("redhat_user@redhat.com", "external_user@noway.com"); + int usersAndRecipientsTotalNumber = users.size() + additionalEmails.size(); + + kafkaConnectorToEngine.expectedMessageCount(1); + + buildCloudEventAndSendIt(additionalEmails); + + kafkaConnectorToEngine.assertIsSatisfied(2000); + + final ArgumentCaptor> listCaptor = ArgumentCaptor.forClass((Class) List.class); + verify(bopManager, times(3)) + .sendToBop(listCaptor.capture(), anyString(), anyString(), anyString()); + + checkRecipientsAndHistory(usersAndRecipientsTotalNumber, listCaptor.getAllValues(), kafkaConnectorToEngine, emailsInternalOnlyEnabled, "external_user@noway.com"); + } finally { + emailConnectorConfig.setEmailsInternalOnlyEnabled(false); + } + } + + @Test + void testFailureFetchingRecipientsInternalError() throws Exception { + + ExpectationResponseCallback recipientsResolverResponse = req -> response().withContentType(MediaType.APPLICATION_JSON).withStatusCode(500); + initMocks(recipientsResolverResponse, null); + + kafkaConnectorToEngine.expectedMessageCount(1); + + buildCloudEventAndSendIt(null); + + kafkaConnectorToEngine.assertIsSatisfied(); + List responseDetails = checkRecipientsAndHistoryFailure(kafkaConnectorToEngine, 0, true); + for (JsonObject responseDetail : responseDetails) { + assertEquals(500, responseDetail.getJsonObject("error").getInteger("http_status_code")); + assertEquals("HTTP_5XX", responseDetail.getJsonObject("error").getString("error_type")); + } + } + + @Test + void testFailureFetchingRecipientsTimeout() throws Exception { + initMocks(null, null); + + kafkaConnectorToEngine.expectedMessageCount(1); + + buildCloudEventAndSendIt(null); + + kafkaConnectorToEngine.assertIsSatisfied(); + checkRecipientsAndHistoryFailure(kafkaConnectorToEngine, 0, false); + } + + @Test + void testFailureBopInternalError() throws Exception { + + Set users = TestUtils.createUsers("user-1", "user-2", "user-3", "user-4", "user-5", "user-6", "user-7"); + String strUsers = objectMapper.writeValueAsString(users); + ExpectationResponseCallback recipientsResolverResponse = req -> response().withBody(strUsers).withContentType(MediaType.APPLICATION_JSON).withStatusCode(200); + ExpectationResponseCallback bopInternalError = req -> response().withContentType(MediaType.APPLICATION_JSON).withStatusCode(500); + initMocks(recipientsResolverResponse, bopInternalError); + + kafkaConnectorToEngine.expectedMessageCount(1); + buildCloudEventAndSendIt(null); + + kafkaConnectorToEngine.assertIsSatisfied(); + + List responseDetails = checkRecipientsAndHistoryFailure(kafkaConnectorToEngine, 7, true); + for (JsonObject responseDetail : responseDetails) { + assertEquals(500, responseDetail.getJsonObject("error").getInteger("http_status_code")); + assertEquals("HTTP_5XX", responseDetail.getJsonObject("error").getString("error_type")); + } + } + + @Test + void testFailureBopRecipientsTimeout() throws Exception { + + Set users = TestUtils.createUsers("user-1", "user-2", "user-3", "user-4", "user-5", "user-6", "user-7"); + String strUsers = objectMapper.writeValueAsString(users); + ExpectationResponseCallback recipientsResolverResponse = req -> response().withContentType(MediaType.APPLICATION_JSON).withBody(strUsers).withStatusCode(200); + + initMocks(recipientsResolverResponse, null); + + kafkaConnectorToEngine.expectedMessageCount(1); + buildCloudEventAndSendIt(null); + + kafkaConnectorToEngine.assertIsSatisfied(); + + checkRecipientsAndHistoryFailure(kafkaConnectorToEngine, 7, false); + } + + private List checkRecipientsAndHistoryFailure(MockEndpoint kafkaEndpoint, int expectedRecipientNumber, boolean errorDetailsExpected) { + + int expectedBopRequests = expectedRecipientNumber > 0 ? 1 : 0; + verify(bopManager, times(expectedBopRequests)) + .sendToBop(anyList(), anyString(), anyString(), anyString()); + + List dataToReturn = new ArrayList<>(); + // check metrics sent to engine + for (Exchange kafkaMessage : kafkaEndpoint.getReceivedExchanges()) { + JsonObject payload = new JsonObject(kafkaMessage.getIn().getBody(String.class)); + JsonObject data = new JsonObject(payload.getString("data")); + + assertFalse(data.getBoolean("successful")); + assertTrue(data.containsKey("details")); + assertEquals(expectedRecipientNumber, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY)); + assertFalse(data.getJsonObject("details").getString("outcome").isBlank()); + if (errorDetailsExpected) { + assertTrue(data.containsKey("error")); + assertFalse(data.getJsonObject("error").getString("error_type").isBlank()); + } + dataToReturn.add(data); + } + return dataToReturn; + } + + private static void checkRecipientsAndHistory(int usersAndRecipientsTotalNumber, List> recipientsSentToBop, MockEndpoint kafkaEndpoint, boolean emailsInternalOnlyEnabled, String filteredRecipient) { + + // check recipients sent to bop + Set receivedEmails = new HashSet<>(); + for (List recipientsList : recipientsSentToBop) { + assertTrue(recipientsList.size() <= 3); + receivedEmails.addAll(recipientsList); + } + + if (emailsInternalOnlyEnabled) { + assertFalse(receivedEmails.contains(filteredRecipient)); + assertEquals(usersAndRecipientsTotalNumber - 1, receivedEmails.size()); + } else { + assertTrue(receivedEmails.contains(filteredRecipient)); + assertEquals(usersAndRecipientsTotalNumber, receivedEmails.size()); + } + + // check metrics sent to engine + Exchange kafkaMessage = kafkaEndpoint.getReceivedExchanges().stream().findFirst().get(); + JsonObject payload = new JsonObject(kafkaMessage.getIn().getBody(String.class)); + JsonObject data = new JsonObject(payload.getString("data")); + + assertTrue(data.getBoolean("successful")); + + if (emailsInternalOnlyEnabled) { + assertEquals(usersAndRecipientsTotalNumber - 1, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY)); + } else { + assertEquals(usersAndRecipientsTotalNumber, data.getJsonObject("details").getInteger(TOTAL_RECIPIENTS_KEY)); + } + } + + private HttpRequest getMockHttpRequest(String path, String method, ExpectationResponseCallback expectationResponseCallback) { + HttpRequest postReq = new HttpRequest() + .withPath(path) + .withMethod(method); + + Delay responseDelay = Delay.milliseconds(0); + if (expectationResponseCallback == null) { + responseDelay = Delay.seconds(1); + } + MockServerLifecycleManager.getClient() + .withSecure(false) + .when(postReq) + .respond(expectationResponseCallback, responseDelay); + return postReq; + } + + private void buildCloudEventAndSendIt(Set emailRecipients) { + final JsonObject cloudEvent = generateIncomingCloudEvent(emailRecipients); + + final Map headers = new HashMap<>(); + headers.put(X_RH_NOTIFICATIONS_CONNECTOR_HEADER, emailConnectorConfig.getConnectorName()); + template.sendBodyAndHeaders(KAFKA_SOURCE_MOCK, cloudEvent.encode(), headers); + } + + private JsonObject generateIncomingCloudEvent(Set emailRecipients) { + RecipientSettings recipientSettings = new RecipientSettings(false, false, null, null, emailRecipients); + + final EmailNotification emailNotification = new EmailNotification( + "test email body", + "test email subject", + "Not used", + "123456", + List.of(recipientSettings), + new ArrayList<>(), + new ArrayList<>(), + false, + null + ); + final JsonObject payload = JsonObject.mapFrom(emailNotification); + + final String cloudEventId = UUID.randomUUID().toString(); + + final JsonObject cloudEvent = new JsonObject(); + cloudEvent.put(CLOUD_EVENT_ID, cloudEventId); + cloudEvent.put(CLOUD_EVENT_TYPE, "com.redhat.console.notification.toCamel." + emailConnectorConfig.getConnectorName()); + cloudEvent.put(CLOUD_EVENT_DATA, JsonObject.mapFrom(payload)); + return cloudEvent; + } +} diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java index 9e3418f885..2132ea8fc1 100644 --- a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java +++ b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/TestLifecycleManager.java @@ -15,7 +15,9 @@ public Map start() { MockServerLifecycleManager.start(); Map properties = new HashMap<>(); properties.put("notifications.connector.recipients-resolver.url", getMockServerUrl()); + properties.put("quarkus.rest-client.recipients-resolver.url", getMockServerUrl()); properties.put("notifications.connector.user-provider.bop.url", getMockServerUrl()); + properties.put("quarkus.rest-client.bop.url", getMockServerUrl()); return properties; } diff --git a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPRequestPreparerTest.java b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPRequestPreparerTest.java index be6171e2ad..71344a3630 100644 --- a/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPRequestPreparerTest.java +++ b/connector-email/src/test/java/com/redhat/cloud/notifications/connector/email/processors/bop/BOPRequestPreparerTest.java @@ -47,7 +47,6 @@ void testProcess(boolean useBopV2Service) { final Exchange exchange = createExchangeWithBody(context, emails); exchange.setProperty(ExchangeProperty.RENDERED_SUBJECT, emailSubject); exchange.setProperty(ExchangeProperty.RENDERED_BODY, emailBody); - exchange.setProperty(ExchangeProperty.USE_EMAIL_BOP_V1_SSL, useBopV2Service); // Call the processor under test. this.bopRequestPreparer.process(exchange); diff --git a/connector-email/src/test/resources/application.properties b/connector-email/src/test/resources/application.properties index 57cd56b61d..8bd4ba0e1c 100644 --- a/connector-email/src/test/resources/application.properties +++ b/connector-email/src/test/resources/application.properties @@ -1,3 +1,8 @@ notifications.connector.http.connect-timeout-ms=200 notifications.connector.http.socket-timeout-ms=200 +quarkus.rest-client.bop.read-timeout=500 +quarkus.rest-client.recipients-resolver.read-timeout=500 + notifications.connector.redelivery.delay=5 + +notifications.connector.max-recipients-per-email=4