基于FusionInsight开发智能搜车系统

网友投稿 575 2022-05-30

一、背景

1.1、业务背景

XX市交警智能搜车系统,通过各卡口摄像头采集每天过往的车辆信息,存入数据库。交警人员在页面上可根据灵活条件进行车辆信息查询、违章查询、或者描绘车辆轨迹。

卡口数据包括两种:1、以csv格式保存的历史文本数据;2、实时卡口数据

项目需求:

1.         根据精确车牌号查出车辆信息

2.         根据模糊车牌号查出车辆信息

3.         查出没有系安全带的违法车辆信息

4.         查处符合某时间规则的车辆信息

5.         根据车辆颜色和品牌查出车辆信息

1.2、平台选型

FI选型使用 HD8.0版本,对应Spark版本2.4.5,ES版本7.6.0,HBase版本2.2.3

二、方案设计

2.1、数据模型

车辆详细信息字段有上百个,全量保存在HBase表中,如下是抽取出的部分需做索引的字段,存储到ES中

2.2、数据流向

2.3、HBase表设计

1、避免单表数据量过大,以月为单位建表

2、以单条记录唯一主键 id 作为rowkey

三、历史数据处理

1、历史数据通过bulkload方式导入HBase,具体步骤可参考帖子  https://bbs.huaweicloud.com/forum/thread-66116-1-1.html

基于FusionInsight开发智能搜车系统

2、导入HBase的数据再导入ES,可通过HBase2ES迁移工具,具体可参考 https://bbs.huaweicloud.com/forum/thread-71359-1-1.html

四、实时数据流

4.1、模拟数据流生成

我们通过python脚本(见附件)定时生成数据文件,定时放到指定目录,例如/var/log/realtimeLog

脚本data_gen.py存放于部署有flume客户端的节点的/opt/test目录下,执行命令为

nohup python data_gen.py  -i  /opt/test  -o /var/log/realtimeLog &

生成的样例数据如图:

4.2、配置Flume

我们这里直接将数据文件通过Flume发往Kafka,使用Kafka普通模式端口,检查kafka配置 allow.everyone.if.no.acl.found为true

Flume配置文件通过Manager页面上的Flume配置工具生成,如图:

Flume客户端下载后,解压缩,得到安装脚本,路径为:

/tmp/flume-Client/FusionInsight_Cluster_1_Flume_ClientConfig/Flume/FlumeClient

通过该路径下的install.sh 安装flume客户端

./install.sh -d /opt/realtimeFlume -c /opt/test/properties.properties -f 10.244.230.213 -n realtimeTest

其中-d为安装目录,-c为指定的上面生成的配置文件,-f为Flume Monitor实例的一个IP,-n为客户端指定的名称

安装完毕后,启动测试数据生成脚本

通过kafka脚本验证数据是否有生成:

4.3、SparkStreaming读取kafka数据

该部分详细可参考产品文档《应用开发指南》-《安全模式》-《Spark2x开发指南》-《开发程序》-《SparkStreaming对接Kafka0-10程序》-《Java样例代码》部分

4.4、SparkStreaming写入HBase和ES

获取到 微批 JavaStreamingContext,通过foreachRDD写入HBase和ES

HBase主要调用HBase的API,ES调用的是low level rest Client接口,相关代码可参考附件

五、打包

建议不要将依赖打成一个包,如果有版本更改,可方便后面替换依赖包

通过 Artifacts 打包如图

可能碰到如下错误,是因为没有引入scala的SDK

在Global Libraries中引入scala的SDK即可

六、配置客户端

因为Kafka的认证信息是通过JAAS认证机制,通过Spark自带的—keytab自己无法解决认证对接的问题,需要单独处理。

Spark中driver和executor默认分别使用客户端/Spark2x/spark/conf目录下的jaas.conf(driver端)和jaas-zk.conf(executor端)进行认证。

1、jaas.conf文件内容参考,注意keyTab指定的是提交任务的节点上存放user.keytab的绝对路径

Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/opt/client/Spark2x/spark/user.keytab" principal="fwc" useTicketCache=false storeKey=true debug=true; };

2、jaas-zk.conf文件内容参考,这里keyTab是上传到executor后的user.keytab的路径,使用相对路径

Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="./user.keytab" principal="fwc" useTicketCache=false storeKey=true debug=true; }; KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="./user.keytab" principal="fwc" useTicketCache=false storeKey=true debug=true; };

3、将连接HBase的依赖配置文件拷贝到客户端/Spark2x/spark/conf目录下

包括HBase客户端下的 core-site.xml、hdfs-site.xml和hbase-site.xml

4、创建提交任务的临时目录,例如/opt/sparkTest,如图,将依赖的jar包放到该目录下的lib目录下,创建启动任务的submit.sh

该目录下的连接ES的配置文件为 esParams.properties,该文件参数参考附件

七、提交任务查看执行结果

7.1、提交spark任务

执行上面的 /opt/sparkTest/submit.sh脚本即可

7.2、查看hbase数据

通过hbase shell命令行可查询数据已经导入 vehicle_table_202008 表,后缀是当前的年月

7.3、验证ES中数据已有数据

附件: sparkStreamingDemo.zip 7.41KB 下载次数:3次

FusionInsight 大数据

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Taskflow 有兴趣了解一下?
下一篇:Voronoi图及相关第三方库概述
相关文章