
I have a Spark SQL DataFrame named 'all_tweets' which has only one column, text.

all_tweets.show(5)

+--------------------+
|                text|
+--------------------+
|@always_nidhi @Yo...|
|@OnlyDancers Bell...|
|Taste of Iceland!...|
|Serasi ade haha @...|
|@BeezyDH_ it's li...|
+--------------------+
only showing top 5 rows

Now, I am converting this dataframe to an RDD to perform some transformations and action on it.

[I] all_twt_rdd = all_tweets.rdd
[I] type(all_twt_rdd)
[O] pyspark.rdd.RDD
[I] all_twt_rdd.first()
[O] Row(text=u'@always_nidhi @YouTube no i dnt understand bt i loved the music nd their dance awesome all the song of this mve is rocking')

Now, I am trying to run flatMap on it to split the sentence in to words.

[I] user_cnt = all_twt_rdd.flatMap(lambda line: line.split(" ")).take(5) 

When I run the above, I receive the error below. The RDD has data, but I don't understand why it fails. Doesn't a pipelined RDD inherit the methods of an RDD?

[O] Py4JJavaError                             Traceback (most recent call last)
<ipython-input-101-31527190732e> in <module>()
----> 1 user_cnt = all_twt_rdd.flatMap(lambda line: line.split(" ")).take(2)

/home/notebook/spark-1.6.0-bin-hadoop2.6/python/pyspark/rdd.pyc in take(self, num)
   1295
   1296             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1297             res = self.context.runJob(self, takeUpToNumLeft, p)
   1298
   1299             items += res

/home/notebook/spark-1.6.0-bin-hadoop2.6/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    937         # SparkContext#runJob.
    938         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 939         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    940         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    941

/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814
    815         for temp_arg in temp_args:

/home/notebook/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times, most recent failure: Lost task 0.0 in stage 50.0 (TID 456, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/pyspark/rdd.py", line 1293, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-101-31527190732e>", line 1, in <lambda>
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/types.py", line 1272, in __getattr__
    raise AttributeError(item)
AttributeError: split

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/pyspark/rdd.py", line 1293, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-101-31527190732e>", line 1, in <lambda>
  File "/home/notebook/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/types.py", line 1272, in __getattr__
    raise AttributeError(item)
AttributeError: split

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
  • You're calling split on a Row object, but Row has no split method defined. Commented Mar 30, 2016 at 3:41
  • Try doing lambda line: line.text.split(" "). Commented Mar 30, 2016 at 3:43

1 Answer


The problem is that you're calling .split() on a Row, not on a string; Row objects have no .split() method, only strings do. The RDD itself is fine: converting a DataFrame with .rdd just gives you an RDD of Row objects rather than plain strings. The string you want to split is in the Row's text field, so access it explicitly:

user_cnt = all_twt_rdd.flatMap(lambda line: line.text.split(" ")).take(5) 
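If you'd rather not deal with the Row wrapper inside the flatMap at all, here is a minimal alternative sketch (assuming the same all_tweets DataFrame and all_twt_rdd RDD from the question): pull the string out of each Row first, then split.

# Extract the plain string from each Row, then split each line into words
words_rdd = (all_twt_rdd
             .map(lambda row: row.text)              # Row -> unicode string
             .flatMap(lambda line: line.split(" ")))
words_rdd.take(5)

# Equivalent, selecting the column before converting to an RDD;
# the elements are still Rows, so index into them
words_rdd = (all_tweets.select("text").rdd
             .flatMap(lambda row: row[0].split(" ")))

Either way, the split happens on a real string, so the AttributeError disappears.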