本项目包含以下示例:
MapReduce
- WordCount: 单词统计
Hive
- sample.hive:表的简单查询
Pig
- sample.pig:Pig处理OSS数据实例
Spark
- SparkPi: 计算Pi
- SparkWordCount: 单词统计
- LinearRegression: 线性回归
- OSSSample: OSS使用示例
- ONSSample: ONS使用示例
- ODPSSample: ODPS使用示例
- MNSSample:MNS使用示例
- LoghubSample:Loghub使用示例
PySpark
- WordCount: 单词统计
依赖资源
测试数据(data目录下):
- The_Sorrows_of_Young_Werther.txt:可作为WordCount(MapReduce/Spark)的输入数据
- patterns.txt:WordCount(MapReduce)作业的过滤字符
- u.data:sample.hive脚本的测试表数据
- abalone:线性回归算法测试数据
依赖jar包(lib目录下)
- tutorial.jar:sample.pig作业需要的依赖jar包
准备工作
本项目提供了一些测试数据,您可以简单地将其上传到OSS中即可使用。其他示例,例如ODPS,MNS,ONS和Loghub等等,需要您自己准备数据如下:
- 【可选】 创建LogStore,参考日志服务用户指南。
- 【可选】 创建ODPS项目和表,参考ODPS快速开始。
- 【可选】 创建ONS,参考消息队列快速开始。
- 【可选】 创建MNS,参考消息服务控制台使用帮助。
基本概念:
- OSSURI: oss://accessKeyId:[email protected]/a/b/c.txt,用户在作业中指定输入输出数据源时使用,可以类比hdfs://。
- 阿里云AccessKeyId/AccessKeySecret是您访问阿里云API的密钥,你可以在这里获取。
集群运行
-
Spark
- SparkWordCount:
spark-submit --class SparkWordCount examples-1.0-SNAPSHOT-shaded.jar <inputPath> <outputPath> <numPartition>
- inputPath: 输入数据路径
- outputPath: 输出路径
- numPartition: 输入数据RDD分片数目
- SparkPi:
spark-submit --class SparkPi examples-1.0-SNAPSHOT-shaded.jar
- SparkOssDemo:
spark-submit --class SparkOssDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret> <endpoint> <inputPath> <numPartition>
- accessKeyId: 阿里云AccessKeyId
- accessKeySecret:阿里云AccessKeySecret
- endpoint: 阿里云OSS endpoint
- inputPath: 输入数据路径
- numPartition:输入数据RDD分片数目
- SparkRocketMQDemo:
spark-submit --class SparkRocketMQDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret> <consumerId> <topic> <subExpression> <parallelism>
- accessKeyId: 阿里云AccessKeyId
- accessKeySecret:阿里云AccessKeySecret
- consumerId: 参考Consumer ID说明
- topic: 每个消息队列都有一个topic
- subExpression: 参考消息过滤。
- parallelism:指定多少个接收器来消费队列消息。
- SparkMaxComputeDemo:
spark-submit --class SparkMaxComputeDemo examples-1.0-SNAPSHOT-shaded.jar <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
- SparkMNSDemo:
spark-submit --class SparkMNSDemo examples-1.0-SNAPSHOT-shaded.jar <queueName> <accessKeyId> <accessKeySecret> <endpoint>
- queueName:队列名,参考MNS名词解释。
- accessKeyId: 阿里云AccessKeyId
- accessKeySecret:阿里云AccessKeySecret
- endpoint:队列数据访问地址
- SparkSLSDemo:
spark-submit --class SparkSLSDemo examples-1.0-SNAPSHOT-shaded.jar <sls project> <sls logstore> <loghub group name> <sls endpoint> <access key id> <access key secret> <batch interval seconds>
- sls project: LogService项目名
- sls logstore: 日志库名
- loghub group name:作业中消费日志数据的组名,可以任意取。sls project,sls store相同时,相同组名的作业会协同消费sls store中的数据;不同组名的作业会相互隔离地消费sls store中的数据。
- sls endpoint: 参考日志服务入口。
- accessKeyId: 阿里云AccessKeyId
- accessKeySecret:阿里云AccessKeySecret
- batch interval seconds: Spark Streaming作业的批次间隔,单位为秒。
- LinearRegression:
spark-submit --class LinearRegression examples-1.0-SNAPSHOT-shaded.jar <inputPath> <numPartitions>
- inputPath:输入数据
- numPartition:输入数据RDD分片数目
- SparkWordCount:
-
PySpark
- WordCount:
spark-submit wordcount.py <inputPath> <outputPath> <numPartition>
- inputPath: 输入数据路径
- outputPath: 输出路径
- numPartition: 输入数据RDD分片数目
- WordCount:
-
Mapreduce
- WordCount:
hadoop jar examples-1.0-SNAPSHOT-shaded.jar WordCount -Dwordcount.case.sensitive=true <inputPath> <outputPath> -skip <patternPath>
- inputPathl:输入数据路径
- outputPath:输出路径
- patternPath:过滤字符文件,可以使用data/patterns.txt
- WordCount:
-
Hadoop Streaming
- WordCount:
hadoop jar /usr/lib/hadoop-current/share/hadoop/tools/lib/hadoop-streaming-*.jar -file <mapperPyFile> -mapper mapper.py -file <reducerPyFile> -reducer reducer.py -input <inputPath> -output <outputPath>
- WordCount:
-
Hive
hive -f sample.hive -hiveconf inputPath=<inputPath>
- inputPath:输入数据路径
-
Pig
pig -x mapreduce -f sample.pig -param tutorial=<tutorialJarPath> -param input=<inputPath> -param result=<resultPath>
- tutorialJarPath:依赖Jar包,可使用lib/tutorial.jar
- inputPath:输入数据路径
- resultPath:输出路径
-
注意:
- 如果在E-MapReduce上使用时,请将测试数据和依赖jar包上传到OSS中,路径规则遵循OSSURI定义,见上。
- 如果集群中使用,可以放在机器本地。
本地运行
这里主要介绍如何在本地运行Spark程序访问阿里云数据源,例如OSS等。如果希望本地调试运行,最好借助一些开发工具,例如Intellij IDEA或者Eclipse。尤其是Windows环境,否则需要在Windows机器上配置Hadoop和Spark运行环境,很麻烦。
-
Intellij IDEA
-
Scala IDE for Eclipse