Skip to content

Commit

Permalink
[FLINK-8983] Integrate test_confluent_schema_registry.sh into run-nightly-tests.sh
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Jun 28, 2018
1 parent a36b569 commit 9b36604
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 36 deletions.
1 change: 1 addition & 0 deletions flink-end-to-end-tests/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test-scripts/temp-test-directory*
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
src/main/java/example/avro
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<excludes>**/example/avro/*</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

import example.avro.User;
import org.apache.avro.specific.SpecificRecordBase;

import java.util.Properties;

Expand All @@ -40,7 +41,6 @@
public class TestAvroConsumerConfluent {

public static void main(String[] args) throws Exception {
Properties config = new Properties();
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

Expand All @@ -52,6 +52,7 @@ public static void main(String[] args) throws Exception {
"--schema-registry-url <confluent schema registry> --group.id <some id>");
return;
}
Properties config = new Properties();
config.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
config.setProperty("group.id", parameterTool.getRequired("group.id"));
config.setProperty("zookeeper.connect", parameterTool.getRequired("zookeeper.connect"));
Expand All @@ -62,20 +63,15 @@ public static void main(String[] args) throws Exception {

DataStreamSource<User> input = env
.addSource(
new FlinkKafkaConsumer010(
new FlinkKafkaConsumer010<>(
parameterTool.getRequired("input-topic"),
ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, schemaRegistryUrl),
config).setStartFromEarliest());

SingleOutputStreamOperator<String> mapToString = input
.map(new MapFunction<User, String>() {
@Override
public String map(User value) throws Exception {
return value.toString();
}
});
.map((MapFunction<User, String>) SpecificRecordBase::toString);

FlinkKafkaProducer010<String> stringFlinkKafkaProducer010 = new FlinkKafkaProducer010(
FlinkKafkaProducer010<String> stringFlinkKafkaProducer010 = new FlinkKafkaProducer010<>(
parameterTool.getRequired("output-topic"),
new SimpleStringSchema(),
config);
Expand Down
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/run-nightly-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scr
run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"

run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test-confluent-schema-registry.sh"
run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"

printf "\n[PASS] All tests passed\n"
exit 0
28 changes: 20 additions & 8 deletions flink-end-to-end-tests/test-scripts/kafka-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ fi

KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
CONFLUENT_DIR=$TEST_DATA_DIR/confluent-3.2.0
SCHEMA_REGISTRY_PORT=8082
SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}

function setup_kafka_dist {
# download Kafka
Expand All @@ -49,6 +51,9 @@ function setup_confluent_dist {
curl "$CONFLUENT_URL" > $TEST_DATA_DIR/confluent.tgz

tar xzf $TEST_DATA_DIR/confluent.tgz -C $TEST_DATA_DIR/

# fix confluent config
sed -i -e "s#listeners=http://0.0.0.0:8081#listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}#" $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
}

function start_kafka_cluster {
Expand Down Expand Up @@ -87,14 +92,8 @@ function read_messages_from_kafka {
--consumer-property group.id=$3 2> /dev/null
}

function read_messages_from_kafka_avro {
$CONFLUENT_DIR/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning \
--max-messages $1 \
--topic $2 2> /dev/null
}

function send_messages_to_kafka_avro {
echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic $2 --property value.schema=$3
echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic $2 --property value.schema=$3 --property schema.registry.url=${SCHEMA_REGISTRY_URL}
}

function modify_num_partitions {
Expand All @@ -120,7 +119,20 @@ function get_partition_end_offset {
}

function start_confluent_schema_registry {
$CONFLUENT_DIR/bin/schema-registry-start -daemon $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
$CONFLUENT_DIR/bin/schema-registry-start -daemon $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties

# wait until the schema registry REST endpoint is up
for i in {1..30}; do
QUERY_RESULT=$(curl "${SCHEMA_REGISTRY_URL}/subjects" 2> /dev/null || true)

if [[ ${QUERY_RESULT} =~ \[.*\] ]]; then
echo "Schema registry is up."
break
fi

echo "Waiting for schema registry..."
sleep 1
done
}

function stop_confluent_schema_registry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ function verify_output {
echo "Output from Flink program does not match expected output."
echo -e "EXPECTED FOR KEY: --$expected--"
echo -e "ACTUAL: --$result--"
PASS=""
exit 1
fi
}
Expand All @@ -41,12 +40,6 @@ function test_cleanup {

stop_kafka_cluster
stop_confluent_schema_registry

# revert our modifications to the Flink distribution
mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml

# make sure to run regular cleanup as well
cleanup
}

trap test_cleanup INT
Expand All @@ -55,26 +48,18 @@ trap test_cleanup EXIT
setup_kafka_dist
setup_confluent_dist

cd flink-end-to-end-tests/flink-confluent-schema-registry
mvn clean package -nsu

start_kafka_cluster
start_confluent_schema_registry
sleep 5

# modify configuration to use port 8082 for Flink
cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
sed -i -e "s/web.port: 8081/web.port: 8082/" $FLINK_DIR/conf/flink-conf.yaml

TEST_PROGRAM_JAR=target/TestAvroConsumerConfluent.jar
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-confluent-schema-registry/target/TestAvroConsumerConfluent.jar

INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'
INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'
INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'
USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'

curl -X POST \
http://localhost:8081/subjects/users-value/versions \
${SCHEMA_REGISTRY_URL}/subjects/users-value/versions \
-H 'cache-control: no-cache' \
-H 'content-type: application/vnd.schemaregistry.v1+json' \
-d '{"schema": "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": \"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'
Expand All @@ -87,11 +72,13 @@ send_messages_to_kafka_avro $INPUT_MESSAGE_3 test-avro-input $USER_SCHEMA

start_cluster

create_kafka_topic 1 1 test-avro-out

# Read Avro message from [test-avro-input], check the schema and send message to [test-avro-out]
$FLINK_DIR/bin/flink run -d $TEST_PROGRAM_JAR \
--input-topic test-avro-input --output-topic test-avro-out \
--bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest \
--schema-registry-url http://localhost:8081
--schema-registry-url ${SCHEMA_REGISTRY_URL}

#echo "Reading messages from Kafka topic [test-avro-out] ..."

Expand Down

0 comments on commit 9b36604

Please sign in to comment.