
I'm working with PySpark and trying to send a partitioned DataFrame to Kafka using the confluent-kafka library's Producer. However, I'm running into a serialization problem with the Kafka producer on the Spark executors.

Here is my code:

broadcast_config = spark.sparkContext.broadcast((kafka_broker, kafka_topic))

def send_partition_to_kafka(partition):
    kafka_broker, kafka_topic = broadcast_config.value
    for row in partition:
        producer = Producer(bootstrap_servers=kafka_broker,
                            value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        producer.send(kafka_topic, value=row.asDict())
        producer.flush()

grouped_df.foreachPartition(send_partition_to_kafka)

and I keep getting this error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 812, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command
    command = serializer._read_with_length(file)
  File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 174, in _read_with_length
    return self.loads(obj)
  File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
AttributeError: type object 'Producer' has no attribute '__bool__'

Despite instantiating the producer within each partition, the error persists. How can I resolve this serialization issue?

Any help or pointers would be greatly appreciated. Thank you!

1 Answer


I think the problem is that you import the Producer class at global scope, so Spark serializes a reference to it along with your function and ships it to the executors, where unpickling fails. Move the import inside the function that runs on each partition.

This expects the confluent-kafka module to be added to the Spark session's dependencies (i.e., installed on the executor nodes) so the import succeeds there.
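If you want to verify that the package really is importable on the workers before running the full job, here is a quick diagnostic sketch (my own addition, not part of the original answer; it assumes a SparkSession named spark):

def check_import(_):
    # Raises ImportError on any executor where confluent-kafka is missing.
    import confluent_kafka
    return [confluent_kafka.version()]

print(spark.sparkContext.parallelize(range(4), 4).mapPartitions(check_import).collect())

With the dependency available on the executors, the corrected partition function looks like this: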

broadcast_config = spark.sparkContext.broadcast((kafka_broker, kafka_topic))

def send_partition_to_kafka(partition):
    # Import here so the class is resolved on the executor instead of being
    # captured in the pickled closure on the driver.
    import json
    from confluent_kafka import Producer

    kafka_broker, kafka_topic = broadcast_config.value
    # confluent-kafka takes a config dict; kafka-python-style keyword
    # arguments (bootstrap_servers, value_serializer) are not supported,
    # and messages are sent with produce(), not send().
    producer = Producer({'bootstrap.servers': kafka_broker})
    for row in partition:
        producer.produce(kafka_topic, value=json.dumps(row.asDict()).encode('utf-8'))
    producer.flush()

grouped_df.foreachPartition(send_partition_to_kafka)
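As a follow-up: confluent-kafka's produce() is asynchronous and only enqueues the message, so delivery failures surface through a callback rather than a return value. Here is a minimal sketch of the same partition function with per-message delivery reporting (the callback and the error-handling policy are my assumptions, not part of the original answer):

def send_partition_to_kafka(partition):
    import json
    from confluent_kafka import Producer

    kafka_broker, kafka_topic = broadcast_config.value
    producer = Producer({'bootstrap.servers': kafka_broker})

    def on_delivery(err, msg):
        # Invoked from poll()/flush() once the broker acks or rejects the message.
        if err is not None:
            print(f"Delivery failed to {msg.topic()}: {err}")

    for row in partition:
        producer.produce(kafka_topic,
                         value=json.dumps(row.asDict()).encode('utf-8'),
                         callback=on_delivery)
        producer.poll(0)  # serve queued delivery callbacks without blocking

    producer.flush()  # block until every outstanding message is delivered

Note that the producer is created once per partition and flush() is called once after the loop, so the internal queue is drained at the end instead of after every record.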