File tree Expand file tree Collapse file tree 3 files changed +24
-32
lines changed
main/scala/org/apache/kyuubi/session
test/scala/org/apache/kyuubi/server/api/v1 Expand file tree Collapse file tree 3 files changed +24
-32
lines changed Original file line number Diff line number Diff line change @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
23
23
24
24
import org .apache .kyuubi .client .util .BatchUtils ._
25
25
import org .apache .kyuubi .config .{KyuubiConf , KyuubiReservedKeys }
26
- import org .apache .kyuubi .config .KyuubiReservedKeys .KYUUBI_BATCH_PRIORITY
26
+ import org .apache .kyuubi .config .KyuubiReservedKeys .{ KYUUBI_BATCH_PRIORITY , KYUUBI_SESSION_CONNECTION_URL_KEY }
27
27
import org .apache .kyuubi .engine .KyuubiApplicationManager
28
28
import org .apache .kyuubi .engine .spark .SparkProcessBuilder
29
29
import org .apache .kyuubi .events .{EventBus , KyuubiSessionEvent }
@@ -54,6 +54,8 @@ class KyuubiBatchSession(
54
54
conf,
55
55
sessionManager) {
56
56
override val sessionType : SessionType = SessionType .BATCH
57
+ override val connectionUrl : String =
58
+ metadata.map(_.kyuubiInstance).getOrElse(conf.getOrElse(KYUUBI_SESSION_CONNECTION_URL_KEY , " " ))
57
59
58
60
override val handle : SessionHandle = {
59
61
val batchId = metadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY ))
Original file line number Diff line number Diff line change @@ -312,18 +312,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
312
312
kyuubiInstance,
313
313
0 ,
314
314
Int .MaxValue ).map { metadata =>
315
- createBatchSession(
316
- metadata.username,
317
- " anonymous" ,
318
- metadata.ipAddress,
319
- metadata.requestConf,
320
- metadata.engineType,
321
- Option (metadata.requestName),
322
- metadata.resource,
323
- metadata.className,
324
- metadata.requestArgs,
325
- Some (metadata),
326
- fromRecovery = true )
315
+ createBatchSessionFromRecovery(metadata)
327
316
}).getOrElse(Seq .empty)
328
317
}
329
318
}
@@ -341,25 +330,26 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
341
330
getBatchMetadata(batchId)
342
331
.filter(m =>
343
332
m.kyuubiInstance == kyuubiInstance && batchStatesToRecovery.contains(m.opState))
344
- .flatMap { metadata =>
345
- Some (
346
- createBatchSession(
347
- metadata.username,
348
- " anonymous" ,
349
- metadata.ipAddress,
350
- metadata.requestConf,
351
- metadata.engineType,
352
- Option (metadata.requestName),
353
- metadata.resource,
354
- metadata.className,
355
- metadata.requestArgs,
356
- Some (metadata),
357
- fromRecovery = true ))
358
- }
333
+ .flatMap { metadata => Some (createBatchSessionFromRecovery(metadata)) }
359
334
}
360
335
}
361
336
}
362
337
338
+ private def createBatchSessionFromRecovery (metadata : Metadata ): KyuubiBatchSession = {
339
+ createBatchSession(
340
+ metadata.username,
341
+ " anonymous" ,
342
+ metadata.ipAddress,
343
+ metadata.requestConf,
344
+ metadata.engineType,
345
+ Option (metadata.requestName),
346
+ metadata.resource,
347
+ metadata.className,
348
+ metadata.requestArgs,
349
+ Some (metadata),
350
+ fromRecovery = true )
351
+ }
352
+
363
353
def reassignBatchSessions (
364
354
kyuubiInstance : String ,
365
355
newKyuubiInstance : String ): Seq [String ] = {
Original file line number Diff line number Diff line change @@ -714,12 +714,12 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
714
714
assert(session2.createTime === batchMetadata2.createTime)
715
715
716
716
eventually(timeout(10 .seconds)) {
717
- assert( session1.batchJobSubmissionOp.getStatus.state === OperationState . RUNNING ||
718
- session1.batchJobSubmissionOp.getStatus.state === OperationState .FINISHED )
717
+ val batch1State = session1.batchJobSubmissionOp.getStatus.state
718
+ assert(batch1State === OperationState .RUNNING || OperationState .isTerminal(batch1State) )
719
719
assert(session1.batchJobSubmissionOp.builder.processLaunched)
720
720
721
- assert( session2.batchJobSubmissionOp.getStatus.state === OperationState . RUNNING ||
722
- session2.batchJobSubmissionOp.getStatus.state === OperationState .FINISHED )
721
+ val batch2State = session2.batchJobSubmissionOp.getStatus.state
722
+ assert(batch2State === OperationState .RUNNING || OperationState .isTerminal(batch2State) )
723
723
assert(! session2.batchJobSubmissionOp.builder.processLaunched)
724
724
}
725
725
You can’t perform that action at this time.
0 commit comments