Skip to content

Commit

Permalink
[update][plugin][paimonwriter] format to meet code style
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao committed Jan 24, 2025
1 parent 4fdc864 commit 88e4900
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,24 @@
import static com.wgzhao.addax.common.base.Key.KERBEROS_KEYTAB_FILE_PATH;
import static com.wgzhao.addax.common.base.Key.KERBEROS_PRINCIPAL;


public class PaimonWriter extends Writer {
public class PaimonWriter
extends Writer
{
public static class Job
extends Writer.Job {
extends Writer.Job
{
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration conf = null;
private BatchWriteBuilder writeBuilder = null;

@Override
public void init() {
public void init()
{
this.conf = this.getPluginJobConf();

Options options = PaimonHelper.getOptions(this.conf);
CatalogContext context = PaimonHelper.getCatalogContext(options);



if ("kerberos".equals(options.get("hadoop.security.authentication"))) {
String kerberosKeytabFilePath = options.get(KERBEROS_KEYTAB_FILE_PATH);
String kerberosPrincipal = options.get(KERBEROS_PRINCIPAL);
Expand All @@ -58,7 +59,8 @@ public void init() {

FileSystem fs = FileSystem.get(context.hadoopConf());
fs.getStatus().getCapacity();
} catch (Exception e) {
}
catch (Exception e) {
LOG.error("kerberos Authentication error", e);
throw new RuntimeException(e);
}
Expand All @@ -72,18 +74,16 @@ public void init() {
Table table = catalog.getTable(identifier);

writeBuilder = table.newBatchWriteBuilder();


} catch (Exception e) {
}
catch (Exception e) {
LOG.error("init paimon error", e);
throw new RuntimeException(e);
}


}

@Override
public List<Configuration> split(int mandatoryNumber) {
public List<Configuration> split(int mandatoryNumber)
{
List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(conf);
Expand All @@ -92,7 +92,8 @@ public List<Configuration> split(int mandatoryNumber) {
}

@Override
public void prepare() {
public void prepare()
{
String writeMode = this.conf.getString("writeMode");
if ("truncate".equalsIgnoreCase(writeMode)) {
if (writeBuilder != null) {
Expand All @@ -102,7 +103,8 @@ public void prepare() {
commit.commit(new ArrayList<>());
try {
write.close();
} catch (Exception e) {
}
catch (Exception e) {
LOG.error("close paimon write error", e);
throw new RuntimeException(e);
}
Expand All @@ -111,13 +113,15 @@ public void prepare() {
}

@Override
public void destroy() {
public void destroy()
{

}
}

public static class Task
extends Writer.Task {
extends Writer.Task
{

private static final Logger log = LoggerFactory.getLogger(Task.class);
private Configuration conf = null;
Expand All @@ -127,7 +131,8 @@ public static class Task
private List<DataType> typeList = new ArrayList<>();

@Override
public void startWrite(RecordReceiver recordReceiver) {
public void startWrite(RecordReceiver recordReceiver)
{

List<Record> writerBuffer = new ArrayList<>(this.batchSize);
Record record;
Expand All @@ -148,11 +153,11 @@ public void startWrite(RecordReceiver recordReceiver) {
String msg = String.format("task end, write size :%d", total);
getTaskPluginCollector().collectMessage("writeSize", String.valueOf(total));
log.info(msg);

}

@Override
public void init() {
public void init()
{
this.conf = super.getPluginJobConf();

batchSize = conf.getInt("batchSize", 1000);
Expand All @@ -166,7 +171,8 @@ public void init() {
try {
PaimonHelper.kerberosAuthentication(context.hadoopConf(), kerberosPrincipal, kerberosKeytabFilePath);
log.info("kerberos Authentication success");
} catch (Exception e) {
}
catch (Exception e) {
log.error("kerberos Authentication error", e);
throw new RuntimeException(e);
}
Expand All @@ -183,28 +189,29 @@ public void init() {
columnList = table.rowType().getFields();
typeList = table.rowType().getFieldTypes();
writeBuilder = table.newBatchWriteBuilder();


} catch (Exception e) {
}
catch (Exception e) {
log.error("init paimon error", e);
throw new RuntimeException(e);
}
}

@Override
public void destroy() {
public void destroy()
{

}

private long doBatchInsert(final List<Record> writerBuffer) {
private long doBatchInsert(final List<Record> writerBuffer)
{
BatchTableWrite write = writeBuilder.newWrite();
GenericRow data;
for (Record record : writerBuffer) {
data = new GenericRow(columnList.size());
StringBuilder id = new StringBuilder();
for (int i = 0; i < record.getColumnNumber(); i++) {
Column column = record.getColumn(i);
if(column ==null ){
if (column == null) {
continue;
}
if (i >= columnList.size()) {
Expand All @@ -216,23 +223,27 @@ private long doBatchInsert(final List<Record> writerBuffer) {
if (columnType.getTypeRoot().equals(DataTypeRoot.ARRAY)) {
if (null == column.asString()) {
data.setField(i, null);
} else {
}
else {
String[] dataList = column.asString().split(",");
data.setField(i, new GenericArray(dataList));
}
} else {
}
else {
switch (columnType.getTypeRoot()) {

case DATE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
try {
if(column.asLong()!=null) {
if (column.asLong() != null) {
data.setField(i, Timestamp.fromEpochMillis(column.asLong()));
} else {
}
else {
data.setField(i, null);
}
} catch (Exception e) {
}
catch (Exception e) {
getTaskPluginCollector().collectDirtyRecord(record, String.format("时间类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e));
}
break;
Expand All @@ -253,24 +264,26 @@ private long doBatchInsert(final List<Record> writerBuffer) {
case INTEGER:
case SMALLINT:
case TINYINT:
data.setField(i, column.asBigInteger()==null?null:column.asBigInteger().intValue());
data.setField(i, column.asBigInteger() == null ? null : column.asBigInteger().intValue());
break;
case FLOAT:
case DOUBLE:

data.setField(i, column.asDouble());
break;
case DECIMAL:
if(column.asBigDecimal()!=null) {
if (column.asBigDecimal() != null) {
data.setField(i, Decimal.fromBigDecimal(column.asBigDecimal(), ((DecimalType) columnType).getPrecision(), ((DecimalType) columnType).getScale()));
} else {
}
else {
data.setField(i, null);
}
break;
case MAP:
try {
data.setField(i, new GenericMap(JSON.parseObject(column.asString(), Map.class)));
} catch (Exception e) {
}
catch (Exception e) {
getTaskPluginCollector().collectDirtyRecord(record, String.format("MAP类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e));
}
break;
Expand All @@ -282,10 +295,10 @@ private long doBatchInsert(final List<Record> writerBuffer) {

try {
write.write(data);
} catch (Exception e) {
}
catch (Exception e) {
throw new RuntimeException(e);
}

}

List<CommitMessage> messages = null;
Expand All @@ -294,16 +307,13 @@ private long doBatchInsert(final List<Record> writerBuffer) {
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);


write.close();

return messages.size();
} catch (Exception e) {
}
catch (Exception e) {
throw new RuntimeException(e);
}


}

}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"name": "paimonwriter",
"parameter": {

"dbName": "test",
"tableName": "test",
"writeMode": "truncate",
Expand Down

0 comments on commit 88e4900

Please sign in to comment.