diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 3d73bd343..15529c2e0 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -14,18 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import com.amazonaws.regions.Region; import com.amazonaws.regions.RegionUtils; import com.amazonaws.services.cloudwatch.AmazonCloudWatch; @@ -40,11 +28,24 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.leases.exceptions.LeasingException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager; +import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Worker is the high level class that Kinesis applications use to start @@ -188,6 +189,8 @@ public Worker( } } + + /** * @param recordProcessorFactory Used to get record processor instances for processing data from shards * @param config Kinesis Client Library configuration @@ -204,12 +207,33 @@ public Worker( AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { + + this(recordProcessorFactory, config, kinesisClient, + new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient), metricsFactory, execService); + } + + /** + * @param recordProcessorFactory Used to get record processor instances for processing data from shards + * @param config Kinesis Client Library configuration + * @param kinesisClient Kinesis Client used for fetching data + * @param leaseManager Implementation of lease manager; likely DynamoDB + * @param metricsFactory Metrics factory used to emit metrics + * @param execService ExecutorService to use for processing records (support for multi-threaded + * consumption) + */ + public Worker( + com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory, + KinesisClientLibConfiguration config, + AmazonKinesis kinesisClient, + LeaseManager leaseManager, + IMetricsFactory metricsFactory, + ExecutorService execService) { this( config.getApplicationName(), new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) - .getProxy(config.getStreamName()), + .getProxy(config.getStreamName()), config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), config.shouldValidateSequenceNumberBeforeCheckpointing(), @@ -220,7 +244,7 @@ public Worker( config.shouldCleanupLeasesUponShardCompletion(), null, new KinesisClientLibLeaseCoordinator( - new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient), + leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), metricsFactory), execService, @@ -231,8 +255,6 @@ public Worker( Region region = RegionUtils.getRegion(config.getRegionName()); kinesisClient.setRegion(region); LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName()); - dynamoDBClient.setRegion(region); - LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) {