Skip to content

Commit

Permalink
RANGER-4943: Error in ElasticSearchAuditDestination shutting down Res…
Browse files Browse the repository at this point in the history
…tHighLevelClient client
  • Loading branch information
FerArribas14 committed Jan 21, 2025
1 parent 2ea2842 commit 7a6c0f6
Showing 1 changed file with 35 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.ranger.audit.destination;

import java.io.File;
import java.io.IOException;
import java.security.PrivilegedActionException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -98,7 +99,8 @@ public void init(Properties props, String propPrefix) {
this.port = MiscUtil.getIntProperty(props, propPrefix + "." + CONFIG_PORT, 9200);
this.index = getStringProperty(props, propPrefix + "." + CONFIG_INDEX, DEFAULT_INDEX);
this.hosts = getHosts();
LOG.info("Connecting to ElasticSearch: " + connectionString());

LOG.info("Connecting to ElasticSearch: {}", connectionString());
getClient(); // Initialize client
}

Expand Down Expand Up @@ -152,9 +154,7 @@ public boolean log(Collection<AuditEventBase> events) {
addFailedCount(1);
logFailedEvent(Arrays.asList(itemRequest), itemResponse.getFailureMessage());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Indexed %s", itemRequest.getEventKey()));
}
LOG.debug("Indexed {}", itemRequest.getEventKey());
addSuccessCount(1);
ret = true;
}
Expand Down Expand Up @@ -219,10 +219,7 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
.map(x -> new HttpHost(x, port, protocol))
.toArray(HttpHost[]::new)
);
ThreadFactory clientThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("ElasticSearch rest client %s")
.setDaemon(true)
.build();
ThreadFactory clientThreadFactory = new ThreadFactoryBuilder().setNameFormat("ElasticSearch rest client %s").setDaemon(true).build();
if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && !user.equalsIgnoreCase("NONE") && !password.equalsIgnoreCase("NONE")) {
if (password.contains("keytab") && new File(password).exists()) {
final KerberosCredentialsProvider credentialsProvider =
Expand All @@ -236,8 +233,7 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
return clientBuilder;
});
} else {
final CredentialsProvider credentialsProvider =
CredentialsProviderUtil.getBasicCredentials(user, password);
final CredentialsProvider credentialsProvider = CredentialsProviderUtil.getBasicCredentials(user, password);
restClientBuilder.setHttpClientConfigCallback(clientBuilder -> {
clientBuilder.setThreadFactory(clientThreadFactory);
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
Expand All @@ -257,42 +253,50 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
}

private RestHighLevelClient newClient() {
RestHighLevelClient restHighLevelClient = null;

try {
if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) && password.contains("keytab") && new File(password).exists()) {
subject = CredentialsProviderUtil.login(user, password);
}
RestClientBuilder restClientBuilder =
getRestClientBuilder(hosts, protocol, user, password, port);
try (RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Initialized client");
}
boolean exists = false;
try {
exists = restHighLevelClient.indices().open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT).isShardsAcknowledged();
} catch (Exception e) {
LOG.warn("Error validating index " + this.index);
}
if (exists) {
if (LOG.isDebugEnabled()) {
LOG.debug("Index exists");
}
} else {
LOG.info("Index does not exist");
}
return restHighLevelClient;
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, protocol, user, password, port);
restHighLevelClient = new RestHighLevelClient(restClientBuilder);
boolean exists = false;

try {
exists = restHighLevelClient.indices().open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT).isShardsAcknowledged();
} catch (Exception e) {
LOG.warn("Error validating index {}", this.index);
}

if (exists) {
LOG.debug("Index exists");
} else {
LOG.info("Index does not exist");
}

return restHighLevelClient;
} catch (Throwable t) {
lastLoggedAt.updateAndGet(lastLoggedAt -> {
long now = System.currentTimeMillis();
long elapsed = now - lastLoggedAt;
if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
LOG.error("Can't connect to ElasticSearch server: " + connectionString(), t);
LOG.error("Can't connect to ElasticSearch server: {}", connectionString(), t);
return now;
} else {
return lastLoggedAt;
}
});

if (restHighLevelClient != null) {
try {
restHighLevelClient.close();
LOG.debug("Closed RestHighLevelClient after failure");
} catch (IOException e) {
LOG.warn("Error closing RestHighLevelClient: {}", e.getMessage(), e);
}
}

return null;
}
}
Expand Down Expand Up @@ -346,5 +350,4 @@ Map<String, Object> toDoc(AuthzAuditEvent auditEvent) {
doc.put("policyVersion", auditEvent.getPolicyVersion());
return doc;
}

}

0 comments on commit 7a6c0f6

Please sign in to comment.