Skip to content

Commit

Permalink
[INLONG-5222][Manager][Agent][DataProxy] Add heartbeat mechanism for …
Browse files Browse the repository at this point in the history
…Inlong cluster (#5332)

* Add heartbeat mechanism for Inlong component cluster

* Fix review comments, add inCharges field for heartbeat msg

Co-authored-by: healchow <[email protected]>
  • Loading branch information
kipshi and healchow authored Aug 4, 2022
1 parent aa2f8f2 commit a992b98
Show file tree
Hide file tree
Showing 70 changed files with 1,506 additions and 416 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@
=========
1.3 Third party Apache 2.0 Licenses
=========

1.3.1 codestyle/checkstyle.xml
Source : checkstyle 8.44 (Modified from src/main/resources/google_checks.xml)
License : https://github.com/checkstyle/checkstyle/blob/master/LICENSE.apache20
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public static JobProfile parseJsonFile(String fileName) {
@Override
public boolean allRequiredKeyExist() {
return hasKey(JobConstants.JOB_ID) && hasKey(JobConstants.JOB_SOURCE_CLASS)
&& hasKey(JobConstants.JOB_SINK) && hasKey(JobConstants.JOB_CHANNEL);
&& hasKey(JobConstants.JOB_SINK) && hasKey(JobConstants.JOB_CHANNEL)
&& hasKey(JobConstants.JOB_GROUP_ID) && hasKey(JobConstants.JOB_STREAM_ID);
}

public String toJsonStr() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public class AgentConstants {
public static final int DEFAULT_JOB_NUMBER_LIMIT = 15;

public static final String AGENT_LOCAL_IP = "agent.local.ip";
public static final String DEFAULT_LOCAL_IP = "127.0.0.1";

public static final String AGENT_CLUSTER_NAME = "agent.cluster.name";
public static final String AGENT_CLUSTER_TAG = "agent.cluster.tag";
public static final String AGENT_CLUSTER_IN_CHARGES = "agent.cluster.inCharges";

public static final String AGENT_LOCAL_UUID = "agent.local.uuid";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ public class FetcherConstants {
public static final String AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH = "agent.manager.reportsnapshot.http.path";
public static final String DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH = "/agent/reportSnapshot";

public static final String AGENT_MANAGER_HEARTBEAT_HTTP_PATH = "agent.manager.heartbeat.http.path";
public static final String DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH = "/heartbeat/report";

public static final String AGENT_HTTP_APPLICATION_JSON = "application/json";

public static final int AGENT_HTTP_SUCCESS_CODE = 200;

public static final String DEFAULT_LOCAL_IP = "127.0.0.1";

public static final String AGENT_MANAGER_RETURN_PARAM_IP = "ip";
public static final String AGENT_MANAGER_RETURN_PARAM_DATA = "data";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class JobConstants extends CommonConstants {
public static final String JOB_IP = "job.ip";
public static final String JOB_RETRY = "job.retry";
public static final String JOB_UUID = "job.uuid";
public static final String JOB_GROUP_ID = "job.groupId";
public static final String JOB_STREAM_ID = "job.streamId";

public static final String JOB_SOURCE_CLASS = "job.source";
public static final String JOB_SOURCE_TYPE = "job.sourceType";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,42 +167,44 @@ private static Proxy getProxy(DataConfig dataConfigs) {
/**
* convert DataConfig to TriggerProfile
*/
public static TriggerProfile convertToTriggerProfile(DataConfig dataConfigs) {
if (!dataConfigs.isValid()) {
throw new IllegalArgumentException("input dataConfig" + dataConfigs + "is invalid please check");
public static TriggerProfile convertToTriggerProfile(DataConfig dataConfig) {
if (!dataConfig.isValid()) {
throw new IllegalArgumentException("input dataConfig" + dataConfig + "is invalid please check");
}

JobProfileDto profileDto = new JobProfileDto();
Proxy proxy = getProxy(dataConfigs);
Proxy proxy = getProxy(dataConfig);
profileDto.setProxy(proxy);
Job job = new Job();

// common attribute
job.setId(String.valueOf(dataConfigs.getTaskId()));
job.setId(String.valueOf(dataConfig.getTaskId()));
job.setGroupId(dataConfig.getInlongGroupId());
job.setStreamId(dataConfig.getInlongStreamId());
job.setChannel(DEFAULT_CHANNEL);
job.setIp(dataConfigs.getIp());
job.setOp(dataConfigs.getOp());
job.setDeliveryTime(dataConfigs.getDeliveryTime());
job.setUuid(dataConfigs.getUuid());
job.setIp(dataConfig.getIp());
job.setOp(dataConfig.getOp());
job.setDeliveryTime(dataConfig.getDeliveryTime());
job.setUuid(dataConfig.getUuid());
job.setSink(DEFAULT_DATAPROXY_SINK);
job.setVersion(dataConfigs.getVersion());
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfigs.getTaskType());
job.setVersion(dataConfig.getVersion());
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(dataConfig.getTaskType());
switch (requireNonNull(taskType)) {
case SQL:
case BINLOG:
BinlogJob binlogJob = getBinlogJob(dataConfigs);
BinlogJob binlogJob = getBinlogJob(dataConfig);
job.setBinlogJob(binlogJob);
job.setSource(BINLOG_SOURCE);
profileDto.setJob(job);
break;
case FILE:
FileJob fileJob = getFileJob(dataConfigs);
FileJob fileJob = getFileJob(dataConfig);
job.setFileJob(fileJob);
job.setSource(DEFAULT_SOURCE);
profileDto.setJob(job);
break;
case KAFKA:
KafkaJob kafkaJob = getKafkaJob(dataConfigs);
KafkaJob kafkaJob = getKafkaJob(dataConfig);
job.setKafkaJob(kafkaJob);
job.setSource(KAFKA_SOURCE);
profileDto.setJob(job);
Expand All @@ -216,6 +218,8 @@ public static TriggerProfile convertToTriggerProfile(DataConfig dataConfigs) {
public static class Job {

private String id;
private String groupId;
private String streamId;
private String ip;
private String retry;
private String source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public abstract class AbstractStateWrapper implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStateWrapper.class);

private final Map<Pair<State, State>, StateCallback> callBacks = new HashMap<>();

private volatile State currentState = State.ACCEPTED;

public AbstractStateWrapper() {
Expand Down Expand Up @@ -88,4 +89,8 @@ public boolean isFatal() {
return State.FATAL.equals(tmpState) || State.KILLED.equals(tmpState);
}

public State getCurrentState() {
return currentState;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_UUID_OPEN;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_LOCAL_UUID_OPEN;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_ENABLE_OOM_EXIT;
import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_LOCAL_IP;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_LOCAL_IP;

/**
* Agent utils
Expand Down
2 changes: 2 additions & 0 deletions inlong-agent/agent-common/src/test/resources/binlogJob.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"job.binlogJob.hostname": "0.0.0.0",
"job.binlogJob.tableWhiteList": "[\\s\\S]*.*",
"job.id": "5",
"job.groupId": "groupId",
"job.streamId": "streamId",
"job.binlogJob.user": "root",
"proxy.manager.port": "8000",
"job.binlogJob.port": "3306",
Expand Down
4 changes: 3 additions & 1 deletion inlong-agent/agent-common/src/test/resources/job.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"source": "test.source",
"plugin": "test.plugin",
"sink": "test.sink",
"channel": "test.channel"
"channel": "test.channel",
"groupId": "groupId",
"streamId": "streamId"
}
}
4 changes: 3 additions & 1 deletion inlong-agent/agent-common/src/test/resources/job.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ job.name = test
job.plugin = test.plugin
job.source = test.source
job.sink = test.sink
job.channel = test.channel
job.channel = test.channel
job.groupId = groupId
job.streamId = streamId
Loading

0 comments on commit a992b98

Please sign in to comment.