Skip to content

Commit

Permalink
[FLINK-29285][tests] Move TestUtils#getResource
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored Sep 14, 2022
1 parent c0165a8 commit 469049a
Show file tree
Hide file tree
Showing 18 changed files with 145 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.connector.aws.testutils.LocalstackContainer;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.jackson.JacksonMapperFactory;
Expand Down Expand Up @@ -84,7 +84,7 @@ public class KinesisFirehoseTableITTest extends TestLogger {

private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();

private final Path sqlConnectorFirehoseJar = TestUtils.getResource(".*firehose.jar");
private final Path sqlConnectorFirehoseJar = ResourceTestUtils.getResource(".*firehose.jar");

private SdkHttpClient httpClient;
private S3Client s3Client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.jackson.JacksonMapperFactory;

Expand Down Expand Up @@ -84,7 +84,8 @@ public class KinesisStreamsTableApiIT {
private SdkHttpClient httpClient;
private KinesisClient kinesisClient;

private final Path sqlConnectorKinesisJar = TestUtils.getResource(".*kinesis-streams.jar");
private final Path sqlConnectorKinesisJar =
ResourceTestUtils.getResource(".*kinesis-streams.jar");
private static final Network network = Network.newNetwork();

@ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.util.DockerImageVersions;

import org.testcontainers.containers.KafkaContainer;
Expand Down Expand Up @@ -66,15 +66,15 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
new KafkaSinkExternalContextFactory(
kafka.getContainer(),
Arrays.asList(
TestUtils.getResource("kafka-connector.jar")
ResourceTestUtils.getResource("kafka-connector.jar")
.toAbsolutePath()
.toUri()
.toURL(),
TestUtils.getResource("kafka-clients.jar")
ResourceTestUtils.getResource("kafka-clients.jar")
.toAbsolutePath()
.toUri()
.toURL(),
TestUtils.getResource("flink-connector-testing.jar")
ResourceTestUtils.getResource("flink-connector-testing.jar")
.toAbsolutePath()
.toUri()
.toURL()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.util.DockerImageVersions;

import org.testcontainers.containers.KafkaContainer;
Expand Down Expand Up @@ -66,8 +66,8 @@ public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> {
new KafkaSourceExternalContextFactory(
kafka.getContainer(),
Arrays.asList(
TestUtils.getResource("kafka-connector.jar").toUri().toURL(),
TestUtils.getResource("kafka-clients.jar").toUri().toURL()),
ResourceTestUtils.getResource("kafka-connector.jar").toUri().toURL(),
ResourceTestUtils.getResource("kafka-clients.jar").toUri().toURL()),
PARTITION);

@SuppressWarnings("unused")
Expand All @@ -76,8 +76,8 @@ public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> {
new KafkaSourceExternalContextFactory(
kafka.getContainer(),
Arrays.asList(
TestUtils.getResource("kafka-connector.jar").toUri().toURL(),
TestUtils.getResource("kafka-clients.jar").toUri().toURL()),
ResourceTestUtils.getResource("kafka-connector.jar").toUri().toURL(),
ResourceTestUtils.getResource("kafka-clients.jar").toUri().toURL()),
TOPIC);

public KafkaSourceE2ECase() throws Exception {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkResource;
Expand Down Expand Up @@ -101,8 +101,8 @@ private static Configuration getConfiguration() {

@ClassRule public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();

private static final Path sqlAvroJar = TestUtils.getResource(".*avro.jar");
private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar");
private static final Path sqlAvroJar = ResourceTestUtils.getResource(".*avro.jar");
private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*SqlToolbox.jar");
private final List<Path> apacheAvroJars = new ArrayList<>();
private final Path sqlConnectorKafkaJar;

Expand All @@ -116,7 +116,7 @@ public SQLClientKafkaITCase(
this.kafkaSQLVersion = kafkaSQLVersion;
this.kafkaIdentifier = kafkaIdentifier;

this.sqlConnectorKafkaJar = TestUtils.getResource(kafkaSQLJarPattern);
this.sqlConnectorKafkaJar = ResourceTestUtils.getResource(kafkaSQLJarPattern);
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.kafka.containers.SchemaRegistryContainer;
import org.apache.flink.util.DockerImageVersions;

Expand Down Expand Up @@ -65,10 +65,11 @@ public class SQLClientSchemaRegistryITCase {

public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry";
private static final Path sqlAvroJar = TestUtils.getResource(".*avro.jar");
private static final Path sqlAvroRegistryJar = TestUtils.getResource(".*avro-confluent.jar");
private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar");
private final Path sqlConnectorKafkaJar = TestUtils.getResource(".*kafka.jar");
private static final Path sqlAvroJar = ResourceTestUtils.getResource(".*avro.jar");
private static final Path sqlAvroRegistryJar =
ResourceTestUtils.getResource(".*avro-confluent.jar");
private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*SqlToolbox.jar");
private final Path sqlConnectorKafkaJar = ResourceTestUtils.getResource(".*kafka.jar");

@ClassRule public static final Network NETWORK = Network.newNetwork();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.TestLoggerExtension;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
Expand Down Expand Up @@ -129,7 +129,7 @@ private static void teardown() {

@Test
public void testKafka() throws Exception {
final Path kafkaExampleJar = TestUtils.getResource(EXAMPLE_JAR_MATCHER);
final Path kafkaExampleJar = ResourceTestUtils.getResource(EXAMPLE_JAR_MATCHER);

final String inputTopic = "test-input-" + "-" + UUID.randomUUID();
final String outputTopic = "test-output" + "-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,80 +18,24 @@

package org.apache.flink.tests.util;

import org.apache.flink.test.parameters.ParameterProperty;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** General test utilities. */
public enum TestUtils {
;

private static final ParameterProperty<Path> MODULE_DIRECTORY =
new ParameterProperty<>("moduleDir", Paths::get);

/**
* Searches for a resource file matching the given regex in the given directory. This method is
* primarily intended to be used for the initialization of static {@link Path} fields for
* resource file(i.e. jar, config file) that reside in the modules {@code target} directory.
*
* @param resourceNameRegex regex pattern to match against
* @return Path pointing to the matching jar
* @throws RuntimeException if none or multiple resource files could be found
*/
public static Path getResource(final String resourceNameRegex) {
// if the property is not set then we are most likely running in the IDE, where the working
// directory is the
// module of the test that is currently running, which is exactly what we want
Path moduleDirectory = MODULE_DIRECTORY.get(Paths.get("").toAbsolutePath());

try (Stream<Path> dependencyResources = Files.walk(moduleDirectory)) {
final List<Path> matchingResources =
dependencyResources
.filter(
jar ->
Pattern.compile(resourceNameRegex)
.matcher(jar.toAbsolutePath().toString())
.find())
.collect(Collectors.toList());
switch (matchingResources.size()) {
case 0:
throw new RuntimeException(
new FileNotFoundException(
String.format(
"No resource file could be found that matches the pattern %s. "
+ "This could mean that the test module must be rebuilt via maven.",
resourceNameRegex)));
case 1:
return matchingResources.get(0);
default:
throw new RuntimeException(
new IOException(
String.format(
"Multiple resource files were found matching the pattern %s. Matches=%s",
resourceNameRegex, matchingResources)));
}
} catch (final IOException ioe) {
throw new RuntimeException("Could not search for resource resource files.", ioe);
}
}

/**
* Copy all the files and sub-directories under source directory to destination directory
* recursively.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.streaming.tests;

import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.util.DockerImageVersions;

import org.slf4j.Logger;
Expand All @@ -44,15 +44,17 @@ String getElasticsearchContainerName() {
new Elasticsearch6SinkExternalContextFactory(
elasticsearch.getContainer(),
Arrays.asList(
TestUtils.getResource("dependencies/elasticsearch6-end-to-end-test.jar")
ResourceTestUtils.getResource(
"dependencies/elasticsearch6-end-to-end-test.jar")
.toAbsolutePath()
.toUri()
.toURL(),
TestUtils.getResource("dependencies/flink-connector-test-utils.jar")
ResourceTestUtils.getResource(
"dependencies/flink-connector-test-utils.jar")
.toAbsolutePath()
.toUri()
.toURL(),
TestUtils.getResource(
ResourceTestUtils.getResource(
"dependencies/flink-connector-elasticsearch-test-utils.jar")
.toAbsolutePath()
.toUri()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.streaming.tests;

import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.util.DockerImageVersions;

import org.slf4j.Logger;
Expand All @@ -44,15 +44,17 @@ String getElasticsearchContainerName() {
new Elasticsearch7SinkExternalContextFactory(
elasticsearch.getContainer(),
Arrays.asList(
TestUtils.getResource("dependencies/elasticsearch7-end-to-end-test.jar")
ResourceTestUtils.getResource(
"dependencies/elasticsearch7-end-to-end-test.jar")
.toAbsolutePath()
.toUri()
.toURL(),
TestUtils.getResource("dependencies/flink-connector-test-utils.jar")
ResourceTestUtils.getResource(
"dependencies/flink-connector-test-utils.jar")
.toAbsolutePath()
.toUri()
.toURL(),
TestUtils.getResource(
ResourceTestUtils.getResource(
"dependencies/flink-connector-elasticsearch-test-utils.jar")
.toAbsolutePath()
.toUri()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.tests.util.hbase;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkResource;
Expand Down Expand Up @@ -96,14 +96,15 @@ public static Collection<Object[]> data() {

@ClassRule public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();

private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar");
private static final Path hadoopClasspath = TestUtils.getResource(".*hadoop.classpath");
private static final Path sqlToolBoxJar = ResourceTestUtils.getResource(".*SqlToolbox.jar");
private static final Path hadoopClasspath = ResourceTestUtils.getResource(".*hadoop.classpath");
private List<Path> hadoopClasspathJars;

public SQLClientHBaseITCase(String hbaseVersion, String hbaseConnector) {
this.hbase = HBaseResource.get(hbaseVersion);
this.hbaseConnector = hbaseConnector;
this.sqlConnectorHBaseJar = TestUtils.getResource(".*sql-" + hbaseConnector + ".jar");
this.sqlConnectorHBaseJar =
ResourceTestUtils.getResource(".*sql-" + hbaseConnector + ".jar");
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.test.resources.ResourceTestUtils;

/** A Flink Container which would bundles pulsar connector in its classpath. */
public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvironment {
Expand All @@ -47,7 +47,7 @@ public FlinkContainerWithPulsarEnvironment(int numTaskManagers, int numSlotsPerT
}

private static String resourcePath(String jarName) {
return TestUtils.getResource(jarName).toAbsolutePath().toString();
return ResourceTestUtils.getResource(jarName).toAbsolutePath().toString();
}

protected static Configuration flinkConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.flink.tests.scala;

import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkResource;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
Expand Down Expand Up @@ -65,7 +65,8 @@ public static Collection<TestParams> testParameters() {
ScalaJob.class.getCanonicalName(),
builder ->
builder.addJar(
TestUtils.getResource("/scala.jar"), JarLocation.LIB)));
ResourceTestUtils.getResource("/scala.jar"),
JarLocation.LIB)));
}

public ScalaFreeITCase(TestParams testParams) {
Expand All @@ -79,7 +80,7 @@ public ScalaFreeITCase(TestParams testParams) {

@Test
public void testScalaFreeJobExecution() throws Exception {
final Path jobJar = TestUtils.getResource("/jobs.jar");
final Path jobJar = ResourceTestUtils.getResource("/jobs.jar");

try (final ClusterController clusterController = flink.startCluster(1)) {
// if the job fails then this throws an exception
Expand Down
Loading

0 comments on commit 469049a

Please sign in to comment.