Flume抽取Oracle中的数据到Kafka

By | 2019年2月2日

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdkyxy2013/article/details/86646781

1.1 Flume的安装

1)下载Flume

       从Flume官网(http://flume.apache.org/download.html)下载对应版本的Flume,这里使用的是Flume的版本是1.7。

2)解压Flume

      新建flume目录,将下载的Flume上传到该目录下,执行如下命令进行解压:

tar -zxvf ./apache-flume-1.7.0-bin.tar.gz -C ./

3)解压后进入Flume的lib目录下,将Oracle数据库的驱动包拷贝到此目录下。

1.2 Flume对接Kafka

       Flume的使用非常的简单,只需要在编辑一个配置文件即可。配置文件可以自定义,这里命名为flume-topic-oracle.conf。

  • 声明source、channel、sink并声明source类型。

  • 声明数据库连接、用户名、密码等。

  • 设置自动提交、Oracle方言、数据库驱动等。

  • 设置查询间隔、Flume状态文件位置及名称等。

       其中,Flume状态文件会在启动Flume的时候自动创建,里面保存了查询语句、最后一次查询的最大索引号以及数据库链接等信息,如下图所示:

        Flume的状态文件在每一次启动Flume的时候都会自动创建一个,理论上不需要删除,但经过测试,在不删除的情况下,导入到Inceptor的数据有时会存在重复的情况。所以建议在每次停止Flume服务之后将Flume状态文件删除。

  • 设置查询的SQL语句以及从哪条数据开始查询。

  • 设置分批参数以及c3p0连接池参数。

  • 设置channel为内存模式。

  • 设置sink的类型

        sink可以设置为输出到本地、Kafka或者HDFS等。此处,设置输出到Kafka,方便后续Flume、Kafka、Slipstream的整合。

  • 连接source、channel、sink

1.3 测试Flume+Kafka

      Flume相应的配置文件写好之后,可以直接在Flume文件夹下执行如下命令启动Flume进行测试:

bin/flume-ng agent --name a1 --conf ./conf --conf-file ./conf/flume-topic-oracle.conf -Dflume.root.logger=INFO,console -Djava.security.auth.login.config=/opt/flume/apache-flume-1.7.0-bin/conf/jaas.conf

       但是,通常情况下,配置文件写好之后,在不变动的情况下,一般不会停止Flume服务,上述命令,会进去一个Flume运行的前台界面,这是最好使用前台转后台的方法启动Flume,如下:

nohup bin/flume-ng agent --name a1 --conf ./conf --conf-file ./conf/flume-topic-oracle.conf -Dflume.root.logger=INFO,console -Djava.security.auth.login.config=/opt/flume/apache-flume-1.7.0-bin/conf/jaas.conf &

       其中,–conf指向参数存在的目录,–conf-file指向2.2中配置的参数。

       启动Flume之后,向Oracle对应表中插入一条数据,如下图所示:

      在集群中,启动Kafka消费者,可以发现,消费者中可以接收到刚刚插入的数据。

      至此,Flume对接Kafka成功。

发表评论