I'm working with PySpark and trying to send a partitioned DataFrame to Kafka using the confluent-kafka library's Producer. However, I'm encountering a serialization problem with the Kafka producer on the Spark executors.
Here is my code:
broadcast_config = spark.sparkContext.broadcast((kafka_broker, kafka_topic))

def send_partition_to_kafka(partition):
    kafka_broker, kafka_topic = broadcast_config.value
    for row in partition:
        producer = Producer(bootstrap_servers=kafka_broker, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        producer.send(kafka_topic, value=row.asDict())
        producer.flush()

grouped_df.foreachPartition(send_partition_to_kafka)

and I keep getting this error:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 812, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command
    command = serializer._read_with_length(file)
  File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 174, in _read_with_length
    return self.loads(obj)
  File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
AttributeError: type object 'Producer' has no attribute '__bool__'

Despite instantiating the producer inside the partition function, the error persists. How can I resolve this serialization issue?
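For context, here is roughly the pattern I believe confluent-kafka expects (a config dict with 'bootstrap.servers' plus produce()/flush(), instead of the kafka-python-style keyword arguments and send()), with one producer created per partition rather than per row. I'm not sure whether switching to this alone would resolve the error, since the traceback seems to come from deserializing the pickled function on the worker:

from confluent_kafka import Producer
import json

def send_partition_to_kafka(partition):
    kafka_broker, kafka_topic = broadcast_config.value
    # create a single producer per partition rather than one per row
    producer = Producer({'bootstrap.servers': kafka_broker})
    for row in partition:
        # the plain Producer has no value_serializer option, so serialize manually
        producer.produce(kafka_topic, value=json.dumps(row.asDict()).encode('utf-8'))
    producer.flush()

grouped_df.foreachPartition(send_partition_to_kafka)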
Any help or pointers would be greatly appreciated. Thank you!