
Comparing changes

base repository: databricks/spark-redshift
base: master
head repository: udemy/spark-redshift
compare: master
These branches can be automatically merged.
  • 7 commits
  • 15 files changed
  • 1 contributor

Commits on Jan 31, 2019

  1. 3b6a318
  2. b15f13a
  3. e9cacb1
  4. ca9782d
  5. Support decimal type properly

    Redshift currently handles Decimal as a string when loading data from Avro format, since Redshift uses an older version of Avro. Avro has supported the Decimal type natively since 1.7.7 - https://issues.apache.org/jira/browse/AVRO-1402
    sungjuly committed Jan 31, 2019
    858abe4
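
The decimal commit above is realized in the RedshiftWriter diff later in this comparison: DecimalType columns are rewritten as strings before the Avro staging files are written, so Redshift's pre-1.7.7 Avro reader can COPY them. A minimal Scala sketch of that idea, using hypothetical helper names (toAvroSafeSchema, decimalToString) that are not part of the connector:

```scala
import org.apache.spark.sql.types._

// Declare decimal columns as strings in the schema used for the Avro staging files.
def toAvroSafeSchema(schema: StructType): StructType = StructType(
  schema.map {
    case StructField(name, _: DecimalType, nullable, meta) =>
      StructField(name, StringType, nullable, meta)
    case other => other
  }
)

// Matching value-level conversion: render decimal values as strings, preserving nulls.
val decimalToString: Any => Any = v => if (v == null) null else v.toString
```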

Commits on Feb 5, 2019

  1. Merge pull request #2 from udemy/support-spark-24

    Support spark 24
    sungjuly authored Feb 5, 2019
    c729aba
  2. Merge pull request #3 from udemy/support-decimal-type

    Support decimal type properly
    sungjuly authored Feb 5, 2019
    6e14490
18 changes: 0 additions & 18 deletions .travis.yml
@@ -25,24 +25,6 @@ matrix:
- jdk: openjdk7
scala: 2.11.7
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.7.4"
env:
global:
# AWS_REDSHIFT_JDBC_URL
- secure: "RNkxdKcaKEYuJqxli8naazp42qO5/pgueIzs+J5rHwl39jcBvJMgW3DX8kT7duzdoBb/qrolj/ttbQ3l/30P45+djn0BEwcJMX7G/FGpZYD23yd03qeq7sOKPQl2Ni/OBttYHJMah5rI6aPmAysBZMQO7Wijdenb/RUiU2YcZp0="
# AWS_REDSHIFT_PASSWORD
- secure: "g5li3gLejD+/2BIqIm+qHiqBUvCc5l0qnftVaVlLtL7SffErp/twDiFP4gW8eqnFqi2GEC1c9Shf7Z9cOIUunNSBQZdYIVG0f38UfBeDP14nOoIuwZ974O5yggbgZhX0cKvJzINcENGoRNk0FzRwgOdCCiF05IMnRqQxI3C24fE="
# AWS_REDSHIFT_USER
- secure: "LIkY/ZpBXK3vSFsdpBSRXEsgfD2wDF52X8OZOlyBJOiZpS4y1/obj8b3VQABDPyPH95bGX/LOpM0vVM137rYgF0pskgVEzLMyZOPpwYqNGPf/d4BtQhBRc8f7+jmr6D4Hrox4jCl0cCKaeiTazun2+Y9E+zgCUDvQ8y9qGctR2k="
# TEST_AWS_ACCESS_KEY_ID
- secure: "bsB6YwkscUxtzcZOKja4Y69IR3JqvCP3W/4vFftW/v33/hOC3EBz7TVNKS+ZIomBUQYJnzsMfM59bj7YEc3KZe8WxIcUdLI40hg0X5O1RhJDNPW+0oGbWshmzyua+hY1y7nRja+8/17tYTbAi1+MhscRu+O/2aWaXolA9BicuX0="
# TEST_AWS_SECRET_ACCESS_KEY
- secure: "cGxnZh4be9XiPBOMxe9wHYwEfrWNw4zSjmvGFEC9UUV11ydHLo5wrXtcTVFmY7qxUxYeb0NB2N+CQXE0GcyUKoTviKG9sOS3cxR1q30FsdOVcWDKAzpBUmzDTMwDLAUMysziyOtMorDlNVydqYdYLMpiUN0O+eDKA+iOHlJp7fo="
# STS_ROLE_ARN
- secure: "cuyemI1bqPkWBD5B1FqIKDJb5g/SX5x8lrzkO0J/jkyGY0VLbHxrl5j/9PrKFuvraBK3HC56HEP1Zg+IMvh+uv0D+p5y14C97fAzE33uNgR2aVkamOo92zHvxvXe7zBtqc8rztWsJb1pgkrY7SdgSXgQc88ohey+XecDh4TahTY="
# AWS_S3_SCRATCH_SPACE
- secure: "LvndQIW6dHs6nyaMHtblGI/oL+s460lOezFs2BoD0Isenb/O/IM+nY5K9HepTXjJIcq8qvUYnojZX1FCrxxOXX2/+/Iihiq7GzJYdmdMC6hLg9bJYeAFk0dWYT88/AwadrJCBOa3ockRLhiO3dkai7Ki5+M1erfaFiAHHMpJxYQ="
# AWS_S3_CROSS_REGION_SCRATCH_SPACE
- secure: "esYmBqt256Dc77HT68zoaE/vtsFGk2N+Kt+52RlR0cjHPY1q5801vxLbeOlpYb2On3x8YckE++HadjL40gwSBsca0ffoogq6zTlfbJYDSQkQG1evxXWJZLcafB0igfBs/UbEUo7EaxoAJQcLgiWWwUdO0a0iU1ciSVyogZPagL0="

script:
- ./dev/run-tests-travis.sh
3 changes: 3 additions & 0 deletions README.md
@@ -3,6 +3,9 @@
[![Build Status](https://travis-ci.org/databricks/spark-redshift.svg?branch=master)](https://travis-ci.org/databricks/spark-redshift)
[![codecov.io](http://codecov.io/github/databricks/spark-redshift/coverage.svg?branch=master)](http://codecov.io/github/databricks/spark-redshift?branch=master)

## Disclaimer
This is a fork of Databricks's spark-redshift repository. Our custom changes have only been tested with Spark **2.4.0**; they may not work with older versions of Spark.

## Note

To ensure the best experience for our customers, we have decided to inline this connector directly in Databricks Runtime. The latest version of Databricks Runtime (3.0+) includes an advanced version of the RedShift connector for Spark that features both performance improvements (full query pushdown) as well as security improvements (automatic encryption). For more information, refer to the <a href="https://docs.databricks.com/spark/latest/data-sources/aws/amazon-redshift.html">Databricks documentation</a>. As a result, we will no longer be making releases separately from Databricks Runtime.
8 changes: 4 additions & 4 deletions project/SparkRedshiftBuild.scala
@@ -47,7 +47,7 @@ object SparkRedshiftBuild extends Build {
organization := "com.databricks",
scalaVersion := "2.11.7",
crossScalaVersions := Seq("2.10.5", "2.11.7"),
sparkVersion := "2.0.0",
sparkVersion := "2.4.0",
testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("3.0.0"),
testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"),
@@ -64,7 +64,7 @@ object SparkRedshiftBuild extends Build {
"com.eclipsesource.minimal-json" % "minimal-json" % "0.9.4",
// We require spark-avro, but avro-mapred must be provided to match Hadoop version.
// In most cases, avro-mapred will be provided as part of the Spark assembly JAR.
"com.databricks" %% "spark-avro" % "3.0.0",
"org.apache.spark" %% "spark-avro" % sparkVersion.value,
if (testHadoopVersion.value.startsWith("1")) {
"org.apache.avro" % "avro-mapred" % "1.7.7" % "provided" classifier "hadoop1" exclude("org.mortbay.jetty", "servlet-api")
} else {
@@ -75,7 +75,7 @@ object SparkRedshiftBuild extends Build {
// A Redshift-compatible JDBC driver must be present on the classpath for spark-redshift to work.
// For testing, we use an Amazon driver, which is available from
// http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
"com.amazon.redshift" % "jdbc4" % "1.1.7.1007" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC4-1.1.7.1007.jar",
"com.amazon.redshift" % "jdbc41" % "1.2.12.1017" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.12.1017/RedshiftJDBC41-1.2.12.1017.jar",
// Although support for the postgres driver is lower priority than support for Amazon's
// official Redshift driver, we still run basic tests with it.
"postgresql" % "postgresql" % "8.3-606.jdbc4" % "test",
@@ -118,7 +118,7 @@ object SparkRedshiftBuild extends Build {
"org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
"org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
"org.apache.spark" %% "spark-hive" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
"com.databricks" %% "spark-avro" % testSparkAvroVersion.value % "test" exclude("org.apache.avro", "avro-mapred") force()
"org.apache.spark" %% "spark-avro" % testSparkVersion.value % "test" exclude("org.apache.avro", "avro-mapred") force()
),
// Although spark-avro declares its avro-mapred dependency as `provided`, its version of the
// dependency can still end up on the classpath during tests, which breaks the tests for
@@ -44,7 +44,6 @@ class AWSCredentialsInUriIntegrationSuite extends IntegrationSuiteBase {
// Override this method so that we do not set the credentials in sc.hadoopConf.
override def beforeAll(): Unit = {
assert(tempDir.contains("AKIA"), "tempdir did not contain AWS credentials")
assert(!AWS_SECRET_ACCESS_KEY.contains("/"), "AWS secret key should not contain slash")
sc = new SparkContext("local", getClass.getSimpleName)
conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None)
}
@@ -54,16 +54,19 @@ trait IntegrationSuiteBase
protected val AWS_REDSHIFT_JDBC_URL: String = loadConfigFromEnv("AWS_REDSHIFT_JDBC_URL")
protected val AWS_REDSHIFT_USER: String = loadConfigFromEnv("AWS_REDSHIFT_USER")
protected val AWS_REDSHIFT_PASSWORD: String = loadConfigFromEnv("AWS_REDSHIFT_PASSWORD")
protected val AWS_ACCESS_KEY_ID: String = loadConfigFromEnv("TEST_AWS_ACCESS_KEY_ID")
protected val AWS_SECRET_ACCESS_KEY: String = loadConfigFromEnv("TEST_AWS_SECRET_ACCESS_KEY")
protected val AWS_ACCESS_KEY_ID: String = loadConfigFromEnv("AWS_ACCESS_KEY_ID")
protected val AWS_SECRET_ACCESS_KEY: String = loadConfigFromEnv("AWS_SECRET_ACCESS_KEY")
// Path to a directory in S3 (e.g. 's3n://bucket-name/path/to/scratch/space').
protected val AWS_S3_SCRATCH_SPACE: String = loadConfigFromEnv("AWS_S3_SCRATCH_SPACE")
require(AWS_S3_SCRATCH_SPACE.contains("s3n"), "must use s3n:// URL")

protected def jdbcUrl: String = {
s"$AWS_REDSHIFT_JDBC_URL?user=$AWS_REDSHIFT_USER&password=$AWS_REDSHIFT_PASSWORD"
s"$AWS_REDSHIFT_JDBC_URL?user=$AWS_REDSHIFT_USER&password=$AWS_REDSHIFT_PASSWORD&ssl=true"
}

protected def jdbcUrlNoUserPassword: String = {
s"$AWS_REDSHIFT_JDBC_URL?ssl=true"
}
/**
Random suffix appended to table and directory names in order to avoid collisions
* between separate Travis builds.
@@ -31,14 +31,14 @@ class RedshiftCredentialsInConfIntegrationSuite extends IntegrationSuiteBase {
val tableName = s"roundtrip_save_and_load_$randomSuffix"
try {
write(df)
.option("url", AWS_REDSHIFT_JDBC_URL)
.option("url", jdbcUrlNoUserPassword)
.option("user", AWS_REDSHIFT_USER)
.option("password", AWS_REDSHIFT_PASSWORD)
.option("dbtable", tableName)
.save()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
val loadedDf = read
.option("url", AWS_REDSHIFT_JDBC_URL)
.option("url", jdbcUrlNoUserPassword)
.option("user", AWS_REDSHIFT_USER)
.option("password", AWS_REDSHIFT_PASSWORD)
.option("dbtable", tableName)
18 changes: 16 additions & 2 deletions src/it/scala/com/databricks/spark/redshift/RedshiftReadSuite.scala
@@ -197,16 +197,30 @@ class RedshiftReadSuite extends IntegrationSuiteBase {
s"INSERT INTO $tableName VALUES ('NaN'), ('Infinity'), ('-Infinity')")
conn.commit()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
// Due to #98, we use Double here instead of float:
checkAnswer(
read.option("dbtable", tableName).load(),
Seq(Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity).map(x => Row.apply(x)))
Seq(Float.NaN, Float.PositiveInfinity, Float.NegativeInfinity).map(x => Row.apply(x)))
} finally {
conn.prepareStatement(s"drop table if exists $tableName").executeUpdate()
conn.commit()
}
}

test("test empty string and null") {
withTempRedshiftTable("records_with_empty_and_null_characters") { tableName =>
conn.createStatement().executeUpdate(
s"CREATE TABLE $tableName (x varchar(256))")
conn.createStatement().executeUpdate(
s"INSERT INTO $tableName VALUES ('null'), (''), (null)")
conn.commit()
assert(DefaultJDBCWrapper.tableExists(conn, tableName))
checkAnswer(
read.option("dbtable", tableName).load(),
Seq("null", "", null).map(x => Row.apply(x)))
}
}


test("read special double values (regression test for #261)") {
val tableName = s"roundtrip_special_double_values_$randomSuffix"
try {
13 changes: 11 additions & 2 deletions src/main/scala/com/databricks/spark/redshift/Conversions.scala
@@ -78,7 +78,7 @@ private[redshift] object Conversions {
*
* Note that instances of this function are NOT thread-safe.
*/
def createRowConverter(schema: StructType): Array[String] => InternalRow = {
def createRowConverter(schema: StructType, nullString: String): Array[String] => InternalRow = {
val dateFormat = createRedshiftDateFormat()
val decimalFormat = createRedshiftDecimalFormat()
val conversionFunctions: Array[String => Any] = schema.fields.map { field =>
@@ -116,7 +116,16 @@ private[redshift] object Conversions {
var i = 0
while (i < schema.length) {
val data = inputRow(i)
converted(i) = if (data == null || data.isEmpty) null else conversionFunctions(i)(data)
converted(i) = if ((data == null || data == nullString) ||
(data.isEmpty && schema.fields(i).dataType != StringType)) {
null
}
else if (data.isEmpty) {
""
}
else {
conversionFunctions(i)(data)
}
i += 1
}
encoder.toRow(externalRow)
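
The rewritten converter above separates Redshift's null marker (passed in as nullString) from a genuinely empty string in a StringType column: the marker becomes SQL NULL, an empty field stays "", and everything else goes through the field's conversion function. A compressed illustration of that rule for a single string cell, using a hypothetical convertCell helper rather than the connector's API:

```scala
// "@NULL@" is the null marker used in the RedshiftSourceSuite fixture further below;
// in the connector it comes from the nullString option (csvnullstring by default).
val nullString = "@NULL@"

def convertCell(raw: String): Any = raw match {
  case null | `nullString` => null  // the marker (or a missing field) becomes NULL
  case ""                  => ""    // an empty string column value is preserved
  case other               => other // other values would go through the field converter
}

assert(convertCell("@NULL@") == null)
assert(convertCell("") == "")
assert(convertCell("abc") == "abc")
```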
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}

/**
* Internal data source used for reading Redshift UNLOAD files.
@@ -95,8 +95,11 @@ private[redshift] class RedshiftFileFormat extends FileFormat {
// be closed once it is completely iterated, but this is necessary to guard against
// resource leaks in case the task fails or is interrupted.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
val converter = Conversions.createRowConverter(requiredSchema)
val converter = Conversions.createRowConverter(requiredSchema,
options.getOrElse("nullString", Parameters.DEFAULT_PARAMETERS("csvnullstring")))
iter.map(converter)
}
}

override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true
}
@@ -300,46 +300,39 @@ private[redshift] class JDBCWrapper {
// TODO: cleanup types which are irrelevant for Redshift.
val answer = sqlType match {
// scalastyle:off
case java.sql.Types.ARRAY => null
case java.sql.Types.BIGINT => if (signed) { LongType } else { DecimalType(20,0) }
case java.sql.Types.BINARY => BinaryType
case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks
case java.sql.Types.BLOB => BinaryType
case java.sql.Types.BOOLEAN => BooleanType
// Null Type
case java.sql.Types.NULL => null

// Character Types
case java.sql.Types.CHAR => StringType
case java.sql.Types.CLOB => StringType
case java.sql.Types.DATALINK => null
case java.sql.Types.NCHAR => StringType
case java.sql.Types.NVARCHAR => StringType
case java.sql.Types.VARCHAR => StringType

// Datetime Types
case java.sql.Types.DATE => DateType
case java.sql.Types.TIME => TimestampType
case java.sql.Types.TIMESTAMP => TimestampType

// Boolean Type
case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks
case java.sql.Types.BOOLEAN => BooleanType

// Numeric Types
case java.sql.Types.BIGINT => if (signed) { LongType } else { DecimalType(20,0) }
case java.sql.Types.DECIMAL
if precision != 0 || scale != 0 => DecimalType(precision, scale)
case java.sql.Types.DECIMAL => DecimalType(38, 18) // Spark 1.5.0 default
case java.sql.Types.DISTINCT => null
case java.sql.Types.DOUBLE => DoubleType
case java.sql.Types.FLOAT => FloatType
case java.sql.Types.INTEGER => if (signed) { IntegerType } else { LongType }
case java.sql.Types.JAVA_OBJECT => null
case java.sql.Types.LONGNVARCHAR => StringType
case java.sql.Types.LONGVARBINARY => BinaryType
case java.sql.Types.LONGVARCHAR => StringType
case java.sql.Types.NCHAR => StringType
case java.sql.Types.NCLOB => StringType
case java.sql.Types.NULL => null
case java.sql.Types.NUMERIC
if precision != 0 || scale != 0 => DecimalType(precision, scale)
case java.sql.Types.NUMERIC => DecimalType(38, 18) // Spark 1.5.0 default
case java.sql.Types.NVARCHAR => StringType
case java.sql.Types.OTHER => null
case java.sql.Types.REAL => DoubleType
case java.sql.Types.REF => StringType
case java.sql.Types.ROWID => LongType
// Redshift Real is represented in 4 bytes IEEE Float. https://docs.aws.amazon.com/redshift/latest/dg/r_Numeric_types201.html
case java.sql.Types.REAL => FloatType
case java.sql.Types.SMALLINT => IntegerType
case java.sql.Types.SQLXML => StringType
case java.sql.Types.STRUCT => StringType
case java.sql.Types.TIME => TimestampType
case java.sql.Types.TIMESTAMP => TimestampType
case java.sql.Types.TINYINT => IntegerType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
case _ => null
// scalastyle:on
}
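
One visible effect of the reordered mapping above is that java.sql.Types.REAL now resolves to FloatType (Redshift REAL is a 4-byte IEEE float) instead of DoubleType, which is why the RedshiftReadSuite expectation earlier in this diff changed from Double.NaN/Infinity to the Float equivalents. A standalone sketch of just the floating-point cases, not the connector's private method:

```scala
import org.apache.spark.sql.types._

// Floating-point portion of the revised JDBC-to-Catalyst mapping (illustrative only).
def mapFloatingPointType(sqlType: Int): DataType = sqlType match {
  case java.sql.Types.REAL   => FloatType   // Redshift REAL / FLOAT4: 4-byte IEEE float
  case java.sql.Types.FLOAT  => FloatType
  case java.sql.Types.DOUBLE => DoubleType
  case other => throw new IllegalArgumentException(s"not a floating-point type code: $other")
}

assert(mapFloatingPointType(java.sql.Types.REAL) == FloatType)
```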
@@ -131,7 +131,6 @@ private[redshift] case class RedshiftRelation(
// Unload data from Redshift into a temporary directory in S3:
val tempDir = params.createPerQueryTempDir()
val unloadSql = buildUnloadStmt(requiredColumns, filters, tempDir, creds)
log.info(unloadSql)
val conn = jdbcWrapper.getConnector(params.jdbcDriver, params.jdbcUrl, params.credentials)
try {
jdbcWrapper.executeInterruptibly(conn.prepareStatement(unloadSql))
@@ -165,6 +164,7 @@ private[redshift] case class RedshiftRelation(
sqlContext.read
.format(classOf[RedshiftFileFormat].getName)
.schema(prunedSchema)
.option("nullString", params.nullString)
.load(filesToRead: _*)
.queryExecution.executedPlan.execute().asInstanceOf[RDD[Row]]
}
@@ -189,11 +189,13 @@ private[redshift] case class RedshiftRelation(
val escapedTableNameOrSubqury = tableNameOrSubquery.replace("\\", "\\\\").replace("'", "\\'")
s"SELECT $columnList FROM $escapedTableNameOrSubqury $whereClause"
}
log.info(query)
// We need to remove S3 credentials from the unload path URI because they will conflict with
// the credentials passed via `credsString`.
val fixedUrl = Utils.fixS3Url(Utils.removeCredentialsFromURI(new URI(tempDir)).toString)

s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString' ESCAPE MANIFEST"
s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString'" +
s" ESCAPE MANIFEST NULL AS '${params.nullString}'"
}

private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
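
With the change above, the UNLOAD statement built by buildUnloadStmt now includes a NULL AS clause so that NULL values can be told apart from empty strings on the read path. A sketch of the resulting SQL string, with placeholder query, S3 path, and credentials (the @NULL@ default is assumed to come from the connector's csvnullstring parameter):

```scala
// Illustrative values only.
val query       = "SELECT id, name FROM my_table"
val fixedUrl    = "s3://my-bucket/scratch/"
val credsString = "aws_access_key_id=<ID>;aws_secret_access_key=<KEY>"
val nullString  = "@NULL@" // params.nullString

val unloadSql =
  s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString'" +
  s" ESCAPE MANIFEST NULL AS '$nullString'"
```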
@@ -223,6 +223,7 @@ private[redshift] class RedshiftWriter(
// However, each task gets its own deserialized copy, making this safe.
val conversionFunctions: Array[Any => Any] = data.schema.fields.map { field =>
field.dataType match {
case _: DecimalType => (v: Any) => if (v == null) null else v.toString
case DateType =>
val dateFormat = Conversions.createRedshiftDateFormat()
(v: Any) => {
@@ -271,6 +272,8 @@ private[redshift] class RedshiftWriter(
// strings. This is necessary for Redshift to be able to load these columns (see #39).
val convertedSchema: StructType = StructType(
schemaWithLowercaseColumnNames.map {
case StructField(name, _: DecimalType, nullable, meta) =>
StructField(name, StringType, nullable, meta)
case StructField(name, DateType, nullable, meta) =>
StructField(name, StringType, nullable, meta)
case StructField(name, TimestampType, nullable, meta) =>
@@ -282,7 +285,7 @@ private[redshift] class RedshiftWriter(
val writer = sqlContext.createDataFrame(convertedRows, convertedSchema).write
(tempFormat match {
case "AVRO" =>
writer.format("com.databricks.spark.avro")
writer.format("avro")
case "CSV" =>
writer.format("csv")
.option("escape", "\"")
@@ -31,7 +31,8 @@ import org.apache.spark.sql.types._
class ConversionsSuite extends FunSuite {

private def createRowConverter(schema: StructType) = {
Conversions.createRowConverter(schema).andThen(RowEncoder(schema).resolveAndBind().fromRow)
Conversions.createRowConverter(schema, Parameters.DEFAULT_PARAMETERS("csvnullstring"))
.andThen(RowEncoder(schema).resolveAndBind().fromRow)
}

test("Data should be correctly converted") {
@@ -150,7 +150,7 @@ class RedshiftSourceSuite
|1|f|2015-07-02|0|0.0|42|1239012341823719|-13|asdf|2015-07-02 00:00:00.0
|0||2015-07-03|0.0|-1.0|4141214|1239012341823719||f|2015-07-03 00:00:00
|0|f||-1234152.12312498|100000.0||1239012341823719|24|___\|_123|
||||||||||
|||||||||@NULL@|
""".stripMargin.trim
// scalastyle:on
val expectedQuery = (
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "3.0.0-SNAPSHOT"
version in ThisBuild := "3.0.0"