From 2da8a1f55928ff57d8b4b9cefb8e82109943aa83 Mon Sep 17 00:00:00 2001 From: Madhan Neethiraj Date: Fri, 25 Aug 2023 00:50:41 -0700 Subject: [PATCH] RANGER-4375: updated to log plugin activities asynchronously - #2 --- .../java/org/apache/ranger/biz/AssetMgr.java | 93 +++++++++++-------- .../service/RangerTransactionService.java | 57 +++++++++++- 2 files changed, 108 insertions(+), 42 deletions(-) diff --git a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java index 1dc3d372df..8bbeba7834 100644 --- a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java +++ b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java @@ -44,7 +44,6 @@ import org.apache.ranger.common.DateUtil; import org.apache.ranger.common.JSONUtil; import org.apache.ranger.common.MessageEnums; -import org.apache.ranger.common.PropertiesUtil; import org.apache.ranger.common.RangerCommonEnums; import org.apache.ranger.common.RangerConstants; import org.apache.ranger.common.SearchCriteria; @@ -75,6 +74,9 @@ @Component public class AssetMgr extends AssetMgrBase { + private static final String PROP_RANGER_LOG_SC_NOT_MODIFIED = "ranger.log.SC_NOT_MODIFIED"; + private static final String PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED = "ranger.plugin.activity.audit.not.modified"; + private static final String PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE = "ranger.plugin.activity.audit.commit.inline"; @Autowired XPermMapService xPermMapService; @@ -136,7 +138,9 @@ public class AssetMgr extends AssetMgrBase { @Autowired ServiceMgr serviceMgr; - boolean pluginActivityAuditCommitInline = false; + boolean rangerLogNotModified = false; + boolean pluginActivityAuditLogNotModified = false; + boolean pluginActivityAuditCommitInline = false; private static final Logger logger = LoggerFactory.getLogger(AssetMgr.class); @@ -146,9 +150,13 @@ public class AssetMgr extends AssetMgrBase { public void init() { logger.info("==> AssetMgr.init()"); - pluginActivityAuditCommitInline = RangerAdminConfig.getInstance().getBoolean("ranger.plugin.activity.audit.commit.inline", false); + rangerLogNotModified = RangerAdminConfig.getInstance().getBoolean(PROP_RANGER_LOG_SC_NOT_MODIFIED, false); + pluginActivityAuditLogNotModified = RangerAdminConfig.getInstance().getBoolean(PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED, false); + pluginActivityAuditCommitInline = RangerAdminConfig.getInstance().getBoolean(PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE, false); - logger.info("ranger.plugin.activity.audit.commit.inline={}", pluginActivityAuditCommitInline); + logger.info("{}={}", PROP_RANGER_LOG_SC_NOT_MODIFIED, rangerLogNotModified); + logger.info("{}={}", PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED, pluginActivityAuditLogNotModified); + logger.info("{}={}", PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE, pluginActivityAuditCommitInline); logger.info("<== AssetMgr.init()"); } @@ -662,13 +670,10 @@ public void UpdateDefaultPolicyUserAndPerm(VXResource vXResource, public void createPolicyAudit(final XXPolicyExportAudit xXPolicyExportAudit) { final Runnable commitWork; if (xXPolicyExportAudit.getHttpRetCode() == HttpServletResponse.SC_NOT_MODIFIED) { - boolean logNotModified = PropertiesUtil.getBooleanProperty("ranger.log.SC_NOT_MODIFIED", false); - if (!logNotModified) { - commitWork = null; + if (!rangerLogNotModified) { + logger.debug("Not logging HttpServletResponse. SC_NOT_MODIFIED. To enable, set configuration: {}=true", PROP_RANGER_LOG_SC_NOT_MODIFIED); - logger.debug("Not logging HttpServletResponse." - + "SC_NOT_MODIFIED, to enable, update " - + ": ranger.log.SC_NOT_MODIFIED"); + commitWork = null; } else { // Create PolicyExportAudit record after transaction is completed. If it is created in-line here // then the TransactionManager will roll-back the changes because the HTTP return code is @@ -762,34 +767,40 @@ private void createOrUpdatePluginInfo(final RangerPluginInfo pluginInfo, int ent final Runnable commitWork; if (httpCode == HttpServletResponse.SC_NOT_MODIFIED) { - // Create or update PluginInfo record after transaction is completed. If it is created in-line here - // then the TransactionManager will roll-back the changes because the HTTP return code is - // HttpServletResponse.SC_NOT_MODIFIED - - switch (entityType) { - case RangerPluginInfo.ENTITY_TYPE_POLICIES: - isTagVersionResetNeeded = rangerDaoManager.getXXService().findAssociatedTagService(pluginInfo.getServiceName()) == null; - break; - case RangerPluginInfo.ENTITY_TYPE_TAGS: - isTagVersionResetNeeded = false; - break; - case RangerPluginInfo.ENTITY_TYPE_ROLES: - isTagVersionResetNeeded = false; - break; - case RangerPluginInfo.ENTITY_TYPE_USERSTORE: - isTagVersionResetNeeded = false; - break; - default: - isTagVersionResetNeeded = false; - break; - } + if (!pluginActivityAuditLogNotModified) { + logger.debug("Not logging HttpServletResponse. SC_NOT_MODIFIED. To enable, set configuration: {}=true", PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED); - commitWork = new Runnable() { - @Override - public void run() { - doCreateOrUpdateXXPluginInfo(pluginInfo, entityType, isTagVersionResetNeeded, clusterName); + commitWork = null; + } else { + // Create or update PluginInfo record after transaction is completed. If it is created in-line here + // then the TransactionManager will roll-back the changes because the HTTP return code is + // HttpServletResponse.SC_NOT_MODIFIED + + switch (entityType) { + case RangerPluginInfo.ENTITY_TYPE_POLICIES: + isTagVersionResetNeeded = rangerDaoManager.getXXService().findAssociatedTagService(pluginInfo.getServiceName()) == null; + break; + case RangerPluginInfo.ENTITY_TYPE_TAGS: + isTagVersionResetNeeded = false; + break; + case RangerPluginInfo.ENTITY_TYPE_ROLES: + isTagVersionResetNeeded = false; + break; + case RangerPluginInfo.ENTITY_TYPE_USERSTORE: + isTagVersionResetNeeded = false; + break; + default: + isTagVersionResetNeeded = false; + break; } - }; + + commitWork = new Runnable() { + @Override + public void run() { + doCreateOrUpdateXXPluginInfo(pluginInfo, entityType, isTagVersionResetNeeded, clusterName); + } + }; + } } else if (httpCode == HttpServletResponse.SC_NOT_FOUND) { if ((isPolicyDownloadRequest(entityType) && (pluginInfo.getPolicyActiveVersion() == null || pluginInfo.getPolicyActiveVersion() == -1)) || (isTagDownloadRequest(entityType) && (pluginInfo.getTagActiveVersion() == null || pluginInfo.getTagActiveVersion() == -1)) @@ -820,10 +831,12 @@ public void run() { }; } - if (pluginActivityAuditCommitInline) { - transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork); - } else { - transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork); + if (commitWork != null) { + if (pluginActivityAuditCommitInline) { + transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork); + } else { + transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork); + } } if (logger.isDebugEnabled()) { diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java b/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java index 49d07fcd08..0e7ae7daa1 100644 --- a/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java +++ b/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java @@ -19,6 +19,7 @@ package org.apache.ranger.service; +import org.apache.ranger.authorization.hadoop.config.RangerAdminConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -35,22 +36,41 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static java.util.concurrent.TimeUnit.MILLISECONDS; @Service public class RangerTransactionService { + private static final String PROP_THREADPOOL_SIZE = "ranger.admin.transaction.service.threadpool.size"; + private static final String PROP_SUMMARY_LOG_INTERVAL_SEC = "ranger.admin.transaction.service.summary.log.interval.sec"; + @Autowired @Qualifier(value = "transactionManager") PlatformTransactionManager txManager; private static final Logger LOG = LoggerFactory.getLogger(RangerTransactionService.class); - private ScheduledExecutorService scheduler = null; + private ScheduledExecutorService scheduler = null; + private AtomicLong scheduledTaskCount = new AtomicLong(0); + private AtomicLong executedTaskCount = new AtomicLong(0); + private AtomicLong failedTaskCount = new AtomicLong(0); + private long summaryLogIntervalMs = 5 * 60 * 1000; + private long nextLogSummaryTime = System.currentTimeMillis() + summaryLogIntervalMs; @PostConstruct public void init() { - scheduler = Executors.newScheduledThreadPool(1); + RangerAdminConfig config = RangerAdminConfig.getInstance(); + + int numOfThreads = config.getInt(PROP_THREADPOOL_SIZE, 1); + long summaryLogIntervalSec = config.getInt(PROP_SUMMARY_LOG_INTERVAL_SEC, 5 * 60); + + scheduler = Executors.newScheduledThreadPool(numOfThreads); + summaryLogIntervalMs = summaryLogIntervalSec * 1000; + nextLogSummaryTime = System.currentTimeMillis() + summaryLogIntervalSec; + + LOG.info("{}={}", PROP_THREADPOOL_SIZE, numOfThreads); + LOG.info("{}={}", PROP_SUMMARY_LOG_INTERVAL_SEC, summaryLogIntervalSec); } @PreDestroy @@ -59,6 +79,8 @@ public void destroy() { LOG.info("attempt to shutdown RangerTransactionService"); scheduler.shutdown(); scheduler.awaitTermination(5, TimeUnit.SECONDS); + + logSummary(); } catch (InterruptedException e) { LOG.error("RangerTransactionService tasks interrupted"); @@ -90,16 +112,47 @@ public Object doInTransaction(TransactionStatus status) { } }); } catch (Exception e) { + failedTaskCount.getAndIncrement(); + LOG.error("Failed to commit TransactionService transaction", e); LOG.error("Ignoring..."); + } finally { + executedTaskCount.getAndIncrement(); + logSummaryIfNeeded(); } } } }, delayInMillis, MILLISECONDS); + + scheduledTaskCount.getAndIncrement(); + + logSummaryIfNeeded(); } catch (Exception e) { LOG.error("Failed to schedule TransactionService transaction:", e); LOG.error("Ignroing..."); } } + private void logSummaryIfNeeded() { + long now = System.currentTimeMillis(); + + if (summaryLogIntervalMs > 0 && now > nextLogSummaryTime) { + synchronized (this) { + if (now > nextLogSummaryTime) { + nextLogSummaryTime = now + summaryLogIntervalMs; + + logSummary(); + } + } + } + } + + private void logSummary() { + long scheduled = scheduledTaskCount.get(); + long executed = executedTaskCount.get(); + long failed = failedTaskCount.get(); + long pending = scheduled - executed; + + LOG.info("RangerTransactionService: tasks(scheduled={}, executed={}, failed={}, pending={})", scheduled, executed, failed, pending); + } } \ No newline at end of file