Chapter 2: MapReduce

Data Format

Example 2-1: Format of a National Climatic Data Center record
0057
332130 # USAF weather station identifier
99999 # WBAN weather station identifier
19500101 # observation date
0300 # observation time
4
+51317 # latitude (degrees x 1000)
+028783 # longitude (degrees x 1000)
FM-12
+0171 # elevation (meters)
99999
V020
320 # wind direction (degrees)
1 # quality code
N
0072
1
00450 # sky ceiling height (meters)
1 # quality code
C
N
010000 # visibility distance (meters)
1 # quality code
N
9
-0128 # air temperature (degrees Celsius x 10)
1 # quality code
-0139 # dew point temperature (degrees Celsius x 10)
1 # quality code
10268 # atmospheric pressure (hectopascals x 10)
1 # quality code
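In the data files themselves, each record is a single line of fixed-width ASCII, so every field sits at a known character offset, and the programs in this chapter all work by slicing fields out by position. As a minimal sketch (a hypothetical helper, not one of the book's examples), the three fields the examples care about can be extracted like this:

// Hypothetical helper: prints the three fields that this chapter's examples
// slice out of one raw, fixed-width NCDC record passed on the command line.
public class NcdcFieldSketch {
  public static void main(String[] args) {
    String line = args[0];
    System.out.println("year    = " + line.substring(15, 19));
    System.out.println("temp    = " + line.substring(87, 92)); // Celsius x 10
    System.out.println("quality = " + line.substring(92, 93));
  }
}

The data files are organized by date and weather station, with one gzipped file per station per year; for example, here are the first entries for 1990: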
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
Example 2-2: A program for finding the maximum recorded temperature by year from NCDC weather records
#!/usr/bin/env bash
for year in all/*
do
  echo -ne `basename $year .gz`"\t"
  gunzip -c $year | \
    awk '{ temp = substr($0, 88, 5) + 0;  # awk substr() is 1-based, so offsets
           q = substr($0, 93, 1);         # 88/93 match 87/92 in the Java code
           if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
         END { print max }'
done
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...

Map and Reduce
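To make the two phases concrete, consider the five sample records used throughout this chapter. The map phase turns each raw (offset, line) input pair into a (year, temperature) pair, discarding missing and poor-quality readings (these values match the Streaming map output shown later in this chapter):

(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)

Before reaching the reducer, the framework sorts the pairs and groups them by key:

(1949, [111, 78])
(1950, [0, 22, -11])

so all the reduce function has to do is pick the maximum from each list:

(1949, 111)
(1950, 22)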

Java MapReduce

Example 2-3: Mapper for the maximum temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
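Although the full program is run against sample data in "A Test Run" below, the mapper can also be checked in isolation. Here is a minimal sketch of such a unit test, assuming the MRUnit testing library and JUnit (neither is introduced in this chapter, and the test class name is illustrative); it feeds the mapper one fixed-width line and asserts that it emits ("1950", -11):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class MaxTemperatureMapperTest {

  @Test
  public void processesValidRecord() throws IOException {
    // One fixed-width NCDC line; the year is at offset 15 and the signed
    // temperature (-0011, quality code 1) begins at offset 87.
    Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
        "99999V0203201N00261220001CN9999999N9-00111+99999999999");
    new MapDriver<LongWritable, Text, Text, IntWritable>()
        .withMapper(new MaxTemperatureMapper())
        .withInput(new LongWritable(0), value)
        .withOutput(new Text("1950"), new IntWritable(-11))
        .runTest();
  }
}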
Example 2-4: Reducer for the maximum temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
Example 2-5: Application to find the maximum temperature in the weather dataset
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
The Java MapReduce API used in this section, and throughout the book, is known as the "new API"; it replaces the older API, which is functionally equivalent. The differences between the two APIs are covered in Appendix D, along with advice on how to convert between them, and Appendix D also contains the old-API version of this maximum-temperature application.
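For comparison, here is a minimal sketch of the same mapper written against the old API (the org.apache.hadoop.mapred package); the class name is illustrative, and the parsing logic is identical to Example 2-3. The visible differences are that Mapper is an interface rather than an abstract class, and output goes to an OutputCollector instead of a Context:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old-API mapper: same parsing as Example 2-3, but results are emitted
// through an OutputCollector rather than a Context.
public class OldApiMaxTemperatureMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      output.collect(new Text(year), new IntWritable(airTemperature));
    }
  }
}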

A Test Run

% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop MaxTemperature input/ncdc/sample.txt output
14/09/16 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/09/16 09:48:40 WARN mapreduce.JobSubmitter: Hadoop command-line option
parsing not performed. Implement the Tool interface and execute your application
with ToolRunner to remedy this.
14/09/16 09:48:40 INFO input.FileInputFormat: Total input paths to process : 1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: number of splits:1
14/09/16 09:48:40 INFO mapreduce.JobSubmitter: Submitting tokens for job:
job_local26392882_0001
14/09/16 09:48:40 INFO mapreduce.Job: The url to track the job:
http://localhost:8080/
14/09/16 09:48:40 INFO mapreduce.Job: Running job: job_local26392882_0001
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter is
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for map tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner:
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_m_000000_0
is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_m_000000_0'
done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task:
attempt_local26392882_0001_m_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: map task executor complete.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for reduce tasks
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
segments left of total size: 50 bytes
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_r_000000_0
is done. And is in the process of committing
14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
14/09/16 09:48:40 INFO mapred.Task: Task attempt_local26392882_0001_r_000000_0
is allowed to commit now
14/09/16 09:48:40 INFO output.FileOutputCommitter: Saved output of task
'attempt_local26392882_0001_r_000000_0' to file:/Users/tom/book-workspace/
hadoop-book/output/_temporary/0/task_local26392882_0001_r_000000
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce > reduce
14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_r_000000_0'
done.
14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task:
attempt_local26392882_0001_r_000000_0
14/09/16 09:48:40 INFO mapred.LocalJobRunner: reduce task executor complete.
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 running in uber
mode : false
14/09/16 09:48:41 INFO mapreduce.Job: map 100% reduce 100%
14/09/16 09:48:41 INFO mapreduce.Job: Job job_local26392882_0001 completed
successfully
14/09/16 09:48:41 INFO mapreduce.Job: Counters: 30
  File System Counters
    FILE: Number of bytes read=377168
    FILE: Number of bytes written=828464
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
  Map-Reduce Framework
    Map input records=5
    Map output records=5
    Map output bytes=45
    Map output materialized bytes=61
    Input split bytes=129
    Combine input records=0
    Combine output records=0
    Reduce input groups=2
    Reduce shuffle bytes=61
    Reduce input records=5
    Reduce output records=2
    Spilled Records=10
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=39
    Total committed heap usage (bytes)=226754560
  File Input Format Counters
    Bytes Read=529
  File Output Format Counters
    Bytes Written=29
When running in local (standalone) mode, all of the programs in this book assume you have set the HADOOP_CLASSPATH environment variable in this way. The commands should be run from the directory containing the example code.
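With the job complete, the results are in the output directory given on the command line; a single reducer produces a single file, named part-r-00000 by Hadoop's default convention. Reading it back (a usage sketch; the values match the local pipeline runs later in this chapter):

% cat output/part-r-00000
1949 111
1950 22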

Data Flow

Combiner Functions
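Why can the reducer itself serve as the combiner for this job? Because the function being computed, max, is commutative and associative: the maximum of partial maxima is the maximum of all the underlying readings. For instance, if one map produced the 1950 readings (0, 20, 10) and a second produced (25, 15), then

max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25

Not every function has this property: a mean of partial means, for example, is not in general the overall mean, so a combiner could not blindly reuse a mean-computing reducer in the same way.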

Specifying a Combiner Function

Example 2-6: Application to find the maximum temperature, using a combiner function for efficiency
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperatureWithCombiner {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperatureWithCombiner <input path> " +
          "<output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperatureWithCombiner.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Running a Distributed MapReduce Job

Ruby

Example 2-7: Ruby map script
#!/usr/bin/env ruby
STDIN.each_line do |line|
  val = line
  year, temp, q = val[15, 4], val[87, 5], val[92, 1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
% cat input/ncdc/sample.txt | ch02-mr-intro/src/main/ruby/max_temperature_map.rb
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078
Example 2-8: Ruby reduce script
#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
puts "#{last_key}\t#{max_val}" if last_key
% cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/ruby/max_temperature_map.rb | \
sort | ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
1949 111
1950 22
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files ch02-mr-intro/src/main/ruby/max_temperature_map.rb,\
ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-input input/ncdc/all \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-combiner ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

Python

Example 2-9: Python map script
#!/usr/bin/env python
import re
import sys

for line in sys.stdin:
  val = line.strip()
  (year, temp, q) = (val[15:19], val[87:92], val[92:93])
  if (temp != "+9999" and re.match("[01459]", q)):
    print "%s\t%s" % (year, temp)

Example 2-10: Python reduce script
#!/usr/bin/env python
import sys

(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
  (key, val) = line.strip().split("\t")
  if last_key and last_key != key:
    print "%s\t%s" % (last_key, max_val)
    (last_key, max_val) = (key, int(val))
  else:
    (last_key, max_val) = (key, max(max_val, int(val)))

if last_key:
  print "%s\t%s" % (last_key, max_val)
% cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949 111
1950 22