diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index 3be5b39de99..1fd2021679b 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -127,6 +127,24 @@ public enum HashKeyParseMode { "batch_size is used to control the size of a batch of data during read and write operations" + ",default 10"); + public static final Option VALUE_FIELD = + Options.key("value_field") + .stringType() + .noDefaultValue() + .withDescription("The field of value you want to write to redis, support string list set zset"); + + public static final Option HASH_KEY_FIELD = + Options.key("hash_key_field") + .stringType() + .noDefaultValue() + .withDescription("The field of hash key you want to write to redis"); + + public static final Option HASH_VALUE_FIELD = + Options.key("hash_value_field") + .stringType() + .noDefaultValue() + .withDescription("The field of hash value you want to write to redis"); + public enum Format { JSON, // TEXT will be supported later diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index 3d7e954f1d1..48e9546077b 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -57,6 +57,9 @@ public class RedisParameters implements Serializable { private List redisNodes = Collections.emptyList(); private long expire = RedisConfig.EXPIRE.defaultValue(); private int batchSize = RedisConfig.BATCH_SIZE.defaultValue(); + private String valueField; + private String hashKeyField; + private String hashValueField; private int redisVersion; @@ -97,6 +100,18 @@ public void buildWithConfig(ReadonlyConfig config) { this.redisDataType = config.get(RedisConfig.DATA_TYPE); // Indicates the number of keys to attempt to return per iteration.default 10 this.batchSize = config.get(RedisConfig.BATCH_SIZE); + // set value field + if (config.getOptional(RedisConfig.VALUE_FIELD).isPresent()) { + this.valueField = config.get(RedisConfig.VALUE_FIELD); + } + // set hash key field + if (config.getOptional(RedisConfig.HASH_KEY_FIELD).isPresent()) { + this.hashKeyField = config.get(RedisConfig.HASH_KEY_FIELD); + } + // set hash value field + if (config.getOptional(RedisConfig.HASH_VALUE_FIELD).isPresent()) { + this.hashValueField = config.get(RedisConfig.HASH_VALUE_FIELD); + } } public RedisClient buildRedisClient() { diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index f03c5c48c88..b9a9e843dd3 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; @@ -29,13 +30,20 @@ import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException; import org.apache.seatunnel.format.json.JsonSerializationSchema; +import org.apache.commons.lang3.StringUtils; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class RedisSinkWriter extends AbstractSinkWriter implements SupportMultiTableSinkWriter { + private static final String REDIS_GROUP_DELIMITER = ":"; + private static final String LEFT_PLACEHOLDER_MARKER = "{"; + private static final String RIGHT_PLACEHOLDER_MARKER = "}"; private final SeaTunnelRowType seaTunnelRowType; private final RedisParameters redisParameters; private final SerializationSchema serializationSchema; @@ -60,23 +68,94 @@ public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters redisP @Override public void write(SeaTunnelRow element) throws IOException { - String data = new String(serializationSchema.serialize(element)); - String keyField = redisParameters.getKeyField(); List fields = Arrays.asList(seaTunnelRowType.getFieldNames()); - String key; - if (fields.contains(keyField)) { - key = element.getField(fields.indexOf(keyField)).toString(); - } else { - key = keyField; - } + String key = getKey(element, fields); keyBuffer.add(key); - valueBuffer.add(data); + String value = getValue(element, fields); + valueBuffer.add(value); if (keyBuffer.size() >= batchSize) { doBatchWrite(); clearBuffer(); } } + private String getKey(SeaTunnelRow element, List fields) { + String keyField = redisParameters.getKeyField(); + String[] keyFieldSegments = keyField.split(REDIS_GROUP_DELIMITER); + StringBuilder key = new StringBuilder(); + for (int i = 0; i < keyFieldSegments.length; i++) { + String keyFieldSegment = keyFieldSegments[i]; + if (keyFieldSegment.startsWith(LEFT_PLACEHOLDER_MARKER) + && keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) { + String realKeyField = keyFieldSegment.substring(1, keyFieldSegment.length() - 1); + if (fields.contains(realKeyField)) { + key.append(element.getField(fields.indexOf(realKeyField)).toString()); + } else { + key.append(keyFieldSegment); + } + } else { + key.append(keyFieldSegment); + } + if (i != keyFieldSegments.length - 1) { + key.append(REDIS_GROUP_DELIMITER); + } + } + return key.toString(); + } + + private String getValue(SeaTunnelRow element, List fields) { + String value; + RedisDataType redisDataType = redisParameters.getRedisDataType(); + if (RedisDataType.HASH.equals(redisDataType)) { + value = handleHashType(element, fields); + } else { + value = handleOtherTypes(element, fields); + } + if (StringUtils.isEmpty(value)) { + byte[] serialize = serializationSchema.serialize(element); + value = new String(serialize); + } + return value; + } + + private String handleHashType(SeaTunnelRow element, List fields) { + String hashKeyField = redisParameters.getHashKeyField(); + String hashValueField = redisParameters.getHashValueField(); + if (StringUtils.isEmpty(hashKeyField)) { + return ""; + } + String hashKey; + if (fields.contains(hashKeyField)) { + hashKey = element.getField(fields.indexOf(hashKeyField)).toString(); + } else { + hashKey = hashKeyField; + } + String hashValue; + if (StringUtils.isEmpty(hashValueField)) { + hashValue = new String(serializationSchema.serialize(element)); + } else { + if (fields.contains(hashValueField)) { + hashValue = element.getField(fields.indexOf(hashValueField)).toString(); + } else { + hashValue = hashValueField; + } + } + Map kvMap = new HashMap<>(); + kvMap.put(hashKey, hashValue); + return JsonUtils.toJsonString(kvMap); + } + + private String handleOtherTypes(SeaTunnelRow element, List fields) { + String valueField = redisParameters.getValueField(); + if (StringUtils.isEmpty(valueField)) { + return ""; + } + if (fields.contains(valueField)) { + return element.getField(fields.indexOf(valueField)).toString(); + } + return valueField; + } + private void clearBuffer() { keyBuffer.clear(); valueBuffer.clear();