Skip to content

Commit

Permalink
RANGER-5069: Add ability to Kafka authorizer to define super users th…
Browse files Browse the repository at this point in the history
…rough Kafka config

Signed-off-by: Madhan Neethiraj <[email protected]>
  • Loading branch information
fonaid authored and mneethiraj committed Dec 31, 2024
1 parent 236e69c commit ca73b3c
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.File;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -56,7 +57,7 @@ public class RangerPluginConfig extends RangerConfiguration {
private Set<String> auditExcludedUsers = Collections.emptySet();
private Set<String> auditExcludedGroups = Collections.emptySet();
private Set<String> auditExcludedRoles = Collections.emptySet();
private Set<String> superUsers = Collections.emptySet();
private Set<String> superUsers = new HashSet<>();
private Set<String> superGroups = Collections.emptySet();
private Set<String> serviceAdmins = Collections.emptySet();

Expand Down Expand Up @@ -224,7 +225,7 @@ public void setAuditExcludedUsersGroupsRoles(Set<String> users, Set<String> grou
}

public void setSuperUsersGroups(Set<String> users, Set<String> groups) {
superUsers = CollectionUtils.isEmpty(users) ? Collections.emptySet() : new HashSet<>(users);
superUsers = CollectionUtils.isEmpty(users) ? new HashSet<>() : new HashSet<>(users);
superGroups = CollectionUtils.isEmpty(groups) ? Collections.emptySet() : new HashSet<>(groups);

LOG.debug("superUsers={}, superGroups={}", superUsers, superGroups);
Expand Down Expand Up @@ -258,6 +259,12 @@ public boolean isServiceAdmin(String userName) {
return serviceAdmins.contains(userName);
}

public void addSuperUsers(Collection<String> users) {
if (users != null) {
superUsers.addAll(users);
}
}

private void addResourcesForServiceType(String serviceType) {
String auditCfg = "ranger-" + serviceType + "-audit.xml";
String securityCfg = "ranger-" + serviceType + "-security.xml";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.server.authorizer.AclCreateResult;
Expand All @@ -53,6 +54,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -80,6 +82,8 @@ public class RangerKafkaAuthorizer implements Authorizer {
public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";

private static final String KAFKA_SUPER_USERS_PROP = "super.users";

private static volatile RangerBasePlugin rangerPlugin;

RangerKafkaAuditHandler auditHandler;
Expand Down Expand Up @@ -132,6 +136,12 @@ public void configure(Map<String, ?> configs) {

me.init();

Set<String> superUsersFromKafkaConfig = parseSuperUsersFromKafkaConfig(configs);

me.getPluginContext().getConfig().addSuperUsers(superUsersFromKafkaConfig);

logger.info("Super users added from Kafka config: {}", superUsersFromKafkaConfig);

auditHandler = new RangerKafkaAuditHandler();

me.setResultProcessor(auditHandler);
Expand Down Expand Up @@ -365,4 +375,41 @@ private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest
auditHandler.flushAudit();
}
}

private Set<String> parseSuperUsersFromKafkaConfig(Map<String, ?> configs) {
if (configs == null) {
return Collections.emptySet();
}

Object kafkaSuperUsersConfig = configs.get(KAFKA_SUPER_USERS_PROP);

if (kafkaSuperUsersConfig == null) {
return Collections.emptySet();
}

if (!(kafkaSuperUsersConfig instanceof String)) {
logger.warn("super.users in Kafka config could not be parsed");

return Collections.emptySet();
}

String kafkaSuperUsers = (String) kafkaSuperUsersConfig;
String[] principals = kafkaSuperUsers.split(";");
Set<String> superUserNames = new HashSet<>();

for (String principal : principals) {
try {
KafkaPrincipal parsedPrincipal = SecurityUtils.parseKafkaPrincipal(principal.trim());
String userName = parsedPrincipal.getName();

if (KafkaPrincipal.USER_TYPE.equals(parsedPrincipal.getPrincipalType()) && StringUtils.isNotEmpty(userName)) {
superUserNames.add(userName);
}
} catch (Exception e) {
logger.warn("Kafka principal: \"{}\" could not be parsed and will not be added to the authorized super users list", principal, e);
}
}

return Collections.unmodifiableSet(superUserNames);
}
}
Loading

0 comments on commit ca73b3c

Please sign in to comment.