ES写入内核流程

网友投稿 707 2022-05-28

1     概述

在ES中,写入单个文档的请求称为Index请求,批量写入的请求称为Bulk请求。它们都使用相同的处理逻辑,被统一封装为BulkRequest。以下从源码角度分析ES的bulk请求写入流程。

2     源码分析

2.1      Rest层请求转化为Transport层请求

2.1.1        RestController.java

1.ES会提前将处理各种http请求(GET、PUT、POST、DELETE等)的handler注册到一个handler列表中,RestController# registerHandler:

2.http请求发送到ES后,在RestController中进行实际的分发过程。首先根据http请求找到对应的handler,再调用handler的handleRequest方法处理请求:

3.handlerRequest调用prepareRequest:

2.1.1        RestBulkAction.java

1.对于bulk操作,其请求对应的handler是RestBulkAction,该类会在其构造函数中将其注册到RestController,代码如下:

2. RestBulkAction会将RestRequest解析并转化为BulkRequest,然后再对BulkRequest做处理,这块的逻辑在prepareRequest方法中,代码如下:

3.上图最后一行是NodeClient对bulk请求的处理

2.1.1        NodeClient.java

NodeClient在处理BulkRequest请求时,会将请求的action转化为对应Transport层的action,然后再由Transport层的action来处理BulkRequest,action转化的代码如下:

TransportAction会调用一个请求过滤链来处理请求:

对于Bulk请求,TransportAction的具体实现类为TransportBulkAction,其doExecute方法继续执行写入逻辑。至此转化完成。

2.2      协调节点处理并转发请求

ES写入内核流程

2.2.1        TransportBulkAction.java

TransportBulkAction#doExecute先判断bulk请求中的索引是否存在,不存在则调用自动创建流程:

可见,逻辑为先遍历bulk中的索引,如果开启了自动创建索引则放到autoCreateIndices集合中,最后通过createIndex方法创建。

创建完index后,index的各shard已在数据节点上创建完成,协调节点将转发写入请求到文档对应的primary shard。协调节点转发的入口为TransportBulkAction #executeBulk方法:

执行逻辑在BulkOperation的doRun方法中:

1)首先遍历bulkRequest请求,然后根据请求的操作类型执行相应逻辑。对于index请求,会先根据IndexMetaData信息为每条写入请求生成路由信息,如果用户没有指定doc id,则会在process方法中生成:

2)然后根据每个IndexRequest请求的路由信息(默认为doc id)得到所要写入的目标shard id,再将DocWriteRequest封装为BulkItemRequest并添加到请求列表:

3)然后将请求按shard分组封装为BulkShardRequest并交由TransportShardBulkAction来处理:

4)执行逻辑最终进入TransportReplicationAction#doRun方法

2.2.2        TransportReplicationAction.java

TransportReplicationAction#doRun会通过ClusterState获取到primary shard的路由信息,然后得到primary shard所在的node,如果node为当前协调节点则直接将请求发往本地,否则发往远端:

2.3      主分片再副本分片节点执行写入

2.3.1        ReplicationOperation.java

ReplicationOperation#execute方法执行主分片节点写入:

primary.perform执行主分片写入,主分片写入完成调用handlerPrimaryResult方法,发送写副本分片的请求:

2.3.2        TransportShardBulkAction.java

着重看写主分片的逻辑,在TransportShardBulkAction#shardOperationOnPrimary方法中:

再调用InternalEngine.Index将数据写入lucene,再写入translog:

写lucene和translog的整体流程如下:

(1)数据写入buffer缓冲和translog日志文件

(2)每隔一秒钟,buffer中的数据被写入新的segment file,并进入os cache,此时segment被打开并供search使用

(3)buffer被清空

(4)重复1~3,新的segment不断添加,buffer不断被清空,而translog中的数据不断累加

(5)当translog长度达到一定程度的时候,commit操作发生:

(5-1)buffer中的所有数据写入一个新的segment,并写入os cache,打开供使用

(5-2)buffer被清空

(5-3)一个commit ponit被写入磁盘,标明了所有的index segment

(5-4)filesystem cache中的所有index segment file缓存数据,被fsync强行刷到磁盘上

(5-5)现有的translog被清空,创建一个新的translog

以上写lucene和translog对应ES中几个关键概念:

(1)fresh: 内存缓冲区被清空写到段中,段被打开可进行搜索

(2)commit point: 记录当前所有可用的segement

(3)flush: 内存缓冲区被清空写到段中,一个提交点被写入硬盘,文件系统缓存通过 fsync 被刷新,老的 translog 被删除

(4)fsync: 即/_flush/sync命令,逻辑是flush translog并且将sync_id同步到各个分片,可以实现快速恢复

Elasticsearch NAT

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:【大数据全栈成长计划 Hadoop学习篇】第三阶段最终积分排行榜和最终考核成绩出炉!
下一篇:《多线程系列二》不理解future怎么能有future?
相关文章