【Flume】【源码分析】flume中ExecSource源码的详细分析——执行终端命令获取数据

2018-11-30 18:44 
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/chiweitree/article/details/43523569

我们直接看该Source的start方法吧

public void start() {
    logger.info("Exec source starting with command:{}", command);

    executor = Executors.newSingleThreadExecutor();

    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
        restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);

    // FIXME: Use a callback-like executor / future to signal us upon failure.
    runnerFuture = executor.submit(runner);

    /*
     * NB: This comes at the end rather than the beginning of the method because
     * it sets our state to running. We want to make sure the executor is alive
     * and well first.
     */
    sourceCounter.start();
    super.start();

    logger.debug("Exec source started");
  }

启动了一个线程来运行,运行的详细过程看runner

它是一个线程,实现了Runnable接口,所以直接看它重写的run方法的逻辑,我们一块一块来看:

 if(shell != null) {
            String[] commandArgs = formulateShellCommand(shell, command);
            process = Runtime.getRuntime().exec(commandArgs);
          }  else {
            String[] commandArgs = command.split("\\s+");
            process = new ProcessBuilder(commandArgs).start();
          }
          reader = new BufferedReader(
              new InputStreamReader(process.getInputStream(), charset));

这里就是执行shell命令,并且将shell命令的输出结果作为输入流读到reader中,InputStreamReader是字节流通向字符流的桥梁,它使用指定的charset读取字节并将其解码为字符,每次调用read方法都会从底层输入流读取一个或多个字节。

 while ((line = reader.readLine()) != null) {
            synchronized (eventList) {
              sourceCounter.incrementEventReceivedCount();
              eventList.add(EventBuilder.withBody(line.getBytes(charset)));
              if(eventList.size() >= bufferCount || timeout()) {
                flushEventBatch(eventList);
              }
            }
          }

如果读入的内容非空,先同步eventList,如果eventList超出一定范围未做处理就会flush

还有一个flush的地方

 future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                try {
                  synchronized (eventList) {
                    if(!eventList.isEmpty() && timeout()) {
                      flushEventBatch(eventList);
                    }
                  }
                } catch (Exception e) {
                  logger.error("Exception occured when processing event batch", e);
                  if(e instanceof InterruptedException) {
                      Thread.currentThread().interrupt();
                  }
                }
              }
          },
          batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

这是启动了一个线程,每个3000毫秒去检查eventList,如果有event或者超过3秒存留的event,将会送到通道中

private void flushEventBatch(List<Event> eventList){

      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

flush就是现将积攒下来的eventList中的event都处理掉,然后清空

1、将event都放入配置的通道中

 for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);

      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      List<Channel> optChannels = selector.getOptionalChannels(event);

      for (Channel ch: optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }

        eventQueue.add(event);
      }
    }

这里就是将event放到通道中的详细过程了,但是这里大家注意到有两次selector的getchannel的方法,这是因为通道的选择器模式有两种:复用和复制

  if(restart) {
          logger.info("Restarting in {}ms, exit code {}", restartThrottle,
              exitCode);
          try {
            Thread.sleep(restartThrottle);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else {
          logger.info("Command [" + command + "] exited with " + exitCode);
        }
      } while(restart);

restart参数的含义是,当shell命令执行的时候进程死了,是否重启该命令的进程,默认是false

配置为true的话,就会将刚才的所有代码循环一遍

总结:

1、event如何产出的?

 eventList.add(EventBuilder.withBody(line.getBytes(charset)));
 public static Event withBody(byte[] body, Map<String, String> headers) {
    Event event = new SimpleEvent();

    if(body == null) {
      body = new byte[0];
    }
    event.setBody(body);

    if (headers != null) {
      event.setHeaders(new HashMap<String, String>(headers));
    }

    return event;
  }

2、event如何放入通道?

private void flushEventBatch(List<Event> eventList){
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

发表评论

您必须 登录 才能发表留言!