ElasticSearch 中最重要原理是文档的索引和读取!
单个文档
新建单个文档所需要的步骤顺序:
多个文档
使用 bulk 修改多个文档步骤顺序:
先看下整体的索引流程
shard = hash(document_id) % (num_of_primary_shards)
分步骤看数据持久化过程
通过分步骤看数据持久化过程:write -> refresh -> flush -> merge
这时候数据还没到 segment ,是搜不到这个新文档的。数据只有被 refresh 后,才可以被搜索到。
refresh 默认 1 秒钟,执行一次上图流程。ES 是支持修改这个值的,通过 index.refresh_interval 设置 refresh (冲刷)间隔时间。refresh 流程大致如下:
in-memory buffer 中的文档写入到新的 segment 中,但 segment 是存储在文件系统的缓存中。此时文档可以被搜索到
最后清空 in-memory buffer。注意: Translog 没有被清空,为了将 segment 数据写到磁盘
文档经过 refresh 后,segment 暂时写到文件系统缓存,这样避免了性能 IO 操作,又可以使文档搜索到。refresh 默认 1 秒执行一次,性能损耗太大。一般建议稍微延长这个 refresh 时间间隔,比如 5s。因此,ES 其实就是准实时,达不到真正的实时。
flush 过程
每隔一段时间—例如 translog 变得越来越大—索引被刷新(flush);一个新的 translog 被创建,并且一个全量提交被执行。
上个过程中 segment 在文件系统缓存中,会有意外故障文档丢失。那么,为了保证文档不会丢失,需要将文档写入磁盘。那么文档从文件缓存写入磁盘的过程就是 flush。写入磁盘后,清空 translog。具体过程如下:
所有在内存缓冲区的文档都被写入一个新的段。
缓冲区被清空。
一个Commit Point被写入硬盘。
文件系统缓存通过 fsync 被刷新(flush)。
老的 translog 被删除。
merge 过程
由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增。而段数目太多会带来较大的麻烦。每一个段都会消耗文件句柄、内存和cpu运行周期。更重要的是,每个搜索请求都必须轮流检查每个段;所以段越多,搜索也就越慢。
Elasticsearch通过在后台进行Merge Segment来解决这个问题。小的段被合并到大的段,然后这些大的段再被合并到更大的段。
当索引的时候,刷新(refresh)操作会创建新的段并将段打开以供搜索使用。合并进程选择一小部分大小相似的段,并且在后台将它们合并到更大的段中。这并不会中断索引和搜索。
一旦合并结束,老的段被删除:
写操作的关键点
在考虑或分析一个分布式系统的写操作时,一般需要从下面几个方面考虑:
Elasticsearch作为分布式系统,也需要在写入的时候满足上述的四个特点,我们在后面的写流程介绍中会涉及到上述四个方面。
接下来,我们一层一层剖析Elasticsearch内部的写机制。
Lucene的写
众所周知,Elasticsearch 内部使用了 Lucene 完成索引创建和搜索功能,Lucene中写操作主要是通过 IndexWriter 类实现,IndexWriter 提供三个接口:
public long addDocument();
public long updateDocuments();
public long deleteDocuments();
通过这三个接口可以完成单个文档的写入,更新和删除功能,包括了分词,倒排创建,正排创建等等所有搜索相关的流程。只要Doc通过IndesWriter写入后,后面就可以通过IndexSearcher搜索了,看起来功能已经完善了,但是仍然有一些问题没有解:
上述问题,在Lucene中是没有解决的,那么就需要Elasticsearch中解决上述问题。
我们再来看Elasticsearch中的写机制。
Elasticsearch的写
Elasticsearch采用多Shard方式,通过配置routing规则将数据分成多个数据子集,每个数据子集提供独立的索引和搜索功能。当写入文档的时候,根据routing规则,将文档发送给特定Shard中建立索引。这样就能实现分布式了。
此外,Elasticsearch整体架构上采用了一主多副的方式:
每个Index由多个Shard组成,每个Shard有一个主节点和多个副本节点,副本个数可配。但每次写入的时候,写入请求会先根据_routing规则选择发给哪个Shard,Index Request中可以设置使用哪个Filed的值作为路由参数,如果没有设置,则使用Mapping中的配置,如果mapping中也没有配置,则使用_id作为路由参数,然后通过_routing的Hash值选择出Shard(在OperationRouting类中),最后从集群的Meta中找出出该Shard的Primary节点。
请求接着会发送给Primary Shard,在Primary Shard上执行成功后,再从Primary Shard上将请求同时发送给多个Replica Shard,请求在多个Replica Shard上执行成功并返回给Primary Shard后,写入请求执行成功,返回结果给客户端。
这种模式下,写入操作的延时就等于latency = Latency(Primary Write) + Max(Replicas Write)。只要有副本在,写入延时最小也是两次单Shard的写入时延总和,写入效率会较低,但是这样的好处也很明显,避免写入后,单机或磁盘故障导致数据丢失,在数据重要性和性能方面,一般都是优先选择数据,除非一些允许丢数据的特殊场景。
采用多个副本后,避免了单机或磁盘故障发生时,对已经持久化后的数据造成损害,但是Elasticsearch里为了减少磁盘IO保证读写性能,一般是每隔一段时间(比如5分钟)才会把Lucene的Segment写入磁盘持久化,对于写入内存,但还未Flush到磁盘的Lucene数据,如果发生机器宕机或者掉电,那么内存中的数据也会丢失,这时候如何保证?
对于这种问题,Elasticsearch学习了数据库中的处理方式:增加CommitLog模块,Elasticsearch中叫TransLog。
在每一个Shard中,写入流程分为两部分,先写入Lucene,再写入TransLog。
写入请求到达Shard后,先写Lucene文件,创建好索引,此时索引还在内存里面,接着去写TransLog,写完TransLog后,刷新TransLog数据到磁盘上,写磁盘成功后,请求返回给用户。这里有几个关键点:
上面介绍了Elasticsearch在写入时的两个关键模块,Replica和TransLog,接下来,我们看一下Update流程:
Lucene中不支持部分字段的Update,所以需要在Elasticsearch中实现该功能,具体流程如下:
介绍完部分更新的流程后,大家应该从整体架构上对Elasticsearch的写入有了一个初步的映象,接下来我们详细剖析下写入的详细步骤。
Elasticsearch写入请求类型
Elasticsearch中的写入请求类型,主要包括下列几个:Index(Create),Update,Delete和Bulk,其中前3个是单文档操作,后一个Bulk是多文档操作,其中Bulk中可以包括Index(Create),Update和Delete。
在6.0.0及其之后的版本中,前3个单文档操作的实现基本都和Bulk操作一致,甚至有些就是通过调用Bulk的接口实现的。估计接下来几个版本后,Index(Create),Update,Delete都会被当做Bulk的一种特例化操作被处理。这样,代码和逻辑都会更清晰一些。
下面,我们就以Bulk请求为例来介绍写入流程。
Client Node
Client Node 也包括了前面说过的Parse Request,这里就不再赘述了,接下来看一下其他的部分。
1.Ingest Pipeline
在这一步可以对原始文档做一些处理,比如HTML解析,自定义的处理,具体处理逻辑可以通过插件来实现。在Elasticsearch中,由于Ingest Pipeline会比较耗费CPU等资源,可以设置专门的Ingest Node,专门用来处理Ingest Pipeline逻辑。
如果当前Node不能执行Ingest Pipeline,则会将请求发给另一台可以执行Ingest Pipeline的Node。
2.Auto Create Index
判断当前Index是否存在,如果不存在,则需要自动创建Index,这里需要和Master交互。也可以通过配置关闭自动创建Index的功能。
3.Set Routing
设置路由条件,如果Request中指定了路由条件,则直接使用Request中的Routing,否则使用Mapping中配置的,如果Mapping中无配置,则使用默认的_id字段值。
在这一步中,如果没有指定id字段,则会自动生成一个唯一的_id字段,目前使用的是UUID。
4.Construct BulkShardRequest
由于Bulk Request中会包括多个(Index/Update/Delete)请求,这些请求根据routing可能会落在多个Shard上执行,这一步会按Shard挑拣Single Write Request,同一个Shard中的请求聚集在一起,构建BulkShardRequest,每个BulkShardRequest对应一个Shard。
5.Send Request To Primary
这一步会将每一个BulkShardRequest请求发送给相应Shard的Primary Node。
Primary Node
Primary 请求的入口是在PrimaryOperationTransportHandler的messageReceived,我们来看一下相关的逻辑流程。
1.Index or Update or Delete
循环执行每个Single Write Request,对于每个Request,根据操作类型(CREATE/INDEX/UPDATE/DELETE)选择不同的处理逻辑。
其中,Create/Index是直接新增Doc,Delete是直接根据_id删除Doc,Update会稍微复杂些,我们下面就以Update为例来介绍。
2.Translate Update To Index or Delete
这一步是Update操作的特有步骤,在这里,会将Update请求转换为Index或者Delete请求。首先,会通过GetRequest查询到已经存在的同_id Doc(如果有)的完整字段和值(依赖_source字段),然后和请求中的Doc合并。同时,这里会获取到读到的Doc版本号,记做V1。
3.Parse Doc
这里会解析Doc中各个字段。生成ParsedDocument对象,同时会生成uid Term。在Elasticsearch中,_uid = type # _id,对用户,_Id可见,而Elasticsearch中存储的是_uid。这一部分生成的ParsedDocument中也有Elasticsearch的系统字段,大部分会根据当前内容填充,部分未知的会在后面继续填充ParsedDocument。
4.Update Mapping
Elasticsearch中有个自动更新Mapping的功能,就在这一步生效。会先挑选出Mapping中未包含的新Field,然后判断是否运行自动更新Mapping,如果允许,则更新Mapping。
5.Get Sequence Id and Version
由于当前是Primary Shard,则会从SequenceNumber Service获取一个sequenceID和Version。SequenceID在Shard级别每次递增1,SequenceID在写入Doc成功后,会用来初始化LocalCheckpoint。Version则是根据当前Doc的最大Version递增1。
6.Add Doc To Lucene
这一步开始的时候会给特定_uid加锁,然后判断该_uid对应的Version是否等于之前Translate Update To Index步骤里获取到的Version,如果不相等,则说明刚才读取Doc后,该Doc发生了变化,出现了版本冲突,这时候会抛出一个VersionConflict的异常,该异常会在Primary Node最开始处捕获,重新从“Translate Update To Index or Delete”开始执行。
如果Version相等,则继续执行,如果已经存在同id的Doc,则会调用Lucene的UpdateDocument(uid, doc)接口,先根据uid删除Doc,然后再Index新Doc。如果是首次写入,则直接调用Lucene的AddDocument接口完成Doc的Index,AddDocument也是通过UpdateDocument实现。
这一步中有个问题是,如何保证Delete-Then-Add的原子性,怎么避免中间状态时被Refresh?答案是在开始Delete之前,会加一个Refresh Lock,禁止被Refresh,只有等Add完后释放了Refresh Lock后才能被Refresh,这样就保证了Delete-Then-Add的原子性。
Lucene的UpdateDocument接口中就只是处理多个Field,会遍历每个Field逐个处理,处理顺序是invert index,store field,doc values,point dimension,后续会有文章专门介绍Lucene中的写入。
7.Write Translog
写完Lucene的Segment后,会以keyvalue的形式写TransLog,Key是_id,Value是Doc内容。当查询的时候,如果请求是GetDocByID,则可以直接根据_id从TransLog中读取到,满足NoSQL场景下的实时性要去。
需要注意的是,这里只是写入到内存的TransLog,是否Sync到磁盘的逻辑还在后面。
这一步的最后,会标记当前SequenceID已经成功执行,接着会更新当前Shard的LocalCheckPoint。
8.Renew Bulk Request
这里会重新构造Bulk Request,原因是前面已经将UpdateRequest翻译成了Index或Delete请求,则后续所有Replica中只需要执行Index或Delete请求就可以了,不需要再执行Update逻辑,一是保证Replica中逻辑更简单,性能更好,二是保证同一个请求在Primary和Replica中的执行结果一样。
9.Flush Translog
这里会根据TransLog的策略,选择不同的执行方式,要么是立即Flush到磁盘,要么是等到以后再Flush。Flush的频率越高,可靠性越高,对写入性能影响越大。
10.Send Requests To Replicas
这里会将刚才构造的新的Bulk Request并行发送给多个Replica,然后等待Replica的返回,这里需要等待所有Replica返回后(可能有成功,也有可能失败),Primary Node才会返回用户。如果某个Replica失败了,则Primary会给Master发送一个Remove Shard请求,要求Master将该Replica Shard从可用节点中移除。
这里,同时会将SequenceID,PrimaryTerm,GlobalCheckPoint等传递给Replica。
发送给Replica的请求中,Action Name等于原始ActionName + [R],这里的R表示Replica。通过这个[R]的不同,可以找到处理Replica请求的Handler。
11.Receive Response From Replicas
Replica中请求都处理完后,会更新Primary Node的LocalCheckPoint。
Replica Node
Replica 请求的入口是在ReplicaOperationTransportHandler的messageReceived,我们来看一下相关的逻辑流程。
1.Index or Delete
根据请求类型是Index还是Delete,选择不同的执行逻辑。这里没有Update,是因为在Primary Node中已经将Update转换成了Index或Delete请求了。
2.Parse Doc
3.Update Mapping
以上都和Primary Node中逻辑一致。
4.Get Sequence Id and Version
Primary Node中会生成Sequence ID和Version,然后放入ReplicaRequest中,这里只需要从Request中获取到就行。
5.Add Doc To Lucene
由于已经在Primary Node中将部分Update请求转换成了Index或Delete请求,这里只需要处理Index和Delete两种请求,不再需要处理Update请求了。比Primary Node会更简单一些。
6.Write Translog
7.Flush Translog
以上都和Primary Node中逻辑一致。
上面详细介绍了Elasticsearch的写入流程及其各个流程的工作机制,我们在这里再次总结下之前提出的分布式系统中的六大特性:
前文介绍了索引文档流程,本文带你理解 ES 文档的读取过程。
先看下整体的查询流程
单个文档
以下是从主分片或者副本分片检索文档的步骤顺序:
在处理读取请求时,协调结点在每次请求的时候都会通过轮询所有的副本分片来达到负载均衡。
在文档被检索时,已经被索引的文档可能已经存在于主分片上但是还没有复制到副本分片。在这种情况下,副本分片可能会报告文档不存在,但是主分片可能成功返回文档。一旦索引请求成功返回给用户,文档在主分片和副本分片都是可用的。
多个文档
使用 mget 取回多个文档的步骤顺序:
以下是使用单个 mget 请求取回多个文档所需的步骤顺序:
所有的搜索系统一般都是两阶段查询,第一阶段查询到匹配的DocID,第二阶段再查询DocID对应的完整文档,这种在Elasticsearch中称为query_then_fetch。
一致性指的是写入成功后,下次读操作一定要能读取到最新的数据。对于搜索,这个要求会低一些,可以有一些延迟。但是对于NoSQL数据库,则一般要求最好是强一致性的。
结果匹配上,NoSQL作为数据库,查询过程中只有符合不符合两种情况,而搜索里面还有是否相关,类似于NoSQL的结果只能是0或1,而搜索里面可能会有0.1,0.5,0.9等部分匹配或者更相关的情况。
结果召回上,搜索一般只需要召回最满足条件的Top N结果即可,而NoSQL一般都需要返回满足条件的所有结果。
搜索系统一般都是两阶段查询,第一个阶段查询到对应的Doc ID,也就是PK;第二阶段再通过Doc ID去查询完整文档,而NoSQL数据库一般是一阶段就返回结果。在Elasticsearch中两种都支持。
目前NoSQL的查询,聚合、分析和统计等功能上都是要比搜索弱的。
Elasticsearch使用了Lucene作为搜索引擎库,通过Lucene完成特定字段的搜索等功能,在Lucene中这个功能是通过IndexSearcher的下列接口实现的:
public TopDocs search(Query query, int n);
public Document doc(int docID);
public int count(Query query);
......(其他)
第一个search接口实现搜索功能,返回最满足Query的N个结果;第二个doc接口通过doc id查询Doc内容;第三个count接口通过Query获取到命中数。
这三个功能是搜索中的最基本的三个功能点,对于大部分Elasticsearch中的查询都是比较复杂的,直接用这个接口是无法满足需求的,比如分布式问题。这些问题都留给了Elasticsearch解决,我们接下来看Elasticsearch中相关读功能的剖析。
Elasticsearch的读Elasticsearch中每个Shard都会有多个Replica,主要是为了保证数据可靠性,除此之外,还可以增加读能力,因为写的时候虽然要写大部分Replica Shard,但是查询的时候只需要查询Primary和Replica中的任何一个就可以了。
在上图中,该Shard有1个Primary和2个Replica Node,当查询的时候,从三个节点中根据Request中的preference参数选择一个节点查询。preference可以设置_local,_primary,_replica以及其他选项。如果选择了primary,则每次查询都是直接查询Primary,可以保证每次查询都是最新的。如果设置了其他参数,那么可能会查询到R1或者R2,这时候就有可能查询不到最新的数据。
PS: 上述代码逻辑在 OperationRouting.Java 的 searchShards 方法中。
接下来看一下,Elasticsearch中的查询是如何支持分布式的。
Elasticsearch中通过分区实现分布式,数据写入的时候根据_routing规则将数据写入某一个Shard中,这样就能将海量数据分布在多个Shard以及多台机器上,已达到分布式的目标。这样就导致了查询的时候,潜在数据会在当前index的所有的Shard中,所以Elasticsearch查询的时候需要查询所有Shard,同一个Shard的Primary和Replica选择一个即可,查询请求会分发给所有Shard,每个Shard中都是一个独立的查询引擎,比如需要返回Top 10的结果,那么每个Shard都会查询并且返回Top 10的结果,然后在Client Node里面会接收所有Shard的结果,然后通过优先级队列二次排序,选择出Top 10的结果返回给用户。
这里有一个问题就是请求膨胀,用户的一个搜索请求在Elasticsearch内部会变成Shard个请求,这里有个优化点,虽然是Shard个请求,但是这个Shard个数不一定要是当前Index中的Shard个数,只要是当前查询相关的Shard即可,这个需要基于业务和请求内容优化,通过这种方式可以优化请求膨胀数。
Elasticsearch中的查询主要分为两类,Get请求:通过ID查询特定Doc;Search请求:通过Query查询匹配Doc。
PS:上图中内存中的Segment是指刚Refresh Segment,但是还没持久化到磁盘的新Segment,而非从磁盘加载到内存中的Segment。
对于Search类请求,查询的时候是一起查询内存和磁盘上的Segment,最后将结果合并后返回。这种查询是近实时(Near Real Time)的,主要是由于内存中的Index数据需要一段时间后才会刷新为Segment。
对于Get类请求,查询的时候是先查询内存中的TransLog,如果找到就立即返回,如果没找到再查询磁盘上的TransLog,如果还没有则再去查询磁盘上的Segment。这种查询是实时(Real Time)的。这种查询顺序可以保证查询到的Doc是最新版本的Doc,这个功能也是为了保证NoSQL场景下的实时性要求。
所有的搜索系统一般都是两阶段查询,第一阶段查询到匹配的DocID,第二阶段再查询DocID对应的完整文档,这种在Elasticsearch中称为query_then_fetch,还有一种是一阶段查询的时候就返回完整Doc,在Elasticsearch中称作query_and_fetch,一般第二种适用于只需要查询一个Shard的请求。
除了一阶段,两阶段外,还有一种三阶段查询的情况。搜索里面有一种算分逻辑是根据TF(Term Frequency)和DF(Document Frequency)计算基础分,但是Elasticsearch中查询的时候,是在每个Shard中独立查询的,每个Shard中的TF和DF也是独立的,虽然在写入的时候通过_routing保证Doc分布均匀,但是没法保证TF和DF均匀,那么就有会导致局部的TF和DF不准的情况出现,这个时候基于TF、DF的算分就不准。为了解决这个问题,Elasticsearch中引入了DFS查询,比如DFS_query_then_fetch,会先收集所有Shard中的TF和DF值,然后将这些值带入请求中,再次执行query_then_fetch,这样算分的时候TF和DF就是准确的,类似的有DFS_query_and_fetch。这种查询的优势是算分更加精准,但是效率会变差。另一种选择是用BM25代替TF/DF模型。
在新版本Elasticsearch中,用户没法指定DFS_query_and_fetch和query_and_fetch,这两种只能被Elasticsearch系统改写。
Elasticsearch中的大部分查询,以及核心功能都是Search类型查询,上面我们了解到查询分为一阶段,二阶段和三阶段,这里我们就以最常见的的二阶段查询为例来介绍查询流程。
Client Node 也包括了前面说过的Parse Request,这里就不再赘述了,接下来看一下其他的部分。
1.Get Remove Cluster Shard
判断是否需要跨集群访问,如果需要,则获取到要访问的Shard列表。
2.Get Search Shard Iterator
获取当前Cluster中要访问的Shard,和上一步中的Remove Cluster Shard合并,构建出最终要访问的完整Shard列表。
这一步中,会根据Request请求中的参数从Primary Node和多个Replica Node中选择出一个要访问的Shard。
3.For Every Shard:Perform
遍历每个Shard,对每个Shard执行后面逻辑。
4.Send Request To Query Shard
将查询阶段请求发送给相应的Shard。
5.Merge Docs
上一步将请求发送给多个Shard后,这一步就是异步等待返回结果,然后对结果合并。这里的合并策略是维护一个Top N大小的优先级队列,每当收到一个shard的返回,就把结果放入优先级队列做一次排序,直到所有的Shard都返回。
翻页逻辑也是在这里,如果需要取Top 30~ Top 40的结果,这个的意思是所有Shard查询结果中的第30到40的结果,那么在每个Shard中无法确定最终的结果,每个Shard需要返回Top 40的结果给Client Node,然后Client Node中在merge docs的时候,计算出Top 40的结果,最后再去除掉Top 30,剩余的10个结果就是需要的Top 30~ Top 40的结果。
上述翻页逻辑有一个明显的缺点就是每次Shard返回的数据中包括了已经翻过的历史结果,如果翻页很深,则在这里需要排序的Docs会很多,比如Shard有1000,取第9990到10000的结果,那么这次查询,Shard总共需要返回1000 * 10000,也就是一千万Doc,这种情况很容易导致OOM。
另一种翻页方式是使用search_after,这种方式会更轻量级,如果每次只需要返回10条结构,则每个Shard只需要返回search_after之后的10个结果即可,返回的总数据量只是和Shard个数以及本次需要的个数有关,和历史已读取的个数无关。这种方式更安全一些,推荐使用这种。
如果有aggregate,也会在这里做聚合,但是不同的aggregate类型的merge策略不一样,具体的可以在后面的aggregate文章中再介绍。
6.Send Request To Fetch Shard
选出Top N个Doc ID后发送给这些Doc ID所在的Shard执行Fetch Phase,最后会返回Top N的Doc的内容。
Query Phase接下来我们看第一阶段查询的步骤:
1.Create Search Context
创建Search Context,之后Search过程中的所有中间状态都会存在Context中,这些状态总共有50多个,具体可以查看DefaultSearchContext或者其他SearchContext的子类。
2.Parse Query
解析Query的Source,将结果存入Search Context。这里会根据请求中Query类型的不同创建不同的Query对象,比如TermQuery、FuzzyQuery等,最终真正执行TermQuery、FuzzyQuery等语义的地方是在Lucene中。
这里包括了dfsPhase、queryPhase和fetchPhase三个阶段的preProcess部分,只有queryPhase的preProcess中有执行逻辑,其他两个都是空逻辑,执行完preProcess后,所有需要的参数都会设置完成。
由于Elasticsearch中有些请求之间是相互关联的,并非独立的,比如scroll请求,所以这里同时会设置Context的生命周期。
同时会设置lowLevelCancellation是否打开,这个参数是集群级别配置,同时也能动态开关,打开后会在后面执行时做更多的检测,检测是否需要停止后续逻辑直接返回。
3.Get From Cache
判断请求是否允许被Cache,如果允许,则检查Cache中是否已经有结果,如果有则直接读取Cache,如果没有则继续执行后续步骤,执行完后,再将结果加入Cache。
4.Add Collectors
Collector主要目标是收集查询结果,实现排序,对自定义结果集过滤和收集等。这一步会增加多个Collectors,多个Collector组成一个List。
5.lucene::search
这一步会调用Lucene中IndexSearch的search接口,执行真正的搜索逻辑。每个Shard中会有多个Segment,每个Segment对应一个LeafReaderContext,这里会遍历每个Segment,到每个Segment中去Search结果,然后计算分数。
搜索里面一般有两阶段算分,第一阶段是在这里算的,会对每个Seek到的Doc都计算分数,为了减少CPU消耗,一般是算一个基本分数。这一阶段完成后,会有个排序。然后在第二阶段,再对Top 的结果做一次二阶段算分,在二阶段算分的时候会考虑更多的因子。二阶段算分在后续操作中。
具体请求,比如TermQuery、WildcardQuery的查询逻辑都在Lucene中,后面会有专门文章介绍。
6.rescore
根据Request中是否包含rescore配置决定是否进行二阶段排序,如果有则执行二阶段算分逻辑,会考虑更多的算分因子。二阶段算分也是一种计算机中常见的多层设计,是一种资源消耗和效率的折中。
Elasticsearch中支持配置多个Rescore,这些rescore逻辑会顺序遍历执行。每个rescore内部会先按照请求参数window选择出Top window的doc,然后对这些doc排序,排完后再合并回原有的Top 结果顺序中。
7.suggest::execute()
如果有推荐请求,则在这里执行推荐请求。如果请求中只包含了推荐的部分,则很多地方可以优化。推荐不是今天的重点,这里就不介绍了,后面有机会再介绍。
8.aggregation::execute()
如果含有聚合统计请求,则在这里执行。Elasticsearch中的aggregate的处理逻辑也类似于Search,通过多个Collector来实现。在Client Node中也需要对aggregation做合并。aggregate逻辑更复杂一些,就不在这里赘述了,后面有需要就再单独开文章介绍。
上述逻辑都执行完成后,如果当前查询请求只需要查询一个Shard,那么会直接在当前Node执行Fetch Phase。
Fetch PhaseElasticsearch作为搜索系统时,或者任何搜索系统中,除了Query阶段外,还会有一个Fetch阶段,这个Fetch阶段在数据库类系统中是没有的,是搜索系统中额外增加的阶段。搜索系统中额外增加Fetch阶段的原因是搜索系统中数据分布导致的,在搜索中,数据通过routing分Shard的时候,只能根据一个主字段值来决定,但是查询的时候可能会根据其他非主字段查询,那么这个时候所有Shard中都可能会存在相同非主字段值的Doc,所以需要查询所有Shard才能不会出现结果遗漏。同时如果查询主字段,那么这个时候就能直接定位到Shard,就只需要查询特定Shard即可,这个时候就类似于数据库系统了。另外,数据库中的二级索引又是另外一种情况,但类似于查主字段的情况,这里就不多说了。
基于上述原因,第一阶段查询的时候并不知道最终结果会在哪个Shard上,所以每个Shard中管都需要查询完整结果,比如需要Top 10,那么每个Shard都需要查询当前Shard的所有数据,找出当前Shard的Top 10,然后返回给Client Node。如果有100个Shard,那么就需要返回100 * 10 = 1000个结果,而Fetch Doc内容的操作比较耗费IO和CPU,如果在第一阶段就Fetch Doc,那么这个资源开销就会非常大。所以,一般是当Client Node选择出最终Top N的结果后,再对最终的Top N读取Doc内容。通过增加一点网络开销而避免大量IO和CPU操作,这个折中是非常划算的。
Fetch阶段的目的是通过DocID获取到用户需要的完整Doc内容。这些内容包括了DocValues,Store,Source,Script和Highlight等,具体的功能点是在SearchModule中注册的,系统默认注册的有:
除了系统默认的8种外,还有通过插件的形式注册自定义的功能,这些SubPhase中最重要的是Source和Highlight,Source是加载原文,Highlight是计算高亮显示的内容片断。
上述多个SubPhase会针对每个Doc顺序执行,可能会产生多次的随机IO,这里会有一些优化方案,但是都是针对特定场景的,不具有通用性。
Fetch Phase执行完后,整个查询流程就结束了。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!