forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sk ilya patch 1 #2
Draft
sk-ilya
wants to merge
763
commits into
master
Choose a base branch
from
sk-ilya-patch-1
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
… up cached exchanges for re-use ### What changes were proposed in this pull request? AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances. This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange. ### Why are the changes needed? When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them. Closes apache#32195 from andygrove/SPARK-35093. Authored-by: Andy Grove <[email protected]> Signed-off-by: Thomas Graves <[email protected]> (cherry picked from commit 52e3cf9) Signed-off-by: Thomas Graves <[email protected]>
### What changes were proposed in this pull request? `./build/mvn` now downloads the .sha512 checksum of Maven artifacts it downloads, and checks the checksum after download. ### Why are the changes needed? This ensures the integrity of the Maven artifact during a user's build, which may come from several non-ASF mirrors. ### Does this PR introduce _any_ user-facing change? Should not affect anything about Spark per se, just the build. ### How was this patch tested? Manual testing wherein I forced Maven/Scala download, verified checksums are downloaded and checked, and verified it fails on error with a corrupted checksum. Closes apache#32505 from srowen/SPARK-35373. Authored-by: Sean Owen <[email protected]> Signed-off-by: Sean Owen <[email protected]>
### What changes were proposed in this pull request? This PR is a follow-up of apache#32505 to fix `zinc` installation. ### Why are the changes needed? Currently, branch-3.1/3.0 is broken. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the GitHub Action. Closes apache#32591 from dongjoon-hyun/SPARK-35373. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…n build/mvn ### What changes were proposed in this pull request? change $(command -v curl) to "$(command -v curl)" ### Why are the changes needed? We need change $(command -v curl) to "$(command -v curl)" to make sure it work when `curl` or `wget` is uninstall. othewise raised: `build/mvn: line 56: [: /root/spark/build/apache-maven-3.6.3-bin.tar.gz: binary operator expected` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ``` apt remove curl rm -f build/apache-maven-3.6.3-bin.tar.gz rm -r build/apache-maven-3.6.3-bin mvn -v ``` Closes apache#32608 from Yikun/patch-6. Authored-by: Yikun Jiang <[email protected]> Signed-off-by: Sean Owen <[email protected]> (cherry picked from commit 3c3533d) Signed-off-by: Sean Owen <[email protected]>
## What changes were proposed in this pull request? Use ` > /dev/null` to replace `-q` in shasum validation. ### Why are the changes needed? In PR apache#32505 , added the shasum check on maven. The `shasum -a 512 -q -c xxx.sha` is used to validate checksum, the `-q` args is for "don't print OK for each successfully verified file", but `-q` arg is introduce in shasum 6.x version. So we got the `Unknown option: q`. ``` ➜ ~ uname -a Darwin MacBook.local 19.6.0 Darwin Kernel Version 19.6.0: Mon Apr 12 20:57:45 PDT 2021; root:xnu-6153.141.28.1~1/RELEASE_X86_64 x86_64 ➜ ~ shasum -v 5.84 ➜ ~ shasum -q Unknown option: q Type shasum -h for help ``` it makes ARM CI failed: [1] https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-arm/ ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `shasum -a 512 -c wrong.sha > /dev/null` return code 1 without print `shasum -a 512 -c right.sha > /dev/null` return code 0 without print e2e test: ``` rm -f build/apache-maven-3.6.3-bin.tar.gz rm -r build/apache-maven-3.6.3-bin mvn -v ``` Closes apache#32604 from Yikun/patch-5. Authored-by: Yikun Jiang <[email protected]> Signed-off-by: Sean Owen <[email protected]> (cherry picked from commit 38fbc0b) Signed-off-by: Sean Owen <[email protected]>
### What changes were proposed in this pull request? Not every build system has `shasum`. This PR aims to skip checksum checks on a system without `shasum`. ### Why are the changes needed? **PREPARE** ``` $ docker run -it --rm -v $PWD:/spark openjdk:11-slim /bin/bash roota0e001a6e50f:/# cd /spark/ roota0e001a6e50f:/spark# apt-get update roota0e001a6e50f:/spark# apt-get install curl roota0e001a6e50f:/spark# build/mvn clean ``` **BEFORE (Failure due to `command not found`)** ``` roota0e001a6e50f:/spark# build/mvn clean exec: curl --silent --show-error -L https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz exec: curl --silent --show-error -L https://www.apache.org/dyn/closer.lua/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz?action=download exec: curl --silent --show-error -L https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz.sha512 Veryfing checksum from /spark/build/apache-maven-3.6.3-bin.tar.gz.sha512 build/mvn: line 81: shasum: command not found Bad checksum from https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz.sha512 ``` **AFTER** ``` roota0e001a6e50f:/spark# build/mvn clean exec: curl --silent --show-error -L https://downloads.lightbend.com/scala/2.12.10/scala-2.12.10.tgz Skipping checksum because shasum is not installed. exec: curl --silent --show-error -L https://www.apache.org/dyn/closer.lua/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz?action=download exec: curl --silent --show-error -L https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz.sha512 Skipping checksum because shasum is not installed. Using `mvn` from path: /spark/build/apache-maven-3.6.3/bin/mvn ``` ### Does this PR introduce _any_ user-facing change? Yes, this will recover the build. ### How was this patch tested? Manually with the above process. Closes apache#32613 from dongjoon-hyun/SPARK-35463. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 8e13b8c) Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? Change the type of `DATASET_ID_TAG` from `Long` to `HashSet[Long]` to allow the logical plan to match multiple datasets. ### Why are the changes needed? During the transformation from one Dataset to another Dataset, the DATASET_ID_TAG of logical plan won't change if the plan itself doesn't change: https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L234-L237 However, dataset id always changes even if the logical plan doesn't change: https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L207-L208 And this can lead to the mismatch between dataset's id and col's __dataset_id. E.g., ```scala test("SPARK-28344: fail ambiguous self join - Dataset.colRegex as column ref") { // The test can fail if we change it to: // val df1 = spark.range(3).toDF() // val df2 = df1.filter($"id" > 0).toDF() val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { assertAmbiguousSelfJoin(df1.join(df2, df1.colRegex("id") > df2.colRegex("id"))) } } ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests. Closes apache#32616 from Ngone51/fix-ambiguous-join. Authored-by: yi.wu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 34284c0) Signed-off-by: Wenchen Fan <[email protected]>
….blockmanager.port` in BasicExecutorFeatureStep ### What changes were proposed in this pull request? most spark conf keys are case sensitive, including `spark.blockManager.port`, we can not get the correct port number with `spark.blockmanager.port`. This PR changes the wrong key to `spark.blockManager.port` in `BasicExecutorFeatureStep`. This PR also ensures a fast fail when the port value is invalid for executor containers. When 0 is specified(it is valid as random port, but invalid as a k8s request), it should not be put in the `containerPort` field of executor pod desc. We do not expect executor pods to continuously fail to create because of invalid requests. ### Why are the changes needed? bugfix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests Closes apache#32621 from yaooqinn/SPARK-35482. Authored-by: Kent Yao <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit d957426) Signed-off-by: Dongjoon Hyun <[email protected]>
… ids" This reverts commit 4c14770.
### What changes were proposed in this pull request? This PR proposes to avoid wrapping if-else to the constant literals for `percentage` and `accuracy` in `percentile_approx`. They are expected to be literals (or foldable expressions). Pivot works by two phrase aggregations, and it works with manipulating the input to `null` for non-matched values (pivot column and value). Note that pivot supports an optimized version without such logic with changing input to `null` for some types (non-nested types basically). So the issue fixed by this PR is only for complex types. ```scala val df = Seq( ("a", -1.0), ("a", 5.5), ("a", 2.5), ("b", 3.0), ("b", 5.2)).toDF("type", "value") .groupBy().pivot("type", Seq("a", "b")).agg( percentile_approx(col("value"), array(lit(0.5)), lit(10000))) df.show() ``` **Before:** ``` org.apache.spark.sql.AnalysisException: cannot resolve 'percentile_approx((IF((type <=> CAST('a' AS STRING)), value, CAST(NULL AS DOUBLE))), (IF((type <=> CAST('a' AS STRING)), array(0.5D), NULL)), (IF((type <=> CAST('a' AS STRING)), 10000, CAST(NULL AS INT))))' due to data type mismatch: The accuracy or percentage provided must be a constant literal; 'Aggregate [percentile_approx(if ((type#7 <=> cast(a as string))) value#8 else cast(null as double), if ((type#7 <=> cast(a as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(a as string))) 10000 else cast(null as int), 0, 0) AS a#16, percentile_approx(if ((type#7 <=> cast(b as string))) value#8 else cast(null as double), if ((type#7 <=> cast(b as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(b as string))) 10000 else cast(null as int), 0, 0) AS b#18] +- Project [_1#2 AS type#7, _2#3 AS value#8] +- LocalRelation [_1#2, _2#3] ``` **After:** ``` +-----+-----+ | a| b| +-----+-----+ |[2.5]|[3.0]| +-----+-----+ ``` ### Why are the changes needed? To make percentile_approx work with pivot as expected ### Does this PR introduce _any_ user-facing change? Yes. It threw an exception but now it returns a correct result as shown above. ### How was this patch tested? Manually tested and unit test was added. Closes apache#32619 from HyukjinKwon/SPARK-35480. Authored-by: Hyukjin Kwon <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 1d9f09d) Signed-off-by: Hyukjin Kwon <[email protected]>
This reverts commit 612e3b5.
…check ### What changes were proposed in this pull request? This patch is a followup of SPARK-35463. In SPARK-35463, we output a message to stdout and now we redirect it to stderr. ### Why are the changes needed? All `echo` statements in `build/mvn` should redirect to stderr if it is not followed by `exit`. It is because we use `build/mvn` to get stdout output by other scripts. If we don't redirect it, we can get invalid output, e.g. got "Skipping checksum because shasum is not installed." as `commons-cli` version. ### Does this PR introduce _any_ user-facing change? No. Dev only. ### How was this patch tested? Manually test on internal system. Closes apache#32637 from viirya/fix-build. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 594ffd2) Signed-off-by: Dongjoon Hyun <[email protected]>
…IntegrationSuite ### What changes were proposed in this pull request? This PR fixes an test added in SPARK-35226 (apache#32344). ### Why are the changes needed? `SELECT 1` seems non-valid query for DB2. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? DB2KrbIntegrationSuite passes on my laptop. I also confirmed all the KrbIntegrationSuites pass with the following command. ``` build/sbt -Phive -Phive-thriftserver -Pdocker-integration-tests "testOnly org.apache.spark.sql.jdbc.*KrbIntegrationSuite" ``` Closes apache#32632 from sarutak/followup-SPARK-35226. Authored-by: Kousuke Saruta <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 1a43415) Signed-off-by: Dongjoon Hyun <[email protected]>
….driver.blockManager.port` as same as other cluster managers ### What changes were proposed in this pull request? `spark.blockManager.port` does not work for k8s driver pods now, we should make it work as other cluster managers. ### Why are the changes needed? `spark.blockManager.port` should be able to work for spark driver pod ### Does this PR introduce _any_ user-facing change? yes, `spark.blockManager.port` will be respect iff it is present && `spark.driver.blockManager.port` is absent ### How was this patch tested? new tests Closes apache#32639 from yaooqinn/SPARK-35493. Authored-by: Kent Yao <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 96b0548) Signed-off-by: Dongjoon Hyun <[email protected]>
…SparkR package ### What changes were proposed in this pull request? Declare the markdown package as a dependency of the SparkR package ### Why are the changes needed? If we didn't install pandoc locally, running make-distribution.sh will fail with the following message: ``` — re-building ‘sparkr-vignettes.Rmd’ using rmarkdown Warning in engine$weave(file, quiet = quiet, encoding = enc) : Pandoc (>= 1.12.3) not available. Falling back to R Markdown v1. Error: processing vignette 'sparkr-vignettes.Rmd' failed with diagnostics: The 'markdown' package should be declared as a dependency of the 'SparkR' package (e.g., in the 'Suggests' field of DESCRIPTION), because the latter contains vignette(s) built with the 'markdown' package. Please see yihui/knitr#1864 for more information. — failed re-building ‘sparkr-vignettes.Rmd’ ``` ### Does this PR introduce _any_ user-facing change? Yes. Workaround for R packaging. ### How was this patch tested? Manually test. After the fix, the command `sh dev/make-distribution.sh -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn` in the environment without pandoc will pass. Closes apache#32270 from xuanyuanking/SPARK-35171. Authored-by: Yuanjian Li <[email protected]> Signed-off-by: HyukjinKwon <[email protected]> (cherry picked from commit 8e9e700) Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? As discussed, update SparkR maintainer for future release. ### Why are the changes needed? Shivaram will not be able to work with this in the future, so we would like to migrate off the maintainer contact email. shivaram Closes apache#32642 from felixcheung/sparkr-maintainer. Authored-by: Felix Cheung <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 1530876) Signed-off-by: Dongjoon Hyun <[email protected]>
…xec which generates UnsafeRow for DataSourceV2ScanRelation ### What changes were proposed in this pull request? This PR fixes an issue that `RemoveRedundantProjects` removes `ProjectExec` which is for generating `UnsafeRow`. In `DataSourceV2Strategy`, `ProjectExec` will be inserted to ensure internal rows are `UnsafeRow`. ``` private def withProjectAndFilter( project: Seq[NamedExpression], filters: Seq[Expression], scan: LeafExecNode, needsUnsafeConversion: Boolean): SparkPlan = { val filterCondition = filters.reduceLeftOption(And) val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) if (withFilter.output != project || needsUnsafeConversion) { ProjectExec(project, withFilter) } else { withFilter } } ... case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) => // projection and filters were already pushed down in the optimizer. // this uses PhysicalOperation to get the projection and ensure that if the batch scan does // not support columnar, a projection is added to convert the rows to UnsafeRow. val batchExec = BatchScanExec(relation.output, relation.scan) withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil ``` So, the hierarchy of the partial tree should be like `ProjectExec(FilterExec(BatchScan))`. But `RemoveRedundantProjects` doesn't consider this type of hierarchy, leading `ClassCastException`. A concreate example to reproduce this issue is reported: ``` import scala.collection.JavaConverters._ import org.apache.iceberg.{PartitionSpec, TableProperties} import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession} import org.apache.spark.sql.internal.SQLConf class RemoveRedundantProjectsTest extends QueryTest { override val spark: SparkSession = SparkSession .builder() .master("local[4]") .config("spark.driver.bindAddress", "127.0.0.1") .appName(suiteName) .getOrCreate() test("RemoveRedundantProjects removes non-redundant projects") { withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") { withTempDir { dir => val path = dir.getCanonicalPath val data = spark.range(3).toDF val table = new HadoopTables().create( SparkSchemaUtil.convert(data.schema), PartitionSpec.unpartitioned(), Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava, path) data.write.format("iceberg").mode("overwrite").save(path) table.refresh() val df = spark.read.format("iceberg").load(path) val dfX = df.as("x") val dfY = df.as("y") val join = dfX.filter(dfX("id") > 0).join(dfY, "id") join.explain("extended") assert(join.count() == 2) } } } } ``` ``` [info] - RemoveRedundantProjects removes non-redundant projects *** FAILED *** [info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow [info] at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226) [info] at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119) ``` ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test. Closes apache#32606 from sarutak/fix-project-removal-issue. Authored-by: Kousuke Saruta <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit d4fb983) Signed-off-by: Wenchen Fan <[email protected]>
… values if elseValue is set ### What changes were proposed in this pull request? This PR fixes a bug with subexpression elimination for CaseWhen statements. apache#30245 added support for creating subexpressions that are present in all branches of conditional statements. However, for a statement to be in "all branches" of a CaseWhen statement, it must also be in the elseValue. ### Why are the changes needed? Fix a bug where a subexpression can be created and run for branches of a conditional that don't pass. This can cause issues especially with a UDF in a branch that gets executed assuming the condition is true. ### Does this PR introduce _any_ user-facing change? Yes, fixes a potential bug where a UDF could be eagerly executed even though it might expect to have already passed some form of validation. For example: ``` val col = when($"id" < 0, myUdf($"id")) spark.range(1).select(when(col > 0, col)).show() ``` `myUdf($"id")` is considered a subexpression and eagerly evaluated, because it is pulled out as a common expression from both executions of the when clause, but if `id >= 0` it should never actually be run. ### How was this patch tested? Updated existing test with new case. Closes apache#32651 from Kimahriman/bug-case-subexpr-elimination-3.1. Authored-by: Adam Binford <[email protected]> Signed-off-by: Liang-Chi Hsieh <[email protected]>
…3.1.1 to 3.1.2 ### What changes were proposed in this pull request? Change version the facetFilter of DocSearch from 3.1.1 to 3.1.2 ### Why are the changes needed? As we are going to release 3.1.2, the search result of 3.1.2 documentation should point to the new documentation site instead of 3.1.1 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Just simple configuration change. Closes apache#32654 from gengliangwang/docVersion. Authored-by: Gengliang Wang <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…via release-tag.sh ### What changes were proposed in this pull request? Automatically update version index of DocSearch via release-tag.sh for releasing new documentation site, instead of the current manual update. ### Why are the changes needed? Simplify the release process. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually run the following command and check the diff ``` R_NEXT_VERSION=3.2.0 sed -i".tmp8" "s/'facetFilters':.*$/'facetFilters': [\"version:$R_NEXT_VERSION\"]/g" docs/_config.yml ``` Closes apache#32662 from gengliangwang/updateDocsearchInRelease. Authored-by: Gengliang Wang <[email protected]> Signed-off-by: Gengliang Wang <[email protected]> (cherry picked from commit 321c654) Signed-off-by: Gengliang Wang <[email protected]>
### What changes were proposed in this pull request? Change the type of `DATASET_ID_TAG` from `Long` to `HashSet[Long]` to allow the logical plan to match multiple datasets. ### Why are the changes needed? During the transformation from one Dataset to another Dataset, the DATASET_ID_TAG of logical plan won't change if the plan itself doesn't change: https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L234-L237 However, dataset id always changes even if the logical plan doesn't change: https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L207-L208 And this can lead to the mismatch between dataset's id and col's __dataset_id. E.g., ```scala test("SPARK-28344: fail ambiguous self join - Dataset.colRegex as column ref") { // The test can fail if we change it to: // val df1 = spark.range(3).toDF() // val df2 = df1.filter($"id" > 0).toDF() val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { assertAmbiguousSelfJoin(df1.join(df2, df1.colRegex("id") > df2.colRegex("id"))) } } ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests. Closes apache#32692 from Ngone51/spark-35454-3.1. Authored-by: yi.wu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? I just noticed that `AdaptiveQueryExecSuite.SPARK-34091: Batch shuffle fetch in AQE partition coalescing` takes more than 10 minutes to finish, which is unacceptable. This PR sets the shuffle partitions to 10 in that test, so that the test can finish with 5 seconds. ### Why are the changes needed? speed up the test ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? N/A Closes apache#32695 from cloud-fan/test. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 678592a) Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request? This is a minor change to update how `StateStoreRestoreExec` computes its number of output rows. Previously we only count input rows, but the optionally restored rows are not counted in. ### Why are the changes needed? Currently the number of output rows of `StateStoreRestoreExec` only counts the each input row. But it actually outputs input rows + optional restored rows. We should provide correct number of output rows. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Closes apache#32703 from viirya/fix-outputrows. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 73ba449) Signed-off-by: Hyukjin Kwon <[email protected]>
…g TreeNode to JSON ### What changes were proposed in this pull request? Handle Currying Product while serializing TreeNode to JSON. While processing [Product](https://github.com/apache/spark/blob/v3.1.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L820), we may get an assert error for cases like Currying Product because of the mismatch of sizes between field name and field values. Fallback to use reflection to get all the values for constructor parameters when we meet such cases. ### Why are the changes needed? Avoid throwing error while serializing TreeNode to JSON, try to output as much information as possible. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New UT case added. Closes apache#32713 from ivoson/SPARK-35411-followup. Authored-by: Tengfei Huang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
This PR proposes to support R 4.1.0+ in SparkR. Currently the tests are being failed as below: ``` ══ Failed ══════════════════════════════════════════════════════════════════════ ── 1. Failure (test_sparkSQL_arrow.R:71:3): createDataFrame/collect Arrow optimi collect(createDataFrame(rdf)) not equal to `expected`. Component “g”: 'tzone' attributes are inconsistent ('UTC' and '') ── 2. Failure (test_sparkSQL_arrow.R:143:3): dapply() Arrow optimization - type collect(ret) not equal to `rdf`. Component “b”: 'tzone' attributes are inconsistent ('UTC' and '') ── 3. Failure (test_sparkSQL_arrow.R:229:3): gapply() Arrow optimization - type collect(ret) not equal to `rdf`. Component “b”: 'tzone' attributes are inconsistent ('UTC' and '') ── 4. Error (test_sparkSQL.R:1454:3): column functions ───────────────────────── Error: (converted from warning) cannot xtfrm data frames Backtrace: 1. base::sort(collect(distinct(select(df, input_file_name())))) test_sparkSQL.R:1454:2 2. base::sort.default(collect(distinct(select(df, input_file_name())))) 5. base::order(x, na.last = na.last, decreasing = decreasing) 6. base::lapply(z, function(x) if (is.object(x)) as.vector(xtfrm(x)) else x) 7. base:::FUN(X[[i]], ...) 10. base::xtfrm.data.frame(x) ── 5. Failure (test_utils.R:67:3): cleanClosure on R functions ───────────────── `actual` not equal to `g`. names for current but not for target Length mismatch: comparison on first 0 components ── 6. Failure (test_utils.R:80:3): cleanClosure on R functions ───────────────── `actual` not equal to `g`. names for current but not for target Length mismatch: comparison on first 0 components ``` It fixes three as below: - Avoid a sort on DataFrame which isn't legitimate: apache#32709 (comment) - Treat the empty timezone and local timezone as equivalent in SparkR: apache#32709 (comment) - Disable `check.environment` in the cleaned closure comparison (enabled by default from R 4.1+, https://cran.r-project.org/doc/manuals/r-release/NEWS.html), and keep the test as is apache#32709 (comment) Higher R versions have bug fixes and improvements. More importantly R users tend to use highest R versions. Yes, SparkR will work together with R 4.1.0+ ```bash ./R/run-tests.sh ``` ``` sparkSQL_arrow: SparkSQL Arrow optimization: ................. ... sparkSQL: SparkSQL functions: ........................................................................................................................................................................................................ ........................................................................................................................................................................................................ ........................................................................................................................................................................................................ ........................................................................................................................................................................................................ ........................................................................................................................................................................................................ ........................................................................................................................................................................................................ ... utils: functions in utils.R: .............................................. ``` Closes apache#32709 from HyukjinKwon/SPARK-35573. Authored-by: Hyukjin Kwon <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 1ba1b70) Signed-off-by: Hyukjin Kwon <[email protected]>
…t command ### What changes were proposed in this pull request? Currently, the results of following SQL queries are not redacted: ``` SET [KEY]; SET; ``` For example: ``` scala> spark.sql("set javax.jdo.option.ConnectionPassword=123456").show() +--------------------+------+ | key| value| +--------------------+------+ |javax.jdo.option....|123456| +--------------------+------+ scala> spark.sql("set javax.jdo.option.ConnectionPassword").show() +--------------------+------+ | key| value| +--------------------+------+ |javax.jdo.option....|123456| +--------------------+------+ scala> spark.sql("set").show() +--------------------+--------------------+ | key| value| +--------------------+--------------------+ |javax.jdo.option....| 123456| ``` We should hide the sensitive information and redact the query output. ### Why are the changes needed? Security. ### Does this PR introduce _any_ user-facing change? Yes, the sensitive information in the output of Set commands are redacted ### How was this patch tested? Unit test Closes apache#32720 from gengliangwang/cherry-pick-set-3.1. Authored-by: Gengliang Wang <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
### What changes were proposed in this pull request? ``` - Upload multi stages *** FAILED *** {{ The code passed to eventually never returned normally. Attempted 20 times over 10.011176743 seconds. Last failure message: fallbackStorage.exists(0, file) was false. (FallbackStorageSuite.scala:243)}} ``` The error like above was raised in aarch64 randomly and also in github action test[1][2]. [1] https://github.com/apache/spark/actions/runs/489319612 [2]https://github.com/apache/spark/actions/runs/479317320 ### Why are the changes needed? timeout is too short, need to increase to let test case complete. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? build/mvn test -Dtest=none -DwildcardSuites=org.apache.spark.storage.FallbackStorageSuite -pl :spark-core_2.12 Closes apache#32719 from Yikun/SPARK-35584. Authored-by: Yikun Jiang <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit d773373) Signed-off-by: Dongjoon Hyun <[email protected]>
…stop shutdown hook ### What changes were proposed in this pull request? Fixing the memory leak by deregistering the shutdown hook when the executor is stopped. This way the Garbage Collector can release the executor object early. Which is a huge win for our tests as user's classloader could be also released which keeps references to objects which are created for the jars on the classpath. ### Why are the changes needed? I have identified this leak by running the Livy tests (I know it is close to the attic but this leak causes a constant OOM there) and it is in our Spark unit tests as well. This leak can be identified by checking the number of `LeakyEntry` in case of Scala 2.12.14 (and `ZipEntry` for Scala 2.12.10) instances which with its related data can take up a considerable amount of memory (as those are created from the jars which are on the classpath). I have my own tool for instrumenting JVM code [trace-agent](https://github.com/attilapiros/trace-agent) and with that I am able to call JVM diagnostic commands at specific methods. Let me show how it in action. It has a single text file embedded into the tool's jar called action.txt. In this case actions.txt content is: {noformat} $ unzip -q -c trace-agent-0.0.7.jar actions.txt diagnostic_command org.apache.spark.repl.ReplSuite runInterpreter cmd:gcClassHistogram,limit_output_lines:8,where:beforeAndAfter,with_gc:true diagnostic_command org.apache.spark.repl.ReplSuite afterAll cmd:gcClassHistogram,limit_output_lines:8,where:after,with_gc:true {noformat} Which creates a class histogram at the beginning and at the end of `org.apache.spark.repl.ReplSuite#runInterpreter()` (after triggering a GC which might not finish as GC is done in a separate thread..) and one histogram in the end of the `org.apache.spark.repl.ReplSuite#afterAll()` method. And the histograms are the followings on master branch: ``` $ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "ZipEntry\|LeakyEntry" 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry ``` Where the header of the table is: ``` num #instances #bytes class name ``` So the `LeakyEntry` in the end is about 75MB (173MB in case of Scala 2.12.10 and before for another class called `ZipEntry`) but the first item (a char/byte arrays) and the second item (strings) in the histogram also relates to this leak: ``` $ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "1:\|2:\|3:" 1: 2701 3496112 [B 2: 21855 2607192 [C 3: 4885 537264 java.lang.Class 1: 480323 55970208 [C 2: 480499 11531976 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 481825 56148024 [C 2: 481998 11567952 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 487056 57550344 [C 2: 487179 11692296 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 487054 57551008 [C 2: 487176 11692224 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 927823 107139160 [C 2: 928072 22273728 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 927793 107129328 [C 2: 928041 22272984 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1361851 155555608 [C 2: 1362261 32694264 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1361683 155493464 [C 2: 1362092 32690208 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1803074 205157728 [C 2: 1803268 43278432 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1802385 204938224 [C 2: 1802579 43261896 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236631 253636592 [C 2: 2237029 53688696 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236536 253603008 [C 2: 2236933 53686392 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668892 301893920 [C 2: 2669510 64068240 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668759 301846376 [C 2: 2669376 64065024 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3101238 350101048 [C 2: 3102073 74449752 java.lang.String 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3101240 350101104 [C 2: 3102075 74449800 java.lang.String 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3533785 398371760 [C 2: 3534835 84836040 java.lang.String 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3533759 398367088 [C 2: 3534807 84835368 java.lang.String 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3967049 446893400 [C 2: 3968314 95239536 java.lang.String 3: 1773801 85142448 scala.reflect.io.FileZipArchive$LeakyEntry [info] - SPARK-26633: ExecutorClassLoader.getResourceAsStream find REPL classes (8 seconds, 248 milliseconds) Setting default log level to "ERROR". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 1: 3966423 446709584 [C 2: 3967682 95224368 java.lang.String 3: 1773801 85142448 scala.reflect.io.FileZipArchive$LeakyEntry 1: 4399583 495097208 [C 2: 4401050 105625200 java.lang.String 3: 1970890 94602720 scala.reflect.io.FileZipArchive$LeakyEntry 1: 4399578 495070064 [C 2: 4401040 105624960 java.lang.String 3: 1970890 94602720 scala.reflect.io.FileZipArchive$LeakyEntry ``` The last three is about 700MB altogether. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I used the trace-agent tool with the same settings for the modified code: ``` $ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "1:\|2:\|3:" 1: 2701 3496112 [B 2: 21855 2607192 [C 3: 4885 537264 java.lang.Class 1: 480323 55970208 [C 2: 480499 11531976 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 481825 56148024 [C 2: 481998 11567952 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 487056 57550344 [C 2: 487179 11692296 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 487054 57551008 [C 2: 487176 11692224 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 927823 107139160 [C 2: 928072 22273728 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 927793 107129328 [C 2: 928041 22272984 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1361851 155555608 [C 2: 1362261 32694264 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1361683 155493464 [C 2: 1362092 32690208 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1803074 205157728 [C 2: 1803268 43278432 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1802385 204938224 [C 2: 1802579 43261896 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236631 253636592 [C 2: 2237029 53688696 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236536 253603008 [C 2: 2236933 53686392 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668892 301893920 [C 2: 2669510 64068240 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668759 301846376 [C 2: 2669376 64065024 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3101238 350101048 [C 2: 3102073 74449752 java.lang.String 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3101240 350101104 [C 2: 3102075 74449800 java.lang.String 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3533785 398371760 [C 2: 3534835 84836040 java.lang.String 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3533759 398367088 [C 2: 3534807 84835368 java.lang.String 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3967049 446893400 [C 2: 3968314 95239536 java.lang.String 3: 1773801 85142448 scala.reflect.io.FileZipArchive$LeakyEntry [info] - SPARK-26633: ExecutorClassLoader.getResourceAsStream find REPL classes (8 seconds, 248 milliseconds) Setting default log level to "ERROR". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 1: 3966423 446709584 [C 2: 3967682 95224368 java.lang.String 3: 1773801 85142448 scala.reflect.io.FileZipArchive$LeakyEntry 1: 4399583 495097208 [C 2: 4401050 105625200 java.lang.String 3: 1970890 94602720 scala.reflect.io.FileZipArchive$LeakyEntry 1: 4399578 495070064 [C 2: 4401040 105624960 java.lang.String 3: 1970890 94602720 scala.reflect.io.FileZipArchive$LeakyEntry [success] Total time: 174 s (02:54), completed Jun 2, 2021 2:00:43 PM ╭─attilazsoltpirosapiros-MBP16 ~/git/attilapiros/memoryLeak ‹SPARK-35610*› ╰─$ vim ╭─attilazsoltpirosapiros-MBP16 ~/git/attilapiros/memoryLeak ‹SPARK-35610*› ╰─$ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "1:\|2:\|3:" 1: 2685 3457368 [B 2: 21833 2606712 [C 3: 4885 537264 java.lang.Class 1: 480245 55978400 [C 2: 480421 11530104 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 480460 56005784 [C 2: 480633 11535192 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 486643 57537784 [C 2: 486766 11682384 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 486636 57538192 [C 2: 486758 11682192 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 501208 60411856 [C 2: 501180 12028320 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 501206 60412960 [C 2: 501177 12028248 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 934925 108773320 [C 2: 935058 22441392 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 934912 108769528 [C 2: 935044 22441056 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1370351 156901296 [C 2: 1370318 32887632 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1369660 156681680 [C 2: 1369627 32871048 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1803746 205383136 [C 2: 1803917 43294008 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1803658 205353096 [C 2: 1803828 43291872 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2235677 253608240 [C 2: 2236068 53665632 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2235539 253560088 [C 2: 2235929 53662296 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2667775 301799240 [C 2: 2668383 64041192 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2667765 301798568 [C 2: 2668373 64040952 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2666665 301491096 [C 2: 2667285 64014840 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2666648 301490792 [C 2: 2667266 64014384 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668169 301833032 [C 2: 2668782 64050768 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry [info] - SPARK-26633: ExecutorClassLoader.getResourceAsStream find REPL classes (6 seconds, 396 milliseconds) Setting default log level to "ERROR". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 1: 2235495 253419952 [C 2: 2235887 53661288 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668379 301800768 [C 2: 2668979 64055496 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236123 253522640 [C 2: 2236514 53676336 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry ``` The sum of the last three numbers is about 354MB. Closes apache#32748 from attilapiros/SPARK-35610. Authored-by: attilapiros <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 806edf8) Signed-off-by: Dongjoon Hyun <[email protected]>
…Utils.unpack ### What changes were proposed in this pull request? This PR proposes to use `FileUtil.unTarUsingJava` that is a Java implementation for un-tar `.tar` files. `unTarUsingJava` is not public but it exists in all Hadoop versions from 2.1+, see HADOOP-9264. The security issue reproduction requires a non-Windows platform, and a non-gzipped TAR archive file name (contents don't matter). ### Why are the changes needed? There is a risk for arbitrary shell command injection via `Utils.unpack` when the filename is controlled by a malicious user. This is due to an issue in Hadoop's `unTar`, that is not properly escaping the filename before passing to a shell command:https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java#L904 ### Does this PR introduce _any_ user-facing change? Yes, it prevents a security issue that, previously, allowed users to execute arbitrary shall command. ### How was this patch tested? Manually tested in local, and existing test cases should cover. Closes apache#35946 from HyukjinKwon/SPARK-38631. Authored-by: Hyukjin Kwon <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 057c051) Signed-off-by: Hyukjin Kwon <[email protected]>
…iteral This is a backport of apache#35878 to branch 3.1. The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column. For example, the sql in the test case will generate such a physical plan when the adaptive is closed: ```tex *(4) Project [store_id#5281, date_id#5283, state_province#5292] +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false :- Union : :- *(1) Project [4 AS store_id#5281, date_id#5283] : : +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300)) : : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336] : : +- *(1) ColumnarToRow : : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int> : : +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336] : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=apache#335] : : +- *(1) Project [store_id#5291, state_province#5292] : : +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291)) : : +- *(1) ColumnarToRow : : +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string> : +- *(2) Project [5 AS store_id#5282, date_id#5287] : +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300)) : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336] : +- *(2) ColumnarToRow : +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int> : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336] +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=apache#335] ``` after this pr: ```tex *(4) Project [store_id#5281, date_id#5283, state_province#5292] +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false :- Union : :- *(1) Project [4 AS store_id#5281, date_id#5283] : : +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) : : +- *(1) ColumnarToRow : : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int> : +- *(2) Project [5 AS store_id#5282, date_id#5287] : +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) : +- *(2) ColumnarToRow : +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=apache#326] +- *(3) Project [store_id#5291, state_province#5292] +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291)) +- *(3) ColumnarToRow +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string> ``` Execution performance improvement No Added unit test Closes apache#35967 from mcdull-zhang/spark_38570_3.2. Authored-by: mcdull-zhang <[email protected]> Signed-off-by: Yuming Wang <[email protected]> (cherry picked from commit 8621914) Signed-off-by: Yuming Wang <[email protected]>
…building project list in `ExtractGenerator` Backport of apache#35837. When building the project list from an aggregate sequence in `ExtractGenerator`, convert the aggregate sequence to an `IndexedSeq` before performing the flatMap operation. This query fails with a `NullPointerException`: ``` val df = Seq(1, 2, 3).toDF("v") df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*).collect ``` If you change `Stream` to `Seq`, then it succeeds. `ExtractGenerator` uses a flatMap operation over `aggList` for two purposes: - To produce a new aggregate list - to update `projectExprs` (which is initialized as an array of nulls). When `aggList` is a `Stream`, the flatMap operation evaluates lazily, so all entries in `projectExprs` after the first will still be null when the rule completes. Changing `aggList` to an `IndexedSeq` forces the flatMap to evaluate eagerly. No New unit test Closes apache#35851 from bersprockets/generator_aggregate_issue_32. Authored-by: Bruce Robbins <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 7842621) Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request? This PR replaces `new Path(fileUri.getPath)` with `new Path(fileUri)`. By using `Path` class constructor with URI parameter, we can preserve file scheme. ### Why are the changes needed? If we use, `Path` class constructor with `String` parameter, it loses file scheme information. Although the original code works so far, it fails at Apache Hadoop 3.3.2 and breaks dependency upload feature which is covered by K8s Minikube integration tests. ```scala test("uploadFileUri") { val fileUri = org.apache.spark.util.Utils.resolveURI("/tmp/1.txt") assert(new Path(fileUri).toString == "file:/private/tmp/1.txt") assert(new Path(fileUri.getPath).toString == "/private/tmp/1.txt") } ``` ### Does this PR introduce _any_ user-facing change? No, this will prevent a regression at Apache Spark 3.3.0 instead. ### How was this patch tested? Pass the CIs. In addition, this PR and apache#36009 will recover K8s IT `DepsTestsSuite`. Closes apache#36010 from dongjoon-hyun/SPARK-38652. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit cab8aa1) Signed-off-by: Dongjoon Hyun <[email protected]>
…function in Executor It is master branch pr [SPARK-38333](apache#35662) Bug fix, it is potential issue. No UT Closes apache#36012 from monkeyboy123/spark-38333. Authored-by: Dereck Li <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit a40acd4) Signed-off-by: Wenchen Fan <[email protected]>
…ntExprs function instead of getExprState at SubexpressionEliminationSuite ### What changes were proposed in this pull request? This pr use EquivalentExpressions getEquivalentExprs function instead of getExprState at SubexpressionEliminationSuite, and remove cpus paramter. ### Why are the changes needed? Fixes build error ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? CI Tests Closes apache#36033 from monkeyboy123/SPARK-38754. Authored-by: Dereck Li <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…DownloadCallback caused by Log4j ### What changes were proposed in this pull request? While `log4j.ignoreTCL/log4j2.ignoreTCL` is false, which is the default, it uses the context ClassLoader for the current Thread, see `org.apache.logging.log4j.util.LoaderUtil.loadClass`. While ExecutorClassLoader try to loadClass through remotely though the FileDownload, if error occurs, we will long on debug level, and `log4j...LoaderUtil` will be blocked by ExecutorClassLoader acquired classloading lock. Fortunately, it only happens when ThresholdFilter's level is `debug`. or we can set `log4j.ignoreTCL/log4j2.ignoreTCL` to true, but I don't know what else it will cause. So in this PR, I simply remove the debug log which cause this deadlock ### Why are the changes needed? fix deadlock ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? apache/kyuubi#2046 (comment), with a ut in kyuubi project, resolved(https://github.com/apache/incubator-kyuubi/actions/runs/1950222737) ### Additional Resources [ut.jstack.txt](https://github.com/apache/spark/files/8206457/ut.jstack.txt) Closes apache#35765 from yaooqinn/SPARK-38446. Authored-by: Kent Yao <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit aef6745) Signed-off-by: Dongjoon Hyun <[email protected]>
…h a documentation fix ### What changes were proposed in this pull request? This PR proposes two minor changes: - Fixes the example at `Dataset.observe(String, ...)` - Adds `varargs` to be consistent with another overloaded version: `Dataset.observe(Observation, ..)` ### Why are the changes needed? To provide a correct example, support Java APIs properly with `varargs` and API consistency. ### Does this PR introduce _any_ user-facing change? Yes, the example is fixed in the documentation. Additionally Java users should be able to use `Dataset.observe(String, ..)` per `varargs`. ### How was this patch tested? Manually tested. CI should verify the changes too. Closes apache#36084 from HyukjinKwon/minor-docs. Authored-by: Hyukjin Kwon <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit fb3f380) Signed-off-by: Hyukjin Kwon <[email protected]>
… ..) with a documentation fix" This reverts commit dea607f.
github-actions
bot
added
BUILD
SQL
INFRA
DOCS
EXAMPLES
CORE
SPARK SHELL
AVRO
DSTREAM
GRAPHX
ML
MLLIB
STRUCTURED STREAMING
PYTHON
R
YARN
MESOS
KUBERNETES
WEB UI
labels
Apr 14, 2022
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Why are the changes needed?
Does this PR introduce any user-facing change?
How was this patch tested?