
[BUG] delta_lake_update_test.test_delta_update_fallback_with_deletion_vectors failed assertion failed: Could not find RapidsDeltaWriteExec in the GPU plans with spark34Xshims #12123

Open

pxLi opened this issue Feb 13, 2025 · 3 comments
Labels: bug (Something isn't working)

pxLi (Collaborator) commented Feb 13, 2025

Describe the bug
This affects branch-25.04 only.

First seen in rapids_it-arm64-dev (spark341), run 335:

src.main.python.delta_lake_update_test.test_delta_update_fallback_with_deletion_vectors[DATAGEN_SEED=1739407068, TZ=UTC, IGNORE_ORDER, ALLOW_NON_GPU(ColumnarToRowExec,RapidsDeltaWriteExec,ExecutedCommandExec,DeserializeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFromObjectExec,SortExec)]

Another occurrence in rapids_it-matrix-dev-github (spark344, spark340), run 160:

../../src/main/python/delta_lake_update_test.py::test_delta_update_fallback_with_deletion_vectors[DATAGEN_SEED=1739409713, TZ=UTC, IGNORE_ORDER, ALLOW_NON_GPU(ColumnarToRowExec,RapidsDeltaWriteExec,ExecutedCommandExec,DeserializeToObjectExec,ShuffleExchangeExec,FileSourceScanExec,FilterExec,MapPartitionsExec,MapElementsExec,ObjectHashAggregateExec,ProjectExec,SerializeFromObjectExec,SortExec)] 

We have not seen any occurrences on other Spark versions yet.

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertCapturedAndGpuFellBack.
: java.lang.AssertionError: assertion failed: Could not find RapidsDeltaWriteExec in the GPU plans:
*(1) Project [path#167658, partitionValues#167659, size#167660L, modificationTime#167661L, dataChange#167662, null AS stats#167675, tags#167664, deletionVector#167665]
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).path, true, false, true) AS path#167658, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -2), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).partitionValues) AS partitionValues#167659, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).size AS size#167660L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).modificationTime AS modificationTime#167661L, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).dataChange AS dataChange#167662, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -3), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -4), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).tags) AS tags#167664, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).deletionVector)) null else named_struct(storageType, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).deletionVector).storageType, true, false, true), pathOrInlineDv, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).deletionVector).pathOrInlineDv, true, false, true), offset, unwrapoption(IntegerType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).deletionVector).offset), sizeInBytes, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).deletionVector).sizeInBytes, cardinality, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, true])).deletionVector).cardinality, maxRowIndex, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.AddFile, 
true])).deletionVector).maxRowIndex)) AS deletionVector#167665]
   +- *(1) MapElements org.apache.spark.sql.Dataset$$Lambda$4353/169212415@1379e1c8, obj#167657: org.apache.spark.sql.delta.actions.AddFile
      +- *(1) DeserializeToObject newInstance(class scala.Tuple1), obj#167656: scala.Tuple1
         +- *(1) Project [add#166997]
            +- *(1) Filter isnotnull(add#166997)
               +- *(1) Scan ExistingRDD Delta Table State #1 - file:///tmp/pyspark_tests/it-arm64-341-jenkins-rapids-it-arm64-dev-335-7n57v-g4sd9-gw2-1516737-1045750855/DELTA_DATA/GPU/_delta_log[txn#166996,add#166997,remove#166998,metaData#166999,protocol#167000,cdc#167001,commitInfo#167002]

LocalTableScan [path#167746, partitionValues#167747, size#167748L, modificationTime#167749L, dataChange#167750, stats#167751, tags#167752, deletionVector#167753]

LocalTableScan [path#167796, partitionValues#167797, size#167798L, modificationTime#167799L, dataChange#167800, stats#167801, tags#167802, deletionVector#167803]

GpuColumnarToRow false, [loreId=9]
+- GpuRapidsDeltaWrite [loreId=8]
   +- GpuProject [0 AS a#167715], [loreId=7]
      +- GpuCoalesceBatches targetsize(104857600), [loreId=6]
         +- GpuFilter UDF(), [loreId=5]
            +- GpuFileGpuScan parquet [] Batched: true, DataFilters: [], Format: Parquet, Location: TahoeBatchFileIndex[file:/tmp/pyspark_tests/it-arm64-341-jenkins-rapids-it-arm64-dev-335-7n57v-g4..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

*(1) Project [protocol#167839, metaData#167838, action_sort_column#167842]
+- *(1) Filter (isnotnull(protocol#167839.minReaderVersion) OR isnotnull(metaData#167838.id))
   +- *(1) Project [metaData#167838, protocol#167839, input_file_name() AS action_sort_column#167842]
      +- FileScan json [metaData#167838,protocol#167839] Batched: false, DataFilters: [], Format: JSON, Location: DeltaLogFileIndex(3 paths)[file:/tmp/pyspark_tests/it-arm64-341-jenkins-rapids-it-arm64-dev-335-7..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<metaData:struct<id:string,name:string,description:string,format:struct<provider:string,opt...

*(3) SerializeFromObject [if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn)) null else named_struct(appId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).appId, true, false, true), version, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).version, lastUpdated, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).lastUpdated)) AS txn#167989, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).path, true, false, true), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -1), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -2), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).size, modificationTime, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).modificationTime, dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).dataChange, stats, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).stats, true, false, true), tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -3), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -4), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).tags), deletionVector, if (isnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, 
true])).add).deletionVector)) null else named_struct(storageType, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).deletionVector).storageType, true, false, true), pathOrInlineDv, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).deletionVector).pathOrInlineDv, true, false, true), offset, unwrapoption(IntegerType, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).deletionVector).offset), sizeInBytes, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).deletionVector).sizeInBytes, cardinality, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).deletionVector).cardinality, maxRowIndex, unwrapoption(LongType, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).deletionVector).maxRowIndex))) AS add#167990, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).path, true, false, true), deletionTimestamp, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionTimestamp), dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).dataChange, extendedFileMetadata, unwrapoption(BooleanType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).extendedFileMetadata), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -5), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -5), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -6), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -6), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).partitionValues), size, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).size), tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -7), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -7), StringType, 
ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -8), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -8), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).tags), deletionVector, if (isnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionVector)) null else named_struct(storageType, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionVector).storageType, true, false, true), pathOrInlineDv, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionVector).pathOrInlineDv, true, false, true), offset, unwrapoption(IntegerType, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionVector).offset), sizeInBytes, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionVector).sizeInBytes, cardinality, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionVector).cardinality, maxRowIndex, unwrapoption(LongType, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionVector).maxRowIndex))) AS remove#167991, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData)) null else named_struct(id, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).id, true, false, true), name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).name, true, false, true), description, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).description, true, false, true), format, if (isnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format)) null else named_struct(provider, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).provider, true, false, true), options, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -9), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, 
ObjectType(class java.lang.Object), true, -9), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -10), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -10), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).options)), schemaString, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).schemaString, true, false, true), partitionColumns, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -11), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -11), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).partitionColumns, None), configuration, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -12), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -12), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -13), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -13), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).configuration), createdTime, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).createdTime)) AS metaData#167992, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol)) null else named_struct(minReaderVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minReaderVersion, minWriterVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minWriterVersion, readerFeatures, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -14), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -14), StringType, ObjectType(class java.lang.String)), true, false, true), unwrapoption(ObjectType(interface scala.collection.immutable.Set), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).readerFeatures).toSeq, None), writerFeatures, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -15), 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -15), StringType, ObjectType(class java.lang.String)), true, false, true), unwrapoption(ObjectType(interface scala.collection.immutable.Set), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).writerFeatures).toSeq, None)) AS protocol#167993, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).path, true, false, true), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -16), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -16), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -17), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -17), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).size, tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -18), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -18), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -19), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -19), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).tags)) AS cdc#167994, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo)) null else named_struct(version, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).version), timestamp, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).timestamp, true, false, true), userId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, 
true])).commitInfo).userId), true, false, true), userName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).userName), true, false, true), operation, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operation, true, false, true), operationParameters, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -20), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.Object), true, -20), StringType, ObjectType(class java.lang.String)), true, false, true), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -21), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.Object), true, -21), StringType, ObjectType(class java.lang.String)), true, false, true), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operationParameters), job, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job))) null else named_struct(jobId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobId, true, false, true), jobName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobName, true, false, true), runId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).runId, true, false, true), jobOwnerId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobOwnerId, true, false, true), triggerType, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).triggerType, true, false, true)), notebook, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, 
org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook))) null else named_struct(notebookId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook)).notebookId, true, false, true)), clusterId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).clusterId), true, false, true), readVersion, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).readVersion), isolationLevel, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isolationLevel), true, false, true), isBlindAppend, unwrapoption(BooleanType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isBlindAppend), ... 10 more fields) AS commitInfo#167995]
+- MapPartitions org.apache.spark.sql.delta.Snapshot$$Lambda$3509/717991648@314372a8, obj#167988: org.apache.spark.sql.delta.actions.SingleAction
   +- DeserializeToObject newInstance(class org.apache.spark.sql.delta.actions.SingleAction), obj#167987: org.apache.spark.sql.delta.actions.SingleAction
      +- *(2) Project [txn#167863, CASE WHEN isnotnull(_extract_path#168003) THEN struct(path, add_path_canonical#167898, partitionValues, _extract_partitionValues#168004, size, _extract_size#168005L, modificationTime, _extract_modificationTime#168006L, dataChange, _extract_dataChange#168007, stats, add_stats_to_use#167887, tags, _extract_tags#168008, deletionVector, _extract_deletionVector#168009) END AS add#167933, CASE WHEN isnotnull(remove#167865.path) THEN if (isnull(remove#167865)) null else named_struct(path, remove_path_canonical#167914, deletionTimestamp, remove#167865.deletionTimestamp, dataChange, remove#167865.dataChange, extendedFileMetadata, remove#167865.extendedFileMetadata, partitionValues, remove#167865.partitionValues, size, remove#167865.size, tags, remove#167865.tags, deletionVector, remove#167865.deletionVector) END AS remove#167952, metaData#167866, protocol#167867, cdc#167868, commitInfo#167869]
         +- *(2) Sort [action_sort_column#167877 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(coalesce(add_path_canonical#167898, remove_path_canonical#167914), 50), REPARTITION_BY_NUM, [plan_id=113863]
               +- *(1) Project [txn#167863, add#167864.path AS _extract_path#168003, add#167864.partitionValues AS _extract_partitionValues#168004, add#167864.size AS _extract_size#168005L, add#167864.modificationTime AS _extract_modificationTime#168006L, add#167864.dataChange AS _extract_dataChange#168007, add#167864.tags AS _extract_tags#168008, add#167864.deletionVector AS _extract_deletionVector#168009, remove#167865, metaData#167866, protocol#167867, cdc#167868, commitInfo#167869, action_sort_column#167877, add#167864.stats AS add_stats_to_use#167887, CASE WHEN isnotnull(add#167864.path) THEN UDF(add#167864.path) END AS add_path_canonical#167898, CASE WHEN isnotnull(remove#167865.path) THEN UDF(remove#167865.path) END AS remove_path_canonical#167914]
                  +- *(1) Project [txn#167863, add#167864, remove#167865, metaData#167866, protocol#167867, cdc#167868, commitInfo#167869, input_file_name() AS action_sort_column#167877]
                     +- FileScan json [txn#167863,add#167864,remove#167865,metaData#167866,protocol#167867,cdc#167868,commitInfo#167869] Batched: false, DataFilters: [], Format: JSON, Location: DeltaLogFileIndex(3 paths)[file:/tmp/pyspark_tests/it-arm64-341-jenkins-rapids-it-arm64-dev-335-7..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...

ObjectHashAggregate(keys=[], functions=[collect_set(txn#167989, 0, 0), count(protocol#167993), sum(add#167990.size), last(metaData#167992, true), count(metaData#167992), last(protocol#167993, true), count(remove#167991), count(add#167990), count(txn#167989)], output=[fileSizeHistogram#168033, setTransactions#168034, numOfProtocol#168035L, sizeInBytes#168036L, metadata#168037, numOfMetadata#168038L, protocol#168039, numOfRemoves#168040L, numOfFiles#168041L, numOfSetTransactions#168042L])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=113917]
   +- ObjectHashAggregate(keys=[], functions=[partial_collect_set(txn#167989, 0, 0), partial_count(protocol#167993), partial_sum(add#167990.size), partial_last(metaData#167992, true), partial_count(metaData#167992), partial_last(protocol#167993, true), partial_count(remove#167991), partial_count(add#167990), partial_count(txn#167989)], output=[buf#168097, count#168098L, sum#168099L, last#168100, valueSet#168101, count#168102L, last#168103, valueSet#168104, count#168105L, count#168106L, count#168107L])
      +- *(1) Project [txn#167989, add#167990, remove#167991, metaData#167992, protocol#167993]
         +- *(1) Scan ExistingRDD Delta Table State #2 - file:///tmp/pyspark_tests/it-arm64-341-jenkins-rapids-it-arm64-dev-335-7n57v-g4sd9-gw2-1516737-1045750855/DELTA_DATA/GPU/_delta_log[txn#167989,add#167990,remove#167991,metaData#167992,protocol#167993,cdc#167994,commitInfo#167995]

GpuExecute GpuUpdateCommand [loreId=1]
   +- GpuUpdateCommand org.apache.spark.sql.delta.rapids.GpuDeltaLog@3067a548, Delta[version=2, ... tests/it-arm64-341-jenkins-rapids-it-arm64-dev-335-7n57v-g4sd9-gw2-1516737-1045750855/DELTA_DATA/GPU], [0]
         +- SubqueryAlias spark_catalog.delta.`/tmp/pyspark_tests//it-arm64-341-jenkins-rapids-it-arm64-dev-335-7n57v-g4sd9-gw2-1516737-1045750855//DELTA_DATA/GPU`
            +- Relation [a#167622] parquet

	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertDidFallBack(ShimmedExecutionPlanCaptureCallbackImpl.scala:153)
	at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertCapturedAndGpuFellBack(ShimmedExecutionPlanCaptureCallbackImpl.scala:144)
	at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback$.assertCapturedAndGpuFellBack(ExecutionPlanCaptureCallback.scala:104)
	at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertCapturedAndGpuFellBack(ExecutionPlanCaptureCallback.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Stacktrace
spark_tmp_path = '/tmp/pyspark_tests//it-arm64-341-jenkins-rapids-it-arm64-dev-335-7n57v-g4sd9-gw2-1516737-1045750855/'

    @allow_non_gpu('ColumnarToRowExec', 'RapidsDeltaWriteExec', delta_write_fallback_allow, *delta_meta_allow)
    @delta_lake
    @ignore_order
    @pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x")
    @pytest.mark.skipif(not supports_delta_lake_deletion_vectors(), reason="Deletion vectors aren't supported")
    def test_delta_update_fallback_with_deletion_vectors(spark_tmp_path):
        data_path = spark_tmp_path + "/DELTA_DATA"
        def setup_tables(spark):
            setup_delta_dest_tables(spark, data_path,
                                    dest_table_func=lambda spark: unary_op_df(spark, int_gen),
                                    use_cdf=False, enable_deletion_vectors=True)
        def write_func(spark, path):
            update_sql="UPDATE delta.`{}` SET a = 0".format(path)
            spark.sql(update_sql)
        with_cpu_session(setup_tables)
>       assert_gpu_fallback_write(write_func, read_delta_path, data_path,
                                  "RapidsDeltaWriteExec", delta_update_enabled_conf)

../../src/main/python/delta_lake_update_test.py:80: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../src/main/python/asserts.py:373: in assert_gpu_fallback_write
    jvm.org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertCapturedAndGpuFellBack(cpu_fallback_class_name_list, 10000)
/home/jenkins/agent/workspace/rapids_it-arm64-dev/jars/spark-3.4.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
../../../spark-3.4.1-bin-hadoop3/python/pyspark/errors/exceptions/captured.py:169: in deco
    return f(*a, **kw)
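Worth noting: the captured plans above do contain GpuRapidsDeltaWrite, i.e. the Delta write stayed on the GPU instead of falling back to the CPU RapidsDeltaWriteExec the test expects, which is what trips the assertion. The snippet below is an illustrative sketch of the shape of that check, not the plugin's actual implementation (the real matching happens on the JVM side in ShimmedExecutionPlanCaptureCallbackImpl.assertDidFallBack):

# Illustrative sketch only; the real check runs on the JVM side in
# ShimmedExecutionPlanCaptureCallbackImpl.assertDidFallBack.
def any_plan_fell_back(captured_dump: str, cpu_exec_name: str) -> bool:
    """Return True if any captured physical plan mentions the CPU exec name.

    For the dump above this returns False for "RapidsDeltaWriteExec":
    the plans contain the GPU node GpuRapidsDeltaWrite instead.
    """
    plans = [p for p in captured_dump.split("\n\n") if p.strip()]
    return any(cpu_exec_name in plan for plan in plans)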

Steps/Code to reproduce bug
See the failing test IDs above and the local repro command in the comments below.

Expected behavior
Make the test case deterministic.

Environment details (please complete the following information)

  • Environment location: NVIDIA CI pipelines (rapids_it-arm64-dev, rapids_it-matrix-dev-github)
  • Spark versions: 3.4.0, 3.4.1, 3.4.4


pxLi added the "? - Needs Triage" and "bug" labels Feb 13, 2025
pxLi changed the title to append "with spark34Xshims" Feb 13, 2025
pxLi (Collaborator, Author) commented Feb 13, 2025

Related to #12048. cc @razajafri to help take a look, thanks.

gerashegalov (Collaborator) commented Feb 14, 2025

To reproduce locally, run:

PYSP_TEST_spark_jars_packages="io.delta:delta-core_2.12:2.4.0" \
PYSP_TEST_spark_sql_extensions="io.delta.sql.DeltaSparkSessionExtension" \
PYSP_TEST_spark_sql_catalog_spark__catalog="org.apache.spark.sql.delta.catalog.DeltaCatalog" \
TEST_PARALLEL=0  \
TESTS=delta_lake_update_test.py::test_delta_update_fallback_with_deletion_vectors \
SPARK_HOME=~/dist/spark-3.4.3-bin-hadoop3 \
./integration_tests/run_pyspark_from_build.sh  --delta_lake -m delta_lake 
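
Note: the failing test IDs above embed the data-generation seed; if the harness honors the DATAGEN_SEED environment variable (as the test IDs suggest), exporting e.g. DATAGEN_SEED=1739407068 alongside the variables above should reproduce the same generated data as the CI runs.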

razajafri (Collaborator) commented
I need to spend more time on how DataWritingCommandExec works because, as I understand it, it is hidden in the plan for Delta Lake, which is why it is challenging to write a fallback test.
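
As a rough illustration of that hidden-plan behavior (hypothetical table path; assumes a SparkSession `spark` with the Delta extension configured): the executed plan of the UPDATE statement itself is only the command wrapper, while the physical write that would surface RapidsDeltaWriteExec or GpuRapidsDeltaWrite runs as a separate query execution triggered inside the command. That is why the harness captures plans via a callback rather than inspecting the statement's own plan.

# Hypothetical path; assumes a Delta table already exists there.
df = spark.sql("UPDATE delta.`/tmp/some_delta_table` SET a = 0")

# Prints only the command wrapper; the rewrite and write plans are executed
# internally and do not appear in this string.
print(df._jdf.queryExecution().executedPlan().toString())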

gerashegalov pushed a commit that referenced this issue Feb 15, 2025 (#12141)

This PR XFAILs a test that was added as part of #12048 to unblock the CI pipeline.

It's OK for us to do this because we added the test to ensure we fall back to CPU on DBR 14.3 when writing deletion vectors. The failure on Spark 3.4+ needs further investigation, which is why we are not closing the issue.

contributes to #12123 

---------
Signed-off-by: Raza Jafri <[email protected]>
mattahrens removed the "? - Needs Triage" label Feb 18, 2025