diff --git a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java index 79175abf9..ad514650b 100644 --- a/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java +++ b/plugin/writer/paimonwriter/src/main/java/com/wgzhao/addax/plugin/writer/paimonwriter/PaimonWriter.java @@ -32,23 +32,24 @@ import static com.wgzhao.addax.common.base.Key.KERBEROS_KEYTAB_FILE_PATH; import static com.wgzhao.addax.common.base.Key.KERBEROS_PRINCIPAL; - -public class PaimonWriter extends Writer { +public class PaimonWriter + extends Writer +{ public static class Job - extends Writer.Job { + extends Writer.Job + { private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration conf = null; private BatchWriteBuilder writeBuilder = null; @Override - public void init() { + public void init() + { this.conf = this.getPluginJobConf(); Options options = PaimonHelper.getOptions(this.conf); CatalogContext context = PaimonHelper.getCatalogContext(options); - - if ("kerberos".equals(options.get("hadoop.security.authentication"))) { String kerberosKeytabFilePath = options.get(KERBEROS_KEYTAB_FILE_PATH); String kerberosPrincipal = options.get(KERBEROS_PRINCIPAL); @@ -58,7 +59,8 @@ public void init() { FileSystem fs = FileSystem.get(context.hadoopConf()); fs.getStatus().getCapacity(); - } catch (Exception e) { + } + catch (Exception e) { LOG.error("kerberos Authentication error", e); throw new RuntimeException(e); } @@ -72,18 +74,16 @@ public void init() { Table table = catalog.getTable(identifier); writeBuilder = table.newBatchWriteBuilder(); - - - } catch (Exception e) { + } + catch (Exception e) { LOG.error("init paimon error", e); throw new RuntimeException(e); } - - } @Override - public List split(int mandatoryNumber) { + public List split(int mandatoryNumber) + { List configurations = new ArrayList<>(mandatoryNumber); for (int i = 0; i < mandatoryNumber; i++) { configurations.add(conf); @@ -92,7 +92,8 @@ public List split(int mandatoryNumber) { } @Override - public void prepare() { + public void prepare() + { String writeMode = this.conf.getString("writeMode"); if ("truncate".equalsIgnoreCase(writeMode)) { if (writeBuilder != null) { @@ -102,7 +103,8 @@ public void prepare() { commit.commit(new ArrayList<>()); try { write.close(); - } catch (Exception e) { + } + catch (Exception e) { LOG.error("close paimon write error", e); throw new RuntimeException(e); } @@ -111,13 +113,15 @@ public void prepare() { } @Override - public void destroy() { + public void destroy() + { } } public static class Task - extends Writer.Task { + extends Writer.Task + { private static final Logger log = LoggerFactory.getLogger(Task.class); private Configuration conf = null; @@ -127,7 +131,8 @@ public static class Task private List typeList = new ArrayList<>(); @Override - public void startWrite(RecordReceiver recordReceiver) { + public void startWrite(RecordReceiver recordReceiver) + { List writerBuffer = new ArrayList<>(this.batchSize); Record record; @@ -148,11 +153,11 @@ public void startWrite(RecordReceiver recordReceiver) { String msg = String.format("task end, write size :%d", total); getTaskPluginCollector().collectMessage("writeSize", String.valueOf(total)); log.info(msg); - } @Override - public void init() { + public void init() + { this.conf = super.getPluginJobConf(); batchSize = conf.getInt("batchSize", 1000); @@ -166,7 +171,8 @@ public void init() { try { PaimonHelper.kerberosAuthentication(context.hadoopConf(), kerberosPrincipal, kerberosKeytabFilePath); log.info("kerberos Authentication success"); - } catch (Exception e) { + } + catch (Exception e) { log.error("kerberos Authentication error", e); throw new RuntimeException(e); } @@ -183,20 +189,21 @@ public void init() { columnList = table.rowType().getFields(); typeList = table.rowType().getFieldTypes(); writeBuilder = table.newBatchWriteBuilder(); - - - } catch (Exception e) { + } + catch (Exception e) { log.error("init paimon error", e); throw new RuntimeException(e); } } @Override - public void destroy() { + public void destroy() + { } - private long doBatchInsert(final List writerBuffer) { + private long doBatchInsert(final List writerBuffer) + { BatchTableWrite write = writeBuilder.newWrite(); GenericRow data; for (Record record : writerBuffer) { @@ -204,7 +211,7 @@ private long doBatchInsert(final List writerBuffer) { StringBuilder id = new StringBuilder(); for (int i = 0; i < record.getColumnNumber(); i++) { Column column = record.getColumn(i); - if(column ==null ){ + if (column == null) { continue; } if (i >= columnList.size()) { @@ -216,23 +223,27 @@ private long doBatchInsert(final List writerBuffer) { if (columnType.getTypeRoot().equals(DataTypeRoot.ARRAY)) { if (null == column.asString()) { data.setField(i, null); - } else { + } + else { String[] dataList = column.asString().split(","); data.setField(i, new GenericArray(dataList)); } - } else { + } + else { switch (columnType.getTypeRoot()) { case DATE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: try { - if(column.asLong()!=null) { + if (column.asLong() != null) { data.setField(i, Timestamp.fromEpochMillis(column.asLong())); - } else { + } + else { data.setField(i, null); } - } catch (Exception e) { + } + catch (Exception e) { getTaskPluginCollector().collectDirtyRecord(record, String.format("时间类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e)); } break; @@ -253,7 +264,7 @@ private long doBatchInsert(final List writerBuffer) { case INTEGER: case SMALLINT: case TINYINT: - data.setField(i, column.asBigInteger()==null?null:column.asBigInteger().intValue()); + data.setField(i, column.asBigInteger() == null ? null : column.asBigInteger().intValue()); break; case FLOAT: case DOUBLE: @@ -261,16 +272,18 @@ private long doBatchInsert(final List writerBuffer) { data.setField(i, column.asDouble()); break; case DECIMAL: - if(column.asBigDecimal()!=null) { + if (column.asBigDecimal() != null) { data.setField(i, Decimal.fromBigDecimal(column.asBigDecimal(), ((DecimalType) columnType).getPrecision(), ((DecimalType) columnType).getScale())); - } else { + } + else { data.setField(i, null); } break; case MAP: try { data.setField(i, new GenericMap(JSON.parseObject(column.asString(), Map.class))); - } catch (Exception e) { + } + catch (Exception e) { getTaskPluginCollector().collectDirtyRecord(record, String.format("MAP类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e)); } break; @@ -282,10 +295,10 @@ private long doBatchInsert(final List writerBuffer) { try { write.write(data); - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException(e); } - } List messages = null; @@ -294,16 +307,13 @@ private long doBatchInsert(final List writerBuffer) { BatchTableCommit commit = writeBuilder.newCommit(); commit.commit(messages); - write.close(); return messages.size(); - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException(e); } - - } - } } diff --git a/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json b/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json index 681c940f9..5ff5843f3 100644 --- a/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json +++ b/plugin/writer/paimonwriter/src/main/resources/plugin_job_template.json @@ -1,7 +1,6 @@ { "name": "paimonwriter", "parameter": { - "dbName": "test", "tableName": "test", "writeMode": "truncate",