焦点速讯:大数据NiFi(二十):实时同步MySQL数据到Hive
实时同步MySQL数据到Hive
案例:将mysql中新增的数据实时同步到Hive中。
以上案例需要用到的处理器有:“CaptureChangeMySQL”、“RouteOnAttribute”、“EvaluateJsonPath”、“ReplaceText”、“PutHiveQL”。
首先通过“CaptureChangeMySQL”读取MySQL中数据的变化(需要开启MySQL binlog日志),将Binlog中变化的数据同步到“RouteOnAttribute”处理器,通过此处理器获取上游数据属性,获取对应binlog操作类型,再将想要处理的数据路由到“EvaluateJsonPath”处理器,该处理器可以将json格式的binlog数据解析,通过自定义json 表达式获取json数据中的属性放入FlowFile属性,将FlowFile通过“ReplaceText”处理器获取上游FowFile属性,动态拼接sql替换所有的FlowFile内容,将拼接好的sql组成FlowFile路由到“PutHiveQL”将数据写入到Hive表。
(相关资料图)
一、开启MySQL的binlog日志
mysql-binlog是MySQL数据库的二进制日志,记录了所有的DDL和DML(除了数据查询语句)语句信息。一般来说开启二进制日志大概会有1%的性能损耗。这里需要开启MySQL的binlog日志方便后期使用“CaptureChangeMySQL”处理器来获取MySQL中的CDC事件。MySQL的版本最好是5.7版本之上。
1、登录mysql查看MySQL是否开启binlog日志
[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";2 、开启mysql binlog日志
在/etc/my.cnf文件中[mysqld]下写入以下内容:
[mysqld]#随机指定一个不能和其他集群中机器重名的字符串server-id=123#配置binlog日志目录,配置后会自动开启binlog日志,并写入该目录log-bin=/var/lib/mysql/mysql-bin3、重启mysql 服务,重新查看binlog日志情况
[root@node2 ~]# service mysqld restart[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";二、配置“CaptureChangeMySQL”处理器
“CaptureChangeMySQL”主要是从MySQL数据库捕获CDC(Change Data Capture)事件。CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作发生时的顺序输出为单独的FlowFile文件。
关于“CaptureChangeMySQL”处理器的“Properties”主要配置的说明如下:
配置项 | 默认值 | 允许值 | 描述 |
|---|---|---|---|
MySQL Hosts(MySQL 节点) | MySQL集群节点相对应的主机名/端口项的列表。多个节点使用逗号分隔,格式为:host1:port、host2:port…,处理器将尝试按顺序连接到列表中的主机。如果一个节点关闭,并且群集启用了故障转移,那么处理器将连接到活动节点。 | ||
MySQL Driver Class Name(MySQL驱动名称) | com.mysql.jdbc.Driver | MySQL数据库驱动程序类的类名。 | |
MySQL Driver Location(s)(MySQL驱动的位置) | 包含MySQL驱动程序包及其依赖项的文件/文件夹和/或url的逗号分隔列表(如果有),例如"/var/tmp/mysql-connector-java-5.1.38-bin.jar文件"。 | ||
Username(用户名) | 访问MySQL集群的用户名。 | ||
Password(密码) | 访问MySQL集群的密码。 | ||
Database/Schema Name Pattern(匹配数据库/Schema) | 用于根据CDC事件列表匹配数据库(或模式,具体取决于RDBMS类型)的正则表达式。正则表达式必须与存储在RDBMS中的数据库名称匹配。如果未设置属性,则数据库名称将不会用于筛选CDC事件。 | ||
Table Name Pattern(匹配表) | 用于匹配影响匹配表的CDC事件的正则表达式(regex)。regex必须与存储在数据库中的表名匹配。如果未设置属性,则不会根据表名筛选任何事件。 | ||
Max Wait Time(最大连接等待时长) | 30 seconds | 允许建立连接的最长时间,零表示实际上没有限制。 | |
Distributed Map Cache Client(分布式缓存客户端) | 指定用于保存处理器所需的各种表、列等信息的分布式映射缓存客户端控制器服务。如果未指定,则生成的事件将不包括列类型或名称等信息。 | ||
Retrieve All Records(检索所有记录) | true | ▪true▪false | 指定是否获取所有可用的CDC事件,而不考虑当前的binlog文件名或位置。如果处理器状态中存在binlog文件名和位置值,则忽略此属性的值。这允许4种不同的配置:1).如果处理器State中存在binlog数据,则State用来确定开始位置,并忽略Retrieve All Records的值。(目前NiFi版本测试有问题)2).如果处理器State中不存在binlog数据,此值设置为true意味着从头开始读取Binlog 数据。3).如果处理器State中不存在binlog数据,并且没有指定binlog文件名和位置,此值设置为false意味着从binlog尾部开始读取数据。4).如果处理器State中不存在binlog数据,并指定binlog文件名和位置,此值设置为false意味着从指定binlog尾部开始读取数据。 |
Include Begin/Commit Events(包含开始/提交事件) | false | ▪true▪false | 指定是否发出与二进制日志中的开始或提交事件相对应的事件。如果下游流中需要开始/提交事件,则设置为true,否则设置为false,这将抑制这些事件的生成并可以提高流性能。 |
Include DDL Events(标准表/列名) | false | ▪true▪false | 指定是否发出与数据定义语言(DDL)事件对应的事件,如ALTER TABLE、TRUNCATE TABLE。如果下游流中需要DDL事件,则设置为true,否则设置为false。为false时这将抑制这些事件的生成,并可以提高流性能。 |
配置步骤如下:
1、创建“CaptureChangeMySQL”处理器
2、配置“DistributeMapCacheServer”控制服务
监控mysql变化需要设置“DistributedMapCacheClient”控制服务,其对应的Server中存储处理器所需的各种表、列等信息,所以这里需要首先配置“DistributeMapCacheServer”控制服务。
3、配置“SCHEDULING”
由于这里使用“CaptureChangeMySQL”处理器监控“MySQL”中的数据,所以设置调度访问周期为“10s”,防止一直监听MySQL binlog数据,带来性能消耗。
4、配置“PROPERTIES”
在“CaptureChangeMySQL”处理器中配置“PROPERTIES”,配置如下:
MySQL Host : 192.168.179.5:3306MySQL Driver Class Name:com.mysql.jdbc.DriverMySQL Driver Location(s):/root/test/mysql-connector-java-5.1.47.jar注意:这里需要在每台NiFi节点上创建对应目录,上传mysql驱动包。
“PROPERTIES”配置如下:
此外,在“PROPERTIES”中还需要配置“Distributed Map Cache Client”控制服务,来读取“DistributeMapCacheServer”控制服务中的缓存数据:
另外,这里我们只是监控表“test2”对应的CDC事件,这里设置匹配表名为“test2”,最终“PROPERTIES”的配置如下:
注意:以上“Table Name Pattern”这里配置对应的Value值为:test2,也可以不配置,不配置会监控所有MySQL表的变化对应的binlog事件。当后面向Hive表中插入新增和更新数据时,对应MySQL中的元数据表也会变化,也会监控到对应的binlog事件。为了避免后期出现监控到其他表的binlog日志,这里建议配置上“test2”。
5、启动MySQL,创建表“test2”测试“CaptureChangeMySQL”处理器
登录mysql ,使用“mynifi”库,创建表“test2”。暂时设置“CaptureChangeMySQL”处理器“success”事件自动终止并启动,向表中插入对应的数据查看“CaptureChangeMySQL”处理器能否正常监控事件。
在mysql中创建对应的表:
use mynifi;create table test2 (id int,name varchar(255),age int);启动“CaptureChangeMySQL”处理器:
向表“test2”中插入以下数据:
insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;可以在“CaptureChangeMySQL”处理器中右键“View data provenance”查看捕获到的“insert”、“update”、“delete”事件:
注意问题:在配置好“CaptureChangeMySQL”处理器启动后,当MySQL中有数据插入、修改、删除时当前处理器会读取MySql binlog日志,并在当前处理器中记录读取binlog的位置状态。正常来说这里关闭“CaptureChangeMySQL”处理器后再次启动,会接着保存的binlog位置继续读取(可以参照“PROPERTIES”属性中“Retrieve All Records”配置说明),但是经过测试,此NiFi版本出现以下错误(无效的binlog位置,目测是一个版本bug错误):
所以在之后的测试中,我们可以将“CaptureChangeMysql”处理器读取binlog的状态清空,然后再次启动即可,这里会重复读取MySQL之前已经检测到的新增、修改、删除数据。
清空“CaptureChangeMysql”读取binlog状态:
三、配置“RouteOnAttribute”处理器
“RouteOnAttribute”是根据FlowFile的属性使用属性表达式进行数据路由。
关于“RouteOnAttribute”处理器的“Properties”主要配置的说明如下:
配置项 | 默认值 | 描述 |
|---|---|---|
Routing Strategy(路由策略) | Route to Property name | 指定在计算表达式语言时如何使用哪个关系。有如下几个关系可选择:▪Route to Property nameFlowFile的副本将被路由到对应的表达式计算结果为"true"的每个关系。▪Route to "matched" if all match要求所有用户定义的表达式求值都为"true",才认为FlowFile是匹配的。▪Route to "matched" if any matches至少有一个用户定义的表达式求值为"true",才能认为FlowFile是匹配的。 |
注意:该处理器允许用户自定义属性并指定该属性的匹配表达式。属性与动态属性指定的属性表达式相匹配的FileFlow,映射到动态属性上。
配置如下:
1、创建“RouteOnAttribute”处理器
2、配置“PROPERTIES”自定义属性
注意:以上自定义的属性中update、insert、delete对应的json 表达式写法为:${cdc.event.type:equals("delete")},代表匹配对应类型的FlowFile,“cdc.event.type”是上游FlowFile中的属性,“equales”是对应的方法,“delete”使用单引号引起,表示匹配的CDC事件。
3、连接“CaptureChangeMySQL”处理器与“RouteOnAttribute”处理器
四、配置“EvaluatejsonPath”处理器
“EvaluatejsonPath”处理器将根据上游“RouteOnAttribute”匹配的事件将内容映射成FlowFile属性,方便后期拼接SQL获取数据,上游匹配到的FlowFile中的数据格式为:
EvaluatejsonPath”处理器配置如下:
1、配置“EvaluatejsonPath”的“PROPERTIES”属性
2、连接“RouteOnAttribute”处理器和“EvaluatejsonPath”处理器
连接关系中,我们这里只关注“insert”和“update”的数据,后期获取对应的属性将插入和更新的数据插入到Hive表中,对于“delete”的数据可以路由到其他关系中,例如需要将删除数据插入到另外的Hive表中,可以再设置个分支处理。这里我们将“delete”和“failure”的数据设置自动终止关系。
设置“RouteOnAttribute”处理器其他匹配路由关系为自动终止:
五、配置“ReplaceText”处理器
“ReplaceText”处理器可以获取“EvaluatejsonPath”转换后FlowFile中的属性来替换原有数据组成一个“insert into ... values (... ...)”语句,方便后续将数据插入到Hive中。“ReplaceText”处理器的配置如下:
1、配置“RelaceText”处理器“PROPERTIES”属性
在“Replacement Value”中配置“insert into ${tablename} values (${id},"${name}",${age})”
注意:
以上获取的tablename名称为“test2”,后面这个sql是要将数据插入到Hive中的,所以这里在Hive中也应该创建“test2”的表名称,或者将表名称写成固定表,后期在Hive中创建对应的表即可。
另外,需要注意${name}在插入Hive中时对应的列为字符串,这里需要加上单引号。
2、连接“EvaluatejsonPath”处理器与“ReplaceText”处理器
配置“EvaluatjsonPath”处理器“failure”和“unmatch”路由关系为自动终止。
六、配置Hive 支持HiveServer2
访问Hive有两种方式:HiveServer2和Hive Client,Hive Client需要Hive和Hadoop的jar包,配置环境。HiveServer2使得连接Hive的Client从Yarn和HDFS集群中独立出来,不需要每个几点都配置Hive和Hadoop的jar包和一系列环境。
NiFi连接Hive就是使用了HiveServer2方式连接,所以这里需要配置HiveServer2。
配置HiveServer2步骤如下:
1、在Hive服务端配置hive-site.xml
#在Hive 服务端 $HIVE_HOME/etc/hive-site.xml中配置: hive.server2.thrift.port 10000 hive.server2.thrift.bind.host 192.168.179.4 2、在每台Hadoop 节点配置core-site.xml
hadoop.proxyuser.root.hosts * hadoop.proxyuser.root.groups * 3、重启HDFS ,Hive ,在Hive服务端启动Metastore和HiveServer2服务
nohup hive --service metastore >> ./nohup.out 2>&1 &nohup hive --service hiveserver2 >> ./nohup.out 2>&1 &4、在客户端通过beeline连接Hive
[root@node3 test]# beelinebeeline> !connect jdbc:hive2://node1:10000 rootEnter password for jdbc:hive2://node1:10000: 没有密码直接跳过即可0: jdbc:hive2://node1:10000> show tables;+------------------------------------+| tab_name |+------------------------------------+| personinfo || test2 |+------------------------------------+以上配置完成后,还需要将配置好的core-site.xml文件发送到各个NiFi节点对应的路径/root/test下替换原有的core-site.xml文件。之后重启NiFi集群,各个NiFi节点上执行命令:
service nifi restart七、配置“PutHiveQL”处理器
“PutHiveQL”主要执行HiveQL的DDL/DML命令,传入给该处理器的FlowFile内容是要执行的HiveQL命令。HiveQL命令可以使用“?”来指定参数,这种情况下,参数必须存在于FlowFile的属性中,命名约定为hiveql.args.N.type和hiveql.args.N.value,其中N为正整数。
关于“PutHiveQL”处理器的“Properties”主要配置的说明如下:
配置项 | 默认值 | 允许值 | 描述 |
|---|---|---|---|
Hive Database Connection Pooling Servic(Hive数据库连接池服务) | Hive Controller服务,用于获取与Hive数据库的连接。 | ||
Batch Size(批次大小) | 100 | 一批次读取FlowFile的个数。 | |
Character Set(编码) | UTF-8 | 指定数据的编码格式。 | |
Statement Delimiter(语句分隔符) | ; | 语句分隔符,用于分隔多个语句脚本中的SQL语句。 | |
Rollback On Failure(失败时回滚) | false | ▪true▪false | 指定如何处理错误。默认false指的是如果在处理FlowFile时发生错误,则FlowFile将根据错误类型路由到“failure”或“retry”关系,处理器继续处理下一个FlowFile。相反,可以设置为true回滚当前已处理的FlowFile,并立即停止进一步的处理。如果设置为true启用,失败的FlowFiles将停留在输入关系中并会反复处理,直到成功处理或通过其他方式将其删除为止。可以设置足够大的“Yield Duration”避免重试次数过多。 |
“PutHiveQL”处理器的配置如下:
1、创建“PutHiveQL”处理器
2、 配置“PROPERTIES”
点击之后,配置“HiveConnectionPool”控制服务:
注意以上需要配置:
“Database Connection URL” :这里是Hive的HiveServer2启动的节点,也就是服务端节点。“jdbc:hive2://192.168.179.4:10000”“Hive Configuration Resources”:“/root/test/hive-site.xml,/root/test/core-site.xml,/root/test/hdfs-site.xml”,这里需要将以上各个文件在NiFi集群各个节点对应位置准备好。“Database User”:root,这里防止操作Hive对应的HDFS时权限问题。配置完成后,需要启用对应的“HiveConnectionPool”控制服务:
最终配置“PROPERTIES”为:
3、连接“ReplaceText”处理器与“PutHiveQL”处理器并设置关系
设置“ReplaceText”处理器“failure”路由关系为自动终止:
设置“PutHiveQL”处理器路由关系为自动终止:
八、运行测试
1、在Hive中创建表“test2”
动HDFS,启动Hive服务端和客户端,创建表“test2”
create table test2 (id int,name string,age int )row format delimited fields terminated by "\t";2、启动NiFi处理数据流程,向MySQL中写入数据,查看Hive中表数据
首先清空“CaptureChangeMySQL”处理器的状态,单独启动“CaptureChangeMySQL”处理器,清空重新消费的数据(以上主要就是避免此版本NiFi bug问题),启动当前案例中其他NiFi处理器。
然后向MySQL中插入以下数据:
insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;NiFi页面:
Hive表test2中的结果:
推荐
- 焦点速讯:大数据NiFi(二十):实时同步MySQL数据到Hive
- 世界快看:2月24日国内邻苯产业链价格汇总
- 环球今日报丨袁心玥
- 热文:解褐在文言文中的意思_解褐
- 【世界播资讯】一生只爱你剧情介绍_这部剧讲述的是什么故事
- 当前热文:江苏宿迁:中心城区购房给予契税补贴最高100% 公积金贷款额提至70万
- 焦点快看:ange venus
- 热门看点:梦的作者
- 当前动态:上海农业银行
- 每日聚焦:学思陈刚书记讲话精神 校准果洛公安发展航向——果洛州公安局召开党委扩大会传达学习陈刚调研果洛讲话精神和全省平安建设会议精神
- 即时:斗罗248集:唐三击杀蚂蚁三兄弟,同时吸收三个魂环,千仞雪来了
- 全球聚焦:中国中学生新作文辞典
- 焦点讯息:智障小伙摸着媳妇肚子不知所措,花了60万彩礼,生儿子再奖88万
- 全球快报:炸蜈蚣没有毒吗_炸蜈蚣
- 世界即时看!采菊东篱下悠然见南山赏析
- 天天热消息:河南漯河:高层次人才公积金最高可贷80万元,实行“一人购房全家帮”
- 世界观热点:消费方式有哪几种_消费方式
- 【全球时快讯】八一福建群殴
- 聚焦:我的qq空间被封闭了是怎么回事_我的qq空间
- 当前速看:中药制药化学
- 世界快消息!斯基拉:雷比奇今夏可能离开米兰,一些海外俱乐部询问球员情况
- 环球今日讯!重游成都的第四天,重点是游人民公园
- 焦点滚动:新加坡潘玲玲
- 环球快资讯丨男子来潍务工右腿坏死,无依无靠
- 【世界时快讯】福特称将F150 Lightning的生产继续暂停一周
- 每日快播:进入最终测试阶段新一代三菱欧蓝德登场在即
- 环球最新:贾巴阿叁
- 【时快讯】上交所、深交所更新2023年港股通交易日安排
- 当前讯息:二搭是什么意思_二搭是什么意思
- 【当前热闻】贝克曼温度计
- 焦点报道:泫雅金晓钟什么时候在一起的_泫雅金晓钟将合体出道
- 天天短讯!我,阿拉伯女硕士,8年前来到中国,嫁中国老公,中国男人真能干
- 今头条!江苏省邳州市发布大雾黄色预警
- 热推荐:摒弃和抛弃的区别_摈弃和摒弃的区别
- 天天热门:万安科技:公司主要侧重于毫米波雷达在智能驾驶中的应用方向上的研究
- 天天速递!克霉唑栓需要放多深_克霉唑栓要放入多深
- 今日快讯:中博acca学费价目表_中博acca
- 全球快报:重构一个可配的查询统计到底有多难?
- 当前热文:云南糖网:今日广西现货市场糖价情况金十期货2月23日讯,今日(2月23日)上午白糖期货盘整,早盘截至11:25分主力2305合约成交价5959元,下跌14元,广西现货市场制糖企业、流通商报价如下
- 环球滚动:“龙抬头”Tony老师好忙
- 【快播报】眼角出现细纹怎么办_眼角细纹怎么去除
- 观速讯丨文明校园6个好_文明校园六好标准
- 即时焦点:“抢工”大潮来临,东莞千里奔袭广西招工
- 天天头条:田地的拼音的近义词
- 热资讯!谁能讲清《人世间》里涂志强、骆士宾、水自流三人到底是什么关系?
- 世界百事通!用基因检测辨别原产地,南农大拟推出茶叶“亲子鉴定”技术
- 焦点简讯:撒野歌词_歌曲撒野歌词
- 热讯:舌头溃疡要怎么治疗_舌头溃疡要怎么治
- 焦点简讯:昔日国内共享单车巨头!ofo小黄车被曝无法登录:超1600万人押金没退
- 天天热资讯!惊险!男子高空坠落,钢筋与下身大血管“擦身而过”
- 环球速读:楼月微信聊天记录导出恢复助手使用方法_楼月微信聊天记录导出恢复助手注册码是多少
- 环球热门:柠檬水是酸性的还是碱性的?
- 【环球报资讯】27万亿元银行理财市场发生这些新变化,快来了解一下
- 全球通讯!南网能源(003035)2月20日股东户数15.04万户,较上期增加0.49%
- 当前播报:金杯电工股东及高管减持期满未减持
- 每日速读!海外装机大增,多家企业竞速重注BBA主场
- 即时:蛊虫子在哪里_蛊虫子
- 前沿资讯!炉石传说 黑石山的火焰 科林烈酒 德鲁伊攻略
- 世界快报:重庆:举报社保基金违规行为,最高奖励10万元
- 天天视讯!【奔流·追踪】有期徒刑12年!陕西女子13岁被拐案一审宣判!
- 今日热议:2023年西咸新区推进14条道路建设
- 环球最新:今日簿的多音字组词语组词_簿的多音字组词
- 环球今亮点!陕西省城固县第二人民医院来宁县参观交流
- 【世界新视野】防城港市防城区:金花香飘产业升级路
- 环球播报:Circle K 母公司收购 True Blue Car Wash
- 天天快看:股票交易规则和注意事项_股票交易规则和注意事项有哪些
- 焦点简讯:没开封的香烟会发霉吗_没开封的香烟会过期吗
- 焦点播报:宇晶股份(002943)2月17日主力资金净卖出1488.80万元
- 天天视点!LGD首秀战胜VG,cat首发效果不错,小落成春季赛第一马超
- 全球微头条丨明镜高悬四海清打一生肖_明镜高悬
- 【天天新要闻】男子无保护措施站海盗船船头 具体详细内容是什么
- 报道:后室,但满级联动(哲慧视角(1)
- 世界快讯:每月养车成本6245元,零百加速为3.6秒,试驾体验兰博基尼Urus
- 当前速递!会玩!扣篮大赛间歇现场进行杂技表演 字母哥与梯子“互动”
- 全球短讯!k302次火车座位分布图2车厢_k302火车座位号分布图
- 天天关注:体检查出结节,会不会变癌?
- 全球看热讯:公益海报与商业海报的区别_公益海报设计的意义是什么
- 天天热点!心尚臻品品平台怎么样_心尚SIMSO是哪个国家的品牌
- 动态:一只银行股既在A股上市,又在港股上市,投资哪个收益更高?
- 视焦点讯!方框加对号怎么输入数字_方框加对号怎么输入
- 每日快看:叉烧肉怎么做好吃_叉烧肉怎么做好吃最简单的
- 【天天快播报】荣誉+1 金旺智能荣获“甲级 五星 工业机器人系统集成商”!
- 天天观速讯丨俄方要求美公布“北溪”天然气管道爆炸相关信息
- 新消息丨管道疏通车图片_管道疏通车
- 环球热文:摄像头怎么安装与手机远程监控_手机远程监控安装方法
- 世界观察:FUTURE DATA完成配售8000万股 净筹约1720万港元
- 信息:心花怒放,奔跑垫江!2022-2023重庆垫江牡丹马拉松定档3月26日
- 环球短讯!国内手机市场今年开局回暖 手机企业纷纷推出新款手机抢占市场
- 天天看热讯:北京怀柔区成功创建“基本无违法建设区”
- 焦点速读:苗圩:建议新能源汽车免购置税政策延续
- 焦点速讯:能治子宫肌瘤的小偏方_一个小偏方治子宫肌瘤
- 新动态:龟苓膏的做法与配方_龟苓膏怎么做
- 环球快消息!银翼杀手初观:人格、情感、向往自由,其他并无贵贱
- 【当前独家】韶关武江奋力推动文旅事业高质量发展
- 天天最新:中泰国际:给予国泰君安国际(01788.HK)“增持”评级 目标价0.90港元
- 报道:泸州临港投资11亿元私募项目状态更新为“已反馈”
- 每日观察!2022年深圳工业土地投资TOP50超90亿元,涉地数量68宗
- 每日动态!国家统计局:1月份一线城市商品住宅销售价格环比转涨 二三线城市环比降势趋缓
- 焦点速讯:联想A3000_联想a300手机
- 观焦点:鲁本-迪亚斯:如果我们发挥出最好一面,其他球队很难击败我们
