From 603322da873fccaef0bf4e64025b4baeef4fd4cd Mon Sep 17 00:00:00 2001 From: wgzhao Date: Wed, 6 Dec 2023 17:24:27 +0800 Subject: [PATCH] [add][plugin] Add support for SAP HANA database (#977) response for #975 --- docs/assets/jobs/hanareader.json | 40 ++++++ docs/assets/jobs/hanawriter.json | 58 ++++++++ docs/reader/hanareader.md | 28 ++++ docs/writer/hanawriter.md | 40 ++++++ .../wgzhao/addax/rdbms/util/DataBaseType.java | 3 +- package.xml | 16 +++ plugin/reader/hanareader/package.xml | 44 ++++++ plugin/reader/hanareader/pom.xml | 67 +++++++++ .../plugin/reader/hanareader/HANAReader.java | 121 ++++++++++++++++ .../src/main/java/resources/plugin.json | 7 + .../java/resources/plugin_job_template.json | 19 +++ .../hanareader/src/main/resources/plugin.json | 7 + .../main/resources/plugin_job_template.json | 19 +++ plugin/writer/hanawriter/package.xml | 37 +++++ plugin/writer/hanawriter/pom.xml | 61 ++++++++ .../plugin/writer/hanawriter/HANAWriter.java | 130 ++++++++++++++++++ .../hanawriter/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 22 +++ pom.xml | 2 + 19 files changed, 726 insertions(+), 1 deletion(-) create mode 100644 docs/assets/jobs/hanareader.json create mode 100644 docs/assets/jobs/hanawriter.json create mode 100644 docs/reader/hanareader.md create mode 100644 docs/writer/hanawriter.md create mode 100644 plugin/reader/hanareader/package.xml create mode 100644 plugin/reader/hanareader/pom.xml create mode 100644 plugin/reader/hanareader/src/main/java/com/wgzhao/addax/plugin/reader/hanareader/HANAReader.java create mode 100644 plugin/reader/hanareader/src/main/java/resources/plugin.json create mode 100644 plugin/reader/hanareader/src/main/java/resources/plugin_job_template.json create mode 100644 plugin/reader/hanareader/src/main/resources/plugin.json create mode 100644 plugin/reader/hanareader/src/main/resources/plugin_job_template.json create mode 100644 plugin/writer/hanawriter/package.xml create mode 100644 plugin/writer/hanawriter/pom.xml create mode 100644 plugin/writer/hanawriter/src/main/java/com/wgzhao/addax/plugin/writer/hanawriter/HANAWriter.java create mode 100644 plugin/writer/hanawriter/src/main/resources/plugin.json create mode 100644 plugin/writer/hanawriter/src/main/resources/plugin_job_template.json diff --git a/docs/assets/jobs/hanareader.json b/docs/assets/jobs/hanareader.json new file mode 100644 index 000000000..4cc8fc10e --- /dev/null +++ b/docs/assets/jobs/hanareader.json @@ -0,0 +1,40 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "hanareader", + "parameter": { + "column": [ + "*" + ], + "connection": [ + { + "jdbcUrl": [ + "jdbc:sap://wgzhao-pc:39017/system" + ], + "table": [ + "addax_tbl" + ] + } + ], + "username": "system", + "password": "HXEHana1" + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "print": true + } + } + } + ], + "setting": { + "speed": { + "bytes": -1, + "channel": 1 + } + } + } +} diff --git a/docs/assets/jobs/hanawriter.json b/docs/assets/jobs/hanawriter.json new file mode 100644 index 000000000..da0299ca7 --- /dev/null +++ b/docs/assets/jobs/hanawriter.json @@ -0,0 +1,58 @@ +{ + "job": { + "setting": { + "speed": { + "channel": 1, + "bytes": -1 + } + }, + "content": { + "reader": { + "name": "streamreader", + "parameter": { + "column": [ + { + "value": "Addax", + "type": "string" + }, + { + "value": 19880808, + "type": "long" + }, + { + "value": "1988-08-08 08:08:08", + "type": "date" + }, + { + "value": true, + "type": "bool" + }, + { + "value": "test", + "type": "bytes" + } + ], + "sliceRecordCount": 1000 + } + }, + "writer": { + "name": "hanawriter", + "parameter": { + "connection": [ + { + "jdbcUrl": "jdbc:sap://wgzhao-pc:39017/system", + "table": [ + "addax_tbl" + ] + } + ], + "username": "system", + "password": "HXEHana1", + "column": [ + "*" + ] + } + } + } + } +} diff --git a/docs/reader/hanareader.md b/docs/reader/hanareader.md new file mode 100644 index 000000000..8085f46ce --- /dev/null +++ b/docs/reader/hanareader.md @@ -0,0 +1,28 @@ +# HANA Reader + +HANAReader 插件实现了从 SAP HANA 读取数据的能力 + +## 示例 + + +下面的配置是读取该表到终端的作业: + +=== "job/hanareader.json" + + ```json + --8<-- "jobs/hanareader.json" + ``` + +将上述配置文件保存为 `job/hana2stream.json` + +### 执行采集命令 + +执行以下命令进行数据采集 + +```shell +bin/addax.sh job/hana2stream.json +``` + +## 参数说明 + +HANAReader 基于 [rdbmsreader](../rdbmsreader) 实现,因此可以参考 rdbmsreader 的所有配置项。 diff --git a/docs/writer/hanawriter.md b/docs/writer/hanawriter.md new file mode 100644 index 000000000..3ed14ca4d --- /dev/null +++ b/docs/writer/hanawriter.md @@ -0,0 +1,40 @@ +# HANA Writer + +HANAWriter 插件实现了写入数据到 [SAP HANA][1] 目的表的功能。 + +## 示例 + +假定要写入的 HANA 表建表语句如下: + +```sql +create table system.addax_tbl +( +col1 varchar(200) , +col2 int(4), +col3 date, +col4 boolean, +col5 clob +); +``` + +这里使用一份从内存产生到 HANA 导入的数据。 + +=== "job/hanawriter.json" + +```json +--8<-- "jobs/hanawriter.json" +``` + +将上述配置文件保存为 `job/hana2stream.json` + +### 执行采集命令 + +执行以下命令进行数据采集 + +```shell +bin/addax.sh job/hana2stream.json +``` + +## 参数说明 + +HANAWriter 基于 [rdbmswriter](../rdbmswriter) 实现,因此可以参考 rdbmswriter 的所有配置项。 diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java index 97bc8e13f..caac6b37c 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java @@ -49,7 +49,8 @@ public enum DataBaseType Trino("trino", "io.trino.jdbc.TrinoDriver"), Sybase("sybase", "com.sybase.jdbc4.jdbc.SybDriver"), Databend("databend", "com.databend.jdbc.DatabendDriver"), - Access("access","net.ucanaccess.jdbc.UcanaccessDriver"); + Access("access","net.ucanaccess.jdbc.UcanaccessDriver"), + HANA("hana", "com.sap.db.jdbc.Driver"); private static final Pattern jdbcUrlPattern = Pattern.compile("jdbc:\\w+:(?:thin:url=|//|thin:@|)([\\w\\d.,]+).*"); diff --git a/package.xml b/package.xml index e7d922080..f787ad819 100644 --- a/package.xml +++ b/package.xml @@ -136,6 +136,14 @@ 0644 addax-${project.version} + + plugin/reader/hanareader/target/hanareader-${project.version}/ + + **/*.* + + 0644 + addax-${project.version} + plugin/reader/hbase11xreader/target/hbase11xreader-${project.version}/ @@ -402,6 +410,14 @@ 0644 addax-${project.version} + + plugin/writer/hanawriter/target/hanawriter-${project.version}/ + + **/*.* + + 0644 + addax-${project.version} + plugin/writer/hbase11xsqlwriter/target/hbase11xsqlwriter-${project.version}/ diff --git a/plugin/reader/hanareader/package.xml b/plugin/reader/hanareader/package.xml new file mode 100644 index 000000000..0986ab310 --- /dev/null +++ b/plugin/reader/hanareader/package.xml @@ -0,0 +1,44 @@ + + release + + dir + + false + + + src/main/resources + + *.json + + plugin/reader/${project.artifactId} + + + target/ + + ${project.artifactId}-${project.version}.jar + + plugin/reader/${project.artifactId} + + + src/main/libs + + *.* + + plugin/reader/${project.artifactId}/libs + + + + + + false + plugin/reader/${project.artifactId}/libs + runtime + + com.wgzhao.addax:* + + + + diff --git a/plugin/reader/hanareader/pom.xml b/plugin/reader/hanareader/pom.xml new file mode 100644 index 000000000..858ee585f --- /dev/null +++ b/plugin/reader/hanareader/pom.xml @@ -0,0 +1,67 @@ + + + 4.0.0 + + addax-all + com.wgzhao.addax + 4.1.4-SNAPSHOT + ../../../pom.xml + + + hanareader + hanareader-reader + SAP HANA Reader + jar + + + + + com.wgzhao.addax + addax-common + ${project.version} + + + slf4j-log4j12 + org.slf4j + + + + + + com.wgzhao.addax + addax-rdbms + ${project.version} + + + + com.sap.cloud.db.jdbc + ngdbc + 2.18.16 + + + + + + + + maven-assembly-plugin + + + package.xml + + ${project.artifactId}-${project.version} + + + + release + package + + single + + + + + + + + diff --git a/plugin/reader/hanareader/src/main/java/com/wgzhao/addax/plugin/reader/hanareader/HANAReader.java b/plugin/reader/hanareader/src/main/java/com/wgzhao/addax/plugin/reader/hanareader/HANAReader.java new file mode 100644 index 000000000..b66079444 --- /dev/null +++ b/plugin/reader/hanareader/src/main/java/com/wgzhao/addax/plugin/reader/hanareader/HANAReader.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.wgzhao.addax.plugin.reader.hanareader; + +import com.wgzhao.addax.common.base.Key; +import com.wgzhao.addax.common.base.Constant; +import com.wgzhao.addax.common.plugin.RecordSender; +import com.wgzhao.addax.common.spi.Reader; +import com.wgzhao.addax.common.util.Configuration; +import com.wgzhao.addax.rdbms.reader.CommonRdbmsReader; +import com.wgzhao.addax.rdbms.util.DataBaseType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class HANAReader + extends Reader +{ + + private static final DataBaseType DATABASE_TYPE = DataBaseType.HANA; + + public static class Job + extends Reader.Job + { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + + private Configuration originalConfig = null; + private CommonRdbmsReader.Job commonRdbmsReaderJob; + + @Override + public void init() + { + this.originalConfig = getPluginJobConf(); + + Integer fetchSize = this.originalConfig.getInt(Key.FETCH_SIZE, Constant.DEFAULT_FETCH_SIZE); + + this.originalConfig.set(Key.FETCH_SIZE, fetchSize); + + this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE); + this.originalConfig = this.commonRdbmsReaderJob.init(this.originalConfig); + } + + @Override + public void preCheck() + { + this.commonRdbmsReaderJob.preCheck(this.originalConfig, DATABASE_TYPE); + } + + @Override + public List split(int adviceNumber) + { + return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber); + } + + @Override + public void post() + { + this.commonRdbmsReaderJob.post(this.originalConfig); + } + + @Override + public void destroy() + { + this.commonRdbmsReaderJob.destroy(this.originalConfig); + } + } + + public static class Task + extends Reader.Task + { + + private Configuration readerSliceConfig; + private CommonRdbmsReader.Task commonRdbmsReaderTask; + + @Override + public void init() + { + this.readerSliceConfig = getPluginJobConf(); + this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId()); + this.commonRdbmsReaderTask.init(this.readerSliceConfig); + } + + @Override + public void startRead(RecordSender recordSender) + { + int fetchSize = this.readerSliceConfig.getInt(Key.FETCH_SIZE); + + this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize); + } + + @Override + public void post() + { + this.commonRdbmsReaderTask.post(this.readerSliceConfig); + } + + @Override + public void destroy() + { + this.commonRdbmsReaderTask.destroy(this.readerSliceConfig); + } + } +} diff --git a/plugin/reader/hanareader/src/main/java/resources/plugin.json b/plugin/reader/hanareader/src/main/java/resources/plugin.json new file mode 100644 index 000000000..bbf8ef574 --- /dev/null +++ b/plugin/reader/hanareader/src/main/java/resources/plugin.json @@ -0,0 +1,7 @@ +{ + "name": "rdbmsreader", + "class": "com.wgzhao.addax.plugin.reader.rdbmsreader.RdbmsReader", + "description": "general RDBMS reader plugin", + "developer": "alibaba", + "drivers": [] +} diff --git a/plugin/reader/hanareader/src/main/java/resources/plugin_job_template.json b/plugin/reader/hanareader/src/main/java/resources/plugin_job_template.json new file mode 100644 index 000000000..30d51033e --- /dev/null +++ b/plugin/reader/hanareader/src/main/java/resources/plugin_job_template.json @@ -0,0 +1,19 @@ +{ + "name": "rdbmsreader", + "parameter": { + "username": "", + "password": "", + "column": [], + "connection": [ + { + "jdbcUrl": [], + "table": [], + "driver": "" + } + ], + "where": "1=1", + "autoPk": false, + "fetchSize": 2048, + "splitPk": "" + } +} \ No newline at end of file diff --git a/plugin/reader/hanareader/src/main/resources/plugin.json b/plugin/reader/hanareader/src/main/resources/plugin.json new file mode 100644 index 000000000..406034a44 --- /dev/null +++ b/plugin/reader/hanareader/src/main/resources/plugin.json @@ -0,0 +1,7 @@ +{ + "name": "hanareader", + "class": "com.wgzhao.addax.plugin.reader.hanareader.HANAReader", + "description": "SAP HANA reader plugin", + "developer": "wgzhao", + "drivers": [] +} diff --git a/plugin/reader/hanareader/src/main/resources/plugin_job_template.json b/plugin/reader/hanareader/src/main/resources/plugin_job_template.json new file mode 100644 index 000000000..30d51033e --- /dev/null +++ b/plugin/reader/hanareader/src/main/resources/plugin_job_template.json @@ -0,0 +1,19 @@ +{ + "name": "rdbmsreader", + "parameter": { + "username": "", + "password": "", + "column": [], + "connection": [ + { + "jdbcUrl": [], + "table": [], + "driver": "" + } + ], + "where": "1=1", + "autoPk": false, + "fetchSize": 2048, + "splitPk": "" + } +} \ No newline at end of file diff --git a/plugin/writer/hanawriter/package.xml b/plugin/writer/hanawriter/package.xml new file mode 100644 index 000000000..8ecab9f9f --- /dev/null +++ b/plugin/writer/hanawriter/package.xml @@ -0,0 +1,37 @@ + + release + + dir + + false + + + src/main/resources + + *.json + + plugin/writer/${project.artifactId} + + + target/ + + ${project.artifactId}-${project.version}.jar + + plugin/writer/${project.artifactId} + + + + + + false + plugin/writer/${project.artifactId}/libs + runtime + + com.wgzhao.addax:* + + + + diff --git a/plugin/writer/hanawriter/pom.xml b/plugin/writer/hanawriter/pom.xml new file mode 100644 index 000000000..0a09f0dd1 --- /dev/null +++ b/plugin/writer/hanawriter/pom.xml @@ -0,0 +1,61 @@ + + 4.0.0 + + com.wgzhao.addax + addax-all + 4.1.4-SNAPSHOT + ../../../pom.xml + + hanawriter + hana-writer + SAP HANA Writer + jar + + + + com.wgzhao.addax + addax-common + ${project.version} + + + slf4j-log4j12 + org.slf4j + + + + + + com.wgzhao.addax + addax-rdbms + ${project.version} + + + + com.sap.cloud.db.jdbc + ngdbc + 2.18.16 + + + + + + maven-assembly-plugin + + + package.xml + + ${project.artifactId}-${project.version} + + + + release + package + + single + + + + + + + diff --git a/plugin/writer/hanawriter/src/main/java/com/wgzhao/addax/plugin/writer/hanawriter/HANAWriter.java b/plugin/writer/hanawriter/src/main/java/com/wgzhao/addax/plugin/writer/hanawriter/HANAWriter.java new file mode 100644 index 000000000..49e3639c1 --- /dev/null +++ b/plugin/writer/hanawriter/src/main/java/com/wgzhao/addax/plugin/writer/hanawriter/HANAWriter.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.wgzhao.addax.plugin.writer.hanawriter; + +import com.wgzhao.addax.common.base.Key; +import com.wgzhao.addax.common.element.Column; +import com.wgzhao.addax.common.plugin.RecordReceiver; +import com.wgzhao.addax.common.spi.Writer; +import com.wgzhao.addax.common.util.Configuration; +import com.wgzhao.addax.rdbms.util.DataBaseType; +import com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; + +public class HANAWriter + extends Writer +{ + private static final DataBaseType DATABASE_TYPE = DataBaseType.HANA; + + public static class Job + extends Writer.Job + { + private Configuration originalConfig = null; + private CommonRdbmsWriter.Job commonRdbmsWriterJob; + + @Override + public void preCheck() + { + this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE); + } + + @Override + public void init() + { + this.originalConfig = super.getPluginJobConf(); + this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE); + this.commonRdbmsWriterJob.init(this.originalConfig); + } + + @Override + public void prepare() + { + this.commonRdbmsWriterJob.prepare(this.originalConfig); + } + + @Override + public List split(int mandatoryNumber) + { + return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber); + } + + @Override + public void post() + { + this.commonRdbmsWriterJob.post(this.originalConfig); + } + + @Override + public void destroy() + { + this.commonRdbmsWriterJob.destroy(this.originalConfig); + } + } + + public static class Task + extends Writer.Task + { + private Configuration writerSliceConfig; + private CommonRdbmsWriter.Task commonRdbmsWriterTask; + + @Override + public void init() + { + this.writerSliceConfig = super.getPluginJobConf(); + this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE); + this.commonRdbmsWriterTask.init(this.writerSliceConfig); + } + + @Override + public void prepare() + { + this.commonRdbmsWriterTask.prepare(this.writerSliceConfig); + } + + public void startWrite(RecordReceiver recordReceiver) + { + this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig, + super.getTaskPluginCollector()); + } + + @Override + public void post() + { + this.commonRdbmsWriterTask.post(this.writerSliceConfig); + } + + @Override + public void destroy() + { + this.commonRdbmsWriterTask.destroy(this.writerSliceConfig); + } + + @Override + public boolean supportFailOver() + { + String writeMode = writerSliceConfig.getString(Key.WRITE_MODE); + return "replace".equalsIgnoreCase(writeMode); + } + } +} diff --git a/plugin/writer/hanawriter/src/main/resources/plugin.json b/plugin/writer/hanawriter/src/main/resources/plugin.json new file mode 100644 index 000000000..c2544d46e --- /dev/null +++ b/plugin/writer/hanawriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "hanawriter", + "class": "com.wgzhao.addax.plugin.writer.hanawriter.HANAWriter", + "description": "Writer to SAP HANA database", + "developer": "wgzhao" +} \ No newline at end of file diff --git a/plugin/writer/hanawriter/src/main/resources/plugin_job_template.json b/plugin/writer/hanawriter/src/main/resources/plugin_job_template.json new file mode 100644 index 000000000..34819b179 --- /dev/null +++ b/plugin/writer/hanawriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,22 @@ +{ + "name": "hanawriter", + "parameter": { + "writeMode": "insert", + "username": "root", + "password": "", + "column": [ + "*" + ], + "preSql": [ + "delete from @table" + ], + "connection": [ + { + "jdbcUrl": "jdbc:sap://127.0.0.1:37019/test", + "table": [ + "addax_tbl" + ] + } + ] + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5345600dc..cecc77281 100644 --- a/pom.xml +++ b/pom.xml @@ -403,6 +403,7 @@ plugin/reader/elasticsearchreader plugin/reader/excelreader plugin/reader/ftpreader + plugin/reader/hanareader plugin/reader/hbase11xreader plugin/reader/hbase11xsqlreader plugin/reader/hbase20xreader @@ -440,6 +441,7 @@ plugin/writer/excelwriter plugin/writer/ftpwriter plugin/writer/greenplumwriter + plugin/writer/hanawriter plugin/writer/hbase11xsqlwriter plugin/writer/hbase11xwriter plugin/writer/hbase20xsqlwriter