Apache Spark: a unified, open source, parallel data processing framework for big data analytics.

[Stack diagram: the Spark core engine underlies Spark SQL (interactive queries), Spark Streaming (stream processing), Spark ML (machine learning), and GraphX (graph computation); Spark runs on YARN, Mesos, or its standalone scheduler.]
• Unified engine
• Ecosystem
• Developer productivity
• Performance
Primary resource managers: Hadoop 1.0 or Hadoop YARN. Alternative resource managers: Mesos or the Spark standalone resource manager.
Spark is the 2014 Sort Benchmark winner (tinyurl.com/spark-sort), 3x faster than the 2013 winner (Hadoop):

                  2013 Record (Hadoop)   Spark (2014)
Data size (TB)    102.5                  100
Time (min)        72                     23
Nodes             2,100                  206
Cores             50,400                 6,592

[Chart: logistic regression on a 100-node cluster with 100 GB of data; Spark's bar sits at 0.9 while Hadoop's runs far up the 0–140 scale.]
[Diagram: with Hadoop MapReduce, each step (Step 1, Step 2, …) reads its input from HDFS and writes its output back to HDFS; with Spark, a single job reads from and writes to HDFS once, keeping intermediate results in memory.]
[Diagram: a driver program hosts the SparkContext, which talks to a cluster manager; the cluster manager allocates worker nodes, each of which reads from HDFS.]
• Machine learning
• Real-time stream processing
• Developer productivity
• Interactive analytics
• High-performance batch computation
• .map()
• .groupByKey()
• .join()
• .reduce()
• .collect()
[Diagram: transformations (e.g. .map(), .groupByKey(), .join()) turn one RDD into another; actions (e.g. .reduce(), .collect()) turn an RDD into a value returned to the driver.]
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.clustering import LDA

tweetDF = spark.read.json('wasb:///libya-sentences/*/*.json')
tokenizer = Tokenizer(inputCol="Sentence", outputCol="Words")
counter = CountVectorizer(inputCol="Words", outputCol="features",
                          vocabSize=10000, minDF=2.)
tokenized = tokenizer.transform(tweetDF)
countModel = counter.fit(tokenized)
counted = countModel.transform(tokenized)
lda = LDA(k=10, maxIter=10)
model = lda.fit(counted)
topics = model.describeTopics(3)
topics.show(truncate=False)
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.clustering import LDA

tweetDF = spark.read.json('wasb:///libya-sentences/*/*.json')
tokenizer = Tokenizer(inputCol="Sentence", outputCol="Words")
counter = CountVectorizer(inputCol="Words", outputCol="features",
                          vocabSize=10000, minDF=2.)
lda = LDA(k=10, maxIter=10)
pipeline = Pipeline(stages=[tokenizer, counter, lda])
model = pipeline.fit(tweetDF)
topics = model.describeTopics(3)
topics.show(truncate=False)
ldaScored = model.transform(tweetDF)
Microsoft R Server
mySparkCluster <- RxSpark()
rxSetComputeContext(mySparkCluster)
myData <- read.json('wasb:///creditfraud/*.json')

# Run a logistic regression using RevoScaleR
model <- rxLogit(Class ~ Amount + V1 + V2 + V3 + V4, data = myData)

# Now run the same using SparkR
model2 <- spark.logit(myData, Class ~ Amount + V1 + V2 + V3 + V4,
                      regParam = 0.3, elasticNetParam = 0.8)

summary(model)
summary(model2)
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer

tweetDF = spark.read.json('wasb:///libya-sentences/*/*.json')
tweetDF = tweetDF[tweetDF.Language == 'en']
tokenizer = Tokenizer(inputCol="Sentence", outputCol="Words")
enSW = StopWordsRemover.loadDefaultStopWords('english') + ['rt', '-', '&amp;', '']
swr = StopWordsRemover(inputCol="Words", outputCol="Filtered", stopWords=enSW)
tokenized = tokenizer.transform(tweetDF)
filtered = swr.transform(tokenized)
counter = CountVectorizer(inputCol="Filtered", outputCol="rawFeatures",
                          vocabSize=10000, minDF=2.)
countModel = counter.fit(filtered)
counted = countModel.transform(filtered)
from pyspark.ml.feature import IDF
from pyspark.ml.classification import GBTClassifier

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(counted)
idfScaled = idfModel.transform(counted)

gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)

# 'labeled' is the IDF-scaled data with a numeric 'label' column added
# (the labeling step is not shown on this slide).
# 80/20 train/test split
train, test = labeled.randomSplit([0.8, 0.2], seed=1337)
model = gbt.fit(train)
from pyspark.mllib.evaluation import BinaryClassificationMetrics

predictions = model.transform(test).select('prediction', 'label')
metrics = BinaryClassificationMetrics(predictions.rdd.map(tuple))
print('Area under PR = %s' % metrics.areaUnderPR)
print('Area under ROC = %s' % metrics.areaUnderROC)

# Set us up for plotting the ROC curve
predictions.registerTempTable('pred_and_labels')

# In a new cell, use the %%sql magic to pull the results down to the local context
%%sql -q -o predResults
select * from pred_and_labels
%%local
%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

prob = predResults['prediction']
fpr, tpr, thresholds = roc_curve(predResults['label'], prob, pos_label=1)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(5, 5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()
• HDInsight Spark
• SparkML
• HDInsight R Server
• RevoScaleR
• SparkR
• GitHub
• Channel 9
• Microsoft Virtual Academy
[AI04] Scaling Machine Learning to Big Data Using SparkML and SparkR
