When processing AWS Kinesis messages with Spark 1.6.0 Streaming, the Spark Streaming application cannot be stopped cleanly and fails with the following exception:
java.lang.IllegalStateException: close() was called on BatchedWriteAheadLog before write request with time 1467910380055 could be fulfilled.
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:169)
    at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
    at org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:285)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:185)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
The problem was introduced along with the BatchedWriteAheadLog; see SPARK-11141 for details. There is further discussion of this issue on the AWS forums.
To work around the issue, pass the following property when submitting the application: --conf spark.streaming.driver.writeAheadLog.allowBatching=false
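If setting it on the spark-submit command line is inconvenient, the same property can be set on the SparkConf before the StreamingContext is created. A minimal sketch (the application name and batch interval below are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Disable driver-side batching of write-ahead-log writes; equivalent to
// --conf spark.streaming.driver.writeAheadLog.allowBatching=false on spark-submit.
val conf = new SparkConf()
  .setAppName("kinesis-streaming-app") // hypothetical application name
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")

val ssc = new StreamingContext(conf, Seconds(10)) // hypothetical batch interval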
Note: the checkpoint directory should be on the cluster's HDFS, not on S3.
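Continuing from the ssc created above, the sketch below points the checkpoint directory at HDFS rather than an s3:// URI and then stops the application gracefully. The HDFS path, stream name, endpoint, and region are placeholders, assuming the receiver-based KinesisUtils.createStream API from Spark 1.6's spark-streaming-kinesis-asl module:

import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// Checkpoint on HDFS, not S3 (hypothetical path).
ssc.checkpoint("hdfs:///user/spark/checkpoints/kinesis-app")

// Receiver-based Kinesis stream; all names and endpoints are placeholders.
val records = KinesisUtils.createStream(
  ssc,
  "kinesis-streaming-app",                    // KCL application name
  "my-kinesis-stream",                        // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com",  // endpoint URL
  "us-east-1",                                // region
  InitialPositionInStream.LATEST,
  Seconds(10),                                // Kinesis checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2)

records.map(bytes => new String(bytes, StandardCharsets.UTF_8)).print()

ssc.start()
// ... later, e.g. on a shutdown signal:
ssc.stop(stopSparkContext = true, stopGracefully = true)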