0

I'm trying to get tweets with IntelliJ and SBT, I'm using Spark 2.4.4, Scala 2.11.12, jdk1.8.0_231. The connection is local, I am not using cluster.

However I get this error, and I can't fix it, I have changed the dependencies in several ways, but I don't know what I'm doing wrong ... does anyone have any ideas that can help me find the solution?

package com.sparkStreaming.twitter

import org.apache.spark
import org.apache.spark.rdd._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StringType, StructField}
import org.apache.spark.streaming
import org.apache.spark.streaming.twitter.TwitterUtils

import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Tweets {

  def main(args: Array[String]) {

      //Configure Twitter credentials
      val consumerKey = "******"
      val consumerSecret = "******"
      val accessToken = "******"
      val accessTokenSecret = "******"

      System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
      System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
      System.setProperty("twitter4j.oauth.accessToken", accessToken)
      System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

      val conf = new SparkConf().setAppName("Tweets")
      conf.setMaster("local[2]")

      //Window and stream parameter to process
      var timeSec = 10

      //Filter tweets
      var filterTag = Array("@realmadrid,$SAN") //@businessinsider,

      //Create streaming context
      val ssc = new StreamingContext(conf, Seconds(timeSec)) //creo el streaming context

      val tweets = TwitterUtils.createStream(ssc, None, filterTag) // creo el TwitterUtils

      val rowRDD = tweets.map(rddTweet => rddTweet.getText)
      rowRDD.print()

      ssc.start()
      ssc.awaitTermination()
  }
}

This is the SBT

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.4"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"

// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.4" //% "provided"

// https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core
libraryDependencies += "org.twitter4j" % "twitter4j-core" % "4.0.7"

// https://mvnrepository.com/artifact/org.twitter4j/twitter4j-stream
libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "4.0.7"

// https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-twitter
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0"

This is the error:

20/02/06 17:52:38 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/02/06 17:52:38 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: 
twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

20/02/06 17:52:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
20/02/06 17:52:38 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/02/06 17:52:38 INFO TaskSchedulerImpl: Cancelling stage 0
20/02/06 17:52:38 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
20/02/06 17:52:38 INFO DAGScheduler: ResultStage 0 (start at Tweets.scala:80) failed in 0,248 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
20/02/06 17:52:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:601)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:591)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:2212)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
2

2 Answers 2

1

The spark-streaming-twitter 2.1.0 artifact depends on a different version of twitter4j, as evidenced by its pom.xml, which you can find on the Maven site.

<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>4.0.4</version>
</dependency>

It's probably not backwards compatible, so moving it to the newer version won't work (SBT would generally use the highest version that any dependency uses, if you don't specify it). Remove your twitter4j specifications and try again.

0
0

I got it with the dependencies:

{

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>1.5.2</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>1.5.2</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.4</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-twitter_2.11</artifactId>
  <version>1.6.2</version>
</dependency>

}

These posts helped me:

Spark Twitter Streaming exception : (org.apache.spark.Logging) classnotfound

Dependencies for Spark-Streaming and Twiter-Streaming in SBT

Not the answer you're looking for? Browse other questions tagged or ask your own question.