When several jobs share a single SparkContext, they can be submitted and cancelled in groups:
- Submitting jobs in a group
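// Submit jobs in a group
private SparkContext sc;
private SQLContext sqlc;
// The third argument (interruptOnCancel = true) interrupts running task threads on cancellation
sc.setJobGroup(jobGroup, description, true);
// Jobs submitted from this thread now belong to jobGroup
sqlc.sql(st);
val rdd = ***
rdd.map***
sc.clearJobGroup();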
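The job group is a property of the submitting thread. To keep unrelated jobs from being cancelled along with this group, call sc.clearJobGroup() as soon as the group's jobs have been submitted, so that later jobs from the same thread are not tagged with it.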
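The set/submit/clear sequence above can be wrapped so that the group is always cleared, even when a job throws. The following is a minimal sketch, not part of the original note: `JobGroups` and `withJobGroup` are hypothetical names introduced for illustration, and only `setJobGroup` and `clearJobGroup` are Spark API calls.

```scala
import org.apache.spark.SparkContext

object JobGroups {
  // Hypothetical helper: tags the calling thread with `groupId`, runs `body`,
  // and clears the group afterwards, whether `body` returns or throws.
  def withJobGroup[T](sc: SparkContext, groupId: String, description: String)(body: => T): T = {
    // interruptOnCancel = true: cancelling the group interrupts running task threads
    sc.setJobGroup(groupId, description, interruptOnCancel = true)
    try body
    finally sc.clearJobGroup()
  }
}
```

A call might look like `JobGroups.withJobGroup(sc, "task-86", "index query") { rdd.count() }`. Because the group is stored per thread, the action has to run inside `body` on the calling thread for the tag to apply.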
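- Cancelling jobs that have already been submitted
// Cancel every job in the group
sc.cancelJobGroup(jobGroup)
After cancellation, the Spark driver logs exceptions like the following. They are expected and do not affect the application, which keeps running: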
18/01/17 11:54:30 INFO YarnScheduler: Cancelling stage 218
18/01/17 11:54:30 INFO YarnScheduler: Removed TaskSet 218.0, whose tasks have all completed, from pool default
18/01/17 11:54:30 INFO YarnScheduler: Stage 218 was cancelled
18/01/17 11:54:30 INFO DAGScheduler: ResultStage 218 (text at QueryIndexReader.scala:26) failed in 14.400 s due to Job 204 cancelled part of cancelled job group task-86
18/01/17 11:54:30 INFO YarnScheduler: Cancelling stage 214
18/01/17 11:54:30 INFO YarnScheduler: Stage 214 was cancelled
18/01/17 11:54:30 INFO DAGScheduler: ShuffleMapStage 214 (text at WorkerActor.scala:96) failed in 16.903 s due to Job 202 cancelled part of cancelled job group task-86
18/01/17 11:54:30 INFO DAGScheduler: Job 204 failed: text at QueryIndexReader.scala:26, took 14.404794 s
18/01/17 11:54:30 INFO YarnScheduler: Cancelling stage 217
18/01/17 11:54:30 INFO DAGScheduler: Job 202 failed: text at WorkerActor.scala:96, took 17.523323 s
18/01/17 11:54:30 ERROR WorkerActor: Here should not throw any exception throwable:org.apache.spark.SparkException: Job 204 cancelled part of cancelled job group task-86
18/01/17 11:54:30 INFO YarnScheduler: Removed TaskSet 217.0, whose tasks have all completed, from pool default
18/01/17 11:54:30 INFO YarnScheduler: Stage 217 was cancelled
18/01/17 11:54:30 INFO DAGScheduler: ResultStage 217 (text at QueryIndexReader.scala:26) failed in 14.464 s due to Job 203 cancelled part of cancelled job group task-86
18/01/17 11:54:30 INFO DAGScheduler: Job 203 failed: text at QueryIndexReader.scala:26, took 14.467532 s
18/01/17 11:54:30 ERROR WorkerActor: Here should not throw any exception throwable:org.apache.spark.SparkException: Job 203 cancelled part of cancelled job group task-86
18/01/17 11:54:30 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job 202 cancelled part of cancelled job group task-86
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1375)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply$mcVI$sp(DAGScheduler.scala:788)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:788)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:788)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:788)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1625)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1920)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1933)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
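The thread that submitted a cancelled job receives the SparkException shown in the log, so it is usually worth catching it there. The sketch below is illustrative only: it assumes a SparkContext is already available, uses a deliberately slow job as a stand-in for real work, and `CancelGroupDemo` and `runAndCancel` are hypothetical names. It waits for the job-start event before cancelling, since cancelling a group that has no active job has no effect.

```scala
import java.util.concurrent.CountDownLatch
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object CancelGroupDemo {
  // Hypothetical helper: submits a slow job under `groupId` on a worker thread,
  // cancels the group from the calling thread, and returns the job's outcome.
  def runAndCancel(sc: SparkContext, groupId: String): Try[Long] = {
    val started = new CountDownLatch(1)
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = started.countDown()
    })

    val outcome: Future[Try[Long]] = Future {
      sc.setJobGroup(groupId, "slow count", interruptOnCancel = true)
      try Try(sc.parallelize(1 to 4, 2).map { i => Thread.sleep(60000); i }.count())
      finally sc.clearJobGroup()
    }

    started.await()            // the job is now registered with the scheduler
    sc.cancelJobGroup(groupId) // the action on the worker thread fails with SparkException
    Await.result(outcome, 30.seconds)
  }

  // Treats cancellation as a normal outcome instead of an error.
  def describe(result: Try[Long]): String = result match {
    case scala.util.Success(n)                 => s"finished before cancellation: $n"
    case scala.util.Failure(e: SparkException) => s"cancelled: ${e.getMessage}"
    case scala.util.Failure(e)                 => throw e
  }
}
```

Handling the exception at the call site keeps a deliberate cancellation from being reported as an unexpected error, as happens with the `ERROR WorkerActor` lines in the log above.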