欢迎光临八叔引擎之家,本站所有资源仅供学习与参考,禁止用于商业用途或从事违法行为!

八叔引擎之家

第3章:Hadoop分布式文件系统(1)
行业应用 2021-01-21

非常大体积的文件

流式数据访问

日常用的硬件设备

  • 低延迟数据读取
    那些要求数据读取几十毫秒延迟的应用不适合使用HDFS。记住,HDFS对于传输高吞吐量数据进行了优化,这也许以延迟为代价。HBase当前是低延迟比较好的选择,见第20章节。
  • 大量的小文件
    由于文件元数据存储在内存中的名称结点中,所以内存中名称节点大小决定了能够存储文件的数量。根据经验,每一个文件名,目录或块名占用150字节,所以如果你有一百万个文件,每一个文件占一块,你将至少需要300M内存空间。虽然存储几百万个文件是没问题的,但存储十亿个文件就超出了当前硬件能够容纳的数量。
  • 多个写入器,文件任意修改
    HDFS的文件只有一个写入器,并且只在文件结束时以追加的方式写入。不支持多个写入器或者能在文件当中任意一个位置修改。这也许在将来被支持,但它们很可能相对低效一些。

名称节点和数据节点

块缓存

HDFS联盟

HDFS高可用性

  • 两个名称节点必须能够使用高速访问的存储空间共享更改日志。当备用的名称节点运行的时候,它会读取更改日志所有内容,并同步状态,然后当激活名称节点写入新内容时,再读取新的状态同步。
  • 数据节点必须将块报告发送给这两个名称节点,因为块之间的映射关系存储在名称节点内存中,而不是在硬盘中。
  • 使用一种对用户透明的机制,客户端必须要被设置成能够处理名称节点的失败后的备援。
  • 这个备用的名称节点包含了第二节点的角色,会对激活的名称节点中的命名空间进行定期检查。

失败备援(Failover)和筑围(Fencing)

基本的文件系统操作

% hadoop fs -copyFromLocal input/docs/quangle.txt  \
hdfs://localhost/user/tom/quangel.txt
% hadoop fs  -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt
% hadoop fs -copyFromLocal input/docs/quangle.txt quangle.txt
% hadoop fs -copyToLocal quangle.txt quangle.copy.txt
%md5 input/docs/quangle.txt quangle.copy.txt
MD5 (input/docs/quangle.txt) = e7891a2627cf263a079fb0f18256ffb2
MD5 (quangle.copy.txt) = e7891a2627cf263a079fb0f18256ffb2
% hadoop fs -mkdir books
% hadoop fs -ls
drwxr-xr-x - tom supergroup 0 2014-10-04 13:22 books
-rw-r--r-- 1 tom supergroup 119 2014-10-04 13:21 quangle.txt
文件系统 URI协议 Java的实现(所有类在包org.apache.hadoop下) 描述
Local file fs.LocalFileSystem 一个用于本地的具体客户端校验硬盘的文件系统。对于没有校验的硬盘使用RawLocal FileSystem。见"本地文件系统"
HDFS hdfs hdfs.DistributedFileSystem Hadoop的分布式文件系统。HDFS被设计用于和MapReduce连接进行高效地工作
WebHDFS webhdfs hdfs.web.WebHdfsFileSystem 提供对基于HTTP读写HDFS进行权限验证的文件系统,见"HTTP"
安全的WebHDFS swebhdfs hdfs.web.SWebHdfsFileSystem WebHDFS的HTTPS版本
HAR har fs.HarFileSystem 在另一个文件系统之上的一个文件系统,用于归档文件。Hadoop归档用于将HDFS中的文件打包归档进一个文件中,以减少名称节点所占的内存。使用hadoop archive命令创建HAR文件
View viewfs viewfs.ViewFileSystem 一个客户端挂载表,作用于另外一个Hadoop文件系统,通常用于对联盟名称节点创建挂载点。见"HDFS联盟"
FTP ftp fs.ftp.FTPFileSystem 基于FTP服务的文件系统
S3 s3a fs.s3a.S3AFileSystem 基于Amazon S3的文件系统,代替旧的s3n(S3 native)
Azure wasb fs.azure.NativeAzureFileSystem 基于微软Azure的文件系统
Swift swift fs.swift.snative.SwiftNativeFile 基于OpenStack Swift 的文件系统
% hadoop fs -ls file:///

接口

HTTP

C

NFS

FUSE

从Hadoop URL读取数据

InputStream in = null;
try {
   in = new URL("hdfs://host/path").openStream();
// process in
} finally {
    IOUtils.closeStream(in);
}
public class URLCat {
   static {
    URL.setURLStreamHandlerFactory(new 
    FsUrlStreamHandlerFactory());
     }
public static void main(String[] args) throws 
         Exception {
              InputStream in = null;
              try {
                 in = new URL(args[0]).openStream();
                 IOUtils.copyBytes(in,System.out,4096,false);
                } finally {
                   IOUtils.closeStream(in);
                }
           }
   }
% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,But his face you could not see,On account of his Beaver Hat.

使用FileSystem API读取数据

public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri,Configuration conf) throws IOException
public static FileSystem get(URI uri,Configuration conf,String user)
throws IOException
public static LocalFileSystem getLocal(Configuration conf) throws IOException
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f,int bufferSize) throws IOException
示例:3-2
public class FileSystemCat {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
         Configuration conf = new Configuration();
         FileSystem fs = FileSystem.get(URI.create(uri),conf);
          InputStream in = null;
          try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in,false);
          } finally {
            IOUtils.closeStream(in);
         }
     }
}
% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,On account of his Beaver Hat.

FSDataInputStream

package org.apache.hadoop.fs;
public class FSDataInputStream extends DataInputStream
implements Seekable,PositionedReadable {
// 实现部分省略
}
public interface Seekable {
    void seek(long pos) throws IOException;
     long getPos() throws IOException;
}
示例:3-3
public class FileSystemDoubleCat {
      public static void main(String[] args) throws Exception {
          String uri = args[0];
          Configuration conf = new Configuration();
          FileSystem fs = FileSystem.get(URI.create(uri),conf);
          FSDataInputStream in = null;
           try {
                  in = fs.open(new Path(uri));
                  IOUtils.copyBytes(in,false);
                   in.seek(0); // 返回到文件起始位置
                    IOUtils.copyBytes(in,false);
            } finally {
                    IOUtils.closeStream(in);
            }
      }
}
% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,On account of his Beaver Hat.
On the top of the Crumpetty Tree
The Quangle Wangle sat,On account of his Beaver Hat.
public interface PositionedReadable {
public int read(long position,byte[] buffer,int offset,int length)
throws IOException;
public void readFully(long position,byte[] buffer) throws IOException;
}

写数据

public FSDataOutputStream create(Path f) throws IOException
package org.apache.hadoop.util;
public interface Progressable {
     public void progress();
}
public FSDataOutputStream append(Path f) throws IOException
public class FileCopyWithProgress {
      public static void main(String[] args) throws Exception {
          String localSrc = args[0];
          String dst = args[1];
          InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
          Configuration conf = new Configuration();
          FileSystem fs = FileSystem.get(URI.create(dst),conf);
          OutputStream out = fs.create(new Path(dst),new  Progressable() {
                                      public void progress() {
                                         System.out.print(".");
                                      }
                                    });
          IOUtils.copyBytes(in,out,true);
     }
}
调用示范:
% hadoop FileCopyWithProgress input/docs/1400-8.txt
hdfs://localhost/user/tom/1400-8.txt
.................

FSDataOutputStream

package org.apache.hadoop.fs;
 public class FSDataOutputStream extends DataOutputStream implements Syncable {
    public long getPos() throws IOException {
      // implementation elided
   }
   // implementation elided
 }

目录

public boolean mkdirs(Path f) throws IOException

文件系统查询

public class ShowFileStatusTest {
     private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
     private FileSystem fs;

    @Before
    public void setUp() throws IOException {
        Configuration conf = new Configuration();
        if (System.getProperty("test.build.data") == null) {
             System.setProperty("test.build.data","/tmp");
        }
        cluster = new MiniDFSCluster.Builder(conf).build();
        fs = cluster.getFileSystem();
       OutputStream out = fs.create(new Path("/dir/file"));
       out.write("content".getBytes("UTF-8"));
       out.close();
     }
  
  @After
  public void tearDown() throws IOException {
   if (fs != null) { fs.close(); }
     if (cluster != null) { cluster.shutdown(); }
   }

 @Test(expected = FileNotFoundException.class) 
  public void throwsFileNotFoundForNonExistentFile() throws                IOException {
         fs.getFileStatus(new Path("no-such-file"));
   }

   @Test
   public void fileStatusForFile() throws IOException {
      Path file = new Path("/dir/file");
      FileStatus stat = fs.getFileStatus(file);
      assertThat(stat.getPath().toUri().getPath(),is("/dir/file"));
      assertThat(stat.isDirectory(),is(false));
      assertThat(stat.getLen(),is(7L));
      assertThat(stat.getModificationTime(),is(lessThanOrEqualTo(System.currentTimeMillis())));
      assertThat(stat.getReplication(),is((short) 1));
      assertThat(stat.getBlockSize(),is(128 * 1024 * 1024L)); assertThat(stat.getOwner(),is(System.getProperty("user.name")));
    assertThat(stat.getGroup(),is("supergroup"));
    assertThat(stat.getPermission().toString(),is("rw-r--r--"));
  }

  @Test
  public void fileStatusForDirectory() throws IOException {
     Path dir = new Path("/dir");
     FileStatus stat = fs.getFileStatus(dir);
    assertThat(stat.getPath().toUri().getPath(),is("/dir"));
    assertThat(stat.isDirectory(),is(true));
    assertThat(stat.getLen(),is(0L));
    assertThat(stat.getModificationTime(),is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(),is((short) 0));
    assertThat(stat.getBlockSize(),is(0L));
    assertThat(stat.getOwner(),is("rwxr-xr-x"));
  }
}
public boolean exists(Path f) throws IOException
public FileStatus[] listStatus(Path f) throws IOException
public FileStatus[] listStatus(Path f,PathFilter filter) throws IOException
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files,PathFilter filter)
throws IOException
示例:3-6 显示来自Hadoop文件系统中多个路径的文件状态使用示例
public class ListStatus {
    public static void main(String[] args) throws Exception {
       String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri),conf);
        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }
       FileStatus[] status = fs.listStatus(paths);
       Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
          System.out.println(p);
     }
   }
}

运行结果如下:
% hadoop ListStatus hdfs://localhost/    hdfs://localhost/user/tom
hdfs://localhost/user
hdfs://localhost/user/tom/books
hdfs://localhost/user/tom/quangle.txt
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern,PathFilter filter)
throws IOException
通配符 名称 匹配项
* 星号 匹配零或多个字符
? 问号 匹配单个字符
[ab] 字符集 匹配在集合{a,b}中的某个字符
[^ab] 排除字符集 匹配不在集合{a,b}中单个的字符
[a-b] 字符范围 匹配在范围[a,b]内的单个字符,a要小于或等于b
[^a-b] 排除字符范围 匹配不在范围[a,b]内的单个字符,a要小于等于b
{a,b} 二选一 匹配表达式a或b中一个
\c 转义字符 当c是特殊字符时,使用\c匹配c字符
通配符 匹配结果
/* /2007 /2008
// /2007/12 /2008/01
//12/ /2007/12/30 /2007/12/31
/200? /2007 /2008
/200[78] /2007 /2008
/200[7-8] /2007 /2008
/200[^01234569] /2007 /2008
///{31,01} /2007/12/31 /2008/01/01
///3{0,1} /2007/12/30 /2007/12/31
/*/{12/31,01/01} /2007/12/31 /2008/01/01
package org.apache.hadoop.fs;
public interface PathFilter {
    boolean accept(Path path);
}
示例:3-7
public class RegexExcludePathFilter implements PathFilter {
  private final String regex;
  public RegexExcludePathFilter(String regex) {
      this.regex = regex;
   }
  public boolean accept(Path path) {
      return !path.toString().matches(regex);
   }
}
fs.globStatus(new Path("/2007/*/*"),new RegexExcludeFilter("^.*/2007/12/31$"))

删除数据

public boolean delete(Path f,boolean recursive) throws IOException
本文链接:http://www.viiis.cn/news/show_23574.html

本站采用系统自动发货方式,付款后即出现下载入口,如有疑问请咨询在线客服!

售后时间:早10点 - 晚11:30点

咨询售后客服

服务热线 19970861797
服务热线 19970861797服务热线 19970861797
手机二维码
返回顶部
返回顶部返回顶部