大数据之hadoop【hdfs】

2018-11-26 18:53 

目录

    1、HDFS体系结构
    2、HDFS Shell操作
    3、HDFS Java API
    4、HDFS和RPC
    5、HDFS High Availability
    6、HDFS数据回收和简单运维

==============================================================

Hadoop
HDFS
Namenode
管理datanode,
接收client的操作
Datanode
只负责存储数据
Yarn
MapReduce
…storm spark flink
ResourceManager
从整体上对资源的一个管理和调度
NodeManager
具体的管理和调度

=================================

HDFS

HDFS特点
    分布式
    数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统 。
    高可用
        是一种允许文件通过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间。
    通透性
        实际上是通过网络来访问文件的动作,由程序与用户看来,就像是访问本地的磁盘一般。
    容错
        即使系统中有某些节点脱机,整体来说系统仍然可以持续运作而不会有数据损失。
        分布式文件管理系统很多,hdfs只是其中一种,不合适小文件。
java中操作大文件的一个包:java.nio包

====================================================

NameNode
是整个文件系统的管理节点。它维护着整个文件系统的文件目录树,文件/目录的
元信息和每个文件对应的数据块列表。接收用户的操作请求。
文件包括:
1)fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。
2)edits:操作日志文件。
3)fstime:保存最近一次checkpoint的时间
查看fsimage元数据信息可以通过命令将其转化为普通xml文件来进行查看
bin/hdfs oiv -p XML -i inputfile[具体的文件路径] -o outputfile
eg.
[root@master current]# pwd
/opt/hadoop-repo/name/current
current]# hdfs oiv -p XML -i fsimage_0000000000000000116 -o fsimage.xml
查看edits操作日志
bin/hdfs oev -p XML -i inputfile[具体的文件路径] -o outputfile
eg.
[root@master current]# pwd
/opt/hadoop-repo/name/current
current]# hdfs oiv -p XML -i edits_inprogress_0000000000000000119 -o edits.xml
DateNode
说明:datanode中的VERSION中的clusterID必须要和namenode中的VERSION中的clusterID保持一致。
不一致的原因:多次格式化造成(没有清空 name|data|secondary|tmp目录中的数据)
提供真实文件数据的存储服务。
文件块(block):最基本的存储单位。对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block。HDFS默认Block大小是128MB,以一个256MB文件,共有256/128=2个Block。
不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间
Replication。多复本。默认是三个。

====================================================
HDFS shell
data]# hdfs dfs -appendToFile append.txt /hello 向/hello中追加内容append.txt
data]# hdfs dfs -cp /hello /hello1 将/hello拷贝到/hello1
HDFS JAVA

public class HDFStest {
    /*列出目录的内容 listStatus
      读取文件 open
      创建目 mkdirs
      创建文件 create
      显示文件存储位置getFileBlockLocations
     删除文件或目录 delete
    */
    //Permission    Owner   Group   Size    Replication Block Size  Name
    FileSystem fileSystem;
    Configuration configuration;
    @Before
    public  void setUp() throws Exception{
        URI uri=new URI("hdfs://master:9000/");
        configuration=new Configuration();
        fileSystem=FileSystem.get(uri,configuration);
//        System.out.println(fileSystem);
    }
    //获取文件列表状态
    @Test
    public  void  testListStatus()throws IOException{
//        根路径
        Path path=new Path("/");
        FileStatus[] fileStatuses = fileSystem.listStatus(path);
        for (FileStatus f: fileStatuses) {
            FsPermission permission = f.getPermission();
            String fp_prefix = "-";
            if (f.isDirectory()){
                fp_prefix = "d";
            }
            FsAction userAction=permission.getUserAction();
            FsAction groupAction = permission.getGroupAction();
            FsAction otherAction = permission.getOtherAction();
            String acl = fp_prefix+userAction.SYMBOL+groupAction.SYMBOL+otherAction.SYMBOL;
            String owner = f.getOwner();
            String group= f.getGroup();
            long size=f.getLen();
            short replication =f.getReplication();
            long blockSize = f.getBlockSize();
            String name = f.getPath().getName();
            String mTime = new SimpleDateFormat("yyyy-MM-dd hh:mm").format(new Date(f.getModificationTime()));
            System.out.println(acl + " " + replication + " " + owner + " " +
                    group + " " + size + " " + mTime + " " + name);
        }
    }
    //读取文件 open
    @Test
    public void testRead() throws IOException {
        Path path = new Path("/hello");
        FSDataInputStream fis = fileSystem.open(path);
//        BufferedReader br=new BufferedReader(new InputStreamReader(fis));
//        String  line=null;
//        while ((line = br.readLine())!=null)
//        {
//            System.out.println(line);
//        }
//        br.close();
//        第二种读取方法
        IOUtils.copyBytes(fis,System.out,1024,false);
        IOUtils.closeStream(fis);
    }
//   创建目录 mkdirs
     @Test
    public void testMkdir() throws IOException {
         Path path = new Path("/mutil-dir");
         boolean ret = fileSystem.mkdirs(path);
         Assert.assertEquals(true,ret);

     }
//     创建文件 create
    @Test
    public void testCreateFile() throws IOException {
        Path path = new Path("/mutil-dir1/readme.txt");
//        boolean ret = fileSystem.createNewFile(path);
//        Assert.assertEquals(true,ret);
        FSDataOutputStream fos = fileSystem.create(path);
        byte[] bytes= "你好三毛".getBytes();
        fos.write(bytes);
        fos.close();
        //推荐使用第二种
    }
    //显示文件存储位置 getFileBlockLocations
    @Test
    public void testLocation() throws IOException {
        Path path = new Path("/mutil-dir1/readme.txt");
        FileStatus fs=fileSystem.getFileStatus(path);
        long len = fs.getLen();
        BlockLocation[] locations = fileSystem.getFileBlockLocations(path,0,len);
        for (BlockLocation location: locations) {
            String[] hosts = location.getHosts();
            String[] names = location.getNames();
            long length = location.getLength();
            System.out.println(Arrays.toString(hosts));
            System.out.println(Arrays.toString(names));
            System.out.println("length "+ length);
        }
    }
    //删除文件或目录
    @Test
    public void testDelete() throws IOException {
        Path path = new Path("/mutil-dir1/");
        boolean ret = fileSystem.delete(path,true);
        Assert.assertEquals(true,ret);
    }

=====================================================

HDFS和RPC

Hadoop的整个体系结构就是构建在RPC之上的(org.apache.hadoop.ipc)。
RPC(Remote Procedure Call)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为了通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个带参数的调用信息到服务进程,然后等待响应信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获取进程结果,最后调用执行。

这里写图片描述

RPC代码演示

服务接口

package com.sanmao.hadoop_02.rpc;

import org.apache.hadoop.ipc.VersionedProtocol;


//要继承VersionedProtocol 这个通信协议
public interface IHelloService extends VersionedProtocol{
    long versionID=123456789L;
    public  String sayHello(String name);
    public String heartBeat(String status);
}

具体服务类

package com.sanmao.hadoop_02.rpc;

import org.apache.hadoop.ipc.ProtocolSignature;

import java.io.IOException;

/**
 * Created by kkk on 2016/10/21.
 */
public class HelloServiceImpl implements IHelloService{
    public String sayHello(String name) {
        System.out.println("hello 方法被调用了");
        return  "hello "+name;
    }

    public String heartBeat(String status) {
        System.out.println("心跳检测");
        return "心跳响应  "+ status;
    }

    public long getProtocolVersion(String s, long l) throws IOException {
        return IHelloService.versionID;
    }

    public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
        //新建一个签名
        return new ProtocolSignature();
    }
}

RPC 服务器

package com.sanmao.hadoop_02.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;

public class RPCDriver {
    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();
        RPC.Builder builder = new RPC.Builder(configuration);
        RPC.Server server = builder.setBindAddress("localhost").setPort(8888)
                .setProtocol(IHelloService.class)
                .setInstance(new HelloServiceImpl()).build();
        server.start();
        System.out.println("RPC服务器开启");
    }
}

RPC 客户端

package com.sanmao.hadoop_02.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;
import java.net.InetSocketAddress;


public class RPCClient {
     /*Class<T> protocol,
        long clientVersion,
        InetSocketAddress addr,
        Configuration conf*/
    //RPC底层采取的就是TPC/IP协议
     public static void main(String[] args) throws IOException, InterruptedException {
         Configuration configuration= new Configuration();
         InetSocketAddress inetSocketAddress=new InetSocketAddress("localhost",8888);
         IHelloService proxy = RPC.getProxy(IHelloService.class, IHelloService.versionID, inetSocketAddress, configuration);
         //SOAP协议
         String result = proxy.sayHello("三毛");
         System.out.println(result);
         while (true){
             String ret = proxy.heartBeat(System.currentTimeMillis() + "");
             System.out.println(ret);
             Thread.sleep(3000);
         }
     }
}

HDFS调用之数据存储读文件解析

这里写图片描述

HDFS调用之数据存储写文件解析

这里写图片描述
三个关键的接口
ClientProtocol
是客户端(FileSystem)与NameNode通信的接口。
DatanodeProtocol
是DataNode与NameNode通信的接口。
NamenodeProtocol
是SecondaryNameNode与NameNode通信的接口

常见的HDFS运维

    修改回收站清空数据的时间
       在每个节点(不仅仅是主节点)上添加配置 core-site.xml,增加如下内容
       <property>
          <name>fs.trash.interval</name>
          <value>1440</value>
          <description>单位是分钟</description>
       </property>
    hdfs dfsadmin -safemode leave ——>hdfs离开受保护模式
    查看磁盘内容
        df -lh /
        查看具体目录下面文件大小
        du -lh --max-depth=1 path
    查看hdfs
        hdfs dfs -du -lh hdfs_path
        hdfs dfs -df hdfs_path
    查看磁盘健康状况
        hdfs fsck -blocks hdfs_path

发表评论

您必须 登录 才能发表留言!