How to use the Spark API from a self-contained application to implement a small word count program.

Create a new Maven project

$ mvn archetype:generate
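
The interactive prompts can also be skipped by passing the coordinates on the command line. The groupId and artifactId below simply match the pom.xml that follows, and maven-archetype-quickstart is a common starting archetype:

$ mvn archetype:generate \
    -DgroupId=com.zengyilun \
    -DartifactId=spark-hello \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false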

Edit the Maven configuration file

pom.xml
<project>
    <groupId>com.zengyilun</groupId>
    <artifactId>spark-hello</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Spark Hello Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
</project>
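
One optional refinement, not required for this example: when the jar is always launched through spark-submit (which already provides the Spark classes at runtime), the Spark dependency is commonly marked as provided so it is not bundled into the application jar:

        <dependency> <!-- Spark dependency, supplied by spark-submit at runtime -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>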

Create the driver program (the class containing the main method) that implements the word count logic.

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("plz input args[0](input-path) args[1](output-path)");
            System.exit(1);
        }
        SparkConf conf = new SparkConf().setAppName("wordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the input file as an RDD of lines.
        JavaRDD<String> input = sc.textFile(args[0]);
        // Split each line on spaces into individual words.
        JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Map each word to a (word, 1) pair, then sum the counts per word.
        JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });
        counts.saveAsTextFile(args[1]);
        sc.stop();
    }
}
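
Since the Java API's function interfaces are all single-method, the same pipeline can be written far more compactly with Java 8 lambdas. This sketch is equivalent to the anonymous-class version above (it assumes the compiler level is set to Java 8, e.g. via the maven-compiler-plugin):

JavaRDD<String> lines = sc.textFile(args[0]);
JavaPairRDD<String, Integer> counts = lines
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator()) // line -> words
        .mapToPair(s -> new Tuple2<>(s, 1))                   // word -> (word, 1)
        .reduceByKey((x, y) -> x + y);                        // sum counts per word
counts.saveAsTextFile(args[1]);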

The current project layout:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/WordCount.java

Build the project, then submit and run it with ./bin/spark-submit.

$ mvn package

[INFO] Building jar: {..}/{..}/target/spark-hello-1.0.jar

$ $SPARK_HOME/bin/spark-submit --class "WordCount" target/spark-hello-1.0.jar $SPARK_HOME/README.md /tmp/wc
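
If no cluster is configured, the master can also be set on the command line. Here local[2] runs Spark locally with two worker threads:

$ $SPARK_HOME/bin/spark-submit --class "WordCount" --master "local[2]" target/spark-hello-1.0.jar $SPARK_HOME/README.md /tmp/wc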

Passing in the README.md from the Spark directory, the word count results are written to /tmp/wc:

$ ls /tmp/wc
part-00000
part-00001
_SUCCESS
$ cat /tmp/wc/part-00000
(package,1)
(this,1)
(Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1)
(Because,1)
(Python,2)
(page](http://spark.apache.org/documentation.html).,1)
(cluster.,1)
(its,1)
([run,1)
(general,3)
(have,1)
(pre-built,1)
(YARN,,1)
([http://spark.apache.org/developer-tools.html](the,1)
(changed,1)
...
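
There are two part files because the result RDD has two partitions and each partition is written out separately. If a single output file is preferred, the RDD can be coalesced to one partition before saving, a sketch at the cost of write parallelism:

counts.coalesce(1).saveAsTextFile(args[1]); // merge all partitions into a single part file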

References