Extending Spark's Ingestion: Build Your Own Java Data Source with Jean Georges Perrin
The document outlines a presentation by Jean Georges Perrin on extending Apache Spark's ingestion capabilities using Java to create custom data sources. It discusses different data ingestion solutions, including leveraging existing libraries and building new ones, along with practical coding examples. The presentation targets software and data engineers looking to enhance Spark's versatility with non-standard data formats.
The speaker discusses extending Apache Spark's ingestion capabilities using Java, introducing themselves and engaging with the audience on their programming experiences.
Identifies challenges with existing data formats like CSV and JSON, encouraging users to explore Spark packages and custom solutions.
Step-by-step guidance on writing a custom data source in Java, including reading metadata and building Spark applications.
Covers the mechanics of importing photo metadata into Spark, including schema definition and data extraction processes.
Focuses on the relation structure between Spark and Java libraries, emphasizing data conversion and efficient schema management.
Provides resources for further learning, concluding insights on building custom data sources, and encouraging audience engagement.
And What About You?
• Who is a Java programmer?
• Who swears by Scala?
• Who has tried to build a custom data source?
• In Java?
• Who has succeeded?
Solution #1
• Look in the Spark Packages library: https://spark-packages.org/
• 48 data sources listed (some dups)
• Some with source code
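If one of those packages fits, it plugs in the same way as the built-in formats. As a rough illustration only (the coordinates, format name, and option below belong to the community spark-xml package and are given purely as an example, not as part of this talk's code):

  // Illustration: using an existing package from spark-packages.org.
  // The artifact is added at submit time, for example:
  //   spark-submit --packages com.databricks:spark-xml_2.11:0.4.1 ...
  Dataset<Row> xmlDf = spark.read()
      .format("xml")                  // short name registered by the package
      .option("rowTag", "photo")      // package-specific option (example value)
      .load("/path/to/xml/files");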
Code
  Dataset<Row> df = spark.read()          // Spark session
      .format("exif")                     // short name for your data source
      .option("recursive", "true")        // options
      .option("limit", "80000")
      .option("extensions", "jpg,jpeg")
      .load(importDirectory);             // where to start
Short name
• Optional: you can specify a full class name instead
• An alias to a class name
• Needs to be registered
• Not recommended during the development phase
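The registration step itself is not detailed on the slide. One common way to expose a short name in Spark 2.x is to implement DataSourceRegister and declare the class through the service loader; the sketch below is an assumption about how that could look, not the repository's actual code (in practice the provider class shown later would implement this interface itself):

  // Sketch only: advertising the "exif" alias via Spark's DataSourceRegister mechanism.
  import org.apache.spark.sql.sources.DataSourceRegister;

  public class ExifShortNameRegistration implements DataSourceRegister {
    @Override
    public String shortName() {
      return "exif";            // usable as spark.read().format("exif")
    }
  }

  // Registration file (classpath resource):
  //   META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  // containing one line: the fully qualified name of the data source class.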
Project
• Information for registration of the data source
• Data source's short name
• Data source
• The app
• Relation
• Utilities
• An existing library you can embed/link to
Library code
• A library you already use, developed, or acquired
• No need for anything special
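The EXIF library itself is not reproduced in the deck. To make the later slides easier to follow, here is an assumed minimal shape of the two pieces the relation relies on, PhotoMetadata and ExifUtils (two separate source files); the field names are guesses based on the columns used later (GeoX, GeoZ), and the real classes in the repository may differ:

  // Assumed sketch of the embedded library's surface.
  public class PhotoMetadata implements java.io.Serializable {
    private String fileName;
    private Double geoX;   // exposed later as column "GeoX"
    private Double geoZ;   // exposed later as column "GeoZ" (used to sort by altitude)

    public String getFileName() { return fileName; }
    public void setFileName(String fileName) { this.fileName = fileName; }
    public Double getGeoX() { return geoX; }
    public void setGeoX(Double geoX) { this.geoX = geoX; }
    public Double getGeoZ() { return geoZ; }
    public void setGeoZ(Double geoZ) { this.geoZ = geoZ; }
  }

  public class ExifUtils {
    // Reads one photo and extracts its EXIF metadata into a bean.
    public static PhotoMetadata processFromFilename(String absolutePath) {
      PhotoMetadata photo = new PhotoMetadata();
      photo.setFileName(absolutePath);
      // a real implementation would parse the EXIF tags here
      return photo;
    }
  }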
Data Source Code
  public class ExifDirectoryDataSource implements RelationProvider {   // provides a relation
    @Override
    public BaseRelation createRelation(
        SQLContext sqlContext,                                         // needed by Spark, passed on to the relation
        Map<String, String> params) {                                  // all the options
      java.util.Map<String, String> javaMap =
          mapAsJavaMapConverter(params).asJava();                      // Scala-to-Java conversion (sorry!)
      ExifDirectoryRelation br = new ExifDirectoryRelation();          // the relation to be exploited
      br.setSqlContext(sqlContext);
      ...
      br.setPhotoLister(photoLister);
      return br;
    }
  }
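The elided part of createRelation is where the reader options are consumed. Purely as an assumption of what that step could look like (the PhotoLister type and its setters are hypothetical names, not necessarily those of the repository):

  // Hypothetical sketch of turning the reader options into a configured lister.
  // Option keys match the ones passed from spark.read() earlier; the .load(path)
  // argument arrives in the map under the key "path".
  String path = javaMap.get("path");
  boolean recursive = Boolean.parseBoolean(javaMap.getOrDefault("recursive", "false"));
  int limit = Integer.parseInt(javaMap.getOrDefault("limit", "-1"));
  String extensions = javaMap.getOrDefault("extensions", "jpg");

  PhotoLister photoLister = new PhotoLister();   // hypothetical helper class
  photoLister.setPath(path);
  photoLister.setRecursive(recursive);
  photoLister.setLimit(limit);
  photoLister.setExtensions(extensions);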
Relation
• Plumbing between
  – Spark
  – Your existing library
• Mission
  – Returns the schema as a StructType
  – Returns the data as an RDD<Row>
Relation
TableScan is the key; other, more specialized Scans are available.
  public class ExifDirectoryRelation extends BaseRelation
      implements Serializable, TableScan {
    private static final long serialVersionUID = 4598175080399877334L;

    @Override
    public RDD<Row> buildScan() {            // the data…
      ...
      return rowRDD.rdd();
    }

    @Override
    public StructType schema() {             // the schema: may/will be called first
      ...
      return schema.getSparkSchema();
    }

    @Override
    public SQLContext sqlContext() {         // SQL context
      return this.sqlContext;
    }
    ...
  }
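For reference, the more specialized scans mentioned above live in org.apache.spark.sql.sources and differ only in the buildScan signature (shown here as seen from Java); they let Spark push column pruning and filters down into the data source:

  // TableScan:            RDD<Row> buildScan()
  // PrunedScan:           RDD<Row> buildScan(String[] requiredColumns)
  // PrunedFilteredScan:   RDD<Row> buildScan(String[] requiredColumns, Filter[] filters)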
Relation – Schema
A utility function introspects a Java bean and turns it into a "Super" schema, which contains the required StructType for Spark.
  @Override
  public StructType schema() {
    if (schema == null) {
      schema = SparkBeanUtils.getSchemaFromBean(PhotoMetadata.class);
    }
    return schema.getSparkSchema();
  }
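SparkBeanUtils belongs to the example repository, not to Spark itself. Conceptually, the bean-to-StructType step can be done with plain JavaBeans introspection; the sketch below is a simplified assumption of the idea (the real utility also keeps extra metadata in its "Super" schema and honors @SparkColumn):

  // Simplified, assumed sketch: building a StructType from a bean's properties.
  import java.beans.Introspector;
  import java.beans.PropertyDescriptor;
  import java.util.ArrayList;
  import java.util.List;
  import org.apache.spark.sql.types.DataType;
  import org.apache.spark.sql.types.DataTypes;
  import org.apache.spark.sql.types.StructField;
  import org.apache.spark.sql.types.StructType;

  public class SimpleBeanSchemaBuilder {
    public static StructType schemaFromBean(Class<?> beanClass) throws Exception {
      List<StructField> fields = new ArrayList<>();
      for (PropertyDescriptor prop
          : Introspector.getBeanInfo(beanClass, Object.class).getPropertyDescriptors()) {
        Class<?> type = prop.getPropertyType();
        DataType sparkType;
        if (type == int.class || type == Integer.class) {
          sparkType = DataTypes.IntegerType;
        } else if (type == double.class || type == Double.class) {
          sparkType = DataTypes.DoubleType;
        } else if (type == java.sql.Timestamp.class) {
          sparkType = DataTypes.TimestampType;
        } else {
          sparkType = DataTypes.StringType;   // fallback for this sketch
        }
        fields.add(DataTypes.createStructField(prop.getName(), sparkType, true));
      }
      return DataTypes.createStructType(fields);
    }
  }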
Relation – Data
  @Override
  public RDD<Row> buildScan() {
    schema();
    List<PhotoMetadata> table = collectData();              // collect the data
    JavaSparkContext sparkContext =
        new JavaSparkContext(sqlContext.sparkContext());
    JavaRDD<Row> rowRDD = sparkContext.parallelize(table)   // creates the RDD by parallelizing the list of photos
        .map(photo -> SparkBeanUtils.getRowFromBean(schema, photo));
    return rowRDD.rdd();
  }

  // Scans the files and extracts EXIF information: the interface to your library…
  private List<PhotoMetadata> collectData() {
    List<File> photosToProcess = this.photoLister.getFiles();
    List<PhotoMetadata> list = new ArrayList<PhotoMetadata>();
    PhotoMetadata photo;
    for (File photoToProcess : photosToProcess) {
      photo = ExifUtils.processFromFilename(photoToProcess.getAbsolutePath());
      list.add(photo);
    }
    return list;
  }
Application
  // Normal imports, no reference to our data source
  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SparkSession;

  public class PhotoMetadataIngestionApp {
    public static void main(String[] args) {
      PhotoMetadataIngestionApp app = new PhotoMetadataIngestionApp();
      app.start();
    }

    private boolean start() {
      SparkSession spark = SparkSession.builder()
          .appName("EXIF to Dataset")
          .master("local[*]")                          // local mode
          .getOrCreate();

      Dataset<Row> df = spark.read()                   // classic read
          .format("exif")
          .option("recursive", "true")
          .option("limit", "80000")
          .option("extensions", "jpg,jpeg")
          .load("/Users/jgp/Pictures");

      df = df                                          // standard dataframe API: getting my "highest" photos!
          .filter(df.col("GeoX").isNotNull())
          .filter(df.col("GeoZ").notEqual("NaN"))
          .orderBy(df.col("GeoZ").desc());

      // Standard output mechanism
      System.out.println("I have imported " + df.count() + " photos.");
      df.printSchema();
      df.show(5);
      return true;
    }
  }
Schema, Beans, and Annotations
• SparkBeanUtils.getSchemaFromBean() turns a Bean into a Schema, which can be augmented via the @SparkColumn annotation
• Schema.getSparkSchema() exposes the StructType Spark needs
• SparkBeanUtils.getRowFromBean() turns a Bean into a Row of columns
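The @SparkColumn annotation also comes from the example repository, not from Spark. As an assumed illustration of the idea only (the attribute names are guesses; check the repository for the actual definition), it lets the bean override what plain introspection would infer:

  // Assumed usage sketch of @SparkColumn on a bean getter.
  public class PhotoMetadata implements java.io.Serializable {
    private Double geoZ;

    @SparkColumn(name = "GeoZ", nullable = true)   // hypothetical attributes
    public Double getGeoZ() { return geoZ; }
    public void setGeoZ(Double geoZ) { this.geoZ = geoZ; }
  }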
Mass production
• Easily import from any Java Bean
• Conversion done by utility functions
• Schema is a superset of StructType
Conclusion
• Reuse the Bean-to-schema and Bean-to-data utilities in your project
• Building a custom data source for a REST server or a non-standard format is easy
• No need for a pricey conversion to CSV or JSON
• There is always a solution in Java
• On you: check for parallelism, optimization, extending the schema (order of columns)
Going Further
• Check out the code (fork/like): https://github.com/jgperrin/net.jgp.labs.spark.datasources
• Follow me @jgperrin
• Watch for my Java + Spark book, coming out soon!
• (If you come to the RTP area in NC, USA, come to a Spark meet-up and let's have a drink!)
Go raibh maith agaibh. (Thank you!)
Jean Georges "JGP" Perrin
@jgperrin
Don't forget to rate
More Reading & Resources
• My blog: http://jgp.net
• This code on GitHub: https://github.com/jgperrin/net.jgp.labs.spark.datasources
• Java code on GitHub: https://github.com/jgperrin/net.jgp.labs.spark
Abstract
EXTENDING APACHE SPARK'S INGESTION: BUILDING YOUR OWN JAVA DATA SOURCE
By Jean Georges Perrin (@jgperrin, Oplo)
Apache Spark is a wonderful platform for running your analytics jobs. It has great ingestion features from CSV, Hive, JDBC, etc. However, you may have your own data sources or formats you want to use. Your solution could be to convert your data to a CSV or JSON file and then ask Spark to ingest it through its built-in tools. However, for better performance, we will explore how to build a data source, in Java, to extend Spark's ingestion capabilities. We will first understand how Spark handles ingestion, then walk through the development of this data source plug-in.
Targeted audience: Software and data engineers who need to expand Spark's ingestion capability.
Key takeaways:
• Requirements, needs & architecture – 15%.
• Build the required tool set in Java – 85%.
Session hashtag: #EUdev6