@@ -17,7 +17,7 @@

package org.apache.spark.storage

import java.io.IOException
import java.io.{FileNotFoundException, IOException}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
@@ -40,7 +40,7 @@ private[storage] class BlockManagerDecommissioner(
conf: SparkConf,
bm: BlockManager) extends Logging {

private val fallbackStorage = FallbackStorage.getFallbackStorage(conf)
private[storage] val fallbackStorage = FallbackStorage.getFallbackStorage(conf)
private val maxReplicationFailuresForDecommission =
conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
private val blockSavedOnDecommissionedBlockManagerException =
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
// Confirm peer is not the fallback BM ID because fallbackStorage would already
// have been used in the try-block above so there's no point trying again
&& peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
try {
fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
} catch {
case e: FileNotFoundException =>
logWarning("Skipping block $shuffleBlockInfo, block deleted.", e)
case NonFatal(e) =>
logError(s"Fallback storage for $shuffleBlockInfo failed", e)
keepRunning = false
}
Contributor:
Drop this? The existing NonFatal block at the end does this currently.

Contributor Author (@ukby1234):
This is different from the existing NonFatal block because it will retry the failed blocks, whereas the existing one is really a catch-all and leaves some blocks not retried.
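For illustration only, a minimal sketch of the distinction being drawn (hypothetical WorkerLoopSketch/drainQueue names, not the actual decommissioner code): skipping a deleted source file keeps the worker draining its queue, while any other failure stops the worker and leaves the remaining blocks unattempted.

import java.io.FileNotFoundException
import scala.util.control.NonFatal

// Illustrative stand-in for one shuffle-migration worker draining its queue of blocks.
object WorkerLoopSketch {
  def drainQueue(blocks: List[String], migrate: String => Unit): List[String] = {
    var keepRunning = true
    var remaining = blocks
    while (keepRunning && remaining.nonEmpty) {
      try {
        migrate(remaining.head)
      } catch {
        case _: FileNotFoundException => // source file deleted: skip this block, keep draining
        case NonFatal(_) => keepRunning = false // any other failure: stop this worker
      }
      remaining = remaining.tail
    }
    remaining // blocks this worker never even attempted
  }

  def main(args: Array[String]): Unit = {
    val blocks = List("b1", "b2", "b3")
    // A deleted file only skips one block; an unexpected error abandons the rest of the queue.
    println(drainQueue(blocks, b => if (b == "b1") throw new FileNotFoundException(b)))
    println(drainQueue(blocks, b => if (b == "b1") throw new RuntimeException(b)))
  }
}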

Contributor (@mridulm, Oct 18, 2023):
It was not clear from the PR description that this behavior change was being made.
+CC @dongjoon-hyun, as you know this part better.

Contributor Author (@ukby1234):
There isn't a behavior change. If we remove the added NonFatal block, this section won't get executed. That means some shuffle blocks never trigger numMigratedShuffles.incrementAndGet(), and the decommissioner will loop forever because numMigratedShuffles is always less than migratingShuffles.
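A minimal, self-contained sketch of the completion check being described (hypothetical names; the real accounting lives in BlockManagerDecommissioner): if a failed block never bumps the migrated counter, a predicate like the one below can never become true and the decommissioner keeps polling.

import java.util.concurrent.atomic.AtomicInteger

object MigrationCountSketch {
  def main(args: Array[String]): Unit = {
    val migratingShuffles = Seq("shuffle_1_1", "shuffle_2_1") // blocks queued for migration
    val numMigratedShuffles = new AtomicInteger(0)            // bumped once per handled block

    migratingShuffles.foreach { block =>
      val fileDeleted = block == "shuffle_2_1"                // simulate one deleted source file
      if (!fileDeleted) {
        numMigratedShuffles.incrementAndGet()                 // migrated (or intentionally skipped)
      }
      // Without the added catch, the deleted block is neither migrated nor counted.
    }

    // The decommissioner keeps waiting on something equivalent to this predicate.
    val done = numMigratedShuffles.get() == migratingShuffles.size
    println(s"all shuffles accounted for: $done")             // false -> waits forever
  }
}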

Member:
Is this true?

If we remove the added NonFatal block, this section won't get executed.

We have line 166, don't we?

case e: Exception =>
logError(s"Error occurred during migrating $shuffleBlockInfo", e)
keepRunning = false

Do you think you can provide a test case as evidence for your claim, @ukby1234?

Contributor Author (@ukby1234):
Well, this exception is thrown inside the catch block itself, so line 166 won't get executed.
And I updated the test "SPARK-45579: abort for other errors" to show this situation.
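A tiny standalone Scala example of the point being made here: an exception thrown while handling one catch case is not passed to a later case of the same try/catch, so it escapes to the enclosing scope instead.

object CatchRethrowSketch {
  def main(args: Array[String]): Unit = {
    try {
      try {
        throw new java.io.IOException("peer upload failed")
      } catch {
        case _: java.io.IOException =>
          // Failure while handling the first exception (e.g. the fallback copy fails).
          throw new RuntimeException("fallback copy failed")
        case e: Exception =>
          // Never reached for the rethrow above: later cases of the same catch
          // do not apply to exceptions thrown inside another case.
          println(s"handled: $e")
      }
    } catch {
      case e: RuntimeException =>
        println(s"escaped to the enclosing handler: ${e.getMessage}")
    }
  }
}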

} else if (e.getCause != null && e.getCause.getMessage != null
&& e.getCause.getMessage
.contains(blockSavedOnDecommissionedBlockManagerException)) {
@@ -203,7 +211,7 @@ private[storage] class BlockManagerDecommissioner(
@volatile private var stopped = false
@volatile private[storage] var stoppedRDD =
!conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
@volatile private var stoppedShuffle =
@volatile private[storage] var stoppedShuffle =
!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)

private val migrationPeers =
@@ -16,16 +16,17 @@
*/
package org.apache.spark.storage

import java.io.{DataOutputStream, File, FileOutputStream, InputStream, IOException}
import java.io.{DataOutputStream, File, FileNotFoundException, FileOutputStream, InputStream, IOException}
import java.nio.file.Files

import scala.concurrent.duration._
import scala.reflect.runtime.{universe => ru}
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, never, verify, when}
import org.mockito.Mockito.{atLeastOnce, mock, never, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
@@ -229,6 +230,151 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("SPARK-45579: ignore deleted files") {
val conf = new SparkConf(false)
.set("spark.app.id", "testId")
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")

val ids = Set((1, 1L, 1))
val bm = mock(classOf[BlockManager])
val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
when(bm.diskBlockManager).thenReturn(dbm)
val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf, bm)
val indexFile = indexShuffleBlockResolver.getIndexFile(1, 1L)
val dataFile = indexShuffleBlockResolver.getDataFile(1, 1L)
indexFile.createNewFile()
dataFile.createNewFile()

val resolver = mock(classOf[IndexShuffleBlockResolver])
when(resolver.getStoredShuffles())
.thenReturn(ids.map(triple => ShuffleBlockInfo(triple._1, triple._2)).toSeq)
ids.foreach { case (shuffleId: Int, mapId: Long, reduceId: Int) =>
when(resolver.getMigrationBlocks(mc.any()))
.thenReturn(List(
(ShuffleIndexBlockId(shuffleId, mapId, reduceId), mock(classOf[ManagedBuffer])),
(ShuffleDataBlockId(shuffleId, mapId, reduceId), mock(classOf[ManagedBuffer]))))
when(resolver.getIndexFile(shuffleId, mapId)).thenReturn(indexFile)
when(resolver.getDataFile(shuffleId, mapId)).thenReturn(dataFile)
}

when(bm.getPeers(mc.any()))
.thenReturn(Seq(BlockManagerId("test", "fake", 7337)))
val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
when(bm.master).thenReturn(bmm)
val blockTransferService = mock(classOf[BlockTransferService])
when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),
mc.any(), mc.any())).thenThrow(new IOException)
when(bm.blockTransferService).thenReturn(blockTransferService)
when(bm.migratableResolver).thenReturn(resolver)
when(bm.getMigratableRDDBlocks()).thenReturn(Seq())

val decommissioner = new BlockManagerDecommissioner(conf, bm)
val mirror = ru.runtimeMirror(decommissioner.getClass.getClassLoader)
val im = mirror.reflect(decommissioner)
val classSymbol = mirror.staticClass("org.apache.spark.storage.BlockManagerDecommissioner")
val fallbackStorageTermSymbol =
classSymbol.info
.decl(ru.TermName("fallbackStorage"))
.asTerm
val fallbackStorageField = im.reflectField(fallbackStorageTermSymbol)
val mockFallbackStorage = mock(classOf[FallbackStorage])
when(mockFallbackStorage.copy(mc.any(), mc.any()))
.thenAnswer(_ => throw new FileNotFoundException())
fallbackStorageField.set(Some(mockFallbackStorage))

try {
decommissioner.start()
val fallbackStorage = new FallbackStorage(conf)
eventually(timeout(10.second), interval(1.seconds)) {
// Peer upload is attempted and fails (mocked to throw IOException), forcing the fallback path
verify(blockTransferService, atLeastOnce())
.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())

Seq("shuffle_1_1_0.index", "shuffle_1_1_0.data").foreach { filename =>
assert(!fallbackStorage.exists(shuffleId = 1, filename))
}
assert(decommissioner.numMigratedShuffles.get() > 0)
}
} finally {
decommissioner.stop()
}
}

test("SPARK-45579: abort for other errors") {
val conf = new SparkConf(false)
.set("spark.app.id", "testId")
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1000L)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")

val ids = Set((1, 1L, 1))
val bm = mock(classOf[BlockManager])
val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
when(bm.diskBlockManager).thenReturn(dbm)
val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf, bm)
val indexFile = indexShuffleBlockResolver.getIndexFile(1, 1L)
val dataFile = indexShuffleBlockResolver.getDataFile(1, 1L)
indexFile.createNewFile()
dataFile.createNewFile()

val resolver = mock(classOf[IndexShuffleBlockResolver])
when(resolver.getStoredShuffles())
.thenReturn(ids.map(triple => ShuffleBlockInfo(triple._1, triple._2)).toSeq)
ids.foreach { case (shuffleId: Int, mapId: Long, reduceId: Int) =>
when(resolver.getMigrationBlocks(mc.any()))
.thenReturn(List(
(ShuffleIndexBlockId(shuffleId, mapId, reduceId), mock(classOf[ManagedBuffer])),
(ShuffleDataBlockId(shuffleId, mapId, reduceId), mock(classOf[ManagedBuffer]))))
when(resolver.getIndexFile(shuffleId, mapId)).thenReturn(indexFile)
when(resolver.getDataFile(shuffleId, mapId)).thenReturn(dataFile)
}

when(bm.getPeers(mc.any()))
.thenReturn(Seq(BlockManagerId("test", "fake", 7337)))
val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
when(bm.master).thenReturn(bmm)
val blockTransferService = mock(classOf[BlockTransferService])
when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),
mc.any(), mc.any())).thenThrow(new IOException)
when(bm.blockTransferService).thenReturn(blockTransferService)
when(bm.migratableResolver).thenReturn(resolver)
when(bm.getMigratableRDDBlocks()).thenReturn(Seq())

val decommissioner = new BlockManagerDecommissioner(conf, bm)
val mirror = ru.runtimeMirror(decommissioner.getClass.getClassLoader)
val im = mirror.reflect(decommissioner)
val classSymbol = mirror.staticClass("org.apache.spark.storage.BlockManagerDecommissioner")
val fallbackStorageTermSymbol =
classSymbol.info
.decl(ru.TermName("fallbackStorage"))
.asTerm
val fallbackStorageField = im.reflectField(fallbackStorageTermSymbol)
val mockFallbackStorage = mock(classOf[FallbackStorage])
when(mockFallbackStorage.copy(mc.any(), mc.any()))
.thenAnswer(_ => throw new RuntimeException())
fallbackStorageField.set(Some(mockFallbackStorage))

try {
decommissioner.start()
val fallbackStorage = new FallbackStorage(conf)
eventually(timeout(10.second), interval(1.seconds)) {
// Peer upload is attempted and fails (mocked to throw IOException), forcing the fallback path
verify(blockTransferService, atLeastOnce())
.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any())

Seq("shuffle_1_1_0.index", "shuffle_1_1_0.data").foreach { filename =>
assert(!fallbackStorage.exists(shuffleId = 1, filename))
}
assert(decommissioner.stoppedShuffle)
}
} finally {
decommissioner.stop()
}
}

test("Upload from all decommissioned executors") {
sc = new SparkContext(getSparkConf(2, 2))
withSpark(sc) { sc =>