1

I have a Dataset[Row] where each row is a JSON string. I just want to print the JSON stream, or count the JSON records per batch.

Here is my code so far:

val ds = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topicName)
  .option("checkpointLocation", hdfsCheckPointDir)
  .load()

val ds1 = ds.select(from_json(col("value").cast("string"), schema) as 'payload)
val ds2 = ds1.select($"payload.info")

val query = ds2.writeStream
  .outputMode("append")
  .queryName("table")
  .format("memory")
  .start()

query.awaitTermination()

select * from table; -- I don't see anything, and there are no errors.

However, when I run my Kafka consumer separately (independent of Spark), I can see the data.

My question is: what do I need to do to just print the data I am receiving from Kafka using Structured Streaming? The messages in Kafka are JSON-encoded strings, so I am converting them to a struct and eventually to a Dataset. I am using Spark 2.1.0.

3
  • Are you also discussing it on Spark users mailing list (with TD)? Trying to find out how different the two use cases are. Commented May 14, 2017 at 11:04
  • Hi! Yes, but we haven't concluded that conversation. I am just trying to print data using Structured Streaming, and I am having a hard time with that :( Commented May 15, 2017 at 7:33
  • I have also tried val query = ds.writeStream.outputMode("append").format("console").start(), but that didn't work either. Commented May 15, 2017 at 7:52

2 Answers

2
val ds1 = ds.select(from_json(col("value").cast("string"), schema) as "payload").select($"payload.*")

Writing ds1 to the console sink will then print your data on the console:

ds1.writeStream.format("console").option("truncate", "false").start().awaitTermination() 

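Always call something like awaitTermination() (or, as a stopgap, Thread.sleep with a duration in milliseconds) in this type of situation. Otherwise the driver can exit before the streaming query has processed any batch.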

1
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import *

# Sample input line: {"name":"jimyag","age":12,"ip":"111.11.1.1"}
# Start a test server first: nc -l -p 9999

spark: SparkSession = SparkSession.builder.appName("readJSONData").getOrCreate()

# Each line received on the socket arrives as one row in the "value" column
lines: SparkDataFrame = (
    spark.readStream.format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

# Parse the JSON string with a DDL schema and flatten the struct into columns
jsons = lines.select(
    from_json(col("value"), "name STRING, age INT, ip STRING").alias("data")
).select("data.*")

jsons.writeStream.format("console").start().awaitTermination()

# Printed output:
# +------+---+----------+
# |  name|age|        ip|
# +------+---+----------+
# |jimyag| 12|111.11.1.1|
# +------+---+----------+
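
For the "count per batch" part of the question, here is a minimal sketch that reuses the socket source and schema from the example above. It relies on `DataStreamWriter.foreachBatch`, which was only added in Spark 2.4, so it would not run on the 2.1.0 mentioned in the question. The names `describe_batch`, `log_batch`, `main`, and the app name are made up for illustration.

```python
def describe_batch(batch_df, batch_id):
    """Build a one-line summary for a micro-batch.

    batch_df only needs a .count() method, so this also works with a stub.
    """
    return f"batch {batch_id}: {batch_df.count()} records"


def log_batch(batch_df, batch_id):
    """Callback for foreachBatch: print the record count of each micro-batch."""
    print(describe_batch(batch_df, batch_id))


def main():
    # Imported here so the helpers above can be used without PySpark installed.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json

    # Hypothetical app name; host, port, and schema mirror the example above.
    spark = SparkSession.builder.appName("countJsonPerBatch").getOrCreate()

    lines = (
        spark.readStream.format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
    )

    parsed = lines.select(
        from_json(col("value"), "name STRING, age INT, ip STRING").alias("data")
    ).select("data.*")

    # foreachBatch hands each micro-batch to log_batch as a regular DataFrame.
    query = parsed.writeStream.foreachBatch(log_batch).start()

    # Block the driver so the query keeps running.
    query.awaitTermination()
```

Calling `main()` starts the stream. Inside `log_batch` the batch is an ordinary DataFrame, so `batch_df.show(truncate=False)` could be used there as well if you want to print the rows rather than just count them.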
