Skip to content

Commit

Permalink
Merge pull request #2 from scopely/inject-lease-manager
Browse files Browse the repository at this point in the history
Expose Worker constructor taking a LeaseManager<KinesisClientLease>
  • Loading branch information
avram committed Sep 28, 2015
2 parents 32b4a0b + b241202 commit f4fe780
Showing 1 changed file with 38 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,6 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
Expand All @@ -40,11 +28,24 @@
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxyFactory;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.leases.exceptions.LeasingException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Worker is the high level class that Kinesis applications use to start
Expand Down Expand Up @@ -188,6 +189,8 @@ public Worker(
}
}



/**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
Expand All @@ -204,12 +207,33 @@ public Worker(
AmazonDynamoDB dynamoDBClient,
IMetricsFactory metricsFactory,
ExecutorService execService) {

this(recordProcessorFactory, config, kinesisClient,
new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient), metricsFactory, execService);
}

/**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param kinesisClient Kinesis Client used for fetching data
* @param leaseManager Implementation of lease manager; likely DynamoDB
* @param metricsFactory Metrics factory used to emit metrics
* @param execService ExecutorService to use for processing records (support for multi-threaded
* consumption)
*/
public Worker(
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config,
AmazonKinesis kinesisClient,
LeaseManager<KinesisClientLease> leaseManager,
IMetricsFactory metricsFactory,
ExecutorService execService) {
this(
config.getApplicationName(),
new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory),
new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName()),
.getProxy(config.getStreamName()),
config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing(),
Expand All @@ -220,7 +244,7 @@ public Worker(
config.shouldCleanupLeasesUponShardCompletion(),
null,
new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient),
leaseManager,
config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(),
metricsFactory),
execService,
Expand All @@ -231,8 +255,6 @@ public Worker(
Region region = RegionUtils.getRegion(config.getRegionName());
kinesisClient.setRegion(region);
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
dynamoDBClient.setRegion(region);
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
}
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
if (config.getKinesisEndpoint() != null) {
Expand Down

0 comments on commit f4fe780

Please sign in to comment.