Spark broadcasted variable returns NullPointerException when run in Amazon EMR cluster


The variables I share via broadcast are null in the cluster.

My application is quite complex, but I have written a small example that works flawlessly when run locally, yet fails in the cluster:

package com.gonzalopezzi.bigdata.bicing

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

object PruebaBroadcast2 extends App {
  val conf = new SparkConf().setAppName("PruebaBroadcast2")
  val sc = new SparkContext(conf)

  val arr : Array[Int] = (6 to 9).toArray
  val broadcasted = sc.broadcast(arr)

  val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2) // small integer array [1, 2, 3, 4] parallelized in 2 machines

  rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println)  // NullPointerException in flatMap. broadcasted is null
}

I don't know if the problem is a coding error or a configuration issue.

This is the stack trace I get:

15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Command exiting with ret '1'

Can anyone help me fix this? At the very least, can you tell me if you see anything strange in the code? If you think the code is fine, please tell me, as that would mean the problem is in the configuration of the cluster.

Thanks in advance.

Finally I got it working.

It doesn't work if I declare the object like this:

object MyObject extends App {

But it works if I declare the object with a main function:

object MyObject {
  def main (args : Array[String]) {
    /* ... */
  }
}

So, the short example from the question works if I rewrite it this way:

object PruebaBroadcast2 {

  def main (args: Array[String]) {
    val conf = new SparkConf().setAppName("PruebaBroadcast2")
    val sc = new SparkContext(conf)

    val arr : Array[Int] = (6 to 9).toArray
    val broadcasted = sc.broadcast(arr)

    val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)

    rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println)
  }
}

This problem seems to be related to this bug: https://issues.apache.org/jira/browse/SPARK-4170
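The usual explanation, which is what that ticket points at, is that in Scala 2 the App trait mixes in DelayedInit: the body of the object, including all its vals, is not run in the object's constructor but deferred until main() is invoked. On an executor the object is loaded but its main() never runs, so fields such as broadcasted can still be null when the closure uses them. A minimal sketch of that behaviour (my own illustration with made-up object names, not Spark-specific and not from the original post):

// Why vals inside an object that extends App can be seen uninitialized:
// App mixes in DelayedInit, so the object's body runs inside main(),
// not in the object's constructor.

object WithApp extends App {
  val number = 42          // initialized only when WithApp.main() runs
}

object WithMain {
  val number = 42          // initialized when the object is first accessed
  def main(args: Array[String]): Unit = ()
}

object InitDemo {
  def main(args: Array[String]): Unit = {
    // Neither WithApp.main nor WithMain.main has been called at this point.
    println(WithApp.number)   // prints 0: the deferred body has not run yet
    println(WithMain.number)  // prints 42: normal object initialization
  }
}

If the same thing happens to the object holding the broadcast variable when it is loaded on an executor, that would match the NullPointerException seen in flatMap, and moving the code into an explicit main() avoids it.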

