企业管理中,流程管理不可忽略!这几步教你破解流程管理难题,流程型企业打造生产运营管控一体化的几个路径
696
2022-05-28
1 概述
在ES中,写入单个文档的请求称为Index请求,批量写入的请求称为Bulk请求。它们都使用相同的处理逻辑,被统一封装为BulkRequest。以下从源码角度分析ES的bulk请求写入流程。
2 源码分析
2.1 Rest层请求转化为Transport层请求
2.1.1 RestController.java
1.ES会提前将处理各种http请求(GET、PUT、POST、DELETE等)的handler注册到一个handler列表中,RestController# registerHandler:
2.http请求发送到ES后,在RestController中进行实际的分发过程。首先根据http请求找到对应的handler,再调用handler的handleRequest方法处理请求:
3.handlerRequest调用prepareRequest:
2.1.1 RestBulkAction.java
1.对于bulk操作,其请求对应的handler是RestBulkAction,该类会在其构造函数中将其注册到RestController,代码如下:
2. RestBulkAction会将RestRequest解析并转化为BulkRequest,然后再对BulkRequest做处理,这块的逻辑在prepareRequest方法中,代码如下:
3.上图最后一行是NodeClient对bulk请求的处理
2.1.1 NodeClient.java
NodeClient在处理BulkRequest请求时,会将请求的action转化为对应Transport层的action,然后再由Transport层的action来处理BulkRequest,action转化的代码如下:
TransportAction会调用一个请求过滤链来处理请求:
对于Bulk请求,TransportAction的具体实现类为TransportBulkAction,其doExecute方法继续执行写入逻辑。至此转化完成。
2.2 协调节点处理并转发请求
2.2.1 TransportBulkAction.java
TransportBulkAction#doExecute先判断bulk请求中的索引是否存在,不存在则调用自动创建流程:
可见,逻辑为先遍历bulk中的索引,如果开启了自动创建索引则放到autoCreateIndices集合中,最后通过createIndex方法创建。
创建完index后,index的各shard已在数据节点上创建完成,协调节点将转发写入请求到文档对应的primary shard。协调节点转发的入口为TransportBulkAction #executeBulk方法:
执行逻辑在BulkOperation的doRun方法中:
1)首先遍历bulkRequest请求,然后根据请求的操作类型执行相应逻辑。对于index请求,会先根据IndexMetaData信息为每条写入请求生成路由信息,如果用户没有指定doc id,则会在process方法中生成:
2)然后根据每个IndexRequest请求的路由信息(默认为doc id)得到所要写入的目标shard id,再将DocWriteRequest封装为BulkItemRequest并添加到请求列表:
3)然后将请求按shard分组封装为BulkShardRequest并交由TransportShardBulkAction来处理:
4)执行逻辑最终进入TransportReplicationAction#doRun方法
2.2.2 TransportReplicationAction.java
TransportReplicationAction#doRun会通过ClusterState获取到primary shard的路由信息,然后得到primary shard所在的node,如果node为当前协调节点则直接将请求发往本地,否则发往远端:
2.3 主分片再副本分片节点执行写入
2.3.1 ReplicationOperation.java
ReplicationOperation#execute方法执行主分片节点写入:
primary.perform执行主分片写入,主分片写入完成调用handlerPrimaryResult方法,发送写副本分片的请求:
2.3.2 TransportShardBulkAction.java
着重看写主分片的逻辑,在TransportShardBulkAction#shardOperationOnPrimary方法中:
再调用InternalEngine.Index将数据写入lucene,再写入translog:
写lucene和translog的整体流程如下:
(1)数据写入buffer缓冲和translog日志文件
(2)每隔一秒钟,buffer中的数据被写入新的segment file,并进入os cache,此时segment被打开并供search使用
(3)buffer被清空
(4)重复1~3,新的segment不断添加,buffer不断被清空,而translog中的数据不断累加
(5)当translog长度达到一定程度的时候,commit操作发生:
(5-1)buffer中的所有数据写入一个新的segment,并写入os cache,打开供使用
(5-2)buffer被清空
(5-3)一个commit ponit被写入磁盘,标明了所有的index segment
(5-4)filesystem cache中的所有index segment file缓存数据,被fsync强行刷到磁盘上
(5-5)现有的translog被清空,创建一个新的translog
以上写lucene和translog对应ES中几个关键概念:
(1)fresh: 内存缓冲区被清空写到段中,段被打开可进行搜索
(2)commit point: 记录当前所有可用的segement
(3)flush: 内存缓冲区被清空写到段中,一个提交点被写入硬盘,文件系统缓存通过 fsync 被刷新,老的 translog 被删除
(4)fsync: 即/_flush/sync命令,逻辑是flush translog并且将sync_id同步到各个分片,可以实现快速恢复
Elasticsearch NAT
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。