I'm using Spark 2.2.1, kafka_2.12-1.0.0, and Scala to read some JSON data from Kafka. The application connects to Kafka, but no data is ever output.
Here is my Scala code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

def main(args: Array[String]) {
  val spark = SparkSession
    .builder()
    .appName("Spark structured streaming Kafka example")
    .master("local[2]")
    .getOrCreate()

  val inputstream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "behavior")
    .option("group.id", "test-consumer-group")
    .option("startingOffsets", "earliest")
    .load()

  import spark.implicits._
  println("===============================================================")

  val query = inputstream
    //select($"data")
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .outputMode("append")
    .format("console")
    .trigger(Trigger.ProcessingTime("2 seconds"))
    .start()

  println("===============================================================" + query.isActive)
  query.awaitTermination()
}
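For context, this is roughly how I plan to parse the JSON once the raw stream shows up. The schema below is just a placeholder, not my real message format, and the snippet reuses the inputstream and spark.implicits._ defined above:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}

// Placeholder schema -- my real "behavior" messages have different fields
val schema = new StructType()
  .add("userId", StringType)
  .add("action", StringType)

// Parse the Kafka value column as JSON and flatten the parsed struct
val parsed = inputstream
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")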
And here is my pom.xml:

<properties>
  <spark.version>2.2.0</spark.version>
  <scala.version>2.11.6</scala.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>0.10.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
  </dependency>
</dependencies>
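One thing I'm unsure about: the pom mixes Scala suffixes (kafka_2.12 next to spark-sql-kafka-0-10_2.11 and scala-library 2.11.6). As a quick sanity check, this prints which Scala binary version the application actually runs on:

// Prints the Scala version the app runs on, e.g. "version 2.11.6"
println(scala.util.Properties.versionString)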
When I run this code, the console doesn't show any data from Kafka. Here is the console output:
===============================================================
18/03/12 17:00:47 INFO SparkSqlParser: Parsing command: CAST(key AS STRING)
18/03/12 17:00:47 INFO SparkSqlParser: Parsing command: CAST(value AS STRING)
18/03/12 17:00:48 INFO StreamExecution: Starting [id = 6648f18e-3ecd-4046-85ee-932fffaab70c, runId = cb6a9ae9-9460-4232-b8ed-342d48c2e524]. Use /D:/data/kafka to store the query checkpoint.
===============================================================true
18/03/12 17:00:48 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = spark-kafka-source-1b918ced-93c2-4648-8a60-16f9695d12d6-2063137397-driver-0
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 1
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Discovered coordinator KB2CMVMCIWDJT61.localdomain:9092 (id: 2147483647 rack: null) for group spark-kafka-source-1b918ced-93c2-4648-8a60-16f9695d12d6-2063137397-driver-0.
Marking the coordinator KB2CMVMCIWDJT61.localdomain:9092 (id: 2147483647 rack: null) dead for group spark-kafka-source-1b918ced-93c2-4648-8a60-16f9695d12d6-2063137397-driver-0

As far as I can tell, the only notable thing in the output is that the coordinator for my consumer group is immediately marked dead. I also notice the group.id in the log is an auto-generated spark-kafka-source-... id rather than the test-consumer-group I set in the options. Kafka itself seems fine: I can read data from the "behavior" topic with the console consumer, so the broker and the topic don't appear to be the problem. I'm a novice with Spark Structured Streaming and Kafka, and I hope to get your help.
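In case it's useful, a plain Kafka consumer along these lines should confirm the topic is readable from inside a JVM and not just from the console consumer. This is a sketch using only the standard consumer API; the group id here is arbitrary, and the broker and topic are the same ones as above:

import java.util.{Arrays, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // same broker as above
props.put("group.id", "behavior-debug")           // arbitrary debug group id
props.put("auto.offset.reset", "earliest")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("behavior"))
// poll(long) is the pre-2.0 API matching the 0.10.x client in the pom
val records = consumer.poll(5000L)
records.asScala.foreach(r => println(s"${r.key} -> ${r.value}"))
consumer.close()

If this prints records while the structured streaming query still shows nothing, the problem would seem to be on the Spark side rather than the broker.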