HDFS Source Code Analysis: What Exactly Does "-put" Do? (Client Side)

By | December 1, 2018

The point of reading source code is not to pick apart every last word, and certainly not to memorize implementation details. It is to extract the implementation's overall idea and framework, build a macro-level picture, and then, when a particular detail matters, go back, locate it, and chew through the code there. That is the spirit in which we will peek into the relevant HDFS internals.

When we type the command:

bin/hadoop fs -put /home/dcc/data/1.3g.rar /rs1/g.rar

to store data in an HDFS cluster, what exactly does the HDFS client do?
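
Before tracing the shell path, it helps to keep in mind what the command boils down to at the API level. The sketch below is a minimal hand-written equivalent using the public FileSystem API (the paths are taken from the command above; configuration details, error handling, and the temporary-file rename that -put performs are omitted). It is not the code path FsShell actually follows:

  //A hand-written sketch of what -put amounts to; not part of the FsShell code path.
  import java.io.InputStream;
  import java.io.OutputStream;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IOUtils;

  public class SimplePut {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();   // picks up core-site.xml / hdfs-site.xml
      FileSystem localFs = FileSystem.getLocal(conf);
      FileSystem hdfs = FileSystem.get(conf);     // fs.defaultFS must point at the cluster
      try (InputStream in = localFs.open(new Path("/home/dcc/data/1.3g.rar"));
           OutputStream out = hdfs.create(new Path("/rs1/g.rar"))) {
        IOUtils.copyBytes(in, out, conf, false);  // the same utility -put ends up using
      }
    }
  }

Functionally this is all the command does; the rest of the article traces how FsShell gets from the command line to an equivalent copyBytes call, and what DFSOutputStream does underneath it.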

At a high level, it does four things:

parse the -put command;

open an input stream on the local file;

open an output stream to the remote HDFS cluster;

copy data from the input stream to the output stream.

Each of these steps is analyzed in detail below.

1. Parsing the -put command

All fs subcommands are handled by the FsShell class:

  //org.apache.hadoop.fs.FsShell
  public static void main(String argv[]) throws Exception {
    FsShell shell = newShellInstance();
    int res;
    try {
      res = ToolRunner.run(shell, argv);
    } finally {
      shell.close();
    }
    System.exit(res);
  }

From there we enter shell's run() method; tracing further:

  //org.apache.hadoop.fs.FsShell
  public int run(String argv[]) throws Exception {
    // initialize FsShell
    init();

    int exitCode = -1;
    if (argv.length < 1) {
      printUsage(System.err);
    } else {
      String cmd = argv[0];
      Command instance = null;
      try {
        //Look up the Command instance registered for the given command name.
        //While debugging, instance turns out to be org.apache.hadoop.fs.shell.CopyCommands$Put@65694ee6 (the inner class Put of CopyCommands).
        instance = commandFactory.getInstance(cmd);
        if (instance == null) {
          throw new UnknownCommandException();
        }
        //Step into the run() method.
        exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length));
      } catch (IllegalArgumentException e) {
        displayError(cmd, e.getLocalizedMessage());
        if (instance != null) {
          printInstanceUsage(System.err, instance);
        }
      } catch (Exception e) {
        // instance.run catches IOE, so something is REALLY wrong if here
        LOG.debug("Error", e);
        displayError(cmd, "Fatal internal error");
        e.printStackTrace(System.err);
      }
    }
    return exitCode;
  }

That completes the parsing of the -put command. This parsing process is identical for every fs subcommand.
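
How does commandFactory know that the string "-put" maps to CopyCommands$Put? Each command class registers itself with the CommandFactory when FsShell initializes; for the copy family the registration lives in CopyCommands and looks roughly like the abridged sketch below (reconstructed from memory of the 2.x source, so take the exact list of registrations as approximate):

  //org.apache.hadoop.fs.shell.CopyCommands (registration sketch, abridged; not the complete method)
  public static void registerCommands(CommandFactory factory) {
    factory.addClass(Cp.class, "-cp");
    factory.addClass(Get.class, "-get");
    //The "-put" string resolves to the inner class Put seen in the debugger above.
    factory.addClass(Put.class, "-put");
    factory.addClass(CopyFromLocal.class, "-copyFromLocal");
    // ... the real method registers the rest of the copy family as well
  }
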
Tracing further, we reach the entry point of the main logic:

  //org.apache.hadoop.fs.shell.Command
  public int run(String...argv) {
    LinkedList<String> args = new LinkedList<String>(Arrays.asList(argv));
    try {
      if (isDeprecated()) {
        displayWarning(
            "DEPRECATED: Please use '"+ getReplacementCommand() + "' instead.");
      }
      //The processing logic hides behind these two calls.
      processOptions(args);
      processRawArguments(args);
    } catch (IOException e) {
      displayError(e);
    }
    
    return (numErrors == 0) ? exitCode : exitCodeForError();
  }

The Command class above is the parent of all shell commands; almost every command that ships with Hadoop is one of its subclasses, as sketched below:
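
The slice of the hierarchy that -put actually exercises, pieced together from the classes encountered while debugging (a rough sketch, not the complete class tree):

  Command
    └── FsCommand
          └── CommandWithDestination
                └── CopyCommands$Put    (all under org.apache.hadoop.fs.shell)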


Entering the body of the processing logic:

  //org.apache.hadoop.fs.shell.CopyCommands
    protected void processArguments(LinkedList<PathData> args)
    throws IOException {
      // NOTE: this logic should be better, mimics previous implementation
      if (args.size() == 1 && args.get(0).toString().equals("-")) {
        copyStreamToTarget(System.in, getTargetPath(args.get(0)));
        return;
      }
      //After the arguments are processed, all of the important logic sits behind this call.
      super.processArguments(args);
    }

Because Command sits at the top of a large subclass tree and code is shared across it, the call path bounces from parent to child, to another child, and back to the parent. It is hard to work out by reading the source alone; single-step debugging in Eclipse is what made the whole call path clear. Next we step into the key entry point:

  //org.apache.hadoop.fs.shell.Command
  protected void processPaths(PathData parent, PathData ... items)
  throws IOException {
    // TODO: this really should be iterative
    for (PathData item : items) {
      try {
        //For -put, stepping into this call leads to the main data-transfer logic.
        processPath(item);
        if (recursive && item.stat.isDirectory()) {
          recursePath(item);
        }
        //After the upload finishes, this performs the concrete clean-up work.
        postProcessPath(item);
      } catch (IOException e) {
        displayError(e);
      }
    }
  }

Tracing further, we reach the key entry point that performs the copy:

  //org.apache.hadoop.fs.shell.CommandWithDestination
  protected void processPath(PathData src, PathData dst) throws IOException {
    if (src.stat.isSymlink()) {
      // TODO: remove when FileContext is supported, this needs to either
      // copy the symlink or deref the symlink
      throw new PathOperationException(src.toString());        
    } else if (src.stat.isFile()) {
      //This enters the copy flow, copying from the src path to dst.
      copyFileToTarget(src, dst);
    } else if (src.stat.isDirectory() && !isRecursive()) {
      throw new PathIsDirectoryException(src.toString());
    }
  }

2. Opening the local file input stream
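
copyFileToTarget, reached above, is where the local file is actually opened as an InputStream before being handed to copyStreamToTarget. A simplified sketch of what it does (paraphrased from the Hadoop source, not a verbatim excerpt):

  //org.apache.hadoop.fs.shell.CommandWithDestination (simplified sketch of copyFileToTarget)
  protected void copyFileToTarget(PathData src, PathData target) throws IOException {
    InputStream in = null;
    try {
      in = src.fs.open(src.path);          // open the local file input stream
      copyStreamToTarget(in, target);      // shown below: creates the remote stream and copies
    } finally {
      IOUtils.closeStream(in);             // always close the local stream
    }
  }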

  //org.apache.hadoop.fs.shell.CommandWithDestination
  protected void copyStreamToTarget(InputStream in, PathData target)
  throws IOException {
    if (target.exists && (target.stat.isDirectory() || !overwrite)) {
      throw new PathExistsException(target.toString());
    }
    TargetFileSystem targetFs = new TargetFileSystem(target.fs);
    try {
      PathData tempTarget = target.suffix("._COPYING_");
      targetFs.setWriteChecksum(writeChecksum);
      //By this point the local input stream is in hand; writeStreamToFile creates the
      //remote output stream and copies the data into it.
      targetFs.writeStreamToFile(in, tempTarget);
      //Once the transfer finishes, rename the temporary file to the final name (while the
      //copy is in progress you can see an "xxx._COPYING_" file in the target directory).
      targetFs.rename(tempTarget, target);
    } finally {
      targetFs.close(); // last ditch effort to ensure temp file is removed
    }
  }


3. Opening the remote file output stream to the HDFS cluster

Tracing further, to see how the streams are actually copied underneath:

  //org.apache.hadoop.fs.shell.CommandWithDestination
    void writeStreamToFile(InputStream in, PathData target) throws IOException {
      FSDataOutputStream out = null;
      try {
        //Create the output stream (a stream to the HDFS cluster).
        out = create(target);
        //The utility class IOUtils does the actual copying.
        IOUtils.copyBytes(in, out, getConf(), true);
      } finally {
        IOUtils.closeStream(out); // just in case copyBytes didn't
      }
    }

Now comes the key point. By now both the input and output streams are in place and data has started to flow, yet nothing about the process so far looks special. So where is the famously complex HDFS data-transfer logic hiding?

4. Copying data between the input and output streams

Stepping into IOUtils.copyBytes() to see whether there is anything mysterious about it:

  //org.apache.hadoop.io.IOUtils
   public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
    throws IOException {
    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    //buffSize is the buffer size.
    byte buf[] = new byte[buffSize];
    //Read one buffer's worth of bytes from the input stream.
    int bytesRead = in.read(buf);
    while (bytesRead >= 0) {
      //Write the buffered data out to the output stream, looping until the input is exhausted.
      out.write(buf, 0, bytesRead);
      if ((ps != null) && ps.checkError()) {
        throw new IOException("Unable to write to output stream.");
      }
      bytesRead = in.read(buf);
    }
  }

Stepping into the write() method:

  //org.apache.hadoop.fs.FSOutputSummer
  public synchronized void write(byte b[], int off, int len)
      throws IOException {
    
    checkClosed();
    
    if (off < 0 || len < 0 || off > b.length - len) {
      throw new ArrayIndexOutOfBoundsException();
    }
    //At this point the actual writing is delegated to write1().
    for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
    }
  }

Stepping into write1() we find:

  //org.apache.hadoop.fs.FSOutputSummer
  private int write1(byte b[], int off, int len) throws IOException {
    if(count==0 && len>=buf.length) {
      // local buffer is empty and user data has one chunk
      // checksum and output data
      final int length = buf.length;
      sum.update(b, off, length);
      //This is the main data path: it builds and transmits both the raw data and its checksum.
      writeChecksumChunk(b, off, length, false);
      return length;
    }
    
    // copy user data to local buffer
    int bytesToCopy = buf.length-count;
    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
    sum.update(b, off, bytesToCopy);
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // local buffer is full
      flushBuffer();
    } 
    return bytesToCopy;
  }

Tracing into the writeChecksumChunk() method:

  //org.apache.hadoop.fs.FSOutputSummer
  private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
  throws IOException {
    int tempChecksum = (int)sum.getValue();
    if (!keep) {
      sum.reset();
    }
    //Serialize the checksum computed earlier.
    int2byte(tempChecksum, checksum);
    //Enter the write path.
    writeChunk(b, off, len, checksum);
  }

Tracing into the writeChunk() method:

  //org.apache.hadoop.hdfs.DFSOutputStream
  //Writing data to the HDFS cluster: the most critical logic lands in this method.
  protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
                                                        throws IOException {
    //Check that the remote file (stream) is open.
    dfsClient.checkOpen();
    //Check that this output stream has not been closed.
    checkClosed();

    int cklen = checksum.length;
    int bytesPerChecksum = this.checksum.getBytesPerChecksum(); 
    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    if (checksum.length != this.checksum.getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            this.checksum.getChecksumSize() + 
                            " but found to be " + checksum.length);
    }

    if (currentPacket == null) {
      //Build a Packet. A Packet is the unit of data transfer in HDFS and consists of one or more chunks.
      currentPacket = new Packet(packetSize, chunksPerPacket, 
          bytesCurBlock);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.seqno +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }
    //Write the checksum and the data into the Packet.
    currentPacket.writeChecksum(checksum, 0, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.numChunks++;
    bytesCurBlock += len;

    // If packet is full, enqueue it for transmission
    // Once the Packet holds enough chunks, enter the data-sending flow below.
    if (currentPacket.numChunks == currentPacket.maxChunks ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.seqno +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      // This is the key entry point for sending data.
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        resetChecksumChunk(bytesPerChecksum);
      }

      if (!appendChunk) {
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      }
      //
      // if encountering a block boundary, send an empty packet to 
      // indicate the end of block and reset bytesCurBlock.
      //
      if (bytesCurBlock == blockSize) {
        currentPacket = new Packet(0, 0, bytesCurBlock);
        currentPacket.lastPacketInBlock = true;
        currentPacket.syncBlock = shouldSyncBlock;
        waitAndQueueCurrentPacket();
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }
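
To get a feel for the sizes involved: assuming the common defaults of 512 data bytes per checksum chunk and a 64 KB write packet (dfs.client-write-packet-size), one packet carries on the order of 127 chunks. The following back-of-the-envelope sketch mirrors the spirit of computePacketChunkSize under those assumed defaults; the real code additionally reserves room for the packet header first:

  //Back-of-the-envelope sketch of the packet layout, assuming default sizes
  //(512-byte chunks, 4-byte CRC checksums, 64 KB write packets). Not the DFSOutputStream code.
  public class PacketMath {
    public static void main(String[] args) {
      int bytesPerChecksum = 512;                                // data bytes covered by one checksum
      int checksumSize = 4;                                      // a CRC checksum is 4 bytes
      int writePacketSize = 64 * 1024;                           // target packet size
      int chunkSize = bytesPerChecksum + checksumSize;           // 516 bytes per chunk on the wire
      int chunksPerPacket = Math.max(writePacketSize / chunkSize, 1);  // 127 chunks
      int packetPayload = chunksPerPacket * chunkSize;           // 65,532 payload bytes + header
      System.out.println(chunksPerPacket + " chunks, " + packetPayload + " payload bytes per packet");
    }
  }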

Next, into the key entry point for sending data, the waitAndQueueCurrentPacket() method:

  //org.apache.hadoop.hdfs.DFSOutputStream
  private void waitAndQueueCurrentPacket() throws IOException {
    synchronized (dataQueue) {
      // If queue is full, then wait till we have enough space
      while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
        try {
          dataQueue.wait();
        } catch (InterruptedException e) {
          // If we get interrupted while waiting to queue data, we still need to get rid
          // of the current packet. This is because we have an invariant that if
          // currentPacket gets full, it will get queued before the next writeChunk.
          //
          // Rather than wait around for space in the queue, we should instead try to
          // return to the caller as soon as possible, even though we slightly overrun
          // the MAX_PACKETS iength.
          Thread.currentThread().interrupt();
          break;
        }
      }
      checkClosed();
      //Add the current Packet to the send queue.
      queueCurrentPacket();
    }
  }

Stepping into the queueCurrentPacket() method:

  //org.apache.hadoop.hdfs.DFSOutputStream
  private void queueCurrentPacket() {
    //dataQueue is the queue of Packets waiting to be sent.
    synchronized (dataQueue) {
      if (currentPacket == null) return;
      //Append the current Packet to the queue.
      dataQueue.addLast(currentPacket);
      lastQueuedSeqno = currentPacket.seqno;
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
      }
      //Reset currentPacket to null so that copying can continue.
      currentPacket = null;
      //Wake up the other threads waiting on dataQueue.
      dataQueue.notifyAll();
    }
  }

At this point the Packets we build are being placed into the dataQueue, and the four steps listed at the start of this article are essentially complete.

Yet so far we still have not seen any code that actually sends data to the HDFS cluster.

The analysis so far only shows data being copied into dataQueue and nothing more. Since dataQueue is a shared variable, it is natural to suspect that a background thread is responsible for draining it and sending its contents. Searching for uses of dataQueue turns up the DataStreamer class inside DFSOutputStream; it extends Daemon, which makes it a natural fit for a background thread, and its run() method implements the main background-send logic (a condensed producer/consumer sketch follows the method below):

  //org.apache.hadoop.hdfs.DFSOutputStream
    @Override
    public void run() {
      long lastPacket = Time.now();
      //Loop here, sending data repeatedly.
      while (!streamerClosed && dfsClient.clientRunning) {

        // if the Responder encountered an error, shutdown Responder
        if (hasError && response != null) {
          try {
            response.close();
            response.join();
            response = null;
          } catch (InterruptedException  e) {
          }
        }

        Packet one = null;

        try {
          // process datanode IO errors if any
          boolean doSleep = false;
          if (hasError && errorIndex>=0) {
            doSleep = processDatanodeError();
          }
          //Synchronize on dataQueue.
          synchronized (dataQueue) {
            // wait for a packet to be sent.
            long now = Time.now();
            while ((!streamerClosed && !hasError && dfsClient.clientRunning 
                && dataQueue.size() == 0 && 
                (stage != BlockConstructionStage.DATA_STREAMING || 
                 stage == BlockConstructionStage.DATA_STREAMING && 
                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
              timeout = timeout <= 0 ? 1000 : timeout;
              timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                 timeout : 1000;
              try {
                dataQueue.wait(timeout);
              } catch (InterruptedException  e) {
              }
              doSleep = false;
              now = Time.now();
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            // get packet to be sent.
            if (dataQueue.isEmpty()) {
              one = new Packet();  // heartbeat packet
            } else {
              //Take a Packet waiting to be sent.
              one = dataQueue.getFirst(); // regular data packet
            }
          }
          assert one != null;

          // get new block from namenode: ask the namenode to allocate a new block.
          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Allocating new block");
            }
            // Obtain the datanode info, allocate the block, and establish the connection to the datanode.
            nodes = nextBlockOutputStream(src);
            initDataStreaming();
          } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Append to block " + block);
            }
            setupPipelineForAppendOrRecovery();
            initDataStreaming();
          }

          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
          if (lastByteOffsetInBlock > blockSize) {
            throw new IOException("BlockSize " + blockSize +
                " is smaller than data size. " +
                " Offset of packet in block " + 
                lastByteOffsetInBlock +
                " Aborting file " + src);
          }

          if (one.lastPacketInBlock) {
            // wait for all data packets have been successfully acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                try {
                  // wait for acks to arrive from datanodes
                  dataQueue.wait(1000);
                } catch (InterruptedException  e) {
                }
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            stage = BlockConstructionStage.PIPELINE_CLOSE;
          }
          
          // send the packet
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            }
          }

          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          }

          // write out data to remote datanode
          try {
            //Write the Packet data to the block stream on the datanode.
            one.writeTo(blockStream);
            blockStream.flush();   
          } catch (IOException e) {
            // HDFS-3398 treat primary DN is down since client is unable to 
            // write to primary DN 
            errorIndex = 0;
            throw e;
          }
          lastPacket = Time.now();
          
          if (one.isHeartbeatPacket()) {  //heartbeat packet
          }
          
          // update bytesSent
          long tmpBytesSent = one.getLastByteOffsetBlock();
          if (bytesSent < tmpBytesSent) {
            bytesSent = tmpBytesSent;
          }

          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;
          }

          // Is this block full?
          if (one.lastPacketInBlock) {
            // wait for the close packet has been acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                dataQueue.wait(1000);// wait for acks to arrive from datanodes
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }

            endBlock();
          }
          if (progress != null) { progress.progress(); }

          // This is used by unit test to trigger race conditions.
          if (artificialSlowdown != 0 && dfsClient.clientRunning) {
            Thread.sleep(artificialSlowdown); 
          }
        } catch (Throwable e) {
          DFSClient.LOG.warn("DataStreamer Exception", e);
          if (e instanceof IOException) {
            setLastException((IOException)e);
          }
          hasError = true;
          if (errorIndex == -1) { // not a datanode error
            streamerClosed = true;
          }
        }
      }
      closeInternal();
    }
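
Stripped of pipeline setup, acks, and error handling, the relationship between the writing thread (writeChunk → waitAndQueueCurrentPacket → queueCurrentPacket) and the DataStreamer thread is a classic bounded producer/consumer built on synchronized blocks and wait/notifyAll over dataQueue. The toy model below (hypothetical class and method names, not HDFS code) captures just that skeleton:

  //Toy model of the DFSOutputStream producer/consumer pattern (hypothetical class, not HDFS code).
  import java.util.LinkedList;

  public class ToyStreamer extends Thread {
    private final LinkedList<byte[]> dataQueue = new LinkedList<byte[]>();
    private static final int MAX_PACKETS = 80;
    private volatile boolean closed = false;

    // Producer side: what writeChunk/queueCurrentPacket do in spirit.
    public void queuePacket(byte[] packet) throws InterruptedException {
      synchronized (dataQueue) {
        while (!closed && dataQueue.size() >= MAX_PACKETS) {
          dataQueue.wait();                 // block the writer when the queue is full
        }
        dataQueue.addLast(packet);
        dataQueue.notifyAll();              // wake the streamer thread
      }
    }

    // Consumer side: what DataStreamer.run does in spirit.
    @Override
    public void run() {
      while (!closed) {
        byte[] one = null;
        synchronized (dataQueue) {
          while (!closed && dataQueue.isEmpty()) {
            try { dataQueue.wait(1000); } catch (InterruptedException e) { return; }
          }
          if (closed) break;
          one = dataQueue.removeFirst();
          dataQueue.notifyAll();            // free up space for the writer
        }
        send(one);                          // stand-in for one.writeTo(blockStream)
      }
    }

    private void send(byte[] packet) { /* write to the datanode pipeline here */ }

    public void close() { closed = true; synchronized (dataQueue) { dataQueue.notifyAll(); } }
  }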

Tracing into the nextBlockOutputStream() method, which obtains the datanode information and sets up the connection to the datanodes:

  //org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer
    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
      LocatedBlock lb = null;
      DatanodeInfo[] nodes = null;
      int count = dfsClient.getConf().nBlockWriteRetry;
      boolean success = false;
      ExtendedBlock oldBlock = block;
      do {
        hasError = false;
        lastException = null;
        errorIndex = -1;
        success = false;

        long startTime = Time.now();
        DatanodeInfo[] excluded =
            excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
            .keySet()
            .toArray(new DatanodeInfo[0]);
        block = oldBlock;
        //Ask the namenode to allocate a block (i.e. the related metadata).
        lb = locateFollowingBlock(startTime,
            excluded.length > 0 ? excluded : null);
        block = lb.getBlock();
        block.setNumBytes(0);
        accessToken = lb.getBlockToken();
        //Get the datanodes that will store this block.
        nodes = lb.getLocations();

        //
        // Connect to first DataNode in the list.
        //Create the data-transfer stream to the first datanode.
        success = createBlockOutputStream(nodes, 0L, false);

        if (!success) {
          DFSClient.LOG.info("Abandoning " + block);
          dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
          block = null;
          DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
          excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
        }
      } while (!success && --count >= 0);

      if (!success) {
        throw new IOException("Unable to create new block.");
      }
      return nodes;
    }

Next, into the rather important createBlockOutputStream() method, which creates the block stream to the datanode:

  //org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer
    private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
        boolean recoveryFlag) {
      Status pipelineStatus = SUCCESS;
      String firstBadLink = "";
      if (DFSClient.LOG.isDebugEnabled()) {
        for (int i = 0; i < nodes.length; i++) {
          DFSClient.LOG.debug("pipeline = " + nodes[i]);
        }
      }

      // persist blocks on namenode on next flush
      persistBlocks.set(true);

      int refetchEncryptionKey = 1;
      while (true) {
        boolean result = false;
        DataOutputStream out = null;
        try {
          assert null == s : "Previous socket unclosed";
          assert null == blockReplyStream : "Previous blockReplyStream unclosed";
          //Establish the socket connection to the datanode.
          s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
          long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
          //Build the input and output streams on top of the socket channel.
          OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
          InputStream unbufIn = NetUtils.getInputStream(s);
          if (dfsClient.shouldEncryptData()) {
            IOStreamPair encryptedStreams =
                DataTransferEncryptor.getEncryptedStreams(unbufOut,
                    unbufIn, dfsClient.getDataEncryptionKey());
            unbufOut = encryptedStreams.out;
            unbufIn = encryptedStreams.in;
          }
          //Wrap the socket-based streams in higher-level streams.
          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
              HdfsConstants.SMALL_BUFFER_SIZE));
          blockReplyStream = new DataInputStream(unbufIn);
  
          //
          // Xmit header info to datanode
          //
  
          // send the request: build a Sender and write out the block-write request.
          new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
              nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
              cachingStrategy);
  
          // receive ack for connect: read back the pipeline setup response.
          BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
              PBHelper.vintPrefixed(blockReplyStream));
          pipelineStatus = resp.getStatus();
          firstBadLink = resp.getFirstBadLink();
          
          if (pipelineStatus != SUCCESS) {
            if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
              throw new InvalidBlockTokenException(
                  "Got access token error for connect ack with firstBadLink as "
                      + firstBadLink);
            } else {
              throw new IOException("Bad connect ack with firstBadLink as "
                  + firstBadLink);
            }
          }
          assert null == blockStream : "Previous blockStream unclosed";
          blockStream = out;
          result =  true; // success
  
        } catch (IOException ie) {
          DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
          if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
            DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
                + "encryption key was invalid when connecting to "
                + nodes[0] + " : " + ie);
            // The encryption key used is invalid.
            refetchEncryptionKey--;
            dfsClient.clearDataEncryptionKey();
            // Don't close the socket/exclude this node just yet. Try again with
            // a new encryption key.
            continue;
          }
  
          // find the datanode that matches
          if (firstBadLink.length() != 0) {
            for (int i = 0; i < nodes.length; i++) {
              // NB: Unconditionally using the xfer addr w/o hostname
              if (firstBadLink.equals(nodes[i].getXferAddr())) {
                errorIndex = i;
                break;
              }
            }
          } else {
            errorIndex = 0;
          }
          hasError = true;
          setLastException(ie);
          result =  false;  // error
        } finally {
          if (!result) {
            IOUtils.closeSocket(s);
            s = null;
            IOUtils.closeStream(out);
            out = null;
            IOUtils.closeStream(blockReplyStream);
            blockReplyStream = null;
          }
        }
        return result;
      }
    }

The method above surfaces a great deal of low-level detail.

The background data-sending thread above is started when the DFSOutputStream is created:

  //org.apache.hadoop.hdfs.DFSOutputStream
  //Create and initialize the DFSOutputStream.
  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    final HdfsFileStatus stat;
    try {
      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
          new EnumSetWritable<CreateFlag>(flag), createParent, replication,
          blockSize);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     DSQuotaExceededException.class,
                                     FileAlreadyExistsException.class,
                                     FileNotFoundException.class,
                                     ParentNotDirectoryException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     UnresolvedPathException.class,
                                     SnapshotAccessControlException.class);
    }
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    //Start the background data-sending thread.
    out.start();
    return out;
  }
  
  
  //The start() method that launches the background data-sending thread.
  private synchronized void start() {
    streamer.start();
  }

This completes the whole client-side flow triggered by the "-put" command. The execution path above was obtained by single-step debugging on a real cluster. The class design inside HDFS is quite reasonable, but the call routes between classes are tangled; reading the code alone, without stepping through it, makes it easy to go astray. Once the execution path above is clear and the call relationships are understood, it becomes much easier to grasp the purpose of each class and how the related classes fit together.

