Skip to main content
added 1347 characters in body
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133
added 1347 characters in body
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133

Update: With the steps below we can achieve uniform distribution of the data across partitions:

  1. Fetch the primary key of the table.
  2. Find the key minimum and maximum values.
  3. Execute Spark with those values.
 /** Reads a MySQL table through Spark's JDBC source with uniform partitioning,
  * then writes the result out as Parquet.
  *
  * Steps:
  *   1. Look up the table's primary-key column via `SHOW KEYS ... Key_name = 'PRIMARY'`.
  *   2. Query the min/max of that key to derive partition bounds.
  *   3. Read via the JDBC source with roughly 5000 rows per partition.
  *
  * NOTE(review): assumes `executeQuery` returns a ResultSet already positioned on
  * its first row, and that the primary key is an integral column — confirm with
  * the `executeQuery` implementation.
  *
  * @param args command-line arguments (parsed elsewhere into url/user/password/config)
  */
 def main(args: Array[String]) {
   // parsing input parameters ...

   // Column 5 of the SHOW KEYS result is Column_name: the primary-key column.
   val primaryKey = executeQuery(url, user, password,
     s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)

   // Key bounds drive the JDBC partitioning below.
   val result = executeQuery(url, user, password,
     s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
   val min = result.getString(1).toInt
   val max = result.getString(2).toInt

   // ~5000 rows per partition; the +1 guards against a zero partition count.
   val numPartitions = (max - min) / 5000 + 1

   val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()

   // Fix: was `var df` — the frame is never reassigned, so `val` is correct.
   val df = spark.read.format("jdbc").
     option("url", s"${url}${config("schema")}").
     option("driver", "com.mysql.jdbc.Driver").
     option("lowerBound", min).
     option("upperBound", max).
     option("numPartitions", numPartitions).
     option("partitionColumn", primaryKey).
     option("dbtable", config("table")).
     option("user", user).
     option("password", password).load()

   // some data manipulations here ...

   df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)
 }

Update: With the steps below we can achieve uniform distribution of the data across partitions:

  1. Fetch the primary key of the table.
  2. Find the key minimum and maximum values.
  3. Execute Spark with those values.
 /** Loads a MySQL table through Spark's JDBC data source, partitioning the read
  * uniformly on the table's primary key, and persists the result as Parquet.
  *
  * The partition bounds come from `min`/`max` of the primary-key column; the
  * partition count targets roughly 5000 rows per partition.
  *
  * NOTE(review): presumes `executeQuery` hands back a ResultSet positioned on its
  * first row and an integer-valued primary key — verify against the helper.
  *
  * @param args command-line arguments (parsed elsewhere into url/user/password/config)
  */
 def main(args: Array[String]) {
   // parsing input parameters ...

   // Primary-key column name: column 5 of the SHOW KEYS result set.
   val primaryKey = executeQuery(url, user, password,
     s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)

   // Fetch the key's extremes; these become the JDBC partition bounds.
   val bounds = executeQuery(url, user, password,
     s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
   val min = bounds.getString(1).toInt
   val max = bounds.getString(2).toInt
   val numPartitions = (max - min) / 5000 + 1

   val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()

   var df = spark.read
     .format("jdbc")
     .option("url", s"${url}${config("schema")}")
     .option("driver", "com.mysql.jdbc.Driver")
     .option("lowerBound", min)
     .option("upperBound", max)
     .option("numPartitions", numPartitions)
     .option("partitionColumn", primaryKey)
     .option("dbtable", config("table"))
     .option("user", user)
     .option("password", password)
     .load()

   // some data manipulations here ...

   df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)
 }
deleted 11 characters in body
Source Link
marc_s
  • 759.9k
  • 186
  • 1.4k
  • 1.5k
Loading
deleted 2 characters in body
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133
Loading
deleted 1 character in body
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133
Loading
added 15 characters in body
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133
Loading
added 1907 characters in body
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133
Loading
added 242 characters in body
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133
Loading
added 242 characters in body
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133
Loading
added 49 characters in body
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133
Loading
Source Link
Ram Ghadiyaram
  • 29.4k
  • 16
  • 102
  • 133
Loading