diff --git a/integration-test/src/thriftserver/test/scala/org/apache/livy/test/JdbcIT.scala b/integration-test/src/thriftserver/test/scala/org/apache/livy/test/JdbcIT.scala index 929671cda..a251942e6 100644 --- a/integration-test/src/thriftserver/test/scala/org/apache/livy/test/JdbcIT.scala +++ b/integration-test/src/thriftserver/test/scala/org/apache/livy/test/JdbcIT.scala @@ -19,6 +19,7 @@ package org.apache.livy.test import java.sql.Date +import org.apache.livy.sessions.SessionState import org.apache.livy.test.framework.BaseThriftIntegrationTestSuite class JdbcIT extends BaseThriftIntegrationTestSuite { @@ -61,6 +62,14 @@ class JdbcIT extends BaseThriftIntegrationTestSuite { assert(resultSet.getString(3) == "{1:\"a\",2:\"b\"}") assert(!resultSet.next()) } + + checkQuery(c, "DESC LIVY SESSION") { resultSet => + resultSet.next() + assert(resultSet.getString("id").toInt >= 0) + assert(resultSet.getString("appId").startsWith("application_")) + assert(resultSet.getString("state").nonEmpty) + assert(resultSet.getString("logs").contains(resultSet.getString("appId"))) + } } } } diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala index dd551c20b..f73fe4f7b 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala @@ -155,7 +155,7 @@ class SparkYarnApp private[utils] ( // We cannot kill the YARN app without the app id. // There's a chance the YARN app hasn't been submitted during a livy-server failure. // We don't want a stuck session that can't be deleted. Emit a warning and move on. - case _: TimeoutException | _: InterruptedException => + case _: TimeoutException | _: InterruptedException | _: IllegalStateException => warn("Deleting a session while its YARN application is not found.") yarnAppMonitorThread.interrupt() } finally { diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala index eb1dd212d..cc657adec 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyOperationManager.scala @@ -18,7 +18,7 @@ package org.apache.livy.thriftserver import java.util -import java.util.{Map => JMap} +import java.util.{Locale, Map => JMap} import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable @@ -106,6 +106,14 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage op } + def newDescLivySessionOperation(sessionHandle: SessionHandle, + statement: String, sessionManager: LivyThriftSessionManager): Operation = { + val op = new DescLivySessionOperation(sessionHandle, sessionManager) + addOperation(op, sessionHandle) + debug(s"Create DescLivySessionOperation for $statement with session=$sessionHandle") + op + } + def getOperationLogRowSet( opHandle: OperationHandle, orientation: FetchOrientation, @@ -136,9 +144,15 @@ class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManage confOverlay: util.Map[String, String], runAsync: Boolean, queryTimeout: Long): OperationHandle = { - executeOperation(sessionHandle, { - newExecuteStatementOperation(sessionHandle, statement, confOverlay, runAsync, queryTimeout) - }) + info(s"execute statement $statement") + val descLivySessionRegex = "^(DESC|DESCRIBE)\\s+LIVY\\s+SESSION$".r + val operationCreator = statement.trim.toUpperCase(Locale.ENGLISH) match { + case descLivySessionRegex(_*) => + newDescLivySessionOperation(sessionHandle, statement, livyThriftSessionManager) + case _ => + newExecuteStatementOperation(sessionHandle, statement, confOverlay, runAsync, queryTimeout) + } + executeOperation(sessionHandle, operationCreator) } @throws[HiveSQLException] diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala index 11294db83..d53135039 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala @@ -221,14 +221,15 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC delegationToken: String): SessionHandle = { val sessionHandle = new SessionHandle(protocol) incrementConnections(username, ipAddress, SessionInfo.getForwardedAddresses) + val nextId = server.livySessionManager.nextId() sessionInfo.put(sessionHandle, - new SessionInfo(username, ipAddress, SessionInfo.getForwardedAddresses, protocol)) + new SessionInfo(nextId, username, ipAddress, SessionInfo.getForwardedAddresses, protocol)) val (initStatements, createInteractiveRequest, sessionId) = LivyThriftSessionManager.processSessionConf(sessionConf, supportUseDatabase) val createLivySession = () => { createInteractiveRequest.kind = Spark val newSession = InteractiveSession.create( - server.livySessionManager.nextId(), + nextId, createInteractiveRequest.name, username, None, diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionInfo.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionInfo.scala index 4ebf86742..1a633d5c1 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionInfo.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/SessionInfo.scala @@ -23,7 +23,9 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.livy.Logging -case class SessionInfo(username: String, +case class SessionInfo( + sessionId: Int, + username: String, ipAddress: String, forwardedAddresses: util.List[String], protocolVersion: TProtocolVersion) { diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/DescLivySessionOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/DescLivySessionOperation.scala new file mode 100644 index 000000000..7ca3226fc --- /dev/null +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/operation/DescLivySessionOperation.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.thriftserver.operation + +import org.apache.hive.service.cli.{FetchOrientation, OperationState, OperationType, SessionHandle} + +import org.apache.livy.thriftserver.LivyThriftSessionManager +import org.apache.livy.thriftserver.serde.ThriftResultSet +import org.apache.livy.thriftserver.types.{BasicDataType, Field, Schema} + +class DescLivySessionOperation(sessionHandle: SessionHandle, + sessionManager: LivyThriftSessionManager) + extends Operation(sessionHandle, OperationType.EXECUTE_STATEMENT) { + + private var hasNext: Boolean = true + + override protected def runInternal(): Unit = { + setState(OperationState.PENDING) + setHasResultSet(true) // avoid no resultset for async run + setState(OperationState.RUNNING) + setState(OperationState.FINISHED) + } + + override def cancel(stateAfterCancel: OperationState): Unit = { + setState(OperationState.CANCELED) + } + + override def close(): Unit = { + setState(OperationState.CLOSED) + } + + override def getResultSetSchema: Schema = { + assertState(Seq(OperationState.FINISHED)) + DescLivySessionOperation.SCHEMA + } + + override def getNextRowSet(orientation: FetchOrientation, + maxRows: Long): ThriftResultSet = { + validateFetchOrientation(orientation) + assertState(Seq(OperationState.FINISHED)) + setHasResultSet(true) + + val sessionVar = ThriftResultSet(this.getResultSetSchema, + sessionManager.getSessionInfo(sessionHandle).protocolVersion) + + val session = try { + sessionManager.getLivySession(sessionHandle) + } catch { + case e: Exception => + val sessionInfo = sessionManager.getSessionInfo(sessionHandle) + if (sessionManager.server.livySessionManager.get(sessionInfo.sessionId).isDefined) { + sessionManager.server.livySessionManager.get(sessionInfo.sessionId).get + } else { + error(s"Can't find session which id is ${sessionInfo.sessionId} in sessionManager.") + throw e + } + } + + if (hasNext) { + sessionVar.addRow( + Array( + session.id.toString, + session.appId.orNull, + session.state.state, + session.logLines().mkString("\n") + ) + ) + hasNext = false + } + sessionVar + } +} + +object DescLivySessionOperation { + val SCHEMA: Schema = Schema( + Field("id", BasicDataType("string"), "Livy session id."), + Field("appId", BasicDataType("string"), "Spark application id."), + Field("state", BasicDataType("string"), "Livy session state"), + Field("logs", BasicDataType("string"), "Spark application logs.") + ) +}