MongoDB and Apache Flink / Spark “How to do Data Processing?” Marc Schwering Sr. Solution Architect – EMEA marc@mongodb.com @m4rcsch
2 Agenda For This Session • Data Processing Architectural Overview • The Life of an Application • Separation of Concerns / Real World Architecture • Apache Spark and Flink Data Processing Projects • Clustering with Apache Flink • Next Steps
3 Data Processing Architectural Overview 1. Profile created 2. Enrich with public data 3. Capture activity 4. Clustering analysis 5. Define Personas 6. Tag with personas 7. Personalize interactions Batch analytics Public data Common technologies • R • Hadoop • Spark • Python • Java • Many other options Personas change much less often than tagging
4 Evolution of a Profile (1)
{
  "_id" : ObjectId("553ea57b588ac9ef066428e1"),
  "ipAddress" : "216.58.219.238",
  "referrer" : "kay.com",
  "firstName" : "John",
  "lastName" : "Doe",
  "email" : "johndoe@gmail.com"
}
5 Evolution of a Profile (n+1)
{
  "_id" : ObjectId("553e7dca588ac9ef066428e0"),
  "firstName" : "John",
  "lastName" : "Doe",
  "address" : "229 W. 43rd St.",
  "city" : "New York",
  "state" : "NY",
  "zipCode" : "10036",
  "age" : 30,
  "email" : "john.doe@mongodb.com",
  "twitterHandle" : "johndoe",
  "gender" : "male",
  "interests" : [ "electronics", "basketball", "weightlifting", "ultimate frisbee", "traveling", "technology" ],
  "visitedCounts" : { "watches" : 3, "shirts" : 1, "sunglasses" : 1, "bags" : 2 },
  "purchases" : [
    { "id" : 1, "desc" : "Power Oxford Dress Shoe", "category" : "Mens shoes" },
    { "id" : 2, "desc" : "Striped Sportshirt", "category" : "Mens shirts" }
  ],
  "persona" : "shoe-fanatic"
}
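To make the jump from profile (1) to profile (n+1) concrete, here is a minimal sketch (not from the deck) of how an application might enrich the profile incrementally with the MongoDB Java driver; the connection string, collection name, and update pattern are assumptions.

import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Updates.*;

public class ProfileUpdates {
    public static void main(String[] args) {
        // Hypothetical database/collection names; not part of the presented example.
        MongoCollection<Document> profiles = MongoClients.create("mongodb://localhost:27017")
                .getDatabase("shop").getCollection("profiles");

        // Steps 2/3 of the overview: enrich with public data and capture activity.
        profiles.updateOne(eq("email", "johndoe@gmail.com"),
                combine(set("twitterHandle", "johndoe"),
                        addToSet("interests", "electronics"),
                        inc("visitedCounts.watches", 1)));

        // Step 6: tag the profile with the persona computed by the batch job.
        profiles.updateOne(eq("email", "johndoe@gmail.com"),
                set("persona", "shoe-fanatic"));
    }
}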
6 One size/document fits all? • Profile Data – Preferences – Personal information • Contact information • DOB, gender, ZIP... • Customer Data – Purchase History – Marketing History • „Session Data“ – View History – Shopping Cart Data – Information Broker Data • Personalisation Data – Persona Vectors – Product and Category recommendations Application Batch analytics
7 Separation of Concerns • Profile Data – Preferences – Personal information • Contact information • DOB, gender, ZIP... • Customer Data – Purchase History – Marketing History • „Session Data“ – View History – Shopping Cart Data – Information Broker Data • Personalisation Data – Persona Vectors – Product and Category recommendations Batch analytics Layer Frontend - System Profile Service Customer Service Session Service Persona Service
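A hypothetical sketch of what this separation could look like as data: one document per concern, each owned by its service and stored in its own collection. Field names mirror the profile example above; the collection layout itself is an assumption, not from the deck.

import java.util.Arrays;
import org.bson.Document;

public class SeparatedDocuments {
    public static void main(String[] args) {
        String userId = "553e7dca588ac9ef066428e0";

        // Profile Service: preferences and personal information ("profiles" collection)
        Document profile = new Document("_id", userId)
                .append("firstName", "John").append("lastName", "Doe")
                .append("gender", "male").append("zipCode", "10036");

        // Customer Service: purchase and marketing history ("customers" collection)
        Document customer = new Document("_id", userId)
                .append("purchases", Arrays.asList(
                        new Document("id", 1).append("category", "Mens shoes")));

        // Session Service: view history and cart data ("sessions" collection)
        Document session = new Document("_id", userId)
                .append("visitedCounts", new Document("watches", 3).append("bags", 2));

        // Persona Service: personalization output ("personas" collection)
        Document persona = new Document("_id", userId)
                .append("persona", "shoe-fanatic");

        System.out.println(profile.toJson());
    }
}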
8 Benefits • Code does less; document and code stay focused • Ability to split – Different Teams – New Languages – Defined Dependencies
9 Advice for Developers (1) • Code does less; document and code stay focused • Ability to split – Different Teams – New Languages – Defined Dependencies KISS => Keep it simple, stupid! => Clean Code <= • Robert C. Martin: https://cleancoders.com/ • M. Fowler / B. Meyer et al.: Command Query Separation
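As a toy illustration of the Command Query Separation principle cited above (the ShoppingCart class is made up for this sketch, not one of the services in the deck): commands change state and return nothing, queries return data and cause no side effects.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ShoppingCart {
    private final List<String> items = new ArrayList<>();

    // Command: mutates state, returns void.
    void addItem(String sku) {
        items.add(sku);
    }

    // Query: returns data, has no side effects.
    List<String> items() {
        return Collections.unmodifiableList(items);
    }
}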
Analytics and Personalization From Query to Clustering
11 Separation of Concerns • Profile Data – Preferences – Personal information • Contact information • DOB, gender, ZIP... • Customer Data – Purchase History – Marketing History • „Session Data“ – View History – Shopping Cart Data – Information Broker Data • Personalisation Data – Persona Vectors – Product and Category recommendations Batch analytics Layer Frontend – System Profile Service Customer Service Session Service Persona Service
12 Separation of Concerns • Profile Data – Preferences – Personal information • Contact information • DOB, gender, ZIP... • Customer Data – Purchase History – Marketing History • „Session Data“ – View History – Shopping Cart Data – Information Broker Data • Personalisation Data – Persona Vectors – Product and Category recommendations Batch analytics Layer Frontend – System Profile Service Customer Service Session Service Persona Service
13 Architecture revised Profile Service Customer Service Session Service Persona Service Frontend – System Backend – Systems Data Processing
14 Advice for Developers (2) • OWN YOUR DATA! (but only relevant Data) • Say no! (to direct Data ie. DB Access)
Data Processing Solutions
16 Hadoop in a Nutshell • An open-source framework for distributed storage and distributed, batch-oriented processing • Hadoop Distributed File System (HDFS) to store data on commodity hardware • YARN as the resource management platform • MapReduce as the programming model working on top of HDFS
17 Spark in a Nutshell • Spark is a top-level Apache project • Can be run on top of YARN and can read any Hadoop API data, including HDFS or MongoDB • Fast and general engine for large-scale data processing and analytics • Advanced DAG execution engine with support for data locality and in-memory computing
18 Flink in a Nutshell • Flink is a top-level Apache project • Can be run on top of YARN and can read any Hadoop API data, including HDFS or MongoDB • A distributed streaming dataflow engine • Streaming and batch • Iterative, in-memory execution and handling • Cost-based optimizer
19 Latency of query operations (chart): query, aggregation, MapReduce, and cluster algorithms plotted against time, comparing MongoDB, Hadoop, and Spark/Flink
Iterative Algorithms / Clustering
21 K-Means in Pictures • Source: Wikipedia K-Means
22 K-Means as a Process
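The pictures boil down to a simple loop: assign every point to its nearest centroid, then move each centroid to the mean of its assigned points. Below is a framework-free sketch of one such iteration, with Point/Centroid shapes chosen to roughly match the Flink example later in the deck; it is illustrative, not the presenter's code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KMeansStep {

    static class Point { double x, y; Point(double x, double y) { this.x = x; this.y = y; } }
    static class Centroid extends Point { int id; Centroid(int id, double x, double y) { super(x, y); this.id = id; } }

    // One iteration: assign each point to its nearest centroid, then move every
    // centroid to the mean of the points assigned to it.
    static List<Centroid> iterate(List<Point> points, List<Centroid> centroids) {
        Map<Integer, double[]> sums = new HashMap<>(); // centroid id -> {sumX, sumY, count}
        for (Point p : points) {
            Centroid nearest = null;
            double best = Double.MAX_VALUE;
            for (Centroid c : centroids) {
                double d = Math.hypot(p.x - c.x, p.y - c.y);
                if (d < best) { best = d; nearest = c; }
            }
            double[] s = sums.computeIfAbsent(nearest.id, k -> new double[3]);
            s[0] += p.x; s[1] += p.y; s[2] += 1;
        }
        List<Centroid> updated = new ArrayList<>();
        for (Centroid c : centroids) {
            double[] s = sums.get(c.id);
            updated.add(s == null ? c : new Centroid(c.id, s[0] / s[2], s[1] / s[2]));
        }
        return updated;
    }
}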
23 Iterations in Hadoop and Spark
24 Iterations in Flink • Dedicated iteration operators • Tasks keep running for the iterations, not redeployed for each step • Caching and optimizations done automatically
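As a sketch of what those dedicated operators look like in the DataSet API, modeled on Flink's public K-means example: SelectNearestCenter, CountAppender, CentroidAccumulator and CentroidAverager are the usual user functions from that example and are assumed to exist here; only the iteration wiring is shown.

// Bulk iteration wiring (DataSet API, org.apache.flink.api.java.operators.IterativeDataSet)
public static DataSet<Centroid> kMeans(DataSet<Point> points,
                                       DataSet<Centroid> centroids,
                                       int numIterations) {
    IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);

    DataSet<Centroid> newCentroids = points
            // assign each point to its nearest centroid (centroids arrive via broadcast)
            .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
            .map(new CountAppender())                     // (centroidId, point, 1)
            .groupBy(0).reduce(new CentroidAccumulator()) // sum points and counts per centroid
            .map(new CentroidAverager());                 // divide by count => new center

    // closeWith feeds the new centroids back into the next iteration; the operators
    // stay deployed for all iterations instead of being re-scheduled per step.
    return loop.closeWith(newCentroids);
}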
Example Code
26 Reader / Writer Config
// reader config
public static DataSet<Tuple2<BSONWritable, BSONWritable>> readFromMongo(ExecutionEnvironment env, String uri) {
    JobConf conf = new JobConf();
    conf.set("mongo.input.uri", uri);
    MongoInputFormat mongoInputFormat = new MongoInputFormat();
    return env.createHadoopInput(mongoInputFormat, BSONWritable.class, BSONWritable.class, conf);
}

// writer config
public static void writeToMongo(DataSet<Tuple2<BSONWritable, BSONWritable>> result, String uri) {
    JobConf conf = new JobConf();
    conf.set("mongo.output.uri", uri);
    MongoOutputFormat<BSONWritable, BSONWritable> mongoOutputFormat = new MongoOutputFormat<BSONWritable, BSONWritable>();
    result.output(new HadoopOutputFormat<BSONWritable, BSONWritable>(mongoOutputFormat, conf));
}
27 Import data
// points
DataSet<Tuple2<BSONWritable, BSONWritable>> inPoints = readFromMongo(env, mongoInputUri + pointsSource);
// centers
DataSet<Tuple2<BSONWritable, BSONWritable>> inCenters = readFromMongo(env, mongoInputUri + centerSource);

DataSet<Point> points = convertToPointSet(inPoints);
DataSet<Centroid> centroids = convertToCentroidSet(inCenters);
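The convertToPointSet helper is not shown on the slides; one possible shape of it, assuming the points are stored with "x"/"y" fields as they are written back on the next slide, is:

public static DataSet<Point> convertToPointSet(DataSet<Tuple2<BSONWritable, BSONWritable>> input) {
    return input.map(new MapFunction<Tuple2<BSONWritable, BSONWritable>, Point>() {
        @Override
        public Point map(Tuple2<BSONWritable, BSONWritable> tuple) throws Exception {
            // the value side of the tuple carries the full BSON document
            BSONObject doc = tuple.f1.getDoc();
            double x = ((Number) doc.get("x")).doubleValue();
            double y = ((Number) doc.get("y")).doubleValue();
            return new Point(x, y);
        }
    });
}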
28 Converting
public Tuple2<BSONWritable, BSONWritable> map(Tuple2<Integer, Point> integerPointTuple2) throws Exception {
    Integer id = integerPointTuple2.f0;
    Point point = integerPointTuple2.f1;

    // key: the document _id
    BasicDBObject idDoc = new BasicDBObject();
    idDoc.put("_id", id);
    BSONWritable bsonId = new BSONWritable();
    bsonId.setDoc(idDoc);

    // value: the full point document with its coordinates
    BasicDBObject doc = new BasicDBObject();
    doc.put("_id", id);
    doc.put("x", point.x);
    doc.put("y", point.y);
    BSONWritable bsonDoc = new BSONWritable();
    bsonDoc.setDoc(doc);

    return new Tuple2<BSONWritable, BSONWritable>(bsonId, bsonDoc);
}
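For completeness, one possible end-of-job wiring (a sketch, not the presenter's code): PointToBson is an assumed MapFunction wrapping the map() above, SelectNearestCenter is the assignment function from the iteration sketch, and mongoOutputUri/resultTarget are hypothetical variables.

// tag each point with its final centroid, convert back to BSON tuples, write, and run
DataSet<Tuple2<Integer, Point>> tagged = points
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");

DataSet<Tuple2<BSONWritable, BSONWritable>> out = tagged.map(new PointToBson());
writeToMongo(out, mongoOutputUri + resultTarget);

env.execute("Flink k-means with MongoDB input/output");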
29 Result
30 More…?
31 Takeaways • Evolution is amazing and exciting! – Be ready to learn new things, ask questions across silos! • Stay focused => Start and stay small – Evaluate with big documents, but do a PoC focused on the topic • Extending functionality could be challenging – Evolution is outpacing help channels – A lot of options (Spark, Flink, Storm, Hadoop, …) – More than just a binary choice • Extending functionality is easy – Aggregation, MapReduce – Connectors open up a new variety of use cases
32 Next Steps • Try out Flink – http://flink.apache.org/ – https://github.com/mongodb/mongo-hadoop – https://github.com/m4rcsch/flink-mongodb-example – http://sparkbigdata.com • Participate and ask Questions! – @m4rcsch – marc@mongodb.com
Thank you! Marc Schwering Sr. Solutions Architect – EMEA marc@mongodb.com @m4rcsch

Editor's Notes

  • #2 I do not write a lot of code anymore, but I visit a lot of prospects and customers handling tons of data and different deployments every week. This talk wants to prevent you from some pitfalls and should give you some advice on how to do it right.
  • #3 Personalization Process Review (What We Heard) Access Pattern and Development Cycle Separation of Concerns (MongoDB Point of View)
  • #4 Todo: zoom in common tech
  • #6 Even counts, and therefore personas, are very helpful. A good problem to have is too much information to personalize with – start simple, measure, and add.
  • #7 Profile: show logical document parts
  • #8 Frontend caching system like Varnish.
  • #10 KISS => Keep it simple, stupid! Todo: References!!!
  • #16 Hadoop: great for big data that is partitionable. Spark: MapReduce iterations are fast.
  • #17 Amongst Hadoop and others these are... In a distributed system, a conventional program would not work as the data is split across nodes. DAG (Directed Acyclic Graph) is a programming style for distributed systems - you can think of it as an alternative to MapReduce. While MR has just two steps (map and reduce), DAG can have multiple levels that can form a tree structure. Say you want to execute a SQL query: DAG is more flexible, with more functions like map, filter, union etc. DAG execution is also faster, as in the case of Apache Tez (which succeeds MR), because intermediate results are not written to disk. Coming to Spark, the main concept is the "RDD" - Resilient Distributed Dataset. To understand the Spark architecture, it's best to read the Berkeley paper (Page on berkeley.edu). In brief, RDDs are distributed data sets that can stay in memory and fall back to disk gracefully. RDDs, if lost, can easily be rebuilt using a graph that says how to reconstruct them. RDDs are great if you want to keep holding a data set in memory and fire a series of queries - this works better than fetching data from disk every time. Another important RDD concept is that there are two types of things that can be done on an RDD: 1) transformations like map and filter that result in another RDD, 2) actions like count that result in an output. A Spark job comprises a DAG of tasks executing transformations and actions on RDDs.
  • #20 Better graphic... If necessary, take Chris's and adapt them. Cluster algorithms... Chris's slides.
  • #22 Wikipedia! Gray squares!
  • #23 Todo: proper graphic
  • #24 Todo: add reference
  • #25 Todo: redesign graphic into MongoDB version. No black box; logic and hook.
  • #26 K-means explained, more complex theme also explained.
  • #30 Insert graph.
  • #32 Don't buy in too early. Solve real problems; choose the right tool. RDD and/or clustering jobs are “natural”. Stay operational and low-latency focused.
  • #33 New frontier, use it!