Skip to content

Commit

Permalink
[LIVY-977][SERVER][CONF] Livy can not be started if HDFS is still in …
Browse files Browse the repository at this point in the history
…safe mode

Added safe mode check to implement safe mode
  • Loading branch information
Rajshekhar Muchandi committed Jan 24, 2024
1 parent 86fc823 commit 10ee39f
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 1 deletion.
5 changes: 5 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,8 @@
# Enable to allow custom classpath by proxy user in cluster mode
# The below configuration parameter is disabled by default.
# livy.server.session.allow-custom-classpath = true

# Interval, in seconds, between checks of whether the HDFS filesystem is still in safe mode
# (default: 5).
# livy.server.hdfs.safe-mode.interval = 5
# Maximum number of retry attempts while the HDFS filesystem is in safe mode (default: 12).
# livy.server.hdfs.safe-mode.max.retry.attempts = 10
6 changes: 6 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ object LivyConf {
// how often to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")

// Interval, in seconds, between successive checks of whether the HDFS filesystem is still in
// safe mode.
val HDFS_SAFE_MODE_INTERVAL_IN_SECONDS = Entry("livy.server.hdfs.safe-mode.interval", 5)

// Maximum number of retry attempts made while the HDFS filesystem remains in safe mode.
val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)

// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.livy.server.recovery
import java.io.{FileNotFoundException, IOException}
import java.net.URI
import java.util
import java.util.concurrent.TimeUnit

import scala.reflect.ClassTag
import scala.util.control.NonFatal
Expand All @@ -28,6 +29,8 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.Utils.usingResource
Expand All @@ -42,6 +45,8 @@ class FileSystemStateStore(
this(livyConf, None)
}

// FileSystem handle queried by isFsInSafeMode() to probe HDFS safe mode.
// NOTE(review): this instance is created from livyConf.hadoopConf alone, not from the state
// store URL that fsUri is read from -- confirm both resolve to the same filesystem. No close()
// for this instance is visible in this part of the file; verify it is released somewhere.
private val fs = FileSystem.newInstance(livyConf.hadoopConf)

private val fsUri = {
val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
require(fsPath != null && !fsPath.isEmpty,
Expand All @@ -57,6 +62,8 @@ class FileSystemStateStore(
// Only Livy user should have access to state files.
fileContext.setUMask(new FsPermission("077"))

startSafeModeCheck()

// Create state store dir if it doesn't exist.
val stateStorePath = absPath(".")
try {
Expand Down Expand Up @@ -134,4 +141,42 @@ class FileSystemStateStore(
}

/** Builds the Path for `key` as a child of the path component of `fsUri`. */
private def absPath(key: String): Path = {
  val storeRoot = fsUri.getPath()
  new Path(storeRoot, key)
}

/**
 * Reports whether the filesystem held in `fs` is currently in HDFS safe mode.
 *
 * Safe mode is only queried when `fs` is a DistributedFileSystem; any other filesystem
 * implementation is reported as not being in safe mode.
 *
 * DistributedFileSystem is annotated `@LimitedPrivate`, which for all practical purposes
 * makes it more public than not.
 *
 * @return true only if `fs` is a DistributedFileSystem that reports safe mode as ON.
 */
def isFsInSafeMode(): Boolean = {
  Option(fs)
    .collect { case hdfs: DistributedFileSystem => hdfs }
    .exists(hdfs => isFsInSafeMode(hdfs))
}

/**
 * Queries the given DistributedFileSystem for its current safe mode state.
 *
 * SAFEMODE_GET only reads the state; it does not enter or leave safe mode.
 *
 * @param dfs the HDFS client to query.
 * @return the value returned by `setSafeMode` for the SAFEMODE_GET action.
 */
def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
  // Passing true restricts the check to the Active NameNode's status.
  val activeNameNodesOnly = true
  val queryAction = HdfsConstants.SafeModeAction.SAFEMODE_GET
  dfs.setSafeMode(queryAction, activeNameNodesOnly)
}

/**
 * Blocks the calling thread until HDFS leaves safe mode or the retry budget is exhausted.
 *
 * Safe mode is polled through `isFsInSafeMode()`. While it is ON, the thread sleeps for
 * `livy.server.hdfs.safe-mode.interval` seconds before polling again, and does so at most
 * `livy.server.hdfs.safe-mode.max.retry.attempts` times. The method returns as soon as a
 * poll reports that safe mode is OFF, without issuing any further polls.
 *
 * @throws IllegalStateException if HDFS is still in safe mode after the last retry.
 */
def startSafeModeCheck(): Unit = {
  // Nothing can be probed on the filesystem while it is in safe mode, so wait between
  // polls for a configurable number of seconds.
  val safeModeInterval = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS)
  val safeModeMaxRetryAttempts = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS)
  val sleepMillis = TimeUnit.SECONDS.toMillis(safeModeInterval)

  // An explicit loop is used instead of a guarded for-comprehension: the guard form cannot
  // stop early and would keep polling the NameNode for every remaining index, and an
  // inclusive `0 to max` range would wait one time more than the configured maximum.
  var attemptsUsed = 0
  var stillInSafeMode = isFsInSafeMode()
  while (stillInSafeMode && attemptsUsed < safeModeMaxRetryAttempts) {
    attemptsUsed += 1
    info("HDFS is still in safe mode. Waiting...")
    Thread.sleep(sleepMillis)
    stillInSafeMode = isFsInSafeMode()
  }

  // Still in safe mode after every allowed retry: give up and let the caller fail fast.
  if (stillInSafeMode) {
    throw new IllegalStateException("Reached max retry attempts for safe mode check " +
      "in hdfs file system")
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import java.util
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.hamcrest.Description
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, anyInt, argThat, eq => equal}
import org.mockito.Mockito.{atLeastOnce, verify, when}
import org.mockito.Mockito.{atLeastOnce, spy, verify, when}
import org.mockito.internal.matchers.Equals
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
Expand All @@ -53,6 +54,14 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
conf
}

/**
 * Builds a LivyConf for the safe mode tests: state store URL "file://tmp/", a safe mode
 * check interval of 2 (seconds) and a maximum of 2 retry attempts, so the retry loop
 * finishes quickly.
 */
def makeConfWithTwoSeconds(): LivyConf = {
  val safeModeConf = new LivyConf()
  val twoSeconds = Integer.valueOf(2)
  val twoAttempts = Integer.valueOf(2)
  safeModeConf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/")
  safeModeConf.set(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS, twoSeconds)
  safeModeConf.set(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS, twoAttempts)
  safeModeConf
}

def mockFileContext(rootDirPermission: String): FileContext = {
val fileContext = mock[FileContext]
val rootDirStatus = mock[FileStatus]
Expand Down Expand Up @@ -188,5 +197,29 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {

verify(fileContext).delete(pathEq("/key"), equal(false))
}

it("set safe mode ON and wait") {
  // Build the store on top of a mocked FileContext whose root dir has permission 700.
  val stateStore = spy(new FileSystemStateStore(makeConf(), Some(mockFileContext("700"))))
  val mockedDfs = mock[DistributedFileSystem]

  // The no-arg probe is only exercised here; its result is not asserted.
  stateStore.isFsInSafeMode()

  // An unstubbed mock answers false for setSafeMode, so the DFS probe must report
  // "not in safe mode".
  assert(!stateStore.isFsInSafeMode(mockedDfs))
}

it("provider throws IllegalStateException when reaches 'N' " +
  "max attempts to access HDFS file system") {
  // Short interval and retry budget keep the wait in this test small.
  val shortRetryConf = makeConfWithTwoSeconds()
  val rootOnlyContext = Some(mockFileContext("700"))
  val provider = new SafeModeTestProvider(shortRetryConf, rootOnlyContext)

  // Force the provider to report safe mode as permanently ON.
  provider.inSafeMode = true

  // With safe mode never clearing, the check must give up with IllegalStateException.
  intercept[IllegalStateException] {
    provider.startSafeModeCheck()
  }
}
}

/**
 * Test double for FileSystemStateStore whose safe mode answer comes from the mutable
 * `inSafeMode` flag instead of a filesystem query.
 */
private class SafeModeTestProvider(conf: LivyConf, context: Option[FileContext])
  extends FileSystemStateStore(conf, context) {

  // Value returned by isFsInSafeMode(); tests set it to simulate safe mode ON or OFF.
  var inSafeMode: Boolean = true

  override def isFsInSafeMode(): Boolean = {
    inSafeMode
  }
}

}

0 comments on commit 10ee39f

Please sign in to comment.