Skip to content

Commit e4e3336

Browse files
rajeshparangiTimothyW553
authored andcommitted
[SPARK] Refactor Vacuum code (delta-io#5557)
#### Which Delta project/connector is this regarding - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description This PR refactors Vacuum code a bit to be bit more extensible in the future. ## How was this patch tested? Existing tests ## Does this PR introduce _any_ user-facing changes? NO
1 parent e1334aa commit e4e3336

File tree

1 file changed

+20
-4
lines changed

1 file changed

+20
-4
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
220220
.getOrElse(spark.sessionState.conf.numShufflePartitions)
221221
val startTimeToIdentifyEligibleFiles = System.currentTimeMillis()
222222

223-
val validFiles =
223+
224+
val validFilesResult =
224225
getValidFilesFromSnapshot(
225226
spark,
226227
basePath,
@@ -236,6 +237,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
236237
)
237238
)
238239

240+
val validFiles = validFilesResult.validFiles // Extract DataFrame
241+
239242
val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
240243
val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism
241244
val shouldIcebergMetadataDirBeHidden = UniversalFormat.icebergEnabled(snapshot.metadata)
@@ -478,6 +481,10 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
478481
Some(eligibleEndCommitVersion)
479482
)
480483
}
484+
485+
case class ValidFilesResult(
486+
validFiles: DataFrame
487+
)
481488
}
482489

483490
trait VacuumCommandImpl extends DeltaCommand {
@@ -932,6 +939,7 @@ trait VacuumCommandImpl extends DeltaCommand {
932939
files
933940
}
934941

942+
935943
/**
936944
* Helper to compute all valid files based on basePath and Snapshot provided.
937945
* Returns a DataFrame with a single column "path" containing all files that should be
@@ -946,7 +954,7 @@ trait VacuumCommandImpl extends DeltaCommand {
946954
retentionMillis: Option[Long],
947955
hadoopConf: Broadcast[SerializableConfiguration],
948956
clock: Clock,
949-
config: ValidFilesConfig): DataFrame = {
957+
config: ValidFilesConfig): VacuumCommand.ValidFilesResult = {
950958
import org.apache.spark.sql.delta.implicits._
951959
require(snapshot.version >= 0, "No state defined for this table. Is this really " +
952960
"a Delta table? Refusing to garbage collect.")
@@ -973,7 +981,9 @@ trait VacuumCommandImpl extends DeltaCommand {
973981
)
974982

975983
val canonicalizedBasePath = SparkPath.fromPathString(basePath).urlEncoded
976-
snapshot.stateDS.mapPartitions { actions =>
984+
985+
986+
val files = snapshot.stateDS.mapPartitions { actions =>
977987
val reservoirBase = new Path(basePath)
978988
val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
979989
actions.flatMap {
@@ -994,7 +1004,13 @@ trait VacuumCommandImpl extends DeltaCommand {
9941004
case _ => Nil
9951005
}
9961006
}
997-
}.toDF("path")
1007+
}
1008+
1009+
val validFiles = files
1010+
.toDF("path")
1011+
VacuumCommand.ValidFilesResult(
1012+
validFiles
1013+
)
9981014
}
9991015

10001016
/**

0 commit comments

Comments
 (0)