Merge pull request #139 from datastax/feature/duplicate-values-in-list
Implements workaround for C* bug (duplicates in list on insert/update…
pravinbhat authored May 4, 2023
2 parents c333263 + 727cd34 commit bdec1b1
Showing 7 changed files with 41 additions and 48 deletions.
23 changes: 2 additions & 21 deletions pom.xml
@@ -8,7 +8,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<revision>3.4.1</revision>
<revision>3.4.2</revision>
<scala.version>2.12.17</scala.version>
<scala.main.version>2.12</scala.main.version>
<spark.version>3.3.1</spark.version>
@@ -48,21 +48,7 @@
<artifactId>spark-sql_${scala.main.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.main.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_${scala.main.version}</artifactId>
@@ -74,11 +60,6 @@
<version>3.1.15</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.19.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
29 changes: 19 additions & 10 deletions src/main/java/datastax/astra/migrate/AbstractJobSession.java
@@ -8,7 +8,7 @@
import datastax.astra.migrate.schema.ColumnInfo;
import datastax.astra.migrate.schema.TableInfo;
import datastax.astra.migrate.schema.TypeInfo;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +30,8 @@ public class AbstractJobSession extends BaseJobSession {
protected List<String> ttlWTCols;
protected String tsReplaceValStr;
protected long tsReplaceVal;
protected long customWriteTime = 0l;
protected long incrementWriteTime = 0l;

protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
this(sourceSession, astraSession, sc, false);
@@ -67,7 +69,6 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
logger.info("PARAM -- Destination Table: {}", astraKeyspaceTable.split("\\.")[1]);
logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
logger.info("PARAM -- WriteRateLimit: {}", writeLimiter.getRate());
logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);

tableInfo = TableInfo.getInstance(sourceSession, sourceKeyspaceTable.split("\\.")[0],
sourceKeyspaceTable.split("\\.")[1], Util.getSparkPropOrEmpty(sc, "spark.query.origin"));
@@ -90,18 +91,26 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
}
String maxWriteTimeStampFilterStr =
Util.getSparkPropOr(sc, "spark.origin.maxWriteTimeStampFilter", "0");
if (null != maxWriteTimeStampFilterStr && maxWriteTimeStampFilterStr.trim().length() > 1) {
if (StringUtils.isNotBlank(maxWriteTimeStampFilterStr)) {
maxWriteTimeStampFilter = Long.parseLong(maxWriteTimeStampFilterStr);
}

String customWriteTimeStr =
Util.getSparkPropOr(sc, "spark.target.custom.writeTime", "0");
if (null != customWriteTimeStr && customWriteTimeStr.trim().length() > 1 && StringUtils.isNumeric(customWriteTimeStr.trim())) {
customWritetime = Long.parseLong(customWriteTimeStr);
Util.getSparkPropOr(sc, "spark.target.writeTime.fixedValue", "0");
if (StringUtils.isNotBlank(customWriteTimeStr) && StringUtils.isNumeric(customWriteTimeStr)) {
customWriteTime = Long.parseLong(customWriteTimeStr);
}

String incrWriteTimeStr =
Util.getSparkPropOr(sc, "spark.target.writeTime.incrementBy", "0");
if (StringUtils.isNotBlank(incrWriteTimeStr) && StringUtils.isNumeric(incrWriteTimeStr)) {
incrementWriteTime = Long.parseLong(incrWriteTimeStr);
}

logger.info("PARAM -- TTL-WriteTime Columns: {}", ttlWTCols);
logger.info("PARAM -- WriteTimestampFilter: {}", writeTimeStampFilter);
logger.info("PARAM -- WriteTimes Filter: {}", writeTimeStampFilter);
logger.info("PARAM -- WriteTime Custom Value: {}", customWriteTime);
logger.info("PARAM -- WriteTime Increment Value: {}", incrementWriteTime);
if (writeTimeStampFilter) {
logger.info("PARAM -- minWriteTimeStampFilter: {} datetime is {}", minWriteTimeStampFilter,
Instant.ofEpochMilli(minWriteTimeStampFilter / 1000));
@@ -193,10 +202,10 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
if (!ttlWTCols.isEmpty()) {
boundInsertStatement = boundInsertStatement.set(index, getLargestTTL(sourceRow), Integer.class);
index++;
if (customWritetime > 0) {
boundInsertStatement = boundInsertStatement.set(index, customWritetime, Long.class);
if (customWriteTime > 0) {
boundInsertStatement = boundInsertStatement.set(index, customWriteTime, Long.class);
} else {
boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow), Long.class);
boundInsertStatement = boundInsertStatement.set(index, getLargestWriteTimeStamp(sourceRow) + incrementWriteTime, Long.class);
}
}
}
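For readers skimming the diff, here is a minimal standalone sketch of the writetime selection that the bindInsert() change above now performs: a fixed target writetime (spark.target.writeTime.fixedValue), when set, wins; otherwise the largest source writetime is shifted by the configured increment (spark.target.writeTime.incrementBy), which is the duplicate-list workaround. The WriteTimeSketch class and effectiveWriteTime() helper are illustrative names only, not part of the codebase.

// Illustrative sketch only; mirrors the customWriteTime/incrementWriteTime fields added above.
public class WriteTimeSketch {

    // Returns the writetime (epoch microseconds) to bind on the target insert.
    static long effectiveWriteTime(long largestSourceWriteTime, long customWriteTime, long incrementWriteTime) {
        if (customWriteTime > 0) {
            // Fixed value configured: override whatever the source row carried.
            return customWriteTime;
        }
        // Otherwise keep the source writetime, optionally shifted to avoid
        // duplicate list values on the target (CASSANDRA-11368 workaround).
        return largestSourceWriteTime + incrementWriteTime;
    }

    public static void main(String[] args) {
        long sourceWriteTime = 1683158400000000L; // example writetime read from an origin row
        System.out.println(effectiveWriteTime(sourceWriteTime, 0L, 0L)); // defaults: source value unchanged
        System.out.println(effectiveWriteTime(sourceWriteTime, 0L, 1L)); // incrementBy 1: shifted by 1 microsecond
    }
}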
23 changes: 11 additions & 12 deletions src/main/java/datastax/astra/migrate/BaseJobSession.java
@@ -31,27 +31,26 @@ public abstract class BaseJobSession {
// Rate = Total Throughput (write/read per sec) / Total Executors
protected RateLimiter readLimiter;
protected RateLimiter writeLimiter;
protected Integer maxRetries = 10;
protected int maxRetries = 10;
protected AtomicLong readCounter = new AtomicLong(0);
protected Integer batchSize = 1;
protected Integer fetchSizeInRows = 1000;
protected Integer printStatsAfter = 100000;
protected int batchSize = 1;
protected int fetchSizeInRows = 1000;
protected int printStatsAfter;

protected Boolean writeTimeStampFilter = Boolean.FALSE;
protected Long minWriteTimeStampFilter = 0l;
protected Long maxWriteTimeStampFilter = Long.MAX_VALUE;
protected Long customWritetime = 0l;
protected boolean writeTimeStampFilter;
protected long minWriteTimeStampFilter = 0l;
protected long maxWriteTimeStampFilter = Long.MAX_VALUE;

protected Boolean isCounterTable = false;
protected boolean isCounterTable;

protected String sourceKeyspaceTable;
protected String astraKeyspaceTable;

protected Boolean hasRandomPartitioner;
protected Boolean filterData;
protected boolean hasRandomPartitioner;
protected boolean filterData;
protected String filterColName;
protected String filterColType;
protected Integer filterColIndex;
protected int filterColIndex;
protected String filterColValue;
protected String sourceSelectCondition;

4 changes: 2 additions & 2 deletions src/main/java/datastax/astra/migrate/DiffJobSession.java
@@ -30,8 +30,8 @@ public class DiffJobSession extends CopyJobSession {
private final AtomicLong validCounter = new AtomicLong(0);
private final AtomicLong skippedCounter = new AtomicLong(0);
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
protected Boolean autoCorrectMissing = false;
protected Boolean autoCorrectMismatch = false;
protected boolean autoCorrectMissing;
protected boolean autoCorrectMismatch;

private DiffJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
super(sourceSession, astraSession, sc);
2 changes: 1 addition & 1 deletion src/main/java/datastax/astra/migrate/GuardrailJobSession.java
@@ -19,7 +19,7 @@ public class GuardrailJobSession extends BaseJobSession {
protected AtomicLong readCounter = new AtomicLong(0);
protected AtomicLong largeRowCounter = new AtomicLong(0);
protected AtomicLong largeFieldCounter = new AtomicLong(0);
protected Integer guardrailColSizeInKB;
protected int guardrailColSizeInKB;

protected GuardrailJobSession(CqlSession session, SparkConf sc) {
super(sc);
2 changes: 1 addition & 1 deletion src/main/java/datastax/astra/migrate/Util.java
@@ -1,7 +1,7 @@
package datastax.astra.migrate;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;

import java.io.BufferedReader;
6 changes: 5 additions & 1 deletion src/resources/cdm.properties
@@ -91,7 +91,11 @@ spark.batchSize 10
#spark.read.fetch.sizeInRows 1000

# ENABLE ONLY IF YOU WANT TO USE CUSTOM FIXED WRITETIME VALUE ON TARGET
#spark.target.custom.writeTime 0
#spark.target.writeTime.fixedValue 0

# ENABLE ONLY IF YOU WANT TO INCREMENT SOURCE WRITETIME VALUE
# DUPLICATES IN LIST FIELDS: USE THIS WORKAROUND FOR CASSANDRA BUG https://issues.apache.org/jira/browse/CASSANDRA-11368
#spark.target.writeTime.incrementBy 0

# ONLY USE when running in Guardrail mode to identify large fields
#spark.guardrail.colSizeInKB 1024
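As a usage sketch, the settings above might be enabled in cdm.properties as shown below; the increment of 1 is an assumed example value (Cassandra writetimes are microseconds since epoch), and spark.target.writeTime.fixedValue replaces the old spark.target.custom.writeTime key. Per the bindInsert() change, a fixed value, when set, takes precedence over the increment.

# Keep source writetimes but shift them on the target; workaround for
# duplicate list values (https://issues.apache.org/jira/browse/CASSANDRA-11368)
spark.target.writeTime.incrementBy 1

# OR force one fixed writetime (epoch microseconds) on every target write
#spark.target.writeTime.fixedValue 1683158400000000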
