Merge pull request #88 from datastax/feature/blank-timestamp-in-key
Handle blank timestamp values in primary-key columns gracefully
pravinbhat authored Feb 22, 2023
2 parents c7e6f3c + 620a193 commit 39ab797
Showing 6 changed files with 109 additions and 40 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -8,7 +8,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<revision>3.1.0</revision>
<revision>3.2.1</revision>
<scala.version>2.12.17</scala.version>
<scala.main.version>2.12</scala.main.version>
<spark.version>3.3.1</spark.version>
93 changes: 68 additions & 25 deletions src/main/java/datastax/astra/migrate/AbstractJobSession.java
@@ -12,7 +12,8 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

public class AbstractJobSession extends BaseJobSession {
@@ -25,11 +26,11 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,

protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
super(sc);

if (sourceSession == null) {
return;
}

this.sourceSession = sourceSession;
this.astraSession = astraSession;

@@ -105,14 +106,14 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
}

String selectCols = Util.getSparkProp(sc, "spark.query.origin");
String partionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
String partitionKey = Util.getSparkProp(sc, "spark.query.origin.partitionKey");
String sourceSelectCondition = Util.getSparkPropOrEmpty(sc, "spark.query.condition");
if (!sourceSelectCondition.isEmpty() && !sourceSelectCondition.trim().toUpperCase().startsWith("AND")) {
sourceSelectCondition = " AND " + sourceSelectCondition;
}

final StringBuilder selectTTLWriteTimeCols = new StringBuilder();
String[] allCols = selectCols.split(",");
allCols = selectCols.split(",");
ttlCols.forEach(col -> {
selectTTLWriteTimeCols.append(",ttl(" + allCols[col] + ")");
});
@@ -138,8 +139,9 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,

String fullSelectQuery;
if (!isJobMigrateRowsFromFile) {
fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where token(" + partionKey.trim()
+ ") >= ? and token(" + partionKey.trim() + ") <= ? " + sourceSelectCondition + " ALLOW FILTERING";
fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable +
" where token(" + partitionKey.trim() + ") >= ? and token(" + partitionKey.trim() + ") <= ? " +
sourceSelectCondition + " ALLOW FILTERING";
} else {
fullSelectQuery = "select " + selectCols + selectTTLWriteTimeCols + " from " + sourceKeyspaceTable + " where " + insertBinds;
}
@@ -181,6 +183,12 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
}
astraInsertStatement = astraSession.prepare(fullInsertQuery);
}

// Handle rows with blank values for 'timestamp' data-type in primary-key fields
tsReplaceValStr = Util.getSparkPropOr(sc, "spark.target.replace.blankTimestampKeyUsingEpoch", "");
if (!tsReplaceValStr.isEmpty()) {
tsReplaceVal = Long.parseLong(tsReplaceValStr);
}
}

public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRow, Row astraRow) {
@@ -199,21 +207,8 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
} else {
int index = 0;
for (index = 0; index < selectColTypes.size(); index++) {
MigrateDataType dataTypeObj = selectColTypes.get(index);
Class dataType = dataTypeObj.typeClass;

try {
Object colData = getData(dataTypeObj, index, sourceRow);
if (index < idColTypes.size() && colData == null && dataType == String.class) {
colData = "";
}
boundInsertStatement = boundInsertStatement.set(index, colData, dataType);
} catch (NullPointerException e) {
// ignore the exception for map values being null
if (dataType != Map.class) {
throw e;
}
}
boundInsertStatement = getBoundStatement(sourceRow, boundInsertStatement, index, selectColTypes);
if (boundInsertStatement == null) return null;
}

if (!ttlCols.isEmpty()) {
@@ -246,12 +241,60 @@ public long getLargestWriteTimeStamp(Row sourceRow) {
public BoundStatement selectFromAstra(PreparedStatement selectStatement, Row sourceRow) {
BoundStatement boundSelectStatement = selectStatement.bind().setConsistencyLevel(readConsistencyLevel);
for (int index = 0; index < idColTypes.size(); index++) {
MigrateDataType dataType = idColTypes.get(index);
boundSelectStatement = boundSelectStatement.set(index, getData(dataType, index, sourceRow),
dataType.typeClass);
boundSelectStatement = getBoundStatement(sourceRow, boundSelectStatement, index, idColTypes);
if (boundSelectStatement == null) return null;
}

return boundSelectStatement;
}

private BoundStatement getBoundStatement(Row sourceRow, BoundStatement boundSelectStatement, int index,
List<MigrateDataType> cols) {
MigrateDataType dataTypeObj = cols.get(index);
Object colData = getData(dataTypeObj, index, sourceRow);

// Handle rows with blank values in primary-key fields
if (index < idColTypes.size()) {
Optional<Object> optionalVal = handleBlankInPrimaryKey(index, colData, dataTypeObj.typeClass, sourceRow);
if (!optionalVal.isPresent()) {
return null;
}
colData = optionalVal.get();
}
boundSelectStatement = boundSelectStatement.set(index, colData, dataTypeObj.typeClass);
return boundSelectStatement;
}

protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow) {
return handleBlankInPrimaryKey(index, colData, dataType, sourceRow, true);
}

protected Optional<Object> handleBlankInPrimaryKey(int index, Object colData, Class dataType, Row sourceRow, boolean logWarn) {
// Handle rows with blank values for 'String' data-type in primary-key fields
if (index < idColTypes.size() && colData == null && dataType == String.class) {
if (logWarn) {
logger.warn("For row with Key: {}, found String primary-key column {} with blank value",
getKey(sourceRow), allCols[index]);
}
return Optional.of("");
}

// Handle rows with blank values for 'timestamp' data-type in primary-key fields
if (index < idColTypes.size() && colData == null && dataType == Instant.class) {
if (tsReplaceValStr.isEmpty()) {
logger.error("Skipping row with Key: {} as Timestamp primary-key column {} has invalid blank value. " +
"Alternatively rerun the job with --conf spark.target.replace.blankTimestampKeyUsingEpoch=\"<fixed-epoch-value>\" " +
"option to replace the blanks with a fixed timestamp value", getKey(sourceRow), allCols[index]);
return Optional.empty();
}
if (logWarn) {
logger.warn("For row with Key: {}, found Timestamp primary-key column {} with invalid blank value. " +
"Using value {} instead", getKey(sourceRow), allCols[index], Instant.ofEpochSecond(tsReplaceVal));
}
return Optional.of(Instant.ofEpochSecond(tsReplaceVal));
}

return Optional.of(colData);
}

}
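
As a rough illustration of the replacement semantics above: the configured spark.target.replace.blankTimestampKeyUsingEpoch value is parsed with Long.parseLong and passed to Instant.ofEpochSecond, so it is interpreted as whole epoch seconds (not milliseconds). A minimal standalone sketch of that substitution — the class name and the example value 0 are hypothetical, not part of this change:

import java.time.Instant;

// Illustrative sketch only: mirrors how the job converts the configured epoch value
// into the Instant used for blank timestamp primary-key columns.
public class BlankTimestampReplacementSketch {
    public static void main(String[] args) {
        String tsReplaceValStr = "0"; // e.g. --conf spark.target.replace.blankTimestampKeyUsingEpoch="0"
        if (!tsReplaceValStr.isEmpty()) {
            long tsReplaceVal = Long.parseLong(tsReplaceValStr);
            // Prints 1970-01-01T00:00:00Z, the value substituted for a blank timestamp key
            System.out.println(Instant.ofEpochSecond(tsReplaceVal));
        }
    }
}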
4 changes: 4 additions & 0 deletions src/main/java/datastax/astra/migrate/BaseJobSession.java
@@ -59,6 +59,10 @@ public abstract class BaseJobSession {
protected Integer filterColIndex;
protected String filterColValue;

protected String[] allCols;
protected String tsReplaceValStr;
protected long tsReplaceVal;

protected BaseJobSession(SparkConf sc) {
readConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.read"));
writeConsistencyLevel = Util.mapToConsistencyLevel(Util.getSparkPropOrEmpty(sc, "spark.consistency.write"));
15 changes: 12 additions & 3 deletions src/main/java/datastax/astra/migrate/CopyJobSession.java
@@ -95,8 +95,12 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
astraRow = astraReadResultSet.one();
}

CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
.executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
BoundStatement bInsert = bindInsert(astraInsertStatement, sourceRow, astraRow);
if (null == bInsert) {
skipCnt++;
continue;
}
CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession.executeAsync(bInsert);
writeResults.add(astraWriteResultSet);
if (writeResults.size() > fetchSizeInRows) {
writeCnt += iterateAndClearWriteResults(writeResults, 1);
@@ -124,7 +128,12 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
}

writeLimiter.acquire(1);
batchStatement = batchStatement.add(bindInsert(astraInsertStatement, sourceRow, null));
BoundStatement bInsert = bindInsert(astraInsertStatement, sourceRow, null);
if (null == bInsert) {
skipCnt++;
continue;
}
batchStatement = batchStatement.add(bInsert);

// if batch threshold is met, send the writes and clear the batch
if (batchStatement.size() >= batchSize) {
33 changes: 23 additions & 10 deletions src/main/java/datastax/astra/migrate/DiffJobSession.java
@@ -2,6 +2,7 @@

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.UdtValue;
@@ -12,6 +13,7 @@
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
@@ -74,11 +76,15 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
printCounts(false);
}

CompletionStage<AsyncResultSet> targetRowFuture = astraSession
.executeAsync(selectFromAstra(astraSelectStatement, srcRow));
srcToTargetRowMap.put(srcRow, targetRowFuture);
if (srcToTargetRowMap.size() > fetchSizeInRows) {
diffAndClear(srcToTargetRowMap);
BoundStatement bSelect = selectFromAstra(astraSelectStatement, srcRow);
if (null == bSelect) {
skippedCounter.incrementAndGet();
} else {
CompletionStage<AsyncResultSet> targetRowFuture = astraSession.executeAsync(bSelect);
srcToTargetRowMap.put(srcRow, targetRowFuture);
if (srcToTargetRowMap.size() > fetchSizeInRows) {
diffAndClear(srcToTargetRowMap);
}
}
} else {
readCounter.incrementAndGet();
@@ -165,13 +171,20 @@ private void diff(Row sourceRow, Row astraRow) {
private String isDifferent(Row sourceRow, Row astraRow) {
StringBuffer diffData = new StringBuffer();
IntStream.range(0, selectColTypes.size()).parallel().forEach(index -> {
MigrateDataType dataType = selectColTypes.get(index);
Object source = getData(dataType, index, sourceRow);
Object astra = getData(dataType, index, astraRow);
MigrateDataType dataTypeObj = selectColTypes.get(index);
Object source = getData(dataTypeObj, index, sourceRow);
if (index < idColTypes.size()) {
Optional<Object> optionalVal = handleBlankInPrimaryKey(index, source, dataTypeObj.typeClass, sourceRow, false);
if (optionalVal.isPresent()) {
source = optionalVal.get();
}
}

Object astra = getData(dataTypeObj, index, astraRow);

boolean isDiff = dataType.diff(source, astra);
boolean isDiff = dataTypeObj.diff(source, astra);
if (isDiff) {
if (dataType.typeClass.equals(UdtValue.class)) {
if (dataTypeObj.typeClass.equals(UdtValue.class)) {
String sourceUdtContent = ((UdtValue) source).getFormattedContents();
String astraUdtContent = ((UdtValue) astra).getFormattedContents();
if (!sourceUdtContent.equals(astraUdtContent)) {
2 changes: 1 addition & 1 deletion src/resources/sparkConf.properties
@@ -64,7 +64,7 @@ spark.query.types 9,1,4,3
#############################################################################################################

# ENABLE ONLY IF COLUMN NAMES ON TARGET IS DIFFERENT FROM ORIGIN (SCHEMA & DATA-TYPES MUST BE SAME)
#spark.query.target partition-key,clustering-key,order-date,amount
#spark.query.target comma-separated-partition-key,comma-separated-clustering-key,comma-separated-other-columns

# The tool adds TTL & Writetime at row-level (not field-level).
# The largest TTL & Writetime values are used if multiple indexes are listed (comma separated)