【SparkSQL笔记】SparkSQL的Dataset操作大全(二)(spark中dataset用法)
841
2022-05-29
MapReduce 是一种可用于数据处理的编程模型。该模型比较简单,但要想写出有用的程序却不太容易。Hadoop 可以运行各种语言版本的 MapReduce 程序, 比如Java、Ruby、Python和C++语言版本。最重要的是,MapReduce 程序本质上是并行运行的,因此可以将大规模的数据分析任务分发给任何一个拥有足够多机器的数据中心。 MapReduce 的优势在于处理大规模数据集,所以这里我们先来看一个数据集。
气象数据集
在篇要讲的是,写一个挖掘气象数据的程序。气象数据是通过分布在美国全国各地区的很多气象传感器每隔一小时进行收集,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用 MapReduce 程序来统计分析。
MapReduce 是一种可用于数据处理的编程模型。该模型比较简单,但要想写出有用的程序却不太容易。Hadoop 可以运行各种语言版本的 MapReduce 程序, 比如Java、Ruby、Python和C++语言版本。最重要的是,MapReduce 程序本质上是并行运行的,因此可以将大规模的数据分析任务分发给任何一个拥有足够多机器的数据中心。 MapReduce 的优势在于处理大规模数据集,所以这里我们先来看一个数据集。
气象数据集
在篇要讲的是,写一个挖掘气象数据的程序。气象数据是通过分布在美国全国各地区的很多气象传感器每隔一小时进行收集,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用 MapReduce 程序来统计分析。
我们使用的数据来自美国国家气候数据中心、美国国家海洋和大气管理局(简称 NCDC NOAA),这些数据按行并以 ASCII 格式存储,其中每一行是一条记录。 下面我们展示一行采样数据,其中重要的字段被突出显示。该行数据被分割成很多行以突出每个字段,但在实际文件中,这些字段被整合成一行且没有任何分隔符。
数据文件按照气象站和日期进行组织,每个气象站都是一个总目录,而且每个气象站下面从 1980 年到 2010 年,每一年又都作为一个子目录。 因为美国有成千上万个气象站,所以整个数据集由大量的小文件组成。通常情况下,处理少量的大型文件更容易、更有效,因此,这些数据需要经过预处理,将每个气象站的数据文件拼接成一个单独的文件。 预处理过的数据文件示例如下所示:
数据集导入HDFS
一旦数据下载并解压到本地目录之后,我们通过已经安装好hadoop-eclipse-plugin-xxx.jar插件,很容易将气象站数据导入HDFS。首先通过插件连接 HDFS(连接地址:hdfs://xxx002:9000):
规划好气象站数据集在HDFS中的目录结构(hdfs://xxx002:9000/middle/weather),然后右键点击weather目录选择Upload files to DFS,最后选中需要上传的气象站数据集。
当所有数据集上传至HDFS,我们就可以使用MapReduce jobs来统计分析气象站数据集。
我们也可以通过命令行访问刚刚上传至HDFS的数据集:
[hadoop@xxx002 hadoop]$ bin/hdfs dfs -ls /middle/weather/
使用 Hadoop 来分析数据
为了充分利用 Hadoop 提供的并行处理优势,我们需要将查询表示成 MapReduce 作业。完成本地小规模测试之后,就可以把作业部署到集群上面运行。 那么 MapReduce 作业到底由哪几个部分组成的呢?接下来我们详细介绍。
map 和 reduce
MapReduce 任务过程分为两个处理阶段:map 阶段和reduce阶段 。每个阶段都以键值对作为输入和输出,其类型由程序员自己来选择。程序员还需要写两个函数:map 函数和 reduce 函数。
在这里,map 阶段的输入是 NCDC NOAA 原始数据。我们选择文本格式作为输入格式,将数据集的每一行作为文本输入。键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。
我们的 map 函数很简单。由于我们只对气象站和气温感兴趣,所以只需要取出这两个字段数据。在本课程中,map 函数只是一个数据准备阶段, 通过这种方式来准备数据,使 reducer 函数能够继续对它进行处理:即统计出每个气象站 30 年来的平均气温。map 函数还是一个比较合适去除已损记录的地方,在 map 函数里面,我们可以筛掉缺失的或者错误的气温数据。
为了全面了解 map 的工作方式,我们考虑以下输入数据的示例:
1985 07 31 02 200 94 10137 220 26 1 0 -9999
1985 07 31 03 172 94 10142 240 0 0 0 -9999
1985 07 31 04 156 83 10148 260 10 0 0 -9999
1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999
1985 07 31 06 122 72 -9999 90 0 -9999 0 0
1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999
1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999
1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999
1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999
1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999
这些行以键/值对的方式作为 map 函数的输入:
(0,1985 07 31 02 200 94 10137 220 26 1 0 -9999)
(62,1985 07 31 03 172 94 10142 240 0 0 0 -9999)
(124,1985 07 31 04 156 83 10148 260 10 0 0 -9999)
(186,1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999)
(248,1985 07 31 06 122 72 -9999 90 0 -9999 0 0)
(310,1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999)
(371,1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999)
(434,1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999)
(497,1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999)
(560,1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999)
键(key)是文件中的偏移量,map 函数并不需要这个信息,所以将其忽略。map 函数的功能仅限于提取气象站和气温信息,并将它们作为输出。
map 函数的输出经由 MapReduce 框架处理后,最后发送到 reduce 函数。这个处理过程基于键来对键值对进行排序和分组。因此在这个示例中,reduce 函数看到的是如下输入:
(03103,[200,172,156,133,122,117,111,111,106,100])
每个气象站后面紧跟着一系列气温数据,reduce 函数现在要做的是遍历整个列表并统计出平均气温:
03103 132
这是最终输出结果即每一个气象站历年的平均气温。
下图代表了 MapReduce 高层设计:
Java MapReduce
我们明白 MapReduce 程序的工作原理之后,下一步就是写代码实现它。我们需要编写三块代码内容:一个 map 函数、一个 reduce 函数和一些用来运行作业的代码。
map 函数由 Mapper 类实现来表示,Mapper 声明一个 map() 虚方法,其内容由我们自己来实现。
下面我们来编写 Mapper 类,实现 map() 方法,提取气象站和气温数据。
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
*
* @function 统计美国各个气象站30年来的平均气温
* @author cs
*
*/
public class Temperature extends Configured implements Tool {
public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
/**
* @function Mapper 解析气象站数据
* @input key=偏移量 value=气象站数据
* @output key=weatherStationId value=temperature
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString(); //每行气象数据
int temperature = Integer.parseInt(line.substring(14, 19).trim());//每小时气温值
if (temperature != -9999) { //过滤无效数据
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通过文件名称提取气象站id
context.write(new Text(weatherStationId), new IntWritable(temperature));
}
}
}
}
这个 Mapper 类是一个泛型类型,它有四个形参类型,分别指定 map 函数的输入键、输入值、输出键和输出值的类型。 就本示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是气象站id,输出值是气温(整数)。Hadoop 本身提供了一套可优化网络序列化传输的基本类型,而不是使用 java 内嵌的类型。这些类型都在 org.apache.hadoop.io 包中。 这里使用 LongWritable 类型(相当于 Java 的 Long 类型)、Text 类型(相当于 Java 中的 String 类型)和 IntWritable 类型(相当于 Java 的 Integer 类型)。
map() 方法的输入是一个键(key)和一个值(value),我们首先将 Text 类型的 value 转换成 Java 的 String 类型, 之后使用 substring()方法截取我们业务需要的值。map() 方法还提供了 Context 实例用于输出内容的写入。 在这种情况下,我们将气象站id按 Text 对象进行读/写(因为我们把气象站id当作键),将气温值封装在 IntWritale 类型中。只有气温数据不缺失,这些数据才会被写入输出记录中。
我们以上面类似的方法用 Reducer 来定义 reduce 函数,统计每个气象站的平均气温:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* @function 统计美国各个气象站30年来的平均气温
* @author cs
*
*/
public class Temperature extends Configured implements Tool {
/**
*
* @function Reducer 统计美国各个气象站的平均气温
* @input key=weatherStationId value=temperature
* @output key=weatherStationId value=average(temperature)
*/
public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
//统计每个气象站的气温值总和
for (IntWritable val : values) {
sum += val.get();
count++;
}
//求每个气象站的气温平均值
result.set(sum / count);
context.write(key, result);
}
}
}
同样,reduce 函数也有四个形式参数类型用于指定输入和输出类型。reduce 函数的输入类型必须匹配 map 函数的输出类型:即 Text 类型和 IntWritable 类型。 在这种情况下,reduce 函数的输出类型也必须是 Text 和 IntWritable 类型,分别是气象站id和平均气温。在 map 的输出结果中,所有相同的气象站(key)被分配到同一个reduce执行,这个平均气温就是针对同一个气象站(key),通过循环所有气温值(values)求和并求平均数所得到的。
第三部分代码负责运行 MapReduce 作业 :
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @function 统计美国各个气象站30年来的平均气温
* @author cs
*
*/
public class Temperature extends Configured implements Tool {
/**
* @function 任务驱动方法
* @param args
* @return
* @throws Exception
*/
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();//读取配置文件
Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
Job job = new Job(conf, "temperature");//新建一个任务
job.setJarByClass(Temperature.class);// 设置主类
FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
job.setMapperClass(TemperatureMapper.class);// Mapper
job.setReducerClass(TemperatureReducer.class);// Reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true)?0:1;//提交任务
}
/**
* @function main 方法
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//数据输入路径和输出路径
String[] args0 = {
"hdfs://xxx002:9000/middle/weather/",
"hdfs://xxx002:9000/middle/weather/out/"
};
int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
System.exit(ec);
}
}
Configuration 类读取 Hadoop 的配置文件,如 site-core.xml、mapred-site.xml、hdfs-site.xml 等。
Job 对象指定作业执行规范,我们可以用它来控制整个作业的运行。我们在 Hadoop 集群上运行这个作业时,要把代码打包成一个 JAR 文件(Hadoop 在集群上发布这个文件)。 不必明确指定 JAR 文件的名称,在 Job 对象的 setJarByClass 方法中传递一个类即可,Hadoop 利用这个类来查找包含它的 JAR 文件,进而找到相关的 JAR 文件。
构造 Job 对象之后,需要指定输入和输出数据的路径。调用 FileInputFormat 类的静态方法 addInputPath() 来定义输入数据的路径,这个路径可以是单个的文件、一个目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一系列文件。由函数名可知,可以多次调用 addInputPath() 来实现多路径的输入。 调用 FileOutputFormat 类中的静态方法 setOutputPath() 来指定输出路径(只能有一个输出路径)。这个方法指定的是 reduce 函数输出文件的写入目录。 在运行作业前该目录是不应该存在的,否则 Hadoop 会报错并拒绝运行作业。这种预防措施的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是件可怕的事情)。
接着,通过 setMapperClass() 和 setReducerClass() 指定 map 类型和reduce 类型。
setOutputKeyClass() 和 setOutputValueClass() 控制 map 和 reduce 函数的输出类型,正如本例所示,这两个输出类型一般都是相同的。如果不同,则通过 setMapOutputKeyClass()和setMapOutputValueClass()来设置 map 函数的输出类型。
输入的类型通过 InputFormat 类来控制,我们的例子中没有设置,因为使用的是默认的 TextInputFormat(文本输入格式)。
在设置定义 map 和 reduce 函数的类之后,可以开始运行作业。Job 中的 waitForCompletion() 方法提交作业并等待执行完成。该方法中的布尔参数是个详细标识,所以作业会把进度写到控制台。 waitForCompletion() 方法返回一个布尔值,表示执行的成(true)败(false),这个布尔值被转换成程序的退出代码 0 或者 1。
运行测试
编写好 MapReduce 作业之后,通常要拿一个小型数据集进行测试以排除代码问题。程序测试成功之后,我们通过 eclipse 工具将 MapReduce作业打成jar包即temperature.jar,然后然后上传至/home/hadoop/xxx/ 目录下,由 hadoop 脚本来执行。
[hadoop@xxx002 hadoop-2.2.0-x64]$ export HADOOP_CLASSPATH=/home/hadoop/xxx/temperature.jar
[hadoop@xxx002 hadoop-2.2.0-x64]$ hadoop com.xxx.hadoop.advance.Temperature
如果调用 hadoop 命令的第一个参数是类名,Hadoop 就会启动一个 JVM 来运行这个类。使用 hadoop 命令运行作业比直接使用 Java 命令来运行更方便,因为前者将 Hadoop 库文件(及其依赖关系)路径加入到类路径参数中, 同时也能获得 Hadoop 的配置文件。需要定义一个 HADOOP_CLASSPATH 环境变量用于添加应用程序类的路径,然后由 Hadoop 脚本来执行相关操作。
运行作业所得到的输出提供了一些有用的信息。例如我们可以看到,这个作业有指定的标识,即job_1432108212572_0001,并且执行了一个 map 任务和一个 reduce 任务。
输出的最后一部分,以 Counter 为标题,显示 Hadoop 上运行的每个作业的一些统计信息。这些信息对检查数据是否按照预期进行处理非常有用。
输出数据写入out 目录,其中每个 reducer 都有一个输出文件。我们的例子中只有一个 reducer,所以只能找到一个名为 part-00000 的文件:
[hadoop@xxx002 hadoop-2.2.0-x64]$ hadoop fs -cat /weather/out/part-r-00000
03103 132
这个结果和我们之前手动寻找的结果一样。我们把这个结果解释为编号为03103气象站的平均气温为13.2摄氏度。
注意:上面输出结果的输入数据集是示例中的10条数据,而不是下载的整个数据集。如果大家使用整个数据集跑出的结果不一致,很正常。
监控 Hadoop
Ambari 仪表盘和监视器工具为我们提供了比较直观的Hadoop集群信息。一旦Hadoop 集群启动,Ambari可以监控整个集群的健康状况。
另外我们还可以通过Azkaban 来负责MapReduce的作业调度,同时它能提供MapReduce运行信息和详细的执行日志,包括运行时间:
至于Azkaban 作业调度以及Ambari集群部署监控,在后续的篇幅中再说明。 目前这个阶段重点掌握MapReduce的编程套路即可。
数据可视化
当 MapReduce Job执行完毕,我们可以将结果集入库然后使用可视化技术对数据进行可视化。
如有问题欢迎探讨 谢谢!
Hadoop MapReduce
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。