I am running a program that parallelizes work with Spark many times in a row. It runs fine for the first few iterations but then crashes with a memory error. I am using Spark 2.2.0 with Python 2.7, and I am running my tests on AWS EC2 with 30 GB of memory.

Below are my Spark settings:

conf = pyspark.SparkConf()
conf.set("spark.executor.memory", '4g')
conf.set('spark.executor.cores', '16')
conf.set('spark.cores.max', '16')
conf.set("spark.driver.memory", '4g')
conf.setMaster("local[*]")

and here is my error log:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "C:/Users/Administrator/Desktop/Flex_Api_Post/flex_api_post_func_spark_setup.py", line 152, in travel_time_est
    count = ssc.parallelize(input_json).map(lambda j: flex_func(j)).collect()
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 809, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 13.0 failed 1 times, most recent failure: Lost task 7.0 in stage 13.0 (TID 215, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads
    return pickle.loads(obj)
MemoryError

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads
    return pickle.loads(obj)
MemoryError

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more

1 Answer

Let me explain a little about how PySpark works.

When you run PySpark with 16 cores per worker, you are asking Spark to start 16 Python instances in parallel for each JVM worker.

Judging from your configuration, you are requesting a worker with 4 GB of memory and 16 cores. That produces a structure with one JVM that opens 16 pipes to 16 Python instances, all running in parallel. The error you are seeing means that the Python processes do not have enough memory to run.
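To put rough numbers on that, here is a back-of-the-envelope sketch. It assumes the Python workers live outside the JVM heap and share whatever RAM is left over; the helper name `python_worker_budget_gb` and the 2 GB reserved for the OS are my own illustrative assumptions, not anything Spark defines. Treat the result as an upper bound, since the Flask app and any other process on the machine also take memory.

```python
def python_worker_budget_gb(total_ram_gb, jvm_heap_gb, cores, os_reserve_gb=2.0):
    """Rough upper bound on the RAM each parallel Python worker can use.

    os_reserve_gb is an assumed allowance for the OS and other processes.
    """
    if cores < 1:
        raise ValueError("cores must be at least 1")
    leftover = total_ram_gb - jvm_heap_gb - os_reserve_gb
    # Never report a negative budget if the heap already exceeds the RAM.
    return max(leftover, 0.0) / cores


# Figures from the question: 30 GB machine, 4 GB JVM heap.
for cores in (16, 8, 4):
    budget = python_worker_budget_gb(30, 4, cores)
    print("{} cores -> {:.2f} GB per Python worker".format(cores, budget))
```

Halving the core count doubles the per-worker budget, which is why lowering parallelism is usually the first thing to try.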

You may need to reduce the number of cores per worker so that it can handle the load, or add more memory.
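As a sketch of what that change could look like: the values below (4 cores, 8 GB) are illustrative guesses rather than tuned recommendations, and `build_settings` and `make_conf` are hypothetical helper names. I am also assuming you stay in local mode as in the question, where the thread count in the master URL is what sets the parallelism and the tasks run inside the driver JVM.

```python
def build_settings(cores=4, memory="8g"):
    """Return Spark settings with fewer parallel workers and a larger heap.

    The defaults are illustrative assumptions, not tuned values.
    """
    return {
        # local[N] caps the number of tasks (and Python workers) run at once.
        "spark.master": "local[{}]".format(cores),
        # In local mode the tasks run inside the driver JVM.
        "spark.driver.memory": memory,
    }


def make_conf(settings):
    """Apply the settings to a SparkConf (requires pyspark to be installed)."""
    import pyspark  # imported lazily so build_settings works without Spark

    conf = pyspark.SparkConf()
    for key, value in settings.items():
        conf.set(key, value)
    return conf


print(build_settings())
```

You would then pass `make_conf(build_settings())` to your `SparkContext` and adjust the two numbers until the job stops failing.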

For more details check here.
