I am writing a Twitter connector using Spark Streaming.
I am facing the following exception:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error starting Twitter stream - java.lang.NullPointerException
    at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:89)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:159)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:152)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:152)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Below is the relevant code snippet.

val config = new twitter4j.conf.ConfigurationBuilder()
    .setOAuthConsumerKey("*********************")
    .setOAuthConsumerSecret("**********************************************")
    .setOAuthAccessToken("****************************************************")
    .setOAuthAccessTokenSecret("**********************************************************")
    .build

val twitter_auth = new TwitterFactory(config)
val a = new twitter4j.auth.OAuthAuthorization(config)
val atwitter : Option[twitter4j.auth.Authorization] =  Some(twitter_auth.getInstance(a).getAuthorization())

val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// ssc.checkpoint("D:/test")
val stream = TwitterUtils.createStream(ssc, atwitter, null, StorageLevel.MEMORY_AND_DISK_2)

val hashTags = stream.map(status => status.getUser().getName())
hashTags.foreachRDD(rdd => {
  rdd.foreach(println)
})

ssc.start()
ssc.awaitTermination()

Can anyone help me to solve this issue?
Thanks :)

1 Answer

Going to the line where the exception is thrown (TwitterInputDStream.scala:89, in TwitterReceiver.onStart), we can see:

if (filters.size > 0) {

For that line to throw an NPE, filters has to be null, which is exactly what happens when the Twitter stream is created:

val stream = TwitterUtils.createStream(ssc, atwitter, null, StorageLevel.MEMORY_AND_DISK_2)

Since filters is a sequence, initialize it with Seq() instead of null.
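
To illustrate the difference without needing Spark or Twitter credentials, here is a minimal sketch. The object FiltersNullDemo and the function streamKind are hypothetical stand-ins that only mirror the filters.size > 0 check quoted above; the returned labels are illustrative and are not the receiver's actual code.

```scala
import scala.util.Try

object FiltersNullDemo {
  // Hypothetical stand-in for the receiver's check on its filters argument.
  // The returned labels are illustrative only.
  def streamKind(filters: Seq[String]): String =
    if (filters.size > 0) "filtered stream" else "sample stream"

  def main(args: Array[String]): Unit = {
    // Passing null compiles, but calling .size on it throws a
    // NullPointerException at runtime; Try captures that failure.
    val withNull = Try(streamKind(null))
    println(s"null filters failed: ${withNull.isFailure}")

    // An empty sequence is a valid "no filters" value and takes the else branch.
    println(s"empty filters: ${streamKind(Seq())}")

    // A non-empty sequence takes the filtered branch.
    println(s"one filter: ${streamKind(Seq("spark"))}")
  }
}
```

Applied to the code in the question, the call would become TwitterUtils.createStream(ssc, atwitter, Seq(), StorageLevel.MEMORY_AND_DISK_2). Nil should work equally well in place of Seq(), since both are an empty Seq[String] here.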
