diff --git a/flink-end-to-end-tests/flink-quickstart-test-dummy-dependency/pom.xml b/flink-end-to-end-tests/flink-quickstart-test-dummy-dependency/pom.xml new file mode 100644 index 0000000000000..1805f3c02fa2b --- /dev/null +++ b/flink-end-to-end-tests/flink-quickstart-test-dummy-dependency/pom.xml @@ -0,0 +1,33 @@ + + + + + org.apache.flink + flink-end-to-end-tests + 1.17-SNAPSHOT + + 4.0.0 + + flink-quickstart-test-dummy-dependency + Flink : E2E Tests : Quickstart : Dummy-Dependency + jar + diff --git a/flink-end-to-end-tests/flink-quickstart-test-dummy-dependency/src/main/java/org/apache/flink/quickstarts/test/utils/Utils.java b/flink-end-to-end-tests/flink-quickstart-test-dummy-dependency/src/main/java/org/apache/flink/quickstarts/test/utils/Utils.java new file mode 100644 index 0000000000000..64f465c52d47e --- /dev/null +++ b/flink-end-to-end-tests/flink-quickstart-test-dummy-dependency/src/main/java/org/apache/flink/quickstarts/test/utils/Utils.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.quickstarts.test.utils; + +/** Dummy util to test packaging of Flink dependencies in quickstarts. */ +public class Utils { + public static String prefix(Long message) { + return "message #" + message; + } +} diff --git a/flink-end-to-end-tests/flink-quickstart-test/pom.xml b/flink-end-to-end-tests/flink-quickstart-test/pom.xml index 8e7d71c46355e..01c2c45212274 100644 --- a/flink-end-to-end-tests/flink-quickstart-test/pom.xml +++ b/flink-end-to-end-tests/flink-quickstart-test/pom.xml @@ -56,7 +56,7 @@ under the License. org.apache.flink - flink-connector-elasticsearch7 + flink-quickstart-test-dummy-dependency ${project.version} diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch7SinkExample.java b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch7SinkExample.java deleted file mode 100644 index 36fd2a90c0a3b..0000000000000 --- a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/Elasticsearch7SinkExample.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.quickstarts.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -/** End to end test for Elasticsearch7Sink. */ -public class Elasticsearch7SinkExample { - - public static void main(String[] args) throws Exception { - - final ParameterTool parameterTool = ParameterTool.fromArgs(args); - - if (parameterTool.getNumberOfParameters() < 3) { - System.out.println( - "Missing parameters!\n" - + "Usage: --numRecords --index --type "); - return; - } - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(5000); - - DataStream source = - env.fromSequence(0, parameterTool.getInt("numRecords") - 1) - .map((MapFunction) value -> "message #" + value); - - source.sinkTo( - new Elasticsearch7SinkBuilder() - // This instructs the sink to emit after every element, otherwise they would - // be buffered - .setBulkFlushMaxActions(1) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, context, indexer) -> - indexer.add(createIndexRequest(element, parameterTool))) - .build()); - - env.execute("Elasticsearch7.x end to end sink test example"); - } - - private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { - Map json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index(parameterTool.getRequired("index")) - .type(parameterTool.getRequired("type")) - .id(element) - .source(json); - } -} diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/QuickstartExample.java b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/QuickstartExample.java new file mode 100644 index 0000000000000..1ca4b7c747251 --- /dev/null +++ b/flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstarts/test/QuickstartExample.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.quickstarts.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.quickstarts.test.utils.Utils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CloseableIterator; + +/** End to end test for quickstarts. */ +public class QuickstartExample { + + public static void main(String[] args) throws Exception { + + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if (parameterTool.getNumberOfParameters() < 1) { + System.out.println("Missing parameters!\nUsage: --numRecords "); + return; + } + + int numRecordsToEmit = parameterTool.getInt("numRecords"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(5000); + + DataStream source = + env.fromSequence(0, numRecordsToEmit - 1) + .map((MapFunction) Utils::prefix); + + try (CloseableIterator data = source.collectAsync()) { + env.execute("Quickstart example"); + + int count = 0; + while (data.hasNext()) { + data.next(); + count++; + } + if (count != numRecordsToEmit) { + throw new RuntimeException( + String.format( + "Unexpected number of records; expected :%s actual: %s", + numRecordsToEmit, count)); + } + } + } +} diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch7SinkExample.scala b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch7SinkExample.scala deleted file mode 100644 index 985b2a62296e2..0000000000000 --- a/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch7SinkExample.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.quickstarts.test - -import org.apache.flink.api.connector.sink2.SinkWriter -import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.connector.elasticsearch.sink.{Elasticsearch7SinkBuilder, RequestIndexer} -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment - -import org.apache.http.HttpHost -import org.elasticsearch.action.index.IndexRequest -import org.elasticsearch.client.Requests - -import scala.collection.JavaConversions.mapAsJavaMap - -object Elasticsearch7SinkExample { - def main(args: Array[String]) { - - val parameterTool = ParameterTool.fromArgs(args) - - if (parameterTool.getNumberOfParameters < 3) { - println( - "Missing parameters!\n" + "Usage:" + - " --numRecords --index --type ") - return - } - val env = StreamExecutionEnvironment.getExecutionEnvironment - env.enableCheckpointing(5000) - - val source: DataStream[(String)] = env - .fromSequence(0, 20 - 1) - .map(v => "message #" + v.toString) - - source.sinkTo( - new Elasticsearch7SinkBuilder[String] - // This instructs the sink to emit after every element, otherwise they would - // be buffered - .setBulkFlushMaxActions(1) - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element: String, context: SinkWriter.Context, indexer: RequestIndexer) => - indexer.add(createIndexRequest(element, parameterTool))) - .build()) - - env.execute("Elasticsearch7.x end to end sink test example") - } - - def createIndexRequest(element: (String), parameterTool: (ParameterTool)): IndexRequest = { - - val json2 = Map( - "data" -> element.asInstanceOf[AnyRef] - ) - - Requests.indexRequest - .index(parameterTool.getRequired("index")) - .`type`(parameterTool.getRequired("type")) - .source(mapAsJavaMap(json2)) - } -} diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/QuickstartExample.scala b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/QuickstartExample.scala new file mode 100644 index 0000000000000..fb6a1b6ab8f8a --- /dev/null +++ b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/QuickstartExample.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.quickstarts.test + +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.quickstarts.test.utils.Utils +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment + +object QuickstartExample { + def main(args: Array[String]) { + + val parameterTool = ParameterTool.fromArgs(args) + + if (parameterTool.getNumberOfParameters < 1) { + println("Missing parameters!\nUsage: --numRecords ") + return + } + + val numRecordsToEmit = parameterTool.getInt("numRecords") + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.enableCheckpointing(5000) + + val source: DataStream[(String)] = env + .fromSequence(0, numRecordsToEmit - 1) + .map(v => Utils.prefix(v)) + + val data = source.collectAsync() + + env.execute("Elasticsearch7.x end to end sink test example") + + var count = 0 + while (data.hasNext) { + data.next() + count += 1 + } + if (count != numRecordsToEmit) { + throw new RuntimeException( + s"Unexpected number of records; expected :$numRecordsToEmit actual: $count") + } + } +} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index a9214b374e83f..a429891d820cd 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -53,6 +53,7 @@ under the License. flink-queryable-state-test flink-local-recovery-and-allocation-test flink-quickstart-test + flink-quickstart-test-dummy-dependency flink-confluent-schema-registry flink-stream-state-ttl-test flink-sql-client-test diff --git a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh index 93f7d225c1d66..3468ef1fd1d7c 100755 --- a/flink-end-to-end-tests/test-scripts/test_quickstarts.sh +++ b/flink-end-to-end-tests/test-scripts/test_quickstarts.sh @@ -22,18 +22,17 @@ # FLINK_DIR= flink-end-to-end-tests/test-scripts/test_quickstarts.sh source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/elasticsearch-common.sh TEST_TYPE=$1 -TEST_CLASS_NAME=Elasticsearch7SinkExample +TEST_CLASS_NAME=QuickstartExample TEST_FILE_PATH=flink-quickstart-test/src/main/${TEST_TYPE}/org/apache/flink/quickstarts/test/${TEST_CLASS_NAME}.${TEST_TYPE} QUICKSTARTS_FILE_PATH=${TEST_DATA_DIR}/flink-quickstart-${TEST_TYPE}/src/main/${TEST_TYPE}/org/apache/flink/quickstart/${TEST_CLASS_NAME}.${TEST_TYPE} ES_INDEX=index_${TEST_TYPE} -# get the elasticsearch dependency from flink-quickstart-test +# get the dummy Flink dependency from flink-quickstart-test ES_DEPENDENCY="\ org.apache.flink\ -$(awk '/flink-connector-elasticsearch/ {print $1}' ${END_TO_END_DIR}/flink-quickstart-test/target/dependency-reduced-pom.xml)\ +$(awk '/flink-quickstart-test-dummy-dependency/ {print $1}' ${END_TO_END_DIR}/flink-quickstart-test/target/dependency-reduced-pom.xml)\ \${flink.version}\ " @@ -56,27 +55,18 @@ run_mvn archetype:generate \ cd "${ARTIFACT_ID}" -# use the Flink Elasticsearch sink example job code in flink-quickstart-test to simulate modifications to contained job +# simulate modifications to contained job cp ${END_TO_END_DIR}/${TEST_FILE_PATH} "$QUICKSTARTS_FILE_PATH" sed -i -e 's/package org.apache.flink.quickstarts.test/package org.apache.flink.quickstart/' "${QUICKSTARTS_FILE_PATH}" position=$(awk '// {print NR}' pom.xml | head -1) -# Add ElasticSearch dependency to pom.xml +# Add dummy Flink dependency to pom.xml sed -i -e ''$(($position + 1))'i\ '${ES_DEPENDENCY}'' pom.xml sed -i -e "s/org.apache.flink.quickstart.DataStreamJob/org.apache.flink.quickstart.$TEST_CLASS_NAME/" pom.xml -case $PROFILE in -*"scala-2.12"*) - # all good - ;; -*"scala-"*) - echo "UNSUPPORTED SCALA VERSION" - exit 1 -esac - run_mvn clean package cd target @@ -94,31 +84,18 @@ else exit 1 fi -if [[ `grep -c "org/apache/flink/quickstart/DataStreamJob.class" contentsInJar.txt` -eq '0' && \ - `grep -c "org/apache/flink/quickstart/Elasticsearch7SinkExample.class" contentsInJar.txt` -eq '0' && \ - `grep -c "org/apache/flink/connector/elasticsearch" contentsInJar.txt` -eq '0' ]]; then +if [[ `grep -c "org/apache/flink/quickstarts/test/Utils.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/${TEST_CLASS_NAME}.class" contentsInJar.txt` -eq '0' ]]; then - echo "Failure: Since Elasticsearch7SinkExample.class and other user classes are not included in the jar. " + echo "Failure: Since ${TEST_CLASS_NAME}.class and other user classes are not included in the jar. " exit 1 else - echo "Success: Elasticsearch7SinkExample.class and other user classes are included in the jar." + echo "Success: ${TEST_CLASS_NAME}.class and other user classes are included in the jar." fi -setup_elasticsearch "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.2-linux-x86_64.tar.gz" 7 -wait_elasticsearch_working - -function shutdownAndCleanup { - shutdown_elasticsearch_cluster "$ES_INDEX" -} -on_exit shutdownAndCleanup - TEST_PROGRAM_JAR=${TEST_DATA_DIR}/${ARTIFACT_ID}/target/${ARTIFACT_ID}-${ARTIFACT_VERSION}.jar start_cluster -${FLINK_DIR}/bin/flink run -c org.apache.flink.quickstart.Elasticsearch7SinkExample "$TEST_PROGRAM_JAR" \ - --numRecords 20 \ - --index "${ES_INDEX}" \ - --type type - -verify_result_line_number 20 "${ES_INDEX}" +${FLINK_DIR}/bin/flink run -c org.apache.flink.quickstart.${TEST_CLASS_NAME} "$TEST_PROGRAM_JAR" \ + --numRecords 20