Spark broadcasted variable returns NullPointerException when run in an Amazon EMR cluster
The variables I share via broadcast are null in the cluster.
My application is quite complex, but I have written a small example that works flawlessly when run locally and fails in the cluster:
package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)

  val arr: Array[Int] = (6 to 9).toArray
  val broadcasted = sc.broadcast(arr)

  // Small integer array [1, 2, 3, 4] parallelized across 2 partitions
  val rdd: RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

  // NullPointerException in the flatMap: broadcasted is null on the executors
  rdd.flatMap((a: Int) => List((a, broadcasted.value(0)))).reduceByKey(_ + _).collect().foreach(println)
}
I don't know if the problem is a coding error or a configuration issue.
This is the stack trace I get:
15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Command exiting with ret '1'
Can anyone help me fix this? At least, can you tell me if you see anything strange in the code? If you think the code is ok, please tell me, because that would mean the problem is in the configuration of the cluster.
Thanks in advance.
Finally I got it working.
It doesn't work if you declare the object like this:
object MyObject extends App {
  /* ... */
}
But it works if you declare the object with a main function:
object MyObject {
  def main(args: Array[String]) {
    /* ... */
  }
}
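The cause seems to be the way App initialises the object: the body of an App is run through Scala's DelayedInit, so all those vals are only assigned when main executes on the driver; when an executor deserialises a closure that reads a field of the object, that field can still be null. Here is a minimal, Spark-free sketch of that behaviour (the object and field names are invented for illustration, and it assumes Scala 2.x App semantics):

// Its val lives in the delayed-init body, so it is only assigned
// when DelayedDemo.main() is invoked.
object DelayedDemo extends App {
  val greeting: String = "hello"
}

object Inspector {
  def main(args: Array[String]): Unit = {
    // DelayedDemo.main() has not been called, so its delayed body
    // has not run yet and greeting is still null.
    println(DelayedDemo.greeting) // prints "null"
  }
}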
So, the short example in the question works if I rewrite it this way:
object PruebaBroadcast2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("PruebaBroadcast2")
    val sc = new SparkContext(conf)

    val arr: Array[Int] = (6 to 9).toArray
    val broadcasted = sc.broadcast(arr)

    val rdd: RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)
    rdd.flatMap((a: Int) => List((a, broadcasted.value(0)))).reduceByKey(_ + _).collect().foreach(println)
  }
}
This problem seems to be related to this bug: https://issues.apache.org/jira/browse/SPARK-4170
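If for some reason you want to keep extends App, an alternative that should in principle avoid the problem is to move all the Spark code into a method, so the closures capture local variables instead of the object's (delayed-initialised) fields. This is an untested sketch of that idea, not something I have verified on EMR:

object PruebaBroadcast2 extends App {
  run()

  def run(): Unit = {
    val conf = new SparkConf().setAppName("PruebaBroadcast2")
    val sc = new SparkContext(conf)

    // These are locals of run(), so the flatMap closure captures the
    // Broadcast reference directly instead of reading a field of the
    // singleton object (which would be null on the executors).
    val broadcasted = sc.broadcast((6 to 9).toArray)
    val rdd: RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

    rdd.flatMap((a: Int) => List((a, broadcasted.value(0)))).reduceByKey(_ + _).collect().foreach(println)
  }
}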