Skip to content

Commit

Permalink
Add a new config "unsupportedConfigs" and refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
maobaolong committed Jan 3, 2025
1 parent 3d2c4df commit 39e01fa
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ public class CoordinatorConf extends RssBaseConf {
.defaultValues("appHeartbeat", "heartbeat")
.withDescription("Exclude record rpc audit operation list, separated by ','");

public static final ConfigOption<List<String>> COORDINATOR_UNSUPPORTED_CONFIGS =
ConfigOptions.key("rss.coordinator.unsupportedConfigs")
.stringType()
.asList()
.defaultValues("serializer:org.apache.hadoop.io.serializer.JavaSerialization")
.withDescription(
"The unsupported config list separated by ',', the key value separated by ':'. If the client configures these properties "
+ "and they are set to be denied access, the client's access will be rejected.");

public CoordinatorConf() {}

public CoordinatorConf(String fileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@
/** Abstract class for checking the access info from the client-side. */
public abstract class AbstractAccessChecker implements AccessChecker {

protected AbstractAccessChecker(AccessManager accessManager) throws Exception {}
private final AccessManager accessManager;

protected AbstractAccessChecker(AccessManager accessManager) throws Exception {
this.accessManager = accessManager;
}

@Override
public void refreshAccessChecker() {}

public AccessManager getAccessManager() {
return accessManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
package org.apache.uniffle.coordinator.access.checker;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.hadoop.io.serializer.JavaSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
Expand All @@ -35,21 +39,43 @@
*/
public class AccessSupportRssChecker extends AbstractAccessChecker {
private static final Logger LOG = LoggerFactory.getLogger(AccessSupportRssChecker.class);
private final HashMap<String, String> unsupportedConfigMap;

public AccessSupportRssChecker(AccessManager accessManager) throws Exception {
super(accessManager);
List<String> unsupportedConfigs =
accessManager.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_UNSUPPORTED_CONFIGS);
unsupportedConfigMap = new HashMap<>(unsupportedConfigs.size());
if (unsupportedConfigs != null && !unsupportedConfigs.isEmpty()) {
for (String keyValue : unsupportedConfigs) {
String[] pair = keyValue.split(":", 2);
if (pair.length == 2) {
unsupportedConfigMap.put(pair[0], pair[1]);
} else {
LOG.error("Unsupported config {} has wrong format, skip it.", keyValue);
}
}
}
}

@Override
public AccessCheckResult check(AccessInfo accessInfo) {
String serializer = accessInfo.getExtraProperties().get("serializer");
if (JavaSerialization.class.getName().equals(serializer)) {
String msg = String.format("Denied by AccessSupportRssChecker, accessInfo[%s].", accessInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("serializer is {}, {}", serializer, msg);
if (unsupportedConfigMap == null || unsupportedConfigMap.isEmpty()) {
return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
}
for (Map.Entry<String, String> entry : unsupportedConfigMap.entrySet()) {
String unsupportedConfKey = entry.getKey();
String unsupportedConfValue = entry.getValue();
String actualConfValue = accessInfo.getExtraProperties().get(unsupportedConfKey);
if (Objects.equals(actualConfValue, unsupportedConfValue)) {
String msg =
String.format(
"Denied by AccessSupportRssChecker, %s is %s, AccessSupportRssChecker does not supported.",
unsupportedConfKey, actualConfValue);
LOG.debug(msg);
CoordinatorMetrics.counterTotalSupportRssDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}
CoordinatorMetrics.counterTotalSupportRssDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}

return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
Expand Down
1 change: 1 addition & 0 deletions docs/coordinator_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ This document will introduce how to deploy Uniffle coordinators.
| rss.reconfigure.interval.sec | 5 | Reconfigure check interval. |
| rss.coordinator.rpc.audit.log.enabled | true | When set to true, for auditing purposes, the coordinator will log audit records for every rpc request operation. |
| rss.coordinator.rpc.audit.log.excludeList | appHeartbeat,heartbeat | Exclude record rpc audit operation list, separated by ','. |
| rss.coordinator.unsupportedConfigs | serializer:org.apache.hadoop.io.serializer.JavaSerialization | The unsupported config list separated by ',', the key value separated by ':'. If the client configures these properties and they are set to be denied access, the client's access will be rejected. |

### AccessClusterLoadChecker settings
|Property Name|Default| Description|
Expand Down

0 comments on commit 39e01fa

Please sign in to comment.