用Python实现Hadoop实时作业状态监控

2018-12-07 08:39 
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/GitzLiu/article/details/54137907



基于Python的Hadoop实时作业状态监控


前言:

  任务需要,要求完成这么一个程序,恰好博主以前在虚拟机上部署过hadoop,但是部署完后一直没用过,这次就来尝试下吧。

进入正题:

一、环境及工具:

ubuntu14.04 LTS
Hadoop
Python
PycURL


二、关于 API

  先把语言放在一边,要想监控hadoop的作业状态,那hadoop至少要提供相应的API 吧,上官网一通猛翻,果然找到了,Hadoop RESTful API,
   [ Hadoop RESTful API ]
  这个链接比官网要详细一些,里面提示了hadoop提供了curl工具来使用它的API。curl是什么呢,直接上官网去看WebHDFS REST API例子吧,传送门如下:
  [ WebHDFS REST API ]
  不过看完上述两个链接,我还是不知道如何开始写我的程序,继续在官网奋战,终于找到了这个简单粗暴实用的API:hadoop提供了ResourceManager REST API。
恩,链接放在下面,让各位少走弯路(说多都是泪)。
  [ ResourceManager REST API ]
  API在手,我们就要考虑下如何利用这个资源了,前面说了要利用curl这个工具,但我们是用Python来实现这个监控程序,Python怎么来调用curl呢?带着这个疑问在网上走弯路,神奇的工具出现了,pycurl库,恩,看名字我想大家也明白了,前人的智慧不服不行。

PycURL is a Python interface to libcurl

关于pycurl的使用方法,传送门如下:
  [ PycURL库使用方法 ]

在环境中安装pycurl,安装方法如下:
   [ PycURL库安装 ]
  
  

三、利用PycURL获取json数据

  好了万事具备,我们可以上代码了:

    b = StringIO.StringIO() 
    c = pycurl.Curl() 
    checkurl = "http://<rm http address:port>/ws/v1/cluster/apps" #需要监控的地址
    c.setopt(pycurl.URL, checkurl) 
    c.setopt(pycurl.HTTPHEADER, ["Accept:"]) 
    c.setopt(pycurl.WRITEFUNCTION, b.write) #回调
    c.setopt(pycurl.FOLLOWLOCATION, 1) 
    c.setopt(pycurl.MAXREDIRS, 5) #重定向
    c.setopt(pycurl.CONNECTTIMEOUT, 60) #链接超时
    c.perform() #运行
    status = c.getinfo(c.HTTP_CODE) 
    if status!=200:  #HTTP状态码,200表示成功 
        return
    html = b.getvalue()

  通过运行这段程序以及官网上的API描述,我们发现html里面存储的信息是标准的json数据,这就非常好了,因为python提供了字典这种数据结构,若是我们可以把json数据转换为字典结构,那岂不是会方便很多?
  上网简单一搜索,发现python提供了json.loads()转换函数,果然厉害。
  
  

四、转换成Python字典结构

dic_a=json.loads(html)
dic_b=dic_a['apps']              #dic_b=second dic
list_c=dic_b['app']              #list_c= thrid list
for dic_d in list_c:             #dic_d= fourth dic
        print dic_d['state']

  看上述代码也能明白,json数据转换完后的字典,嵌套着字典和列表,需要一层层访问,才能获得想要的信息,而我想要的就是state这个作业状态 信息。
  至此,我们已经获取到了想要的内容,大家可以根据自己的需求来合理利用。
 
 

五、完整代码

  下面是按我的需求写的完整程序,带有计时功能,如果没有作业处于运行态,就启动倒计时,时间到了仍没有作业运行就发出提示(或者运行其他内容),倒计时期间有其他作业运行就关闭计时,直到又没有作业执行了,再次重新开始倒计时。

# -*- encoding: utf-8 -*-
import time
import threading 
import pycurl
import StringIO
import re
import json


st=0

def time_count():
    global st
    i=10
    while i>0:
        print "i:%d"%i
        i-=1
        time.sleep(1)
        if st==1:
            st=0
            return

    print "there is no running task"


def check_state():
    st_num=0
    b = StringIO.StringIO() 
    c = pycurl.Curl() 
    checkurl = "http://<rm http address:port>/ws/v1/cluster/apps" #需要监控的地址
    c.setopt(pycurl.URL, checkurl) 
    c.setopt(pycurl.HTTPHEADER, ["Accept:"]) 
    c.setopt(pycurl.WRITEFUNCTION, b.write) #回调
    c.setopt(pycurl.FOLLOWLOCATION, 1) 
    c.setopt(pycurl.MAXREDIRS, 5) #重定向
    c.setopt(pycurl.CONNECTTIMEOUT, 60) #链接超时
    c.perform() #运行
    status = c.getinfo(c.HTTP_CODE) 

    html = b.getvalue()

    dic_a=json.loads(html)
    dic_b=dic_a['apps']                  #dic_b=second dic
    if dic_b!=None:                  #没有作业时,列表为空,非空才可获取
        list_c=dic_b['app']              #list_c= thrid liebiao
        for dic_d in list_c:             #dic_d= fourth dic
            if dic_d['state']=='FINISHED':
                st_num+=1
        if st_num==len(list_c):          # 所有作业finished
            return 0
        else:
            return 1

    else:
        return 0

    c.close() 
    b.close() 



if __name__ == '__main__':      


    t1=threading.Thread(target=time_count)
    pnum=0;
    while True:
        pnum+=1
        print 'check state %d\n'%pnum
        re_state=check_state()
        if re_state==0:                                        #no task is running :
            if t1.is_alive() ==False:
                t1=threading.Thread(target=time_count)
                t1.start()
        if re_state==1:                                        # task is running :
            if t1.is_alive() ==True:
                st=1                                   #shutdown thread
        #other program
        time.sleep(1)


    #other program


一些说明
  本程序可以不使用线程,函数完全可以胜任,但由于后期还需改动,使用线程会更具模块化一些,方便扩展。
  

六、后记

  到这就已经结束了,但不知道有没有细心的朋友发现,所谓监控hadoop作业的状态,并且hadoop API还提供了json数据的返回形式,其实…

  来,我们看下面这段代码:

import urllib

url="http://<rm http address:port>/ws/v1/cluster/apps"
page = urllib.urlopen(url)
html = page.read()

reg = r'“state”:[A-Z]*'
imgre = re.compile(reg)
imglist = re.findall(imgre,html)
for content in imglist:
    print content

  是不是有点懵,恩,博主程序写到一半的时候也发现了,既然监控只是要确认作业state信息,那么我们直接把html爬下来,利用正则分析下,不是一样么?

简单粗暴,高效快捷。

  不黑了,当然,对于我们这个程序,爬虫会更高效一些,但hadoop信息千千万,我们所需的数据远不止一个state 这么简单,而利用API获得的json,则会有利于我们进行一些更加复杂的操作。

发表评论

您必须 登录 才能发表留言!