鍍金池/ 教程/ 大數(shù)據(jù)/ Spark核心編程
Spark 安裝
Spark編程
Spark核心編程
Spark教程
Apache Spark RDD
Spark部署

Spark核心編程

Spark 核心是整個(gè)項(xiàng)目的基礎(chǔ)。它提供了分布式任務(wù)調(diào)度,調(diào)度和基本的 I/O 功能。Spark 使用一種稱為RDD(彈性分布式數(shù)據(jù)集)一個(gè)專門的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),是整個(gè)機(jī)器分區(qū)數(shù)據(jù)的邏輯集合。RDDS可以用兩種方法來(lái)創(chuàng)建的;一個(gè)是在外部存儲(chǔ)系統(tǒng)引用的數(shù)據(jù)集,第二個(gè)是通過(guò)應(yīng)用轉(zhuǎn)換(如map, filter, reducer, join)在現(xiàn)有RDDS。

RDD抽象通過(guò)語(yǔ)言集成API公開(kāi)。這簡(jiǎn)化了編程的復(fù)雜性,因?yàn)閼?yīng)用程序的處理RDDS方式類似于操縱的本地集合數(shù)據(jù)。

Spark Shell

Spark提供了一個(gè)交互的shell ? 一個(gè)強(qiáng)大的工具,以交互方式分析數(shù)據(jù)。 這是在 Scala或Python語(yǔ)言。Spark主要抽象稱為彈性分布式數(shù)據(jù)集(RDD)項(xiàng)目的分布式采集。RDDS可以從Hadoop的輸入格式來(lái)創(chuàng)建(如HDFS文件)或通過(guò)轉(zhuǎn)化其他RDDS。

打開(kāi) Spark Shell

下面的命令用來(lái)打開(kāi)Spark shell。
$ spark-shell

創(chuàng)建簡(jiǎn)單RDD

讓我們從文本文件中創(chuàng)建一個(gè)簡(jiǎn)單的 RDD。使用下面的命令來(lái)創(chuàng)建一個(gè)簡(jiǎn)單的 RDD。
scala> val inputfile = sc.textFile(“input.txt”)
對(duì)上述命令的輸出為:
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
Spark RDD API引入了一些變革和一些動(dòng)作來(lái)操縱RDD。

RDD 轉(zhuǎn)換

RDD轉(zhuǎn)換返回指向新的RDD,并允許創(chuàng)建RDDS之間的依賴關(guān)系。 在依賴關(guān)系鏈中的每個(gè)RDD(依賴關(guān)系的字串)具有這樣的功能,用于計(jì)算其數(shù)據(jù)并具有一個(gè)指針(依賴性)到其父RDD。

Spark是懶惰的,所以什么都不會(huì)被執(zhí)行,除非調(diào)用一些改造或行動(dòng)將觸發(fā)作業(yè)創(chuàng)建和執(zhí)行??磫卧~計(jì)數(shù)示例,如下面的代碼片段。

因此,RDD轉(zhuǎn)型不是一組數(shù)據(jù)而是在程序中的一個(gè)步驟(可能是唯一的步驟)告訴Spark如何獲取數(shù)據(jù)以及如何使用它。
下面給出是RDD轉(zhuǎn)換的列表。
S.No
轉(zhuǎn)換&含義
1

map(func)

返回一個(gè)新的分布式數(shù)據(jù)集,傳遞源的每個(gè)元素形成通過(guò)一個(gè)函數(shù) func

2

filter(func)

返回由選擇在func返回true,源元素組成了一個(gè)新的數(shù)據(jù)集
3

flatMap(func)

類似映射,但每個(gè)輸入項(xiàng)目可以被映射到0以上輸出項(xiàng)目(所以func應(yīng)返回seq而不是單一的項(xiàng)目)
4

mapPartitions(func)

類似映射,只不過(guò)是單獨(dú)的每個(gè)分區(qū)(塊)上運(yùn)行RDD,因此 func 的類型必須是Iterator<T> ? Iterator<U> 對(duì)類型T在RDD上運(yùn)行時(shí)

5

mapPartitionsWithIndex(func)

類似映射分區(qū),而且還提供func 來(lái)表示分區(qū)的索引的整數(shù)值,因此 func 必須是類型 (Int, Iterator<T>) ? Iterator<U> 當(dāng)類型T在RDD上運(yùn)行時(shí)

6

sample(withReplacement, fraction, seed)

采樣數(shù)據(jù)的一小部分,有或沒(méi)有更換,利用給定的隨機(jī)數(shù)發(fā)生器的種子
7

union(otherDataset)

返回一個(gè)新的數(shù)據(jù)集,其中包含源數(shù)據(jù)和參數(shù)元素的結(jié)合
8

intersection(otherDataset)

返回包含在源數(shù)據(jù)和參數(shù)元素的新RDD交集
9

distinct([numTasks])

返回一個(gè)新的數(shù)據(jù)集包含源數(shù)據(jù)集的不同元素
10

groupByKey([numTasks])

當(dāng)調(diào)用(K,V)數(shù)據(jù)集,返回(K, Iterable<V>) 對(duì)數(shù)據(jù)集

11

reduceByKey(func, [numTasks])

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

13

sortByKey([ascending], [numTasks])

14

join(otherDataset, [numTasks])

15

cogroup(otherDataset, [numTasks])

16

cartesian(otherDataset)

當(dāng)上調(diào)用類型T和U的數(shù)據(jù)集,返回(T,U)對(duì)數(shù)據(jù)集(所有元素對(duì))
17

pipe(command, [envVars])

RDD通過(guò)shell命令每個(gè)分區(qū),例如:一個(gè)Perl或bash腳本。RDD元素被寫(xiě)入到進(jìn)程的標(biāo)準(zhǔn)輸入和線路輸出,標(biāo)準(zhǔn)輸出形式返回一個(gè)字符串RDD

18

coalesce(numPartitions)

減少RDD到numPartitions分區(qū)的數(shù)量。過(guò)濾大型數(shù)據(jù)集后,更高效地運(yùn)行的操作
19

repartition(numPartitions)

打亂RDD數(shù)據(jù)隨機(jī)創(chuàng)造更多或更少的分區(qū),并在它們之間平衡。這總是打亂的所有數(shù)據(jù)在網(wǎng)絡(luò)上
20

repartitionAndSortWithinPartitions(partitioner)

根據(jù)給定的分區(qū)重新分區(qū)RDD及在每個(gè)結(jié)果分區(qū),排序鍵記錄。這是調(diào)用重新分配排序在每個(gè)分區(qū)內(nèi),因?yàn)樗梢酝苿?dòng)分揀向下進(jìn)入混洗機(jī)制效率更高。

動(dòng)作

下表給出了操作,及其返回值的列表。
S.No 操作 & 含義
1

reduce(func)

合計(jì)數(shù)據(jù)集的元素,使用函數(shù) func (其中有兩個(gè)參數(shù)和返回一行). 該函數(shù)應(yīng)該是可交換和可結(jié)合,以便它可以正確地在并行計(jì)算。

2

collect()

返回?cái)?shù)據(jù)集的所有作為數(shù)組在驅(qū)動(dòng)程序的元素。這是一個(gè)過(guò)濾器或其它操作之后返回?cái)?shù)據(jù)的一個(gè)足夠小的子集,通常是有用的

3

count()

返回該數(shù)據(jù)集的元素?cái)?shù)
4

first()

返回的數(shù)據(jù)集的第一個(gè)元素(類似于使用(1))
5

take(n)

返回與該數(shù)據(jù)集的前n個(gè)元素的陣列。
6

takeSample (withReplacement,num, [seed])

返回?cái)?shù)組的數(shù)據(jù)集num個(gè)元素,有或沒(méi)有更換隨機(jī)抽樣,預(yù)指定的隨機(jī)數(shù)發(fā)生器的種子可選

7

takeOrdered(n, [ordering])

返回RDD使用或者按其自然順序或自定義比較的前第n個(gè)元素
8

saveAsTextFile(path)

寫(xiě)入數(shù)據(jù)集是一個(gè)文本文件中的元素(或一組文本文件),在給定的目錄的本地文件系統(tǒng),HDFS或任何其他的Hadoop支持的文件系統(tǒng)。Spark調(diào)用每個(gè)元素的 toString,將其轉(zhuǎn)換為文件中的文本行

9

saveAsSequenceFile(path) (Java and Scala)

寫(xiě)入數(shù)據(jù)集,為Hadoop SequenceFile元素在給定的路徑寫(xiě)入在本地文件系統(tǒng),HDFS或任何其他Hadoop支持的文件系統(tǒng)。 這是適用于實(shí)現(xiàn)Hadoop可寫(xiě)接口RDDS的鍵 - 值對(duì)。在Scala中,它也可以在屬于隱式轉(zhuǎn)換為可寫(xiě)(Spark包括轉(zhuǎn)換為基本類型,如 Int, Double, String 等等)類型。

10

saveAsObjectFile(path) (Java and Scala)

寫(xiě)入數(shù)據(jù)集的內(nèi)容使用Java序列化為一個(gè)簡(jiǎn)單的格式,然后可以使用SparkContext.objectFile()加載。

11

countByKey()

僅適用于RDDS的類型 (K, V). 返回(K, Int)對(duì)與每個(gè)鍵的次數(shù)的一個(gè)HashMap。

12

foreach(func)

數(shù)據(jù)集的每個(gè)元素上運(yùn)行函數(shù)func。這通常對(duì)于不良反應(yīng),例如更新累加器或與外部存儲(chǔ)系統(tǒng)進(jìn)行交互進(jìn)行。

 ? 在 foreach()以外修改變量,其他累加器可能會(huì)導(dǎo)致不確定的行為。請(qǐng)參閱了解閉包的更多細(xì)節(jié)。

RDD編程

讓我們來(lái)看看幾個(gè)RDD轉(zhuǎn)換和操作RDD編程實(shí)現(xiàn),用一個(gè)例子的協(xié)助說(shuō)明。

示例

考慮一個(gè)單詞計(jì)數(shù)的例子 ? 它計(jì)算出現(xiàn)在文檔中的每個(gè)單詞。請(qǐng)看下面的文字為輸入并保存在主目錄中的 input.txt 文件。

input.txt ? 作為輸入文件

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.
按照下面給出命令執(zhí)行示例程序。

打開(kāi)Spark-Shell

下面的命令用來(lái)打開(kāi)spark shell. 通常情況下,spark 使用Scala構(gòu)建。因此,Spark 程序需要在 Scala 環(huán)境中運(yùn)行。

$ spark-shell 

如果Spark shell 成功打開(kāi),會(huì)發(fā)現(xiàn)下面的輸出??纯摧敵觥癝park 上下文可作為sc” 的最后一行表示Spark容器會(huì)自動(dòng)創(chuàng)建Spark 上下文對(duì)象名為sc。啟動(dòng)程序的第一步驟之前,SparkContext 對(duì)象應(yīng)該被創(chuàng)建。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

創(chuàng)建一個(gè)RDD

首先,我們必須使用 Spark-Scala API 讀取輸入文件,并創(chuàng)建一個(gè)RDD。

下面的命令被用于從給定位置讀出的文件。這里,新的 RDD 使用輸入文件名創(chuàng)建。這是在 textFile(“”)方法的參數(shù)字符串是用于輸入文件名的絕對(duì)路徑。然而,如果僅給出文件名,那么它輸入文件則在當(dāng)前位置。

scala> val inputfile = sc.textFile("input.txt")

執(zhí)行字?jǐn)?shù)轉(zhuǎn)換

我們的目標(biāo)是計(jì)算一個(gè)文件中的字?jǐn)?shù)。分裂每一行成詞創(chuàng)建一個(gè)平面地圖(flatMap(line ? line.split(“ ”)).

接下來(lái),讀每個(gè)詞作為一個(gè)鍵和值 ‘1’ (<key, value> = <word,1>) 使用映射函數(shù) (map(word ? (word, 1)).

最后,加入類似的鍵值降低這些鍵 (reduceByKey(_+_)).

下面的命令用于執(zhí)行字?jǐn)?shù)統(tǒng)計(jì)邏輯。執(zhí)行此操作后,不會(huì)有任何輸出,因?yàn)檫@不是一個(gè)動(dòng)作,這是一個(gè)轉(zhuǎn)換; 指向一個(gè)新的RDD或告訴spark,用給定的數(shù)據(jù)來(lái)做什么)。

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

當(dāng)前RDD

同時(shí)用RDD工作,如果想了解當(dāng)前的RDD,那么可使用下面的命令。 它會(huì)告訴你關(guān)于當(dāng)前RDD及其依賴調(diào)試的描述。

scala> counts.toDebugString

緩存轉(zhuǎn)換

可以使用 persist() 或 cache() 方法標(biāo)記一個(gè)RDD。在第一次計(jì)算的操作,這將被保存在存儲(chǔ)器中的節(jié)點(diǎn)上。使用下面的命令來(lái)存儲(chǔ)中間轉(zhuǎn)換在內(nèi)存中。

scala> counts.cache()

應(yīng)用動(dòng)作

應(yīng)用動(dòng)作(操作),比如存儲(chǔ)所有的轉(zhuǎn)換結(jié)果到一個(gè)文本文件中。saveAsTextFile(“”)方法字符串參數(shù)是輸出文件夾的絕對(duì)路徑。試試下面的命令來(lái)保存輸出文本文件。在下面的例子中, ‘output’ 的文件夾為當(dāng)前位置。

scala> counts.saveAsTextFile("output")

檢查輸出

打開(kāi)另一個(gè)終端進(jìn)入主目錄(其中spark 在其他終端中執(zhí)行)。下面的命令用于檢查輸出目錄。

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

下面的命令是用來(lái)查看輸出的 Part-00001 文件。

[hadoop@localhost output]$ cat part-00000

輸出

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 
下面的命令是用來(lái)查看輸出的 Part-00001 文件。
[hadoop@localhost output]$ cat part-00001 

輸出

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

UN持久存儲(chǔ)

UN持續(xù)存在之前,如果想看到用于該應(yīng)用程序的存儲(chǔ)空間,可使用下面的URL在瀏覽器中查看。
http://localhost:4040
這將會(huì)看到下面的屏幕,該屏幕顯示用于應(yīng)用程序,這些都在 Spark shell 運(yùn)行的存儲(chǔ)空間。
storage space
如果想特別的RDD存儲(chǔ)空間,然后使用下面的命令。
Scala> counts.unpersist() 

將看到如下輸出 ?

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
為了驗(yàn)證在瀏覽器中的存儲(chǔ)空間,使用下面的URL。
http://localhost:4040/
會(huì)看到下面的畫(huà)面。它用于應(yīng)用程序,這是在Spark shell運(yùn)行存儲(chǔ)空間。
Storage space for application

上一篇:Spark編程下一篇:Spark教程