Skip to content

Commit

Permalink
[INLONG-11752][Agent] Modify the default collection range of data (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang authored Feb 13, 2025
1 parent bbddce4 commit 82cc76e
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ public class AgentConstants {
public static final String AGENT_ENABLE_OOM_EXIT = "agent.enable.oom.exit";
public static final boolean DEFAULT_ENABLE_OOM_EXIT = false;

public static final String AGENT_SCAN_RANGE = "agent.scan.range";
public static final String DEFAULT_AGENT_SCAN_RANGE = "-2";
public static final String DEFAULT_AGENT_SCAN_RANGE_DAY = "-2";
public static final String DEFAULT_AGENT_SCAN_RANGE_HOUR = "-10";
public static final String DEFAULT_AGENT_SCAN_RANGE_MINUTE = "-600";

// pulsar sink config
public static final String PULSAR_CLIENT_IO_TREHAD_NUM = "agent.sink.pulsar.client.io.thread.num";
public static final int DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM = Math.max(1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ public class InstanceManager extends AbstractDaemon {
public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
public static final int INSTANCE_PRINT_INTERVAL_MS = 10000;
public static final long INSTANCE_KEEP_ALIVE_MS = 5 * 60 * 1000;
public static final long KEEP_PACE_INTERVAL_MS = 60 * 1000;
private long lastPrintTime = 0;
private long lastTraverseTime = 0;
// instance in instance store
private final InstanceStore instanceStore;
private TaskStore taskStore;
Expand All @@ -67,7 +69,7 @@ public class InstanceManager extends AbstractDaemon {
private final BlockingQueue<InstanceAction> actionQueue;
private final BlockingQueue<InstanceAction> addActionQueue;
// task thread pool;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
private final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
1L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
Expand All @@ -77,10 +79,8 @@ public class InstanceManager extends AbstractDaemon {
private final AgentConfiguration agentConf;
private final String taskId;
private long auditVersion;
private volatile boolean runAtLeastOneTime = false;
private volatile boolean running = false;
private final double reserveCoefficient = 0.8;
private long finishedInstanceCount = 0;

private class InstancePrintStat {

Expand Down Expand Up @@ -165,12 +165,16 @@ private Runnable coreThread() {
Thread.currentThread().setName("instance-manager-core-" + taskId);
running = true;
while (isRunnable()) {
long currentTime = AgentUtils.getCurrentTime();
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
printInstanceState();
dealWithActionQueue();
dealWithAddActionQueue();
keepPaceWithStore();
if (currentTime - lastTraverseTime > KEEP_PACE_INTERVAL_MS) {
keepPaceWithStore();
lastTraverseTime = currentTime;
}
String inlongGroupId = taskFromStore.getInlongGroupId();
String inlongStreamId = taskFromStore.getInlongStreamId();
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, inlongStreamId,
Expand All @@ -179,7 +183,6 @@ private Runnable coreThread() {
LOGGER.error("coreThread error: ", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
}
runAtLeastOneTime = true;
}
running = false;
};
Expand Down Expand Up @@ -356,7 +359,6 @@ private void finishInstance(InstanceProfile profile) {
deleteFromMemory(profile.getInstanceId());
LOGGER.info("finished instance state {} taskId {} instanceId {}", profile.getState(),
profile.getTaskId(), profile.getInstanceId());
finishedInstanceCount++;
}

private void deleteInstance(String instanceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.inlong.agent.core.task;

import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.store.InstanceStore;
Expand Down Expand Up @@ -50,8 +52,8 @@ public class OffsetManager extends AbstractDaemon {

private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);
public static final int CORE_THREAD_SLEEP_TIME = 60 * 1000;
public static final int CLEAN_INSTANCE_ONCE_LIMIT = 100;
public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
public static final int CLEAN_INSTANCE_ONCE_LIMIT = 1000;
public static final long TWO_HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
private static volatile OffsetManager offsetManager = null;
private final OffsetStore offsetStore;
private final InstanceStore instanceStore;
Expand Down Expand Up @@ -161,8 +163,7 @@ private void cleanDbInstance() {
}
}
}
long expireTime = DateTransUtils.calcOffset(
DB_INSTANCE_EXPIRE_CYCLE_COUNT + instanceFromDb.getCycleUnit());
long expireTime = Math.abs(getScanCycleRange(instanceFromDb.getCycleUnit())) + TWO_HOUR_TIMEOUT_INTERVAL;
if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > expireTime) {
cleanCount.getAndIncrement();
LOGGER.info("instance has expired, delete from instance store dataTime {} taskId {} instanceId {}",
Expand All @@ -189,4 +190,26 @@ public void start() throws Exception {
public void stop() throws Exception {

}

public static long getScanCycleRange(String cycleUnit) {
if (AgentConfiguration.getAgentConf().hasKey(AgentConstants.AGENT_SCAN_RANGE)) {
String range = AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_SCAN_RANGE);
return DateTransUtils.calcOffset(range + cycleUnit);
}
switch (cycleUnit) {
case AgentUtils.DAY: {
return DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_DAY + cycleUnit);
}
case AgentUtils.HOUR:
case AgentUtils.HOUR_LOW_CASE: {
return DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_HOUR + cycleUnit);
}
case AgentUtils.MINUTE: {
return DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE_MINUTE + cycleUnit);
}
default: {
return DateTransUtils.calcOffset(AgentConstants.DEFAULT_AGENT_SCAN_RANGE + cycleUnit);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.inlong.agent.message.file.SenderMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.sinks.filecollect.SenderManager;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;

Expand Down Expand Up @@ -66,7 +65,7 @@ public class ProxySink extends AbstractSink {
new SynchronousQueue<>(),
new AgentThreadFactory("proxy-sink"));
private MessageFilter messageFilter;
private SenderManager senderManager;
private Sender sender;
private byte[] fieldSplitter;
private volatile boolean shutdown = false;
private volatile boolean running = false;
Expand Down Expand Up @@ -159,7 +158,7 @@ public void sendMessageFromCache() {
if (senderMessage == null) {
continue;
}
senderManager.sendBatch(senderMessage);
sender.sendBatch(senderMessage);
if (AgentUtils.getCurrentTime() - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) {
lastPrintTime = AgentUtils.getCurrentTime();
LOGGER.info("send groupId {}, streamId {}, message size {}, taskId {}, "
Expand All @@ -178,9 +177,9 @@ public void init(InstanceProfile profile) {
StandardCharsets.UTF_8);
sourceName = profile.getInstanceId();
offsetManager = OffsetManager.getInstance();
senderManager = new SenderManager(profile, inlongGroupId, sourceName);
sender = new Sender(profile, inlongGroupId, sourceName);
try {
senderManager.Start();
sender.Start();
EXECUTOR_SERVICE.execute(coreThread());
EXECUTOR_SERVICE.execute(flushOffset());
inited = true;
Expand All @@ -200,7 +199,7 @@ public void destroy() {
}
Long start = AgentUtils.getCurrentTime();
shutdown = true;
senderManager.Stop();
sender.Stop();
LOGGER.info("destroy proxySink, wait for sender close {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
start = AgentUtils.getCurrentTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.sinks.filecollect;
package org.apache.inlong.agent.plugin.sinks;

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
Expand Down Expand Up @@ -68,9 +68,9 @@
/**
* proxy client
*/
public class SenderManager {
public class Sender {

private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
public static final int RESEND_QUEUE_WAIT_MS = 10;
// cache for group and sender list, share the map cross agent lifecycle.
Expand Down Expand Up @@ -112,7 +112,7 @@ public class SenderManager {
private static final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
private long auditVersion;

public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) {
public Sender(InstanceProfile profile, String inlongGroupId, String sourcePath) {
this.profile = profile;
auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION));
managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.agent.plugin.task.logcollection;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.TaskAction;
import org.apache.inlong.agent.plugin.task.AbstractTask;
import org.apache.inlong.agent.plugin.utils.regex.DateUtils;
Expand All @@ -44,7 +45,7 @@
public abstract class LogAbstractTask extends AbstractTask {

private static final int INSTANCE_QUEUE_CAPACITY = 10;
public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
public static final long ONE_HOUR_TIMEOUT_INTERVAL = 3600 * 1000;
private static final Logger LOGGER = LoggerFactory.getLogger(LogAbstractTask.class);
protected boolean retry;
protected BlockingQueue<InstanceProfile> instanceQueue;
Expand Down Expand Up @@ -207,10 +208,13 @@ protected void removeTimeoutEvent(Map<String, Map<String, InstanceProfile>> even
for (Map.Entry<String, Map<String, InstanceProfile>> entry : eventMap.entrySet()) {
/* If the data time of the event is within 2 days before (after) the current time, it is valid */
String dataTime = entry.getKey();
if (!DateUtils.isValidCreationTime(dataTime, DAY_TIMEOUT_INTERVAL)) {
if (!DateUtils.isValidCreationTime(dataTime,
Math.abs(OffsetManager.getScanCycleRange(taskProfile.getCycleUnit()))
+ ONE_HOUR_TIMEOUT_INTERVAL)) {
/* Remove it from memory map. */
eventMap.remove(dataTime);
LOGGER.warn("remove too old event from event map. dataTime {}", dataTime);
LOGGER.warn("remove too old event from event map taskId {} dataTime {}", taskProfile.getTaskId(),
dataTime);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.agent.plugin.utils.regex;

import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.utils.DateTransUtils;

import org.slf4j.Logger;
Expand All @@ -29,7 +30,6 @@
public class Scanner {

private static final Logger LOGGER = LoggerFactory.getLogger(Scanner.class);
public static final String SCAN_CYCLE_RANCE = "-2";

public static class TimeRange {

Expand Down Expand Up @@ -87,9 +87,8 @@ public static TimeRange getTimeRange(long startTime, long endTime, String cycleU
boolean isRetry) {
if (!isRetry) {
long currentTime = System.currentTimeMillis();
// only scan two cycle, like two hours or two days
long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + cycleUnit);
startTime = currentTime + offset + DateTransUtils.calcOffset(timeOffset);
startTime =
currentTime + OffsetManager.getScanCycleRange(cycleUnit) + DateTransUtils.calcOffset(timeOffset);
endTime = currentTime + DateTransUtils.calcOffset(timeOffset);
}
return new TimeRange(startTime, endTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager;
import org.apache.inlong.agent.plugin.sinks.filecollect.TestSender;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;

Expand All @@ -40,12 +40,12 @@ public class KafkaSinkTest {
private static MockSink kafkaSink;
private static InstanceProfile profile;
private static AgentBaseTestsHelper helper;
private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader();
private static final ClassLoader LOADER = TestSender.class.getClassLoader();

@BeforeClass
public static void setUp() throws Exception {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
helper = new AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile =
helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.sinks.filecollect.TestSenderManager;
import org.apache.inlong.agent.plugin.sinks.filecollect.TestSender;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;

Expand All @@ -40,12 +40,12 @@ public class PulsarSinkTest {
private static MockSink pulsarSink;
private static InstanceProfile profile;
private static AgentBaseTestsHelper helper;
private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader();
private static final ClassLoader LOADER = TestSender.class.getClassLoader();

@BeforeClass
public static void setUp() throws Exception {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
helper = new AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile =
helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.inlong.agent.message.file.OffsetAckInfo;
import org.apache.inlong.agent.message.file.SenderMessage;
import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
import org.apache.inlong.agent.plugin.sinks.Sender;
import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.enums.TaskStateEnum;
Expand Down Expand Up @@ -52,12 +53,12 @@
import java.util.concurrent.TimeUnit;

@RunWith(PowerMockRunner.class)
@PrepareForTest(SenderManager.class)
@PrepareForTest(Sender.class)
@PowerMockIgnore({"javax.management.*"})
public class TestSenderManager {
public class TestSender {

private static final Logger LOGGER = LoggerFactory.getLogger(TestSenderManager.class);
private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader();
private static final Logger LOGGER = LoggerFactory.getLogger(TestSender.class);
private static final ClassLoader LOADER = TestSender.class.getClassLoader();
private static AgentBaseTestsHelper helper;
private static InstanceProfile profile;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
Expand All @@ -69,7 +70,7 @@ public class TestSenderManager {
@BeforeClass
public static void setup() {
String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
helper = new AgentBaseTestsHelper(TestSender.class.getName()).setupAgentHome();
String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
TaskProfile taskProfile =
helper.getFileTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D",
Expand All @@ -88,17 +89,17 @@ public void testNormalAck() {
List<MsgSendCallback> cbList = new ArrayList<>();
try {
profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId()));
SenderManager senderManager = PowerMockito.spy(new SenderManager(profile, "inlongGroupId", "sourceName"));
PowerMockito.doNothing().when(senderManager, "createMessageSender");
Sender sender = PowerMockito.spy(new Sender(profile, "inlongGroupId", "sourceName"));
PowerMockito.doNothing().when(sender, "createMessageSender");

PowerMockito.doAnswer(invocation -> {
MsgSendCallback cb = invocation.getArgument(0);
cbList.add(cb);
return null;
}).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
}).when(sender, "asyncSendByMessageSender", Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.any(),
Mockito.any(), Mockito.anyBoolean());
senderManager.Start();
sender.Start();
Long offset = 0L;
List<OffsetAckInfo> ackInfoListTotal = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Expand All @@ -112,7 +113,7 @@ public void testNormalAck() {
}
SenderMessage senderMessage = new SenderMessage("taskId", "instanceId", "groupId", "streamId", bodyList,
AgentUtils.getCurrentTime(), null, ackInfoList);
senderManager.sendBatch(senderMessage);
sender.sendBatch(senderMessage);
}
Assert.assertTrue(cbList.size() == 10);
for (int i = 0; i < 5; i++) {
Expand Down
Loading

0 comments on commit 82cc76e

Please sign in to comment.