
Sk ilya patch 1 #2

Draft: wants to merge 763 commits into base: master
Conversation

sk-ilya (Owner) commented Apr 14, 2022

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

andygrove and others added 30 commits May 19, 2021 07:46
… up cached exchanges for re-use

### What changes were proposed in this pull request?
AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to look up cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than the canonicalized form of the original exchange.
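
As a rough sketch of the idea (the `StageCache` class and names below are illustrative, not Spark's actual AQE internals): reuse is decided by the canonicalized form of the stage that will actually execute, so a plugin-created columnar stage is never matched against an incompatible row-based one.

```scala
import scala.collection.mutable

// Hypothetical sketch: a cache of already-created query stages, keyed by the
// canonicalized plan of the *new* stage (possibly rewritten by a columnar
// plugin), not by the canonicalized form of the original exchange.
class StageCache[Plan <: AnyRef, Stage](canonicalize: Plan => Plan) {
  private val cache = mutable.Map.empty[Plan, Stage]

  // Reuse an existing stage only if its canonical form matches the canonical
  // form of the stage we are about to create.
  def getOrElseUpdate(newStagePlan: Plan)(create: => Stage): Stage =
    cache.getOrElseUpdate(canonicalize(newStagePlan), create)
}
```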

### Why are the changes needed?
When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this, but it would involve implementing a mock columnar exchange in the tests, so it would be quite a bit of work. If anyone has ideas on other ways to test this, I am happy to hear them.

Closes apache#32195 from andygrove/SPARK-35093.

Authored-by: Andy Grove <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
(cherry picked from commit 52e3cf9)
Signed-off-by: Thomas Graves <[email protected]>
### What changes were proposed in this pull request?

`./build/mvn` now downloads the .sha512 checksum of Maven artifacts it downloads, and checks the checksum after download.
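
For illustration only, here is the verification idea expressed in Scala rather than the actual shell changes in `build/mvn` (file paths are hypothetical): compute the SHA-512 of the downloaded artifact and compare it with the digest recorded in the `.sha512` file.

```scala
import java.nio.file.{Files, Paths}
import java.security.MessageDigest

// Hedged sketch of the checksum check performed after downloading an artifact.
def sha512Matches(artifactPath: String, sha512Path: String): Boolean = {
  val actual = MessageDigest.getInstance("SHA-512")
    .digest(Files.readAllBytes(Paths.get(artifactPath)))
    .map("%02x".format(_))
    .mkString
  // A .sha512 file typically contains the hex digest, optionally followed by the file name.
  val expected = new String(Files.readAllBytes(Paths.get(sha512Path)))
    .trim.split("\\s+").head.toLowerCase
  actual == expected
}
```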

### Why are the changes needed?

This ensures the integrity of the Maven artifact during a user's build, which may come from several non-ASF mirrors.

### Does this PR introduce _any_ user-facing change?

Should not affect anything about Spark per se, just the build.

### How was this patch tested?

Manual testing wherein I forced Maven/Scala download, verified checksums are downloaded and checked, and verified it fails on error with a corrupted checksum.

Closes apache#32505 from srowen/SPARK-35373.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
### What changes were proposed in this pull request?

This PR is a follow-up of apache#32505 to fix `zinc` installation.

### Why are the changes needed?

Currently, branch-3.1/3.0 is broken.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the GitHub Action.

Closes apache#32591 from dongjoon-hyun/SPARK-35373.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…n build/mvn

### What changes were proposed in this pull request?
change $(command -v curl) to "$(command -v curl)"

### Why are the changes needed?
We need to change `$(command -v curl)` to `"$(command -v curl)"` to make sure it works when `curl` or `wget` is not installed. Otherwise the following error is raised:
`build/mvn: line 56: [: /root/spark/build/apache-maven-3.6.3-bin.tar.gz: binary operator expected`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
apt remove curl
rm -f build/apache-maven-3.6.3-bin.tar.gz
rm -r build/apache-maven-3.6.3-bin
mvn -v
```

Closes apache#32608 from Yikun/patch-6.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 3c3533d)
Signed-off-by: Sean Owen <[email protected]>
### What changes were proposed in this pull request?
Use ` > /dev/null` to replace `-q` in shasum validation.

### Why are the changes needed?
In PR apache#32505, the shasum check on Maven was added. `shasum -a 512 -q -c xxx.sha` is used to validate the checksum; the `-q` arg means "don't print OK for each successfully verified file", but `-q` was only introduced in shasum 6.x.

So we got `Unknown option: q`:

```
➜  ~ uname -a
Darwin MacBook.local 19.6.0 Darwin Kernel Version 19.6.0: Mon Apr 12 20:57:45 PDT 2021; root:xnu-6153.141.28.1~1/RELEASE_X86_64 x86_64
➜  ~ shasum -v
5.84
➜  ~ shasum -q
Unknown option: q
Type shasum -h for help
```

This makes the ARM CI fail:
[1] https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-arm/

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
`shasum -a 512 -c wrong.sha > /dev/null` returns code 1 without printing anything.
`shasum -a 512 -c right.sha > /dev/null` returns code 0 without printing anything.
e2e test:
```
rm -f build/apache-maven-3.6.3-bin.tar.gz
rm -r build/apache-maven-3.6.3-bin
mvn -v
```

Closes apache#32604 from Yikun/patch-5.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 38fbc0b)
Signed-off-by: Sean Owen <[email protected]>
### What changes were proposed in this pull request?

Not every build system has `shasum`. This PR aims to skip checksum checks on a system without `shasum`.

### Why are the changes needed?

**PREPARE**
```
$ docker run -it --rm -v $PWD:/spark openjdk:11-slim /bin/bash
roota0e001a6e50f:/# cd /spark/
roota0e001a6e50f:/spark# apt-get update
roota0e001a6e50f:/spark# apt-get install curl
roota0e001a6e50f:/spark# build/mvn clean
```

**BEFORE (Failure due to `command not found`)**
```
roota0e001a6e50f:/spark# build/mvn clean
exec: curl --silent --show-error -L https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz
exec: curl --silent --show-error -L https://www.apache.org/dyn/closer.lua/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz?action=download
exec: curl --silent --show-error -L https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz.sha512
Veryfing checksum from /spark/build/apache-maven-3.6.3-bin.tar.gz.sha512
build/mvn: line 81: shasum: command not found
Bad checksum from https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz.sha512
```

**AFTER**
```
roota0e001a6e50f:/spark# build/mvn clean
exec: curl --silent --show-error -L https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz
Skipping checksum because shasum is not installed.
exec: curl --silent --show-error -L https://www.apache.org/dyn/closer.lua/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz?action=download
exec: curl --silent --show-error -L https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz.sha512
Skipping checksum because shasum is not installed.
Using `mvn` from path: /spark/build/apache-maven-3.6.3/bin/mvn
```

### Does this PR introduce _any_ user-facing change?

Yes, this will recover the build.

### How was this patch tested?

Manually with the above process.

Closes apache#32613 from dongjoon-hyun/SPARK-35463.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 8e13b8c)
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

Change the type of `DATASET_ID_TAG` from `Long` to `HashSet[Long]` to allow the logical plan to match multiple datasets.

### Why are the changes needed?

During the transformation from one Dataset to another, the DATASET_ID_TAG of the logical plan won't change if the plan itself doesn't change:

https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L234-L237

However, the dataset id always changes even if the logical plan doesn't change:
https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L207-L208

This can lead to a mismatch between the Dataset's id and the column's __dataset_id. E.g.,

```scala
  test("SPARK-28344: fail ambiguous self join - Dataset.colRegex as column ref") {
    // The test can fail if we change it to:
    // val df1 = spark.range(3).toDF()
    // val df2 = df1.filter($"id" > 0).toDF()
    val df1 = spark.range(3)
    val df2 = df1.filter($"id" > 0)

    withSQLConf(
      SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true",
      SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
      assertAmbiguousSelfJoin(df1.join(df2, df1.colRegex("id") > df2.colRegex("id")))
    }
  }
```
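
A minimal sketch of the tagging idea, using a simplified stand-in for Spark's plan/tag classes (the `PlanNode` class below is hypothetical): with a `HashSet[Long]` tag, a reused logical plan accumulates the id of every Dataset it belongs to, so the ambiguity check can match any of them instead of only the last one.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a logical plan carrying tags.
class PlanNode {
  private val datasetIds = mutable.HashSet.empty[Long]

  // Instead of overwriting a single Long tag, add the new Dataset id to the
  // set so the same plan can be matched against multiple Datasets.
  def tagDatasetId(id: Long): Unit = datasetIds += id

  def belongsTo(id: Long): Boolean = datasetIds.contains(id)
}
```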

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

Closes apache#32616 from Ngone51/fix-ambiguous-join.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 34284c0)
Signed-off-by: Wenchen Fan <[email protected]>
….blockmanager.port` in BasicExecutorFeatureStep

### What changes were proposed in this pull request?

Most Spark conf keys are case-sensitive, including `spark.blockManager.port`; we cannot get the correct port number with `spark.blockmanager.port`.

This PR changes the wrong key to `spark.blockManager.port` in `BasicExecutorFeatureStep`.

This PR also ensures a fast fail when the port value is invalid for executor containers. When 0 is specified (it is valid as a random port, but invalid as a k8s request), it should not be put in the `containerPort` field of the executor pod spec. We do not expect executor pods to continuously fail to be created because of invalid requests.
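
A minimal sketch of the intended behavior, using a plain `SparkConf` lookup rather than the actual `BasicExecutorFeatureStep` code (the helper name is hypothetical): read the port with the correctly-cased key and fail fast on values that are invalid for a Kubernetes `containerPort`.

```scala
import org.apache.spark.SparkConf

// Conf keys are case-sensitive, so the lookup must use "spark.blockManager.port"
// (capital M); 0 means "pick a random port" and must not be propagated into the
// executor pod's containerPort field.
def executorBlockManagerPort(conf: SparkConf): Option[Int] = {
  val port = conf.getInt("spark.blockManager.port", 0)
  require(port >= 0 && port <= 65535, s"Invalid block manager port: $port")
  if (port == 0) None else Some(port)
}
```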

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes apache#32621 from yaooqinn/SPARK-35482.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit d957426)
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR proposes to avoid wrapping if-else around the constant literals for `percentage` and `accuracy` in `percentile_approx`. They are expected to be literals (or foldable expressions).

Pivot works via two-phase aggregation, and it works by changing the input to `null` for non-matched values (pivot column and value).

Note that pivot supports an optimized version without such logic, changing the input to `null` only for some types (basically non-nested types). So the issue fixed by this PR only affects complex types.
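
A rough sketch of the mechanism, using Catalyst expression types in a simplified form (the helper below is hypothetical, not the exact code in the pivot rewrite): foldable inputs such as the `percentage` and `accuracy` literals are passed through untouched instead of being wrapped in `If`. The example below, which previously failed with the analysis error shown further down, now returns the expected result.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, If, Literal}

// Hypothetical helper: only non-foldable pivot inputs are wrapped in
// If(condition, value, null); constant literals stay foldable so that
// percentile_approx still accepts them.
def pivotInput(value: Expression, matchesPivotValue: Expression): Expression =
  if (value.foldable) value
  else If(matchesPivotValue, value, Literal.create(null, value.dataType))
```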

```scala
val df = Seq(
  ("a", -1.0), ("a", 5.5), ("a", 2.5), ("b", 3.0), ("b", 5.2)).toDF("type", "value")
  .groupBy().pivot("type", Seq("a", "b")).agg(
    percentile_approx(col("value"), array(lit(0.5)), lit(10000)))
df.show()
```

**Before:**

```
org.apache.spark.sql.AnalysisException: cannot resolve 'percentile_approx((IF((type <=> CAST('a' AS STRING)), value, CAST(NULL AS DOUBLE))), (IF((type <=> CAST('a' AS STRING)), array(0.5D), NULL)), (IF((type <=> CAST('a' AS STRING)), 10000, CAST(NULL AS INT))))' due to data type mismatch: The accuracy or percentage provided must be a constant literal;
'Aggregate [percentile_approx(if ((type#7 <=> cast(a as string))) value#8 else cast(null as double), if ((type#7 <=> cast(a as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(a as string))) 10000 else cast(null as int), 0, 0) AS a#16, percentile_approx(if ((type#7 <=> cast(b as string))) value#8 else cast(null as double), if ((type#7 <=> cast(b as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(b as string))) 10000 else cast(null as int), 0, 0) AS b#18]
+- Project [_1#2 AS type#7, _2#3 AS value#8]
   +- LocalRelation [_1#2, _2#3]
```

**After:**

```
+-----+-----+
|    a|    b|
+-----+-----+
|[2.5]|[3.0]|
+-----+-----+
```

### Why are the changes needed?

To make percentile_approx work with pivot as expected

### Does this PR introduce _any_ user-facing change?

Yes. It threw an exception but now it returns a correct result as shown above.

### How was this patch tested?

Manually tested and unit test was added.

Closes apache#32619 from HyukjinKwon/SPARK-35480.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 1d9f09d)
Signed-off-by: Hyukjin Kwon <[email protected]>
…check

### What changes were proposed in this pull request?

This patch is a follow-up of SPARK-35463. In SPARK-35463, we output a message to stdout; this change redirects it to stderr.

### Why are the changes needed?

All `echo` statements in `build/mvn` should redirect to stderr if they are not followed by `exit`, because other scripts parse the stdout of `build/mvn`. If we don't redirect, we can get invalid output, e.g. "Skipping checksum because shasum is not installed." returned as the `commons-cli` version.

### Does this PR introduce _any_ user-facing change?

No. Dev only.

### How was this patch tested?

Manually tested on an internal system.

Closes apache#32637 from viirya/fix-build.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 594ffd2)
Signed-off-by: Dongjoon Hyun <[email protected]>
…IntegrationSuite

### What changes were proposed in this pull request?

This PR fixes a test added in SPARK-35226 (apache#32344).

### Why are the changes needed?

`SELECT 1` does not seem to be a valid query for DB2.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

DB2KrbIntegrationSuite passes on my laptop.

I also confirmed all the KrbIntegrationSuites pass with the following command.
```
build/sbt -Phive -Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite"
```

Closes apache#32632 from sarutak/followup-SPARK-35226.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 1a43415)
Signed-off-by: Dongjoon Hyun <[email protected]>
….driver.blockManager.port` as same as other cluster managers

### What changes were proposed in this pull request?

`spark.blockManager.port` currently does not work for k8s driver pods; we should make it work as it does with other cluster managers.

### Why are the changes needed?

`spark.blockManager.port` should work for the Spark driver pod.

### Does this PR introduce _any_ user-facing change?

Yes, `spark.blockManager.port` will be respected iff it is present and `spark.driver.blockManager.port` is absent.

### How was this patch tested?

new tests

Closes apache#32639 from yaooqinn/SPARK-35493.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 96b0548)
Signed-off-by: Dongjoon Hyun <[email protected]>
…SparkR package

### What changes were proposed in this pull request?
Declare the markdown package as a dependency of the SparkR package

### Why are the changes needed?
If pandoc is not installed locally, running make-distribution.sh fails with the following message:
```
— re-building ‘sparkr-vignettes.Rmd’ using rmarkdown
Warning in engine$weave(file, quiet = quiet, encoding = enc) :
Pandoc (>= 1.12.3) not available. Falling back to R Markdown v1.
Error: processing vignette 'sparkr-vignettes.Rmd' failed with diagnostics:
The 'markdown' package should be declared as a dependency of the 'SparkR' package (e.g., in the 'Suggests' field of DESCRIPTION), because the latter contains vignette(s) built with the 'markdown' package. Please see yihui/knitr#1864 for more information.
— failed re-building ‘sparkr-vignettes.Rmd’
```

### Does this PR introduce _any_ user-facing change?
Yes. Workaround for R packaging.

### How was this patch tested?
Manually tested. After the fix, the command `sh dev/make-distribution.sh -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn` passes in an environment without pandoc.

Closes apache#32270 from xuanyuanking/SPARK-35171.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 8e9e700)
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

As discussed, update SparkR maintainer for future release.

### Why are the changes needed?

Shivaram will not be able to work with this in the future, so we would like to migrate off the maintainer contact email.

shivaram

Closes apache#32642 from felixcheung/sparkr-maintainer.

Authored-by: Felix Cheung <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 1530876)
Signed-off-by: Dongjoon Hyun <[email protected]>
…xec which generates UnsafeRow for DataSourceV2ScanRelation

### What changes were proposed in this pull request?

This PR fixes an issue where `RemoveRedundantProjects` removes a `ProjectExec` that is needed to generate `UnsafeRow`s.
In `DataSourceV2Strategy`, a `ProjectExec` is inserted to ensure internal rows are `UnsafeRow`s.

```
  private def withProjectAndFilter(
      project: Seq[NamedExpression],
      filters: Seq[Expression],
      scan: LeafExecNode,
      needsUnsafeConversion: Boolean): SparkPlan = {
    val filterCondition = filters.reduceLeftOption(And)
    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)

    if (withFilter.output != project || needsUnsafeConversion) {
      ProjectExec(project, withFilter)
    } else {
      withFilter
    }
  }
...
    case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
      // projection and filters were already pushed down in the optimizer.
      // this uses PhysicalOperation to get the projection and ensure that if the batch scan does
      // not support columnar, a projection is added to convert the rows to UnsafeRow.
      val batchExec = BatchScanExec(relation.output, relation.scan)
      withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil
```
So, the hierarchy of the partial tree should be like `ProjectExec(FilterExec(BatchScan))`.
But `RemoveRedundantProjects` doesn't consider this type of hierarchy, leading to a `ClassCastException`.

A concrete example to reproduce this issue was reported:
```
import scala.collection.JavaConverters._

import org.apache.iceberg.{PartitionSpec, TableProperties}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
import org.apache.spark.sql.internal.SQLConf

class RemoveRedundantProjectsTest extends QueryTest {
  override val spark: SparkSession = SparkSession
    .builder()
    .master("local[4]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .appName(suiteName)
    .getOrCreate()
  test("RemoveRedundantProjects removes non-redundant projects") {
    withSQLConf(
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
      SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
      withTempDir { dir =>
        val path = dir.getCanonicalPath
        val data = spark.range(3).toDF
        val table = new HadoopTables().create(
          SparkSchemaUtil.convert(data.schema),
          PartitionSpec.unpartitioned(),
          Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
          path)
        data.write.format("iceberg").mode("overwrite").save(path)
        table.refresh()

        val df = spark.read.format("iceberg").load(path)
        val dfX = df.as("x")
        val dfY = df.as("y")
        val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
        join.explain("extended")
        assert(join.count() == 2)
      }
    }
  }
}
```
```
[info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
[info]  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
[info]  at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
```

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes apache#32606 from sarutak/fix-project-removal-issue.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d4fb983)
Signed-off-by: Wenchen Fan <[email protected]>
… values if elseValue is set

### What changes were proposed in this pull request?

This PR fixes a bug with subexpression elimination for CaseWhen statements. apache#30245 added support for creating subexpressions that are present in all branches of conditional statements. However, for a statement to be in "all branches" of a CaseWhen statement, it must also be in the elseValue.

### Why are the changes needed?

Fix a bug where a subexpression can be created and run for branches of a conditional that don't pass. This can cause issues especially with a UDF in a branch that gets executed assuming the condition is true.

### Does this PR introduce _any_ user-facing change?

Yes, fixes a potential bug where a UDF could be eagerly executed even though it might expect to have already passed some form of validation. For example:
```
val col = when($"id" < 0, myUdf($"id"))
spark.range(1).select(when(col > 0, col)).show()
```

`myUdf($"id")` is considered a subexpression and eagerly evaluated, because it is pulled out as a common expression from both executions of the when clause, but if `id >= 0` it should never actually be run.

### How was this patch tested?

Updated existing test with new case.

Closes apache#32651 from Kimahriman/bug-case-subexpr-elimination-3.1.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
…3.1.1 to 3.1.2

### What changes were proposed in this pull request?

Change the version in the facetFilters of DocSearch from 3.1.1 to 3.1.2.

### Why are the changes needed?

As we are going to release 3.1.2, the search results for the 3.1.2 documentation should point to the new documentation site instead of 3.1.1.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Just simple configuration change.

Closes apache#32654 from gengliangwang/docVersion.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…via release-tag.sh

### What changes were proposed in this pull request?

Automatically update the version index of DocSearch via release-tag.sh when releasing a new documentation site, instead of the current manual update.

### Why are the changes needed?

Simplify the release process.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually run the following command and check the diff
```
R_NEXT_VERSION=3.2.0
sed -i".tmp8" "s/'facetFilters':.*$/'facetFilters': [\"version:$R_NEXT_VERSION\"]/g" docs/_config.yml
```

Closes apache#32662 from gengliangwang/updateDocsearchInRelease.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit 321c654)
Signed-off-by: Gengliang Wang <[email protected]>
### What changes were proposed in this pull request?

Change the type of `DATASET_ID_TAG` from `Long` to `HashSet[Long]` to allow the logical plan to match multiple datasets.

### Why are the changes needed?

During the transformation from one Dataset to another, the DATASET_ID_TAG of the logical plan won't change if the plan itself doesn't change:

https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L234-L237

However, the dataset id always changes even if the logical plan doesn't change:
https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L207-L208

This can lead to a mismatch between the Dataset's id and the column's __dataset_id. E.g.,

```scala
  test("SPARK-28344: fail ambiguous self join - Dataset.colRegex as column ref") {
    // The test can fail if we change it to:
    // val df1 = spark.range(3).toDF()
    // val df2 = df1.filter($"id" > 0).toDF()
    val df1 = spark.range(3)
    val df2 = df1.filter($"id" > 0)

    withSQLConf(
      SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true",
      SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
      assertAmbiguousSelfJoin(df1.join(df2, df1.colRegex("id") > df2.colRegex("id")))
    }
  }
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

Closes apache#32692 from Ngone51/spark-35454-3.1.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

I just noticed that `AdaptiveQueryExecSuite.SPARK-34091: Batch shuffle fetch in AQE partition coalescing` takes more than 10 minutes to finish, which is unacceptable.

This PR sets the number of shuffle partitions to 10 in that test, so that the test can finish within 5 seconds.

### Why are the changes needed?

speed up the test

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes apache#32695 from cloud-fan/test.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 678592a)
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This is a minor change to update how `StateStoreRestoreExec` computes its number of output rows. Previously we only counted input rows; the optionally restored rows were not counted.

### Why are the changes needed?

Currently the number of output rows of `StateStoreRestoreExec` only counts each input row, but it actually outputs the input rows plus the optionally restored rows. We should report the correct number of output rows.
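
A minimal sketch of the metric change, with simplified types standing in for the actual `StateStoreRestoreExec` code (the function and its parameters are hypothetical): the output-rows metric is incremented for the restored row, when present, as well as for the input row.

```scala
import org.apache.spark.util.LongAccumulator

// Count the input row plus the optionally restored row; previously only the
// input row was counted.
def restoreAndCount[Row](input: Row,
                         restored: Option[Row],
                         numOutputRows: LongAccumulator): Seq[Row] = {
  val out = restored.toSeq :+ input
  numOutputRows.add(out.size)
  out
}
```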

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes apache#32703 from viirya/fix-outputrows.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 73ba449)
Signed-off-by: Hyukjin Kwon <[email protected]>
…g TreeNode to JSON

### What changes were proposed in this pull request?
Handle curried Product classes while serializing TreeNode to JSON. While processing [Product](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L820), we may get an assertion error for cases like a curried Product because of a size mismatch between field names and field values.
Fall back to using reflection to get all the values of the constructor parameters when we meet such cases.
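
A small illustration of why the assertion can trip (a hypothetical example, not from the Spark code base): for a curried case class, `productIterator` only yields the first parameter list, while the primary constructor declares all parameters, so field names and field values end up with different sizes and reflection is needed to recover the full set of constructor argument values.

```scala
// Only the first parameter list of a case class participates in Product.
case class Curried(a: Int)(b: String)

val c = Curried(1)("x")
println(c.productArity)                                    // 1
println(c.getClass.getConstructors.head.getParameterCount) // 2
// Zipping constructor parameter names with productIterator values therefore
// mismatches, which is what triggered the assert before this change.
```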

### Why are the changes needed?
Avoid throwing an error while serializing TreeNode to JSON; try to output as much information as possible.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UT case added.

Closes apache#32713 from ivoson/SPARK-35411-followup.

Authored-by: Tengfei Huang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
This PR proposes to support R 4.1.0+ in SparkR. Currently the tests fail as below:

```
══ Failed ══════════════════════════════════════════════════════════════════════
── 1. Failure (test_sparkSQL_arrow.R:71:3): createDataFrame/collect Arrow optimi
collect(createDataFrame(rdf)) not equal to `expected`.
Component “g”: 'tzone' attributes are inconsistent ('UTC' and '')

── 2. Failure (test_sparkSQL_arrow.R:143:3): dapply() Arrow optimization - type
collect(ret) not equal to `rdf`.
Component “b”: 'tzone' attributes are inconsistent ('UTC' and '')

── 3. Failure (test_sparkSQL_arrow.R:229:3): gapply() Arrow optimization - type
collect(ret) not equal to `rdf`.
Component “b”: 'tzone' attributes are inconsistent ('UTC' and '')

── 4. Error (test_sparkSQL.R:1454:3): column functions ─────────────────────────
Error: (converted from warning) cannot xtfrm data frames
Backtrace:
  1. base::sort(collect(distinct(select(df, input_file_name())))) test_sparkSQL.R:1454:2
  2. base::sort.default(collect(distinct(select(df, input_file_name()))))
  5. base::order(x, na.last = na.last, decreasing = decreasing)
  6. base::lapply(z, function(x) if (is.object(x)) as.vector(xtfrm(x)) else x)
  7. base:::FUN(X[[i]], ...)
 10. base::xtfrm.data.frame(x)

── 5. Failure (test_utils.R:67:3): cleanClosure on R functions ─────────────────
`actual` not equal to `g`.
names for current but not for target
Length mismatch: comparison on first 0 components

── 6. Failure (test_utils.R:80:3): cleanClosure on R functions ─────────────────
`actual` not equal to `g`.
names for current but not for target
Length mismatch: comparison on first 0 components
```

It fixes three issues, as below:

- Avoid a sort on DataFrame which isn't legitimate: apache#32709 (comment)
- Treat the empty timezone and local timezone as equivalent in SparkR: apache#32709 (comment)
- Disable `check.environment` in the cleaned closure comparison (enabled by default from R 4.1+, https://cran.r-project.org/doc/manuals/r-release/NEWS.html), and keep the test as is apache#32709 (comment)

Higher R versions have bug fixes and improvements. More importantly, R users tend to use the latest R versions.

Yes, SparkR will work with R 4.1.0+.

```bash
./R/run-tests.sh
```

```
sparkSQL_arrow:
SparkSQL Arrow optimization: .................

...

sparkSQL:
SparkSQL functions: ........................................................................................................................................................................................................
........................................................................................................................................................................................................
........................................................................................................................................................................................................
........................................................................................................................................................................................................
........................................................................................................................................................................................................
........................................................................................................................................................................................................

...

utils:
functions in utils.R: ..............................................
```

Closes apache#32709 from HyukjinKwon/SPARK-35573.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 1ba1b70)
Signed-off-by: Hyukjin Kwon <[email protected]>
…t command

### What changes were proposed in this pull request?

Currently, the results of the following SQL queries are not redacted:
```
SET [KEY];
SET;
```
For example:

```
scala> spark.sql("set javax.jdo.option.ConnectionPassword=123456").show()
+--------------------+------+
|                 key| value|
+--------------------+------+
|javax.jdo.option....|123456|
+--------------------+------+

scala> spark.sql("set javax.jdo.option.ConnectionPassword").show()
+--------------------+------+
|                 key| value|
+--------------------+------+
|javax.jdo.option....|123456|
+--------------------+------+

scala> spark.sql("set").show()
+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|javax.jdo.option....|              123456|

```

We should hide the sensitive information and redact the query output.
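
A minimal sketch of the redaction idea, using a generic regex-based filter (the pattern and helper here are illustrative; the actual change reuses Spark's existing redaction configuration and utilities):

```scala
// Redact values whose keys look sensitive before returning the rows of a SET command.
val redactionPattern = "(?i)secret|password|token".r

def redactSetOutput(rows: Seq[(String, String)]): Seq[(String, String)] =
  rows.map { case (key, value) =>
    if (redactionPattern.findFirstIn(key).isDefined) (key, "*********(redacted)")
    else (key, value)
  }

// redactSetOutput(Seq("javax.jdo.option.ConnectionPassword" -> "123456"))
// => Seq(("javax.jdo.option.ConnectionPassword", "*********(redacted)"))
```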

### Why are the changes needed?

Security.

### Does this PR introduce _any_ user-facing change?

Yes, the sensitive information in the output of SET commands is redacted.

### How was this patch tested?

Unit test

Closes apache#32720 from gengliangwang/cherry-pick-set-3.1.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
### What changes were proposed in this pull request?
```
- Upload multi stages *** FAILED ***
The code passed to eventually never returned normally. Attempted 20 times over 10.011176743 seconds. Last failure message: fallbackStorage.exists(0, file) was false. (FallbackStorageSuite.scala:243)
```
An error like the above was raised randomly on aarch64 and also in GitHub Actions test runs [1][2].

[1] https://github.com/apache/spark/actions/runs/489319612
[2] https://github.com/apache/spark/actions/runs/479317320

### Why are the changes needed?
The timeout is too short; it needs to be increased to let the test case complete.
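
For illustration, this is the kind of adjustment involved, expressed with ScalaTest's `Eventually` helpers (the concrete values are assumptions, not the ones in the patch): give the assertion a longer patience window so slower environments such as aarch64 CI can finish the upload.

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Wait longer (and poll less aggressively) before declaring that the fallback
// storage never received the uploaded shuffle file.
eventually(timeout(30.seconds), interval(1.second)) {
  // assert(fallbackStorage.exists(0, file))  // suite-specific check, shown for context
}
```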

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
build/mvn test -Dtest=none -DwildcardSuites=org.apache.spark.storage.FallbackStorageSuite -pl :spark-core_2.12

Closes apache#32719 from Yikun/SPARK-35584.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit d773373)
Signed-off-by: Dongjoon Hyun <[email protected]>
…stop shutdown hook

### What changes were proposed in this pull request?

Fix the memory leak by deregistering the shutdown hook when the executor is stopped. This way the garbage collector can release the executor object early, which is a huge win for our tests, as the user's classloader can also be released; it keeps references to objects created for the jars on the classpath.
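
A minimal sketch of the pattern, using plain JVM shutdown hooks rather than Spark's internal shutdown-hook manager (the `ExecutorLike` class is hypothetical): keep a reference to the registered hook and remove it in `stop()` so the executor, and everything reachable from it, becomes garbage-collectable.

```scala
// Hedged sketch of the register/deregister pattern.
class ExecutorLike {
  private val shutdownHook = new Thread(() => stopInternal())
  Runtime.getRuntime.addShutdownHook(shutdownHook)

  def stop(): Unit = {
    stopInternal()
    // Deregister the hook; otherwise the JVM keeps a strong reference to it
    // (and, through it, to this executor and its classloader) until exit.
    Runtime.getRuntime.removeShutdownHook(shutdownHook)
  }

  private def stopInternal(): Unit = { /* release resources */ }
}
```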

### Why are the changes needed?

I have identified this leak by running the Livy tests (I know it is close to the attic but this leak causes a constant OOM there) and it is in our Spark unit tests as well.

This leak can be identified by checking the number of `LeakyEntry` instances for Scala 2.12.14 (`ZipEntry` for Scala 2.12.10), which together with their related data can take up a considerable amount of memory (as those are created from the jars on the classpath).

I have my own tool for instrumenting JVM code, [trace-agent](https://github.com/attilapiros/trace-agent), and with that I am able to call JVM diagnostic commands at specific methods. Let me show it in action.

It has a single text file called actions.txt embedded in the tool's jar.
In this case the actions.txt content is:

{noformat}
$ unzip -q -c trace-agent-0.0.7.jar actions.txt
diagnostic_command org.apache.spark.repl.ReplSuite runInterpreter  cmd:gcClassHistogram,limit_output_lines:8,where:beforeAndAfter,with_gc:true
diagnostic_command org.apache.spark.repl.ReplSuite afterAll  cmd:gcClassHistogram,limit_output_lines:8,where:after,with_gc:true
{noformat}

This creates a class histogram at the beginning and at the end of `org.apache.spark.repl.ReplSuite#runInterpreter()` (after triggering a GC, which might not finish as GC is done in a separate thread) and one histogram at the end of the `org.apache.spark.repl.ReplSuite#afterAll()` method.

The histograms on the master branch are as follows:

```
$ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "ZipEntry\|LeakyEntry"
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        394178       18920544  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        394178       18920544  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        591267       28380816  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        591267       28380816  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        788356       37841088  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        788356       37841088  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   3:       1379623       66221904  scala.reflect.io.FileZipArchive$LeakyEntry
   3:       1379623       66221904  scala.reflect.io.FileZipArchive$LeakyEntry
   3:       1576712       75682176  scala.reflect.io.FileZipArchive$LeakyEntry
```

Where the header of the table is:

```
num     #instances         #bytes  class name
```

So the `LeakyEntry` instances at the end amount to about 75MB (173MB for Scala 2.12.10 and earlier, for another class called `ZipEntry`), but the first item (char/byte arrays) and the second item (strings) in the histogram also relate to this leak:

```
$ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "1:\|2:\|3:"
   1:          2701        3496112  [B
   2:         21855        2607192  [C
   3:          4885         537264  java.lang.Class
   1:        480323       55970208  [C
   2:        480499       11531976  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        481825       56148024  [C
   2:        481998       11567952  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        487056       57550344  [C
   2:        487179       11692296  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        487054       57551008  [C
   2:        487176       11692224  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        927823      107139160  [C
   2:        928072       22273728  java.lang.String
   3:        394178       18920544  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        927793      107129328  [C
   2:        928041       22272984  java.lang.String
   3:        394178       18920544  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1361851      155555608  [C
   2:       1362261       32694264  java.lang.String
   3:        591267       28380816  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1361683      155493464  [C
   2:       1362092       32690208  java.lang.String
   3:        591267       28380816  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1803074      205157728  [C
   2:       1803268       43278432  java.lang.String
   3:        788356       37841088  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1802385      204938224  [C
   2:       1802579       43261896  java.lang.String
   3:        788356       37841088  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2236631      253636592  [C
   2:       2237029       53688696  java.lang.String
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2236536      253603008  [C
   2:       2236933       53686392  java.lang.String
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2668892      301893920  [C
   2:       2669510       64068240  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2668759      301846376  [C
   2:       2669376       64065024  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3101238      350101048  [C
   2:       3102073       74449752  java.lang.String
   3:       1379623       66221904  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3101240      350101104  [C
   2:       3102075       74449800  java.lang.String
   3:       1379623       66221904  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3533785      398371760  [C
   2:       3534835       84836040  java.lang.String
   3:       1576712       75682176  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3533759      398367088  [C
   2:       3534807       84835368  java.lang.String
   3:       1576712       75682176  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3967049      446893400  [C
   2:       3968314       95239536  java.lang.String
   3:       1773801       85142448  scala.reflect.io.FileZipArchive$LeakyEntry
[info] - SPARK-26633: ExecutorClassLoader.getResourceAsStream find REPL classes (8 seconds, 248 milliseconds)
Setting default log level to "ERROR".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
   1:       3966423      446709584  [C
   2:       3967682       95224368  java.lang.String
   3:       1773801       85142448  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       4399583      495097208  [C
   2:       4401050      105625200  java.lang.String
   3:       1970890       94602720  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       4399578      495070064  [C
   2:       4401040      105624960  java.lang.String
   3:       1970890       94602720  scala.reflect.io.FileZipArchive$LeakyEntry
```

The last three numbers amount to about 700MB altogether.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

I used the trace-agent tool with the same settings for the modified code:

```
$ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "1:\|2:\|3:"
   1:          2701        3496112  [B
   2:         21855        2607192  [C
   3:          4885         537264  java.lang.Class
   1:        480323       55970208  [C
   2:        480499       11531976  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        481825       56148024  [C
   2:        481998       11567952  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        487056       57550344  [C
   2:        487179       11692296  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        487054       57551008  [C
   2:        487176       11692224  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        927823      107139160  [C
   2:        928072       22273728  java.lang.String
   3:        394178       18920544  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        927793      107129328  [C
   2:        928041       22272984  java.lang.String
   3:        394178       18920544  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1361851      155555608  [C
   2:       1362261       32694264  java.lang.String
   3:        591267       28380816  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1361683      155493464  [C
   2:       1362092       32690208  java.lang.String
   3:        591267       28380816  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1803074      205157728  [C
   2:       1803268       43278432  java.lang.String
   3:        788356       37841088  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1802385      204938224  [C
   2:       1802579       43261896  java.lang.String
   3:        788356       37841088  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2236631      253636592  [C
   2:       2237029       53688696  java.lang.String
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2236536      253603008  [C
   2:       2236933       53686392  java.lang.String
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2668892      301893920  [C
   2:       2669510       64068240  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2668759      301846376  [C
   2:       2669376       64065024  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3101238      350101048  [C
   2:       3102073       74449752  java.lang.String
   3:       1379623       66221904  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3101240      350101104  [C
   2:       3102075       74449800  java.lang.String
   3:       1379623       66221904  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3533785      398371760  [C
   2:       3534835       84836040  java.lang.String
   3:       1576712       75682176  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3533759      398367088  [C
   2:       3534807       84835368  java.lang.String
   3:       1576712       75682176  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       3967049      446893400  [C
   2:       3968314       95239536  java.lang.String
   3:       1773801       85142448  scala.reflect.io.FileZipArchive$LeakyEntry
[info] - SPARK-26633: ExecutorClassLoader.getResourceAsStream find REPL classes (8 seconds, 248 milliseconds)
Setting default log level to "ERROR".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
   1:       3966423      446709584  [C
   2:       3967682       95224368  java.lang.String
   3:       1773801       85142448  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       4399583      495097208  [C
   2:       4401050      105625200  java.lang.String
   3:       1970890       94602720  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       4399578      495070064  [C
   2:       4401040      105624960  java.lang.String
   3:       1970890       94602720  scala.reflect.io.FileZipArchive$LeakyEntry
[success] Total time: 174 s (02:54), completed Jun 2, 2021 2:00:43 PM
╭─attilazsoltpirosapiros-MBP16 ~/git/attilapiros/memoryLeak ‹SPARK-35610*›
╰─$ vim
╭─attilazsoltpirosapiros-MBP16 ~/git/attilapiros/memoryLeak ‹SPARK-35610*›
╰─$ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "1:\|2:\|3:"
   1:          2685        3457368  [B
   2:         21833        2606712  [C
   3:          4885         537264  java.lang.Class
   1:        480245       55978400  [C
   2:        480421       11530104  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        480460       56005784  [C
   2:        480633       11535192  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        486643       57537784  [C
   2:        486766       11682384  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        486636       57538192  [C
   2:        486758       11682192  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        501208       60411856  [C
   2:        501180       12028320  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        501206       60412960  [C
   2:        501177       12028248  java.lang.String
   3:        197089        9460272  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        934925      108773320  [C
   2:        935058       22441392  java.lang.String
   3:        394178       18920544  scala.reflect.io.FileZipArchive$LeakyEntry
   1:        934912      108769528  [C
   2:        935044       22441056  java.lang.String
   3:        394178       18920544  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1370351      156901296  [C
   2:       1370318       32887632  java.lang.String
   3:        591267       28380816  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1369660      156681680  [C
   2:       1369627       32871048  java.lang.String
   3:        591267       28380816  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1803746      205383136  [C
   2:       1803917       43294008  java.lang.String
   3:        788356       37841088  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       1803658      205353096  [C
   2:       1803828       43291872  java.lang.String
   3:        788356       37841088  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2235677      253608240  [C
   2:       2236068       53665632  java.lang.String
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2235539      253560088  [C
   2:       2235929       53662296  java.lang.String
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2667775      301799240  [C
   2:       2668383       64041192  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2667765      301798568  [C
   2:       2668373       64040952  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2666665      301491096  [C
   2:       2667285       64014840  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2666648      301490792  [C
   2:       2667266       64014384  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2668169      301833032  [C
   2:       2668782       64050768  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
[info] - SPARK-26633: ExecutorClassLoader.getResourceAsStream find REPL classes (6 seconds, 396 milliseconds)
Setting default log level to "ERROR".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
   1:       2235495      253419952  [C
   2:       2235887       53661288  java.lang.String
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2668379      301800768  [C
   2:       2668979       64055496  java.lang.String
   3:       1182534       56761632  scala.reflect.io.FileZipArchive$LeakyEntry
   1:       2236123      253522640  [C
   2:       2236514       53676336  java.lang.String
   3:        985445       47301360  scala.reflect.io.FileZipArchive$LeakyEntry
```

The sum of the last three numbers is about 354MB.

Closes apache#32748 from attilapiros/SPARK-35610.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 806edf8)
Signed-off-by: Dongjoon Hyun <[email protected]>
HyukjinKwon and others added 11 commits March 24, 2022 12:13
…Utils.unpack

### What changes were proposed in this pull request?

This PR proposes to use `FileUtil.unTarUsingJava`, a Java implementation for un-tarring `.tar` files. `unTarUsingJava` is not public, but it exists in all Hadoop versions from 2.1+; see HADOOP-9264.

The security issue reproduction requires a non-Windows platform, and a non-gzipped TAR archive file name (contents don't matter).

### Why are the changes needed?

There is a risk of arbitrary shell command injection via `Utils.unpack` when the filename is controlled by a malicious user. This is due to an issue in Hadoop's `unTar`, which does not properly escape the filename before passing it to a shell command: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java#L904
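
To illustrate the general idea of extracting the archive in-process so that no shell ever sees the attacker-controlled filename, here is a hedged sketch using Apache Commons Compress (this is not the Hadoop `unTarUsingJava` helper the PR actually switches to):

```scala
import java.io.{File, FileInputStream}
import java.nio.file.Files
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream

// Untar without spawning a shell; entry names never reach a command line.
// NOTE: a real implementation must also reject entries escaping destDir ("zip slip").
def unTarInProcess(tarFile: File, destDir: File): Unit = {
  val in = new TarArchiveInputStream(new FileInputStream(tarFile))
  try {
    var entry = in.getNextTarEntry
    while (entry != null) {
      val target = new File(destDir, entry.getName)
      if (entry.isDirectory) target.mkdirs()
      else {
        target.getParentFile.mkdirs()
        Files.copy(in, target.toPath) // copies only the current entry's bytes
      }
      entry = in.getNextTarEntry
    }
  } finally in.close()
}
```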

### Does this PR introduce _any_ user-facing change?

Yes, it prevents a security issue that previously allowed users to execute arbitrary shell commands.

### How was this patch tested?

Manually tested locally; existing test cases should cover this.

Closes apache#35946 from HyukjinKwon/SPARK-38631.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 057c051)
Signed-off-by: Hyukjin Kwon <[email protected]>
…iteral

This is a backport of apache#35878  to branch 3.1.

The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column.
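
A small illustration of the trap, with hypothetical expressions and a simplified version of the partition-pruning check (not the exact rule code): an empty reference set is trivially a subset of the partition columns, so a check based only on `references.subsetOf(...)` wrongly treats a literal as a partition-column expression.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
import org.apache.spark.sql.types.IntegerType

val storeId = AttributeReference("store_id", IntegerType)()
val partitionCols = AttributeSet(storeId :: Nil)

// Literal(4).references is empty, and an empty set is a subset of anything,
// so the naive check "finds" a partition column where there is none.
val naive = Literal(4).references.subsetOf(partitionCols)  // true (wrong)

// Requiring the expression to actually reference something fixes the check.
val fixed = Literal(4).references.nonEmpty &&
  Literal(4).references.subsetOf(partitionCols)            // false
```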

For example, the SQL in the test case generates the following physical plan when adaptive query execution is disabled:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=apache#335]
   :  :                    +- *(1) Project [store_id#5291, state_province#5292]
   :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                          +- *(1) ColumnarToRow
   :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=apache#335]
```
after this pr:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=apache#326]
      +- *(3) Project [store_id#5291, state_province#5292]
         +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
```
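A query of roughly the following shape reproduces the plans above. This is only a hedged sketch: the table and column names (`fact_sk`, `fact_stats`, `dim_store`, `store_id`, `date_id`, `state_province`, `country`) are read off the plan output, not taken from the actual test.
```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes the tables above already exist in the warehouse.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Two branches that project literal store_id values, unioned together ...
val facts = spark.sql(
  """SELECT 4 AS store_id, date_id FROM fact_sk    WHERE date_id >= 1300
    |UNION ALL
    |SELECT 5 AS store_id, date_id FROM fact_stats WHERE date_id <= 1000""".stripMargin)

// ... joined to the dimension table that drives the dynamic pruning subquery.
val dim = spark.table("dim_store").where($"country" === "US")

facts.join(dim, "store_id")
  .select($"store_id", $"date_id", $"state_province")
  .explain()
```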

### Why are the changes needed?

Execution performance improvement.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test.

Closes apache#35967 from mcdull-zhang/spark_38570_3.2.

Authored-by: mcdull-zhang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit 8621914)
Signed-off-by: Yuming Wang <[email protected]>
…building project list in `ExtractGenerator`

Backport of apache#35837.

When building the project list from an aggregate sequence in `ExtractGenerator`, convert the aggregate sequence to an `IndexedSeq` before performing the flatMap operation.

This query fails with a `NullPointerException`:
```scala
val df = Seq(1, 2, 3).toDF("v")
df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*).collect
```
If you change `Stream` to `Seq`, then it succeeds.

`ExtractGenerator` uses a flatMap operation over `aggList` for two purposes:

- To produce a new aggregate list
- To update `projectExprs` (which is initialized as an array of nulls).

When `aggList` is a `Stream`, the flatMap operation evaluates lazily, so all entries in `projectExprs` after the first will still be null when the rule completes.

Changing `aggList` to an `IndexedSeq` forces the flatMap to evaluate eagerly.
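The laziness can be seen with plain Scala, no Spark involved. The names below are made up purely for this sketch: `slots` stands in for `projectExprs` and `exprs` for a `Stream`-backed `aggList`.
```scala
// An array of nulls, like projectExprs, and a Stream statically typed as Seq.
val slots = new Array[Integer](3)
val exprs: Seq[Int] = Stream(0, 1, 2)

// The flatMap is expected to fill every slot as a side effect.
val result = exprs.flatMap { i =>
  slots(i) = Int.box(i)
  Seq(i)
}

// Stream.flatMap only evaluates the head here: prints "0, null, null".
println(slots.mkString(", "))

// Forcing the result (or using an IndexedSeq up front) runs the rest: "0, 1, 2".
result.toList
println(slots.mkString(", "))
```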

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

Closes apache#35851 from bersprockets/generator_aggregate_issue_32.

Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 7842621)
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?

This PR replaces `new Path(fileUri.getPath)` with `new Path(fileUri)`.
By using the `Path` class constructor with a URI parameter, we can preserve the file scheme.

### Why are the changes needed?

If we use the `Path` class constructor with a `String` parameter, it loses the file scheme information.
Although the original code has worked so far, it fails with Apache Hadoop 3.3.2 and breaks the dependency upload feature, which is covered by the K8s Minikube integration tests.

```scala
test("uploadFileUri") {
   val fileUri = org.apache.spark.util.Utils.resolveURI("/tmp/1.txt")
   assert(new Path(fileUri).toString == "file:/private/tmp/1.txt")
   assert(new Path(fileUri.getPath).toString == "/private/tmp/1.txt")
}
```
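As a hedged sketch of why the scheme matters (not the exact Spark code), building the `Path` from the full URI lets Hadoop resolve the right `FileSystem` from the scheme instead of falling back to the default file system:
```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val fileUri = new URI("file:/tmp/1.txt")

val withScheme    = new Path(fileUri)          // "file:/tmp/1.txt" - scheme preserved
val withoutScheme = new Path(fileUri.getPath)  // "/tmp/1.txt"      - scheme lost

// getFileSystem picks the implementation from the Path's scheme; without one,
// Hadoop falls back to fs.defaultFS, which may not be the local file system.
val fs = withScheme.getFileSystem(new Configuration())
// fs.copyFromLocalFile(withScheme, destDir)   // destDir is hypothetical here
```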

### Does this PR introduce _any_ user-facing change?

No, this will prevent a regression at Apache Spark 3.3.0 instead.

### How was this patch tested?

Pass the CIs.

In addition, this PR and apache#36009 will recover K8s IT `DepsTestsSuite`.

Closes apache#36010 from dongjoon-hyun/SPARK-38652.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit cab8aa1)
Signed-off-by: Dongjoon Hyun <[email protected]>
…function in Executor

This corresponds to the master branch PR [SPARK-38333](apache#35662).

### Why are the changes needed?

Bug fix for a potential issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT.

Closes apache#36012 from monkeyboy123/spark-38333.

Authored-by: Dereck Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit a40acd4)
Signed-off-by: Wenchen Fan <[email protected]>
…ntExprs function instead of getExprState at SubexpressionEliminationSuite

### What changes were proposed in this pull request?

This PR uses the `EquivalentExpressions.getEquivalentExprs` function instead of `getExprState` in `SubexpressionEliminationSuite`, and removes the `cpus` parameter.
### Why are the changes needed?

Fixes a build error.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI Tests

Closes apache#36033 from monkeyboy123/SPARK-38754.

Authored-by: Dereck Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…DownloadCallback caused by Log4j

### What changes were proposed in this pull request?

When `log4j.ignoreTCL`/`log4j2.ignoreTCL` is false, which is the default, Log4j uses the context ClassLoader of the current thread to load classes; see `org.apache.logging.log4j.util.LoaderUtil.loadClass`. When `ExecutorClassLoader` tries to load a class remotely through the file download and an error occurs, we log it at debug level, and `LoaderUtil` is then blocked on the classloading lock already acquired by `ExecutorClassLoader`.

Fortunately, this only happens when the `ThresholdFilter` level is `debug`.

Alternatively, we could set `log4j.ignoreTCL`/`log4j2.ignoreTCL` to true, but it is unclear what other side effects that would have.

So in this PR, I simply remove the debug log that causes this deadlock.

### Why are the changes needed?

Fix a deadlock between `ExecutorClassLoader` and Log4j's `LoaderUtil`.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

apache/kyuubi#2046 (comment), with a unit test in the Kyuubi project; resolved (https://github.com/apache/incubator-kyuubi/actions/runs/1950222737).

### Additional Resources

[ut.jstack.txt](https://github.com/apache/spark/files/8206457/ut.jstack.txt)

Closes apache#35765 from yaooqinn/SPARK-38446.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit aef6745)
Signed-off-by: Dongjoon Hyun <[email protected]>
…h a documentation fix

### What changes were proposed in this pull request?

This PR proposes two minor changes:
- Fixes the example at `Dataset.observe(String, ...)`
- Adds `varargs` to be consistent with another overloaded version: `Dataset.observe(Observation, ..)`

### Why are the changes needed?

To provide a correct example, to support Java callers properly via `varargs`, and for API consistency.
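For reference, a minimal sketch of calling the String-named overload from Scala; the metric name and columns below are made up for illustration.
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(1, 2, 3).toDF("v")

// observe(String, Column, Column*): the varargs tail is what the @varargs
// annotation also exposes cleanly to Java callers.
val observed = df.observe("my_metrics", count(lit(1)).as("cnt"), max($"v").as("max_v"))
observed.collect()
// The aggregated metric row is reported to QueryExecutionListener callbacks
// under the name "my_metrics" once the action completes.
```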

### Does this PR introduce _any_ user-facing change?

Yes, the example is fixed in the documentation. Additionally, Java users can now call `Dataset.observe(String, ...)` thanks to `varargs`.

### How was this patch tested?

Manually tested. CI should verify the changes too.

Closes apache#36084 from HyukjinKwon/minor-docs.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit fb3f380)
Signed-off-by: Hyukjin Kwon <[email protected]>
… ..) with a documentation fix"

This reverts commit dea607f.