Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-Redis] Redis support custom key and value #7888

Open
wants to merge 11 commits into
base: dev
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,24 @@ public enum HashKeyParseMode {
"batch_size is used to control the size of a batch of data during read and write operations"
+ ",default 10");

public static final Option<String> VALUE_FIELD =
Options.key("value_field")
.stringType()
.noDefaultValue()
.withDescription("The field of value you want to write to redis, support string list set zset");

public static final Option<String> HASH_KEY_FIELD =
Options.key("hash_key_field")
.stringType()
.noDefaultValue()
.withDescription("The field of hash key you want to write to redis");

public static final Option<String> HASH_VALUE_FIELD =
Options.key("hash_value_field")
.stringType()
.noDefaultValue()
.withDescription("The field of hash value you want to write to redis");

public enum Format {
JSON,
// TEXT will be supported later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class RedisParameters implements Serializable {
private List<String> redisNodes = Collections.emptyList();
private long expire = RedisConfig.EXPIRE.defaultValue();
private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
private String valueField;
private String hashKeyField;
private String hashValueField;

private int redisVersion;

Expand Down Expand Up @@ -97,6 +100,18 @@ public void buildWithConfig(ReadonlyConfig config) {
this.redisDataType = config.get(RedisConfig.DATA_TYPE);
// Indicates the number of keys to attempt to return per iteration.default 10
this.batchSize = config.get(RedisConfig.BATCH_SIZE);
// set value field
if (config.getOptional(RedisConfig.VALUE_FIELD).isPresent()) {
this.valueField = config.get(RedisConfig.VALUE_FIELD);
}
// set hash key field
if (config.getOptional(RedisConfig.HASH_KEY_FIELD).isPresent()) {
this.hashKeyField = config.get(RedisConfig.HASH_KEY_FIELD);
}
// set hash value field
if (config.getOptional(RedisConfig.HASH_VALUE_FIELD).isPresent()) {
this.hashValueField = config.get(RedisConfig.HASH_VALUE_FIELD);
}
}

public RedisClient buildRedisClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,28 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
private static final String REDIS_GROUP_DELIMITER = ":";
private static final String LEFT_PLACEHOLDER_MARKER = "{";
private static final String RIGHT_PLACEHOLDER_MARKER = "}";
private final SeaTunnelRowType seaTunnelRowType;
private final RedisParameters redisParameters;
private final SerializationSchema serializationSchema;
Expand All @@ -60,23 +68,94 @@ public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters redisP

@Override
public void write(SeaTunnelRow element) throws IOException {
String data = new String(serializationSchema.serialize(element));
String keyField = redisParameters.getKeyField();
List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
String key;
if (fields.contains(keyField)) {
key = element.getField(fields.indexOf(keyField)).toString();
} else {
key = keyField;
}
String key = getKey(element, fields);
keyBuffer.add(key);
valueBuffer.add(data);
String value = getValue(element, fields);
valueBuffer.add(value);
if (keyBuffer.size() >= batchSize) {
doBatchWrite();
clearBuffer();
}
}

private String getKey(SeaTunnelRow element, List<String> fields) {
String keyField = redisParameters.getKeyField();
String[] keyFieldSegments = keyField.split(REDIS_GROUP_DELIMITER);
StringBuilder key = new StringBuilder();
for (int i = 0; i < keyFieldSegments.length; i++) {
String keyFieldSegment = keyFieldSegments[i];
if (keyFieldSegment.startsWith(LEFT_PLACEHOLDER_MARKER)
&& keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) {
String realKeyField = keyFieldSegment.substring(1, keyFieldSegment.length() - 1);
if (fields.contains(realKeyField)) {
key.append(element.getField(fields.indexOf(realKeyField)).toString());
} else {
key.append(keyFieldSegment);
}
} else {
key.append(keyFieldSegment);
}
if (i != keyFieldSegments.length - 1) {
key.append(REDIS_GROUP_DELIMITER);
}
}
return key.toString();
}

private String getValue(SeaTunnelRow element, List<String> fields) {
String value;
RedisDataType redisDataType = redisParameters.getRedisDataType();
if (RedisDataType.HASH.equals(redisDataType)) {
value = handleHashType(element, fields);
} else {
value = handleOtherTypes(element, fields);
}
if (StringUtils.isEmpty(value)) {
byte[] serialize = serializationSchema.serialize(element);
value = new String(serialize);
}
return value;
}

private String handleHashType(SeaTunnelRow element, List<String> fields) {
String hashKeyField = redisParameters.getHashKeyField();
String hashValueField = redisParameters.getHashValueField();
if (StringUtils.isEmpty(hashKeyField)) {
return "";
}
String hashKey;
if (fields.contains(hashKeyField)) {
hashKey = element.getField(fields.indexOf(hashKeyField)).toString();
} else {
hashKey = hashKeyField;
}
String hashValue;
if (StringUtils.isEmpty(hashValueField)) {
hashValue = new String(serializationSchema.serialize(element));
} else {
if (fields.contains(hashValueField)) {
hashValue = element.getField(fields.indexOf(hashValueField)).toString();
} else {
hashValue = hashValueField;
}
}
Map<String, String> kvMap = new HashMap<>();
kvMap.put(hashKey, hashValue);
return JsonUtils.toJsonString(kvMap);
}

private String handleOtherTypes(SeaTunnelRow element, List<String> fields) {
String valueField = redisParameters.getValueField();
if (StringUtils.isEmpty(valueField)) {
return "";
}
if (fields.contains(valueField)) {
return element.getField(fields.indexOf(valueField)).toString();
}
return valueField;
}

private void clearBuffer() {
keyBuffer.clear();
valueBuffer.clear();
Expand Down
Loading